Handle cancelled selection keys... "properly"?
[jagi.git] / src / jagi / event / Driver.java
1 package jagi.event;
2
3 import java.util.*;
4 import java.util.logging.*;
5 import java.util.concurrent.*;
6 import java.io.*;
7 import java.nio.*;
8 import java.nio.channels.*;
9 import java.nio.channels.spi.*;
10
11 public class Driver {
    /* Logger for the driver's own diagnostics (selector failures, submission errors). */
    private static final Logger log = Logger.getLogger("jagi.event");
    /* Logger for errors thrown out of watcher callbacks; written to by error(). */
    private static final Logger hlog = Logger.getLogger("jagi.event.handler");
    /* Per-thread "current driver": set around w.handle()/w.close() and read back by current(). */
    private static final ThreadLocal<Driver> current = new ThreadLocal<>();
    /* One SelectPool per selector provider. This map is also the monitor that
     * add()/remove()/update() and the selector loop synchronize on. */
    private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
    /* Executor that runs watcher callbacks: core size 0, at most one thread per
     * available processor, 5 second idle keep-alive, a queue bounded at 128 tasks,
     * and threads created through thread().
     * NOTE(review): with a core size of 0 and a queue, ThreadPoolExecutor prefers
     * queuing over starting threads, so this probably runs a single worker until
     * 128 tasks are backed up -- confirm that is intended. */
    private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
                                                                  5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
                                                                  this::thread);
19
20     protected Thread thread(Runnable tgt) {
21         return(new Thread(tgt));
22     }
23
24     protected void handle(Watcher w, int evs) {
25         try {
26             current.set(this);
27             w.handle(evs);
28         } catch(Throwable t) {
29             error(w, t, "handling event");
30         } finally {
31             current.remove();
32         }
33     }
34
35     protected void close(Watcher w) {
36         try {
37             current.set(this);
38             w.close();
39         } catch(Throwable t) {
40             error(w, t, "closing");
41         } finally {
42             current.remove();
43         }
44     }
45
46     protected void submit(Runnable task) {
47         worker.submit(task);
48     }
49
50     protected void error(Watcher w, Throwable t, String thing) {
51         hlog.log(Level.WARNING, w + ": uncaught error when " + thing, t);
52         remove(w);
53     }
54
55     class SelectPool implements Runnable {
        /* Provider this pool was created for; also its key in Driver.selectors. */
        final SelectorProvider provider;
        /* The selector opened from `provider`; all channels of this pool register with it. */
        final Selector poll;
        /* Registered watchers and their selection keys, compared by identity.
         * Entries stay here while a watcher is paused. */
        final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
        /* Watchers that currently have a finite timeout, keyed by that timeout.
         * The selector loop compares keys against time(), in seconds.
         * NOTE(review): presumably keypeek() yields the smallest key -- confirm against Heap. */
        final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
        /* Watchers whose handler has been dispatched and has not been resumed yet.
         * Only key presence matters; the value is just a non-null marker. */
        final Map<Watcher, Object> paused = new IdentityHashMap<>();
        /* Keys that remove() has cancelled and that the selector loop has not yet
         * acknowledged; also the monitor used for the wait()/notifyAll() handshake. */
        final Collection<SelectionKey> cancelled = new HashSet<>();
62
63         SelectPool(SelectorProvider provider) {
64             this.provider = provider;
65             try {
66                 this.poll = provider.openSelector();
67             } catch(IOException e) {
68                 /* I think this counts more as an assertion error. */
69                 throw(new RuntimeException(e));
70             }
71         }
72
73         void handle(Watcher w, int evs) {
74             if(!watching.containsKey(w))
75                 return;
76             try {
77                 pause(w);
78                 submit(() -> {
79                         try {
80                             Driver.this.handle(w, evs);
81                         } finally {
82                             resume(w);
83                         }
84                     });
85             } catch(Throwable t) {
86                 try {
87                     synchronized(selectors) {
88                         remove(w);
89                     }
90                 } catch(Exception e) {
91                     t.addSuppressed(e);
92                 }
93                 log.log(Level.SEVERE, "unexpected error when submitting event", t);
94             }
95         }
96
97         void start() {
98             thread(this).start();
99         }
100
101         public void run() {
102             boolean quit = false;
103             Throwable error = null;
104             try {
105                 double now = time();
106                 while(true) {
107                     long timeout = 0;
108                     synchronized(selectors) {
109                         Double first = timeheap.keypeek();
110                         if((first == null) && watching.isEmpty()) {
111                             quit = true;
112                             selectors.remove(provider);
113                             return;
114                         }
115                         if(first != null)
116                             timeout = Math.max((long)Math.ceil((first - now) * 1000), 1);
117                     }
118                     Collection<SelectionKey> precancelled;
119                     synchronized(cancelled) {
120                         precancelled = new ArrayList<>(cancelled);
121                     }
122                     if(!precancelled.isEmpty())
123                         timeout = 1;
124                     poll.selectedKeys().clear();
125                     try {
126                         poll.select(timeout);
127                     } catch(IOException e) {
128                         throw(new RuntimeException(e));
129                     }
130                     if(!precancelled.isEmpty()) {
131                         synchronized(cancelled) {
132                             cancelled.removeAll(precancelled);
133                             cancelled.notifyAll();
134                         }
135                     }
136                     for(SelectionKey key : poll.selectedKeys())
137                         handle((Watcher)key.attachment(), key.readyOps());
138                     now = time();
139                     while(true) {
140                         Double first = timeheap.keypeek();
141                         if((first == null) || (first > now))
142                             break;
143                         handle(timeheap.remove(), 0);
144                     }
145                 }
146             } catch(Throwable t) {
147                 error = t;
148                 throw(t);
149             } finally {
150                 if(!quit)
151                     log.log(Level.SEVERE, "selector exited abnormally", error);
152             }
153         }
154
155         void pause(Watcher w) {
156             if(paused.containsKey(w))
157                 throw(new IllegalStateException(w + ": already paused"));
158             SelectionKey wc = watching.get(w);
159             Object tc = timeheap.remove(w);
160             if((wc == null) && (tc == null))
161                 throw(new IllegalStateException(w + ": not registered"));
162             if(wc != null)
163                 wc.interestOps(0);
164             paused.put(w, this);
165         }
166
167         void resume(Watcher w) {
168             if(paused.remove(w) == null)
169                 return;
170             SelectionKey wc = watching.get(w);
171             int evs = w.events();
172             double timeout = w.timeout();
173             boolean hastime = timeout < Double.POSITIVE_INFINITY;
174             if(evs < 0) {
175                 remove(w);
176                 return;
177             }
178             wc.interestOps(evs);
179             if(hastime)
180                 timeheap.add(w, timeout);
181             poll.wakeup();
182         }
183
184         void add(Watcher w, SelectableChannel ch) {
185             if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
186                 throw(new IllegalStateException(w + ": already registered"));
187             int evs = w.events();
188             double timeout = w.timeout();
189             boolean hastime = timeout < Double.POSITIVE_INFINITY;
190             if(evs < 0) {
191                 submit(() -> close(w));
192                 return;
193             }
194             w.added(Driver.this);
195             try {
196                 watching.put(w, ch.register(poll, evs, w));
197             } catch(ClosedChannelException e) {
198                 throw(new RuntimeException("attempted to watch closed channel", e));
199             }
200             if(hastime)
201                 timeheap.add(w, timeout);
202             poll.wakeup();
203         }
204
205         void remove(Watcher w) {
206             SelectionKey wc = watching.remove(w);
207             Object tc = timeheap.remove(w);
208             Object pc = paused.remove(w);
209             if(wc != null) {
210                 synchronized(cancelled) {
211                     cancelled.add(wc);
212                     wc.cancel();
213                     poll.wakeup();
214                     boolean irq = false;
215                     while(cancelled.contains(wc)) {
216                         try {
217                             cancelled.wait();
218                         } catch(InterruptedException e) {
219                             irq = true;
220                         }
221                     }
222                     if(irq)
223                         Thread.currentThread().interrupt();
224                 }
225             }
226             if(((wc != null) || (tc != null)) && (pc != null))
227                 throw(new RuntimeException(w + ": inconsistent internal state"));
228             if(wc == null)
229                 throw(new IllegalStateException(w + ": not registered"));
230             submit(() -> close(w));
231         }
232
233         void update(Watcher w) {
234             SelectionKey wc = watching.get(w);
235             if(wc == null)
236                 throw(new IllegalStateException(w + ": not registered"));
237             int evs = w.events();
238             double timeout = w.timeout();
239             boolean hastime = timeout < Double.POSITIVE_INFINITY;
240             if(evs < 0) {
241                 remove(w);
242                 return;
243             }
244             wc.interestOps(evs);
245             if(hastime)
246                 timeheap.set(w, timeout);
247             else
248                 timeheap.remove(w);
249             poll.wakeup();
250         }
251     }
252
253     private SelectPool pool(SelectorProvider provider) {
254         SelectPool pool = selectors.get(provider);
255         if(pool == null) {
256             pool = new SelectPool(provider);
257             selectors.put(provider, pool);
258             pool.start();
259         }
260         return(pool);
261     }
262
263     public void add(Watcher w) {
264         SelectableChannel ch = w.channel();
265         synchronized(selectors) {
266             pool(ch.provider()).add(w, ch);
267         }
268     }
269
270     public void remove(Watcher w) {
271         SelectableChannel ch = w.channel();
272         synchronized(selectors) {
273             pool(ch.provider()).remove(w);
274         }
275     }
276
277     public void update(Watcher w) {
278         SelectableChannel ch = w.channel();
279         synchronized(selectors) {
280             pool(ch.provider()).update(w);
281         }
282     }
283
284     public double time() {
285         return(rtime());
286     }
287
288     private static final long rtimeoff = System.nanoTime();
289     public static double rtime() {
290         return((System.nanoTime() - rtimeoff) / 1e9);
291     }
292
293     private static Driver global = null;
294     public static Driver get() {
295         if(global == null) {
296             synchronized(Driver.class) {
297                 if(global == null)
298                     global = new Driver();
299             }
300         }
301         return(global);
302     }
303
304     public static Driver current() {
305         Driver ret = current.get();
306         if(ret == null)
307             throw(new IllegalStateException("no current driver"));
308         return(ret);
309     }
310 }