Removed unused duplicates flag from ordered index.
[didex.git] / didex / db.py
CommitLineData
a95055e8
FT
1import time, threading, struct
2from . import lib
3from bsddb3 import db as bd
4
5deadlock = bd.DBLockDeadlockError
6
7class environment(lib.closable):
8 def __init__(self, path, *, create=True, recover=False, mode=0o666):
9 self.env = bd.DBEnv()
10 self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
11 fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN
12 if recover:
13 fl |= bd.DB_RECOVER
14 if create:
15 fl |= bd.DB_CREATE
16 self.env.open(path, fl, mode)
17 self.lastckp = self.lastarch = time.time()
18 self.lock = threading.Lock()
19 self.dbs = {}
20
21 def close(self):
22 env = self.env
23 if env is None:
24 return
25 env.close()
26 self.env = None
27
28 def maint(self):
29 now = time.time()
30 try:
31 if now - self.lastckp > 60:
32 self.env.txn_checkpoint(1024)
33 self.lastckp = now
34 if now - self.lastarch > 3600:
35 self.env.log_archive(bd.DB_ARCH_REMOVE)
36 self.lastarch = now
37 except deadlock:
38 pass
39
40 def db(self, name, create=True, mode=0o666):
41 with self.lock:
42 if name not in self.dbs:
43 self.dbs[name] = database(self, name, create, mode)
44 return self.dbs[name]
45
46 def __del__(self):
47 self.close()
48 def __enter__(self):
49 return self
50 def __exit__(self, *excinfo):
51 self.close()
52 return False
53
54def opendb(env, fnm, dnm, typ, fl, mode):
55 ret = bd.DB(env)
56 while True:
57 try:
58 self.main.open(fnm, dnm, typ, fl, mode)
59 except deadlock:
60 continue
61 return ret
62
63class txn(object):
64 def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
65 self.tx = env.txn_begin(None, flags)
66 self.done = False
67
68 def commit(self):
69 self.done = True
70 self.tx.commit(0)
71
72 def abort(self):
73 self.done = True
74 self.tx.abort()
75
76 def __enter__(self):
77 return self
78
79 def __exit__(self, etype, exc, tb):
80 if not self.done:
81 self.abort()
82 return False
83
8950191c
FT
84def txnfun(envfun):
85 def fxf(fun):
86 def wrapper(self, *args, tx=None, **kwargs):
87 if tx is None:
88 while True:
89 try:
90 with txn(envfun(self)) as ltx:
91 ret = fun(self, *args, tx=ltx, **kwargs)
92 ltx.commit()
93 return ret
94 except deadlock:
95 continue
96 else:
97 return fun(self, *args, tx=tx, **kwargs)
98 return wrapper
99 return fxf
100
a95055e8
FT
101class database(object):
102 def __init__(self, env, name, create, mode):
103 self.env = env
104 self.mode = mode
105 self.fnm = name
106 fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
107 if create:
108 fl |= bd.DB_CREATE
109 self.cf = self._opendb("cf", bd.DB_HASH, fl)
110 self.ob = self._opendb("ob", bd.DB_HASH, fl)
111
112 def _opendb(self, dnm, typ, fl, init=None):
113 ret = bd.DB(self.env.env)
114 if init: init(ret)
115 while True:
116 try:
117 ret.open(self.fnm, dnm, typ, fl, self.mode)
118 except deadlock:
119 continue
120 return ret
121
8950191c
FT
122 @txnfun(lambda self: self.env.env)
123 def _nextseq(self, *, tx):
a95055e8
FT
124 if self.cf.has_key(b"seq", txn=tx.tx):
125 seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
126 else:
127 seq = 1
128 self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
129 return seq
130
8950191c
FT
131 @txnfun(lambda self: self.env.env)
132 def add(self, ob, *, tx):
133 seq = self._nextseq(tx=tx)
134 self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
135 return seq
a95055e8 136
8950191c
FT
137 @txnfun(lambda self: self.env.env)
138 def replace(self, id, ob, *, tx):
139 key = struct.pack(">Q", id)
140 if not self.ob.has_key(key, txn=tx.tx):
141 raise KeyError(id)
142 self.ob.put(key, ob, txn=tx.tx)
143
144 @txnfun(lambda self: self.env.env)
145 def get(self, id, *, tx):
146 ret = self.ob.get(struct.pack(">Q", id), None)
147 if ret is None:
148 raise KeyError(id)
149 return ret
a95055e8 150
8950191c
FT
151 @txnfun(lambda self: self.env.env)
152 def remove(self, id, *, tx):
153 key = struct.pack(">Q", id)
154 if not self.ob.has_key(key, txn=tx.tx):
155 raise KeyError(id)
156 self.ob.delete(key, txn=tx.tx)