Commit | Line | Data |
---|---|---|
965619c0 FT |
1 | package jagi.scgi; |
2 | ||
3 | import jagi.*; | |
4 | import jagi.event.*; | |
5 | import java.util.*; | |
6 | import java.util.function.*; | |
7 | import java.util.concurrent.*; | |
8 | import java.util.logging.*; | |
9 | import java.io.*; | |
10 | import java.nio.*; | |
11 | import java.nio.channels.*; | |
12 | ||
13 | public class EventServer implements Runnable { | |
14 | private static final double timeout = 5; | |
15 | private static final Logger log = Logger.getLogger("jagi.server"); | |
16 | private final ServerSocketChannel sk; | |
17 | private final Function handler; | |
18 | private final Driver ev = Driver.get(); | |
19 | private final ExecutorService handlers = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors() * 2, | |
20 | 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(64), | |
21 | tgt -> new Thread(tgt, "Request handler thread")); | |
22 | ||
23 | public EventServer(ServerSocketChannel sk, Function handler) { | |
24 | try { | |
25 | sk.configureBlocking(false); | |
26 | } catch(IOException e) { | |
27 | throw(new RuntimeException(e)); | |
28 | } | |
29 | this.sk = sk; | |
30 | this.handler = handler; | |
31 | } | |
32 | ||
33 | public static class Request { | |
34 | public final Map<Object, Object> env; | |
35 | public final SocketChannel sk; | |
36 | ||
37 | public Request(Map<Object, Object> env, SocketChannel sk) { | |
38 | this.env = env; | |
39 | this.sk = sk; | |
40 | } | |
41 | ||
42 | public void close() { | |
43 | ArrayList<Object> cleanup = new ArrayList<>((Collection<?>)env.get("jagi.cleanup")); | |
44 | cleanup.add(sk); | |
45 | RuntimeException ce = null; | |
46 | for(Object obj : cleanup) { | |
47 | if(obj instanceof AutoCloseable) { | |
48 | try { | |
49 | ((AutoCloseable)obj).close(); | |
50 | } catch(Exception e) { | |
51 | if(ce == null) | |
52 | ce = new RuntimeException("error(s) occurred during cleanup"); | |
53 | ce.addSuppressed(e); | |
54 | } | |
55 | } | |
56 | } | |
57 | if(ce != null) | |
58 | throw(ce); | |
59 | } | |
60 | } | |
61 | ||
62 | protected void error(Request req, Throwable error) { | |
63 | log.log(Level.WARNING, "uncaught exception while handling request", error); | |
64 | } | |
65 | ||
66 | public static abstract class ChainWatcher implements Watcher { | |
67 | private Runnable then; | |
68 | public ChainWatcher then(Runnable then) {this.then = then; return(this);} | |
69 | ||
70 | public void close() { | |
71 | if(then != null) | |
72 | then.run(); | |
73 | } | |
74 | } | |
75 | ||
76 | public static class BufferedOutput extends ChainWatcher { | |
77 | public final SocketChannel sk; | |
78 | public final ByteBuffer buf; | |
79 | private double lastwrite; | |
80 | ||
81 | public BufferedOutput(SocketChannel sk, ByteBuffer buf) { | |
82 | this.sk = sk; | |
83 | this.buf = buf; | |
84 | } | |
85 | ||
86 | public void added(Driver d) {lastwrite = d.time();} | |
87 | public SelectableChannel channel() {return(sk);} | |
88 | public int events() {return((buf.remaining() > 0) ? SelectionKey.OP_WRITE : -1);} | |
89 | public double timeout() {return(lastwrite + timeout);} | |
90 | ||
91 | public void handle(int events) throws IOException { | |
92 | double now = Driver.current().time(); | |
93 | if((events & SelectionKey.OP_WRITE) != 0) { | |
94 | if(sk.write(buf) > 0) | |
95 | lastwrite = now; | |
96 | } | |
97 | if(now > lastwrite + timeout) | |
98 | buf.position(buf.limit()); | |
99 | } | |
100 | } | |
101 | ||
102 | public static class TransferOutput extends ChainWatcher { | |
103 | public final SocketChannel sk; | |
104 | public final ReadableByteChannel in; | |
105 | private final ByteBuffer buf; | |
106 | private boolean eof = false; | |
107 | private double lastwrite; | |
108 | ||
109 | public TransferOutput(SocketChannel sk, ReadableByteChannel in) { | |
110 | this.sk = sk; | |
111 | this.in = in; | |
112 | buf = ByteBuffer.allocate(65536); | |
113 | buf.flip(); | |
114 | } | |
115 | ||
116 | public void added(Driver d) {lastwrite = d.time();} | |
117 | public SelectableChannel channel() {return(sk);} | |
118 | public int events() {return((eof && (buf.remaining() == 0)) ? -1 : SelectionKey.OP_WRITE);} | |
119 | public double timeout() {return(lastwrite + timeout);} | |
120 | ||
121 | public void handle(int events) throws IOException { | |
122 | if(!eof && (buf.remaining() == 0)) { | |
123 | buf.rewind(); | |
124 | while(buf.remaining() > 0) { | |
125 | if(in.read(buf) < 0) | |
126 | break; | |
127 | } | |
128 | } | |
129 | double now = Driver.current().time(); | |
130 | if((events & SelectionKey.OP_WRITE) != 0) { | |
131 | if(sk.write(buf) > 0) | |
132 | lastwrite = now; | |
133 | } | |
134 | if(now > lastwrite + timeout) { | |
135 | eof = true; | |
136 | buf.position(buf.limit()); | |
137 | } | |
138 | } | |
139 | ||
140 | public void close() { | |
141 | try { | |
142 | in.close(); | |
143 | } catch(IOException e) { | |
144 | log.log(Level.WARNING, "failed to close transfer channel: " + in, e); | |
145 | } finally { | |
146 | super.close(); | |
147 | } | |
148 | } | |
149 | } | |
150 | ||
151 | public static class TransferInput extends ChainWatcher { | |
152 | public final SocketChannel sk; | |
153 | public final WritableByteChannel out; | |
154 | private final ByteBuffer buf; | |
155 | private boolean eof = false; | |
156 | private double lastread; | |
157 | ||
158 | public TransferInput(SocketChannel sk, WritableByteChannel out) { | |
159 | this.sk = sk; | |
160 | this.out = out; | |
161 | buf = ByteBuffer.allocate(65536); | |
162 | buf.flip(); | |
163 | } | |
164 | ||
165 | public void added(Driver d) {lastread = d.time();} | |
166 | public SelectableChannel channel() {return(sk);} | |
167 | public int events() {return(eof ? -1 : SelectionKey.OP_READ);} | |
168 | public double timeout() {return(lastread + timeout);} | |
169 | ||
170 | public void handle(int events) throws IOException { | |
171 | double now = Driver.current().time(); | |
172 | if((events & SelectionKey.OP_READ) != 0) { | |
173 | buf.rewind(); | |
174 | int rv = sk.read(buf); | |
175 | if(rv < 0) | |
176 | eof = true; | |
177 | else if(rv > 0) | |
178 | lastread = now; | |
179 | buf.flip(); | |
180 | while(buf.remaining() > 0) | |
181 | out.write(buf); | |
182 | } | |
183 | if(now > lastread + timeout) { | |
184 | eof = true; | |
185 | buf.position(buf.limit()); | |
186 | } | |
187 | } | |
188 | ||
189 | public void close() { | |
190 | try { | |
191 | out.close(); | |
192 | } catch(IOException e) { | |
193 | log.log(Level.WARNING, "failed to close transfer channel: " + out, e); | |
194 | } finally { | |
195 | super.close(); | |
196 | } | |
197 | } | |
198 | } | |
199 | ||
200 | protected void respond(Request req, String status, Map resp) { | |
201 | Object output = resp.get("jagi.output"); | |
202 | ByteArrayOutputStream buf = new ByteArrayOutputStream(); | |
203 | try { | |
204 | Writer head = new OutputStreamWriter(buf, Utils.UTF8); | |
205 | head.write("Status: "); | |
206 | head.write(status); | |
207 | head.write("\n"); | |
208 | for(Iterator it = resp.entrySet().iterator(); it.hasNext();) { | |
209 | Map.Entry ent = (Map.Entry)it.next(); | |
210 | Object val = ent.getValue(); | |
211 | if((ent.getKey() instanceof String) && (val != null)) { | |
212 | String key = (String)ent.getKey(); | |
213 | if(key.startsWith("http.")) { | |
214 | String nm = key.substring(5); | |
215 | if(nm.equalsIgnoreCase("status")) | |
216 | continue; | |
217 | if(val instanceof Collection) { | |
218 | for(Object part : (Collection)val) { | |
219 | head.write(nm); | |
220 | head.write(": "); | |
221 | head.write(part.toString()); | |
222 | head.write("\n"); | |
223 | } | |
224 | } else { | |
225 | head.write(nm); | |
226 | head.write(": "); | |
227 | head.write(val.toString()); | |
228 | head.write("\n"); | |
229 | } | |
230 | } | |
231 | } | |
232 | } | |
233 | head.write("\n"); | |
234 | head.flush(); | |
235 | } catch(IOException e) { | |
236 | throw(new RuntimeException("cannot happen")); | |
237 | } | |
238 | ChainWatcher out; | |
239 | if(output == null) { | |
240 | out = new BufferedOutput(req.sk, ByteBuffer.allocate(0)); | |
241 | } else if(output instanceof byte[]) { | |
242 | out = new BufferedOutput(req.sk, ByteBuffer.wrap((byte[])output)); | |
243 | } else if(output instanceof ByteBuffer) { | |
244 | out = new BufferedOutput(req.sk, (ByteBuffer)output); | |
245 | } else if(output instanceof String) { | |
246 | out = new BufferedOutput(req.sk, ByteBuffer.wrap(((String)output).getBytes(Utils.UTF8))); | |
247 | } else if(output instanceof CharSequence) { | |
248 | out = new BufferedOutput(req.sk, Utils.UTF8.encode(CharBuffer.wrap((CharSequence)output))); | |
249 | } else if(output instanceof InputStream) { | |
250 | out = new TransferOutput(req.sk, Channels.newChannel((InputStream)output)); | |
251 | } else if(output instanceof ReadableByteChannel) { | |
252 | out = new TransferOutput(req.sk, (ReadableByteChannel)output); | |
253 | } else { | |
254 | throw(new IllegalArgumentException("response-body: " + output)); | |
255 | } | |
256 | out.then(() -> submit(req::close)); | |
257 | ev.add(new BufferedOutput(req.sk, ByteBuffer.wrap(buf.toByteArray())).then(() -> ev.add(out))); | |
258 | } | |
259 | ||
260 | @SuppressWarnings("unchecked") | |
261 | protected void handle(Request req, Function handler) { | |
262 | boolean handoff = false; | |
263 | try { | |
264 | Throwable error = null; | |
265 | try { | |
266 | Map resp = (Map)handler.apply(req.env); | |
267 | String st; | |
268 | if((st = (String)resp.get("jagi.status")) != null) { | |
269 | Function next = (Function)resp.get("jagi.next"); | |
270 | switch(st) { | |
271 | case "feed-input": | |
272 | Object sink = resp.get("jagi.input-sink"); | |
273 | if(sink instanceof WritableByteChannel) { | |
274 | ev.add(new TransferInput(req.sk, (WritableByteChannel)sink).then(() -> submit(() -> handle(req, next)))); | |
275 | } else if(sink instanceof OutputStream) { | |
276 | ev.add(new TransferInput(req.sk, Channels.newChannel((OutputStream)sink)).then(() -> submit(() -> handle(req, next)))); | |
277 | } else { | |
278 | throw(new IllegalArgumentException("input-sink: " + sink)); | |
279 | } | |
280 | handoff = true; | |
281 | break; | |
282 | default: | |
283 | throw(new IllegalArgumentException("jagi.status: " + st)); | |
284 | } | |
285 | } else if((st = (String)resp.get("http.status")) != null) { | |
286 | respond(req, st, resp); | |
287 | handoff = true; | |
644fbf48 FT |
288 | } else { |
289 | throw(new IllegalArgumentException("neither http.status nor jagi.status set")); | |
965619c0 FT |
290 | } |
291 | } catch(Throwable t) { | |
292 | error = t; | |
293 | throw(t); | |
294 | } finally { | |
295 | if(!handoff) { | |
296 | try { | |
297 | req.close(); | |
298 | } catch(Throwable ce) { | |
299 | if(error == null) { | |
300 | throw(ce); | |
301 | } else { | |
302 | error.addSuppressed(ce); | |
303 | } | |
304 | } | |
305 | } | |
306 | } | |
307 | } catch(Throwable t) { | |
308 | error(req, t); | |
309 | } | |
310 | } | |
311 | ||
312 | protected void submit(Runnable task) { | |
313 | handlers.submit(task); | |
314 | } | |
315 | ||
316 | class Client implements Watcher { | |
317 | final SocketChannel sk; | |
318 | double lastread; | |
319 | boolean eof = false, handoff = false; | |
320 | int headlen = 0; | |
321 | ByteBuffer head = null; | |
322 | Map<Object, Object> env = null; | |
323 | ||
324 | Client(SocketChannel sk) { | |
325 | this.sk = sk; | |
326 | } | |
327 | ||
328 | public void added(Driver d) {lastread = d.time();} | |
329 | public SelectableChannel channel() {return(sk);} | |
330 | public double timeout() {return(lastread + timeout);} | |
331 | public int events() { | |
332 | if(eof) | |
333 | return(-1); | |
334 | if(env == null) | |
335 | return(SelectionKey.OP_READ); | |
336 | return(-1); | |
337 | } | |
338 | ||
339 | boolean readhead() throws IOException { | |
340 | if(head == null) { | |
341 | ByteBuffer buf = ByteBuffer.allocate(1); | |
342 | while(true) { | |
343 | buf.rewind(); | |
344 | int rv = sk.read(buf); | |
345 | if(rv < 0) { | |
346 | eof = true; | |
347 | return(false); | |
348 | } else if(rv == 0) { | |
349 | return(false); | |
350 | } else { | |
351 | lastread = Driver.current().time(); | |
352 | int c = buf.get(0); | |
353 | if((c >= '0') && (c <= '9')) { | |
354 | headlen = (headlen * 10) + (c - '0'); | |
355 | } else if(c == ':') { | |
356 | head = ByteBuffer.allocate(headlen + 1); | |
357 | break; | |
358 | } else { | |
359 | eof = true; | |
360 | return(false); | |
361 | } | |
362 | } | |
363 | } | |
364 | } | |
365 | while(true) { | |
366 | if(head.remaining() == 0) { | |
367 | if(head.get(head.limit() - 1) != ',') { | |
368 | /* Unterminated netstring */ | |
369 | eof = true; | |
370 | return(false); | |
371 | } | |
372 | head.limit(head.limit() - 1); | |
373 | env = Jagi.mkenv(Scgi.splithead(head), sk); | |
374 | return(true); | |
375 | } | |
376 | int rv = sk.read(head); | |
377 | if(rv < 0) { | |
378 | eof = true; | |
379 | return(false); | |
380 | } else if(rv == 0) { | |
381 | return(false); | |
382 | } | |
383 | } | |
384 | } | |
385 | ||
386 | public void handle(int events) throws IOException { | |
387 | if((events & SelectionKey.OP_READ) != 0) { | |
388 | if((env == null) && !readhead()) | |
389 | return; | |
390 | Request req = new Request(env, sk); | |
391 | submit(() -> EventServer.this.handle(req, handler)); | |
392 | handoff = true; | |
393 | } | |
394 | if(Driver.current().time() > (lastread + timeout)) | |
395 | eof = true; | |
396 | } | |
397 | ||
398 | public void close() { | |
399 | if(!handoff) { | |
400 | try { | |
401 | sk.close(); | |
402 | } catch(IOException e) { | |
403 | } | |
404 | } | |
405 | } | |
406 | } | |
407 | ||
408 | class Accepter implements Watcher { | |
409 | boolean closed = false; | |
410 | ||
411 | public SelectableChannel channel() {return(sk);} | |
412 | public int events() {return(SelectionKey.OP_ACCEPT);} | |
413 | ||
414 | public void handle(int events) throws IOException { | |
415 | if((events & SelectionKey.OP_ACCEPT) != 0) { | |
416 | SocketChannel cl = sk.accept(); | |
417 | cl.configureBlocking(false); | |
418 | Driver.current().add(new Client(cl)); | |
419 | } | |
420 | } | |
421 | ||
422 | public void close() { | |
423 | synchronized(this) { | |
424 | closed = true; | |
425 | notifyAll(); | |
426 | } | |
427 | } | |
428 | } | |
429 | ||
430 | public void run() { | |
431 | Accepter main = new Accepter(); | |
432 | ev.add(main); | |
433 | try { | |
434 | synchronized(main) { | |
435 | while(!main.closed) { | |
436 | main.wait(); | |
437 | } | |
438 | } | |
439 | } catch(InterruptedException e) { | |
440 | ev.remove(main); | |
441 | } finally { | |
442 | try { | |
443 | sk.close(); | |
444 | } catch(IOException e) { | |
445 | throw(new RuntimeException(e)); | |
446 | } | |
447 | } | |
448 | } | |
449 | } |