From 7f97a47e579701c8e033ad73259434777f70ef3e Mon Sep 17 00:00:00 2001 From: Fredrik Tolf Date: Wed, 23 Nov 2011 01:36:50 +0100 Subject: [PATCH] Initial commit. --- .gitignore | 1 + pdm/__init__.py | 0 pdm/cli.py | 246 ++++++++++++++++++++++++++++++++++++ pdm/perf.py | 114 +++++++++++++++++ pdm/srv.py | 383 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 744 insertions(+) create mode 100644 .gitignore create mode 100644 pdm/__init__.py create mode 100644 pdm/cli.py create mode 100644 pdm/perf.py create mode 100644 pdm/srv.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0d20b64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/pdm/__init__.py b/pdm/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pdm/cli.py b/pdm/cli.py new file mode 100644 index 0000000..da6bcc5 --- /dev/null +++ b/pdm/cli.py @@ -0,0 +1,246 @@ +"""Management for daemon processes + +This module provides some client support for the daemon management +provided in the pdm.srv module. +""" + +import socket, pickle, struct, select, threading + +__all__ = ["client", "replclient"] + +class protoerr(Exception): + pass + +def resolve(spec): + if isinstance(spec, socket.socket): + return spec + sk = None + try: + if "/" in spec: + sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sk.connect(spec) + elif spec.isdigit(): + sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sk.connect(("localhost", int(spec))) + elif ":" in spec: + sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + p = spec.rindex(":") + sk.connect((spec[:p], int(spec[p + 1:]))) + else: + raise Exception("Unknown target specification %r" % spec) + rv = sk + sk = None + finally: + if sk is not None: sk.close() + return rv + +class client(object): + def __init__(self, sk, proto = None): + self.sk = resolve(sk) + self.buf = "" + line = self.readline() + if line != "+PDM1": + raise protoerr("Illegal protocol signature") + if proto is not None: + self.select(proto) + + def close(self): + self.sk.close() + + def readline(self): + while True: + p = self.buf.find("\n") + if p >= 0: + ret = self.buf[:p] + self.buf = self.buf[p + 1:] + return ret + ret = self.sk.recv(1024) + if ret == "": + return None + self.buf += ret + + def select(self, proto): + if "\n" in proto: + raise Exception("Illegal protocol specified: %r" % proto) + self.sk.send(proto + "\n") + rep = self.readline() + if len(rep) < 1 or rep[0] != "+": + raise protoerr("Error reply when selecting protocol %s: %s" % (proto, rep[1:])) + + def __enter__(self): + return self + + def __exit__(self, *excinfo): + self.close() + return False + +class replclient(client): + def __init__(self, sk): + super(replclient, self).__init__(sk, "repl") + + def run(self, code): + while True: + ncode = code.replace("\n\n", "\n") + if ncode == code: break + code = ncode + while len(code) > 0 and code[-1] == "\n": + code = code[:-1] + self.sk.send(code + "\n\n") + buf = "" + while True: + ln = self.readline() + if ln[0] == " ": + buf += ln[1:] + "\n" + elif ln[0] == "+": + return buf + elif ln[0] == "-": + raise protoerr("Error reply: %s" % ln[1:]) + else: + raise protoerr("Illegal reply: %s" % ln) + +class perfproxy(object): + def __init__(self, cl, id, proto): + self.cl = cl + self.id = id + self.proto = proto + self.subscribers = set() + + def lookup(self, name): + self.cl.lock.acquire() + try: + id = self.cl.nextid + self.cl.nextid += 1 + finally: + self.cl.lock.release() + (proto,) = self.cl.run("lookup", id, self.id, name) + proxy = perfproxy(self.cl, id, proto) + self.cl.proxies[id] = proxy + return proxy + + def listdir(self): + return self.cl.run("ls", self.id)[0] + + def readattr(self): + return self.cl.run("readattr", self.id)[0] + + def attrinfo(self): + return self.cl.run("attrinfo", self.id)[0] + + def invoke(self, method, *args, **kwargs): + return self.cl.run("invoke", self.id, method, args, kwargs)[0] + + def subscribe(self, cb): + if cb in self.subscribers: + raise ValueError("Already subscribed") + if len(self.subscribers) == 0: + self.cl.run("subs", self.id) + self.subscribers.add(cb) + + def unsubscribe(self): + if cb not in self.subscribers: + raise ValueError("Not subscribed") + self.subscribers.remove(cb) + if len(self.subscribers) == 0: + self.cl.run("unsubs", self.id) + + def notify(self, ev): + for cb in self.subscribers: + try: + cb(ev) + except: pass + + def close(self): + self.cl.run("unbind", self.id) + del self.cl.proxies[self.id] + + def __enter__(self): + return self + + def __exit__(self, *excinfo): + self.close() + return False + +class perfclient(client): + def __init__(self, sk): + super(perfclient, self).__init__(sk, "perf") + self.nextid = 0 + self.lock = threading.Lock() + self.proxies = {} + self.names = {} + + def send(self, ob): + buf = pickle.dumps(ob) + buf = struct.pack(">l", len(buf)) + buf + self.sk.send(buf) + + def recvb(self, num): + buf = "" + while len(buf) < num: + data = self.sk.recv(num - len(buf)) + if data == "": + raise EOFError() + buf += data + return buf + + def recv(self): + return pickle.loads(self.recvb(struct.unpack(">l", self.recvb(4))[0])) + + def event(self, id, ev): + proxy = self.proxies.get(id) + if proxy is None: return + proxy.notify(ev) + + def dispatch(self, timeout = None): + rfd, wfd, efd = select.select([self.sk], [], [], timeout) + if self.sk in rfd: + msg = self.recv() + if msg[0] == "*": + self.event(msg[1], msg[2]) + else: + raise ValueError("Unexpected non-event message: %r" % msg[0]) + + def recvreply(self): + while True: + reply = self.recv() + if reply[0] in ("+", "-"): + return reply + elif reply[0] == "*": + self.event(reply[1], reply[2]) + else: + raise ValueError("Illegal reply header: %r" % reply[0]) + + def run(self, cmd, *args): + self.lock.acquire() + try: + self.send((cmd,) + args) + reply = self.recvreply() + if reply[0] == "+": + return reply[1:] + else: + raise reply[1] + finally: + self.lock.release() + + def lookup(self, module, obnm): + self.lock.acquire() + try: + id = self.nextid + self.nextid += 1 + finally: + self.lock.release() + (proto,) = self.run("bind", id, module, obnm) + proxy = perfproxy(self, id, proto) + self.proxies[id] = proxy + return proxy + + def find(self, name): + ret = self.names.get(name) + if ret is None: + if "/" in name: + p = name.rindex("/") + ret = self.find(name[:p]).lookup(name[p + 1:]) + else: + p = name.rindex(".") + ret = self.lookup(name[:p], name[p + 1:]) + self.names[name] = ret + return ret diff --git a/pdm/perf.py b/pdm/perf.py new file mode 100644 index 0000000..30f60ba --- /dev/null +++ b/pdm/perf.py @@ -0,0 +1,114 @@ +import os, sys, resource, time, socket + +class attrinfo(object): + def __init__(self, desc = None): + self.desc = desc + +class perfobj(object): + def __init__(self, *args, **kwargs): + super(perfobj, self).__init__() + + def pdm_protocols(self): + return [] + +class simpleattr(perfobj): + def __init__(self, func, info = None, *args, **kwargs): + super(simpleattr, self).__init__(*args, **kwargs) + self.func = func + if info is None: + info = attrinfo() + self.info = info + + def readattr(self): + return self.func() + + def attrinfo(self): + return self.info + + def pdm_protocols(self): + return super(simpleattr, self).pdm_protocols() + ["attr"] + +class valueattr(perfobj): + def __init__(self, init, info = None, *args, **kwargs): + super(valueattr, self).__init__(*args, **kwargs) + self.value = init + if info is None: + info = attrinfo() + self.info = info + + def readattr(self): + return self.value + + def attrinfo(self): + return self.info + + def pdm_protocols(self): + return super(valueattr, self).pdm_protocols() + ["attr"] + + +class eventobj(perfobj): + def __init__(self, *args, **kwargs): + super(eventobj, self).__init__(*args, **kwargs) + self.subscribers = set() + + def subscribe(self, cb): + if cb in self.subscribers: + raise ValueError("Already subscribed") + self.subscribers.add(cb) + + def unsubscribe(self, cb): + self.subscribers.remove(cb) + + def notify(self, event): + for cb in self.subscribers: + try: + cb(event) + except: pass + + def pdm_protocols(self): + return super(eventobj, self).pdm_protocols() + ["event"] + +class staticdir(perfobj): + def __init__(self, *args, **kwargs): + super(staticdir, self).__init__(*args, **kwargs) + self.map = {} + + def __setitem__(self, name, ob): + self.map[name] = ob + + def __delitem__(self, name): + del self.map[name] + + def __getitem__(self, name): + return self.map[name] + + def get(self, name, default = None): + return self.map.get(name, default) + + def listdir(self): + return self.map.keys() + + def lookup(self, name): + return self.map[name] + + def pdm_protocols(self): + return super(staticdir, self).pdm_protocols() + ["dir"] + +sysres = staticdir() +itime = time.time() +ires = resource.getrusage(resource.RUSAGE_SELF) +def ct(): + ru = resource.getrusage(resource.RUSAGE_SELF) + return (ru.ru_utime - ires.ru_utime) + (ru.ru_stime - ires.ru_stime) +sysres["realtime"] = simpleattr(func = lambda: time.time() - itime) +sysres["cputime"] = simpleattr(func = ct) +sysres["utime"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF).ru_utime - ires.ru_utime) +sysres["stime"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF).ru_stime - ires.ru_stime) +sysres["maxrss"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) +sysres["rusage"] = simpleattr(func = lambda: resource.getrusage(resource.RUSAGE_SELF)) + +sysinfo = staticdir() +sysinfo["pid"] = simpleattr(func = os.getpid) +sysinfo["uname"] = simpleattr(func = os.uname) +sysinfo["hostname"] = simpleattr(func = socket.gethostname) +sysinfo["platform"] = valueattr(init = sys.platform) diff --git a/pdm/srv.py b/pdm/srv.py new file mode 100644 index 0000000..75b8918 --- /dev/null +++ b/pdm/srv.py @@ -0,0 +1,383 @@ +"""Management for daemon processes + +This module contains a utility to listen for management commands on a +socket, lending itself to managing daemon processes. +""" + +import os, sys, socket, threading, grp, select +import types, pprint, traceback +import pickle, struct + +__all__ = ["listener", "unixlistener", "tcplistener", "listen"] + +protocols = {} + +class repl(object): + def __init__(self, cl): + self.cl = cl + self.mod = types.ModuleType("repl") + self.mod.echo = self.echo + self.printer = pprint.PrettyPrinter(indent = 4, depth = 6) + cl.send("+REPL\n") + + def sendlines(self, text): + for line in text.split("\n"): + self.cl.send(" " + line + "\n") + + def echo(self, ob): + self.sendlines(self.printer.pformat(ob)) + + def command(self, cmd): + try: + try: + ccode = compile(cmd, "PDM Input", "eval") + except SyntaxError: + ccode = compile(cmd, "PDM Input", "exec") + exec ccode in self.mod.__dict__ + self.cl.send("+OK\n") + else: + self.echo(eval(ccode, self.mod.__dict__)) + self.cl.send("+OK\n") + except: + for line in traceback.format_exception(*sys.exc_info()): + self.cl.send(" " + line) + self.cl.send("+EXC\n") + + def handle(self, buf): + p = buf.find("\n\n") + if p < 0: + return buf + cmd = buf[:p + 1] + self.command(cmd) + return buf[p + 2:] +protocols["repl"] = repl + +class perf(object): + def __init__(self, cl): + self.cl = cl + self.odtab = {} + cl.send("+PERF1\n") + self.buf = "" + self.lock = threading.Lock() + self.subscribed = {} + + def closed(self): + for id, recv in self.subscribed.iteritems(): + ob = self.odtab[id] + if ob is None: continue + ob, protos = ob + try: + ob.unsubscribe(recv) + except: pass + + def send(self, *args): + self.lock.acquire() + try: + buf = pickle.dumps(args) + buf = struct.pack(">l", len(buf)) + buf + self.cl.send(buf) + finally: + self.lock.release() + + def bindob(self, id, ob): + if not hasattr(ob, "pdm_protocols"): + raise ValueError("Object does not support PDM introspection") + try: + proto = ob.pdm_protocols() + except Exception, exc: + raise ValueError("PDM introspection failed", exc) + self.odtab[id] = ob, proto + return proto + + def bind(self, id, module, obnm): + resmod = sys.modules.get(module) + if resmod is None: + self.send("-", ImportError("No such module: %s" % module)) + return + try: + ob = getattr(resmod, obnm) + except AttributeError: + self.send("-", AttributeError("No such object: %s" % obnm)) + return + try: + proto = self.bindob(id, ob) + except Exception, exc: + self.send("-", exc) + return + self.send("+", proto) + + def getob(self, id, proto): + ob = self.odtab.get(id) + if ob is None: + self.send("-", ValueError("No such bound ID: %r" % id)) + return None + ob, protos = ob + if proto not in protos: + self.send("-", ValueError("Object does not support that protocol")) + return None + return ob + + def lookup(self, tgtid, srcid, obnm): + src = self.getob(srcid, "dir") + if src is None: + return + try: + ob = src.lookup(obnm) + except KeyError, exc: + self.send("-", exc) + return + try: + proto = self.bindob(tgtid, ob) + except Exception, exc: + self.send("-", exc) + return + self.send("+", proto) + + def unbind(self, id): + ob = self.odtab.get(id) + if ob is None: + self.send("-", KeyError("No such name bound: %r" % id)) + return + ob, protos = ob + del self.odtab[id] + recv = self.subscribed.get(id) + if recv is not None: + ob.unsubscribe(recv) + del self.subscribed[id] + self.send("+") + + def listdir(self, id): + ob = self.getob(id, "dir") + if ob is None: + return + self.send("+", ob.listdir()) + + def readattr(self, id): + ob = self.getob(id, "attr") + if ob is None: + return + try: + ret = ob.readattr() + except Exception, exc: + self.send("-", Exception("Could not read attribute")) + return + self.send("+", ret) + + def attrinfo(self, id): + ob = self.getob(id, "attr") + if ob is None: + return + self.send("+", ob.attrinfo()) + + def invoke(self, id, method, args, kwargs): + ob = self.getob(id, "invoke") + if ob is None: + return + try: + self.send("+", ob.invoke(method, *args, **kwargs)) + except Exception, exc: + self.send("-", exc) + + def event(self, id, ob, ev): + self.send("*", id, ev) + + def subscribe(self, id): + ob = self.getob(id, "event") + if ob is None: + return + if id in self.subscribed: + self.send("-", ValueError("Already subscribed")) + def recv(ev): + self.event(id, ob, ev) + ob.subscribe(recv) + self.subscribed[id] = recv + self.send("+") + + def unsubscribe(self, id): + ob = self.getob(id, "event") + if ob is None: + return + recv = self.subscribed.get(id) + if recv is None: + self.send("-", ValueError("Not subscribed")) + ob.unsubscribe(recv) + del self.subscribed[id] + self.send("+") + + def command(self, data): + cmd = data[0] + if cmd == "bind": + self.bind(*data[1:]) + elif cmd == "unbind": + self.unbind(*data[1:]) + elif cmd == "lookup": + self.lookup(*data[1:]) + elif cmd == "ls": + self.listdir(*data[1:]) + elif cmd == "readattr": + self.readattr(*data[1:]) + elif cmd == "attrinfo": + self.attrinfo(*data[1:]) + elif cmd == "invoke": + self.invoke(*data[1:]) + elif cmd == "subs": + self.subscribe(*data[1:]) + elif cmd == "unsubs": + self.unsubscribe(*data[1:]) + else: + self.send("-", Exception("Unknown command: %r" % (cmd,))) + + def handle(self, buf): + if len(buf) < 4: + return buf + dlen = struct.unpack(">l", buf[:4])[0] + if len(buf) < dlen + 4: + return buf + data = pickle.loads(buf[4:dlen + 4]) + self.command(data) + return buf[dlen + 4:] + +protocols["perf"] = perf + +class client(threading.Thread): + def __init__(self, sk): + super(client, self).__init__(name = "Management client") + self.setDaemon(True) + self.sk = sk + self.handler = self + + def send(self, data): + return self.sk.send(data) + + def choose(self, proto): + if proto in protocols: + self.handler = protocols[proto](self) + else: + self.send("-ERR Unknown protocol: %s\n" % proto) + raise Exception() + + def handle(self, buf): + p = buf.find("\n") + if p >= 0: + proto = buf[:p] + buf = buf[p + 1:] + self.choose(proto) + return buf + + def run(self): + try: + buf = "" + self.send("+PDM1\n") + while True: + ret = self.sk.recv(1024) + if ret == "": + return + buf += ret + while True: + try: + nbuf = self.handler.handle(buf) + except: + return + if nbuf == buf: + break + buf = nbuf + finally: + #for line in traceback.format_exception(*sys.exc_info()): + # print line + try: + self.sk.close() + finally: + if hasattr(self.handler, "closed"): + self.handler.closed() + + +class listener(threading.Thread): + def __init__(self): + super(listener, self).__init__(name = "Management listener") + self.setDaemon(True) + + def listen(self, sk): + self.running = True + while self.running: + rfd, wfd, efd = select.select([sk], [], [sk], 1) + for fd in rfd: + if fd == sk: + nsk, addr = sk.accept() + self.accept(nsk, addr) + + def stop(self): + self.running = False + self.join() + + def accept(self, sk, addr): + cl = client(sk) + cl.start() + +class unixlistener(listener): + def __init__(self, name, mode = 0600, group = None): + super(unixlistener, self).__init__() + self.name = name + self.mode = mode + self.group = group + + def run(self): + sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + ul = False + try: + if os.path.exists(self.name) and os.path.stat.S_ISSOCK(os.stat(self.name).st_mode): + os.unlink(self.name) + sk.bind(self.name) + ul = True + os.chmod(self.name, self.mode) + if self.group is not None: + os.chown(self.name, os.getuid(), grp.getgrnam(self.group).gr_gid) + sk.listen(16) + self.listen(sk) + finally: + sk.close() + if ul: + os.unlink(self.name) + +class tcplistener(listener): + def __init__(self, port, bindaddr = "127.0.0.1"): + super(tcplistener, self).__init__() + self.port = port + self.bindaddr = bindaddr + + def run(self): + sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sk.bind((self.bindaddr, self.port)) + sk.listen(16) + self.listen(sk) + finally: + sk.close() + +def listen(spec): + if ":" in spec: + first = spec[:spec.index(":")] + last = spec[spec.rindex(":") + 1:] + else: + first = spec + last = spec + if "/" in first: + parts = spec.split(":") + mode = 0600 + group = None + if len(parts) > 1: + mode = int(parts[1], 0) + if len(parts) > 2: + group = parts[2] + ret = unixlistener(parts[0], mode = mode, group = group) + ret.start() + return ret + if last.isdigit(): + p = spec.rindex(":") + host = spec[:p] + port = int(spec[p + 1:]) + ret = tcplistener(port, bindaddr = host) + ret.start() + return ret + raise ValueError("Unparsable listener specification: %r" % spec) + +import pdm.perf -- 2.11.0