Source code for zlmdb._database

#############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) typedef int GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################

import os
import shutil
import tempfile
import uuid
import pprint
import struct
import inspect
import time
from typing import Dict, Any, Tuple, List, Optional, Callable, Type

import lmdb
import yaml
import cbor2

from zlmdb._transaction import Transaction, TransactionStats
from zlmdb import _pmap
from zlmdb._pmap import MapStringJson, MapStringCbor, MapUuidJson, MapUuidCbor

import txaio

try:
    from twisted.python.reflect import qual
except ImportError:

    def qual(klass):
        return klass.__name__


KV_TYPE_TO_CLASS = {
    'string-json': (MapStringJson, lambda x: x, lambda x: x),
    'string-cbor': (MapStringCbor, lambda x: x, lambda x: x),
    'uuid-json': (MapUuidJson, lambda x: x, lambda x: x),
    'uuid-cbor': (MapUuidCbor, lambda x: x, lambda x: x),
}

_LMDB_MYPID_ENVS: Dict[str, Tuple['Database', int]] = {}


class ConfigurationElement(object):
    """
    Internal zLMDB configuration element base type.
    """

    __slots__ = (
        '_oid',
        '_name',
        '_description',
        '_tags',
    )

    def __init__(self,
                 oid: Optional[uuid.UUID] = None,
                 name: Optional[str] = None,
                 description: Optional[str] = None,
                 tags: Optional[List[str]] = None):
        self._oid = oid
        self._name = name
        self._description = description
        self._tags = tags

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, self.__class__):
            return False
        if other.oid != self.oid:
            return False
        if other.name != self.name:
            return False
        if other.description != self.description:
            return False
        if (self.tags and not other.tags) or (not self.tags and other.tags):
            return False
        if other.tags and self.tags:
            if set(other.tags) ^ set(self.tags):
                return False
        return True

    def __ne__(self, other: Any) -> bool:
        return not self.__eq__(other)

    @property
    def oid(self) -> Optional[uuid.UUID]:
        return self._oid

    @property
    def name(self) -> Optional[str]:
        return self._name

    @property
    def description(self) -> Optional[str]:
        return self._description

    @property
    def tags(self) -> Optional[List[str]]:
        return self._tags

    def __str__(self) -> str:
        return pprint.pformat(self.marshal())

    def marshal(self) -> Dict[str, Any]:
        value: Dict[str, Any] = {
            'oid': str(self._oid),
            'name': self._name,
        }
        if self.description:
            value['description'] = self._description
        if self.tags:
            value['tags'] = self._tags
        return value

    @staticmethod
    def parse(value: Dict[str, Any]) -> 'ConfigurationElement':
        assert type(value) == dict
        oid = value.get('oid', None)
        if oid:
            oid = uuid.UUID(oid)
        obj = ConfigurationElement(oid=oid,
                                   name=value.get('name', None),
                                   description=value.get('description', None),
                                   tags=value.get('tags', None))
        return obj


class Slot(ConfigurationElement):
    """
    Internal zLMDB database slot configuration element.
    """

    __slots__ = (
        '_slot',
        '_creator',
    )

    def __init__(self,
                 oid: Optional[uuid.UUID] = None,
                 name: Optional[str] = None,
                 description: Optional[str] = None,
                 tags: Optional[List[str]] = None,
                 slot: Optional[int] = None,
                 creator: Optional[str] = None):
        ConfigurationElement.__init__(self, oid=oid, name=name, description=description, tags=tags)
        self._slot = slot
        self._creator = creator

    @property
    def creator(self) -> Optional[str]:
        return self._creator

    @property
    def slot(self) -> Optional[int]:
        return self._slot

    def __str__(self) -> str:
        return pprint.pformat(self.marshal())

    def marshal(self) -> Dict[str, Any]:
        obj = ConfigurationElement.marshal(self)
        obj.update({
            'creator': self._creator,
            'slot': self._slot,
        })
        return obj

    @staticmethod
    def parse(data: Dict[str, Any]) -> 'Slot':
        assert type(data) == dict

        obj = ConfigurationElement.parse(data)

        slot = data.get('slot', None)
        creator = data.get('creator', None)

        drvd_obj = Slot(oid=obj.oid,
                        name=obj.name,
                        description=obj.description,
                        tags=obj.tags,
                        slot=slot,
                        creator=creator)
        return drvd_obj


class Schema(object):
    def __init__(self, meta, slots, slots_byname):
        self._meta = meta
        self._slots = slots
        self._slots_byname = slots_byname

    def __str__(self):
        return pprint.pformat(self._meta)

    def __getitem__(self, name):
        assert type(name) == str

        if name not in self._slots_byname:
            raise IndexError('no slot with name "{}"'.format(name))

        return self._slots[self._slots_byname[name]]

    def __setitem__(self, name, value):
        raise NotImplementedError('schema is read-only')

    def __delitem__(self, name):
        raise NotImplementedError('schema is read-only')

    def __len__(self):
        return len(self._slots_byname)

    def __iter__(self):
        raise Exception('not implemented')

    @staticmethod
    def parse(filename, klassmap=KV_TYPE_TO_CLASS):
        with open(filename) as f:
            _meta = yaml.safe_load(f.read())

            meta = {}
            slots = {}
            slots_byname = {}

            for slot in _meta.get('slots', []):
                _index = slot.get('index', None)
                assert type(_index) == int and _index >= 100 and _index < 65536
                assert _index not in slots

                _name = slot.get('name', None)
                assert type(_name) == str
                assert _name not in slots_byname

                _key = slot.get('key', None)
                assert _key in ['string', 'uuid']

                _value = slot.get('value', None)
                assert _value in ['json', 'cbor']

                _schema = slot.get('schema', None)
                assert _schema is None or type(_schema) == str

                _description = slot.get('description', None)
                assert (_description is None or type(_description) == str)

                if _schema:
                    _kv_type = '{}-{}-{}'.format(_key, _value, _schema)
                else:
                    _kv_type = '{}-{}'.format(_key, _value)

                _kv_klass, _marshal, _unmarshal = klassmap.get(_kv_type, (None, None, None))

                assert _kv_klass
                assert _marshal
                assert _unmarshal

                meta[_index] = {
                    'index': _index,
                    'name': _name,
                    'key': _key,
                    'value': _value,
                    'impl': _kv_klass.__name__ if _kv_klass else None,
                    'description': _description,
                }
                slots[_index] = _kv_klass(_index, marshal=_marshal, unmarshal=_unmarshal)
                slots_byname[_name] = _index

            return Schema(meta, slots, slots_byname)


[docs]class Database(object): """ ZLMDB database access. Objects of this class are generally "light-weight" (cheap to create and destroy), but do manage internal resource such as file descriptors. To manage these resources in a robust way, this class implements the Python context manager interface. """ __slots__ = ( 'log', '_is_temp', '_tempdir', '_dbpath', '_maxsize', '_readonly', '_lock', '_sync', '_create', '_open_now', '_writemap', '_context', '_slots', '_slots_by_index', '_env', ) def __init__(self, dbpath: Optional[str] = None, maxsize: int = 10485760, readonly: bool = False, lock: bool = True, sync: bool = True, create: bool = True, open_now: bool = True, writemap: bool = False, context: Any = None, log: Optional[txaio.interfaces.ILogger] = None): """ :param dbpath: LMDB database path: a directory with (at least) 2 files, a ``data.mdb`` and a ``lock.mdb``. If no database exists at the given path, create a new one. :param maxsize: Database size limit in bytes, with a default of 1MB. :param readonly: Open database read-only. When ``True``, deny any modifying database operations. Note that the LMDB lock file (``lock.mdb``) still needs to be written (by readers also), and hence at the filesystem level, a LMDB database directory must be writable. :param sync: Open database with sync on commit. :param create: Automatically create database if it does not yet exist. :param open_now: Open the database immediately (within this constructor). :param writemap: Use direct write to mmap'ed database rather than regular file IO writes. Be careful when using any storage other than locally attached filesystem/drive. :param context: Optional context within which this database instance is created. :param log: Log object to use for logging from this class. """ self._context = context if log: self.log = log else: if not txaio._explicit_framework: txaio.use_asyncio() self.log = txaio.make_logger() if dbpath: self._is_temp = False self._tempdir = None self._dbpath = dbpath else: self._is_temp = True self._tempdir = tempfile.TemporaryDirectory() self._dbpath = self._tempdir.name self._maxsize = maxsize self._readonly = readonly self._lock = lock self._sync = sync self._create = create self._open_now = open_now self._writemap = writemap self._context = context self._slots: Optional[Dict[uuid.UUID, Slot]] = None self._slots_by_index: Optional[Dict[uuid.UUID, int]] = None # in a context manager environment we initialize with LMDB handle # when we enter the actual temporary, managed context .. self._env: Optional[lmdb.Environment] = None # in a direct run environment, we immediately open LMDB if self._open_now: self.__enter__() def __enter__(self): """ Enter database runtime context and open the underlying LMDB database environment. .. note:: Enter the runtime context related to this object. The with statement will bind this method’s return value to the target(s) specified in the as clause of the statement, if any. [Source](https://docs.python.org/3/reference/datamodel.html#object.__enter__) .. note:: A context manager is an object that defines the runtime context to be established when executing a with statement. The context manager handles the entry into, and the exit from, the desired runtime context for the execution of the block of code. Context managers are normally invoked using the with statement (described in section The with statement), but can also be used by directly invoking their methods." [Source](https://docs.python.org/3/reference/datamodel.html#with-statement-context-managers) :return: This database instance (in open state). """ if not self._env: # protect against opening the same database file multiple times within the same process: # "It is a serious error to have open (multiple times) the same LMDB file in # the same process at the same time. Failure to heed this may lead to data # corruption and interpreter crash." # https://lmdb.readthedocs.io/en/release/#environment-class if not self._is_temp: if self._dbpath in _LMDB_MYPID_ENVS: other_obj, other_pid = _LMDB_MYPID_ENVS[self._dbpath] raise RuntimeError( 'tried to open same dbpath "{}" twice within same process: cannot open database ' 'for {} (PID {}, Context {}), already opened in {} (PID {}, Context {})'.format( self._dbpath, self, os.getpid(), self.context, other_obj, other_pid, other_obj.context)) _LMDB_MYPID_ENVS[self._dbpath] = self, os.getpid() # handle lmdb.LockError: mdb_txn_begin: Resource temporarily unavailable # "The environment was locked by another process." # https://lmdb.readthedocs.io/en/release/#lmdb.LockError # count number of retries retries = 0 # delay (in seconds) before retrying retry_delay = 0 while True: try: # https://lmdb.readthedocs.io/en/release/#lmdb.Environment # https://lmdb.readthedocs.io/en/release/#writemap-mode # map_size: Maximum size database may grow to; used to size the memory mapping. # lock=True is needed for concurrent access, even when only by readers (because of space mgmt) self._env = lmdb.open(self._dbpath, map_size=self._maxsize, create=self._create, readonly=self._readonly, sync=self._sync, subdir=True, lock=self._lock, writemap=self._writemap) # ok, good: we've got a LMDB env break # see https://github.com/crossbario/zlmdb/issues/53 except lmdb.LockError as e: retries += 1 if retries >= 3: # give up and signal to user code raise RuntimeError('cannot open LMDB environment (giving up ' 'after {} retries): {}'.format(retries, e)) # use synchronous (!) sleep (1st time is sleep(0), which releases execution of this process to OS) time.sleep(retry_delay) # increase sleep time by 10ms _next_ time. that is, for our 3 attempts # the delays are: 0ms, 10ms, 20ms retry_delay += 0.01 return self def __exit__(self, exc_type, exc_value, traceback): """ Exit runtime context and close the underlying LMDB database environment. .. note:: Exit the runtime context related to this object. The parameters describe the exception that caused the context to be exited. If the context was exited without an exception, all three arguments will be None. [Source](https://docs.python.org/3/reference/datamodel.html#object.__exit__). :param exc_type: :param exc_value: :param traceback: :return: """ if self._env: self._env.close() self._env = None if not self._is_temp and self._dbpath in _LMDB_MYPID_ENVS: del _LMDB_MYPID_ENVS[self._dbpath] @staticmethod def open(dbpath: Optional[str] = None, maxsize: int = 10485760, readonly: bool = False, lock: bool = True, sync: bool = True, create: bool = True, open_now: bool = True, writemap: bool = False, context: Any = None, log: Optional[txaio.interfaces.ILogger] = None) -> 'Database': if dbpath is not None and dbpath in _LMDB_MYPID_ENVS: db, _ = _LMDB_MYPID_ENVS[dbpath] print( '{}: reusing database instance for path "{}" in new context {} already opened from (first) context {}'. format(Database.open, dbpath, context, db.context)) else: db = Database(dbpath=dbpath, maxsize=maxsize, readonly=readonly, lock=lock, sync=sync, create=create, open_now=open_now, writemap=writemap, context=context, log=log) print('{}: creating new database instance for path "{}" in context {}'.format( Database.open, dbpath, context)) return db @property def context(self): """ :return: """ return self._context @property def dbpath(self) -> Optional[str]: """ :return: """ return self._dbpath @property def maxsize(self) -> int: """ :return: """ return self._maxsize @property def is_sync(self) -> bool: """ :return: """ return self._sync @property def is_readonly(self) -> bool: """ :return: """ return self._readonly @property def is_writemap(self) -> bool: """ :return: """ return self._writemap @property def is_open(self) -> bool: """ :return: """ return self._env is not None
[docs] @staticmethod def scratch(dbpath: str): """ :param dbpath: :return: """ if os.path.exists(dbpath): if os.path.isdir(dbpath): shutil.rmtree(dbpath) else: os.remove(dbpath)
[docs] def begin(self, write: bool = False, buffers: bool = False, stats: Optional[TransactionStats] = None) -> Transaction: """ :param write: :param buffers: :param stats: :return: """ assert self._env is not None if write and self._readonly: raise Exception('database is read-only') txn = Transaction(db=self, write=write, buffers=buffers, stats=stats) return txn
[docs] def sync(self, force: bool = False): """ :param force: :return: """ assert self._env is not None self._env.sync(force=force)
[docs] def config(self) -> Dict[str, Any]: """ :return: """ res = { 'is_temp': self._is_temp, 'dbpath': self._dbpath, 'maxsize': self._maxsize, 'readonly': self._readonly, 'lock': self._lock, 'sync': self._sync, 'create': self._create, 'open_now': self._open_now, 'writemap': self._writemap, 'context': str(self._context) if self._context else None, } return res
[docs] def stats(self, include_slots: bool = False) -> Dict[str, Any]: """ :param include_slots: :return: """ assert self._env is not None current_size = os.path.getsize(os.path.join(self._dbpath, 'data.mdb')) # psize Size of a database page in bytes. # depth Height of the B-tree. # branch_pages Number of internal (non-leaf) pages. # leaf_pages Number of leaf pages. # overflow_pages Number of overflow pages. # entries Number of data items. stats = self._env.stat() pages = stats['leaf_pages'] + stats['overflow_pages'] + stats['branch_pages'] used = stats['psize'] * pages self._cache_slots() res: Dict[str, Any] = { 'num_slots': len(self._slots) if self._slots else 0, 'current_size': current_size, 'max_size': self._maxsize, 'page_size': stats['psize'], 'pages': pages, 'used': used, 'free': 1. - float(used) / float(self._maxsize), 'read_only': self._readonly, 'sync_enabled': self._sync, } res.update(stats) # map_addr Address of database map in RAM. # map_size Size of database map in RAM. # last_pgno ID of last used page. # last_txnid ID of last committed transaction. # max_readers Number of reader slots allocated in the lock file. Equivalent to the value of # maxreaders= specified by the first process opening the Environment. # num_readers Maximum number of reader slots in simultaneous use since the lock file was initialized. res.update(self._env.info()) if include_slots: slots = self._get_slots() res['slots'] = [] with self.begin() as txn: for slot_id in slots: slot = slots[slot_id] pmap = _pmap.PersistentMap(slot.slot) res['slots'].append({ 'oid': str(slot_id), 'slot': slot.slot, 'name': slot.name, 'description': slot.description, 'records': pmap.count(txn), }) return res
def _cache_slots(self): """ :return: """ slots = {} slots_by_index = {} with self.begin() as txn: from_key = struct.pack('>H', 0) to_key = struct.pack('>H', 1) cursor = txn._txn.cursor() found = cursor.set_range(from_key) while found: _key = cursor.key() if _key >= to_key: break if len(_key) >= 4: # key = struct.unpack('>H', _key[0:2]) slot_index = struct.unpack('>H', _key[2:4])[0] slot = Slot.parse(cbor2.loads(cursor.value())) assert slot.slot == slot_index slots[slot.oid] = slot slots_by_index[slot.oid] = slot_index found = cursor.next() self._slots = slots self._slots_by_index = slots_by_index def _get_slots(self, cached=True) -> Dict[uuid.UUID, Slot]: """ :param cached: :return: """ if self._slots is None or not cached: self._cache_slots() assert self._slots return self._slots def _get_free_slot(self) -> int: """ :return: """ if self._slots_by_index is None: self._cache_slots() assert self._slots_by_index is not None slot_indexes = sorted(self._slots_by_index.values()) if len(slot_indexes) > 0: return slot_indexes[-1] + 1 else: return 1 def _set_slot(self, slot_index: int, slot: Optional[Slot]): """ :param slot_index: :param slot: :return: """ assert type(slot_index) == int assert 0 < slot_index < 65536 assert slot is None or isinstance(slot, Slot) if self._slots is None: self._cache_slots() assert self._slots is not None assert self._slots_by_index is not None key = b'\0\0' + struct.pack('>H', slot_index) if slot: assert slot_index == slot.slot assert slot.oid data = cbor2.dumps(slot.marshal()) with self.begin(write=True) as txn: txn._txn.put(key, data) self._slots[slot.oid] = slot self._slots_by_index[slot.oid] = slot_index self.log.debug('Wrote metadata for table <{oid}> to slot {slot_index:03d}', oid=slot.oid, slot_index=slot_index) else: with self.begin(write=True) as txn: result = txn.get(key) if result: txn._txn.delete(key) slot = Slot.parse(cbor2.loads(result)) if slot.oid in self._slots: del self._slots[slot.oid] if slot.oid in self._slots_by_index: del self._slots_by_index[slot.oid] self.log.debug('Deleted metadata for table <{oid}> from slot {slot_index:03d}', oid=slot.oid, slot_index=slot_index)
[docs] def attach_table(self, klass: Type[_pmap.PersistentMap]): """ :param klass: :return: """ if not inspect.isclass(klass): raise TypeError( 'cannot attach object {} as database table: a subclass of zlmdb.PersistentMap is required'.format( klass)) name = qual(klass) if not issubclass(klass, _pmap.PersistentMap): raise TypeError( 'cannot attach object of class {} as a database table: a subclass of zlmdb.PersistentMap is required'. format(name)) if not hasattr(klass, '_zlmdb_oid') or not klass._zlmdb_oid: raise TypeError('{} is not decorated as table slot'.format(klass)) description = klass.__doc__.strip() if klass.__doc__ else None if self._slots is None: self._cache_slots() pmap = self._attach_slot(klass._zlmdb_oid, klass, marshal=klass._zlmdb_marshal, parse=klass._zlmdb_parse, build=klass._zlmdb_build, cast=klass._zlmdb_cast, compress=klass._zlmdb_compress, create=True, name=name, description=description) return pmap
def _attach_slot(self, oid: uuid.UUID, klass: Type[_pmap.PersistentMap], marshal: Optional[Callable] = None, parse: Optional[Callable] = None, build: Optional[Callable] = None, cast: Optional[Callable] = None, compress: Optional[int] = None, create: bool = True, name: Optional[str] = None, description: Optional[str] = None): """ :param oid: :param klass: :param marshal: :param parse: :param build: :param cast: :param compress: :param create: :param name: :param description: :return: """ assert isinstance(oid, uuid.UUID) assert issubclass(klass, _pmap.PersistentMap) assert marshal is None or callable(marshal) assert parse is None or callable(parse) assert build is None or callable(build) assert cast is None or callable(cast) # either marshal+parse (for CBOR/JSON) OR build+cast (for Flatbuffers) OR all unset assert (not marshal and not parse and not build and not cast) or \ (not marshal and not parse and build and cast) or \ (marshal and parse and not build and not cast) assert compress is None or compress in [_pmap.PersistentMap.COMPRESS_ZLIB, _pmap.PersistentMap.COMPRESS_SNAPPY] assert type(create) == bool assert name is None or type(name) == str assert description is None or type(description) == str assert self._slots_by_index is not None if oid not in self._slots_by_index: self.log.debug('No slot found in database for DB table <{oid}>: <{name}>', name=name, oid=oid) if create: slot_index = self._get_free_slot() slot = Slot(oid=oid, creator='unknown', slot=slot_index, name=name, description=description) self._set_slot(slot_index, slot) self.log.info('Allocated new slot {slot_index:03d} for database table <{oid}>: {name}', slot_index=slot_index, oid=oid, name=name) else: raise RuntimeError('No slot found in database for DB table <{}>: "{}"'.format(oid, name)) else: slot_index = self._slots_by_index[oid] # pmap = _pmap.PersistentMap(slot_index) # with self.begin() as txn: # records = pmap.count(txn) self.log.debug('Database table <{name}> attached [oid=<{oid}>, slot=<{slot_index:03d}>]', name=name, oid=oid, slot_index=slot_index) if marshal: slot_pmap = klass(slot_index, marshal=marshal, unmarshal=parse, compress=compress) # type: ignore elif build: slot_pmap = klass(slot_index, build=build, cast=cast, compress=compress) # type: ignore else: slot_pmap = klass(slot_index, compress=compress) return slot_pmap