Initial commit.
authorFredrik Tolf <fredrik@dolda2000.com>
Wed, 23 Nov 2011 00:36:50 +0000 (01:36 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Wed, 23 Nov 2011 00:36:50 +0000 (01:36 +0100)
.gitignore [new file with mode: 0644]
pdm/__init__.py [new file with mode: 0644]
pdm/cli.py [new file with mode: 0644]
pdm/perf.py [new file with mode: 0644]
pdm/srv.py [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..0d20b64
--- /dev/null
@@ -0,0 +1 @@
+*.pyc
diff --git a/pdm/__init__.py b/pdm/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/pdm/cli.py b/pdm/cli.py
new file mode 100644 (file)
index 0000000..da6bcc5
--- /dev/null
@@ -0,0 +1,246 @@
+"""Management for daemon processes
+
+This module provides some client support for the daemon management
+provided in the pdm.srv module.
+"""
+
+import socket, pickle, struct, select, threading
+
+__all__ = ["client", "replclient"]
+
+class protoerr(Exception):
+    pass
+
+def resolve(spec):
+    if isinstance(spec, socket.socket):
+        return spec
+    sk = None
+    try:
+        if "/" in spec:
+            sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            sk.connect(spec)
+        elif spec.isdigit():
+            sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sk.connect(("localhost", int(spec)))
+        elif ":" in spec:
+            sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            p = spec.rindex(":")
+            sk.connect((spec[:p], int(spec[p + 1:])))
+        else:
+            raise Exception("Unknown target specification %r" % spec)
+        rv = sk
+        sk = None
+    finally:
+        if sk is not None: sk.close()
+    return rv
+
+class client(object):
+    def __init__(self, sk, proto = None):
+        self.sk = resolve(sk)
+        self.buf = ""
+        line = self.readline()
+        if line != "+PDM1":
+            raise protoerr("Illegal protocol signature")
+        if proto is not None:
+            self.select(proto)
+
+    def close(self):
+        self.sk.close()
+
+    def readline(self):
+        while True:
+            p = self.buf.find("\n")
+            if p >= 0:
+                ret = self.buf[:p]
+                self.buf = self.buf[p + 1:]
+                return ret
+            ret = self.sk.recv(1024)
+            if ret == "":
+                return None
+            self.buf += ret
+
+    def select(self, proto):
+        if "\n" in proto:
+            raise Exception("Illegal protocol specified: %r" % proto)
+        self.sk.send(proto + "\n")
+        rep = self.readline()
+        if len(rep) < 1 or rep[0] != "+":
+            raise protoerr("Error reply when selecting protocol %s: %s" % (proto, rep[1:]))
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *excinfo):
+        self.close()
+        return False
+
+class replclient(client):
+    def __init__(self, sk):
+        super(replclient, self).__init__(sk, "repl")
+
+    def run(self, code):
+        while True:
+            ncode = code.replace("\n\n", "\n")
+            if ncode == code: break
+            code = ncode
+        while len(code) > 0 and code[-1] == "\n":
+            code = code[:-1]
+        self.sk.send(code + "\n\n")
+        buf = ""
+        while True:
+            ln = self.readline()
+            if ln[0] == " ":
+                buf += ln[1:] + "\n"
+            elif ln[0] == "+":
+                return buf
+            elif ln[0] == "-":
+                raise protoerr("Error reply: %s" % ln[1:])
+            else:
+                raise protoerr("Illegal reply: %s" % ln)
+
+class perfproxy(object):
+    def __init__(self, cl, id, proto):
+        self.cl = cl
+        self.id = id
+        self.proto = proto
+        self.subscribers = set()
+
+    def lookup(self, name):
+        self.cl.lock.acquire()
+        try:
+            id = self.cl.nextid
+            self.cl.nextid += 1
+        finally:
+            self.cl.lock.release()
+        (proto,) = self.cl.run("lookup", id, self.id, name)
+        proxy = perfproxy(self.cl, id, proto)
+        self.cl.proxies[id] = proxy
+        return proxy
+
+    def listdir(self):
+        return self.cl.run("ls", self.id)[0]
+
+    def readattr(self):
+        return self.cl.run("readattr", self.id)[0]
+
+    def attrinfo(self):
+        return self.cl.run("attrinfo", self.id)[0]
+
+    def invoke(self, method, *args, **kwargs):
+        return self.cl.run("invoke", self.id, method, args, kwargs)[0]
+
+    def subscribe(self, cb):
+        if cb in self.subscribers:
+            raise ValueError("Already subscribed")
+        if len(self.subscribers) == 0:
+            self.cl.run("subs", self.id)
+        self.subscribers.add(cb)
+
+    def unsubscribe(self):
+        if cb not in self.subscribers:
+            raise ValueError("Not subscribed")
+        self.subscribers.remove(cb)
+        if len(self.subscribers) == 0:
+            self.cl.run("unsubs", self.id)
+
+    def notify(self, ev):
+        for cb in self.subscribers:
+            try:
+                cb(ev)
+            except: pass
+
+    def close(self):
+        self.cl.run("unbind", self.id)
+        del self.cl.proxies[self.id]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *excinfo):
+        self.close()
+        return False
+
+class perfclient(client):
+    def __init__(self, sk):
+        super(perfclient, self).__init__(sk, "perf")
+        self.nextid = 0
+        self.lock = threading.Lock()
+        self.proxies = {}
+        self.names = {}
+
+    def send(self, ob):
+        buf = pickle.dumps(ob)
+        buf = struct.pack(">l", len(buf)) + buf
+        self.sk.send(buf)
+
+    def recvb(self, num):
+        buf = ""
+        while len(buf) < num:
+            data = self.sk.recv(num - len(buf))
+            if data == "":
+                raise EOFError()
+            buf += data
+        return buf
+
+    def recv(self):
+        return pickle.loads(self.recvb(struct.unpack(">l", self.recvb(4))[0]))
+
+    def event(self, id, ev):
+        proxy = self.proxies.get(id)
+        if proxy is None: return
+        proxy.notify(ev)
+
+    def dispatch(self, timeout = None):
+        rfd, wfd, efd = select.select([self.sk], [], [], timeout)
+        if self.sk in rfd:
+            msg = self.recv()
+            if msg[0] == "*":
+                self.event(msg[1], msg[2])
+            else:
+                raise ValueError("Unexpected non-event message: %r" % msg[0])
+
+    def recvreply(self):
+        while True:
+            reply = self.recv()
+            if reply[0] in ("+", "-"):
+                return reply
+            elif reply[0] == "*":
+                self.event(reply[1], reply[2])
+            else:
+                raise ValueError("Illegal reply header: %r" % reply[0])
+
+    def run(self, cmd, *args):
+        self.lock.acquire()
+        try:
+            self.send((cmd,) + args)
+            reply = self.recvreply()
+            if reply[0] == "+":
+                return reply[1:]
+            else:
+                raise reply[1]
+        finally:
+            self.lock.release()
+
+    def lookup(self, module, obnm):
+        self.lock.acquire()
+        try:
+            id = self.nextid
+            self.nextid += 1
+        finally:
+            self.lock.release()
+        (proto,) = self.run("bind", id, module, obnm)
+        proxy = perfproxy(self, id, proto)
+        self.proxies[id] = proxy
+        return proxy
+
+    def find(self, name):
+        ret = self.names.get(name)
+        if ret is None:
+            if "/" in name:
+                p = name.rindex("/")
+                ret = self.find(name[:p]).lookup(name[p + 1:])
+            else:
+                p = name.rindex(".")
+                ret = self.lookup(name[:p], name[p + 1:])
+            self.names[name] = ret
+        return ret
diff --git a/pdm/perf.py b/pdm/perf.py
new file mode 100644 (file)
index 0000000..30f60ba
--- /dev/null
@@ -0,0 +1,114 @@
+import os, sys, resource, time, socket
+
+class attrinfo(object):
+    def __init__(self, desc = None):
+        self.desc = desc
+
+class perfobj(object):
+    def __init__(self, *args, **kwargs):
+        super(perfobj, self).__init__()
+    
+    def pdm_protocols(self):
+        return []
+
+class simpleattr(perfobj):
+    def __init__(self, func, info = None, *args, **kwargs):
+        super(simpleattr, self).__init__(*args, **kwargs)
+        self.func = func
+        if info is None:
+            info = attrinfo()
+        self.info = info
+
+    def readattr(self):
+        return self.func()
+
+    def attrinfo(self):
+        return self.info
+
+    def pdm_protocols(self):
+        return super(simpleattr, self).pdm_protocols() + ["attr"]
+
+class valueattr(perfobj):
+    def __init__(self, init, info = None, *args, **kwargs):
+        super(valueattr, self).__init__(*args, **kwargs)
+        self.value = init
+        if info is None:
+            info = attrinfo()
+        self.info = info
+
+    def readattr(self):
+        return self.value
+
+    def attrinfo(self):
+        return self.info
+
+    def pdm_protocols(self):
+        return super(valueattr, self).pdm_protocols() + ["attr"]
+
+
+class eventobj(perfobj):
+    def __init__(self, *args, **kwargs):
+        super(eventobj, self).__init__(*args, **kwargs)
+        self.subscribers = set()
+
+    def subscribe(self, cb):
+        if cb in self.subscribers:
+            raise ValueError("Already subscribed")
+        self.subscribers.add(cb)
+
+    def unsubscribe(self, cb):
+        self.subscribers.remove(cb)
+
+    def notify(self, event):
+        for cb in self.subscribers:
+            try:
+                cb(event)
+            except: pass
+
+    def pdm_protocols(self):
+        return super(eventobj, self).pdm_protocols() + ["event"]
+
+class staticdir(perfobj):
+    def __init__(self, *args, **kwargs):
+        super(staticdir, self).__init__(*args, **kwargs)
+        self.map = {}
+
+    def __setitem__(self, name, ob):
+        self.map[name] = ob
+
+    def __delitem__(self, name):
+        del self.map[name]
+        
+    def __getitem__(self, name):
+        return self.map[name]
+
+    def get(self, name, default = None):
+        return self.map.get(name, default)
+
+    def listdir(self):
+        return self.map.keys()
+
+    def lookup(self, name):
+        return self.map[name]
+
+    def pdm_protocols(self):
+        return super(staticdir, self).pdm_protocols() + ["dir"]
+
+sysres = staticdir()
+itime = time.time()
+ires = resource.getrusage(resource.RUSAGE_SELF)
+def ct():
+    ru = resource.getrusage(resource.RUSAGE_SELF)
+    return (ru.ru_utime - ires.ru_utime) + (ru.ru_stime - ires.ru_stime)
+sysres["realtime"] = simpleattr(func = lambda: time.time() - itime)
+sysres["cputime"] = simpleattr(func = ct)
+sysres["utime"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF).ru_utime - ires.ru_utime)
+sysres["stime"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF).ru_stime - ires.ru_stime)
+sysres["maxrss"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
+sysres["rusage"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF))
+
+sysinfo = staticdir()
+sysinfo["pid"] = simpleattr(func = os.getpid)
+sysinfo["uname"] = simpleattr(func = os.uname)
+sysinfo["hostname"] = simpleattr(func = socket.gethostname)
+sysinfo["platform"] = valueattr(init = sys.platform)
diff --git a/pdm/srv.py b/pdm/srv.py
new file mode 100644 (file)
index 0000000..75b8918
--- /dev/null
@@ -0,0 +1,383 @@
+"""Management for daemon processes
+
+This module contains a utility to listen for management commands on a
+socket, lending itself to managing daemon processes.
+"""
+
+import os, sys, socket, threading, grp, select
+import types, pprint, traceback
+import pickle, struct
+
+__all__ = ["listener", "unixlistener", "tcplistener", "listen"]
+
+protocols = {}
+
+class repl(object):
+    def __init__(self, cl):
+        self.cl = cl
+        self.mod = types.ModuleType("repl")
+        self.mod.echo = self.echo
+        self.printer = pprint.PrettyPrinter(indent = 4, depth = 6)
+        cl.send("+REPL\n")
+
+    def sendlines(self, text):
+        for line in text.split("\n"):
+            self.cl.send(" " + line + "\n")
+
+    def echo(self, ob):
+        self.sendlines(self.printer.pformat(ob))
+
+    def command(self, cmd):
+        try:
+            try:
+                ccode = compile(cmd, "PDM Input", "eval")
+            except SyntaxError:
+                ccode = compile(cmd, "PDM Input", "exec")
+                exec ccode in self.mod.__dict__
+                self.cl.send("+OK\n")
+            else:
+                self.echo(eval(ccode, self.mod.__dict__))
+                self.cl.send("+OK\n")
+        except:
+            for line in traceback.format_exception(*sys.exc_info()):
+                self.cl.send(" " + line)
+            self.cl.send("+EXC\n")
+
+    def handle(self, buf):
+        p = buf.find("\n\n")
+        if p < 0:
+            return buf
+        cmd = buf[:p + 1]
+        self.command(cmd)
+        return buf[p + 2:]
+protocols["repl"] = repl
+
+class perf(object):
+    def __init__(self, cl):
+        self.cl = cl
+        self.odtab = {}
+        cl.send("+PERF1\n")
+        self.buf = ""
+        self.lock = threading.Lock()
+        self.subscribed = {}
+
+    def closed(self):
+        for id, recv in self.subscribed.iteritems():
+            ob = self.odtab[id]
+            if ob is None: continue
+            ob, protos = ob
+            try:
+                ob.unsubscribe(recv)
+            except: pass
+
+    def send(self, *args):
+        self.lock.acquire()
+        try:
+            buf = pickle.dumps(args)
+            buf = struct.pack(">l", len(buf)) + buf
+            self.cl.send(buf)
+        finally:
+            self.lock.release()
+
+    def bindob(self, id, ob):
+        if not hasattr(ob, "pdm_protocols"):
+            raise ValueError("Object does not support PDM introspection")
+        try:
+            proto = ob.pdm_protocols()
+        except Exception, exc:
+            raise ValueError("PDM introspection failed", exc)
+        self.odtab[id] = ob, proto
+        return proto
+
+    def bind(self, id, module, obnm):
+        resmod = sys.modules.get(module)
+        if resmod is None:
+            self.send("-", ImportError("No such module: %s" % module))
+            return
+        try:
+            ob = getattr(resmod, obnm)
+        except AttributeError:
+            self.send("-", AttributeError("No such object: %s" % obnm))
+            return
+        try:
+            proto = self.bindob(id, ob)
+        except Exception, exc:
+            self.send("-", exc)
+            return
+        self.send("+", proto)
+
+    def getob(self, id, proto):
+        ob = self.odtab.get(id)
+        if ob is None:
+            self.send("-", ValueError("No such bound ID: %r" % id))
+            return None
+        ob, protos = ob
+        if proto not in protos:
+            self.send("-", ValueError("Object does not support that protocol"))
+            return None
+        return ob
+
+    def lookup(self, tgtid, srcid, obnm):
+        src = self.getob(srcid, "dir")
+        if src is None:
+            return
+        try:
+            ob = src.lookup(obnm)
+        except KeyError, exc:
+            self.send("-", exc)
+            return
+        try:
+            proto = self.bindob(tgtid, ob)
+        except Exception, exc:
+            self.send("-", exc)
+            return
+        self.send("+", proto)
+
+    def unbind(self, id):
+        ob = self.odtab.get(id)
+        if ob is None:
+            self.send("-", KeyError("No such name bound: %r" % id))
+            return
+        ob, protos = ob
+        del self.odtab[id]
+        recv = self.subscribed.get(id)
+        if recv is not None:
+            ob.unsubscribe(recv)
+            del self.subscribed[id]
+        self.send("+")
+
+    def listdir(self, id):
+        ob = self.getob(id, "dir")
+        if ob is None:
+            return
+        self.send("+", ob.listdir())
+
+    def readattr(self, id):
+        ob = self.getob(id, "attr")
+        if ob is None:
+            return
+        try:
+            ret = ob.readattr()
+        except Exception, exc:
+            self.send("-", Exception("Could not read attribute"))
+            return
+        self.send("+", ret)
+
+    def attrinfo(self, id):
+        ob = self.getob(id, "attr")
+        if ob is None:
+            return
+        self.send("+", ob.attrinfo())
+
+    def invoke(self, id, method, args, kwargs):
+        ob = self.getob(id, "invoke")
+        if ob is None:
+            return
+        try:
+            self.send("+", ob.invoke(method, *args, **kwargs))
+        except Exception, exc:
+            self.send("-", exc)
+
+    def event(self, id, ob, ev):
+        self.send("*", id, ev)
+
+    def subscribe(self, id):
+        ob = self.getob(id, "event")
+        if ob is None:
+            return
+        if id in self.subscribed:
+            self.send("-", ValueError("Already subscribed"))
+        def recv(ev):
+            self.event(id, ob, ev)
+        ob.subscribe(recv)
+        self.subscribed[id] = recv
+        self.send("+")
+
+    def unsubscribe(self, id):
+        ob = self.getob(id, "event")
+        if ob is None:
+            return
+        recv = self.subscribed.get(id)
+        if recv is None:
+            self.send("-", ValueError("Not subscribed"))
+        ob.unsubscribe(recv)
+        del self.subscribed[id]
+        self.send("+")
+
+    def command(self, data):
+        cmd = data[0]
+        if cmd == "bind":
+            self.bind(*data[1:])
+        elif cmd == "unbind":
+            self.unbind(*data[1:])
+        elif cmd == "lookup":
+            self.lookup(*data[1:])
+        elif cmd == "ls":
+            self.listdir(*data[1:])
+        elif cmd == "readattr":
+            self.readattr(*data[1:])
+        elif cmd == "attrinfo":
+            self.attrinfo(*data[1:])
+        elif cmd == "invoke":
+            self.invoke(*data[1:])
+        elif cmd == "subs":
+            self.subscribe(*data[1:])
+        elif cmd == "unsubs":
+            self.unsubscribe(*data[1:])
+        else:
+            self.send("-", Exception("Unknown command: %r" % (cmd,)))
+
+    def handle(self, buf):
+        if len(buf) < 4:
+            return buf
+        dlen = struct.unpack(">l", buf[:4])[0]
+        if len(buf) < dlen + 4:
+            return buf
+        data = pickle.loads(buf[4:dlen + 4])
+        self.command(data)
+        return buf[dlen + 4:]
+        
+protocols["perf"] = perf
+
+class client(threading.Thread):
+    def __init__(self, sk):
+        super(client, self).__init__(name = "Management client")
+        self.setDaemon(True)
+        self.sk = sk
+        self.handler = self
+
+    def send(self, data):
+        return self.sk.send(data)
+
+    def choose(self, proto):
+        if proto in protocols:
+            self.handler = protocols[proto](self)
+        else:
+            self.send("-ERR Unknown protocol: %s\n" % proto)
+            raise Exception()
+
+    def handle(self, buf):
+        p = buf.find("\n")
+        if p >= 0:
+            proto = buf[:p]
+            buf = buf[p + 1:]
+            self.choose(proto)
+        return buf
+
+    def run(self):
+        try:
+            buf = ""
+            self.send("+PDM1\n")
+            while True:
+                ret = self.sk.recv(1024)
+                if ret == "":
+                    return
+                buf += ret
+                while True:
+                    try:
+                        nbuf = self.handler.handle(buf)
+                    except:
+                        return
+                    if nbuf == buf:
+                        break
+                    buf = nbuf
+        finally:
+            #for line in traceback.format_exception(*sys.exc_info()):
+            #    print line
+            try:
+                self.sk.close()
+            finally:
+                if hasattr(self.handler, "closed"):
+                    self.handler.closed()
+            
+
+class listener(threading.Thread):
+    def __init__(self):
+        super(listener, self).__init__(name = "Management listener")
+        self.setDaemon(True)
+
+    def listen(self, sk):
+        self.running = True
+        while self.running:
+            rfd, wfd, efd = select.select([sk], [], [sk], 1)
+            for fd in rfd:
+                if fd == sk:
+                    nsk, addr = sk.accept()
+                    self.accept(nsk, addr)
+
+    def stop(self):
+        self.running = False
+        self.join()
+
+    def accept(self, sk, addr):
+        cl = client(sk)
+        cl.start()
+
+class unixlistener(listener):
+    def __init__(self, name, mode = 0600, group = None):
+        super(unixlistener, self).__init__()
+        self.name = name
+        self.mode = mode
+        self.group = group
+
+    def run(self):
+        sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        ul = False
+        try:
+            if os.path.exists(self.name) and os.path.stat.S_ISSOCK(os.stat(self.name).st_mode):
+                os.unlink(self.name)
+            sk.bind(self.name)
+            ul = True
+            os.chmod(self.name, self.mode)
+            if self.group is not None:
+                os.chown(self.name, os.getuid(), grp.getgrnam(self.group).gr_gid)
+            sk.listen(16)
+            self.listen(sk)
+        finally:
+            sk.close()
+            if ul:
+                os.unlink(self.name)
+
+class tcplistener(listener):
+    def __init__(self, port, bindaddr = "127.0.0.1"):
+        super(tcplistener, self).__init__()
+        self.port = port
+        self.bindaddr = bindaddr
+
+    def run(self):
+        sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            sk.bind((self.bindaddr, self.port))
+            sk.listen(16)
+            self.listen(sk)
+        finally:
+            sk.close()
+
+def listen(spec):
+    if ":" in spec:
+        first = spec[:spec.index(":")]
+        last = spec[spec.rindex(":") + 1:]
+    else:
+        first = spec
+        last = spec
+    if "/" in first:
+        parts = spec.split(":")
+        mode = 0600
+        group = None
+        if len(parts) > 1:
+            mode = int(parts[1], 0)
+        if len(parts) > 2:
+            group = parts[2]
+        ret = unixlistener(parts[0], mode = mode, group = group)
+        ret.start()
+        return ret
+    if last.isdigit():
+        p = spec.rindex(":")
+        host = spec[:p]
+        port = int(spec[p + 1:])
+        ret = tcplistener(port, bindaddr = host)
+        ret.start()
+        return ret
+    raise ValueError("Unparsable listener specification: %r" % spec)
+
+import pdm.perf