Commit | Line | Data |
---|---|---|
a95055e8 FT |
1 | import time, threading, struct |
2 | from . import lib | |
3 | from bsddb3 import db as bd | |
4 | ||
5 | deadlock = bd.DBLockDeadlockError | |
6 | ||
7 | class environment(lib.closable): | |
8 | def __init__(self, path, *, create=True, recover=False, mode=0o666): | |
9 | self.env = bd.DBEnv() | |
10 | self.env.set_lk_detect(bd.DB_LOCK_RANDOM) | |
11 | fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN | |
12 | if recover: | |
13 | fl |= bd.DB_RECOVER | |
14 | if create: | |
15 | fl |= bd.DB_CREATE | |
16 | self.env.open(path, fl, mode) | |
17 | self.lastckp = self.lastarch = time.time() | |
18 | self.lock = threading.Lock() | |
19 | self.dbs = {} | |
20 | ||
21 | def close(self): | |
22 | env = self.env | |
23 | if env is None: | |
24 | return | |
25 | env.close() | |
26 | self.env = None | |
27 | ||
28 | def maint(self): | |
29 | now = time.time() | |
30 | try: | |
31 | if now - self.lastckp > 60: | |
32 | self.env.txn_checkpoint(1024) | |
33 | self.lastckp = now | |
34 | if now - self.lastarch > 3600: | |
35 | self.env.log_archive(bd.DB_ARCH_REMOVE) | |
36 | self.lastarch = now | |
37 | except deadlock: | |
38 | pass | |
39 | ||
40 | def db(self, name, create=True, mode=0o666): | |
41 | with self.lock: | |
42 | if name not in self.dbs: | |
43 | self.dbs[name] = database(self, name, create, mode) | |
44 | return self.dbs[name] | |
45 | ||
46 | def __del__(self): | |
47 | self.close() | |
48 | def __enter__(self): | |
49 | return self | |
50 | def __exit__(self, *excinfo): | |
51 | self.close() | |
52 | return False | |
53 | ||
54 | def opendb(env, fnm, dnm, typ, fl, mode): | |
55 | ret = bd.DB(env) | |
56 | while True: | |
57 | try: | |
58 | self.main.open(fnm, dnm, typ, fl, mode) | |
59 | except deadlock: | |
60 | continue | |
61 | return ret | |
62 | ||
63 | class txn(object): | |
64 | def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC): | |
65 | self.tx = env.txn_begin(None, flags) | |
66 | self.done = False | |
67 | ||
68 | def commit(self): | |
69 | self.done = True | |
70 | self.tx.commit(0) | |
71 | ||
72 | def abort(self): | |
73 | self.done = True | |
74 | self.tx.abort() | |
75 | ||
76 | def __enter__(self): | |
77 | return self | |
78 | ||
79 | def __exit__(self, etype, exc, tb): | |
80 | if not self.done: | |
81 | self.abort() | |
82 | return False | |
83 | ||
84 | class database(object): | |
85 | def __init__(self, env, name, create, mode): | |
86 | self.env = env | |
87 | self.mode = mode | |
88 | self.fnm = name | |
89 | fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT | |
90 | if create: | |
91 | fl |= bd.DB_CREATE | |
92 | self.cf = self._opendb("cf", bd.DB_HASH, fl) | |
93 | self.ob = self._opendb("ob", bd.DB_HASH, fl) | |
94 | ||
95 | def _opendb(self, dnm, typ, fl, init=None): | |
96 | ret = bd.DB(self.env.env) | |
97 | if init: init(ret) | |
98 | while True: | |
99 | try: | |
100 | ret.open(self.fnm, dnm, typ, fl, self.mode) | |
101 | except deadlock: | |
102 | continue | |
103 | return ret | |
104 | ||
105 | def _nextseq(self, tx): | |
106 | if self.cf.has_key(b"seq", txn=tx.tx): | |
107 | seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0] | |
108 | else: | |
109 | seq = 1 | |
110 | self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx) | |
111 | return seq | |
112 | ||
113 | def add(self, ob): | |
114 | while True: | |
115 | try: | |
116 | with txn(self.env.env) as tx: | |
117 | seq = self._nextseq(tx) | |
118 | self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE) | |
119 | tx.commit() | |
120 | return seq | |
121 | except deadlock: | |
122 | continue | |
123 | ||
124 | def replace(self, id, ob): | |
125 | while True: | |
126 | try: | |
127 | with txn(self.env.env) as tx: | |
128 | key = struct.pack(">Q", id) | |
129 | if not self.ob.has_key(key, txn=tx.tx): | |
130 | raise KeyError(id) | |
131 | self.ob.put(key, ob, txn=tx.tx) | |
132 | tx.commit() | |
133 | return | |
134 | except deadlock: | |
135 | continue | |
136 | ||
137 | def get(self, id): | |
138 | while True: | |
139 | try: | |
140 | return self.ob[struct.pack(">Q", id)] | |
141 | except KeyError: | |
142 | raise KeyError(id) from None | |
143 | except deadlock: | |
144 | continue | |
145 | ||
146 | def remove(self, id): | |
147 | while True: | |
148 | try: | |
149 | with txn(self.env.env) as tx: | |
150 | self.ob.delete(struct.pack(">Q", id), txn=tx.tx) | |
151 | tx.commit() | |
152 | return | |
153 | except deadlock: | |
154 | continue |