Added some convenient __all__ imports.
[didex.git] / didex / db.py
1 import time, threading, struct
2 from . import lib
3 from bsddb3 import db as bd
4
5 __all__ = ["environment", "database"]
6
7 deadlock = bd.DBLockDeadlockError
8
9 class environment(lib.closable):
10     def __init__(self, path, *, create=True, recover=False, mode=0o666):
11         self.env = bd.DBEnv()
12         self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
13         fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN
14         if recover:
15             fl |= bd.DB_RECOVER
16         if create:
17             fl |= bd.DB_CREATE
18         self.env.open(path, fl, mode)
19         self.lastckp = self.lastarch = time.time()
20         self.lock = threading.Lock()
21         self.dbs = {}
22
23     def close(self):
24         env = self.env
25         if env is None:
26             return
27         env.close()
28         self.env = None
29
30     def maint(self):
31         now = time.time()
32         try:
33             if now - self.lastckp > 60:
34                 self.env.txn_checkpoint(1024)
35                 self.lastckp = now
36             if now - self.lastarch > 3600:
37                 self.env.log_archive(bd.DB_ARCH_REMOVE)
38                 self.lastarch = now
39         except deadlock:
40             pass
41
42     def db(self, name, create=True, mode=0o666):
43         with self.lock:
44             if name not in self.dbs:
45                 self.dbs[name] = database(self, name, create, mode)
46             return self.dbs[name]
47
48     def __del__(self):
49         self.close()
50     def __enter__(self):
51         return self
52     def __exit__(self, *excinfo):
53         self.close()
54         return False
55
56 def opendb(env, fnm, dnm, typ, fl, mode):
57     ret = bd.DB(env)
58     while True:
59         try:
60             self.main.open(fnm, dnm, typ, fl, mode)
61         except deadlock:
62             continue
63         return ret
64
65 class txn(object):
66     def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
67         self.tx = env.txn_begin(None, flags)
68         self.done = False
69         self.pcommit = set()
70
71     def commit(self):
72         self.done = True
73         self.tx.commit(0)
74         def run1(list):
75             if len(list) > 0:
76                 try:
77                     list[0]()
78                 finally:
79                     run1(list[1:])
80         run1(list(self.pcommit))
81
82     def abort(self):
83         self.done = True
84         self.tx.abort()
85
86     def __enter__(self):
87         return self
88
89     def __exit__(self, etype, exc, tb):
90         if not self.done:
91             self.abort()
92         return False
93
94     def postcommit(self, fun):
95         self.pcommit.add(fun)
96
97 def txnfun(envfun):
98     def fxf(fun):
99         def wrapper(self, *args, tx=None, **kwargs):
100             if tx is None:
101                 while True:
102                     try:
103                         with txn(envfun(self)) as ltx:
104                             ret = fun(self, *args, tx=ltx, **kwargs)
105                             ltx.commit()
106                             return ret
107                     except deadlock:
108                         continue
109             else:
110                 return fun(self, *args, tx=tx, **kwargs)
111         return wrapper
112     return fxf
113
114 class database(object):
115     def __init__(self, env, name, create, mode):
116         self.env = env
117         self.mode = mode
118         self.fnm = name
119         fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
120         if create:
121             fl |= bd.DB_CREATE
122         self.cf = self._opendb("cf", bd.DB_HASH, fl)
123         self.ob = self._opendb("ob", bd.DB_HASH, fl)
124
125     def _opendb(self, dnm, typ, fl, init=None):
126         ret = bd.DB(self.env.env)
127         if init: init(ret)
128         while True:
129             try:
130                 ret.open(self.fnm, dnm, typ, fl, self.mode)
131             except deadlock:
132                 continue
133             return ret
134
135     @txnfun(lambda self: self.env.env)
136     def _nextseq(self, *, tx):
137         if self.cf.has_key(b"seq", txn=tx.tx):
138             seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
139         else:
140             seq = 1
141         self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
142         return seq
143
144     @txnfun(lambda self: self.env.env)
145     def add(self, ob, *, tx):
146         seq = self._nextseq(tx=tx)
147         self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
148         return seq
149
150     @txnfun(lambda self: self.env.env)
151     def replace(self, id, ob, *, tx):
152         key = struct.pack(">Q", id)
153         if not self.ob.has_key(key, txn=tx.tx):
154             raise KeyError(id)
155         self.ob.put(key, ob, txn=tx.tx)
156
157     @txnfun(lambda self: self.env.env)
158     def get(self, id, *, tx):
159         ret = self.ob.get(struct.pack(">Q", id), None)
160         if ret is None:
161             raise KeyError(id)
162         return ret
163
164     @txnfun(lambda self: self.env.env)
165     def remove(self, id, *, tx):
166         key = struct.pack(">Q", id)
167         if not self.ob.has_key(key, txn=tx.tx):
168             raise KeyError(id)
169         self.ob.delete(key, txn=tx.tx)