--- /dev/null
+import threading, weakref
+
+class entry(object):
+ __slots__ = ["p", "n", "id", "obj", "st", "lk"]
+ def __init__(self, id, c):
+ self.id = id
+ self.obj = None
+ self.st = None
+ self.lk = None
+ self.n = c.mru
+ self.p = None
+ if c.mru is not None:
+ c.mru.p = self
+ c.mru = self
+ else:
+ c.mru = c.lru = self
+ c.n += 1
+
+ def relink(self, c):
+ if c.mru is self:
+ return
+ if self.n is not None:
+ self.n.p = self.p
+ self.p.n = self.n
+ if c.lru is self:
+ c.lru = self.p
+ self.p = None
+ self.n = c.mru
+ c.mru.p = self
+
+ def remove(self, c):
+ if self.n is not None:
+ self.n.p = self.p
+ if self.p is not None:
+ self.p.n = self.n
+ if c.mru is self:
+ c.mru = self.n
+ if c.lru is self:
+ c.lru = self.p
+ c.n -= 1
+
+class cache(object):
+ def __init__(self, *, keep=1000):
+ self.keep = keep
+ self.cur = {}
+ self.mru = self.lru = None
+ self.n = 0
+ self.lk = threading.Lock()
+
+ def _trim(self, n):
+ ent = self.lru
+ for i in range(self.n - n):
+ if ent.st == "l":
+ ent.obj = weakref.ref(ent.obj)
+ ent.st = "w"
+ elif ent.st == "w" and ent.obj() is None:
+ del self.cur[ent.id]
+ ent.remove(self)
+ ent.st = "r"
+ ent = ent.p
+
+ def get(self, id, load=True):
+ while True:
+ with self.lk:
+ ent = self.cur.get(id)
+ if ent is None:
+ if not load:
+ raise KeyError(id)
+ self.cur[id] = ent = entry(id, self)
+ ent.lk = lk = threading.Lock()
+ ent.st = "ld"
+ st = None
+ self._trim(self.keep)
+ elif ent.st == "l":
+ ent.relink(self)
+ return ent.obj
+ elif ent.st == "w":
+ ret = ent.obj()
+ if ret is None:
+ del self.cur[id]
+ ent.remove(self)
+ ent.st = "r"
+ continue
+ return ret
+ elif ent.st == "ld":
+ lk = ent.lk
+ st = "ld"
+ if lk is None:
+ continue
+ elif ent.st == "r":
+ continue
+ with lk:
+ if st is None:
+ try:
+ ret = ent.obj = self.load(id)
+ ent.st = "l"
+ return ret
+ except:
+ with self.lk:
+ del self.cur[id]
+ ent.remove(self)
+ ent.st = "r"
+ raise
+ finally:
+ ent.lk = None
+ elif st == "ld":
+ continue
+
+ def put(self, id, ob):
+ while True:
+ with self.lk:
+ ent = self.cur.get(id)
+ if ent is None:
+ self.cur[id] = ent = entry(id, self)
+ ent.obj = ob
+ ent.st = "l"
+ self._trim(self.keep)
+ return
+ elif ent.st == "l":
+ ent.obj = ob
+ return
+ elif ent.st == "w":
+ ent.obj = ob
+ return
+ elif ent.st == "r":
+ continue
+ elif ent.st == "ld":
+ lk = ent.lk
+ if lk is None:
+ continue
+ with lk:
+ continue
+
+ def remove(self, id):
+ while True:
+ with self.lk:
+ ent = self.cur.get(id)
+ if ent is None:
+ return
+ elif ent.st == "ld":
+ lk = ent.lk
+ if lk is None:
+ continue
+ else:
+ del self.cur[id]
+ ent.remove(self)
+ ent.st = "r"
+ return
+ with lk:
+ continue
+
+ def load(self, id):
+ raise KeyError()
--- /dev/null
+import threading, pickle
+from . import db, index, cache
+from .db import txnfun
+
+class environment(object):
+ def __init__(self, path):
+ self.path = path
+ self.lk = threading.Lock()
+ self.bk = None
+
+ def __call__(self):
+ with self.lk:
+ if self.bk is None:
+ self.bk = db.environment(self.path)
+ return self.bk
+
+ def close(self):
+ with self.lk:
+ if self.bk is not None:
+ self.bk.close()
+ self.bk = None
+
+class storedesc(object):
+ pass
+
+def storedescs(obj):
+ t = type(obj)
+ ret = getattr(t, "__didex_attr", None)
+ if ret is None:
+ ret = []
+ for nm, val in t.__dict__.items():
+ if isinstance(val, storedesc):
+ ret.append((nm, val))
+ t.__didex_attr = ret
+ return ret
+
+class store(object):
+ def __init__(self, name, *, env=None, path=".", ncache=None):
+ self.name = name
+ self.lk = threading.Lock()
+ if env:
+ self.env = env
+ else:
+ self.env = environment(path)
+ self._db = None
+ if ncache is None:
+ ncache = cache.cache()
+ self.cache = ncache
+ self.cache.load = self._load
+
+ def db(self):
+ with self.lk:
+ if self._db is None:
+ self._db = self.env().db(self.name)
+ return self._db
+
+ def _load(self, id):
+ try:
+ return pickle.loads(self.db().get(id))
+ except:
+ raise KeyError(id, "could not unpickle data")
+
+ def _encode(self, obj):
+ return pickle.dumps(obj)
+
+ def get(self, id, *, load=True):
+ return self.cache.get(id, load=load)
+
+ @txnfun(lambda self: self.db().env.env)
+ def register(self, obj, *, tx):
+ id = self.db().add(self._encode(obj), tx=tx)
+ for nm, attr in storedescs(obj):
+ attr.register(id, obj, tx)
+ self.cache.put(id, obj)
+ return id
+
+ @txnfun(lambda self: self.db().env.env)
+ def unregister(self, id, *, tx):
+ obj = self.get(id)
+ for nm, attr in storedescs(obj):
+ attr.unregister(id, obj, tx)
+ self.db().remove(id, tx=tx)
+ self.cache.remove(id)
+
+ @txnfun(lambda self: self.db().env.env)
+ def update(self, id, *, tx):
+ obj = self.get(id, load=False)
+ for nm, attr, in storedescs(obj):
+ attr.update(id, obj, tx)
+ self.db().replace(id, self._encode(obj), tx=tx)
--- /dev/null
+import threading
+from . import store, lib
+from .store import storedesc
+
+class cursor(lib.closable):
+ def __init__(self, bk, st):
+ self.bk = bk
+ self.st = st
+
+ def close(self):
+ self.bk.close()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ k, id = next(self.bk)
+ return k, self.st.get(id)
+
+ def skip(self, n=1):
+ self.bk.skip(n)
+
+class base(storedesc):
+ def __init__(self, store, indextype, name, datatype, default):
+ self.store = store
+ self.indextype = indextype
+ self.name = name
+ self.typ = datatype
+ self.default = default
+ self.idx = None
+ self.lk = threading.Lock()
+ self.mattr = "__idx_%s_new" % name
+ self.iattr = "__idx_%s_cur" % name
+
+ def index(self):
+ with self.lk:
+ if self.idx is None:
+ self.idx = self.indextype(self.store.db(), self.name, self.typ)
+ return self.idx
+
+ def __get__(self, obj, cls):
+ if obj is None: return self
+ return getattr(obj, self.mattr, self.default)
+
+ def __set__(self, obj, val):
+ setattr(obj, self.mattr, val)
+
+ def __delete__(self, obj):
+ delattr(obj, self.mattr)
+
+ def get(self, **kwargs):
+ return cursor(self.index().get(**kwargs), self.store)
+
+class simple(base):
+ def __init__(self, store, indextype, name, datatype, default=None):
+ super().__init__(store, indextype, name, datatype, default)
+
+ def register(self, id, obj, tx):
+ val = self.__get__(obj, None)
+ self.index().put(val, id, tx=tx)
+ tx.postcommit(lambda: setattr(obj, self.iattr, val))
+
+ def unregister(self, id, obj, tx):
+ self.index().remove(getattr(obj, self.iattr), id, tx=tx)
+ tx.postcommit(lambda: delattr(obj, self.iattr))
+
+ def update(self, id, obj, tx):
+ val = self.__get__(obj, None)
+ ival = getattr(obj, self.iattr)
+ if val != ival:
+ idx = self.index()
+ idx.remove(ival, id, tx=tx)
+ idx.put(val, id, tx=tx)
+ tx.postcommit(lambda: setattr(obj, self.iattr, val))
+
+class multi(base):
+ def __init__(self, store, indextype, name, datatype):
+ super().__init__(store, indextype, name, datatype, ())
+
+ def register(self, id, obj, tx):
+ vals = frozenset(self.__get__(obj, None))
+ idx = self.index()
+ for val in vals:
+ idx.put(val, id, tx=tx)
+ tx.postcommit(lambda: setattr(obj, self.iattr, vals))
+
+ def unregister(self, id, obj, tx):
+ idx = self.index()
+ for val in getattr(obj, self.iattr):
+ idx.remove(val, id, tx=tx)
+ tx.postcommit(lambda: delattr(obj, self.iattr))
+
+ def update(self, id, obj, tx):
+ vals = frozenset(self.__get__(obj, None))
+ ivals = getattr(obj, self.iattr)
+ if vals != ivals:
+ idx = self.index()
+ for val in ivals - vals:
+ idx.remove(val, id, tx=tx)
+ for val in vals - ivals:
+ idx.put(val, id, tx=tx)
+ tx.postcommit(lambda: setattr(obj, self.iattr, vals))