python: Removed threadpool handler again.
[ashd.git] / python3 / ashd /
... / ...
1import sys, os, threading, time, logging, select, queue
2from . import perf
4log = logging.getLogger("ashd.serve")
5seq = 1
6seqlk = threading.Lock()
8def reqseq():
9 global seq
10 with seqlk:
11 s = seq
12 seq += 1
13 return s
15class closed(IOError):
16 def __init__(self):
17 super().__init__("The client has closed the connection.")
19class reqthread(threading.Thread):
20 def __init__(self, *, name=None, **kw):
21 if name is None:
22 name = "Request handler %i" % reqseq()
23 super().__init__(name=name, **kw)
25class wsgirequest(object):
26 def __init__(self, handler):
27 self.status = None
28 self.headers = []
29 self.respsent = False
30 self.handler = handler
31 self.buffer = bytearray()
33 def handlewsgi(self):
34 raise Exception()
35 def fileno(self):
36 raise Exception()
37 def writehead(self, status, headers):
38 raise Exception()
39 def flush(self):
40 raise Exception()
41 def close(self):
42 pass
43 def writedata(self, data):
44 self.buffer.extend(data)
46 def flushreq(self):
47 if not self.respsent:
48 if not self.status:
49 raise Exception("Cannot send response body before starting response.")
50 self.respsent = True
51 self.writehead(self.status, self.headers)
53 def write(self, data):
54 if not data:
55 return
56 self.flushreq()
57 self.writedata(data)
58 self.handler.ckflush(self)
60 def startreq(self, status, headers, exc_info=None):
61 if self.status:
62 if exc_info:
63 try:
64 if self.respsent:
65 raise exc_info[1]
66 finally:
67 exc_info = None
68 else:
69 raise Exception("Can only start responding once.")
70 self.status = status
71 self.headers = headers
72 return self.write
74class handler(object):
75 def handle(self, request):
76 raise Exception()
77 def ckflush(self, req):
78 while len(req.buffer) > 0:
79 rls, wls, els =[], [req], [req])
80 req.flush()
81 def close(self):
82 pass
84 @classmethod
85 def parseargs(cls, **args):
86 if len(args) > 0:
87 raise ValueError("unknown handler argument: " + next(iter(args)))
88 return {}
90class single(handler):
91 def handle(self, req):
92 try:
93 env = req.mkenv()
94 with perf.request(env) as reqevent:
95 respiter = req.handlewsgi(env, req.startreq)
96 for data in respiter:
97 req.write(data)
98 if req.status:
99 reqevent.response([req.status, req.headers])
100 req.flushreq()
101 self.ckflush(req)
102 except closed:
103 pass
104 except:
105 log.error("exception occurred when handling request", exc_info=True)
106 finally:
107 req.close()
109class freethread(handler):
110 def __init__(self, *, max=None, timeout=None, **kw):
111 super().__init__(**kw)
112 self.current = set()
113 = threading.Lock()
114 self.tcond = threading.Condition(
115 self.max = max
116 self.timeout = timeout
118 @classmethod
119 def parseargs(cls, *, max=None, abort=None, **args):
120 ret = super().parseargs(**args)
121 if max:
122 ret["max"] = int(max)
123 if abort:
124 ret["timeout"] = int(abort)
125 return ret
127 def handle(self, req):
128 with
129 if self.max is not None:
130 if self.timeout is not None:
131 now = start = time.time()
132 while len(self.current) >= self.max:
133 self.tcond.wait(start + self.timeout - now)
134 now = time.time()
135 if now - start > self.timeout:
136 os.abort()
137 else:
138 while len(self.current) >= self.max:
139 self.tcond.wait()
140 th = reqthread(, args=[req])
141 th.start()
142 while th.is_alive() and th not in self.current:
143 self.tcond.wait()
145 def run(self, req):
146 try:
147 th = threading.current_thread()
148 with
149 self.current.add(th)
150 self.tcond.notify_all()
151 try:
152 env = req.mkenv()
153 with perf.request(env) as reqevent:
154 respiter = req.handlewsgi(env, req.startreq)
155 for data in respiter:
156 req.write(data)
157 if req.status:
158 reqevent.response([req.status, req.headers])
159 req.flushreq()
160 self.ckflush(req)
161 except closed:
162 pass
163 except:
164 log.error("exception occurred when handling request", exc_info=True)
165 finally:
166 with
167 self.current.remove(th)
168 self.tcond.notify_all()
169 finally:
170 req.close()
172 def close(self):
173 while True:
174 with
175 if len(self.current) > 0:
176 th = next(iter(self.current))
177 else:
178 return
179 th.join()
181class resplex(handler):
182 def __init__(self, *, max=None, **kw):
183 super().__init__(**kw)
184 self.current = set()
185 = threading.Lock()
186 self.tcond = threading.Condition(
187 self.max = max
188 self.cqueue = queue.Queue(5)
189 self.cnpipe = os.pipe()
190 self.rthread = reqthread(name="Response thread", target=self.handle2)
191 self.rthread.start()
193 @classmethod
194 def parseargs(cls, *, max=None, **args):
195 ret = super().parseargs(**args)
196 if max:
197 ret["max"] = int(max)
198 return ret
200 def ckflush(self, req):
201 raise Exception("resplex handler does not support the write() function")
203 def handle(self, req):
204 with
205 if self.max is not None:
206 while len(self.current) >= self.max:
207 self.tcond.wait()
208 th = reqthread(target=self.handle1, args=[req])
209 th.start()
210 while th.is_alive() and th not in self.current:
211 self.tcond.wait()
213 def handle1(self, req):
214 try:
215 th = threading.current_thread()
216 with
217 self.current.add(th)
218 self.tcond.notify_all()
219 try:
220 env = req.mkenv()
221 respobj = req.handlewsgi(env, req.startreq)
222 respiter = iter(respobj)
223 if not req.status:
224 log.error("request handler returned without calling start_request")
225 if hasattr(respiter, "close"):
226 respiter.close()
227 return
228 else:
229 self.cqueue.put((req, respiter))
230 os.write(self.cnpipe[1], b" ")
231 req = None
232 finally:
233 with
234 self.current.remove(th)
235 self.tcond.notify_all()
236 except closed:
237 pass
238 except:
239 log.error("exception occurred when handling request", exc_info=True)
240 finally:
241 if req is not None:
242 req.close()
244 def handle2(self):
245 try:
246 rp = self.cnpipe[0]
247 current = {}
249 def closereq(req):
250 respiter = current[req]
251 try:
252 if respiter is not None and hasattr(respiter, "close"):
253 respiter.close()
254 except:
255 log.error("exception occurred when closing iterator", exc_info=True)
256 try:
257 req.close()
258 except:
259 log.error("exception occurred when closing request", exc_info=True)
260 del current[req]
261 def ckiter(req):
262 respiter = current[req]
263 if respiter is not None:
264 rem = False
265 try:
266 data = next(respiter)
267 except StopIteration:
268 rem = True
269 req.flushreq()
270 except:
271 rem = True
272 log.error("exception occurred when iterating response", exc_info=True)
273 if not rem:
274 if data:
275 req.flushreq()
276 req.writedata(data)
277 else:
278 current[req] = None
279 try:
280 if hasattr(respiter, "close"):
281 respiter.close()
282 except:
283 log.error("exception occurred when closing iterator", exc_info=True)
284 respiter = None
285 if respiter is None and not req.buffer:
286 closereq(req)
288 while True:
289 bufl = list(req for req in current.keys() if req.buffer)
290 rls, wls, els =[rp], bufl, [rp] + bufl)
291 if rp in rls:
292 ret =, 1024)
293 if not ret:
294 os.close(rp)
295 return
296 try:
297 while True:
298 req, respiter = self.cqueue.get(False)
299 current[req] = respiter
300 ckiter(req)
301 except queue.Empty:
302 pass
303 for req in wls:
304 try:
305 req.flush()
306 except closed:
307 closereq(req)
308 except:
309 log.error("exception occurred when writing response", exc_info=True)
310 closereq(req)
311 else:
312 if len(req.buffer) < 65536:
313 ckiter(req)
314 except:
315 log.critical("unexpected exception occurred in response handler thread", exc_info=True)
316 os.abort()
318 def close(self):
319 while True:
320 with
321 if len(self.current) > 0:
322 th = next(iter(self.current))
323 else:
324 break
325 th.join()
326 os.close(self.cnpipe[1])
327 self.rthread.join()
329names = {"single": single,
330 "free": freethread,
331 "rplex": resplex}
333def parsehspec(spec):
334 if ":" not in spec:
335 return spec, {}
336 nm, spec = spec.split(":", 1)
337 args = {}
338 while spec:
339 if "," in spec:
340 part, spec = spec.split(",", 1)
341 else:
342 part, spec = spec, None
343 if "=" in part:
344 key, val = part.split("=", 1)
345 else:
346 key, val = part, ""
347 args[key] = val
348 return nm, args