Ensure that environment maintanence runs regularly.
[didex.git] / didex / db.py
index 224cb2f..068d404 100644 (file)
@@ -2,6 +2,8 @@ import time, threading, struct
 from . import lib
 from bsddb3 import db as bd
 
+__all__ = ["environment", "database"]
+
 deadlock = bd.DBLockDeadlockError
 
 class environment(lib.closable):
@@ -62,12 +64,22 @@ def opendb(env, fnm, dnm, typ, fl, mode):
 
 class txn(object):
     def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
-        self.tx = env.txn_begin(None, flags)
+        self.tx = env.env.txn_begin(None, flags)
+        self.env = env
         self.done = False
+        self.pcommit = set()
 
     def commit(self):
         self.done = True
         self.tx.commit(0)
+        def run1(list):
+            if len(list) > 0:
+                try:
+                    list[0]()
+                finally:
+                    run1(list[1:])
+        run1(list(self.pcommit))
+        self.env.maint()
 
     def abort(self):
         self.done = True
@@ -81,28 +93,55 @@ class txn(object):
             self.abort()
         return False
 
+    def postcommit(self, fun):
+        self.pcommit.add(fun)
+
+def dloopfun(fun):
+    def wrapper(self, *args, **kwargs):
+        while True:
+            try:
+                return fun(self, *args, **kwargs)
+            except deadlock:
+                continue
+    return wrapper
+
+def txnfun(envfun):
+    def fxf(fun):
+        def wrapper(self, *args, tx=None, **kwargs):
+            if tx is None:
+                while True:
+                    try:
+                        with txn(envfun(self)) as ltx:
+                            ret = fun(self, *args, tx=ltx, **kwargs)
+                            ltx.commit()
+                            return ret
+                    except deadlock:
+                        continue
+            else:
+                return fun(self, *args, tx=tx, **kwargs)
+        return wrapper
+    return fxf
+
 class database(object):
     def __init__(self, env, name, create, mode):
         self.env = env
         self.mode = mode
         self.fnm = name
-        fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
+        fl = bd.DB_THREAD
         if create:
             fl |= bd.DB_CREATE
         self.cf = self._opendb("cf", bd.DB_HASH, fl)
         self.ob = self._opendb("ob", bd.DB_HASH, fl)
 
-    def _opendb(self, dnm, typ, fl, init=None):
+    @txnfun(lambda self: self.env)
+    def _opendb(self, dnm, typ, fl, init=None, *, tx):
         ret = bd.DB(self.env.env)
         if init: init(ret)
-        while True:
-            try:
-                ret.open(self.fnm, dnm, typ, fl, self.mode)
-            except deadlock:
-                continue
-            return ret
+        ret.open(self.fnm, dnm, typ, fl, self.mode, txn=tx.tx)
+        return ret
 
-    def _nextseq(self, tx):
+    @txnfun(lambda self: self.env)
+    def _nextseq(self, *, tx):
         if self.cf.has_key(b"seq", txn=tx.tx):
             seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
         else:
@@ -110,45 +149,29 @@ class database(object):
         self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
         return seq
 
-    def add(self, ob):
-        while True:
-            try:
-                with txn(self.env.env) as tx:
-                    seq = self._nextseq(tx)
-                    self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
-                    tx.commit()
-                    return seq
-            except deadlock:
-                continue
-
-    def replace(self, id, ob):
-        while True:
-            try:
-                with txn(self.env.env) as tx:
-                    key = struct.pack(">Q", id)
-                    if not self.ob.has_key(key, txn=tx.tx):
-                        raise KeyError(id)
-                    self.ob.put(key, ob, txn=tx.tx)
-                    tx.commit()
-                    return
-            except deadlock:
-                continue
+    @txnfun(lambda self: self.env)
+    def add(self, ob, *, tx):
+        seq = self._nextseq(tx=tx)
+        self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
+        return seq
 
-    def get(self, id):
-        while True:
-            try:
-                return self.ob[struct.pack(">Q", id)]
-            except KeyError:
-                raise KeyError(id) from None
-            except deadlock:
-                continue
+    @txnfun(lambda self: self.env)
+    def replace(self, id, ob, *, tx):
+        key = struct.pack(">Q", id)
+        if not self.ob.has_key(key, txn=tx.tx):
+            raise KeyError(id)
+        self.ob.put(key, ob, txn=tx.tx)
+
+    @txnfun(lambda self: self.env)
+    def get(self, id, *, tx):
+        ret = self.ob.get(struct.pack(">Q", id), None)
+        if ret is None:
+            raise KeyError(id)
+        return ret
 
-    def remove(self, id):
-        while True:
-            try:
-                with txn(self.env.env) as tx:
-                    self.ob.delete(struct.pack(">Q", id), txn=tx.tx)
-                    tx.commit()
-                    return
-            except deadlock:
-                continue
+    @txnfun(lambda self: self.env)
+    def remove(self, id, *, tx):
+        key = struct.pack(">Q", id)
+        if not self.ob.has_key(key, txn=tx.tx):
+            raise KeyError(id)
+        self.ob.delete(key, txn=tx.tx)