+ req.write(data)
+ if req.status:
+ reqevent.response([req.status, req.headers])
+ req.flushreq()
+ self.ckflush(req)
+ except closed:
+ pass
+ except:
+ log.error("exception occurred when handling request", exc_info=True)
+ finally:
+ req.close()
+
+ def loop(self):
+ th = threading.current_thread()
+ with self.lk:
+ self.current.add(th)
+ try:
+ while True:
+ with self.lk:
+ self.free.add(th)
+ try:
+ self.pcond.notify_all()
+ now = start = time.time()
+ while self.wreq is None:
+ self.rcond.wait(start + self.live - now)
+ now = time.time()
+ if now - start > self.live:
+ if len(self.current) > self.min:
+ self.current.remove(th)
+ return
+ else:
+ start = now
+ req, self.wreq = self.wreq, None
+ self.pcond.notify_all()
+ finally:
+ self.free.remove(th)
+ self._handle(req)
+ req = None
+ finally:
+ with self.lk:
+ try:
+ self.current.remove(th)
+ except KeyError:
+ pass
+ self.pcond.notify_all()
+
+ def handle(self, req):
+ while True:
+ with self.lk:
+ if len(self.free) < 1 and len(self.current) < self.max:
+ self.newthread()
+ while self.wreq is not None:
+ self.pcond.wait()
+ if self.wreq is None:
+ self.wreq = req
+ self.rcond.notify(1)
+ return
+
+ def close(self):
+ self.live = 0
+ self.min = 0
+ with self.lk:
+ while len(self.current) > 0:
+ self.rcond.notify_all()
+ self.pcond.wait(1)
+
+class resplex(handler):
+ def __init__(self, **kw):
+ super().__init__(**kw)
+ self.current = set()
+ self.lk = threading.Lock()
+ self.cqueue = queue.Queue(5)
+ self.cnpipe = os.pipe()
+ self.rthread = reqthread(name="Response thread", target=self.handle2)
+ self.rthread.start()
+
+ def ckflush(self, req):
+ raise Exception("resplex handler does not support the write() function")
+
+ def handle(self, req):
+ reqthread(target=self.handle1, args=[req]).start()
+
+ def handle1(self, req):
+ try:
+ th = threading.current_thread()
+ with self.lk:
+ self.current.add(th)
+ try:
+ env = req.mkenv()
+ respobj = req.handlewsgi(env, req.startreq)
+ respiter = iter(respobj)
+ if not req.status:
+ log.error("request handler returned without calling start_request")
+ if hasattr(respiter, "close"):
+ respiter.close()
+ return
+ else:
+ self.cqueue.put((req, respiter))
+ os.write(self.cnpipe[1], b" ")
+ req = None