import configparser
import logging
import os
import re
import sqlite3
import sys
from threading import local
from acdcli.utils.conf import get_conf
from .cursors import *
from .format import FormatterMixin
from .query import QueryMixin
from .schema import SchemaMixin
from .sync import SyncMixin
logger = logging.getLogger(__name__)

# Selects the ids of all unnamed folder nodes, oldest first. The caller treats the first row
# as the root node and a second row as an integrity error.
# 'folder' is single-quoted on purpose: SQLite only accepts double-quoted string literals as a
# legacy fallback, which can be disabled at build time.
_ROOT_ID_SQL = "SELECT id FROM nodes WHERE name IS NULL AND type == 'folder' ORDER BY created"

# Name of the settings file that is handed to get_conf.
_SETTINGS_FILENAME = 'cache.ini'

# Default settings passed to get_conf. ConfigParser stores every value as a string,
# so e.g. busy_timeout is read back as '30000' and folders as '[]'.
_def_conf = configparser.ConfigParser()
_def_conf['sqlite'] = dict(filename='nodes.db', busy_timeout=30000, journal_mode='wal')
_def_conf['blacklist'] = dict(folders=[])
class IntegrityError(Exception):
    """Signals an integrity problem with the cache database.

    The original message (or the causing exception) is kept in :attr:`msg`.
    """

    def __init__(self, msg):
        """
        :param msg: description of the problem; may be a string or another exception
        """
        super().__init__(msg)
        self.msg = msg

    def __str__(self):
        # repr() keeps the quoting of string messages and the type name of wrapped exceptions
        return repr(self.msg)
def _create_conn(path: str) -> sqlite3.Connection:
    """Open a SQLite connection to the database at *path*.

    :param path: path of the database file, as accepted by :func:`sqlite3.connect`
    :returns: a connection whose rows are :class:`sqlite3.Row` objects
    """
    c = sqlite3.connect(path)
    c.row_factory = sqlite3.Row  # allow dict-like access on rows with col name
    return c
def _regex_match(pattern: str, cell: str) -> bool:
    """Tell whether *cell* matches the regular expression *pattern*, ignoring case.

    The match is anchored at the start of *cell* (:func:`re.match` semantics).

    :param pattern: regular expression
    :param cell: value to test; ``None`` never matches
    """
    if cell is None:
        return False
    return re.match(pattern, cell, re.IGNORECASE) is not None
class NodeCache(SchemaMixin, QueryMixin, SyncMixin, FormatterMixin):
    """Node cache stored in a SQLite database, combining the schema, query, sync and
    formatter mixins.

    Database connections are kept in thread-local storage, i.e. every thread that uses the
    cache gets its own connection (see :attr:`_conn`).
    """

    IntegrityCheckType = dict(full=0, quick=1, none=2)
    """types of SQLite integrity checks"""

    def __init__(self, cache_path: str='', settings_path='', check=IntegrityCheckType['full']):
        """Load the settings, check and initialize the database and look up the root node id.

        :param cache_path: directory that the database file name from the settings is joined to
        :param settings_path: passed on to ``get_conf`` to locate the ``cache.ini`` settings
        :param check: one of the :attr:`IntegrityCheckType` values
        :raises IntegrityError: if ``init()`` fails with a database error or if more than one
            root node candidate is found
        """
        self._conf = get_conf(settings_path, _SETTINGS_FILENAME, _def_conf)

        self.db_path = os.path.join(cache_path, self._conf['sqlite']['filename'])
        self.tl = local()

        self.integrity_check(check)
        try:
            # init() is not defined in this class; presumably one of the mixins supplies it
            self.init()
        except sqlite3.DatabaseError as e:
            raise IntegrityError(e) from e

        with cursor(self._conn) as c:
            c.execute(_ROOT_ID_SQL)
            row = c.fetchone()
            if row is None:
                # No root node yet. Do not return early here, so that the connection
                # pragmas below are applied for an empty cache as well.
                self.root_id = ''
            else:
                first_id = row['id']
                # a second candidate row means the root node is ambiguous
                if c.fetchone():
                    raise IntegrityError('Could not uniquely identify root node.')
                self.root_id = first_id

        # NOTE(review): busy_timeout is a per-connection setting in SQLite, so this call only
        # affects the connection of the constructing thread.
        self._execute_pragma('busy_timeout', self._conf['sqlite']['busy_timeout'])
        # The journal mode is deliberately not set on CPython 3.6.0. The reason is not visible
        # in this file; presumably a sqlite3 regression in that release -- TODO confirm.
        if sys.version_info[:3] != (3, 6, 0):
            self._execute_pragma('journal_mode', self._conf['sqlite']['journal_mode'])

    @property
    def _conn(self) -> sqlite3.Connection:
        """The calling thread's database connection, created on first access."""
        conn = getattr(self.tl, '_conn', None)
        if conn is None:
            conn = _create_conn(self.db_path)
            # User-defined SQL functions are registered per connection, so REGEXP has to be
            # added to every thread's connection, not only to the one used by __init__.
            conn.create_function('REGEXP', _regex_match.__code__.co_argcount, _regex_match)
            self.tl._conn = conn
        return conn

    def _execute_pragma(self, key, value) -> str:
        """Set the pragma *key* to *value* on the calling thread's connection.

        :param key: pragma name
        :param value: pragma value
        :returns: the first column of the row that the pragma statement yields,
            or ``None`` if it does not yield a row
        """
        with cursor(self._conn) as c:
            # SQLite does not accept bound parameters in PRAGMA statements, hence the string
            # formatting. The callers in this class only pass values from the settings.
            c.execute('PRAGMA %s=%s;' % (key, value))
            r = c.fetchone()
            if r:
                logger.debug('Set %s to %s. Result: %s.', key, value, r[0])
                return r[0]
        return None

    @classmethod
    def remove_db_file(cls, cache_path='', settings_path='') -> bool:
        """Removes database file.

        The file is first renamed to a random name in the system's temporary directory and
        then deleted from there.

        :param cache_path: directory that the database file name from the settings is joined to
        :param settings_path: passed on to ``get_conf`` to locate the ``cache.ini`` settings
        :returns: ``False`` if the file could not be renamed; ``True`` otherwise, even if the
            renamed file could not be deleted afterwards
        """
        import random
        import string
        import tempfile

        conf = get_conf(settings_path, _SETTINGS_FILENAME, _def_conf)
        db_path = os.path.join(cache_path, conf['sqlite']['filename'])

        tmp_name = ''.join(random.choice(string.ascii_lowercase) for _ in range(16))
        tmp_name = os.path.join(tempfile.gettempdir(), tmp_name)

        # NOTE(review): os.rename may fail if the database and the temporary directory are on
        # different filesystems; that case is reported as a failure below.
        try:
            os.rename(db_path, tmp_name)
        except OSError:
            logger.critical('Error renaming/removing database file "%s".', db_path)
            return False

        try:
            os.remove(tmp_name)
        except OSError:
            # best effort: the file is out of the cache directory, which is what matters
            logger.info('Database file was moved, but not deleted.')
        return True

    def integrity_check(self, type_: int):
        """Performs a `self-integrity check
        <https://www.sqlite.org/pragma.html#pragma_integrity_check>`_ on the database.

        A failed check is only logged as a warning; no exception is raised by this method
        itself.

        :param type_: one of the :attr:`IntegrityCheckType` values; any value other than
            ``full`` or ``quick`` skips the check
        """
        # The connection is acquired even if the check is skipped, as before.
        with cursor(self._conn) as c:
            if type_ == NodeCache.IntegrityCheckType['full']:
                c.execute('PRAGMA integrity_check;')
            elif type_ == NodeCache.IntegrityCheckType['quick']:
                c.execute('PRAGMA quick_check;')
            else:
                return
            r = c.fetchone()
            if not r or r[0] != 'ok':
                logger.warning('Sqlite database integrity check failed. '
                               'You may need to clear the cache if you encounter any errors.')