Actually implement event-driver timeouts.
[jagi.git] / src / jagi / event / Driver.java
1 package jagi.event;
2
3 import java.util.*;
4 import java.util.logging.*;
5 import java.util.concurrent.*;
6 import java.io.*;
7 import java.nio.*;
8 import java.nio.channels.*;
9 import java.nio.channels.spi.*;
10
11 public class Driver {
12     private static final Logger log = Logger.getLogger("jagi.event");
13     private static final Logger hlog = Logger.getLogger("jagi.event.handler");
14     private static final ThreadLocal<Driver> current = new ThreadLocal<>();
15     private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
16     private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
17                                                                   5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
18                                                                   this::thread);
19
20     protected Thread thread(Runnable tgt) {
21         return(new Thread(tgt));
22     }
23
24     protected void handle(Watcher w, int evs) {
25         try {
26             current.set(this);
27             w.handle(evs);
28         } catch(Throwable t) {
29             error(w, t);
30         } finally {
31             current.remove();
32         }
33     }
34
35     protected void submit(Runnable task) {
36         worker.submit(task);
37     }
38
39     protected void error(Watcher w, Throwable t) {
40         hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
41         remove(w);
42     }
43
44     class SelectPool implements Runnable {
45         final SelectorProvider provider;
46         final Selector poll;
47         final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
48         final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
49         final Map<Watcher, Object> paused = new IdentityHashMap<>();
50
51         SelectPool(SelectorProvider provider) {
52             this.provider = provider;
53             try {
54                 this.poll = provider.openSelector();
55             } catch(IOException e) {
56                 /* I think this counts more as an assertion error. */
57                 throw(new RuntimeException(e));
58             }
59         }
60
61         void handle(Watcher w, int evs) {
62             if(!watching.containsKey(w))
63                 return;
64             try {
65                 pause(w);
66                 submit(() -> {
67                         try {
68                             Driver.this.handle(w, evs);
69                         } finally {
70                             resume(w);
71                         }
72                     });
73             } catch(Throwable t) {
74                 try {
75                     synchronized(selectors) {
76                         remove(w);
77                     }
78                 } catch(Exception e) {
79                     t.addSuppressed(e);
80                 }
81                 log.log(Level.SEVERE, "unexpected error when submitting event", t);
82             }
83         }
84
85         void start() {
86             thread(this).start();
87         }
88
89         public void run() {
90             boolean quit = false;
91             Throwable error = null;
92             try {
93                 double now = time();
94                 while(true) {
95                     long timeout = 0;
96                     synchronized(selectors) {
97                         Double first = timeheap.keypeek();
98                         if((first == null) && watching.isEmpty()) {
99                             quit = true;
100                             selectors.remove(provider);
101                             return;
102                         }
103                         if(first != null)
104                             timeout = (long)Math.ceil((first - now) * 1000);
105                     }
106                     poll.selectedKeys().clear();
107                     try {
108                         poll.select(timeout);
109                     } catch(IOException e) {
110                         throw(new RuntimeException(e));
111                     }
112                     for(SelectionKey key : poll.selectedKeys())
113                         handle((Watcher)key.attachment(), key.readyOps());
114                     now = time();
115                     while(true) {
116                         Double first = timeheap.keypeek();
117                         if((first == null) || (first > now))
118                             break;
119                         handle(timeheap.remove(), 0);
120                     }
121                 }
122             } catch(Throwable t) {
123                 error = t;
124                 throw(t);
125             } finally {
126                 if(!quit)
127                     log.log(Level.SEVERE, "selector exited abnormally", error);
128             }
129         }
130
131         void pause(Watcher w) {
132             if(paused.containsKey(w))
133                 throw(new IllegalStateException(w + ": already paused"));
134             SelectionKey wc = watching.get(w);
135             Object tc = timeheap.remove(w);
136             if((wc == null) && (tc == null))
137                 throw(new IllegalStateException(w + ": not registered"));
138             if(wc != null)
139                 wc.interestOps(0);
140             paused.put(w, this);
141         }
142
143         void resume(Watcher w) {
144             if(paused.remove(w) == null)
145                 return;
146             SelectionKey wc = watching.get(w);
147             int evs = w.events();
148             double timeout = w.timeout();
149             boolean hastime = timeout < Double.POSITIVE_INFINITY;
150             if(evs < 0) {
151                 remove(w);
152                 return;
153             }
154             wc.interestOps(evs);
155             if(hastime)
156                 timeheap.add(w, timeout);
157             poll.wakeup();
158         }
159
160         void add(Watcher w, SelectableChannel ch) {
161             if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
162                 throw(new IllegalStateException(w + ": already registered"));
163             int evs = w.events();
164             double timeout = w.timeout();
165             boolean hastime = timeout < Double.POSITIVE_INFINITY;
166             if(evs < 0) {
167                 submit(w::close);
168                 return;
169             }
170             w.added(Driver.this);
171             try {
172                 watching.put(w, ch.register(poll, evs, w));
173             } catch(ClosedChannelException e) {
174                 throw(new RuntimeException("attempted to watch closed channel", e));
175             }
176             if(hastime)
177                 timeheap.add(w, timeout);
178             poll.wakeup();
179         }
180
181         void remove(Watcher w) {
182             SelectionKey wc = watching.remove(w);
183             Object tc = timeheap.remove(w);
184             Object pc = paused.remove(w);
185             if(wc != null)
186                 wc.cancel();
187             if(((wc != null) || (tc != null)) && (pc != null))
188                 throw(new RuntimeException(w + ": inconsistent internal state"));
189             if(wc == null)
190                 throw(new IllegalStateException(w + ": not registered"));
191             submit(w::close);
192             poll.wakeup();
193         }
194
195         void update(Watcher w) {
196             SelectionKey wc = watching.get(w);
197             if(wc == null)
198                 throw(new IllegalStateException(w + ": not registered"));
199             int evs = w.events();
200             double timeout = w.timeout();
201             boolean hastime = timeout < Double.POSITIVE_INFINITY;
202             if(evs < 0) {
203                 remove(w);
204                 return;
205             }
206             wc.interestOps(evs);
207             if(hastime)
208                 timeheap.set(w, timeout);
209             else
210                 timeheap.remove(w);
211             poll.wakeup();
212         }
213     }
214
215     private SelectPool pool(SelectorProvider provider) {
216         SelectPool pool = selectors.get(provider);
217         if(pool == null) {
218             pool = new SelectPool(provider);
219             selectors.put(provider, pool);
220             pool.start();
221         }
222         return(pool);
223     }
224
225     public void add(Watcher w) {
226         SelectableChannel ch = w.channel();
227         synchronized(selectors) {
228             pool(ch.provider()).add(w, ch);
229         }
230     }
231
232     public void remove(Watcher w) {
233         SelectableChannel ch = w.channel();
234         synchronized(selectors) {
235             pool(ch.provider()).remove(w);
236         }
237     }
238
239     public void update(Watcher w) {
240         SelectableChannel ch = w.channel();
241         synchronized(selectors) {
242             pool(ch.provider()).update(w);
243         }
244     }
245
246     public double time() {
247         return(rtime());
248     }
249
250     private static final long rtimeoff = System.nanoTime();
251     public static double rtime() {
252         return((System.nanoTime() - rtimeoff) / 1e9);
253     }
254
255     private static Driver global = null;
256     public static Driver get() {
257         if(global == null) {
258             synchronized(Driver.class) {
259                 if(global == null)
260                     global = new Driver();
261             }
262         }
263         return(global);
264     }
265
266     public static Driver current() {
267         Driver ret = current.get();
268         if(ret == null)
269             throw(new IllegalStateException("no current driver"));
270         return(ret);
271     }
272 }