Fixed cache bug.
[didex.git] / didex / db.py
1 import time, threading, struct
2 from . import lib
3 from bsddb3 import db as bd
4
5 deadlock = bd.DBLockDeadlockError
6
7 class environment(lib.closable):
8     def __init__(self, path, *, create=True, recover=False, mode=0o666):
9         self.env = bd.DBEnv()
10         self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
11         fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN
12         if recover:
13             fl |= bd.DB_RECOVER
14         if create:
15             fl |= bd.DB_CREATE
16         self.env.open(path, fl, mode)
17         self.lastckp = self.lastarch = time.time()
18         self.lock = threading.Lock()
19         self.dbs = {}
20
21     def close(self):
22         env = self.env
23         if env is None:
24             return
25         env.close()
26         self.env = None
27
28     def maint(self):
29         now = time.time()
30         try:
31             if now - self.lastckp > 60:
32                 self.env.txn_checkpoint(1024)
33                 self.lastckp = now
34             if now - self.lastarch > 3600:
35                 self.env.log_archive(bd.DB_ARCH_REMOVE)
36                 self.lastarch = now
37         except deadlock:
38             pass
39
40     def db(self, name, create=True, mode=0o666):
41         with self.lock:
42             if name not in self.dbs:
43                 self.dbs[name] = database(self, name, create, mode)
44             return self.dbs[name]
45
46     def __del__(self):
47         self.close()
48     def __enter__(self):
49         return self
50     def __exit__(self, *excinfo):
51         self.close()
52         return False
53
54 def opendb(env, fnm, dnm, typ, fl, mode):
55     ret = bd.DB(env)
56     while True:
57         try:
58             self.main.open(fnm, dnm, typ, fl, mode)
59         except deadlock:
60             continue
61         return ret
62
63 class txn(object):
64     def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
65         self.tx = env.txn_begin(None, flags)
66         self.done = False
67         self.pcommit = set()
68
69     def commit(self):
70         self.done = True
71         self.tx.commit(0)
72         def run1(list):
73             if len(list) > 0:
74                 try:
75                     list[0]()
76                 finally:
77                     run1(list[1:])
78         run1(list(self.pcommit))
79
80     def abort(self):
81         self.done = True
82         self.tx.abort()
83
84     def __enter__(self):
85         return self
86
87     def __exit__(self, etype, exc, tb):
88         if not self.done:
89             self.abort()
90         return False
91
92     def postcommit(self, fun):
93         self.pcommit.add(fun)
94
95 def txnfun(envfun):
96     def fxf(fun):
97         def wrapper(self, *args, tx=None, **kwargs):
98             if tx is None:
99                 while True:
100                     try:
101                         with txn(envfun(self)) as ltx:
102                             ret = fun(self, *args, tx=ltx, **kwargs)
103                             ltx.commit()
104                             return ret
105                     except deadlock:
106                         continue
107             else:
108                 return fun(self, *args, tx=tx, **kwargs)
109         return wrapper
110     return fxf
111
112 class database(object):
113     def __init__(self, env, name, create, mode):
114         self.env = env
115         self.mode = mode
116         self.fnm = name
117         fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
118         if create:
119             fl |= bd.DB_CREATE
120         self.cf = self._opendb("cf", bd.DB_HASH, fl)
121         self.ob = self._opendb("ob", bd.DB_HASH, fl)
122
123     def _opendb(self, dnm, typ, fl, init=None):
124         ret = bd.DB(self.env.env)
125         if init: init(ret)
126         while True:
127             try:
128                 ret.open(self.fnm, dnm, typ, fl, self.mode)
129             except deadlock:
130                 continue
131             return ret
132
133     @txnfun(lambda self: self.env.env)
134     def _nextseq(self, *, tx):
135         if self.cf.has_key(b"seq", txn=tx.tx):
136             seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
137         else:
138             seq = 1
139         self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
140         return seq
141
142     @txnfun(lambda self: self.env.env)
143     def add(self, ob, *, tx):
144         seq = self._nextseq(tx=tx)
145         self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
146         return seq
147
148     @txnfun(lambda self: self.env.env)
149     def replace(self, id, ob, *, tx):
150         key = struct.pack(">Q", id)
151         if not self.ob.has_key(key, txn=tx.tx):
152             raise KeyError(id)
153         self.ob.put(key, ob, txn=tx.tx)
154
155     @txnfun(lambda self: self.env.env)
156     def get(self, id, *, tx):
157         ret = self.ob.get(struct.pack(">Q", id), None)
158         if ret is None:
159             raise KeyError(id)
160         return ret
161
162     @txnfun(lambda self: self.env.env)
163     def remove(self, id, *, tx):
164         key = struct.pack(">Q", id)
165         if not self.ob.has_key(key, txn=tx.tx):
166             raise KeyError(id)
167         self.ob.delete(key, txn=tx.tx)