Initial import.
authorFredrik Tolf <fredrik@dolda2000.com>
Mon, 16 Mar 2015 06:10:37 +0000 (07:10 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Mon, 16 Mar 2015 06:10:37 +0000 (07:10 +0100)
.gitignore [new file with mode: 0644]
didex/__init__.py [new file with mode: 0644]
didex/db.py [new file with mode: 0644]
didex/index.py [new file with mode: 0644]
didex/lib.py [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..bee8a64
--- /dev/null
@@ -0,0 +1 @@
+__pycache__
diff --git a/didex/__init__.py b/didex/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/didex/db.py b/didex/db.py
new file mode 100644 (file)
index 0000000..224cb2f
--- /dev/null
@@ -0,0 +1,154 @@
+import time, threading, struct
+from . import lib
+from bsddb3 import db as bd
+
+deadlock = bd.DBLockDeadlockError
+
+class environment(lib.closable):
+    def __init__(self, path, *, create=True, recover=False, mode=0o666):
+        self.env = bd.DBEnv()
+        self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
+        fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN
+        if recover:
+            fl |= bd.DB_RECOVER
+        if create:
+            fl |= bd.DB_CREATE
+        self.env.open(path, fl, mode)
+        self.lastckp = self.lastarch = time.time()
+        self.lock = threading.Lock()
+        self.dbs = {}
+
+    def close(self):
+        env = self.env
+        if env is None:
+            return
+        env.close()
+        self.env = None
+
+    def maint(self):
+        now = time.time()
+        try:
+            if now - self.lastckp > 60:
+                self.env.txn_checkpoint(1024)
+                self.lastckp = now
+            if now - self.lastarch > 3600:
+                self.env.log_archive(bd.DB_ARCH_REMOVE)
+                self.lastarch = now
+        except deadlock:
+            pass
+
+    def db(self, name, create=True, mode=0o666):
+        with self.lock:
+            if name not in self.dbs:
+                self.dbs[name] = database(self, name, create, mode)
+            return self.dbs[name]
+
+    def __del__(self):
+        self.close()
+    def __enter__(self):
+        return self
+    def __exit__(self, *excinfo):
+        self.close()
+        return False
+
+def opendb(env, fnm, dnm, typ, fl, mode):
+    ret = bd.DB(env)
+    while True:
+        try:
+            self.main.open(fnm, dnm, typ, fl, mode)
+        except deadlock:
+            continue
+        return ret
+
+class txn(object):
+    def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
+        self.tx = env.txn_begin(None, flags)
+        self.done = False
+
+    def commit(self):
+        self.done = True
+        self.tx.commit(0)
+
+    def abort(self):
+        self.done = True
+        self.tx.abort()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, etype, exc, tb):
+        if not self.done:
+            self.abort()
+        return False
+
+class database(object):
+    def __init__(self, env, name, create, mode):
+        self.env = env
+        self.mode = mode
+        self.fnm = name
+        fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
+        if create:
+            fl |= bd.DB_CREATE
+        self.cf = self._opendb("cf", bd.DB_HASH, fl)
+        self.ob = self._opendb("ob", bd.DB_HASH, fl)
+
+    def _opendb(self, dnm, typ, fl, init=None):
+        ret = bd.DB(self.env.env)
+        if init: init(ret)
+        while True:
+            try:
+                ret.open(self.fnm, dnm, typ, fl, self.mode)
+            except deadlock:
+                continue
+            return ret
+
+    def _nextseq(self, tx):
+        if self.cf.has_key(b"seq", txn=tx.tx):
+            seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
+        else:
+            seq = 1
+        self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
+        return seq
+
+    def add(self, ob):
+        while True:
+            try:
+                with txn(self.env.env) as tx:
+                    seq = self._nextseq(tx)
+                    self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
+                    tx.commit()
+                    return seq
+            except deadlock:
+                continue
+
+    def replace(self, id, ob):
+        while True:
+            try:
+                with txn(self.env.env) as tx:
+                    key = struct.pack(">Q", id)
+                    if not self.ob.has_key(key, txn=tx.tx):
+                        raise KeyError(id)
+                    self.ob.put(key, ob, txn=tx.tx)
+                    tx.commit()
+                    return
+            except deadlock:
+                continue
+
+    def get(self, id):
+        while True:
+            try:
+                return self.ob[struct.pack(">Q", id)]
+            except KeyError:
+                raise KeyError(id) from None
+            except deadlock:
+                continue
+
+    def remove(self, id):
+        while True:
+            try:
+                with txn(self.env.env) as tx:
+                    self.ob.delete(struct.pack(">Q", id), txn=tx.tx)
+                    tx.commit()
+                    return
+            except deadlock:
+                continue
diff --git a/didex/index.py b/didex/index.py
new file mode 100644 (file)
index 0000000..5d206ff
--- /dev/null
@@ -0,0 +1,215 @@
+import struct, contextlib
+from . import db, lib
+from .db import bd
+
+deadlock = bd.DBLockDeadlockError
+notfound = bd.DBNotFoundError
+
+class simpletype(object):
+    def __init__(self, encode, decode):
+        self.enc = encode
+        self.dec = decode
+
+    def encode(self, ob):
+        return self.enc(ob)
+    def decode(self, dat):
+        return self.dec(dat)
+    def compare(self, a, b):
+        if a < b:
+            return -1
+        elif a > b:
+            return 1
+        else:
+            return 0
+
+    @classmethod
+    def struct(cls, fmt):
+        return cls(lambda ob: struct.pack(fmt, ob),
+                   lambda dat: struct.unpack(fmt, dat)[0])
+
+class maybe(object):
+    def __init__(self, bk):
+        self.bk = bk
+
+    def encode(self, ob):
+        if ob is None: return b""
+        return b"\0" + self.bk.encode(ob)
+    def decode(self, dat):
+        if dat == b"": return None
+        return self.bk.dec(dat[1:])
+    def compare(self, a, b):
+        if a is b is None:
+            return 0
+        elif a is None:
+            return -1
+        elif b is None:
+            return 1
+        else:
+            return self.bk.compare(a[1:], b[1:])
+
+t_int = simpletype.struct(">Q")
+
+class index(object):
+    def __init__(self, db, name, datatype):
+        self.db = db
+        self.nm = name
+        self.typ = datatype
+
+missing = object()
+
+class ordered(index, lib.closable):
+    def __init__(self, db, name, datatype, duplicates, create=True):
+        super().__init__(db, name, datatype)
+        self.dup = duplicates
+        fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
+        if create: fl |= bd.DB_CREATE
+        def initdb(db):
+            def compare(a, b):
+                if a == b == "": return 0
+                return self.typ.compare(self.typ.decode(a), self.typ.decode(b))
+            db.set_flags(bd.DB_DUPSORT)
+            db.set_bt_compare(compare)
+        self.bk = db._opendb("i-" + name, bd.DB_BTREE, fl, initdb)
+        self.bk.set_get_returns_none(False)
+
+    def close(self):
+        self.bk.close()
+
+    class cursor(lib.closable):
+        def __init__(self, idx, cur, item, stop):
+            self.idx = idx
+            self.cur = cur
+            self.item = item
+            self.stop = stop
+
+        def close(self):
+            if self.cur is not None:
+                self.cur.close()
+
+        def __iter__(self):
+            return self
+
+        def peek(self):
+            if self.item is None:
+                raise StopIteration()
+            rk, rv = self.item
+            rk = self.idx.typ.decode(rk)
+            rv = struct.unpack(">Q", rv)[0]
+            if self.stop(rk):
+                self.item = None
+                raise StopIteration()
+            return rk, rv
+
+        def __next__(self):
+            rk, rv = self.peek()
+            try:
+                while True:
+                    try:
+                        self.item = self.cur.next()
+                        break
+                    except deadlock:
+                        continue
+            except notfound:
+                self.item = None
+            return rk, rv
+
+        def skip(self, n=1):
+            try:
+                for i in range(n):
+                    next(self)
+            except StopIteration:
+                return
+
+    def get(self, *, match=missing, ge=missing, gt=missing, lt=missing, le=missing, all=False):
+        while True:
+            try:
+                cur = self.bk.cursor()
+                done = False
+                try:
+                    if match is not missing:
+                        try:
+                            k, v = cur.set(self.typ.encode(match))
+                        except notfound:
+                            return self.cursor(None, None, None, None)
+                        else:
+                            done = True
+                            return self.cursor(self, cur, (k, v), lambda o: (self.typ.compare(o, match) != 0))
+                    elif all:
+                        try:
+                            k, v = cur.first()
+                        except notfound:
+                            return self.cursor(None, None, None, None)
+                        else:
+                            done = True
+                            return self.cursor(self, cur, (k, v), lambda o: False)
+                    elif ge is not missing or gt is not missing or lt is not missing or le is not missing:
+                        skip = False
+                        try:
+                            if ge is not missing:
+                                k, v = cur.set_range(self.typ.encode(ge))
+                            elif gt is not missing:
+                                k, v = cur.set_range(self.typ.encode(gt))
+                                skip = True
+                            else:
+                                k, v = cur.first()
+                        except notfound:
+                            return self.cursor(None, None, None, None)
+                        if lt is not missing:
+                            stop = lambda o: self.typ.compare(o, lt) >= 0
+                        elif le is not missing:
+                            stop = lambda o: self.typ.compare(o, le) > 0
+                        else:
+                            stop = lambda o: False
+                        ret = self.cursor(self, cur, (k, v), stop)
+                        if skip:
+                            try:
+                                while self.typ.compare(ret.peek()[0], gt) == 0:
+                                    next(ret)
+                            except StopIteration:
+                                pass
+                        done = True
+                        return ret
+                    else:
+                        raise NameError("invalid get() specification")
+                finally:
+                    if not done:
+                        cur.close()
+            except deadlock:
+                continue
+
+    def put(self, key, id):
+        while True:
+            try:
+                with db.txn(self.db.env.env) as tx:
+                    obid = struct.pack(">Q", id)
+                    if not self.db.ob.has_key(obid, txn=tx.tx):
+                        raise ValueError("no such object in database: " + str(id))
+                    try:
+                        self.bk.put(self.typ.encode(key), obid, txn=tx.tx, flags=bd.DB_NODUPDATA)
+                    except bd.DBKeyExistError:
+                        return False
+                    tx.commit()
+                    return True
+            except deadlock:
+                continue
+
+    def remove(self, key, id):
+        while True:
+            try:
+                with db.txn(self.db.env.env) as tx:
+                    obid = struct.pack(">Q", id)
+                    if not self.db.ob.has_key(obid, txn=tx.tx):
+                        raise ValueError("no such object in database: " + str(id))
+                    cur = self.bk.cursor(txn=tx.tx)
+                    try:
+                        try:
+                            cur.get_both(self.typ.encode(key), obid)
+                        except notfound:
+                            return False
+                        cur.delete()
+                    finally:
+                        cur.close()
+                    tx.commit()
+                    return True
+            except deadlock:
+                continue
diff --git a/didex/lib.py b/didex/lib.py
new file mode 100644 (file)
index 0000000..2c73a39
--- /dev/null
@@ -0,0 +1,6 @@
+class closable(object):
+    def __enter__(self):
+        return self
+    def __exit__(self, *excinfo):
+        self.close()
+        return False