c4c64a93e7d468039aaac06f6a550d38e334ca54
[pdm.git] / pdm / cli.py
1 """Management for daemon processes
2
3 This module provides some client support for the daemon management
4 provided in the pdm.srv module.
5 """
6
7 import socket, pickle, struct, select, threading
8
9 __all__ = ["client", "replclient"]
10
11 class protoerr(Exception):
12     pass
13
14 def resolve(spec):
15     if isinstance(spec, socket.socket):
16         return spec
17     sk = None
18     try:
19         if "/" in spec:
20             sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
21             sk.connect(spec)
22         elif spec.isdigit():
23             sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
24             sk.connect(("localhost", int(spec)))
25         elif ":" in spec:
26             sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
27             p = spec.rindex(":")
28             sk.connect((spec[:p], int(spec[p + 1:])))
29         else:
30             raise Exception("Unknown target specification %r" % spec)
31         rv = sk
32         sk = None
33     finally:
34         if sk is not None: sk.close()
35     return rv
36
37 class client(object):
38     def __init__(self, sk, proto = None):
39         self.sk = resolve(sk)
40         self.buf = b""
41         line = self.readline()
42         if line != b"+PDM1":
43             raise protoerr("Illegal protocol signature")
44         if proto is not None:
45             self.select(proto)
46
47     def close(self):
48         self.sk.close()
49
50     def readline(self):
51         while True:
52             p = self.buf.find(b"\n")
53             if p >= 0:
54                 ret = self.buf[:p]
55                 self.buf = self.buf[p + 1:]
56                 return ret
57             ret = self.sk.recv(1024)
58             if ret == b"":
59                 return None
60             self.buf += ret
61
62     def select(self, proto):
63         if isinstance(proto, str):
64             proto = proto.encode("ascii")
65         if b"\n" in proto:
66             raise Exception("Illegal protocol specified: %r" % proto)
67         self.sk.send(proto + b"\n")
68         rep = self.readline()
69         if len(rep) < 1 or rep[0] != b"+"[0]:
70             raise protoerr("Error reply when selecting protocol %s: %s" % (proto, rep[1:]))
71
72     def __enter__(self):
73         return self
74
75     def __exit__(self, *excinfo):
76         self.close()
77         return False
78
79 class replclient(client):
80     def __init__(self, sk):
81         super(replclient, self).__init__(sk, "repl")
82
83     def run(self, code):
84         while True:
85             ncode = code.replace("\n\n", "\n")
86             if ncode == code: break
87             code = ncode
88         while len(code) > 0 and code[-1] == "\n":
89             code = code[:-1]
90         self.sk.send((code + "\n\n").encode("utf-8"))
91         buf = b""
92         while True:
93             ln = self.readline()
94             if ln[0] == b" "[0]:
95                 buf += ln[1:] + b"\n"
96             elif ln[0] == b"+"[0]:
97                 return buf.decode("utf-8")
98             elif ln[0] == b"-"[0]:
99                 raise protoerr("Error reply: %s" % ln[1:].decode("utf-8"))
100             else:
101                 raise protoerr("Illegal reply: %s" % ln)
102
103 class perfproxy(object):
104     def __init__(self, cl, id, proto):
105         self.cl = cl
106         self.id = id
107         self.proto = proto
108         self.subscribers = set()
109
110     def lookup(self, name):
111         self.cl.lock.acquire()
112         try:
113             id = self.cl.nextid
114             self.cl.nextid += 1
115         finally:
116             self.cl.lock.release()
117         (proto,) = self.cl.run("lookup", id, self.id, name)
118         proxy = perfproxy(self.cl, id, proto)
119         self.cl.proxies[id] = proxy
120         return proxy
121
122     def listdir(self):
123         return self.cl.run("ls", self.id)[0]
124
125     def readattr(self):
126         return self.cl.run("readattr", self.id)[0]
127
128     def attrinfo(self):
129         return self.cl.run("attrinfo", self.id)[0]
130
131     def invoke(self, method, *args, **kwargs):
132         return self.cl.run("invoke", self.id, method, args, kwargs)[0]
133
134     def subscribe(self, cb):
135         if cb in self.subscribers:
136             raise ValueError("Already subscribed")
137         if len(self.subscribers) == 0:
138             self.cl.run("subs", self.id)
139         self.subscribers.add(cb)
140
141     def unsubscribe(self, cb):
142         if cb not in self.subscribers:
143             raise ValueError("Not subscribed")
144         self.subscribers.remove(cb)
145         if len(self.subscribers) == 0:
146             self.cl.run("unsubs", self.id)
147
148     def notify(self, ev):
149         for cb in self.subscribers:
150             try:
151                 cb(ev)
152             except: pass
153
154     def close(self):
155         self.cl.run("unbind", self.id)
156         del self.cl.proxies[self.id]
157
158     def __enter__(self):
159         return self
160
161     def __exit__(self, *excinfo):
162         self.close()
163         return False
164
165 class perfclient(client):
166     def __init__(self, sk):
167         super(perfclient, self).__init__(sk, "perf")
168         self.nextid = 0
169         self.lock = threading.Lock()
170         self.proxies = {}
171         self.names = {}
172
173     def send(self, ob):
174         buf = pickle.dumps(ob)
175         buf = struct.pack(">l", len(buf)) + buf
176         self.sk.send(buf)
177
178     def recvb(self, num):
179         buf = b""
180         while len(buf) < num:
181             data = self.sk.recv(num - len(buf))
182             if data == b"":
183                 raise EOFError()
184             buf += data
185         return buf
186
187     def recv(self):
188         return pickle.loads(self.recvb(struct.unpack(">l", self.recvb(4))[0]))
189
190     def event(self, id, ev):
191         proxy = self.proxies.get(id)
192         if proxy is None: return
193         proxy.notify(ev)
194
195     def dispatch(self, timeout = None):
196         rfd, wfd, efd = select.select([self.sk], [], [], timeout)
197         if self.sk in rfd:
198             msg = self.recv()
199             if msg[0] == "*":
200                 self.event(msg[1], msg[2])
201             else:
202                 raise ValueError("Unexpected non-event message: %r" % msg[0])
203
204     def recvreply(self):
205         while True:
206             reply = self.recv()
207             if reply[0] in ("+", "-"):
208                 return reply
209             elif reply[0] == "*":
210                 self.event(reply[1], reply[2])
211             else:
212                 raise ValueError("Illegal reply header: %r" % reply[0])
213
214     def run(self, cmd, *args):
215         self.lock.acquire()
216         try:
217             self.send((cmd,) + args)
218             reply = self.recvreply()
219             if reply[0] == "+":
220                 return reply[1:]
221             else:
222                 raise reply[1]
223         finally:
224             self.lock.release()
225
226     def lookup(self, module, obnm):
227         self.lock.acquire()
228         try:
229             id = self.nextid
230             self.nextid += 1
231         finally:
232             self.lock.release()
233         (proto,) = self.run("bind", id, module, obnm)
234         proxy = perfproxy(self, id, proto)
235         self.proxies[id] = proxy
236         return proxy
237
238     def find(self, name):
239         ret = self.names.get(name)
240         if ret is None:
241             if "/" in name:
242                 p = name.rindex("/")
243                 ret = self.find(name[:p]).lookup(name[p + 1:])
244             else:
245                 p = name.rindex(".")
246                 ret = self.lookup(name[:p], name[p + 1:])
247             self.names[name] = ret
248         return ret