#
# Copyright 2013 The py-lmdb authors, all rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted only as authorized by the OpenLDAP
# Public License.
#
# A copy of this license is available in the file LICENSE in the
# top-level directory of the distribution or, alternatively, at
# <http://www.OpenLDAP.org/license.html>.
#
# OpenLDAP is a registered trademark of the OpenLDAP Foundation.
#
# Individual files and/or contributed packages may be copyright by
# other parties and/or subject to additional restrictions.
#
# This work also contains materials derived from public sources.
#
# Additional information about OpenLDAP can be obtained at
# <http://www.openldap.org/>.
#
"""
Basic tools for working with LMDB.
copy: Consistent high speed backup an environment.
%prog copy -e source.lmdb target.lmdb
copyfd: Consistent high speed backup an environment to stdout.
%prog copyfd -e source.lmdb > target.lmdb/data.mdb
drop: Delete one or more sub-databases.
%prog drop db1
dump: Dump one or more databases to disk in 'cdbmake' format.
Usage: dump [db1=file1.cdbmake db2=file2.cdbmake]
If no databases are given, dumps the main database to 'main.cdbmake'.
edit: Add/delete/replace values from a database.
%prog edit --set key=value --set-file key=/path \\
--add key=value --add-file key=/path/to/file \\
--delete key
get: Read one or more values from a database.
%prog get [<key1> [<keyN> [..]]]
readers: Display readers in the lock table
%prog readers -e /path/to/db [-c]
If -c is specified, clear stale readers.
restore: Read one or more database from disk in 'cdbmake' format.
%prog restore db1=file1.cdbmake db2=file2.cdbmake
The special db name ":main:" may be used to indicate the main DB.
rewrite: Re-create an environment using MDB_APPEND
%prog rewrite -e src.lmdb -E dst.lmdb [<db1> [<dbN> ..]]
If no databases are given, rewrites only the main database.
shell: Open interactive console with ENV set to the open environment.
stat: Print environment statistics.
warm: Read environment into page cache sequentially.
watch: Show live environment statistics
"""
from __future__ import absolute_import
from __future__ import with_statement
import array
import collections
import csv
import functools
import optparse
import os
import pprint
import signal
import string
import struct
import sys
import time
# Python3.x bikeshedded trechery.
try:
from io import BytesIO as StringIO
except ImportError:
try:
from cStringIO import StringIO # type: ignore
except ImportError:
from StringIO import StringIO # type: ignore
import zlmdb._lmdb_vendor as lmdb
# How strings get encoded to and decoded from DB
[docs]
def _to_bytes(s):
"""Given either a Python 2.x or 3.x str, return either a str (Python 2.x)
or a bytes instance (Python 3.x)."""
return globals().get("unicode", str)(s).encode(ENCODING)
[docs]
def isprint(c):
"""Return ``True`` if the character `c` can be printed visibly and without
adversely affecting printing position (e.g. newline)."""
return c in string.printable and ord(c) > 16
[docs]
def xxd(s):
"""Return a vaguely /usr/bin/xxd formatted representation of the bytestring
`s`."""
sio = StringIO()
pr = _to_bytes("")
for idx, ch in enumerate(s):
if sys.version_info[0] >= 3:
ch = chr(ch)
if not (idx % 16):
if idx:
sio.write(_to_bytes(" "))
sio.write(pr)
sio.write(_to_bytes("\n"))
sio.write(_to_bytes("%07x:" % idx))
pr = _to_bytes("")
if not (idx % 2):
sio.write(_to_bytes(" "))
sio.write(_to_bytes("%02x" % (ord(ch),)))
pr += _to_bytes(ch) if isprint(ch) else _to_bytes(".")
if idx % 16:
need = 15 - (idx % 16)
# fill remainder of last line.
sio.write(_to_bytes(" ") * need)
sio.write(_to_bytes(" ") * (need // 2))
sio.write(_to_bytes(" "))
sio.write(pr)
sio.write(_to_bytes("\n"))
return sio.getvalue().decode(ENCODING)
[docs]
def make_parser():
parser = optparse.OptionParser()
parser.prog = "python -mlmdb"
parser.usage = "%prog [options] <command>\n" + __doc__.rstrip()
parser.add_option("-e", "--env", help="Environment file to open")
parser.add_option("-d", "--db", help="Database to open (default: main)")
parser.add_option("-r", "--read", help="Open environment read-only")
parser.add_option(
"-S",
"--map_size",
type="int",
default="10",
help="Map size in megabytes (default: 10)",
)
parser.add_option(
"-s",
"--use-single-file",
action="store_true",
help="The database was created as a single file and not a subdirectory",
)
# FIXME: implement --all
# parser.add_option('-a', '--all', action='store_true',
# help='Make "dump" dump all databases')
parser.add_option("-E", "--target_env", help='Target environment file for "dumpfd"')
parser.add_option(
"-x", "--xxd", action="store_true", help="Print values in xxd format"
)
parser.add_option(
"-M",
"--max-dbs",
type="int",
default=128,
help="Maximum open DBs (default: 128)",
)
parser.add_option(
"--out-fd", type="int", default=1, help='"copyfd" command target fd'
)
group = parser.add_option_group('Options for "copy" command')
group.add_option(
"--compact",
action="store_true",
default=False,
help="Perform compaction while copying.",
)
group = parser.add_option_group('Options for "edit" command')
group.add_option("--set", action="append", help="List of key=value pairs to set.")
group.add_option(
"--set-file", action="append", help="List of key pairs to read from files."
)
group.add_option("--add", action="append", help="List of key=value pairs to add.")
group.add_option(
"--add-file", action="append", help="List of key pairs to read from files."
)
group.add_option(
"--delete", action="append", help="List of key=value pairs to delete."
)
group = parser.add_option_group('Options for "readers" command')
group.add_option(
"-c", "--clean", action="store_true", help="Clean stale readers? (default: no)"
)
group = parser.add_option_group('Options for "watch" command')
group.add_option(
"--csv", action="store_true", help="Generate CSV instead of terminal output."
)
group.add_option(
"--interval", type="int", default=1, help="Interval size (default: 1sec)"
)
group.add_option(
"--window", type="int", default=10, help="Average window size (default: 10)"
)
return parser
[docs]
def die(fmt, *args):
if args:
fmt %= args
sys.stderr.write("lmdb.tool: %s\n" % (fmt,))
raise SystemExit(1)
[docs]
def dump_cursor_to_fp(cursor, fp):
for key, value in cursor:
fp.write(_to_bytes("+%d,%d:" % (len(key), len(value))))
fp.write(key)
fp.write(_to_bytes("->"))
fp.write(value)
fp.write(_to_bytes("\n"))
fp.write(_to_bytes("\n"))
[docs]
def db_map_from_args(args):
db_map = {}
for arg in args:
dbname, sep, path = arg.partition("=")
if not sep:
die('DB specification missing "=": %r', arg)
if dbname == ":main:":
dbname = None
if dbname in db_map:
die("DB specified twice: %r", arg)
db_map[dbname] = (ENV.open_db(_to_bytes(dbname) if dbname else None), path)
if not db_map:
db_map[":main:"] = (ENV.open_db(None), "main.cdbmake")
return db_map
[docs]
def cmd_copy(opts, args):
if len(args) != 1:
die("Please specify output directory (see --help)")
output_dir = args[0]
if os.path.exists(output_dir):
die("Output directory %r already exists.", output_dir)
os.makedirs(output_dir, int("0755", 8))
print("Running copy to %r...." % (output_dir,))
ENV.copy(output_dir, compact=opts.compact)
[docs]
def cmd_copyfd(opts, args):
if args:
die('"copyfd" command takes no arguments (see --help)')
try:
os.fdopen(opts.out_fd, "w", 0)
except OSError:
e = sys.exc_info()[1]
die("Bad --out-fd %d: %s", opts.out_fd, e)
ENV.copyfd(opts.out_fd)
[docs]
def cmd_dump(opts, args):
db_map = db_map_from_args(args)
with ENV.begin(buffers=True) as txn:
for dbname, (db, path) in db_map.items():
with open(path, "wb", BUF_SIZE) as fp:
print("Dumping to %r..." % (path,))
cursor = txn.cursor(db=db)
dump_cursor_to_fp(cursor, fp)
[docs]
def restore_cursor_from_fp(txn, fp, db):
read = fp.read
read1 = functools.partial(read, 1)
read_until = lambda sep: b"".join(iter(read1, sep)) # NOQA: E731
rec_nr = 0
while True:
rec_nr += 1
plus = read(1)
if plus == b"\n":
break
elif plus != b"+":
die("bad or missing plus, line/record #%d", rec_nr)
try:
klen = int(read_until(b","), 10)
dlen = int(read_until(b":"), 10)
except ValueError:
die("bad or missing length, line/record #%d", rec_nr)
key = read(klen)
if read(2) != b"->":
die("bad or missing separator, line/record #%d", rec_nr)
data = read(dlen)
if (len(key) + len(data)) != (klen + dlen):
die("short key or data, line/record #%d", rec_nr)
if read(1) != b"\n":
die("bad line ending, line/record #%d", rec_nr)
txn.put(key, data, db=db)
return rec_nr
[docs]
def cmd_drop(opts, args):
if not args:
die("Must specify at least one sub-database (see --help)")
dbs = map(ENV.open_db, (map(_to_bytes, args)))
for idx, db in enumerate(dbs):
name = args[idx]
if name == ":main:":
die("Cannot drop main DB")
print("Dropping DB %r..." % (name,))
with ENV.begin(write=True) as txn:
txn.drop(db)
[docs]
def cmd_readers(opts, args):
if opts.clean:
print("Cleaned %d stale entries." % (ENV.reader_check(),))
print(ENV.readers())
[docs]
def cmd_restore(opts, args):
db_map = db_map_from_args(args)
with ENV.begin(buffers=True, write=True) as txn:
for dbname, (db, path) in db_map.items():
with open(path, "rb", BUF_SIZE) as fp:
print("Restoring from %r..." % (path,))
count = restore_cursor_from_fp(txn, fp, db)
print("Loaded %d keys from %r" % (count, path))
[docs]
def delta(hst):
return [(hst[i] - hst[i - 1]) for i in range(1, len(hst))]
[docs]
SYS_BLOCK = "/sys/block"
[docs]
def _find_diskstat(path):
if not os.path.exists(SYS_BLOCK):
return
st = os.stat(path)
devs = "%s:%s" % (st.st_dev >> 8, st.st_dev & 0xFF)
def maybe(rootpath):
dpath = os.path.join(rootpath, "dev")
if os.path.exists(dpath):
with open(dpath) as fp:
if fp.read().strip() == devs:
return os.path.join(rootpath, "stat")
for name in os.listdir(SYS_BLOCK):
basepath = os.path.join(SYS_BLOCK, name)
statpath = maybe(basepath)
if statpath:
return statpath
for name in os.listdir(basepath):
base2path = os.path.join(basepath, name)
statpath = maybe(base2path)
if statpath:
return statpath
[docs]
class DiskStatter(object):
[docs]
FIELDS = (
"reads",
"reads_merged",
"sectors_read",
"read_ms",
"writes",
"writes_merged",
"sectors_written",
"write_ms",
"io_count",
"io_ms",
"total_ms",
)
def __init__(self, path):
self.refresh()
[docs]
def refresh(self):
self.fp.seek(0)
vars(self).update(
(self.FIELDS[i], int(s)) for i, s in enumerate(self.fp.read().split())
)
[docs]
def cmd_watch(opts, args):
info = None
stat = None
def window(func):
history = collections.deque()
def windowfunc():
history.append(func())
if len(history) > opts.window:
history.popleft()
if len(history) <= 1:
return 0
n = sum(delta(history)) / float(len(history) - 1)
return n / opts.interval
return windowfunc
envmb = lambda: (info["last_pgno"] * stat["psize"]) / 1048576.0 # NOQA
cols = [
("%d", "Depth", lambda: stat["depth"]),
("%d", "Branch", lambda: stat["branch_pages"]),
("%d", "Leaf", lambda: stat["leaf_pages"]),
("%+d", "Leaf/s", window(lambda: stat["leaf_pages"])),
("%d", "Oflow", lambda: stat["overflow_pages"]),
("%+d", "Oflow/s", window(lambda: stat["overflow_pages"])),
("%d", "Recs", lambda: stat["entries"]),
("%+d", "Recs/s", window(lambda: stat["entries"])),
("%d", "Rdrs", lambda: info["num_readers"]),
("%.2f", "EnvMb", envmb),
("%+.2f", "EnvMb/s", window(envmb)),
("%d", "Txs", lambda: info["last_txnid"]),
("%+.2f", "Txs/s", window(lambda: info["last_txnid"])),
]
statter = None
statpath = _find_diskstat(ENV.path())
if statpath:
statter = DiskStatter(statpath)
cols += [
("%+d", "SctRd/s", window(lambda: statter.sectors_read)),
("%+d", "SctWr/s", window(lambda: statter.sectors_written)),
]
term_width = 0
widths = [len(head) for _, head, _ in cols]
if opts.csv:
writer = csv.writer(sys.stdout, quoting=csv.QUOTE_ALL)
writer.writerow([head for _, head, _ in cols])
cnt = 0
try:
while True:
stat = ENV.stat()
info = ENV.info()
if statter:
statter.refresh()
vals = []
for i, (fmt, head, func) in enumerate(cols):
val = fmt % func()
vals.append(val)
widths[i] = max(widths[i], len(val))
if opts.csv:
writer.writerow(vals)
else:
if term_width != _TERM_WIDTH or not (cnt % (_TERM_HEIGHT - 2)):
for i, (fmt, head, func) in enumerate(cols):
sys.stdout.write(head.rjust(widths[i] + 1))
sys.stdout.write("\n")
term_width = _TERM_WIDTH
for i, val in enumerate(vals):
sys.stdout.write(val.rjust(widths[i] + 1))
sys.stdout.write("\n")
time.sleep(opts.interval)
cnt += 1
except KeyboardInterrupt:
pass
[docs]
def cmd_warm(opts, args):
stat = ENV.stat()
info = ENV.info()
bufsize = 32768
last_offset = stat["psize"] * info["last_pgno"]
buf = array.array("B", _to_bytes("\x00" * bufsize))
t0 = time.time()
if opts.use_single_file:
fp = open(opts.env, "rb", bufsize)
else:
fp = open(opts.env + "/data.mdb", "rb", bufsize)
while fp.tell() < last_offset:
fp.readinto(buf)
print(
"Warmed %.2fmb in %dms" % (last_offset / 1048576.0, 1000 * (time.time() - t0))
)
[docs]
def cmd_rewrite(opts, args):
if not opts.target_env:
die("Must specify target environment path with -E")
src_info = ENV.info()
target_env = lmdb.open(
opts.target_env,
map_size=src_info["map_size"] * 2,
max_dbs=opts.max_dbs,
sync=False,
writemap=True,
map_async=True,
metasync=False,
)
dbs = []
for arg in args:
name = None if arg == ":main:" else arg
src_db = ENV.open_db(_to_bytes(name))
dst_db = target_env.open_db(_to_bytes(name))
dbs.append((arg, src_db, dst_db))
if not dbs:
dbs.append((":main:", ENV.open_db(None), target_env.open_db(None)))
for name, src_db, dst_db in dbs:
print("Writing %r..." % (name,))
with target_env.begin(db=dst_db, write=True) as wtxn:
with ENV.begin(db=src_db, buffers=True) as rtxn:
for key, value in rtxn.cursor():
wtxn.put(key, value, append=True)
print("Syncing..")
target_env.sync(True)
[docs]
def cmd_get(opts, args):
print_header = len(args) > 1
with ENV.begin(buffers=True, db=DB) as txn:
for arg in args:
value = txn.get(_to_bytes(arg))
if value is None:
print("%r: missing" % (arg,))
continue
if print_header:
print("%r:" % (arg,))
if opts.xxd:
print(xxd(value))
else:
print(bytes(value))
[docs]
def cmd_edit(opts, args):
if args:
die("Edit command only takes options, not arguments (see --help)")
with ENV.begin(write=True) as txn:
cursor = txn.cursor(db=DB)
for elem in opts.add or []:
key, _, value = _to_bytes(elem).partition(_to_bytes("="))
cursor.put(key, value, overwrite=False)
for elem in opts.set or []:
key, _, value = _to_bytes(elem).partition(_to_bytes("="))
cursor.put(key, value)
for key in opts.delete or []:
txn.delete(_to_bytes(key), db=DB)
for elem in opts.add_file or []:
key, _, path = _to_bytes(elem).partition(_to_bytes("="))
with open(path, "rb") as fp:
cursor.put(key, fp.read(), overwrite=False)
for elem in opts.set_file or []:
key, _, path = _to_bytes(elem).partition(_to_bytes("="))
with open(path, "rb") as fp:
cursor.put(key, fp.read())
[docs]
def cmd_shell(opts, args):
import code
import readline # NOQA
code.InteractiveConsole(globals()).interact()
[docs]
def cmd_stat(opts, args):
pprint.pprint(ENV.stat())
pprint.pprint(ENV.info())
[docs]
def _get_term_width(default=(80, 25)):
try:
import fcntl # No fcntl on win32
import termios # No termios on win32
s = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, "1234")
height, width = struct.unpack("hh", s)
return width, height
except Exception:
return default
[docs]
def _on_sigwinch(*args):
global _TERM_WIDTH, _TERM_HEIGHT
_TERM_WIDTH, _TERM_HEIGHT = _get_term_width()
[docs]
def main(argv=None):
parser = make_parser()
opts, args = parser.parse_args(argv)
if not args:
die("Please specify a command (see --help)")
if not opts.env:
die("Please specify environment (--env)")
global ENV
ENV = lmdb.open(
opts.env,
map_size=opts.map_size * 1048576,
subdir=not opts.use_single_file,
max_dbs=opts.max_dbs,
create=False,
readonly=opts.read == "READ",
)
if opts.db:
global DB
DB = ENV.open_db(_to_bytes(opts.db))
if hasattr(signal, "SIGWINCH"): # Disable on win32.
signal.signal(signal.SIGWINCH, _on_sigwinch)
_on_sigwinch()
func = globals().get("cmd_" + args[0])
if not func:
die("No such command: %r" % (args[0],))
func(opts, args[1:])
if __name__ == "__main__":
main(sys.argv[1:])