Ensure the client watcher has been properly closed before handling its constructed...
[jagi.git] / src / jagi / scgi / EventServer.java
index 3df6c03..202b8dd 100644 (file)
@@ -21,11 +21,6 @@ public class EventServer implements Runnable {
                                                                    tgt -> new Thread(tgt, "Request handler thread"));
 
     public EventServer(ServerSocketChannel sk, Function handler) {
-       try {
-           sk.configureBlocking(false);
-       } catch(IOException e) {
-           throw(new RuntimeException(e));
-       }
        this.sk = sk;
        this.handler = handler;
     }
@@ -152,12 +147,15 @@ public class EventServer implements Runnable {
        public final SocketChannel sk;
        public final WritableByteChannel out;
        private final ByteBuffer buf;
+       private final long max;
        private boolean eof = false;
        private double lastread;
+       private long cur = 0;
 
-       public TransferInput(SocketChannel sk, WritableByteChannel out) {
+       public TransferInput(SocketChannel sk, WritableByteChannel out, long max) {
            this.sk = sk;
            this.out = out;
+           this.max = max;
            buf = ByteBuffer.allocate(65536);
            buf.flip();
        }
@@ -171,11 +169,15 @@ public class EventServer implements Runnable {
            double now = Driver.current().time();
            if((events & SelectionKey.OP_READ) != 0) {
                buf.rewind();
+               if(buf.remaining() > max - cur)
+                   buf.limit(buf.position() + (int)Math.min(max - cur, Integer.MAX_VALUE));
                int rv = sk.read(buf);
-               if(rv < 0)
+               if(rv < 0) {
                    eof = true;
-               else if(rv > 0)
+               } else if(rv > 0) {
                    lastread = now;
+                   cur += rv;
+               }
                buf.flip();
                while(buf.remaining() > 0)
                    out.write(buf);
@@ -270,21 +272,35 @@ public class EventServer implements Runnable {
                    switch(st) {
                    case "feed-input":
                        Object sink = resp.get("jagi.input-sink");
+                       Object clen = req.env.get("HTTP_CONTENT_LENGTH");
+                       long max = 0;
+                       if(clen instanceof String) {
+                           try {
+                               max = Long.parseLong((String)clen);
+                           } catch(NumberFormatException e) {
+                           }
+                       }
                        if(sink instanceof WritableByteChannel) {
-                           ev.add(new TransferInput(req.sk, (WritableByteChannel)sink).then(() -> submit(() -> handle(req, next))));
+                           ev.add(new TransferInput(req.sk, (WritableByteChannel)sink, max).then(() -> submit(() -> handle(req, next))));
                        } else if(sink instanceof OutputStream) {
-                           ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink)).then(() -> submit(() -> handle(req, next))));
+                           ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink), max).then(() -> submit(() -> handle(req, next))));
                        } else {
                            throw(new IllegalArgumentException("input-sink: " + sink));
                        }
                        handoff = true;
                        break;
+                   case "chain":
+                       submit(() -> handle(req, next));
+                       handoff = true;
+                       break;
                    default:
                        throw(new IllegalArgumentException("jagi.status: " + st));
                    }
                } else if((st = (String)resp.get("http.status")) != null) {
                    respond(req, st, resp);
                    handoff = true;
+               } else {
+                   throw(new IllegalArgumentException("neither http.status nor jagi.status set"));
                }
            } catch(Throwable t) {
                error = t;
@@ -318,6 +334,7 @@ public class EventServer implements Runnable {
        int headlen = 0;
        ByteBuffer head = null;
        Map<Object, Object> env = null;
+       Request req = null;
 
        Client(SocketChannel sk) {
            this.sk = sk;
@@ -385,8 +402,7 @@ public class EventServer implements Runnable {
            if((events & SelectionKey.OP_READ) != 0) {
                if((env == null) && !readhead())
                    return;
-               Request req = new Request(env, sk);
-               submit(() -> EventServer.this.handle(req, handler));
+               req = new Request(env, sk);
                handoff = true;
            }
            if(Driver.current().time() > (lastread + timeout))
@@ -394,6 +410,8 @@ public class EventServer implements Runnable {
        }
 
        public void close() {
+           if(req != null)
+               submit(() -> EventServer.this.handle(req, handler));
            if(!handoff) {
                try {
                    sk.close();
@@ -410,11 +428,8 @@ public class EventServer implements Runnable {
        public int events() {return(SelectionKey.OP_ACCEPT);}
 
        public void handle(int events) throws IOException {
-           if((events & SelectionKey.OP_ACCEPT) != 0) {
-               SocketChannel cl = sk.accept();
-               cl.configureBlocking(false);
-               Driver.current().add(new Client(cl));
-           }
+           if((events & SelectionKey.OP_ACCEPT) != 0)
+               Driver.current().add(new Client(sk.accept()));
        }
 
        public void close() {