# Source code for privex.db.sqlite

"""
This module holds :class:`.SqliteWrapper` - a somewhat higher level class for interacting with SQLite3 databases.

**Copyright**::

        +===================================================+
        |                 © 2019 Privex Inc.                |
        |               https://www.privex.io               |
        +===================================================+
        |                                                   |
        |        Privex's Python Database Library           |
        |        License: X11 / MIT                         |
        |                                                   |
        |        Originally Developed by Privex Inc.        |
        |        Core Developer(s):                         |
        |                                                   |
        |          (+)  Chris (@someguy123) [Privex]        |
        |                                                   |
        +===================================================+

    Copyright (c) 2019     Privex Inc.   ( https://www.privex.io )


"""
import asyncio
import os
import sqlite3
import logging
import warnings
from os.path import expanduser, join, dirname, isabs
from typing import List, Tuple, Optional, Any, Union, Set, Iterable

from async_property import async_property
from privex.helpers import empty, DictObject, is_namedtuple, empty_if

from privex.db.base import GenericDBWrapper, GenericAsyncDBWrapper, _should_zip, cursor_to_dict, DBExecution
from privex.db.query.sqlite import SqliteQueryBuilder
from privex.db.query import SqliteAsyncQueryBuilder
from privex.db.types import GenericAsyncCursor

log = logging.getLogger(__name__)


def parse_db_args(ins, db=None, memory_persist=False, connection_kwargs=None, default_kwargs=None):
    """
    Resolve the SQLite3 database location and build the keyword arguments for the connector function.

    For file based databases, a relative ``db`` path is joined onto ``ins.DEFAULT_DB_FOLDER``, and the folder
    containing the database file is created (including missing parents) if it doesn't exist yet.

    For in-memory databases (``':memory:'`` appears in ``db``), nothing is created on disk. If the database string
    also contains ``'file:'``, then ``uri=True`` is added to the default connection kwargs.

    :param ins: Wrapper instance (or any object) exposing ``isolation_level``, ``db_timeout``, ``DEFAULT_DB``
                and ``DEFAULT_DB_FOLDER``.
    :param str db: Relative / absolute path to the database. Falls back to ``ins.DEFAULT_DB`` when empty.
    :param bool memory_persist: When ``True``, ``db`` is ignored and ``'file::memory:?cache=shared'`` is used.
    :param dict connection_kwargs: Extra connection kwargs. These take priority over the default kwargs.
    :param dict default_kwargs: Base connection kwargs. When empty, defaults to ``isolation_level`` and ``timeout``
                                taken from ``ins``. The passed dict is never modified.
    :return tuple result: ``(db, kwargs)`` - the resolved database string, and the merged connection kwargs dict.
    """
    connection_kwargs = empty_if(connection_kwargs, {})
    # Work on a copy - 'uri' may be injected below, and that must not leak into a dict owned by the caller.
    default_conn_kwargs = dict(empty_if(
        default_kwargs, dict(isolation_level=ins.isolation_level, timeout=ins.db_timeout)
    ))
    
    db = 'file::memory:?cache=shared' if memory_persist else empty_if(db, ins.DEFAULT_DB)
    
    if ':memory:' in db:
        log.debug("Passed 'db' argument is %s - using in-memory sqlite3 database.", db)
        # URI style database strings (file:...) are only understood by sqlite3 when uri=True is passed.
        if 'file:' in db:
            default_conn_kwargs['uri'] = True
    else:
        if not isabs(db):
            log.debug("Passed 'db' argument isn't absolute: %s", db)
            db = join(ins.DEFAULT_DB_FOLDER, db)
            log.debug("Prepended DEFAULT_DB_FOLDER to 'db' argument: %s", db)
        
        db_folder = dirname(db)
        # An empty folder means "current directory" (e.g. DEFAULT_DB_FOLDER = ''), so there's nothing to create.
        if db_folder and not os.path.exists(db_folder):
            log.debug("Database folder '%s' doesn't exist. Creating it + any missing parent folders", db_folder)
            # exist_ok avoids a crash if the folder gets created between the exists() check and this call.
            os.makedirs(db_folder, exist_ok=True)
    
    return db, {**default_conn_kwargs, **connection_kwargs}


class SqliteWrapper(GenericDBWrapper):
    """
    Lightweight wrapper class for interacting with Sqlite3 databases.

    **Simple direct class usage**

        >>> db_path = expanduser('~/.my_app/my_db.db')
        >>> db = SqliteWrapper(db=db_path)
        >>> users = db.fetchall("SELECT * FROM users;")

    **Usage**

    Below is an example wrapper class which uses :class:`.SqliteWrapper` as its parent class.

    It overrides the class attributes :py:attr:`.DEFAULT_DB_FOLDER`, :py:attr:`.DEFAULT_DB_NAME`, and
    :py:attr:`.DEFAULT_DB` - so that if no database path is passed to ``MyManager``, then the database file path
    contained in ``MyManager.DEFAULT_DB`` will be used as a default.

    It also overrides :py:attr:`.SCHEMAS` to define 2 tables (``users`` and ``items``) which will be automatically
    created when the class is instantiated, unless they already exist.

    It adds two helper methods: ``get_items`` (calls ``self.fetchall`` to get every row of ``items``) and
    ``find_item`` (calls ``self.fetchone`` to look up a single item by its ID).

    .. code-block:: python

        class MyManager(SqliteWrapper):
            ###
            # If a database path isn't specified, then the class attribute DEFAULT_DB will be used.
            ###
            DEFAULT_DB_FOLDER: str = expanduser('~/.my_app')
            DEFAULT_DB_NAME: str = 'my_app.db'
            DEFAULT_DB: str = join(DEFAULT_DB_FOLDER, DEFAULT_DB_NAME)

            ###
            # The SCHEMAS class attribute contains a list of tuples, with each tuple containing the name of a
            # table, as well as the SQL query required to create the table if it doesn't exist.
            ###
            SCHEMAS: List[Tuple[str, str]] = [
                ('users', "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT);"),
                ('items', "CREATE TABLE items (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT);"),
            ]

            def get_items(self):
                # This is an example of a helper method you might want to define, which simply calls
                # self.fetchall with a pre-defined SQL query
                return self.fetchall("SELECT * FROM items;")

            def find_item(self, id: int):
                # This is an example of a helper method you might want to define, which simply calls
                # self.fetchone with a pre-defined SQL query, and interpolates the 'id' parameter into
                # the prepared statement.
                return self.fetchone("SELECT * FROM items WHERE id = ?;", [id])

    """
    DEFAULT_DB_FOLDER: str = expanduser('~/.privex_sqlite')
    """If an absolute path isn't given, store the sqlite3 database file in this folder"""
    DEFAULT_DB_NAME: str = 'privex_sqlite.db'
    """If no database is specified to :meth:`.__init__`, then use this (appended to :py:attr:`.DEFAULT_DB_FOLDER`)"""
    DEFAULT_DB: str = join(DEFAULT_DB_FOLDER, DEFAULT_DB_NAME)
    """
    Combined :py:attr:`.DEFAULT_DB_FOLDER` and :py:attr:`.DEFAULT_DB_NAME` used as default absolute path for
    the sqlite3 database
    """
    DEFAULT_TABLE_QUERY = "SELECT count(name) as table_count FROM sqlite_master WHERE type = 'table' AND name = ?"
    DEFAULT_TABLE_LIST_QUERY = "SELECT name FROM sqlite_master WHERE type = 'table'"

    db: str
    """Path to the SQLite3 database for this class instance"""
    _conn: Optional[sqlite3.Connection]
    """Instance variable which holds the current SQLite3 connection object"""
    _builder: Optional[SqliteQueryBuilder]

    def __init__(self, db: str = None, isolation_level=None, **kwargs):
        """
        Set up the wrapper: resolve the database path via :func:`.parse_db_args`, then hand the
        :func:`sqlite3.connect` connector plus its arguments over to the parent constructor.

        :param str db: Relative / absolute path to SQLite3 database file to use.
        :param isolation_level: Isolation level for SQLite3 connection. Defaults to ``None`` (autocommit).
                                See the `Python SQLite3 Docs`_ for more information.

        :key int db_timeout: Amount of time to wait for any SQLite3 locks to expire before giving up
        :key str query_mode: Either ``'flat'`` (query returns tuples) or ``'dict'`` (query returns dicts).
                             More details in PyDoc block under :py:attr:`.query_mode`
        :key bool memory_persist: Use a shared in-memory database, which can be accessed by other instances of
                                  this class (in this process) - which is cleared after all memory connections
                                  are closed. Shortcut for ``db='file::memory:?cache=shared'``
        :key dict connection_kwargs: Extra keyword arguments merged over the default connection kwargs.

        .. _Python SQLite3 Docs: https://docs.python.org/3.8/library/sqlite3.html#sqlite3.Connection.isolation_level
        """
        # isolation_level + db_timeout must exist on the instance before parse_db_args() reads them.
        self.isolation_level = isolation_level
        self.db_timeout = int(kwargs.pop('db_timeout', 30))
        self.query_mode = kwargs.pop('query_mode', 'dict')
        self._conn = None
        self._builder = None

        # Strip the sqlite specific options out of kwargs, so only generic ones reach the parent class.
        shared_memory = kwargs.pop('memory_persist', False)
        extra_conn_kwargs = kwargs.pop('connection_kwargs', {})
        db_path, connect_kwargs = parse_db_args(
            self, db, memory_persist=shared_memory, connection_kwargs=extra_conn_kwargs
        )
        self.db = db_path

        super().__init__(
            db=db_path,
            connector_func=sqlite3.connect,
            connector_args=[db_path],
            query_mode=self.query_mode,
            connector_kwargs=connect_kwargs,
            **kwargs
        )

    # noinspection PyTypeChecker
    @property
    def conn(self) -> sqlite3.Connection:
        """Get or create an SQLite3 connection using DB file :py:attr:`.db` and return it"""
        connection = super().conn  # type: sqlite3.Connection
        return connection

    def _get_cursor(self, cursor_name=None, cursor_class=None, *args, **kwargs):
        """Pass-through to the parent class' ``_get_cursor`` with the exact same arguments."""
        return super()._get_cursor(*args, cursor_name=cursor_name, cursor_class=cursor_class, **kwargs)

    # noinspection PyTypeChecker
    def builder(self, table: str) -> SqliteQueryBuilder:
        """Create a :class:`.SqliteQueryBuilder` for ``table``, bound to this wrapper's connection."""
        connection = self.conn
        return SqliteQueryBuilder(table=table, connection=connection)

    # noinspection PyTypeChecker
    def insert(self, _table: str, _cursor: sqlite3.Cursor = None, **fields) -> Union[DictObject, sqlite3.Cursor]:
        """Pass-through to the parent class' ``insert`` - overridden to narrow the type hints to sqlite3."""
        return super().insert(_table, _cursor, **fields)

    def __enter__(self):
        """Make sure a connection is open when entering a ``with`` block, then return this wrapper."""
        self._conn = self.conn
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection held in ``_conn`` (if any) when leaving the ``with`` block, then clear it."""
        active = self._conn
        if active is not None:
            active.close()
        self._conn = None
try:
    import aiosqlite


    class SqliteAsyncWrapper(GenericAsyncDBWrapper):
        """
        AsyncIO flavour of the SQLite3 wrapper, built on top of ``aiosqlite``.

        **Usage**

        Creating an instance::

            >>> from privex.db import SqliteAsyncWrapper
            >>> db = SqliteAsyncWrapper('my_app.db')

        Inserting rows::

            >>> await db.insert('users', first_name='John', last_name='Doe')
            >>> await db.insert('users', first_name='Dave', last_name='Johnson')

        Running raw queries::

            >>> # fetchone() allows you to run a raw query, and a dict is returned with the first row result
            >>> row = await db.fetchone("SELECT * FROM users WHERE first_name = ?;", ['John'])
            >>> row['first_name']
            John
            >>> row['last_name']
            Doe
            >>> # fetchall() runs a query and returns an iterator of the returned rows
            >>> rows = await db.fetchall("SELECT * FROM users;")
            >>> for row in rows:
            ...     print(f"First Name: {row['first_name']} || Last Name: {row['last_name']}")
            ...
            First Name: John || Last Name: Doe
            First Name: Dave || Last Name: Johnson
            >>> # action() is for running queries where you don't want to fetch any results. It simply returns the
            >>> # affected row count as an integer.
            >>> row_count = await db.action('UPDATE users SET first_name = ? WHERE id = ?;', ['David', 2])
            >>> print(row_count)
            1

        Creating tables if they don't already exist::

            >>> # If the table 'users' doesn't exist, the CREATE TABLE query will be executed.
            >>> await db.create_schema(
            ...     'users',
            ...     "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, first_name TEXT, last_name TEXT);"
            ... )
            >>>

        Using the query builder::

            >>> # You can either use it directly
            >>> q = db.builder('users')
            >>> q.select('first_name', 'last_name').where('first_name', 'John').where_or('last_name', 'Doe')
            >>> results = q.all()
            >>> async for row in results:
            ...     print(f"First Name: {row['first_name']} || Last Name: {row['last_name']}")
            ...
            First Name: John || Last Name: Doe
            >>> # Or, you can use it in a ``with`` statement to maintain a singular connection, which means you
            >>> # can use .fetch_next to fetch a singular row at a time (you can still use .all() and .fetch())
            >>> async with db.builder('users') as q:
            ...     q.select('first_name', 'last_name')
            ...     row = q.fetch_next()
            ...     print('Name:', row['first_name'], row['last_name'])   # John Doe
            ...     row = q.fetch_next()
            ...     print('Name:', row['first_name'], row['last_name'])   # Dave Johnson
            ...
            Name: John Doe
            Name: Dave Johnson

        Creating a wrapper sub-class of SqliteAsyncWrapper:

        .. code-block:: python

            class MyManager(SqliteAsyncWrapper):
                ###
                # If a database path isn't specified, then the class attribute DEFAULT_DB will be used.
                ###
                DEFAULT_DB_FOLDER: str = expanduser('~/.my_app')
                DEFAULT_DB_NAME: str = 'my_app.db'
                DEFAULT_DB: str = join(DEFAULT_DB_FOLDER, DEFAULT_DB_NAME)

                ###
                # The SCHEMAS class attribute contains a list of tuples, with each tuple containing the name of a
                # table, as well as the SQL query required to create the table if it doesn't exist.
                ###
                SCHEMAS: List[Tuple[str, str]] = [
                    ('users', "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT);"),
                    ('items', "CREATE TABLE items (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT);"),
                ]

                async def get_items(self):
                    # This is an example of a helper method you might want to define, which simply calls
                    # self.fetchall with a pre-defined SQL query
                    return await self.fetchall("SELECT * FROM items;")

                async def find_item(self, id: int):
                    # This is an example of a helper method you might want to define, which simply calls
                    # self.fetchone with a pre-defined SQL query, and interpolates the 'id' parameter into
                    # the prepared statement.
                    return await self.fetchone("SELECT * FROM items WHERE id = ?;", [id])

        """
        AIO_CUR = aiosqlite.Cursor

        DEFAULT_DB_FOLDER: str = expanduser('~/.privex_sqlite')
        """If an absolute path isn't given, store the sqlite3 database file in this folder"""
        DEFAULT_DB_NAME: str = 'privex_sqlite.db'
        """If no database is specified to :meth:`.__init__`, then use this (appended to :py:attr:`.DEFAULT_DB_FOLDER`)"""
        DEFAULT_DB: str = join(DEFAULT_DB_FOLDER, DEFAULT_DB_NAME)
        """
        Combined :py:attr:`.DEFAULT_DB_FOLDER` and :py:attr:`.DEFAULT_DB_NAME` used as default absolute path for
        the sqlite3 database
        """
        DEFAULT_TABLE_QUERY = "SELECT count(name) as table_count FROM sqlite_master WHERE type = 'table' AND name = ?"
        DEFAULT_TABLE_LIST_QUERY = "SELECT name FROM sqlite_master WHERE type = 'table'"

        db: str
        """Path to the SQLite3 database for this class instance"""
        _conn: Optional[aiosqlite.Connection]
        """Instance variable which holds the current SQLite3 connection object"""
        _builder: Optional[SqliteAsyncQueryBuilder]

        def __init__(self, db: str = None, isolation_level=None, **kwargs):
            """
            Set up the wrapper: resolve the database path via :func:`.parse_db_args`, then hand the
            ``aiosqlite.connect`` connector plus its arguments over to the parent constructor.

            :param str db: Relative / absolute path to SQLite3 database file to use.
            :param isolation_level: Isolation level for SQLite3 connection. Defaults to ``None`` (autocommit).
                                    See the `Python SQLite3 Docs`_ for more information.

            :key int db_timeout: Amount of time to wait for any SQLite3 locks to expire before giving up
            :key str query_mode: Either ``'flat'`` (query returns tuples) or ``'dict'`` (query returns dicts).
                                 More details in PyDoc block under :py:attr:`.query_mode`
            :key bool memory_persist: Passed through to :func:`.parse_db_args` - when true, ``db`` is replaced
                                      with the shared in-memory database ``'file::memory:?cache=shared'``
            :key dict connection_kwargs: Extra keyword arguments merged over the default connection kwargs.

            .. _Python SQLite3 Docs: https://docs.python.org/3.8/library/sqlite3.html#sqlite3.Connection.isolation_level
            """
            # isolation_level + db_timeout must exist on the instance before parse_db_args() reads them.
            self.isolation_level = isolation_level
            self.db_timeout = int(kwargs.pop('db_timeout', 30))
            self.query_mode = kwargs.pop('query_mode', 'dict')
            self._conn = None
            self._builder = None

            # Strip the sqlite specific options out of kwargs, so only generic ones reach the parent class.
            shared_memory = kwargs.pop('memory_persist', False)
            extra_conn_kwargs = kwargs.pop('connection_kwargs', {})
            db_path, connect_kwargs = parse_db_args(
                self, db, memory_persist=shared_memory, connection_kwargs=extra_conn_kwargs
            )
            self.db = db_path

            super().__init__(
                db=db_path,
                connector_func=aiosqlite.connect,
                connector_args=[db_path],
                query_mode=self.query_mode,
                connector_kwargs=connect_kwargs,
                **kwargs
            )

        async def get_cursor(self, cursor_name=None, cursor_class=None, *args, **kwargs) -> aiosqlite.Cursor:
            """
            Same as the parent class' ``get_cursor``, except ``cursor_mgr`` defaults to ``False``.
            """
            # Disable cursor_mgr by default, as aiosqlite already has a context manager.
            # Built as a fresh dict (cursor_mgr placed last) so the caller's kwargs are left untouched.
            options = {key: val for key, val in kwargs.items() if key != 'cursor_mgr'}
            options['cursor_mgr'] = kwargs.get('cursor_mgr', False)
            # noinspection PyTypeChecker
            return await super().get_cursor(*args, cursor_name=cursor_name, cursor_class=cursor_class, **options)

        @async_property
        async def cursor(self) -> aiosqlite.Cursor:
            """Obtain a cursor through :meth:`.get_cursor` with ``cursor_mgr`` turned off."""
            return await self.get_cursor(cursor_mgr=False, close_callback=self._close_callback)

        # noinspection PyTypeChecker
        @async_property
        async def conn(self) -> aiosqlite.Connection:
            """Get or create an SQLite3 connection using DB file :py:attr:`.db` and return it"""
            connection = await super().conn  # type: aiosqlite.Connection
            return connection

        # noinspection PyTypeChecker
        def builder(self, table: str) -> SqliteAsyncQueryBuilder:
            """Create a :class:`.SqliteAsyncQueryBuilder` for ``table`` using this wrapper's connector arguments."""
            return SqliteAsyncQueryBuilder(
                table=table,
                connection_args=self.connector_args,
                connection_kwargs=self.connector_kwargs,
            )

        async def _get_cursor(self, cursor_name=None, cursor_class=None, *args, **kwargs):
            """
            Return the result of ``_get_connection(new=True, await_conn=False)``.

            The arguments are accepted for signature compatibility, but none of them are used.
            """
            return await self._get_connection(new=True, await_conn=False)

        # noinspection PyTypeChecker
        async def insert(self, _table: str, _cursor: AIO_CUR = None, **fields) -> Union[DictObject, AIO_CUR]:
            """Pass-through to the parent class' ``insert`` - overridden to narrow the type hints to aiosqlite."""
            return await super().insert(_table, _cursor, **fields)

        _Q_OUT_TYPE = GenericAsyncDBWrapper._Q_OUT_TYPE

        async def execute(self, query: str, *params: Iterable, fetch='all', **kwargs) \
                -> Tuple[Iterable, DictObject]:
            """
            Run ``query`` inside an ``async with`` connection block, commit, and return the fetched result.

            :param str query: The SQL query to execute
            :param params: Positional arguments forwarded as-is to the connection's ``execute``
            :param str fetch: ``'all'`` to use ``fetchall()``, ``'one'`` to use ``fetchone()``. With any other
                              value nothing is fetched, and the result is ``None``.
            :key cursor: An object to use in place of a new connection from ``_get_connection``
            :key cleanup_cursor: Accepted, but not used by this method.
            :return tuple result: ``(res, cur_dict)`` - the fetched result, plus ``cursor_to_dict`` of the cursor
            """
            kwargs.pop('cleanup_cursor', True)   # only popped - this method never reads the value
            connection: aiosqlite.Connection = kwargs.pop('cursor', None)
            result = None
            if connection is None:
                # noinspection PyTypeChecker
                connection = await self._get_connection(new=True, await_conn=False)

            # NOTE(review): the original layout of this block was collapsed onto one line - confirm that the
            # commit is meant to run after the cursor block, and the return after the connection block.
            async with connection as conn:
                async with conn.execute(query, *params) as cur:
                    if fetch == 'all':
                        result = await cur.fetchall()
                    elif fetch == 'one':
                        result = await cur.fetchone()
                    cur_dict = cursor_to_dict(cur)
                await conn.commit()
            return result, cur_dict

        def __enter__(self):
            """Store whatever :py:attr:`.conn` evaluates to in ``_conn``, then return this wrapper."""
            self._conn = self.conn
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            """Close the cursor, then await + close whatever ``_conn`` holds (if anything), and clear it."""
            await self.close_cursor()
            if self._conn is not None:
                connection = await self._conn
                await connection.close()
                del self._conn
            self._conn = None

        def __exit__(self, exc_type, exc_val, exc_tb):
            """Forget the stored connection. Nothing is closed here."""
            self._conn = None

except ImportError:
    warnings.warn("Could not import 'aiosqlite'. SqliteAsyncWrapper will not be available.")