Source code for acdcli.cache.query

import logging
from datetime import datetime
from .cursors import cursor

logger = logging.getLogger(__name__)


[docs]def datetime_from_string(dt: str) -> datetime: try: dt = datetime.strptime(dt, '%Y-%m-%d %H:%M:%S.%f+00:00') except ValueError: dt = datetime.strptime(dt, '%Y-%m-%d %H:%M:%S+00:00') return dt
CONFLICTING_NODE_SQL = """SELECT n.*, f.* FROM nodes n JOIN parentage p ON n.id = p.child LEFT OUTER JOIN files f ON n.id = f.id WHERE p.parent = (?) AND LOWER(name) = (?) AND status = 'AVAILABLE' ORDER BY n.name""" CHILDREN_SQL = """SELECT n.*, f.* FROM nodes n JOIN parentage p ON n.id = p.child LEFT OUTER JOIN files f ON n.id = f.id WHERE p.parent = (?) ORDER BY n.name""" CHILDRENS_NAMES_SQL = """SELECT n.name FROM nodes n JOIN parentage p ON n.id = p.child WHERE p.parent = (?) AND n.status == 'AVAILABLE' ORDER BY n.name""" NUM_CHILDREN_SQL = """SELECT COUNT(n.id) FROM nodes n JOIN parentage p ON n.id = p.child WHERE p.parent = (?) AND n.status == 'AVAILABLE'""" NUM_PARENTS_SQL = """SELECT COUNT(n.id) FROM nodes n JOIN parentage p ON n.id = p.parent WHERE p.child = (?) AND n.status == 'AVAILABLE'""" NUM_NODES_SQL = 'SELECT COUNT(*) FROM nodes' NUM_FILES_SQL = 'SELECT COUNT(*) FROM files' NUM_FOLDERS_SQL = 'SELECT COUNT(*) FROM nodes WHERE type == "folder"' CHILD_OF_SQL = """SELECT n.*, f.* FROM nodes n JOIN parentage p ON n.id = p.child LEFT OUTER JOIN files f ON n.id = f.id WHERE n.name = (?) AND p.parent = (?) ORDER BY n.status""" NODE_BY_ID_SQL = """SELECT n.*, f.* FROM nodes n LEFT OUTER JOIN files f ON n.id = f.id WHERE n.id = (?)""" USAGE_SQL = 'SELECT SUM(size) FROM files' FIND_BY_NAME_SQL = """SELECT n.*, f.* FROM nodes n LEFT OUTER JOIN files f ON n.id = f.id WHERE n.name LIKE ? ORDER BY n.name""" FIND_BY_REGEX_SQL = """SELECT n.*, f.* FROM nodes n LEFT OUTER JOIN files f ON n.id = f.id WHERE n.name REGEXP ? ORDER BY n.name""" FIND_BY_MD5_SQL = """SELECT n.*, f.* FROM nodes n LEFT OUTER JOIN files f ON n.id = f.id WHERE f.md5 == (?) ORDER BY n.name""" FIND_FIRST_PARENT_SQL = """SELECT n.* FROM nodes n JOIN parentage p ON n.id = p.parent WHERE p.child = (?) ORDER BY n.status, n.id""" # TODO: exclude files in trashed folders?! FILE_SIZE_EXISTS_SQL = """SELECT COUNT(*) FROM files f JOIN nodes n ON n.id = f.id WHERE f.size == (?) AND n.status == 'AVAILABLE'"""
[docs]class Node(object):
[docs] def __init__(self, row): self.id = row['id'] self.type = row['type'] self.name = row['name'] self.description = row['description'] self.cre = row['created'] self.mod = row['modified'] self.updated = row['updated'] self.status = row['status'] try: self.md5 = row['md5'] except IndexError: self.md5 = None try: self.size = row['size'] except IndexError: self.size = 0
def __lt__(self, other): return self.name < other.name def __hash__(self): return hash(self.id) def __repr__(self): return 'Node(%r, %r)' % (self.id, self.name) @property def is_folder(self): return self.type == 'folder' @property def is_file(self): return self.type == 'file' @property def is_available(self): return self.status == 'AVAILABLE' @property def is_trashed(self): return self.status == 'TRASH' @property def created(self): return datetime_from_string(self.cre) @property def modified(self): return datetime_from_string(self.mod) @property def simple_name(self): if self.is_file: return self.name return (self.name if self.name else '') + '/'
[docs]class QueryMixin(object):
[docs] def get_node(self, id) -> 'Union[Node|None]': with cursor(self._conn) as c: c.execute(NODE_BY_ID_SQL, [id]) r = c.fetchone() if r: return Node(r)
[docs] def get_root_node(self): return self.get_node(self.root_id)
[docs] def get_conflicting_node(self, name: str, parent_id: str): """Finds conflicting node in folder specified by *parent_id*, if one exists.""" with cursor(self._conn) as c: c.execute(CONFLICTING_NODE_SQL, [parent_id, name.lower()]) r = c.fetchone() if r: return Node(r)
[docs] def resolve(self, path: str, trash=False) -> 'Union[Node|None]': segments = list(filter(bool, path.split('/'))) if not segments: if not self.root_id: return with cursor(self._conn) as c: c.execute(NODE_BY_ID_SQL, [self.root_id]) r = c.fetchone() return Node(r) parent = self.root_id for i, segment in enumerate(segments): with cursor(self._conn) as c: c.execute(CHILD_OF_SQL, [segment, parent]) r = c.fetchone() r2 = c.fetchone() if not r: return r = Node(r) if not r.is_available: if not trash: return if r2: logger.debug('None-unique trash name "%s" in %s.' % (segment, parent)) return if i + 1 == len(segments): return r if r.is_folder: parent = r.id continue else: return
[docs] def childrens_names(self, folder_id) -> 'List[str]': with cursor(self._conn) as c: c.execute(CHILDRENS_NAMES_SQL, [folder_id]) kids = [] row = c.fetchone() while row: kids.append(row['name']) row = c.fetchone() return kids
[docs] def get_node_count(self) -> int: with cursor(self._conn) as c: c.execute(NUM_NODES_SQL) r = c.fetchone()[0] return r
[docs] def get_folder_count(self) -> int: with cursor(self._conn) as c: c.execute(NUM_FOLDERS_SQL) r = c.fetchone()[0] return r
[docs] def get_file_count(self) -> int: with cursor(self._conn) as c: c.execute(NUM_FILES_SQL) r = c.fetchone()[0] return r
[docs] def calculate_usage(self): with cursor(self._conn) as c: c.execute(USAGE_SQL) r = c.fetchone() return r[0] if r and r[0] else 0
[docs] def num_children(self, folder_id) -> int: with cursor(self._conn) as c: c.execute(NUM_CHILDREN_SQL, [folder_id]) num = c.fetchone()[0] return num
[docs] def num_parents(self, node_id) -> int: with cursor(self._conn) as c: c.execute(NUM_PARENTS_SQL, [node_id]) num = c.fetchone()[0] return num
[docs] def get_child(self, folder_id, child_name) -> 'Union[Node|None]': with cursor(self._conn) as c: c.execute(CHILD_OF_SQL, [child_name, folder_id]) r = c.fetchone() if r: r = Node(r) if r.is_available: return r
[docs] def list_children(self, folder_id, trash=False) -> 'Tuple[List[Node], List[Node]]': files = [] folders = [] with cursor(self._conn) as c: c.execute(CHILDREN_SQL, [folder_id]) node = c.fetchone() while node: node = Node(node) if node.is_available or trash: if node.is_file: files.append(node) elif node.is_folder: folders.append(node) node = c.fetchone() return folders, files
[docs] def list_trashed_children(self, folder_id) -> 'Tuple[List[Node], List[Node]]': folders, files = self.list_children(folder_id, True) folders[:] = [f for f in folders if f.is_trashed] files[:] = [f for f in files if f.is_trashed] return folders, files
[docs] def first_path(self, node_id: str) -> str: if node_id == self.root_id: return '/' with cursor(self._conn) as c: c.execute(FIND_FIRST_PARENT_SQL, (node_id,)) r = c.fetchone() node = Node(r) if node.id == self.root_id: return node.simple_name return self.first_path(node.id) + node.name + '/'
[docs] def find_by_name(self, name: str) -> 'List[Node]': nodes = [] with cursor(self._conn) as c: c.execute(FIND_BY_NAME_SQL, ['%' + name + '%']) r = c.fetchone() while r: nodes.append(Node(r)) r = c.fetchone() return nodes
[docs] def find_by_md5(self, md5) -> 'List[Node]': nodes = [] with cursor(self._conn) as c: c.execute(FIND_BY_MD5_SQL, (md5,)) r = c.fetchone() while r: nodes.append(Node(r)) r = c.fetchone() return nodes
[docs] def find_by_regex(self, regex) -> 'List[Node]': nodes = [] with cursor(self._conn) as c: c.execute(FIND_BY_REGEX_SQL, (regex,)) r = c.fetchone() while r: nodes.append(Node(r)) r = c.fetchone() return nodes
[docs] def file_size_exists(self, size) -> bool: with cursor(self._conn) as c: c.execute(FILE_SIZE_EXISTS_SQL, [size]) no = c.fetchone()[0] return bool(no)