Added live-object store and simple indexing.
author Fredrik Tolf <fredrik@dolda2000.com>
Fri, 20 Mar 2015 04:21:29 +0000 (05:21 +0100)
committer Fredrik Tolf <fredrik@dolda2000.com>
Fri, 20 Mar 2015 04:21:29 +0000 (05:21 +0100)
didex/cache.py [new file with mode: 0644]
didex/store.py [new file with mode: 0644]
didex/values.py [new file with mode: 0644]

diff --git a/didex/cache.py b/didex/cache.py
new file mode 100644 (file)
index 0000000..76742aa
--- /dev/null
@@ -0,0 +1,153 @@
+import threading, weakref
+
+class entry(object):
+    __slots__ = ["p", "n", "id", "obj", "st", "lk"]
+    def __init__(self, id, c):
+        self.id = id
+        self.obj = None
+        self.st = None
+        self.lk = None
+        self.n = c.mru
+        self.p = None
+        if c.mru is not None:
+            c.mru.p = self
+            c.mru = self
+        else:
+            c.mru = c.lru = self
+        c.n += 1
+
+    def relink(self, c):
+        if c.mru is self:
+            return
+        if self.n is not None:
+            self.n.p = self.p
+        self.p.n = self.n
+        if c.lru is self:
+            c.lru = self.p
+        self.p = None
+        self.n = c.mru
+        c.mru.p = self
+
+    def remove(self, c):
+        if self.n is not None:
+            self.n.p = self.p
+        if self.p is not None:
+            self.p.n = self.n
+        if c.mru is self:
+            c.mru = self.n
+        if c.lru is self:
+            c.lru = self.p
+        c.n -= 1
+
+class cache(object):
+    def __init__(self, *, keep=1000):
+        self.keep = keep
+        self.cur = {}
+        self.mru = self.lru = None
+        self.n = 0
+        self.lk = threading.Lock()
+
+    def _trim(self, n):
+        ent = self.lru
+        for i in range(self.n - n):
+            if ent.st == "l":
+                ent.obj = weakref.ref(ent.obj)
+                ent.st = "w"
+            elif ent.st == "w" and ent.obj() is None:
+                del self.cur[ent.id]
+                ent.remove(self)
+                ent.st = "r"
+            ent = ent.p
+
+    def get(self, id, load=True):
+        while True:
+            with self.lk:
+                ent = self.cur.get(id)
+                if ent is None:
+                    if not load:
+                        raise KeyError(id)
+                    self.cur[id] = ent = entry(id, self)
+                    ent.lk = lk = threading.Lock()
+                    ent.st = "ld"
+                    st = None
+                    self._trim(self.keep)
+                elif ent.st == "l":
+                    ent.relink(self)
+                    return ent.obj
+                elif ent.st == "w":
+                    ret = ent.obj()
+                    if ret is None:
+                        del self.cur[id]
+                        ent.remove(self)
+                        ent.st = "r"
+                        continue
+                    return ret
+                elif ent.st == "ld":
+                    lk = ent.lk
+                    st = "ld"
+                    if lk is None:
+                        continue
+                elif ent.st == "r":
+                    continue
+            with lk:
+                if st is None:
+                    try:
+                        ret = ent.obj = self.load(id)
+                        ent.st = "l"
+                        return ret
+                    except:
+                        with self.lk:
+                            del self.cur[id]
+                            ent.remove(self)
+                            ent.st = "r"
+                        raise
+                    finally:
+                        ent.lk = None
+                elif st == "ld":
+                    continue
+
+    def put(self, id, ob):
+        while True:
+            with self.lk:
+                ent = self.cur.get(id)
+                if ent is None:
+                    self.cur[id] = ent = entry(id, self)
+                    ent.obj = ob
+                    ent.st = "l"
+                    self._trim(self.keep)
+                    return
+                elif ent.st == "l":
+                    ent.obj = ob
+                    return
+                elif ent.st == "w":
+                    ent.obj = ob
+                    return
+                elif ent.st == "r":
+                    continue
+                elif ent.st == "ld":
+                    lk = ent.lk
+                    if lk is None:
+                        continue
+            with lk:
+                continue
+
+    def remove(self, id):
+        while True:
+            with self.lk:
+                ent = self.cur.get(id)
+                if ent is None:
+                    return
+                elif ent.st == "ld":
+                    lk = ent.lk
+                    if lk is None:
+                        continue
+                else:
+                    del self.cur[id]
+                    ent.remove(self)
+                    ent.st = "r"
+                    return
+            with lk:
+                continue
+
+    def load(self, id):
+        raise KeyError()
diff --git a/didex/store.py b/didex/store.py
new file mode 100644 (file)
index 0000000..15d2eca
--- /dev/null
@@ -0,0 +1,90 @@
+import threading, pickle
+from . import db, index, cache
+from .db import txnfun
+
+class environment(object):
+    def __init__(self, path):
+        self.path = path
+        self.lk = threading.Lock()
+        self.bk = None
+
+    def __call__(self):
+        with self.lk:
+            if self.bk is None:
+                self.bk = db.environment(self.path)
+            return self.bk
+
+    def close(self):
+        with self.lk:
+            if self.bk is not None:
+                self.bk.close()
+                self.bk = None
+
+class storedesc(object):
+    pass
+
+def storedescs(obj):
+    t = type(obj)
+    ret = getattr(t, "__didex_attr", None)
+    if ret is None:
+        ret = []
+        for nm, val in t.__dict__.items():
+            if isinstance(val, storedesc):
+                ret.append((nm, val))
+        t.__didex_attr = ret
+    return ret
+
+class store(object):
+    def __init__(self, name, *, env=None, path=".", ncache=None):
+        self.name = name
+        self.lk = threading.Lock()
+        if env:
+            self.env = env
+        else:
+            self.env = environment(path)
+        self._db = None
+        if ncache is None:
+            ncache = cache.cache()
+        self.cache = ncache
+        self.cache.load = self._load
+
+    def db(self):
+        with self.lk:
+            if self._db is None:
+                self._db = self.env().db(self.name)
+            return self._db
+
+    def _load(self, id):
+        try:
+            return pickle.loads(self.db().get(id))
+        except:
+            raise KeyError(id, "could not unpickle data")
+
+    def _encode(self, obj):
+        return pickle.dumps(obj)
+
+    def get(self, id, *, load=True):
+        return self.cache.get(id, load=load)
+
+    @txnfun(lambda self: self.db().env.env)
+    def register(self, obj, *, tx):
+        id = self.db().add(self._encode(obj), tx=tx)
+        for nm, attr in storedescs(obj):
+            attr.register(id, obj, tx)
+        self.cache.put(id, obj)
+        return id
+
+    @txnfun(lambda self: self.db().env.env)
+    def unregister(self, id, *, tx):
+        obj = self.get(id)
+        for nm, attr in storedescs(obj):
+            attr.unregister(id, obj, tx)
+        self.db().remove(id, tx=tx)
+        self.cache.remove(id)
+
+    @txnfun(lambda self: self.db().env.env)
+    def update(self, id, *, tx):
+        obj = self.get(id, load=False)
+        for nm, attr, in storedescs(obj):
+            attr.update(id, obj, tx)
+        self.db().replace(id, self._encode(obj), tx=tx)
diff --git a/didex/values.py b/didex/values.py
new file mode 100644 (file)
index 0000000..0e7bddc
--- /dev/null
@@ -0,0 +1,102 @@
+import threading
+from . import store, lib
+from .store import storedesc
+
+class cursor(lib.closable):
+    def __init__(self, bk, st):
+        self.bk = bk
+        self.st = st
+
+    def close(self):
+        self.bk.close()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        k, id = next(self.bk)
+        return k, self.st.get(id)
+
+    def skip(self, n=1):
+        self.bk.skip(n)
+
+class base(storedesc):
+    def __init__(self, store, indextype, name, datatype, default):
+        self.store = store
+        self.indextype = indextype
+        self.name = name
+        self.typ = datatype
+        self.default = default
+        self.idx = None
+        self.lk = threading.Lock()
+        self.mattr = "__idx_%s_new" % name
+        self.iattr = "__idx_%s_cur" % name
+
+    def index(self):
+        with self.lk:
+            if self.idx is None:
+                self.idx = self.indextype(self.store.db(), self.name, self.typ)
+            return self.idx
+
+    def __get__(self, obj, cls):
+        if obj is None: return self
+        return getattr(obj, self.mattr, self.default)
+
+    def __set__(self, obj, val):
+        setattr(obj, self.mattr, val)
+
+    def __delete__(self, obj):
+        delattr(obj, self.mattr)
+
+    def get(self, **kwargs):
+        return cursor(self.index().get(**kwargs), self.store)
+
+class simple(base):
+    def __init__(self, store, indextype, name, datatype, default=None):
+        super().__init__(store, indextype, name, datatype, default)
+
+    def register(self, id, obj, tx):
+        val = self.__get__(obj, None)
+        self.index().put(val, id, tx=tx)
+        tx.postcommit(lambda: setattr(obj, self.iattr, val))
+
+    def unregister(self, id, obj, tx):
+        self.index().remove(getattr(obj, self.iattr), id, tx=tx)
+        tx.postcommit(lambda: delattr(obj, self.iattr))
+
+    def update(self, id, obj, tx):
+        val = self.__get__(obj, None)
+        ival = getattr(obj, self.iattr)
+        if val != ival:
+            idx = self.index()
+            idx.remove(ival, id, tx=tx)
+            idx.put(val, id, tx=tx)
+            tx.postcommit(lambda: setattr(obj, self.iattr, val))
+
+class multi(base):
+    def __init__(self, store, indextype, name, datatype):
+        super().__init__(store, indextype, name, datatype, ())
+
+    def register(self, id, obj, tx):
+        vals = frozenset(self.__get__(obj, None))
+        idx = self.index()
+        for val in vals:
+            idx.put(val, id, tx=tx)
+        tx.postcommit(lambda: setattr(obj, self.iattr, vals))
+
+    def unregister(self, id, obj, tx):
+        idx = self.index()
+        for val in getattr(obj, self.iattr):
+            idx.remove(val, id, tx=tx)
+        tx.postcommit(lambda: delattr(obj, self.iattr))
+
+    def update(self, id, obj, tx):
+        vals = frozenset(self.__get__(obj, None))
+        ivals = getattr(obj, self.iattr)
+        if vals != ivals:
+            idx = self.index()
+            for val in ivals - vals:
+                idx.remove(val, id, tx=tx)
+            for val in vals - ivals:
+                idx.put(val, id, tx=tx)
+            tx.postcommit(lambda: setattr(obj, self.iattr, vals))