Source code for zlmdb.tests.lmdb.testlib

#
# Copyright 2013 The py-lmdb authors, all rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted only as authorized by the OpenLDAP
# Public License.
#
# A copy of this license is available in the file LICENSE in the
# top-level directory of the distribution or, alternatively, at
# <http://www.OpenLDAP.org/license.html>.
#
# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
#
# Individual files and/or contributed packages may be copyright by
# other parties and/or subject to additional restrictions.
#
# This work also contains materials derived from public sources.
#
# Additional information about OpenLDAP can be obtained at
# <http://www.openldap.org/>.
#

from __future__ import absolute_import
import atexit
import gc
import os
import shutil
import stat
import sys
import tempfile
import traceback
import unittest

try:
    import __builtin__
except ImportError:
    import builtins as __builtin__

import zlmdb.lmdb as lmdb

# Registry of zero-argument callables that undo side effects of the test
# helpers below (temporary files/directories, open environments).
_cleanups = []


def cleanup():
    """Run and discard every registered cleanup callable, newest first.

    Callables are popped from the end of ``_cleanups``, so they execute in
    reverse registration order.  An exception raised by one callable is
    printed with ``traceback.print_exc()`` and does not prevent the
    remaining callables from running.
    """
    while _cleanups:
        func = _cleanups.pop()
        try:
            func()
        except Exception:
            # Best effort: report the failure and keep draining the registry.
            traceback.print_exc()


# Make sure anything still registered is cleaned up at interpreter exit.
atexit.register(cleanup)
class LmdbTest(unittest.TestCase):
    """Base ``unittest.TestCase`` that runs ``cleanup()`` after every test."""

    def tearDown(self):
        """Run all registered cleanup callables once the test has finished."""
        cleanup()
def temp_dir(create=True):
    """Return the path of a fresh temporary directory prefixed ``lmdb_test``.

    The directory is always created with ``tempfile.mkdtemp`` to reserve a
    unique name.  When ``create`` is false it is removed again immediately,
    so only the (currently unused) path is handed back.  In both cases a
    recursive, error-ignoring removal of the path is registered in
    ``_cleanups``.

    If the path object has a ``decode`` method (a byte string), it is decoded
    with the filesystem encoding before being returned.
    """
    path = tempfile.mkdtemp(prefix="lmdb_test")
    assert path is not None, "tempfile.mkdtemp failed"
    if not create:
        os.rmdir(path)
    # The lambda looks ``path`` up when it runs, i.e. after any decoding below.
    _cleanups.append(lambda: shutil.rmtree(path, ignore_errors=True))
    if hasattr(path, "decode"):
        path = path.decode(sys.getfilesystemencoding())
    return path
def temp_file(create=True):
    """Return the path of a fresh temporary file prefixed ``lmdb_test``.

    The file is always created with ``tempfile.mkstemp`` to reserve a unique
    name and its descriptor is closed right away.  When ``create`` is false
    the file is unlinked again, so only the (currently unused) path is
    handed back.

    Two cleanups are registered in ``_cleanups``: one removing the file
    itself and one removing ``<path>-lock`` (presumably the lock file LMDB
    writes next to a file-based environment -- confirm).  Each only unlinks
    when the target exists at cleanup time.

    If the path object has a ``decode`` method (a byte string), it is decoded
    with the filesystem encoding before being returned.
    """
    fd, path = tempfile.mkstemp(prefix="lmdb_test")
    assert path is not None, "tempfile.mkstemp failed"
    os.close(fd)
    if not create:
        os.unlink(path)
    _cleanups.append(lambda: os.path.exists(path) and os.unlink(path))
    pathlock = path + "-lock"
    _cleanups.append(lambda: os.path.exists(pathlock) and os.unlink(pathlock))
    if hasattr(path, "decode"):
        path = path.decode(sys.getfilesystemencoding())
    return path
def temp_env(path=None, max_dbs=10, **kwargs):
    """Open an LMDB environment for a test and schedule it to be closed.

    :param path: location of the environment; when falsy, a new directory
        from ``temp_dir()`` is used.
    :param max_dbs: forwarded to ``lmdb.open`` (defaults to 10 here).
    :param kwargs: any further keyword arguments for ``lmdb.open``.
    :returns: ``(path, env)`` tuple.

    ``env.close`` is appended to ``_cleanups`` so the environment is closed
    by the next ``cleanup()`` call.
    """
    if not path:
        path = temp_dir()
    env = lmdb.open(path, max_dbs=max_dbs, **kwargs)
    _cleanups.append(env.close)
    return path, env
def path_mode(path):
    """Return the permission bits of ``path`` as given by ``stat.S_IMODE``."""
    return stat.S_IMODE(os.stat(path).st_mode)
def debug_collect():
    """Force a garbage collection, with leak debugging where available.

    When the ``gc`` module offers ``set_debug``/``get_debug``, a single
    collection is run with ``gc.DEBUG_LEAK`` enabled and the previous debug
    flags are restored afterwards, even if the collection raises.
    Otherwise ``gc.collect()`` is simply called ten times.
    """
    if hasattr(gc, "set_debug") and hasattr(gc, "get_debug"):
        old = gc.get_debug()
        gc.set_debug(gc.DEBUG_LEAK)
        try:
            gc.collect()
        finally:
            # Never leave the interpreter stuck in leak-debugging mode.
            gc.set_debug(old)
    else:
        # PyPy doesn't collect objects with __del__ on first attempt.
        for _ in range(10):
            gc.collect()
# Text type: the ``unicode`` builtin where it exists, otherwise ``str``.
UnicodeType = getattr(__builtin__, "unicode", str)
# Binary type: the ``bytes`` builtin where it exists, otherwise ``str``.
BytesType = getattr(__builtin__, "bytes", str)
# Integer types accepted by the tests: ``long`` only exists on Python 2.
try:
    INT_TYPES = (int, long)  # noqa: F821 -- probing for the Python 2 builtin
except NameError:
    INT_TYPES = (int,)

# B(ascii 'string') -> bytes
try:
    bytes("")  # Python>=2.6, alias for str().

    def B(s):
        """Return ``s`` unchanged; native strings are already byte strings."""
        return s

except TypeError:  # Python3.x, requires encoding parameter.

    def B(s):
        """Return the text ``s`` encoded to ``bytes`` using ASCII."""
        return bytes(s, "ascii")


# BL('s1', 's2') -> ['bytes1', 'bytes2']
def BL(*args):
    """Return a list holding ``B(arg)`` for every positional argument."""
    return [B(s) for s in args]
# BT('s1', 's2') -> ('bytes1', 'bytes2')
def BT(*args):
    """Return a tuple holding ``B(arg)`` for every positional argument."""
    return tuple(B(s) for s in args)
# byte_chr(int) -> single byte from int
def byte_chr(arg):
    """Return the one-byte byte string whose value is ``arg`` (0..255).

    Built through ``bytearray`` rather than ``chr`` + ASCII encoding, so
    values 128..255 also work on Python 3, where the text returned by
    ``chr`` cannot be encoded as ASCII.  Values outside 0..255 raise
    ``ValueError``.
    """
    return bytes(bytearray([arg]))
# OCT(s) -> parse string as octal
def OCT(s):
    """Return the integer obtained by parsing the string ``s`` in base 8."""
    return int(s, 8)
# Small fixture: four byte-string keys, each paired with an empty value.
KEYS = BL("a", "b", "baa", "d")
ITEMS = [(k, B("")) for k in KEYS]
# Same pairs in reverse order.
REV_ITEMS = ITEMS[::-1]
# One empty value per key.
VALUES = [B("") for _ in KEYS]
# Larger fixture: eight byte-string keys, each paired with an empty value.
KEYS2 = BL("a", "b", "baa", "d", "e", "f", "g", "h")
ITEMS2 = [(k, B("")) for k in KEYS2]
# Same pairs in reverse order.
REV_ITEMS2 = ITEMS2[::-1]
# One empty value per key.
VALUES2 = [B("") for _ in KEYS2]
# Multi-value fixture: eight single-character keys ...
KEYSFIXED = BL("a", "b", "c", "d", "e", "f", "g", "h")
# ... each associated with the same two values, "r" and "s" ...
VALUES_MULTI = [(B("r"), B("s")) for _ in KEYSFIXED]
# ... flattened to (key, value) pairs: (a, r), (a, s), (b, r), (b, s), ...
ITEMS_MULTI_FIXEDKEY = [
    (key, value)
    for key, values in zip(KEYSFIXED, VALUES_MULTI)
    for value in values
]
def _put_items(items, t, db=None):
    """Store every ``(key, value)`` pair of ``items`` through ``t.put``.

    :param items: iterable of ``(key, value)`` pairs.
    :param t: object exposing ``put(key, value[, db=...])``.
    :param db: passed as the ``db`` keyword only when truthy; otherwise
        ``put`` is called without it.
    """
    for k, v in items:
        if db:
            t.put(k, v, db=db)
        else:
            t.put(k, v)
def putData(t, db=None):
    """Store the ``ITEMS`` fixture pairs through ``t`` (optionally in ``db``)."""
    _put_items(ITEMS, t, db=db)
def putBigData(t, db=None):
    """Store the ``ITEMS2`` fixture pairs through ``t`` (optionally in ``db``)."""
    _put_items(ITEMS2, t, db=db)
def putBigDataMultiFixed(t, db=None):
    """Store the ``ITEMS_MULTI_FIXEDKEY`` pairs through ``t`` (optionally in ``db``)."""
    _put_items(ITEMS_MULTI_FIXEDKEY, t, db=db)