Return fileno from clients, for use in select.
[pdm.git] / pdm / cli.py
1 """Management for daemon processes
2
3 This module provides some client support for the daemon management
4 provided in the pdm.srv module.
5 """
6
7 import socket, pickle, struct, select, threading
8
9 __all__ = ["client", "replclient"]
10
class protoerr(Exception):
    """Raised on protocol-level errors.

    The clients in this module raise it for a bad protocol signature
    and for error or otherwise unexpected replies from the server.
    """
13
def resolve(spec):
    """Turn a target specification into a connected stream socket.

    `spec` may be:

     * a socket object, which is returned unchanged;
     * a string containing "/", taken as the path of a Unix socket;
     * a string of digits, taken as a TCP port on localhost;
     * a string containing ":", split at the last colon into host
       and TCP port.

    Any other string raises an Exception.  If connecting fails, the
    newly created socket is closed before the error propagates.
    """
    if isinstance(spec, socket.socket):
        return spec

    # Work out where to connect before creating the socket.
    if "/" in spec:
        family, address = socket.AF_UNIX, spec
    elif spec.isdigit():
        family, address = socket.AF_INET, ("localhost", int(spec))
    elif ":" in spec:
        host, _, port = spec.rpartition(":")
        family, address = socket.AF_INET, (host, int(port))
    else:
        raise Exception("Unknown target specification %r" % spec)

    conn = socket.socket(family, socket.SOCK_STREAM)
    try:
        conn.connect(address)
    except BaseException:
        # Never hand out, or leak, a socket that failed to connect.
        conn.close()
        raise
    return conn
36
class client(object):
    """Base class for connections to a PDM server.

    Constructing a client resolves the connection, checks that the
    server greets with the "+PDM1" signature line and, if `proto` is
    given, selects that protocol with select().

    Instances expose fileno(), so they can be passed to
    select.select(), and work as context managers that close the
    socket when the `with` block is left.
    """

    def __init__(self, sk, proto = None):
        """Connect and perform the protocol handshake.

        `sk` is handed to resolve(), so it is either a socket object
        or a target specification string.  `proto`, if not None, names
        the protocol to select.

        Raises protoerr if the signature line is wrong or the
        protocol is refused.  The socket is closed if the handshake
        fails for any reason.
        """
        self.sk = resolve(sk)
        self.buf = ""
        try:
            line = self.readline()
            if line != "+PDM1":
                raise protoerr("Illegal protocol signature")
            if proto is not None:
                self.select(proto)
        except BaseException:
            # The caller never gets an object to call close() on, so
            # the connection must be released here.
            self.sk.close()
            raise

    def close(self):
        """Close the underlying socket."""
        self.sk.close()

    def fileno(self):
        """Return the socket's file descriptor, for use in select."""
        return self.sk.fileno()

    def readline(self):
        """Read one line from the server, without the newline.

        Data received past the newline stays in self.buf for later
        calls.  Returns None if the peer closes the connection before
        a complete line has arrived; any partial line remains in
        self.buf.
        """
        while True:
            line, sep, rest = self.buf.partition("\n")
            if sep:
                self.buf = rest
                return line
            data = self.sk.recv(1024)
            if not data:
                return None
            self.buf += data

    def select(self, proto):
        """Ask the server to switch to protocol `proto`.

        Raises Exception if `proto` contains a newline, and protoerr
        if the connection is closed or the reply does not start with
        "+".
        """
        if "\n" in proto:
            raise Exception("Illegal protocol specified: %r" % proto)
        # sendall, since send may transmit only part of the data.
        self.sk.sendall(proto + "\n")
        rep = self.readline()
        if rep is None:
            raise protoerr("Connection closed when selecting protocol %s" % proto)
        if rep[:1] != "+":
            raise protoerr("Error reply when selecting protocol %s: %s" % (proto, rep[1:]))

    def __enter__(self):
        return self

    def __exit__(self, *excinfo):
        self.close()
        return False
79
class replclient(client):
    """Client that selects the "repl" protocol on connect.

    run() sends a piece of code to the server and returns the text
    the server replies with.
    """

    def __init__(self, sk):
        """Connect to `sk` (see client) and select the "repl" protocol."""
        super(replclient, self).__init__(sk, "repl")

    def run(self, code):
        """Send `code` to the server and return the reply text.

        Reply lines starting with a space are collected, without that
        space and each followed by a newline, until a line starting
        with "+" arrives; the collected text is then returned.

        Raises protoerr on a "-" (error) reply, on any other reply
        line including an empty one, and if the connection is closed
        before the reply is complete.
        """
        # The request is sent terminated by "\n\n", so the code itself
        # must not contain a blank line or end in a newline
        # (presumably a blank line ends the request on the server
        # side).
        while "\n\n" in code:
            code = code.replace("\n\n", "\n")
        code = code.rstrip("\n")
        # sendall, since send may transmit only part of the data.
        self.sk.sendall(code + "\n\n")
        lines = []
        while True:
            ln = self.readline()
            if ln is None:
                raise protoerr("Connection closed while waiting for reply")
            tag = ln[:1]    # "" for an empty line, which is illegal
            if tag == " ":
                lines.append(ln[1:] + "\n")
            elif tag == "+":
                return "".join(lines)
            elif tag == "-":
                raise protoerr("Error reply: %s" % ln[1:])
            else:
                raise protoerr("Illegal reply: %s" % ln)
103
class perfproxy(object):
    """Client-side handle for an object bound over a perfclient.

    Each proxy is identified towards the server by `id`; every
    operation is forwarded through `cl.run()` with that id.  Proxies
    also keep the set of callbacks subscribed to events for their
    object, and work as context managers that call close() on exit.
    """

    def __init__(self, cl, id, proto):
        """Create a proxy for object `id` on the client `cl`.

        `proto` is stored as-is; it is the value the server returned
        when the object was bound or looked up.
        """
        self.cl = cl
        self.id = id
        self.proto = proto
        self.subscribers = set()

    def lookup(self, name):
        """Look up `name` under this object and return a new proxy for it.

        A fresh id is allocated from the client and the new proxy is
        registered in the client's proxy table.
        """
        # Id allocation is shared with the client, hence its lock.
        with self.cl.lock:
            newid = self.cl.nextid
            self.cl.nextid += 1
        (proto,) = self.cl.run("lookup", newid, self.id, name)
        proxy = perfproxy(self.cl, newid, proto)
        self.cl.proxies[newid] = proxy
        return proxy

    def listdir(self):
        """Return the first element of the server's reply to "ls"."""
        return self.cl.run("ls", self.id)[0]

    def readattr(self):
        """Return the first element of the server's reply to "readattr"."""
        return self.cl.run("readattr", self.id)[0]

    def attrinfo(self):
        """Return the first element of the server's reply to "attrinfo"."""
        return self.cl.run("attrinfo", self.id)[0]

    def invoke(self, method, *args, **kwargs):
        """Invoke `method` on the remote object and return its result.

        Positional and keyword arguments are forwarded as a tuple and
        a dict, respectively.
        """
        return self.cl.run("invoke", self.id, method, args, kwargs)[0]

    def subscribe(self, cb):
        """Register `cb` to be called with each event for this object.

        The server is only told to start sending events ("subs") when
        the first callback is added.  Raises ValueError if `cb` is
        already subscribed.
        """
        if cb in self.subscribers:
            raise ValueError("Already subscribed")
        if not self.subscribers:
            self.cl.run("subs", self.id)
        self.subscribers.add(cb)

    def unsubscribe(self, cb):
        """Remove the callback `cb`.

        The server is told to stop sending events ("unsubs") when the
        last callback is removed.  Raises ValueError if `cb` is not
        subscribed.
        """
        if cb not in self.subscribers:
            raise ValueError("Not subscribed")
        self.subscribers.remove(cb)
        if not self.subscribers:
            self.cl.run("unsubs", self.id)

    def notify(self, ev):
        """Call every subscribed callback with the event `ev`.

        Delivery is best-effort: exceptions raised by a callback are
        ignored so that the remaining callbacks still run.
        """
        # Iterate over a snapshot; a callback that subscribes or
        # unsubscribes would otherwise change the set during
        # iteration, which raises RuntimeError.
        for cb in list(self.subscribers):
            try:
                cb(ev)
            except Exception:
                # Deliberately ignored, but KeyboardInterrupt and
                # SystemExit are allowed to propagate.
                pass

    def close(self):
        """Unbind the object on the server and forget this proxy."""
        self.cl.run("unbind", self.id)
        del self.cl.proxies[self.id]

    def __enter__(self):
        return self

    def __exit__(self, *excinfo):
        self.close()
        return False
165
class perfclient(client):
    """Client that selects the "perf" protocol on connect.

    Messages in both directions are pickled objects, each preceded by
    its length as a 4-byte big-endian signed integer.  Requests are
    tuples of a command name and its arguments; incoming messages are
    sequences whose first element is "+" (success), "-" (failure,
    followed by an exception to raise) or "*" (event for a proxy).

    Remote objects are represented by perfproxy instances, kept in
    self.proxies by id; self.names caches the results of find().
    """

    def __init__(self, sk):
        """Connect to `sk` (see client) and select the "perf" protocol."""
        super(perfclient, self).__init__(sk, "perf")
        self.nextid = 0                 # Next unused proxy id
        self.lock = threading.Lock()    # Guards nextid and request/reply exchanges
        self.proxies = {}               # Proxy id -> perfproxy
        self.names = {}                 # Name passed to find() -> perfproxy

    def send(self, ob):
        """Pickle `ob` and send it, prefixed with its length."""
        payload = pickle.dumps(ob)
        # sendall, since send may transmit only part of the frame,
        # which would leave the stream out of sync.
        self.sk.sendall(struct.pack(">l", len(payload)) + payload)

    def recvb(self, num):
        """Receive exactly `num` bytes from the socket.

        Raises EOFError if the connection is closed first.
        """
        chunks = []
        remaining = num
        while remaining > 0:
            data = self.sk.recv(remaining)
            if not data:
                raise EOFError()
            chunks.append(data)
            remaining -= len(data)
        return "".join(chunks)

    def recv(self):
        """Receive and unpickle one length-prefixed message."""
        (length,) = struct.unpack(">l", self.recvb(4))
        # SECURITY: pickle.loads can execute arbitrary code while
        # loading, so this client must only be connected to trusted
        # servers.
        return pickle.loads(self.recvb(length))

    def event(self, id, ev):
        """Deliver event `ev` to the proxy with the given id, if any."""
        proxy = self.proxies.get(id)
        if proxy is None:
            return
        proxy.notify(ev)

    def dispatch(self, timeout = None):
        """Wait for and handle at most one event message.

        `timeout` is passed to select.select(); if the socket does not
        become readable in time, nothing happens.  Raises ValueError
        if the message received is not an event.

        NOTE(review): this method does not take self.lock -- confirm
        that callers do not use it concurrently with run().
        """
        rfd, wfd, efd = select.select([self.sk], [], [], timeout)
        if self.sk not in rfd:
            return
        msg = self.recv()
        if msg[0] != "*":
            raise ValueError("Unexpected non-event message: %r" % msg[0])
        self.event(msg[1], msg[2])

    def recvreply(self):
        """Receive messages until a reply ("+" or "-") arrives.

        Events received in the meantime are delivered to their
        proxies.  Raises ValueError on any other message type.
        """
        while True:
            reply = self.recv()
            kind = reply[0]
            if kind in ("+", "-"):
                return reply
            if kind != "*":
                raise ValueError("Illegal reply header: %r" % kind)
            self.event(reply[1], reply[2])

    def run(self, cmd, *args):
        """Send the command `cmd` with `args` and wait for the reply.

        Returns the reply without its leading "+" on success; on a
        "-" reply, raises the exception object sent by the server.
        The whole exchange is done with self.lock held.
        """
        with self.lock:
            self.send((cmd,) + args)
            reply = self.recvreply()
            if reply[0] == "+":
                return reply[1:]
            raise reply[1]

    def lookup(self, module, obnm):
        """Bind object `obnm` of `module` and return a proxy for it.

        A fresh id is allocated and the new proxy is registered in
        self.proxies.
        """
        with self.lock:
            newid = self.nextid
            self.nextid += 1
        (proto,) = self.run("bind", newid, module, obnm)
        proxy = perfproxy(self, newid, proto)
        self.proxies[newid] = proxy
        return proxy

    def find(self, name):
        """Return the proxy for `name`, creating it on first use.

        A name containing "/" is resolved by finding the part before
        the last "/" and calling lookup() on that proxy with the rest.
        Any other name is split at its last "." into a module and an
        object name for lookup(); a name without "." raises
        ValueError.  Results are cached in self.names.
        """
        ret = self.names.get(name)
        if ret is None:
            if "/" in name:
                p = name.rindex("/")
                ret = self.find(name[:p]).lookup(name[p + 1:])
            else:
                p = name.rindex(".")
                ret = self.lookup(name[:p], name[p + 1:])
            self.names[name] = ret
        return ret