Configure channel blocking as part of adding to the event-loop.
[jagi.git] / src / jagi / event / Driver.java
1 package jagi.event;
2
3 import java.util.*;
4 import java.util.logging.*;
5 import java.util.concurrent.*;
6 import java.io.*;
7 import java.nio.*;
8 import java.nio.channels.*;
9 import java.nio.channels.spi.*;
10
11 public class Driver {
    /* Logger for problems in the event loop itself. */
    private static final Logger log = Logger.getLogger("jagi.event");
    /* Logger for uncaught errors from watcher callbacks (see error()). */
    private static final Logger hlog = Logger.getLogger("jagi.event.handler");
    /* The driver on whose behalf the calling thread is currently
     * running a watcher callback; set and cleared by handle() and
     * close(), read by current(). */
    private static final ThreadLocal<Driver> current = new ThreadLocal<>();
    /* One selector pool per provider. The map object is also the
     * monitor that add(), remove(), update() and the selector loop
     * synchronize on. */
    private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
    /* Worker pool that runs the tasks passed to submit(): no core
     * threads, at most one thread per available processor, idle
     * threads expire after 5 seconds, and the task queue is bounded to
     * 128 entries. Threads are created through thread(). */
    private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
                                                                  5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
                                                                  this::thread);
19
20     protected Thread thread(Runnable tgt) {
21         return(new Thread(tgt));
22     }
23
24     protected void handle(Watcher w, int evs) {
25         try {
26             current.set(this);
27             w.handle(evs);
28         } catch(Throwable t) {
29             error(w, t, "handling event");
30         } finally {
31             current.remove();
32         }
33     }
34
35     protected void close(Watcher w) {
36         try {
37             current.set(this);
38             w.close();
39         } catch(Throwable t) {
40             error(w, t, "closing");
41         } finally {
42             current.remove();
43         }
44     }
45
46     protected void submit(Runnable task) {
47         worker.submit(task);
48     }
49
50     protected void error(Watcher w, Throwable t, String thing) {
51         hlog.log(Level.WARNING, w + ": uncaught error when " + thing, t);
52         remove(w);
53     }
54
55     class SelectPool implements Runnable {
56         final SelectorProvider provider;
57         final Selector poll;
58         final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
59         final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
60         final Map<Watcher, Object> paused = new IdentityHashMap<>();
61         final Collection<SelectionKey> cancelled = new HashSet<>();
62
63         SelectPool(SelectorProvider provider) {
64             this.provider = provider;
65             try {
66                 this.poll = provider.openSelector();
67             } catch(IOException e) {
68                 /* I think this counts more as an assertion error. */
69                 throw(new RuntimeException(e));
70             }
71         }
72
73         void handle(Watcher w, int evs) {
74             if(!watching.containsKey(w))
75                 return;
76             try {
77                 pause(w);
78                 submit(() -> {
79                         try {
80                             Driver.this.handle(w, evs);
81                         } finally {
82                             resume(w);
83                         }
84                     });
85             } catch(Throwable t) {
86                 try {
87                     synchronized(selectors) {
88                         remove(w);
89                     }
90                 } catch(Exception e) {
91                     t.addSuppressed(e);
92                 }
93                 log.log(Level.SEVERE, "unexpected error when submitting event", t);
94             }
95         }
96
97         void start() {
98             thread(this).start();
99         }
100
101         public void run() {
102             boolean quit = false;
103             Throwable error = null;
104             try {
105                 double now = time();
106                 while(true) {
107                     long timeout = 0;
108                     synchronized(selectors) {
109                         Double first = timeheap.keypeek();
110                         if((first == null) && watching.isEmpty()) {
111                             quit = true;
112                             selectors.remove(provider);
113                             return;
114                         }
115                         if(first != null)
116                             timeout = Math.max((long)Math.ceil((first - now) * 1000), 1);
117                     }
118                     Collection<SelectionKey> precancelled;
119                     synchronized(cancelled) {
120                         precancelled = new ArrayList<>(cancelled);
121                     }
122                     if(!precancelled.isEmpty())
123                         timeout = 1;
124                     poll.selectedKeys().clear();
125                     try {
126                         poll.select(timeout);
127                     } catch(IOException e) {
128                         throw(new RuntimeException(e));
129                     }
130                     if(!precancelled.isEmpty()) {
131                         synchronized(cancelled) {
132                             cancelled.removeAll(precancelled);
133                             cancelled.notifyAll();
134                         }
135                     }
136                     for(SelectionKey key : poll.selectedKeys())
137                         handle((Watcher)key.attachment(), key.readyOps());
138                     now = time();
139                     while(true) {
140                         Double first = timeheap.keypeek();
141                         if((first == null) || (first > now))
142                             break;
143                         handle(timeheap.remove(), 0);
144                     }
145                 }
146             } catch(Throwable t) {
147                 error = t;
148                 throw(t);
149             } finally {
150                 if(!quit)
151                     log.log(Level.SEVERE, "selector exited abnormally", error);
152             }
153         }
154
155         void pause(Watcher w) {
156             if(paused.containsKey(w))
157                 throw(new IllegalStateException(w + ": already paused"));
158             SelectionKey wc = watching.get(w);
159             Object tc = timeheap.remove(w);
160             if((wc == null) && (tc == null))
161                 throw(new IllegalStateException(w + ": not registered"));
162             if(wc != null)
163                 wc.interestOps(0);
164             paused.put(w, this);
165         }
166
167         void resume(Watcher w) {
168             if(paused.remove(w) == null)
169                 return;
170             SelectionKey wc = watching.get(w);
171             int evs = w.events();
172             double timeout = w.timeout();
173             boolean hastime = timeout < Double.POSITIVE_INFINITY;
174             if(evs < 0) {
175                 remove(w);
176                 return;
177             }
178             wc.interestOps(evs);
179             if(hastime)
180                 timeheap.add(w, timeout);
181             poll.wakeup();
182         }
183
184         void add(Watcher w, SelectableChannel ch) {
185             if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
186                 throw(new IllegalStateException(w + ": already registered"));
187             try {
188                 ch.configureBlocking(false);
189             } catch(IOException e) {
190                 throw(new RuntimeException(ch + ": could not make non-blocking", e));
191             }
192             int evs = w.events();
193             double timeout = w.timeout();
194             boolean hastime = timeout < Double.POSITIVE_INFINITY;
195             if(evs < 0) {
196                 submit(() -> close(w));
197                 return;
198             }
199             w.added(Driver.this);
200             try {
201                 watching.put(w, ch.register(poll, evs, w));
202             } catch(ClosedChannelException e) {
203                 throw(new RuntimeException("attempted to watch closed channel", e));
204             }
205             if(hastime)
206                 timeheap.add(w, timeout);
207             poll.wakeup();
208         }
209
210         void remove(Watcher w) {
211             SelectionKey wc = watching.remove(w);
212             Object tc = timeheap.remove(w);
213             Object pc = paused.remove(w);
214             if(wc != null) {
215                 synchronized(cancelled) {
216                     cancelled.add(wc);
217                     wc.cancel();
218                     poll.wakeup();
219                     boolean irq = false;
220                     while(cancelled.contains(wc)) {
221                         try {
222                             cancelled.wait();
223                         } catch(InterruptedException e) {
224                             irq = true;
225                         }
226                     }
227                     if(irq)
228                         Thread.currentThread().interrupt();
229                 }
230             }
231             if(((wc != null) || (tc != null)) && (pc != null))
232                 throw(new RuntimeException(w + ": inconsistent internal state"));
233             if(wc == null)
234                 throw(new IllegalStateException(w + ": not registered"));
235             submit(() -> close(w));
236         }
237
238         void update(Watcher w) {
239             SelectionKey wc = watching.get(w);
240             if(wc == null)
241                 throw(new IllegalStateException(w + ": not registered"));
242             int evs = w.events();
243             double timeout = w.timeout();
244             boolean hastime = timeout < Double.POSITIVE_INFINITY;
245             if(evs < 0) {
246                 remove(w);
247                 return;
248             }
249             wc.interestOps(evs);
250             if(hastime)
251                 timeheap.set(w, timeout);
252             else
253                 timeheap.remove(w);
254             poll.wakeup();
255         }
256     }
257
258     private SelectPool pool(SelectorProvider provider) {
259         SelectPool pool = selectors.get(provider);
260         if(pool == null) {
261             pool = new SelectPool(provider);
262             selectors.put(provider, pool);
263             pool.start();
264         }
265         return(pool);
266     }
267
268     public void add(Watcher w) {
269         SelectableChannel ch = w.channel();
270         synchronized(selectors) {
271             pool(ch.provider()).add(w, ch);
272         }
273     }
274
275     public void remove(Watcher w) {
276         SelectableChannel ch = w.channel();
277         synchronized(selectors) {
278             pool(ch.provider()).remove(w);
279         }
280     }
281
282     public void update(Watcher w) {
283         SelectableChannel ch = w.channel();
284         synchronized(selectors) {
285             pool(ch.provider()).update(w);
286         }
287     }
288
289     public double time() {
290         return(rtime());
291     }
292
293     private static final long rtimeoff = System.nanoTime();
294     public static double rtime() {
295         return((System.nanoTime() - rtimeoff) / 1e9);
296     }
297
298     private static Driver global = null;
299     public static Driver get() {
300         if(global == null) {
301             synchronized(Driver.class) {
302                 if(global == null)
303                     global = new Driver();
304             }
305         }
306         return(global);
307     }
308
309     public static Driver current() {
310         Driver ret = current.get();
311         if(ret == null)
312             throw(new IllegalStateException("no current driver"));
313         return(ret);
314     }
315 }