X-Git-Url: http://dolda2000.com/gitweb/?a=blobdiff_plain;f=python3%2Fashd%2Fserve.py;h=db68a5f6c4ab2cf509a874cacde0ec7343ef9078;hb=76ff6c4dcf0bed028475ef646f4d637d8e91d1a7;hp=cddf81582d10498ed81bebb387a4428604467281;hpb=a962c94401805275ce98a6512c455103e376703a;p=ashd.git diff --git a/python3/ashd/serve.py b/python3/ashd/serve.py index cddf815..db68a5f 100644 --- a/python3/ashd/serve.py +++ b/python3/ashd/serve.py @@ -23,7 +23,7 @@ class reqthread(threading.Thread): super().__init__(name=name, **kw) class wsgirequest(object): - def __init__(self, handler): + def __init__(self, *, handler): self.status = None self.headers = [] self.respsent = False @@ -88,6 +88,8 @@ class handler(object): return {} class single(handler): + cname = "single" + def handle(self, req): try: env = req.mkenv() @@ -106,7 +108,19 @@ class single(handler): finally: req.close() +def dbg(*a): + f = True + for o in a: + if not f: + sys.stderr.write(" ") + sys.stderr.write(str(a)) + f = False + sys.stderr.write("\n") + sys.stderr.flush() + class freethread(handler): + cname = "free" + def __init__(self, *, max=None, timeout=None, **kw): super().__init__(**kw) self.current = set() @@ -138,8 +152,9 @@ class freethread(handler): while len(self.current) >= self.max: self.tcond.wait() th = reqthread(target=self.run, args=[req]) + th.registered = False th.start() - while th.is_alive() and th not in self.current: + while not th.registered: self.tcond.wait() def run(self, req): @@ -147,6 +162,7 @@ class freethread(handler): th = threading.current_thread() with self.lk: self.current.add(th) + th.registered = True self.tcond.notify_all() try: env = req.mkenv() @@ -178,132 +194,48 @@ class freethread(handler): return th.join() -class threadpool(handler): - def __init__(self, *, min=0, max=20, live=300, **kw): +class resplex(handler): + cname = "rplex" + + def __init__(self, *, max=None, **kw): super().__init__(**kw) self.current = set() - self.free = set() - self.lk = threading.RLock() - self.pcond = threading.Condition(self.lk) - self.rcond = threading.Condition(self.lk) - self.wreq = None - self.min = min + self.lk = threading.Lock() + self.tcond = threading.Condition(self.lk) self.max = max - self.live = live - for i in range(self.min): - self.newthread() + self.cqueue = queue.Queue(5) + self.cnpipe = os.pipe() + self.rthread = reqthread(name="Response thread", target=self.handle2) + self.rthread.start() @classmethod - def parseargs(cls, *, min=None, max=None, live=None, **args): + def parseargs(cls, *, max=None, **args): ret = super().parseargs(**args) - if min: - ret["min"] = int(min) if max: ret["max"] = int(max) - if live: - ret["live"] = int(live) return ret - def newthread(self): - with self.lk: - th = reqthread(target=self.loop) - th.start() - while not th in self.current: - self.pcond.wait() - - def _handle(self, req): - try: - env = req.mkenv() - with perf.request(env) as reqevent: - respiter = req.handlewsgi(env, req.startreq) - for data in respiter: - req.write(data) - if req.status: - reqevent.response([req.status, req.headers]) - req.flushreq() - self.ckflush(req) - except closed: - pass - except: - log.error("exception occurred when handling request", exc_info=True) - finally: - req.close() - - def loop(self): - th = threading.current_thread() - with self.lk: - self.current.add(th) - try: - while True: - with self.lk: - self.free.add(th) - try: - self.pcond.notify_all() - now = start = time.time() - while self.wreq is None: - self.rcond.wait(start + self.live - now) - now = time.time() - if now - start > self.live: - if len(self.current) > self.min: - self.current.remove(th) - return - else: - start = now - req, self.wreq = self.wreq, None - self.pcond.notify_all() - finally: - self.free.remove(th) - self._handle(req) - req = None - finally: - with self.lk: - try: - self.current.remove(th) - except KeyError: - pass - self.pcond.notify_all() - - def handle(self, req): - while True: - with self.lk: - if len(self.free) < 1 and len(self.current) < self.max: - self.newthread() - while self.wreq is not None: - self.pcond.wait() - if self.wreq is None: - self.wreq = req - self.rcond.notify(1) - return - - def close(self): - self.live = 0 - self.min = 0 - with self.lk: - while len(self.current) > 0: - self.rcond.notify_all() - self.pcond.wait(1) - -class resplex(handler): - def __init__(self, **kw): - super().__init__(**kw) - self.current = set() - self.lk = threading.Lock() - self.cqueue = queue.Queue(5) - self.cnpipe = os.pipe() - self.rthread = reqthread(name="Response thread", target=self.handle2) - self.rthread.start() - def ckflush(self, req): raise Exception("resplex handler does not support the write() function") def handle(self, req): - reqthread(target=self.handle1, args=[req]).start() + with self.lk: + if self.max is not None: + while len(self.current) >= self.max: + self.tcond.wait() + th = reqthread(target=self.handle1, args=[req]) + th.registered = False + th.start() + while not th.registered: + self.tcond.wait() def handle1(self, req): try: th = threading.current_thread() with self.lk: self.current.add(th) + th.registered = True + self.tcond.notify_all() try: env = req.mkenv() respobj = req.handlewsgi(env, req.startreq) @@ -318,7 +250,9 @@ class resplex(handler): os.write(self.cnpipe[1], b" ") req = None finally: - self.current.remove(th) + with self.lk: + self.current.remove(th) + self.tcond.notify_all() except closed: pass except: @@ -352,15 +286,22 @@ class resplex(handler): data = next(respiter) except StopIteration: rem = True - req.flushreq() + try: + req.flushreq() + except: + log.error("exception occurred when handling response data", exc_info=True) except: rem = True log.error("exception occurred when iterating response", exc_info=True) if not rem: if data: - req.flushreq() - req.writedata(data) - else: + try: + req.flushreq() + req.writedata(data) + except: + log.error("exception occurred when handling response data", exc_info=True) + rem = True + if rem: current[req] = None try: if hasattr(respiter, "close"): @@ -399,7 +340,7 @@ class resplex(handler): ckiter(req) except: log.critical("unexpected exception occurred in response handler thread", exc_info=True) - sys.exit(1) + os.abort() def close(self): while True: @@ -412,10 +353,10 @@ class resplex(handler): os.close(self.cnpipe[1]) self.rthread.join() -names = {"single": single, - "free": freethread, - "pool": threadpool, - "rplex": resplex} +names = {cls.cname: cls for cls in globals().values() if + isinstance(cls, type) and + issubclass(cls, handler) and + hasattr(cls, "cname")} def parsehspec(spec): if ":" not in spec: