6 import java.util.function.*;
7 import java.util.concurrent.*;
8 import java.util.logging.*;
11 import java.nio.channels.*;
13 public class EventServer implements Runnable {
14 private static final double timeout = 5;
15 private static final Logger log = Logger.getLogger("jagi.server");
16 private final ServerSocketChannel sk;
17 private final Function handler;
18 private final Driver ev = Driver.get();
19 private final ExecutorService handlers = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors() * 2,
20 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(64),
21 tgt -> new Thread(tgt, "Request handler thread"));
23 public EventServer(ServerSocketChannel sk, Function handler) {
25 this.handler = handler;
28 public static class Request {
29 public final Map<Object, Object> env;
30 public final SocketChannel sk;
32 public Request(Map<Object, Object> env, SocketChannel sk) {
38 ArrayList<Object> cleanup = new ArrayList<>((Collection<?>)env.get("jagi.cleanup"));
40 RuntimeException ce = null;
41 for(Object obj : cleanup) {
42 if(obj instanceof AutoCloseable) {
44 ((AutoCloseable)obj).close();
45 } catch(Exception e) {
47 ce = new RuntimeException("error(s) occurred during cleanup");
57 protected void error(Request req, Throwable error) {
58 log.log(Level.WARNING, "uncaught exception while handling request", error);
61 public static abstract class ChainWatcher implements Watcher {
62 private Runnable then;
63 public ChainWatcher then(Runnable then) {this.then = then; return(this);}
71 public static class BufferedOutput extends ChainWatcher {
72 public final SocketChannel sk;
73 public final ByteBuffer buf;
74 private double lastwrite;
76 public BufferedOutput(SocketChannel sk, ByteBuffer buf) {
81 public void added(Driver d) {lastwrite = d.time();}
82 public SelectableChannel channel() {return(sk);}
83 public int events() {return((buf.remaining() > 0) ? SelectionKey.OP_WRITE : -1);}
84 public double timeout() {return(lastwrite + timeout);}
86 public void handle(int events) throws IOException {
87 double now = Driver.current().time();
88 if((events & SelectionKey.OP_WRITE) != 0) {
92 if(now > lastwrite + timeout)
93 buf.position(buf.limit());
97 public static class TransferOutput extends ChainWatcher {
98 public final SocketChannel sk;
99 public final ReadableByteChannel in;
100 private final ByteBuffer buf;
101 private boolean eof = false;
102 private double lastwrite;
104 public TransferOutput(SocketChannel sk, ReadableByteChannel in) {
107 buf = ByteBuffer.allocate(65536);
111 public void added(Driver d) {lastwrite = d.time();}
112 public SelectableChannel channel() {return(sk);}
113 public int events() {return((eof && (buf.remaining() == 0)) ? -1 : SelectionKey.OP_WRITE);}
114 public double timeout() {return(lastwrite + timeout);}
116 public void handle(int events) throws IOException {
117 if(!eof && (buf.remaining() == 0)) {
119 while(buf.remaining() > 0) {
124 double now = Driver.current().time();
125 if((events & SelectionKey.OP_WRITE) != 0) {
126 if(sk.write(buf) > 0)
129 if(now > lastwrite + timeout) {
131 buf.position(buf.limit());
135 public void close() {
138 } catch(IOException e) {
139 log.log(Level.WARNING, "failed to close transfer channel: " + in, e);
146 public static class TransferInput extends ChainWatcher {
147 public final SocketChannel sk;
148 public final WritableByteChannel out;
149 private final ByteBuffer buf;
150 private final long max;
151 private boolean eof = false;
152 private double lastread;
153 private long cur = 0;
155 public TransferInput(SocketChannel sk, WritableByteChannel out, long max) {
159 buf = ByteBuffer.allocate(65536);
163 public void added(Driver d) {lastread = d.time();}
164 public SelectableChannel channel() {return(sk);}
165 public int events() {return(eof ? -1 : SelectionKey.OP_READ);}
166 public double timeout() {return(lastread + timeout);}
168 public void handle(int events) throws IOException {
169 double now = Driver.current().time();
170 if((events & SelectionKey.OP_READ) != 0) {
172 if(buf.remaining() > max - cur)
173 buf.limit(buf.position() + (int)Math.min(max - cur, Integer.MAX_VALUE));
174 int rv = sk.read(buf);
179 if((cur += rv) >= max)
183 while(buf.remaining() > 0)
186 if(now > lastread + timeout) {
188 buf.position(buf.limit());
192 public void close() {
195 } catch(IOException e) {
196 log.log(Level.WARNING, "failed to close transfer channel: " + out, e);
203 protected void respond(Request req, String status, Map resp) {
204 Object output = resp.get("jagi.output");
205 ByteArrayOutputStream buf = new ByteArrayOutputStream();
207 Writer head = new OutputStreamWriter(buf, Utils.UTF8);
208 head.write("Status: ");
211 for(Iterator it = resp.entrySet().iterator(); it.hasNext();) {
212 Map.Entry ent = (Map.Entry)it.next();
213 Object val = ent.getValue();
214 if((ent.getKey() instanceof String) && (val != null)) {
215 String key = (String)ent.getKey();
216 if(key.startsWith("http.")) {
217 String nm = key.substring(5);
218 if(nm.equalsIgnoreCase("status"))
220 if(val instanceof Collection) {
221 for(Object part : (Collection)val) {
224 head.write(part.toString());
230 head.write(val.toString());
238 } catch(IOException e) {
239 throw(new RuntimeException("cannot happen"));
243 out = new BufferedOutput(req.sk, ByteBuffer.allocate(0));
244 } else if(output instanceof byte[]) {
245 out = new BufferedOutput(req.sk, ByteBuffer.wrap((byte[])output));
246 } else if(output instanceof ByteBuffer) {
247 out = new BufferedOutput(req.sk, (ByteBuffer)output);
248 } else if(output instanceof String) {
249 out = new BufferedOutput(req.sk, ByteBuffer.wrap(((String)output).getBytes(Utils.UTF8)));
250 } else if(output instanceof CharSequence) {
251 out = new BufferedOutput(req.sk, Utils.UTF8.encode(CharBuffer.wrap((CharSequence)output)));
252 } else if(output instanceof InputStream) {
253 out = new TransferOutput(req.sk, Channels.newChannel((InputStream)output));
254 } else if(output instanceof ReadableByteChannel) {
255 out = new TransferOutput(req.sk, (ReadableByteChannel)output);
257 throw(new IllegalArgumentException("response-body: " + output));
259 out.then(() -> submit(req::close));
260 ev.add(new BufferedOutput(req.sk, ByteBuffer.wrap(buf.toByteArray())).then(() -> ev.add(out)));
263 @SuppressWarnings("unchecked")
264 protected void handle(Request req, Function handler) {
265 boolean handoff = false;
267 Throwable error = null;
269 Map resp = (Map)handler.apply(req.env);
271 if((st = (String)resp.get("jagi.status")) != null) {
272 Function next = (Function)resp.get("jagi.next");
275 Object sink = resp.get("jagi.input-sink");
276 Object clen = req.env.get("HTTP_CONTENT_LENGTH");
278 if(clen instanceof String) {
280 max = Long.parseLong((String)clen);
281 } catch(NumberFormatException e) {
284 if(sink instanceof WritableByteChannel) {
285 ev.add(new TransferInput(req.sk, (WritableByteChannel)sink, max).then(() -> submit(() -> handle(req, next))));
286 } else if(sink instanceof OutputStream) {
287 ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink), max).then(() -> submit(() -> handle(req, next))));
289 throw(new IllegalArgumentException("input-sink: " + sink));
294 submit(() -> handle(req, next));
298 throw(new IllegalArgumentException("jagi.status: " + st));
300 } else if((st = (String)resp.get("http.status")) != null) {
301 respond(req, st, resp);
304 throw(new IllegalArgumentException("neither http.status nor jagi.status set"));
306 } catch(Throwable t) {
313 } catch(Throwable ce) {
317 error.addSuppressed(ce);
322 } catch(Throwable t) {
327 protected void submit(Runnable task) {
328 handlers.submit(task);
331 class Client implements Watcher {
332 final SocketChannel sk;
334 boolean eof = false, handoff = false;
336 ByteBuffer head = null;
337 Map<Object, Object> env = null;
340 Client(SocketChannel sk) {
344 public void added(Driver d) {lastread = d.time();}
345 public SelectableChannel channel() {return(sk);}
346 public double timeout() {return(lastread + timeout);}
347 public int events() {
351 return(SelectionKey.OP_READ);
355 boolean readhead() throws IOException {
357 ByteBuffer buf = ByteBuffer.allocate(1);
360 int rv = sk.read(buf);
367 lastread = Driver.current().time();
369 if((c >= '0') && (c <= '9')) {
370 headlen = (headlen * 10) + (c - '0');
371 } else if(c == ':') {
372 head = ByteBuffer.allocate(headlen + 1);
382 if(head.remaining() == 0) {
383 if(head.get(head.limit() - 1) != ',') {
384 /* Unterminated netstring */
388 head.limit(head.limit() - 1);
389 env = Jagi.mkenv(Scgi.splithead(head), sk);
392 int rv = sk.read(head);
402 public void handle(int events) throws IOException {
403 if((events & SelectionKey.OP_READ) != 0) {
404 if((env == null) && !readhead())
406 req = new Request(env, sk);
409 if(Driver.current().time() > (lastread + timeout))
413 public void close() {
415 submit(() -> EventServer.this.handle(req, handler));
419 } catch(IOException e) {
425 class Accepter implements Watcher {
426 boolean closed = false;
428 public SelectableChannel channel() {return(sk);}
429 public int events() {return(SelectionKey.OP_ACCEPT);}
431 public void handle(int events) throws IOException {
432 if((events & SelectionKey.OP_ACCEPT) != 0)
433 Driver.current().add(new Client(sk.accept()));
436 public void close() {
445 Accepter main = new Accepter();
449 while(!main.closed) {
453 } catch(InterruptedException e) {
458 } catch(IOException e) {
459 throw(new RuntimeException(e));