python: Start rewriting WSGI handlers with modular handling models.
authorFredrik Tolf <fredrik@dolda2000.com>
Sun, 5 Jan 2014 07:20:25 +0000 (08:20 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Sun, 5 Jan 2014 07:20:25 +0000 (08:20 +0100)
python3/ashd-wsgi3
python3/ashd/serve.py

index ba7038d..79dda81 100755 (executable)
@@ -1,6 +1,6 @@
 #!/usr/bin/python3
 
-import sys, os, getopt, threading, logging, time, locale, collections
+import sys, os, getopt, threading, socket, logging, time, locale, collections
 import ashd.proto, ashd.util, ashd.perf, ashd.serve
 try:
     import pdm.srv
@@ -156,47 +156,41 @@ def recode(thing):
     else:
         return str(thing).encode("latin-1")
 
-class reqthread(ashd.serve.wsgithread):
-    def __init__(self, req):
-        super().__init__()
-        self.req = req.dup()
+reqhandler = ashd.serve.freethread()
 
-    def handlewsgi(self):
-        return handler(self.env, self.startreq)
+class request(ashd.serve.wsgirequest):
+    def __init__(self, *, bkreq, **kw):
+        super().__init__(**kw)
+        self.bkreq = bkreq.dup()
+
+    def mkenv(self):
+        return mkenv(self.bkreq)
+
+    def handlewsgi(self, env, startreq):
+        return handler(env, startreq)
+
+    def fileno(self):
+        return self.bkreq.bsk.fileno()
 
     def writehead(self, status, headers):
-        buf = bytearray()
-        buf += b"HTTP/1.1 " + recode(status) + b"\n"
+        w = self.buffer.extend
+        w(b"HTTP/1.1 " + recode(status) + b"\n")
         for nm, val in headers:
-            buf += recode(nm) + b": " + recode(val) + b"\n"
-        buf += b"\n"
-        try:
-            self.req.sk.write(buf)
-        except IOError:
-            raise ashd.serve.closed()
+            w(recode(nm) + b": " + recode(val) + b"\n")
+        w(b"\n")
 
-    def writedata(self, data):
+    def flush(self):
         try:
-            self.req.sk.write(data)
-            self.req.sk.flush()
+            ret = self.bkreq.bsk.send(self.buffer, socket.MSG_DONTWAIT)
+            self.buffer[:ret] = b""
         except IOError:
             raise ashd.serve.closed()
 
-    def handle(self):
-        self.env = mkenv(self.req)
-        with ashd.perf.request(self.env) as reqevent:
-            super().handle()
-            if self.status:
-                reqevent.response([self.status, self.headers])
-    
-    def run(self):
-        try:
-            guard(super().run)
-        finally:
-            self.req.close()
-            sys.stderr.flush()
-    
+    def close(self):
+        self.bkreq.close()
+
 def handle(req):
-    reqthread(req).start()
+    reqhandler.handle(request(bkreq=req, handler=reqhandler))
 
 ashd.util.serveloop(handle)
+reqhandler.close()
index fe839a2..30c835e 100644 (file)
@@ -1,4 +1,5 @@
-import os, threading, time, logging
+import sys, os, threading, time, logging, select
+from . import perf
 
 log = logging.getLogger("ashd.serve")
 seq = 1
@@ -11,44 +12,36 @@ def reqseq():
         seq += 1
         return s
 
-class reqthread(threading.Thread):
-    def __init__(self, name=None):
-        if name is None:
-            name = "Request handler %i" % reqseq()
-        super().__init__(name=name)
-
-    def handle(self):
-        raise Exception()
-
-    def run(self):
-        try:
-            self.handle()
-        except:
-            log.error("exception occurred when handling request", exc_info=True)
-
 class closed(IOError):
     def __init__(self):
         super().__init__("The client has closed the connection.")
 
-class wsgithread(reqthread):
-    def __init__(self, **kwargs):
-        super().__init__(**kwargs)
+class reqthread(threading.Thread):
+    def __init__(self, *, name=None, **kw):
+        if name is None:
+            name = "Request handler %i" % reqseq()
+        super().__init__(name=name, **kw)
+
+class wsgirequest(object):
+    def __init__(self, handler):
         self.status = None
         self.headers = []
         self.respsent = False
+        self.handler = handler
+        self.buffer = bytearray()
 
     def handlewsgi(self):
         raise Exception()
+    def fileno(self):
+        raise Exception()
     def writehead(self, status, headers):
         raise Exception()
-    def writedata(self, data):
+    def flush(self):
         raise Exception()
-
-    def write(self, data):
-        if not data:
-            return
-        self.flushreq()
-        self.writedata(data)
+    def close(self):
+        pass
+    def writedata(self, data):
+        self.buffer.extend(data)
 
     def flushreq(self):
         if not self.respsent:
@@ -57,64 +50,81 @@ class wsgithread(reqthread):
             self.respsent = True
             self.writehead(self.status, self.headers)
 
+    def write(self, data):
+        if not data:
+            return
+        self.flushreq()
+        self.writedata(data)
+        self.handler.ckflush(self)
+
     def startreq(self, status, headers, exc_info=None):
         if self.status:
-            if exc_info:                # Nice calling convetion ^^
+            if exc_info:
                 try:
                     if self.respsent:
                         raise exc_info[1]
                 finally:
-                    exc_info = None     # CPython GC bug?
+                    exc_info = None
             else:
                 raise Exception("Can only start responding once.")
         self.status = status
         self.headers = headers
         return self.write
-    def handle(self):
+
+class handler(object):
+    def handle(self, request):
+        raise Exception()
+    def ckflush(self, req):
+        raise Exception()
+    def close(self):
+        pass
+
+class freethread(handler):
+    def __init__(self, **kw):
+        super().__init__(**kw)
+        self.current = set()
+        self.lk = threading.Lock()
+
+    def handle(self, req):
+        reqthread(target=self.run, args=[req]).start()
+
+    def ckflush(self, req):
+        while len(req.buffer) > 0:
+            rls, wls, els = select.select([], [req], [req])
+            req.flush()
+
+    def run(self, req):
         try:
-            respiter = self.handlewsgi()
+            th = threading.current_thread()
+            with self.lk:
+                self.current.add(th)
             try:
-                for data in respiter:
-                    self.write(data)
-                if self.status:
-                    self.flushreq()
+                env = req.mkenv()
+                with perf.request(env) as reqevent:
+                    respiter = req.handlewsgi(env, req.startreq)
+                    for data in respiter:
+                        req.write(data)
+                    if req.status:
+                        reqevent.response([req.status, req.headers])
+                        req.flushreq()
+                    self.ckflush(req)
+            except closed:
+                pass
+            except:
+                log.error("exception occurred when handling request", exc_info=True)
             finally:
-                if hasattr(respiter, "close"):
-                    respiter.close()
-        except closed:
-            pass
-
-class calllimiter(object):
-    def __init__(self, limit):
-        self.limit = limit
-        self.lock = threading.Condition()
-        self.inflight = 0
-
-    def waited(self, time):
-        if time > 10:
-            raise RuntimeError("Waited too long")
-
-    def __enter__(self):
-        with self.lock:
-            start = time.time()
-            while self.inflight >= self.limit:
-                self.lock.wait(10)
-                self.waited(time.time() - start)
-            self.inflight += 1
-            return self
-
-    def __exit__(self, *excinfo):
-        with self.lock:
-            self.inflight -= 1
-            self.lock.notify()
-        return False
-
-    def call(self, target):
-        with self:
-            return target()
-
-class abortlimiter(calllimiter):
-    def waited(self, time):
-        if time > 10:
-            os.abort()
+                with self.lk:
+                    self.current.remove(th)
+        finally:
+            req.close()
+
+    def close(self):
+        while True:
+            with self.lk:
+                if len(self.current) > 0:
+                    th = next(iter(self.current))
+                else:
+                    th = None
+            th.join()
+
+names = {"free": freethread}