Merge branch 'master' of git.dolda2000.com:/srv/git/r/pdm
[pdm.git] / pdm / cli.py
1 """Management for daemon processes
2
3 This module provides some client support for the daemon management
4 provided in the pdm.srv module.
5 """
6
7 import socket, pickle, struct, select, threading
8
9 __all__ = ["client", "replclient"]
10
class protoerr(Exception):
    """Error signalling an unacceptable reply at the protocol level.

    The clients in this module raise it for a bad protocol signature,
    for error replies and for replies that cannot be interpreted.
    """
13
def resolve(spec):
    """Turn a target specification into a connected stream socket.

    *spec* is either an existing socket object, which is handed back
    untouched, or a string in one of the following forms:

      * containing "/": the path of a Unix socket
      * digits only: a TCP port on localhost
      * containing ":": "host:port", split at the last colon

    Any other string raises Exception. If creating the address or
    connecting fails, the newly created socket is closed before the
    error propagates.
    """
    if isinstance(spec, socket.socket):
        return spec

    # Decide on the address family first; the address itself is built
    # only once the socket exists.
    if "/" in spec:
        family = socket.AF_UNIX
    elif spec.isdigit() or ":" in spec:
        family = socket.AF_INET
    else:
        raise Exception("Unknown target specification %r" % spec)

    conn = socket.socket(family, socket.SOCK_STREAM)
    connected = False
    try:
        if family == socket.AF_UNIX:
            conn.connect(spec)
        elif spec.isdigit():
            conn.connect(("localhost", int(spec)))
        else:
            host, _, port = spec.rpartition(":")
            conn.connect((host, int(port)))
        connected = True
    finally:
        # Never leak a half-set-up socket to the caller.
        if not connected:
            conn.close()
    return conn
36
class client(object):
    """Base client for a line-oriented PDM server connection.

    On construction the target is resolved to a socket, the greeting
    line is read and checked to be "+PDM1", and, if *proto* is given,
    that protocol is selected. Instances can be used as context
    managers; leaving the context closes the socket.
    """

    def __init__(self, sk, proto=None):
        """Connect to *sk* and optionally select *proto*.

        *sk* is anything accepted by resolve(). Raises protoerr if the
        greeting is wrong or the protocol selection is refused.
        """
        self.sk = resolve(sk)
        self.buf = ""
        try:
            if self.readline() != "+PDM1":
                raise protoerr("Illegal protocol signature")
            if proto is not None:
                self.select(proto)
        except BaseException:
            # Do not leak a socket that was opened here. A socket
            # object passed in by the caller stays the caller's
            # responsibility.
            if not isinstance(sk, socket.socket):
                self.sk.close()
            raise

    def close(self):
        """Close the underlying socket."""
        self.sk.close()

    def fileno(self):
        """Return the socket's file descriptor, e.g. for select()."""
        return self.sk.fileno()

    def readline(self):
        """Return the next line without its newline, or None at EOF.

        Data received past the newline stays buffered for later calls.
        """
        while True:
            p = self.buf.find("\n")
            if p >= 0:
                ret = self.buf[:p]
                self.buf = self.buf[p + 1:]
                return ret
            ret = self.sk.recv(1024)
            if not ret:
                return None
            self.buf += ret

    def select(self, proto):
        """Ask the server to switch to protocol *proto*.

        Raises Exception if *proto* contains a newline, and protoerr if
        the server closes the connection or does not answer with a "+"
        reply.
        """
        if "\n" in proto:
            raise Exception("Illegal protocol specified: %r" % proto)
        # sendall, since send() may transmit only part of the request.
        self.sk.sendall(proto + "\n")
        rep = self.readline()
        if rep is None:
            raise protoerr("Connection closed when selecting protocol %s" % proto)
        if not rep.startswith("+"):
            raise protoerr("Error reply when selecting protocol %s: %s" % (proto, rep[1:]))

    def __enter__(self):
        return self

    def __exit__(self, *excinfo):
        self.close()
        # Never suppress an exception raised inside the with block.
        return False
79
class replclient(client):
    """Client for the "repl" protocol: send code, get its output back."""

    def __init__(self, sk):
        """Connect to *sk* and select the "repl" protocol."""
        super(replclient, self).__init__(sk, "repl")

    def run(self, code):
        """Send *code* to the server and return the collected output.

        Runs of blank lines are collapsed and trailing newlines are
        stripped before sending, because an empty line terminates the
        request. Reply lines starting with a space are output lines, a
        "+" line ends a successful reply and a "-" line carries an
        error message.

        Raises protoerr on an error reply, on a reply line that cannot
        be interpreted, or if the connection is closed mid-reply.
        """
        while "\n\n" in code:
            code = code.replace("\n\n", "\n")
        code = code.rstrip("\n")
        # sendall, since send() may transmit only part of the request.
        self.sk.sendall(code + "\n\n")
        output = []
        while True:
            ln = self.readline()
            if ln is None:
                raise protoerr("Connection closed while waiting for reply")
            # Slice rather than index so an empty line is reported as an
            # illegal reply instead of raising IndexError.
            tag = ln[:1]
            if tag == " ":
                output.append(ln[1:] + "\n")
            elif tag == "+":
                return "".join(output)
            elif tag == "-":
                raise protoerr("Error reply: %s" % ln[1:])
            else:
                raise protoerr("Illegal reply: %s" % ln)
103
class perfproxy(object):
    """Client-side handle for an object bound on the server.

    *cl* is the owning client, through which all commands are run,
    *id* is the number the object is bound to on the connection and
    *proto* is the value the server returned when it was bound.
    Proxies can be used as context managers; leaving the context
    unbinds the object.
    """

    def __init__(self, cl, id, proto):
        self.cl = cl
        self.id = id
        self.proto = proto
        self.subscribers = set()

    def lookup(self, name):
        """Look up *name* below this object and return a proxy for it."""
        with self.cl.lock:
            newid = self.cl.nextid
            self.cl.nextid += 1
        (proto,) = self.cl.run("lookup", newid, self.id, name)
        proxy = perfproxy(self.cl, newid, proto)
        self.cl.proxies[newid] = proxy
        return proxy

    def listdir(self):
        """Return the first element of the reply to the "ls" command."""
        return self.cl.run("ls", self.id)[0]

    def readattr(self):
        """Return the first element of the reply to "readattr"."""
        return self.cl.run("readattr", self.id)[0]

    def attrinfo(self):
        """Return the first element of the reply to "attrinfo"."""
        return self.cl.run("attrinfo", self.id)[0]

    def invoke(self, method, *args, **kwargs):
        """Invoke *method* remotely and return the first reply element."""
        return self.cl.run("invoke", self.id, method, args, kwargs)[0]

    def subscribe(self, cb):
        """Register *cb* to be called with each event for this object.

        The first subscriber causes a "subs" command to be sent.
        Raises ValueError if *cb* is already subscribed.
        """
        if cb in self.subscribers:
            raise ValueError("Already subscribed")
        if not self.subscribers:
            self.cl.run("subs", self.id)
        self.subscribers.add(cb)

    def unsubscribe(self, cb):
        """Remove *cb*; the last removal sends an "unsubs" command.

        Raises ValueError if *cb* is not subscribed.
        """
        if cb not in self.subscribers:
            raise ValueError("Not subscribed")
        self.subscribers.remove(cb)
        if not self.subscribers:
            self.cl.run("unsubs", self.id)

    def notify(self, ev):
        """Deliver *ev* to every subscriber, ignoring their errors."""
        # Iterate over a snapshot: a callback may subscribe or
        # unsubscribe, and mutating a set during iteration raises
        # RuntimeError.
        for cb in list(self.subscribers):
            try:
                cb(ev)
            except Exception:
                # Delivery is best-effort; one failing subscriber must
                # not keep the others from being notified.
                pass

    def close(self):
        """Unbind the object on the server; further calls do nothing."""
        if self.id is None:
            return
        self.cl.run("unbind", self.id)
        # pop() tolerates the proxy already being gone from the registry.
        self.cl.proxies.pop(self.id, None)
        self.id = None

    def __del__(self):
        # A finalizer must not raise, e.g. when the connection is
        # already gone at collection time.
        try:
            self.close()
        except Exception:
            pass

    def __enter__(self):
        return self

    def __exit__(self, *excinfo):
        self.close()
        # Never suppress an exception raised inside the with block.
        return False
170
class perfclient(client):
    """Client for the "perf" protocol.

    Messages in both directions are pickled objects, each preceded by
    its length as a 4-byte big-endian signed integer. Messages from the
    server are sequences whose first element is "+" (success reply),
    "-" (error reply, carrying an exception to raise) or "*"
    (asynchronous event for a bound object).
    """

    def __init__(self, sk):
        """Connect to *sk* and select the "perf" protocol."""
        super(perfclient, self).__init__(sk, "perf")
        self.nextid = 0                 # next id to bind an object to
        self.lock = threading.Lock()    # guards nextid and request/reply
        self.proxies = {}               # id -> perfproxy, for events
        self.names = {}                 # name -> perfproxy, find() cache

    def send(self, ob):
        """Pickle *ob* and send it as one length-prefixed message."""
        buf = pickle.dumps(ob)
        buf = struct.pack(">l", len(buf)) + buf
        # sendall, since send() may transmit only part of the frame,
        # which would desynchronise the stream.
        self.sk.sendall(buf)

    def recvb(self, num):
        """Receive exactly *num* bytes; raise EOFError on early EOF."""
        buf = ""
        while len(buf) < num:
            data = self.sk.recv(num - len(buf))
            if not data:
                raise EOFError()
            buf += data
        return buf

    def recv(self):
        """Receive and unpickle one length-prefixed message."""
        (length,) = struct.unpack(">l", self.recvb(4))
        # NOTE: unpickling can execute arbitrary code chosen by the
        # peer; only connect to servers that are trusted.
        return pickle.loads(self.recvb(length))

    def event(self, id, ev):
        """Pass event *ev* to the proxy bound to *id*, if there is one."""
        proxy = self.proxies.get(id)
        if proxy is None: return
        proxy.notify(ev)

    def dispatch(self, timeout = None):
        """Wait up to *timeout* for one event message and deliver it.

        Raises ValueError if the message received is not an event.
        """
        rfd, wfd, efd = select.select([self.sk], [], [], timeout)
        if self.sk in rfd:
            msg = self.recv()
            if msg[0] == "*":
                self.event(msg[1], msg[2])
            else:
                raise ValueError("Unexpected non-event message: %r" % msg[0])

    def recvreply(self):
        """Return the next "+" or "-" reply, delivering events meanwhile.

        Raises ValueError on a message with any other header.
        """
        while True:
            reply = self.recv()
            if reply[0] in ("+", "-"):
                return reply
            elif reply[0] == "*":
                self.event(reply[1], reply[2])
            else:
                raise ValueError("Illegal reply header: %r" % reply[0])

    def run(self, cmd, *args):
        """Send command *cmd* with *args* and return the reply payload.

        On a "+" reply everything after the header is returned; on a
        "-" reply the exception object it carries is raised. The lock
        is held for the whole request/reply exchange.
        """
        with self.lock:
            self.send((cmd,) + args)
            reply = self.recvreply()
            if reply[0] == "+":
                return reply[1:]
            raise reply[1]

    def lookup(self, module, obnm):
        """Bind object *obnm* of *module* and return a proxy for it."""
        with self.lock:
            newid = self.nextid
            self.nextid += 1
        (proto,) = self.run("bind", newid, module, obnm)
        proxy = perfproxy(self, newid, proto)
        self.proxies[newid] = proxy
        return proxy

    def find(self, name):
        """Return the proxy for *name*, creating and caching it if needed.

        A name containing "/" is split at the last slash: the part
        before it is found recursively and the part after it is looked
        up on the resulting proxy. Any other name is split at its last
        "." into a module and an object name and bound directly.
        """
        ret = self.names.get(name)
        if ret is None:
            if "/" in name:
                p = name.rindex("/")
                ret = self.find(name[:p]).lookup(name[p + 1:])
            else:
                p = name.rindex(".")
                ret = self.lookup(name[:p], name[p + 1:])
            self.names[name] = ret
        return ret