Initial import.
[didex.git] / didex / db.py
CommitLineData
a95055e8
FT
1import time, threading, struct
2from . import lib
3from bsddb3 import db as bd
4
5deadlock = bd.DBLockDeadlockError
6
7class environment(lib.closable):
8 def __init__(self, path, *, create=True, recover=False, mode=0o666):
9 self.env = bd.DBEnv()
10 self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
11 fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN
12 if recover:
13 fl |= bd.DB_RECOVER
14 if create:
15 fl |= bd.DB_CREATE
16 self.env.open(path, fl, mode)
17 self.lastckp = self.lastarch = time.time()
18 self.lock = threading.Lock()
19 self.dbs = {}
20
21 def close(self):
22 env = self.env
23 if env is None:
24 return
25 env.close()
26 self.env = None
27
28 def maint(self):
29 now = time.time()
30 try:
31 if now - self.lastckp > 60:
32 self.env.txn_checkpoint(1024)
33 self.lastckp = now
34 if now - self.lastarch > 3600:
35 self.env.log_archive(bd.DB_ARCH_REMOVE)
36 self.lastarch = now
37 except deadlock:
38 pass
39
40 def db(self, name, create=True, mode=0o666):
41 with self.lock:
42 if name not in self.dbs:
43 self.dbs[name] = database(self, name, create, mode)
44 return self.dbs[name]
45
46 def __del__(self):
47 self.close()
48 def __enter__(self):
49 return self
50 def __exit__(self, *excinfo):
51 self.close()
52 return False
53
54def opendb(env, fnm, dnm, typ, fl, mode):
55 ret = bd.DB(env)
56 while True:
57 try:
58 self.main.open(fnm, dnm, typ, fl, mode)
59 except deadlock:
60 continue
61 return ret
62
63class txn(object):
64 def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
65 self.tx = env.txn_begin(None, flags)
66 self.done = False
67
68 def commit(self):
69 self.done = True
70 self.tx.commit(0)
71
72 def abort(self):
73 self.done = True
74 self.tx.abort()
75
76 def __enter__(self):
77 return self
78
79 def __exit__(self, etype, exc, tb):
80 if not self.done:
81 self.abort()
82 return False
83
84class database(object):
85 def __init__(self, env, name, create, mode):
86 self.env = env
87 self.mode = mode
88 self.fnm = name
89 fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
90 if create:
91 fl |= bd.DB_CREATE
92 self.cf = self._opendb("cf", bd.DB_HASH, fl)
93 self.ob = self._opendb("ob", bd.DB_HASH, fl)
94
95 def _opendb(self, dnm, typ, fl, init=None):
96 ret = bd.DB(self.env.env)
97 if init: init(ret)
98 while True:
99 try:
100 ret.open(self.fnm, dnm, typ, fl, self.mode)
101 except deadlock:
102 continue
103 return ret
104
105 def _nextseq(self, tx):
106 if self.cf.has_key(b"seq", txn=tx.tx):
107 seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
108 else:
109 seq = 1
110 self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
111 return seq
112
113 def add(self, ob):
114 while True:
115 try:
116 with txn(self.env.env) as tx:
117 seq = self._nextseq(tx)
118 self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
119 tx.commit()
120 return seq
121 except deadlock:
122 continue
123
124 def replace(self, id, ob):
125 while True:
126 try:
127 with txn(self.env.env) as tx:
128 key = struct.pack(">Q", id)
129 if not self.ob.has_key(key, txn=tx.tx):
130 raise KeyError(id)
131 self.ob.put(key, ob, txn=tx.tx)
132 tx.commit()
133 return
134 except deadlock:
135 continue
136
137 def get(self, id):
138 while True:
139 try:
140 return self.ob[struct.pack(">Q", id)]
141 except KeyError:
142 raise KeyError(id) from None
143 except deadlock:
144 continue
145
146 def remove(self, id):
147 while True:
148 try:
149 with txn(self.env.env) as tx:
150 self.ob.delete(struct.pack(">Q", id), txn=tx.tx)
151 tx.commit()
152 return
153 except deadlock:
154 continue