X-Git-Url: http://dolda2000.com/gitweb/?p=didex.git;a=blobdiff_plain;f=didex%2Fdb.py;h=068d404185d45f006b1a79821d573d9fcfade944;hp=224cb2fdb5b61d1c48d83166cbb103695456ea57;hb=947dfab3c174ecce6bd1ff18bdc4df7e0e4087c1;hpb=a95055e8b37a82bf0e4ffca5aba29b3139c9d082 diff --git a/didex/db.py b/didex/db.py index 224cb2f..068d404 100644 --- a/didex/db.py +++ b/didex/db.py @@ -2,6 +2,8 @@ import time, threading, struct from . import lib from bsddb3 import db as bd +__all__ = ["environment", "database"] + deadlock = bd.DBLockDeadlockError class environment(lib.closable): @@ -62,12 +64,22 @@ def opendb(env, fnm, dnm, typ, fl, mode): class txn(object): def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC): - self.tx = env.txn_begin(None, flags) + self.tx = env.env.txn_begin(None, flags) + self.env = env self.done = False + self.pcommit = set() def commit(self): self.done = True self.tx.commit(0) + def run1(list): + if len(list) > 0: + try: + list[0]() + finally: + run1(list[1:]) + run1(list(self.pcommit)) + self.env.maint() def abort(self): self.done = True @@ -81,28 +93,55 @@ class txn(object): self.abort() return False + def postcommit(self, fun): + self.pcommit.add(fun) + +def dloopfun(fun): + def wrapper(self, *args, **kwargs): + while True: + try: + return fun(self, *args, **kwargs) + except deadlock: + continue + return wrapper + +def txnfun(envfun): + def fxf(fun): + def wrapper(self, *args, tx=None, **kwargs): + if tx is None: + while True: + try: + with txn(envfun(self)) as ltx: + ret = fun(self, *args, tx=ltx, **kwargs) + ltx.commit() + return ret + except deadlock: + continue + else: + return fun(self, *args, tx=tx, **kwargs) + return wrapper + return fxf + class database(object): def __init__(self, env, name, create, mode): self.env = env self.mode = mode self.fnm = name - fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT + fl = bd.DB_THREAD if create: fl |= bd.DB_CREATE self.cf = self._opendb("cf", bd.DB_HASH, fl) self.ob = self._opendb("ob", bd.DB_HASH, fl) - def _opendb(self, dnm, typ, fl, init=None): + @txnfun(lambda self: self.env) + def _opendb(self, dnm, typ, fl, init=None, *, tx): ret = bd.DB(self.env.env) if init: init(ret) - while True: - try: - ret.open(self.fnm, dnm, typ, fl, self.mode) - except deadlock: - continue - return ret + ret.open(self.fnm, dnm, typ, fl, self.mode, txn=tx.tx) + return ret - def _nextseq(self, tx): + @txnfun(lambda self: self.env) + def _nextseq(self, *, tx): if self.cf.has_key(b"seq", txn=tx.tx): seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0] else: @@ -110,45 +149,29 @@ class database(object): self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx) return seq - def add(self, ob): - while True: - try: - with txn(self.env.env) as tx: - seq = self._nextseq(tx) - self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE) - tx.commit() - return seq - except deadlock: - continue - - def replace(self, id, ob): - while True: - try: - with txn(self.env.env) as tx: - key = struct.pack(">Q", id) - if not self.ob.has_key(key, txn=tx.tx): - raise KeyError(id) - self.ob.put(key, ob, txn=tx.tx) - tx.commit() - return - except deadlock: - continue + @txnfun(lambda self: self.env) + def add(self, ob, *, tx): + seq = self._nextseq(tx=tx) + self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE) + return seq - def get(self, id): - while True: - try: - return self.ob[struct.pack(">Q", id)] - except KeyError: - raise KeyError(id) from None - except deadlock: - continue + @txnfun(lambda self: self.env) + def replace(self, id, ob, *, tx): + key = struct.pack(">Q", id) + if not self.ob.has_key(key, txn=tx.tx): + raise KeyError(id) + self.ob.put(key, ob, txn=tx.tx) + + @txnfun(lambda self: self.env) + def get(self, id, *, tx): + ret = self.ob.get(struct.pack(">Q", id), None) + if ret is None: + raise KeyError(id) + return ret - def remove(self, id): - while True: - try: - with txn(self.env.env) as tx: - self.ob.delete(struct.pack(">Q", id), txn=tx.tx) - tx.commit() - return - except deadlock: - continue + @txnfun(lambda self: self.env) + def remove(self, id, *, tx): + key = struct.pack(">Q", id) + if not self.ob.has_key(key, txn=tx.tx): + raise KeyError(id) + self.ob.delete(key, txn=tx.tx)