Source code for zlmdb.tests.lmdb.testlib

#
# Copyright 2013 The py-lmdb authors, all rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted only as authorized by the OpenLDAP
# Public License.
#
# A copy of this license is available in the file LICENSE in the
# top-level directory of the distribution or, alternatively, at
# <http://www.OpenLDAP.org/license.html>.
#
# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
#
# Individual files and/or contributed packages may be copyright by
# other parties and/or subject to additional restrictions.
#
# This work also contains materials derived from public sources.
#
# Additional information about OpenLDAP can be obtained at
# <http://www.openldap.org/>.
#

import atexit
import gc
import os
import shutil
import stat
import tempfile
import traceback
import unittest

import zlmdb.lmdb as lmdb

[docs] _cleanups = []
[docs] def cleanup(): while _cleanups: func = _cleanups.pop() try: func() except Exception: traceback.print_exc()
atexit.register(cleanup)
[docs] class LmdbTest(unittest.TestCase):
[docs] def tearDown(self): cleanup()
[docs] def temp_dir(create=True): path = tempfile.mkdtemp(prefix='lmdb_test') assert path is not None, 'tempfile.mkdtemp failed' if not create: os.rmdir(path) _cleanups.append(lambda: shutil.rmtree(path, ignore_errors=True)) return path
[docs] def temp_file(create=True): fd, path = tempfile.mkstemp(prefix='lmdb_test') assert path is not None, 'tempfile.mkstemp failed' os.close(fd) if not create: os.unlink(path) _cleanups.append(lambda: os.path.exists(path) and os.unlink(path)) pathlock = path + '-lock' _cleanups.append(lambda: os.path.exists(pathlock) and os.unlink(pathlock)) return path
[docs] def temp_env(path=None, max_dbs=10, **kwargs): if not path: path = temp_dir() env = lmdb.open(path, max_dbs=max_dbs, **kwargs) _cleanups.append(env.close) return path, env
[docs] def path_mode(path): return stat.S_IMODE(os.stat(path).st_mode)
[docs] def debug_collect(): if hasattr(gc, 'set_debug') and hasattr(gc, 'get_debug'): old = gc.get_debug() gc.set_debug(gc.DEBUG_LEAK) gc.collect() gc.set_debug(old) else: for x in range(10): # PyPy doesn't collect objects with __del__ on first attempt. gc.collect()
[docs] INT_TYPES = (int,)
# B(ascii 'string') -> bytes
[docs] B = lambda s: bytes(s, 'ascii')
# BL('s1', 's2') -> ['bytes1', 'bytes2']
[docs] BL = lambda *args: list(map(B, args))
# TS('s1', 's2') -> ('bytes1', 'bytes2')
[docs] BT = lambda *args: tuple(B(s) for s in args)
# O(int) -> length-1 bytes
[docs] O = lambda arg: B(chr(arg))
# OCT(s) -> parse string as octal
[docs] OCT = lambda s: int(s, 8)
[docs] KEYS = BL('a', 'b', 'baa', 'd')
[docs] ITEMS = [(k, B('')) for k in KEYS]
[docs] REV_ITEMS = ITEMS[::-1]
[docs] VALUES = [B('') for k in KEYS]
[docs] KEYS2 = BL('a', 'b', 'baa', 'd', 'e', 'f', 'g', 'h')
[docs] ITEMS2 = [(k, B('')) for k in KEYS2]
[docs] REV_ITEMS2 = ITEMS2[::-1]
[docs] VALUES2 = [B('') for k in KEYS2]
[docs] KEYSFIXED = BL('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h')
[docs] VALUES_MULTI = [(B('r'), B('s')) for k in KEYSFIXED]
[docs] ITEMS_MULTI_FIXEDKEY = [ (kv[0], v) for kv in list(zip(KEYSFIXED, VALUES_MULTI)) for v in kv[1] ]
[docs] def _put_items(items, t, db=None): for k, v in items: if db: t.put(k, v, db=db) else: t.put(k, v)
[docs] def putData(t, db=None): _put_items(ITEMS, t, db=db)
[docs] def putBigData(t, db=None): _put_items(ITEMS2, t, db=db)
[docs] def putBigDataMultiFixed(t, db=None): _put_items(ITEMS_MULTI_FIXEDKEY, t, db=db)