Improved cursor functionality.
[didex.git] / didex / db.py
1 import time, threading, struct
2 from . import lib
3 from bsddb3 import db as bd
4
5 __all__ = ["environment", "database"]
6
7 deadlock = bd.DBLockDeadlockError
8
9 class environment(lib.closable):
10     def __init__(self, path, *, create=True, recover=False, mode=0o666):
11         self.env = bd.DBEnv()
12         self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
13         fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN
14         if recover:
15             fl |= bd.DB_RECOVER
16         if create:
17             fl |= bd.DB_CREATE
18         self.env.open(path, fl, mode)
19         self.lastckp = self.lastarch = time.time()
20         self.lock = threading.Lock()
21         self.dbs = {}
22
23     def close(self):
24         env = self.env
25         if env is None:
26             return
27         env.close()
28         self.env = None
29
30     def maint(self):
31         now = time.time()
32         try:
33             if now - self.lastckp > 60:
34                 self.env.txn_checkpoint(1024)
35                 self.lastckp = now
36             if now - self.lastarch > 3600:
37                 self.env.log_archive(bd.DB_ARCH_REMOVE)
38                 self.lastarch = now
39         except deadlock:
40             pass
41
42     def db(self, name, create=True, mode=0o666):
43         with self.lock:
44             if name not in self.dbs:
45                 self.dbs[name] = database(self, name, create, mode)
46             return self.dbs[name]
47
48     def __del__(self):
49         self.close()
50     def __enter__(self):
51         return self
52     def __exit__(self, *excinfo):
53         self.close()
54         return False
55
56 def opendb(env, fnm, dnm, typ, fl, mode):
57     ret = bd.DB(env)
58     while True:
59         try:
60             self.main.open(fnm, dnm, typ, fl, mode)
61         except deadlock:
62             continue
63         return ret
64
65 class txn(object):
66     def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
67         self.tx = env.txn_begin(None, flags)
68         self.done = False
69         self.pcommit = set()
70
71     def commit(self):
72         self.done = True
73         self.tx.commit(0)
74         def run1(list):
75             if len(list) > 0:
76                 try:
77                     list[0]()
78                 finally:
79                     run1(list[1:])
80         run1(list(self.pcommit))
81
82     def abort(self):
83         self.done = True
84         self.tx.abort()
85
86     def __enter__(self):
87         return self
88
89     def __exit__(self, etype, exc, tb):
90         if not self.done:
91             self.abort()
92         return False
93
94     def postcommit(self, fun):
95         self.pcommit.add(fun)
96
97 def dloopfun(fun):
98     def wrapper(self, *args, **kwargs):
99         while True:
100             try:
101                 return fun(self, *args, **kwargs)
102             except deadlock:
103                 continue
104     return wrapper
105
106 def txnfun(envfun):
107     def fxf(fun):
108         def wrapper(self, *args, tx=None, **kwargs):
109             if tx is None:
110                 while True:
111                     try:
112                         with txn(envfun(self)) as ltx:
113                             ret = fun(self, *args, tx=ltx, **kwargs)
114                             ltx.commit()
115                             return ret
116                     except deadlock:
117                         continue
118             else:
119                 return fun(self, *args, tx=tx, **kwargs)
120         return wrapper
121     return fxf
122
123 class database(object):
124     def __init__(self, env, name, create, mode):
125         self.env = env
126         self.mode = mode
127         self.fnm = name
128         fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
129         if create:
130             fl |= bd.DB_CREATE
131         self.cf = self._opendb("cf", bd.DB_HASH, fl)
132         self.ob = self._opendb("ob", bd.DB_HASH, fl)
133
134     def _opendb(self, dnm, typ, fl, init=None):
135         ret = bd.DB(self.env.env)
136         if init: init(ret)
137         while True:
138             try:
139                 ret.open(self.fnm, dnm, typ, fl, self.mode)
140             except deadlock:
141                 continue
142             return ret
143
144     @txnfun(lambda self: self.env.env)
145     def _nextseq(self, *, tx):
146         if self.cf.has_key(b"seq", txn=tx.tx):
147             seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
148         else:
149             seq = 1
150         self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
151         return seq
152
153     @txnfun(lambda self: self.env.env)
154     def add(self, ob, *, tx):
155         seq = self._nextseq(tx=tx)
156         self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
157         return seq
158
159     @txnfun(lambda self: self.env.env)
160     def replace(self, id, ob, *, tx):
161         key = struct.pack(">Q", id)
162         if not self.ob.has_key(key, txn=tx.tx):
163             raise KeyError(id)
164         self.ob.put(key, ob, txn=tx.tx)
165
166     @txnfun(lambda self: self.env.env)
167     def get(self, id, *, tx):
168         ret = self.ob.get(struct.pack(">Q", id), None)
169         if ret is None:
170             raise KeyError(id)
171         return ret
172
173     @txnfun(lambda self: self.env.env)
174     def remove(self, id, *, tx):
175         key = struct.pack(">Q", id)
176         if not self.ob.has_key(key, txn=tx.tx):
177             raise KeyError(id)
178         self.ob.delete(key, txn=tx.tx)