Handle unterminated streams in feed-input.
authorFredrik Tolf <fredrik@dolda2000.com>
Fri, 18 Feb 2022 02:57:27 +0000 (03:57 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Fri, 18 Feb 2022 02:57:27 +0000 (03:57 +0100)
src/jagi/scgi/EventServer.java

index e2055c7..1a6bc2a 100644 (file)
@@ -152,12 +152,15 @@ public class EventServer implements Runnable {
        public final SocketChannel sk;
        public final WritableByteChannel out;
        private final ByteBuffer buf;
+       private final long max;
        private boolean eof = false;
        private double lastread;
+       private long cur = 0;
 
-       public TransferInput(SocketChannel sk, WritableByteChannel out) {
+       public TransferInput(SocketChannel sk, WritableByteChannel out, long max) {
            this.sk = sk;
            this.out = out;
+           this.max = max;
            buf = ByteBuffer.allocate(65536);
            buf.flip();
        }
@@ -171,11 +174,15 @@ public class EventServer implements Runnable {
            double now = Driver.current().time();
            if((events & SelectionKey.OP_READ) != 0) {
                buf.rewind();
+               if(buf.remaining() > max - cur)
+                   buf.limit(buf.position() + (int)Math.min(max - cur, Integer.MAX_VALUE));
                int rv = sk.read(buf);
-               if(rv < 0)
+               if(rv < 0) {
                    eof = true;
-               else if(rv > 0)
+               } else if(rv > 0) {
                    lastread = now;
+                   cur += rv;
+               }
                buf.flip();
                while(buf.remaining() > 0)
                    out.write(buf);
@@ -270,15 +277,27 @@ public class EventServer implements Runnable {
                    switch(st) {
                    case "feed-input":
                        Object sink = resp.get("jagi.input-sink");
+                       Object clen = req.env.get("HTTP_CONTENT_LENGTH");
+                       long max = 0;
+                       if(clen instanceof String) {
+                           try {
+                               max = Long.parseLong((String)clen);
+                           } catch(NumberFormatException e) {
+                           }
+                       }
                        if(sink instanceof WritableByteChannel) {
-                           ev.add(new TransferInput(req.sk, (WritableByteChannel)sink).then(() -> submit(() -> handle(req, next))));
+                           ev.add(new TransferInput(req.sk, (WritableByteChannel)sink, max).then(() -> submit(() -> handle(req, next))));
                        } else if(sink instanceof OutputStream) {
-                           ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink)).then(() -> submit(() -> handle(req, next))));
+                           ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink), max).then(() -> submit(() -> handle(req, next))));
                        } else {
                            throw(new IllegalArgumentException("input-sink: " + sink));
                        }
                        handoff = true;
                        break;
+                   case "chain":
+                       submit(() -> handle(req, next));
+                       handoff = true;
+                       break;
                    default:
                        throw(new IllegalArgumentException("jagi.status: " + st));
                    }