X-Git-Url: http://dolda2000.com/gitweb/?p=didex.git;a=blobdiff_plain;f=didex%2Fdb.py;h=316fa02f00804aa7bccdf79129b2f3354ffbb382;hp=224cb2fdb5b61d1c48d83166cbb103695456ea57;hb=da5de0141d1328b254425b6a70d7a2c1f3c41c2b;hpb=a95055e8b37a82bf0e4ffca5aba29b3139c9d082 diff --git a/didex/db.py b/didex/db.py index 224cb2f..316fa02 100644 --- a/didex/db.py +++ b/didex/db.py @@ -64,10 +64,18 @@ class txn(object): def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC): self.tx = env.txn_begin(None, flags) self.done = False + self.pcommit = set() def commit(self): self.done = True self.tx.commit(0) + def run1(list): + if len(list) > 0: + try: + list[0]() + finally: + run1(list[1:]) + run1(list(self.pcommit)) def abort(self): self.done = True @@ -81,6 +89,26 @@ class txn(object): self.abort() return False + def postcommit(self, fun): + self.pcommit.add(fun) + +def txnfun(envfun): + def fxf(fun): + def wrapper(self, *args, tx=None, **kwargs): + if tx is None: + while True: + try: + with txn(envfun(self)) as ltx: + ret = fun(self, *args, tx=ltx, **kwargs) + ltx.commit() + return ret + except deadlock: + continue + else: + return fun(self, *args, tx=tx, **kwargs) + return wrapper + return fxf + class database(object): def __init__(self, env, name, create, mode): self.env = env @@ -102,7 +130,8 @@ class database(object): continue return ret - def _nextseq(self, tx): + @txnfun(lambda self: self.env.env) + def _nextseq(self, *, tx): if self.cf.has_key(b"seq", txn=tx.tx): seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0] else: @@ -110,45 +139,29 @@ class database(object): self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx) return seq - def add(self, ob): - while True: - try: - with txn(self.env.env) as tx: - seq = self._nextseq(tx) - self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE) - tx.commit() - return seq - except deadlock: - continue - - def replace(self, id, ob): - while True: - try: - with txn(self.env.env) as tx: - key = struct.pack(">Q", id) - if not self.ob.has_key(key, txn=tx.tx): - raise KeyError(id) - self.ob.put(key, ob, txn=tx.tx) - tx.commit() - return - except deadlock: - continue + @txnfun(lambda self: self.env.env) + def add(self, ob, *, tx): + seq = self._nextseq(tx=tx) + self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE) + return seq - def get(self, id): - while True: - try: - return self.ob[struct.pack(">Q", id)] - except KeyError: - raise KeyError(id) from None - except deadlock: - continue + @txnfun(lambda self: self.env.env) + def replace(self, id, ob, *, tx): + key = struct.pack(">Q", id) + if not self.ob.has_key(key, txn=tx.tx): + raise KeyError(id) + self.ob.put(key, ob, txn=tx.tx) + + @txnfun(lambda self: self.env.env) + def get(self, id, *, tx): + ret = self.ob.get(struct.pack(">Q", id), None) + if ret is None: + raise KeyError(id) + return ret - def remove(self, id): - while True: - try: - with txn(self.env.env) as tx: - self.ob.delete(struct.pack(">Q", id), txn=tx.tx) - tx.commit() - return - except deadlock: - continue + @txnfun(lambda self: self.env.env) + def remove(self, id, *, tx): + key = struct.pack(">Q", id) + if not self.ob.has_key(key, txn=tx.tx): + raise KeyError(id) + self.ob.delete(key, txn=tx.tx)