1 """Management for daemon processes
3 This module provides some client support for the daemon management
4 provided in the pdm.srv module.
7 import socket, pickle, struct, select, threading
9 __all__ = ["client", "replclient"]
11 class protoerr(Exception):
15 if isinstance(spec, socket.socket):
20 sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
23 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
24 sk.connect(("localhost", int(spec)))
26 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
28 sk.connect((spec[:p], int(spec[p + 1:])))
30 raise Exception("Unknown target specification %r" % spec)
34 if sk is not None: sk.close()
38 def __init__(self, sk, proto = None):
41 line = self.readline()
43 raise protoerr("Illegal protocol signature")
51 return self.sk.fileno()
55 p = self.buf.find(b"\n")
58 self.buf = self.buf[p + 1:]
60 ret = self.sk.recv(1024)
65 def select(self, proto):
66 if isinstance(proto, str):
67 proto = proto.encode("ascii")
69 raise Exception("Illegal protocol specified: %r" % proto)
70 self.sk.send(proto + b"\n")
72 if len(rep) < 1 or rep[0] != b"+"[0]:
73 raise protoerr("Error reply when selecting protocol %s: %s" % (proto, rep[1:]))
78 def __exit__(self, *excinfo):
82 class replclient(client):
def __init__(self, sk):
    """Set up a client on *sk* that requests the "repl" protocol.

    All work is delegated to the parent constructor, which receives
    *sk* unchanged together with the fixed protocol name "repl".
    """
    super().__init__(sk, "repl")
88 ncode = code.replace("\n\n", "\n")
89 if ncode == code: break
91 while len(code) > 0 and code[-1] == "\n":
93 self.sk.send((code + "\n\n").encode("utf-8"))
99 elif ln[0] == b"+"[0]:
100 return buf.decode("utf-8")
101 elif ln[0] == b"-"[0]:
102 raise protoerr("Error reply: %s" % ln[1:].decode("utf-8"))
104 raise protoerr("Illegal reply: %s" % ln)
106 class perfproxy(object):
107 def __init__(self, cl, id, proto):
111 self.subscribers = set()
113 def lookup(self, name):
114 self.cl.lock.acquire()
119 self.cl.lock.release()
120 (proto,) = self.cl.run("lookup", id, self.id, name)
121 proxy = perfproxy(self.cl, id, proto)
122 self.cl.proxies[id] = proxy
126 return self.cl.run("ls", self.id)[0]
129 return self.cl.run("readattr", self.id)[0]
132 return self.cl.run("attrinfo", self.id)[0]
def invoke(self, method, *args, **kwargs):
    """Issue an "invoke" command for this proxy and return its result.

    The command is sent through the owning client's ``run`` with this
    proxy's id, the *method* name, the positional arguments as a tuple
    and the keyword arguments as a dict.  The first element of the
    reply returned by ``run`` is handed back to the caller.
    """
    reply = self.cl.run("invoke", self.id, method, args, kwargs)
    return reply[0]
def subscribe(self, cb):
    """Add *cb* to this proxy's set of subscribers.

    When the subscriber set is empty at the time of the call, a "subs"
    command carrying this proxy's id is first sent through the owning
    client; later registrations only extend the local set.

    Raises:
        ValueError: if *cb* is already in the subscriber set.
    """
    duplicate = cb in self.subscribers
    if duplicate:
        raise ValueError("Already subscribed")
    is_first = not self.subscribers
    if is_first:
        # Nothing registered locally yet, so issue the "subs" command
        # before recording the callback.
        self.cl.run("subs", self.id)
    self.subscribers.add(cb)
def unsubscribe(self, cb):
    """Remove *cb* from this proxy's set of subscribers.

    If the set is empty after the removal, an "unsubs" command carrying
    this proxy's id is sent through the owning client.

    Raises:
        ValueError: if *cb* is not in the subscriber set.
    """
    registered = cb in self.subscribers
    if not registered:
        raise ValueError("Not subscribed")
    # Membership was verified just above, so discard() is equivalent to
    # remove() here.
    self.subscribers.discard(cb)
    if not self.subscribers:
        self.cl.run("unsubs", self.id)
151 def notify(self, ev):
152 for cb in self.subscribers:
158 self.cl.run("unbind", self.id)
159 del self.cl.proxies[self.id]
164 def __exit__(self, *excinfo):
168 class perfclient(client):
169 def __init__(self, sk):
170 super().__init__(sk, "perf")
172 self.lock = threading.Lock()
177 buf = pickle.dumps(ob)
178 buf = struct.pack(">l", len(buf)) + buf
181 def recvb(self, num):
183 while len(buf) < num:
184 data = self.sk.recv(num - len(buf))
191 return pickle.loads(self.recvb(struct.unpack(">l", self.recvb(4))[0]))
193 def event(self, id, ev):
194 proxy = self.proxies.get(id)
195 if proxy is None: return
198 def dispatch(self, timeout = None):
199 rfd, wfd, efd = select.select([self.sk], [], [], timeout)
203 self.event(msg[1], msg[2])
205 raise ValueError("Unexpected non-event message: %r" % msg[0])
210 if reply[0] in ("+", "-"):
212 elif reply[0] == "*":
213 self.event(reply[1], reply[2])
215 raise ValueError("Illegal reply header: %r" % reply[0])
217 def run(self, cmd, *args):
220 self.send((cmd,) + args)
221 reply = self.recvreply()
229 def lookup(self, module, obnm):
236 (proto,) = self.run("bind", id, module, obnm)
237 proxy = perfproxy(self, id, proto)
238 self.proxies[id] = proxy
241 def find(self, name):
242 ret = self.names.get(name)
246 ret = self.find(name[:p]).lookup(name[p + 1:])
249 ret = self.lookup(name[:p], name[p + 1:])
250 self.names[name] = ret