# Copyright 2013-2025 The py-lmdb authors, all rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted only as authorized by the OpenLDAP
# Public License.
#
# A copy of this license is available in the file LICENSE in the
# top-level directory of the distribution or, alternatively, at
# <http://www.OpenLDAP.org/license.html>.
#
# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
#
# Individual files and/or contributed packages may be copyright by
# other parties and/or subject to additional restrictions.
#
# This work also contains materials derived from public sources.
#
# Additional information about OpenLDAP can be obtained at
# <http://www.openldap.org/>.
"""
Async wrappers for py-lmdb via :func:`asyncio.loop.run_in_executor`.
Usage::
import lmdb
import lmdb.aio
env = lmdb.open('/tmp/mydb')
aenv = lmdb.aio.wrap(env)
async with aenv.begin(write=True) as txn:
await txn.put(b'key', b'value')
val = await txn.get(b'key')
"""
import asyncio
import functools
from . import Cursor, Environment, Transaction
def wrap(env, executor=None):
    """Return an :class:`AsyncEnvironment` that proxies *env*.

    *env* is the :class:`lmdb.Environment` to expose to async code.

    *executor* is forwarded to :meth:`loop.run_in_executor`. Leaving it
    as ``None`` (the default) selects the loop's default executor.
    """
    async_env = AsyncEnvironment(env, executor)
    return async_env
class _AsyncContextWrapper:
    """Adapter that lets one coroutine serve both ``await`` and ``async with``.

    Supports::

        txn = await aenv.begin(write=True)          # just await
        async with aenv.begin(write=True) as txn:   # context manager

    The wrapped coroutine can only be consumed once, so a wrapper is
    either awaited or entered, never both.
    """

    __slots__ = ('_coro', '_result')

    def __init__(self, coro):
        # Both slots must be filled here: ``__await__`` and ``__aenter__``
        # read ``_coro`` and would raise AttributeError on an unset slot.
        self._coro = coro
        self._result = None

    def __await__(self):
        """Delegate ``await wrapper`` to the wrapped coroutine."""
        return self._coro.__await__()

    async def __aenter__(self):
        """Await the coroutine, remember its result and return it."""
        self._result = await self._coro
        return self._result

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Forward the exit to the ``__aexit__`` of the awaited result."""
        return await self._result.__aexit__(exc_type, exc_val, exc_tb)
# ---------------------------------------------------------------------------
# Proxy method factories
# ---------------------------------------------------------------------------
def _sync_method(sync):
    """Build a plain method that invokes *sync* inline.

    The generated method looks up the wrapped object named by
    ``self._WRAPS`` and calls *sync* on it in the caller's thread; no
    executor is involved.
    """
    @functools.wraps(sync)
    def direct_call(self, *args, **kwargs):
        wrapped = getattr(self, self._WRAPS)
        return sync(wrapped, *args, **kwargs)

    return direct_call
def _async_method(sync):
    """Build a coroutine method that runs *sync* in the executor.

    The generated coroutine binds *sync* to the wrapped object named by
    ``self._WRAPS`` and submits the call to ``self._executor`` through
    the running loop, returning whatever *sync* returns.
    """
    @functools.wraps(sync)
    async def dispatched_call(self, *args, **kwargs):
        loop = asyncio.get_running_loop()
        executor = self._executor
        wrapped = getattr(self, self._WRAPS)
        job = functools.partial(sync, wrapped, *args, **kwargs)
        return await loop.run_in_executor(executor, job)

    return dispatched_call
def _async_method_locked(sync):
    """Variant of :func:`_async_method` that holds ``self._lock``.

    The lock is taken before the call is submitted to the executor and
    released once the result (or exception) has come back.
    """
    @functools.wraps(sync)
    async def locked_call(self, *args, **kwargs):
        async with self._lock:
            loop = asyncio.get_running_loop()
            executor = self._executor
            wrapped = getattr(self, self._WRAPS)
            job = functools.partial(sync, wrapped, *args, **kwargs)
            return await loop.run_in_executor(executor, job)

    return locked_call
def _collect_locked(sync):
    """Variant of :func:`_async_method_locked` for iterator-returning *sync*.

    The iterator produced by *sync* is exhausted inside the executor
    and handed back to the caller as a list.
    """
    @functools.wraps(sync)
    async def collecting_call(self, *args, **kwargs):
        def drain():
            # Runs in the executor: create the iterator there and
            # consume it fully before returning to the event loop.
            wrapped = getattr(self, self._WRAPS)
            return list(sync(wrapped, *args, **kwargs))

        async with self._lock:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(self._executor, drain)

    return collecting_call
# ---------------------------------------------------------------------------
# Async wrappers
# ---------------------------------------------------------------------------
class AsyncEnvironment:
    """Async wrapper for :py:class:`lmdb.Environment`.

    Created by :py:func:`wrap`. The methods of the underlying
    :py:class:`~lmdb.Environment` listed below are dispatched to an
    executor, except for the low-overhead accessors ``path()``,
    ``max_key_size()``, ``max_readers()``, and ``flags()``, which are
    called directly.

    Non-callable attributes of the wrapped environment are reachable
    through attribute access on the wrapper.

    Supports ``async with`` for lifetime management: the environment is
    closed on exit.
    """

    __slots__ = ('_env', '_executor')

    # Name of the slot holding the wrapped object. The proxy method
    # factories read it through ``self._WRAPS``.
    _WRAPS = '_env'

    def __init__(self, env, executor=None):
        """Store the wrapped *env* and the *executor* used for dispatch."""
        self._env = env
        self._executor = executor

    # -- context manager --------------------------------------------------

    async def __aenter__(self):
        """Return the wrapper itself."""
        return self

    async def __aexit__(self, *_exc):
        """Close the wrapped environment (called directly, no executor)."""
        self._env.close()

    # -- methods that return wrapped objects -------------------------------

    def begin(self, *args, **kwargs):
        """Start a new transaction, returning an :py:class:`AsyncTransaction`.

        Accepts the same arguments as :py:meth:`lmdb.Environment.begin`.
        Can be used with ``await`` or ``async with``::

            async with aenv.begin(write=True) as txn:
                await txn.put(b'key', b'value')
        """
        async def _begin():
            loop = asyncio.get_running_loop()
            txn = await loop.run_in_executor(
                self._executor,
                functools.partial(self._env.begin, *args, **kwargs),
            )
            return AsyncTransaction(txn, self._executor)

        return _AsyncContextWrapper(_begin())

    # -- proxied methods --------------------------------------------------

    path = _sync_method(Environment.path)
    max_key_size = _sync_method(Environment.max_key_size)
    max_readers = _sync_method(Environment.max_readers)
    flags = _sync_method(Environment.flags)

    stat = _async_method(Environment.stat)
    info = _async_method(Environment.info)
    close = _async_method(Environment.close)
    copy = _async_method(Environment.copy)
    copyfd = _async_method(Environment.copyfd)
    sync = _async_method(Environment.sync)
    readers = _async_method(Environment.readers)
    reader_check = _async_method(Environment.reader_check)
    set_mapsize = _async_method(Environment.set_mapsize)
    open_db = _async_method(Environment.open_db)
    dbs = _async_method(Environment.dbs)

    # -- attribute fallback -----------------------------------------------

    def __getattr__(self, name):
        """Expose non-callable attributes of the wrapped environment.

        Raises :py:exc:`AttributeError` for callables, so that methods
        without an explicit proxy are not called by accident.
        """
        # An unset slot also ends up here; resolving it via ``self._env``
        # would re-enter ``__getattr__`` and recurse without bound.
        if name in self.__slots__:
            raise AttributeError(name)
        attr = getattr(self._env, name)
        if callable(attr):
            raise AttributeError(name)
        return attr
class AsyncTransaction:
    """Async wrapper for :py:class:`lmdb.Transaction`.

    The methods of the underlying :py:class:`~lmdb.Transaction` listed
    below are available. Most are dispatched to an executor; ``id()``
    is called directly.

    An :py:class:`asyncio.Lock` serializes all operations dispatched
    through this transaction, including operations on its cursors. This
    makes :py:func:`asyncio.gather` safe on the same transaction.

    Supports ``async with``: the transaction is committed on a clean
    exit and aborted when the body raises.
    """

    __slots__ = ('_txn', '_executor', '_lock')

    # Name of the slot holding the wrapped object. The proxy method
    # factories read it through ``self._WRAPS``.
    _WRAPS = '_txn'

    def __init__(self, txn, executor=None):
        """Store the wrapped *txn*, the *executor* and a fresh lock."""
        self._txn = txn
        self._executor = executor
        self._lock = asyncio.Lock()

    # -- context manager --------------------------------------------------

    async def __aenter__(self):
        """Return the wrapper itself."""
        return self

    async def __aexit__(self, exc_type, _exc_val, _exc_tb):
        """Abort if the body raised, otherwise commit in the executor."""
        async with self._lock:
            if exc_type:
                self._txn.abort()
            else:
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(self._executor, self._txn.commit)

    # -- methods that return wrapped objects -------------------------------

    def cursor(self, *args, **kwargs):
        """Open a cursor, returning an :py:class:`AsyncCursor`.

        Accepts the same arguments as :py:meth:`lmdb.Transaction.cursor`.
        Can be used with ``await`` or ``async with``::

            async with txn.cursor() as cur:
                await cur.first()
                items = await cur.iternext()
        """
        async def _cursor():
            async with self._lock:
                loop = asyncio.get_running_loop()
                cur = await loop.run_in_executor(
                    self._executor,
                    functools.partial(self._txn.cursor, *args, **kwargs),
                )
                # The cursor shares this transaction's lock.
                return AsyncCursor(cur, self._executor, self._lock)

        return _AsyncContextWrapper(_cursor())

    # -- proxied methods --------------------------------------------------

    id = _sync_method(Transaction.id)

    stat = _async_method_locked(Transaction.stat)
    drop = _async_method_locked(Transaction.drop)
    commit = _async_method_locked(Transaction.commit)
    abort = _async_method_locked(Transaction.abort)
    get = _async_method_locked(Transaction.get)
    put = _async_method_locked(Transaction.put)
    replace = _async_method_locked(Transaction.replace)
    pop = _async_method_locked(Transaction.pop)
    delete = _async_method_locked(Transaction.delete)

    # -- attribute fallback -----------------------------------------------

    def __getattr__(self, name):
        """Expose non-callable attributes of the wrapped transaction.

        Raises :py:exc:`AttributeError` for callables, so that methods
        without an explicit proxy are not called by accident.
        """
        # An unset slot also ends up here; resolving it via ``self._txn``
        # would re-enter ``__getattr__`` and recurse without bound.
        if name in self.__slots__:
            raise AttributeError(name)
        attr = getattr(self._txn, name)
        if callable(attr):
            raise AttributeError(name)
        return attr
class AsyncCursor:
    """Async wrapper for :py:class:`lmdb.Cursor`.

    The methods of the underlying :py:class:`~lmdb.Cursor` listed below
    are available. Most are dispatched to an executor; ``key()``,
    ``value()``, and ``item()`` are called directly.

    Iterator methods (``iternext()``, ``iterprev()``, etc.) are consumed
    in the executor and returned as a list.

    Uses the lock passed to the constructor (the parent transaction's
    :py:class:`asyncio.Lock` when created via
    :py:meth:`AsyncTransaction.cursor`), or a private one if none is
    given.

    Supports ``async with``: the cursor is closed on exit.
    """

    __slots__ = ('_cursor', '_executor', '_lock')

    # Name of the slot holding the wrapped object. The proxy method
    # factories read it through ``self._WRAPS``.
    _WRAPS = '_cursor'

    def __init__(self, cursor, executor=None, lock=None):
        """Store the wrapped *cursor*, the *executor* and the *lock*."""
        self._cursor = cursor
        self._executor = executor
        # Explicit ``None`` test: only a missing lock gets a fresh one.
        self._lock = asyncio.Lock() if lock is None else lock

    # -- context manager --------------------------------------------------

    async def __aenter__(self):
        """Return the wrapper itself."""
        return self

    async def __aexit__(self, *_exc):
        """Close the wrapped cursor while holding the lock."""
        # ``close()`` and every other dispatched cursor operation take
        # the lock; take it here too so the cursor is not closed while
        # one of them is still running in the executor.
        async with self._lock:
            self._cursor.close()

    # -- proxied methods --------------------------------------------------

    key = _sync_method(Cursor.key)
    value = _sync_method(Cursor.value)
    item = _sync_method(Cursor.item)

    close = _async_method_locked(Cursor.close)
    first = _async_method_locked(Cursor.first)
    first_dup = _async_method_locked(Cursor.first_dup)
    last = _async_method_locked(Cursor.last)
    last_dup = _async_method_locked(Cursor.last_dup)
    prev = _async_method_locked(Cursor.prev)
    prev_dup = _async_method_locked(Cursor.prev_dup)
    prev_nodup = _async_method_locked(Cursor.prev_nodup)
    next = _async_method_locked(Cursor.next)
    next_dup = _async_method_locked(Cursor.next_dup)
    next_nodup = _async_method_locked(Cursor.next_nodup)
    set_key = _async_method_locked(Cursor.set_key)
    set_key_dup = _async_method_locked(Cursor.set_key_dup)
    set_range = _async_method_locked(Cursor.set_range)
    set_range_dup = _async_method_locked(Cursor.set_range_dup)
    delete = _async_method_locked(Cursor.delete)
    count = _async_method_locked(Cursor.count)
    put = _async_method_locked(Cursor.put)
    putmulti = _async_method_locked(Cursor.putmulti)
    replace = _async_method_locked(Cursor.replace)
    pop = _async_method_locked(Cursor.pop)
    get = _async_method_locked(Cursor.get)
    getmulti = _async_method_locked(Cursor.getmulti)

    iternext = _collect_locked(Cursor.iternext)
    iternext_dup = _collect_locked(Cursor.iternext_dup)
    iternext_nodup = _collect_locked(Cursor.iternext_nodup)
    iterprev = _collect_locked(Cursor.iterprev)
    iterprev_dup = _collect_locked(Cursor.iterprev_dup)
    iterprev_nodup = _collect_locked(Cursor.iterprev_nodup)

    # -- attribute fallback -----------------------------------------------

    def __getattr__(self, name):
        """Expose non-callable attributes of the wrapped cursor.

        Raises :py:exc:`AttributeError` for callables, so that methods
        without an explicit proxy are not called by accident.
        """
        # An unset slot also ends up here; resolving it via
        # ``self._cursor`` would re-enter ``__getattr__`` and recurse
        # without bound.
        if name in self.__slots__:
            raise AttributeError(name)
        attr = getattr(self._cursor, name)
        if callable(attr):
            raise AttributeError(name)
        return attr