X-Git-Url: http://dolda2000.com/gitweb/?a=blobdiff_plain;f=src%2Fjagi%2Fscgi%2FEventServer.java;h=202b8ddad9f109b70bd9dff9e714fa5a292dcd25;hb=794a5c90597dcfeffafbc5976cde812072023d0a;hp=e2055c7d13e485e6cac0c94e959bcb452652bb95;hpb=644fbf48c22b6f6c3fbd8b1a867088aa7014e9d5;p=jagi.git diff --git a/src/jagi/scgi/EventServer.java b/src/jagi/scgi/EventServer.java index e2055c7..202b8dd 100644 --- a/src/jagi/scgi/EventServer.java +++ b/src/jagi/scgi/EventServer.java @@ -21,11 +21,6 @@ public class EventServer implements Runnable { tgt -> new Thread(tgt, "Request handler thread")); public EventServer(ServerSocketChannel sk, Function handler) { - try { - sk.configureBlocking(false); - } catch(IOException e) { - throw(new RuntimeException(e)); - } this.sk = sk; this.handler = handler; } @@ -152,12 +147,15 @@ public class EventServer implements Runnable { public final SocketChannel sk; public final WritableByteChannel out; private final ByteBuffer buf; + private final long max; private boolean eof = false; private double lastread; + private long cur = 0; - public TransferInput(SocketChannel sk, WritableByteChannel out) { + public TransferInput(SocketChannel sk, WritableByteChannel out, long max) { this.sk = sk; this.out = out; + this.max = max; buf = ByteBuffer.allocate(65536); buf.flip(); } @@ -171,11 +169,15 @@ public class EventServer implements Runnable { double now = Driver.current().time(); if((events & SelectionKey.OP_READ) != 0) { buf.rewind(); + if(buf.remaining() > max - cur) + buf.limit(buf.position() + (int)Math.min(max - cur, Integer.MAX_VALUE)); int rv = sk.read(buf); - if(rv < 0) + if(rv < 0) { eof = true; - else if(rv > 0) + } else if(rv > 0) { lastread = now; + cur += rv; + } buf.flip(); while(buf.remaining() > 0) out.write(buf); @@ -270,15 +272,27 @@ public class EventServer implements Runnable { switch(st) { case "feed-input": Object sink = resp.get("jagi.input-sink"); + Object clen = req.env.get("HTTP_CONTENT_LENGTH"); + long max = 0; + if(clen instanceof String) { + try { + max = Long.parseLong((String)clen); + } catch(NumberFormatException e) { + } + } if(sink instanceof WritableByteChannel) { - ev.add(new TransferInput(req.sk, (WritableByteChannel)sink).then(() -> submit(() -> handle(req, next)))); + ev.add(new TransferInput(req.sk, (WritableByteChannel)sink, max).then(() -> submit(() -> handle(req, next)))); } else if(sink instanceof OutputStream) { - ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink)).then(() -> submit(() -> handle(req, next)))); + ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink), max).then(() -> submit(() -> handle(req, next)))); } else { throw(new IllegalArgumentException("input-sink: " + sink)); } handoff = true; break; + case "chain": + submit(() -> handle(req, next)); + handoff = true; + break; default: throw(new IllegalArgumentException("jagi.status: " + st)); } @@ -320,6 +334,7 @@ public class EventServer implements Runnable { int headlen = 0; ByteBuffer head = null; Map env = null; + Request req = null; Client(SocketChannel sk) { this.sk = sk; @@ -387,8 +402,7 @@ public class EventServer implements Runnable { if((events & SelectionKey.OP_READ) != 0) { if((env == null) && !readhead()) return; - Request req = new Request(env, sk); - submit(() -> EventServer.this.handle(req, handler)); + req = new Request(env, sk); handoff = true; } if(Driver.current().time() > (lastread + timeout)) @@ -396,6 +410,8 @@ public class EventServer implements Runnable { } public void close() { + if(req != null) + submit(() -> EventServer.this.handle(req, handler)); if(!handoff) { try { sk.close(); @@ -412,11 +428,8 @@ public class EventServer implements Runnable { public int events() {return(SelectionKey.OP_ACCEPT);} public void handle(int events) throws IOException { - if((events & SelectionKey.OP_ACCEPT) != 0) { - SocketChannel cl = sk.accept(); - cl.configureBlocking(false); - Driver.current().add(new Client(cl)); - } + if((events & SelectionKey.OP_ACCEPT) != 0) + Driver.current().add(new Client(sk.accept())); } public void close() {