From a95055e8b37a82bf0e4ffca5aba29b3139c9d082 Mon Sep 17 00:00:00 2001 From: Fredrik Tolf Date: Mon, 16 Mar 2015 07:10:37 +0100 Subject: [PATCH] Initial import. --- .gitignore | 1 + didex/__init__.py | 0 didex/db.py | 154 ++++++++++++++++++++++++++++++++++++++ didex/index.py | 215 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ didex/lib.py | 6 ++ 5 files changed, 376 insertions(+) create mode 100644 .gitignore create mode 100644 didex/__init__.py create mode 100644 didex/db.py create mode 100644 didex/index.py create mode 100644 didex/lib.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/didex/__init__.py b/didex/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/didex/db.py b/didex/db.py new file mode 100644 index 0000000..224cb2f --- /dev/null +++ b/didex/db.py @@ -0,0 +1,154 @@ +import time, threading, struct +from . import lib +from bsddb3 import db as bd + +deadlock = bd.DBLockDeadlockError + +class environment(lib.closable): + def __init__(self, path, *, create=True, recover=False, mode=0o666): + self.env = bd.DBEnv() + self.env.set_lk_detect(bd.DB_LOCK_RANDOM) + fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN + if recover: + fl |= bd.DB_RECOVER + if create: + fl |= bd.DB_CREATE + self.env.open(path, fl, mode) + self.lastckp = self.lastarch = time.time() + self.lock = threading.Lock() + self.dbs = {} + + def close(self): + env = self.env + if env is None: + return + env.close() + self.env = None + + def maint(self): + now = time.time() + try: + if now - self.lastckp > 60: + self.env.txn_checkpoint(1024) + self.lastckp = now + if now - self.lastarch > 3600: + self.env.log_archive(bd.DB_ARCH_REMOVE) + self.lastarch = now + except deadlock: + pass + + def db(self, name, create=True, mode=0o666): + with self.lock: + if name not in self.dbs: + self.dbs[name] = database(self, name, create, mode) + return self.dbs[name] + + def __del__(self): + self.close() + def __enter__(self): + return self + def __exit__(self, *excinfo): + self.close() + return False + +def opendb(env, fnm, dnm, typ, fl, mode): + ret = bd.DB(env) + while True: + try: + self.main.open(fnm, dnm, typ, fl, mode) + except deadlock: + continue + return ret + +class txn(object): + def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC): + self.tx = env.txn_begin(None, flags) + self.done = False + + def commit(self): + self.done = True + self.tx.commit(0) + + def abort(self): + self.done = True + self.tx.abort() + + def __enter__(self): + return self + + def __exit__(self, etype, exc, tb): + if not self.done: + self.abort() + return False + +class database(object): + def __init__(self, env, name, create, mode): + self.env = env + self.mode = mode + self.fnm = name + fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT + if create: + fl |= bd.DB_CREATE + self.cf = self._opendb("cf", bd.DB_HASH, fl) + self.ob = self._opendb("ob", bd.DB_HASH, fl) + + def _opendb(self, dnm, typ, fl, init=None): + ret = bd.DB(self.env.env) + if init: init(ret) + while True: + try: + ret.open(self.fnm, dnm, typ, fl, self.mode) + except deadlock: + continue + return ret + + def _nextseq(self, tx): + if self.cf.has_key(b"seq", txn=tx.tx): + seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0] + else: + seq = 1 + self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx) + return seq + + def add(self, ob): + while True: + try: + with txn(self.env.env) as tx: + seq = self._nextseq(tx) + self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE) + tx.commit() + return seq + except deadlock: + continue + + def replace(self, id, ob): + while True: + try: + with txn(self.env.env) as tx: + key = struct.pack(">Q", id) + if not self.ob.has_key(key, txn=tx.tx): + raise KeyError(id) + self.ob.put(key, ob, txn=tx.tx) + tx.commit() + return + except deadlock: + continue + + def get(self, id): + while True: + try: + return self.ob[struct.pack(">Q", id)] + except KeyError: + raise KeyError(id) from None + except deadlock: + continue + + def remove(self, id): + while True: + try: + with txn(self.env.env) as tx: + self.ob.delete(struct.pack(">Q", id), txn=tx.tx) + tx.commit() + return + except deadlock: + continue diff --git a/didex/index.py b/didex/index.py new file mode 100644 index 0000000..5d206ff --- /dev/null +++ b/didex/index.py @@ -0,0 +1,215 @@ +import struct, contextlib +from . import db, lib +from .db import bd + +deadlock = bd.DBLockDeadlockError +notfound = bd.DBNotFoundError + +class simpletype(object): + def __init__(self, encode, decode): + self.enc = encode + self.dec = decode + + def encode(self, ob): + return self.enc(ob) + def decode(self, dat): + return self.dec(dat) + def compare(self, a, b): + if a < b: + return -1 + elif a > b: + return 1 + else: + return 0 + + @classmethod + def struct(cls, fmt): + return cls(lambda ob: struct.pack(fmt, ob), + lambda dat: struct.unpack(fmt, dat)[0]) + +class maybe(object): + def __init__(self, bk): + self.bk = bk + + def encode(self, ob): + if ob is None: return b"" + return b"\0" + self.bk.encode(ob) + def decode(self, dat): + if dat == b"": return None + return self.bk.dec(dat[1:]) + def compare(self, a, b): + if a is b is None: + return 0 + elif a is None: + return -1 + elif b is None: + return 1 + else: + return self.bk.compare(a[1:], b[1:]) + +t_int = simpletype.struct(">Q") + +class index(object): + def __init__(self, db, name, datatype): + self.db = db + self.nm = name + self.typ = datatype + +missing = object() + +class ordered(index, lib.closable): + def __init__(self, db, name, datatype, duplicates, create=True): + super().__init__(db, name, datatype) + self.dup = duplicates + fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT + if create: fl |= bd.DB_CREATE + def initdb(db): + def compare(a, b): + if a == b == "": return 0 + return self.typ.compare(self.typ.decode(a), self.typ.decode(b)) + db.set_flags(bd.DB_DUPSORT) + db.set_bt_compare(compare) + self.bk = db._opendb("i-" + name, bd.DB_BTREE, fl, initdb) + self.bk.set_get_returns_none(False) + + def close(self): + self.bk.close() + + class cursor(lib.closable): + def __init__(self, idx, cur, item, stop): + self.idx = idx + self.cur = cur + self.item = item + self.stop = stop + + def close(self): + if self.cur is not None: + self.cur.close() + + def __iter__(self): + return self + + def peek(self): + if self.item is None: + raise StopIteration() + rk, rv = self.item + rk = self.idx.typ.decode(rk) + rv = struct.unpack(">Q", rv)[0] + if self.stop(rk): + self.item = None + raise StopIteration() + return rk, rv + + def __next__(self): + rk, rv = self.peek() + try: + while True: + try: + self.item = self.cur.next() + break + except deadlock: + continue + except notfound: + self.item = None + return rk, rv + + def skip(self, n=1): + try: + for i in range(n): + next(self) + except StopIteration: + return + + def get(self, *, match=missing, ge=missing, gt=missing, lt=missing, le=missing, all=False): + while True: + try: + cur = self.bk.cursor() + done = False + try: + if match is not missing: + try: + k, v = cur.set(self.typ.encode(match)) + except notfound: + return self.cursor(None, None, None, None) + else: + done = True + return self.cursor(self, cur, (k, v), lambda o: (self.typ.compare(o, match) != 0)) + elif all: + try: + k, v = cur.first() + except notfound: + return self.cursor(None, None, None, None) + else: + done = True + return self.cursor(self, cur, (k, v), lambda o: False) + elif ge is not missing or gt is not missing or lt is not missing or le is not missing: + skip = False + try: + if ge is not missing: + k, v = cur.set_range(self.typ.encode(ge)) + elif gt is not missing: + k, v = cur.set_range(self.typ.encode(gt)) + skip = True + else: + k, v = cur.first() + except notfound: + return self.cursor(None, None, None, None) + if lt is not missing: + stop = lambda o: self.typ.compare(o, lt) >= 0 + elif le is not missing: + stop = lambda o: self.typ.compare(o, le) > 0 + else: + stop = lambda o: False + ret = self.cursor(self, cur, (k, v), stop) + if skip: + try: + while self.typ.compare(ret.peek()[0], gt) == 0: + next(ret) + except StopIteration: + pass + done = True + return ret + else: + raise NameError("invalid get() specification") + finally: + if not done: + cur.close() + except deadlock: + continue + + def put(self, key, id): + while True: + try: + with db.txn(self.db.env.env) as tx: + obid = struct.pack(">Q", id) + if not self.db.ob.has_key(obid, txn=tx.tx): + raise ValueError("no such object in database: " + str(id)) + try: + self.bk.put(self.typ.encode(key), obid, txn=tx.tx, flags=bd.DB_NODUPDATA) + except bd.DBKeyExistError: + return False + tx.commit() + return True + except deadlock: + continue + + def remove(self, key, id): + while True: + try: + with db.txn(self.db.env.env) as tx: + obid = struct.pack(">Q", id) + if not self.db.ob.has_key(obid, txn=tx.tx): + raise ValueError("no such object in database: " + str(id)) + cur = self.bk.cursor(txn=tx.tx) + try: + try: + cur.get_both(self.typ.encode(key), obid) + except notfound: + return False + cur.delete() + finally: + cur.close() + tx.commit() + return True + except deadlock: + continue diff --git a/didex/lib.py b/didex/lib.py new file mode 100644 index 0000000..2c73a39 --- /dev/null +++ b/didex/lib.py @@ -0,0 +1,6 @@ +class closable(object): + def __enter__(self): + return self + def __exit__(self, *excinfo): + self.close() + return False -- 2.11.0