Source code for acdcli.cache.schema

import logging
from sqlite3 import OperationalError
from .cursors import *

logger = logging.getLogger(__name__)

# _KeyValueStorage


_CREATION_SCRIPT = """
    CREATE TABLE metadata (
        "key" VARCHAR(64) NOT NULL,
        value VARCHAR,
        PRIMARY KEY ("key")
    );

    CREATE TABLE nodes (
        id VARCHAR(50) NOT NULL,
        type VARCHAR(15),
        name VARCHAR(256),
        description VARCHAR(500),
        created DATETIME,
        modified DATETIME,
        updated DATETIME,
        status VARCHAR(9),
        PRIMARY KEY (id),
        UNIQUE (id),
        CHECK (status IN ('AVAILABLE', 'TRASH', 'PURGED', 'PENDING'))
    );

    CREATE TABLE labels (
        id VARCHAR(50) NOT NULL,
        name VARCHAR(256) NOT NULL,
        PRIMARY KEY (id, name),
        FOREIGN KEY(id) REFERENCES nodes (id)
    );

    CREATE TABLE files (
        id VARCHAR(50) NOT NULL,
        md5 VARCHAR(32),
        size BIGINT,
        PRIMARY KEY (id),
        UNIQUE (id),
        FOREIGN KEY(id) REFERENCES nodes (id)
    );

    CREATE TABLE parentage (
        parent VARCHAR(50) NOT NULL,
        child VARCHAR(50) NOT NULL,
        PRIMARY KEY (parent, child),
        FOREIGN KEY(parent) REFERENCES folders (id),
        FOREIGN KEY(child) REFERENCES nodes (id)
    );

    CREATE INDEX ix_parentage_child ON parentage(child);
    CREATE INDEX ix_nodes_names ON nodes(name);
    PRAGMA user_version = 3;
    """

_GEN_DROP_TABLES_SQL = \
    'SELECT "DROP TABLE " || name || ";" FROM sqlite_master WHERE type == "table"'

_migrations = []
"""list of all schema migrations"""


def _migration(func):
    """scheme migration annotation; must be used in correct order"""
    _migrations.append(func)
    return func


@_migration
def _0_to_1(conn):
    conn.executescript(
        'ALTER TABLE nodes ADD updated DATETIME;'
        'ALTER TABLE nodes ADD description VARCHAR(500);'
        'PRAGMA user_version = 1;'
    )
    conn.commit()


@_migration
def _1_to_2(conn):
    conn.executescript(
        'DROP TABLE IF EXISTS folders;'
        'CREATE INDEX IF NOT EXISTS ix_nodes_names ON nodes(name);'
        'REINDEX;'
        'PRAGMA user_version = 2;'
    )
    conn.commit()


@_migration
def _2_to_3(conn):
    conn.executescript(
        'CREATE INDEX IF NOT EXISTS ix_parentage_child ON parentage(child);'
        'REINDEX;'
        'PRAGMA user_version = 3;'
    )
    conn.commit()


[docs]class SchemaMixin(object): _DB_SCHEMA_VER = 3
[docs] def init(self): try: self.create_tables() except OperationalError: pass with cursor(self._conn) as c: c.execute('PRAGMA user_version;') r = c.fetchone() ver = r[0] logger.info('DB schema version is %i.' % ver) if self._DB_SCHEMA_VER > ver: self._migrate(ver) self.KeyValueStorage = _KeyValueStorage(self._conn)
[docs] def create_tables(self): self._conn.executescript(_CREATION_SCRIPT) self._conn.commit()
def _migrate(self, version): for i, migration in enumerate(_migrations[version:]): v = i + version logger.info('Migrating from schema version %i to %i' % (v, v + 1)) migration(self._conn)
[docs] def drop_all(self): drop_sql = [] with cursor(self._conn) as c: c.execute(_GEN_DROP_TABLES_SQL) dt = c.fetchone() while dt: drop_sql.append(dt[0]) dt = c.fetchone() with mod_cursor(self._conn) as c: for drop in drop_sql: c.execute(drop) self._conn.commit() logger.info('Dropped all tables.') return True
class _KeyValueStorage(object): def __init__(self, conn): self.conn = conn def __getitem__(self, key: str): with cursor(self.conn) as c: c.execute('SELECT value FROM metadata WHERE key = (?)', [key]) r = c.fetchone() if r: return r['value'] else: raise KeyError def __setitem__(self, key: str, value: str): with mod_cursor(self.conn) as c: c.execute('INSERT OR REPLACE INTO metadata VALUES (?, ?)', [key, value]) # def __len__(self): # return self.Session.query(Metadate).count() def get(self, key: str, default: str = None): with cursor(self.conn) as c: c.execute('SELECT value FROM metadata WHERE key == ?', [key]) r = c.fetchone() return r['value'] if r else default def update(self, dict_: dict): for key in dict_.keys(): self.__setitem__(key, dict_[key])