Merge branch 'master' into python3
[pdm.git] / pdm / cli.py
1 """Management for daemon processes
2
3 This module provides some client support for the daemon management
4 provided in the pdm.srv module.
5 """
6
7 import socket, pickle, struct, select, threading
8
9 __all__ = ["client", "replclient"]
10
11 class protoerr(Exception):
12     pass
13
14 def resolve(spec):
15     if isinstance(spec, socket.socket):
16         return spec
17     sk = None
18     try:
19         if "/" in spec:
20             sk = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
21             sk.connect(spec)
22         elif spec.isdigit():
23             sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
24             sk.connect(("localhost", int(spec)))
25         elif ":" in spec:
26             sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
27             p = spec.rindex(":")
28             sk.connect((spec[:p], int(spec[p + 1:])))
29         else:
30             raise Exception("Unknown target specification %r" % spec)
31         rv = sk
32         sk = None
33     finally:
34         if sk is not None: sk.close()
35     return rv
36
37 class client(object):
38     def __init__(self, sk, proto = None):
39         self.sk = resolve(sk)
40         self.buf = b""
41         line = self.readline()
42         if line != b"+PDM1":
43             raise protoerr("Illegal protocol signature")
44         if proto is not None:
45             self.select(proto)
46
47     def close(self):
48         self.sk.close()
49
50     def fileno(self):
51         return self.sk.fileno()
52
53     def readline(self):
54         while True:
55             p = self.buf.find(b"\n")
56             if p >= 0:
57                 ret = self.buf[:p]
58                 self.buf = self.buf[p + 1:]
59                 return ret
60             ret = self.sk.recv(1024)
61             if ret == b"":
62                 return None
63             self.buf += ret
64
65     def select(self, proto):
66         if isinstance(proto, str):
67             proto = proto.encode("ascii")
68         if b"\n" in proto:
69             raise Exception("Illegal protocol specified: %r" % proto)
70         self.sk.send(proto + b"\n")
71         rep = self.readline()
72         if len(rep) < 1 or rep[0] != b"+"[0]:
73             raise protoerr("Error reply when selecting protocol %s: %s" % (proto, rep[1:]))
74
75     def __enter__(self):
76         return self
77
78     def __exit__(self, *excinfo):
79         self.close()
80         return False
81
82 class replclient(client):
83     def __init__(self, sk):
84         super().__init__(sk, "repl")
85
86     def run(self, code):
87         while True:
88             ncode = code.replace("\n\n", "\n")
89             if ncode == code: break
90             code = ncode
91         while len(code) > 0 and code[-1] == "\n":
92             code = code[:-1]
93         self.sk.send((code + "\n\n").encode("utf-8"))
94         buf = b""
95         while True:
96             ln = self.readline()
97             if ln[0] == b" "[0]:
98                 buf += ln[1:] + b"\n"
99             elif ln[0] == b"+"[0]:
100                 return buf.decode("utf-8")
101             elif ln[0] == b"-"[0]:
102                 raise protoerr("Error reply: %s" % ln[1:].decode("utf-8"))
103             else:
104                 raise protoerr("Illegal reply: %s" % ln)
105
106 class perfproxy(object):
107     def __init__(self, cl, id, proto):
108         self.cl = cl
109         self.id = id
110         self.proto = proto
111         self.subscribers = set()
112
113     def lookup(self, name):
114         self.cl.lock.acquire()
115         try:
116             id = self.cl.nextid
117             self.cl.nextid += 1
118         finally:
119             self.cl.lock.release()
120         (proto,) = self.cl.run("lookup", id, self.id, name)
121         proxy = perfproxy(self.cl, id, proto)
122         self.cl.proxies[id] = proxy
123         return proxy
124
125     def listdir(self):
126         return self.cl.run("ls", self.id)[0]
127
128     def readattr(self):
129         return self.cl.run("readattr", self.id)[0]
130
131     def attrinfo(self):
132         return self.cl.run("attrinfo", self.id)[0]
133
134     def invoke(self, method, *args, **kwargs):
135         return self.cl.run("invoke", self.id, method, args, kwargs)[0]
136
137     def subscribe(self, cb):
138         if cb in self.subscribers:
139             raise ValueError("Already subscribed")
140         if len(self.subscribers) == 0:
141             self.cl.run("subs", self.id)
142         self.subscribers.add(cb)
143
144     def unsubscribe(self, cb):
145         if cb not in self.subscribers:
146             raise ValueError("Not subscribed")
147         self.subscribers.remove(cb)
148         if len(self.subscribers) == 0:
149             self.cl.run("unsubs", self.id)
150
151     def notify(self, ev):
152         for cb in self.subscribers:
153             try:
154                 cb(ev)
155             except: pass
156
157     def close(self):
158         self.cl.run("unbind", self.id)
159         del self.cl.proxies[self.id]
160
161     def __enter__(self):
162         return self
163
164     def __exit__(self, *excinfo):
165         self.close()
166         return False
167
168 class perfclient(client):
169     def __init__(self, sk):
170         super().__init__(sk, "perf")
171         self.nextid = 0
172         self.lock = threading.Lock()
173         self.proxies = {}
174         self.names = {}
175
176     def send(self, ob):
177         buf = pickle.dumps(ob)
178         buf = struct.pack(">l", len(buf)) + buf
179         self.sk.send(buf)
180
181     def recvb(self, num):
182         buf = b""
183         while len(buf) < num:
184             data = self.sk.recv(num - len(buf))
185             if data == b"":
186                 raise EOFError()
187             buf += data
188         return buf
189
190     def recv(self):
191         return pickle.loads(self.recvb(struct.unpack(">l", self.recvb(4))[0]))
192
193     def event(self, id, ev):
194         proxy = self.proxies.get(id)
195         if proxy is None: return
196         proxy.notify(ev)
197
198     def dispatch(self, timeout = None):
199         rfd, wfd, efd = select.select([self.sk], [], [], timeout)
200         if self.sk in rfd:
201             msg = self.recv()
202             if msg[0] == "*":
203                 self.event(msg[1], msg[2])
204             else:
205                 raise ValueError("Unexpected non-event message: %r" % msg[0])
206
207     def recvreply(self):
208         while True:
209             reply = self.recv()
210             if reply[0] in ("+", "-"):
211                 return reply
212             elif reply[0] == "*":
213                 self.event(reply[1], reply[2])
214             else:
215                 raise ValueError("Illegal reply header: %r" % reply[0])
216
217     def run(self, cmd, *args):
218         self.lock.acquire()
219         try:
220             self.send((cmd,) + args)
221             reply = self.recvreply()
222             if reply[0] == "+":
223                 return reply[1:]
224             else:
225                 raise reply[1]
226         finally:
227             self.lock.release()
228
229     def lookup(self, module, obnm):
230         self.lock.acquire()
231         try:
232             id = self.nextid
233             self.nextid += 1
234         finally:
235             self.lock.release()
236         (proto,) = self.run("bind", id, module, obnm)
237         proxy = perfproxy(self, id, proto)
238         self.proxies[id] = proxy
239         return proxy
240
241     def find(self, name):
242         ret = self.names.get(name)
243         if ret is None:
244             if "/" in name:
245                 p = name.rindex("/")
246                 ret = self.find(name[:p]).lookup(name[p + 1:])
247             else:
248                 p = name.rindex(".")
249                 ret = self.lookup(name[:p], name[p + 1:])
250             self.names[name] = ret
251         return ret