Added post-commit callbacks to transactions.
[didex.git] / didex / db.py
... / ...
CommitLineData
1import time, threading, struct
2from . import lib
3from bsddb3 import db as bd
4
5deadlock = bd.DBLockDeadlockError
6
7class environment(lib.closable):
8 def __init__(self, path, *, create=True, recover=False, mode=0o666):
9 self.env = bd.DBEnv()
10 self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
11 fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN
12 if recover:
13 fl |= bd.DB_RECOVER
14 if create:
15 fl |= bd.DB_CREATE
16 self.env.open(path, fl, mode)
17 self.lastckp = self.lastarch = time.time()
18 self.lock = threading.Lock()
19 self.dbs = {}
20
21 def close(self):
22 env = self.env
23 if env is None:
24 return
25 env.close()
26 self.env = None
27
28 def maint(self):
29 now = time.time()
30 try:
31 if now - self.lastckp > 60:
32 self.env.txn_checkpoint(1024)
33 self.lastckp = now
34 if now - self.lastarch > 3600:
35 self.env.log_archive(bd.DB_ARCH_REMOVE)
36 self.lastarch = now
37 except deadlock:
38 pass
39
40 def db(self, name, create=True, mode=0o666):
41 with self.lock:
42 if name not in self.dbs:
43 self.dbs[name] = database(self, name, create, mode)
44 return self.dbs[name]
45
46 def __del__(self):
47 self.close()
48 def __enter__(self):
49 return self
50 def __exit__(self, *excinfo):
51 self.close()
52 return False
53
54def opendb(env, fnm, dnm, typ, fl, mode):
55 ret = bd.DB(env)
56 while True:
57 try:
58 self.main.open(fnm, dnm, typ, fl, mode)
59 except deadlock:
60 continue
61 return ret
62
63class txn(object):
64 def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
65 self.tx = env.txn_begin(None, flags)
66 self.done = False
67 self.pcommit = set()
68
69 def commit(self):
70 self.done = True
71 self.tx.commit(0)
72 def run1(list):
73 if len(list) > 0:
74 try:
75 list[0]()
76 finally:
77 run1(list[1:])
78 run1(list(self.pcommit))
79
80 def abort(self):
81 self.done = True
82 self.tx.abort()
83
84 def __enter__(self):
85 return self
86
87 def __exit__(self, etype, exc, tb):
88 if not self.done:
89 self.abort()
90 return False
91
92 def postcommit(self, fun):
93 self.pcommit.add(fun)
94
95def txnfun(envfun):
96 def fxf(fun):
97 def wrapper(self, *args, tx=None, **kwargs):
98 if tx is None:
99 while True:
100 try:
101 with txn(envfun(self)) as ltx:
102 ret = fun(self, *args, tx=ltx, **kwargs)
103 ltx.commit()
104 return ret
105 except deadlock:
106 continue
107 else:
108 return fun(self, *args, tx=tx, **kwargs)
109 return wrapper
110 return fxf
111
112class database(object):
113 def __init__(self, env, name, create, mode):
114 self.env = env
115 self.mode = mode
116 self.fnm = name
117 fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
118 if create:
119 fl |= bd.DB_CREATE
120 self.cf = self._opendb("cf", bd.DB_HASH, fl)
121 self.ob = self._opendb("ob", bd.DB_HASH, fl)
122
123 def _opendb(self, dnm, typ, fl, init=None):
124 ret = bd.DB(self.env.env)
125 if init: init(ret)
126 while True:
127 try:
128 ret.open(self.fnm, dnm, typ, fl, self.mode)
129 except deadlock:
130 continue
131 return ret
132
133 @txnfun(lambda self: self.env.env)
134 def _nextseq(self, *, tx):
135 if self.cf.has_key(b"seq", txn=tx.tx):
136 seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
137 else:
138 seq = 1
139 self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
140 return seq
141
142 @txnfun(lambda self: self.env.env)
143 def add(self, ob, *, tx):
144 seq = self._nextseq(tx=tx)
145 self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
146 return seq
147
148 @txnfun(lambda self: self.env.env)
149 def replace(self, id, ob, *, tx):
150 key = struct.pack(">Q", id)
151 if not self.ob.has_key(key, txn=tx.tx):
152 raise KeyError(id)
153 self.ob.put(key, ob, txn=tx.tx)
154
155 @txnfun(lambda self: self.env.env)
156 def get(self, id, *, tx):
157 ret = self.ob.get(struct.pack(">Q", id), None)
158 if ret is None:
159 raise KeyError(id)
160 return ret
161
162 @txnfun(lambda self: self.env.env)
163 def remove(self, id, *, tx):
164 key = struct.pack(">Q", id)
165 if not self.ob.has_key(key, txn=tx.tx):
166 raise KeyError(id)
167 self.ob.delete(key, txn=tx.tx)