Added a basic event-loop driver.
[jagi.git] / src / jagi / event / Driver.java
1 package jagi.event;
2
3 import java.util.*;
4 import java.util.logging.*;
5 import java.util.concurrent.*;
6 import java.io.*;
7 import java.nio.*;
8 import java.nio.channels.*;
9 import java.nio.channels.spi.*;
10
11 public class Driver {
12     private static final Logger log = Logger.getLogger("jagi.event");
13     private static final Logger hlog = Logger.getLogger("jagi.event.handler");
14     private static final ThreadLocal<Driver> current = new ThreadLocal<>();
15     private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
16     private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
17                                                                   5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
18                                                                   this::thread);
19
20     protected Thread thread(Runnable tgt) {
21         return(new Thread(tgt));
22     }
23
24     protected void handle(Watcher w, int evs) {
25         try {
26             current.set(this);
27             w.handle(evs);
28         } catch(Throwable t) {
29             error(w, t);
30         } finally {
31             current.remove();
32         }
33     }
34
35     protected void submit(Runnable task) {
36         worker.submit(task);
37     }
38
39     protected void error(Watcher w, Throwable t) {
40         hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
41         remove(w);
42     }
43
44     class SelectPool implements Runnable {
45         final SelectorProvider provider;
46         final Selector poll;
47         final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
48         final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
49         final Map<Watcher, Object> paused = new IdentityHashMap<>();
50
51         SelectPool(SelectorProvider provider) {
52             this.provider = provider;
53             try {
54                 this.poll = provider.openSelector();
55             } catch(IOException e) {
56                 /* I think this counts more as an assertion error. */
57                 throw(new RuntimeException(e));
58             }
59         }
60
61         void handle(Watcher w, int evs) {
62             try {
63                 pause(w);
64                 submit(() -> {
65                         try {
66                             Driver.this.handle(w, evs);
67                         } finally {
68                             resume(w);
69                         }
70                     });
71             } catch(Throwable t) {
72                 try {
73                     synchronized(selectors) {
74                         remove(w);
75                     }
76                 } catch(Exception e) {
77                     t.addSuppressed(e);
78                 }
79                 log.log(Level.SEVERE, "unexpected error when submitting event", t);
80             }
81         }
82
83         void start() {
84             thread(this).start();
85         }
86
87         public void run() {
88             boolean quit = false;
89             Throwable error = null;
90             try {
91                 while(true) {
92                     double now = time();
93                     long timeout = 0;
94                     synchronized(selectors) {
95                         Double first = timeheap.keypeek();
96                         if((first == null) && watching.isEmpty()) {
97                             quit = true;
98                             selectors.remove(provider);
99                             return;
100                         }
101                         if(first != null)
102                             timeout = (long)Math.ceil((first - now) * 1000);
103                     }
104                     poll.selectedKeys().clear();
105                     try {
106                         poll.select(timeout);
107                     } catch(IOException e) {
108                         throw(new RuntimeException(e));
109                     }
110                     for(SelectionKey key : poll.selectedKeys())
111                         handle((Watcher)key.attachment(), key.readyOps());
112                 }
113             } catch(Throwable t) {
114                 error = t;
115                 throw(t);
116             } finally {
117                 if(!quit)
118                     log.log(Level.SEVERE, "selector exited abnormally", error);
119             }
120         }
121
122         void pause(Watcher w) {
123             if(paused.containsKey(w))
124                 throw(new IllegalStateException(w + ": already paused"));
125             SelectionKey wc = watching.get(w);
126             Object tc = timeheap.remove(w);
127             if((wc == null) && (tc == null))
128                 throw(new IllegalStateException(w + ": not registered"));
129             if(wc != null)
130                 wc.interestOps(0);
131             paused.put(w, this);
132         }
133
134         void resume(Watcher w) {
135             if(paused.remove(w) == null)
136                 return;
137             SelectionKey wc = watching.get(w);
138             int evs = w.events();
139             double timeout = w.timeout();
140             boolean hastime = timeout < Double.POSITIVE_INFINITY;
141             if((evs == 0) && !hastime) {
142                 remove(w);
143                 return;
144             }
145             wc.interestOps(evs);
146             if(hastime)
147                 timeheap.add(w, timeout);
148             poll.wakeup();
149         }
150
151         void add(Watcher w, SelectableChannel ch) {
152             if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
153                 throw(new IllegalStateException(w + ": already registered"));
154             int evs = w.events();
155             double timeout = w.timeout();
156             boolean hastime = timeout < Double.POSITIVE_INFINITY;
157             if((evs == 0) && !hastime) {
158                 w.close();
159                 return;
160             }
161             try {
162                 watching.put(w, ch.register(poll, evs, w));
163             } catch(ClosedChannelException e) {
164                 throw(new RuntimeException("attempted to watch closed channel", e));
165             }
166             if(hastime)
167                 timeheap.add(w, timeout);
168             poll.wakeup();
169         }
170
171         void remove(Watcher w) {
172             SelectionKey wc = watching.remove(w);
173             Object tc = timeheap.remove(w);
174             Object pc = paused.remove(w);
175             if(wc != null)
176                 wc.cancel();
177             if(((wc != null) || (tc != null)) && (pc != null))
178                 throw(new RuntimeException(w + ": inconsistent internal state"));
179             if(wc == null)
180                 throw(new IllegalStateException(w + ": not registered"));
181             w.close();
182             poll.wakeup();
183         }
184
185         void update(Watcher w) {
186             SelectionKey wc = watching.get(w);
187             if(wc == null)
188                 throw(new IllegalStateException(w + ": not registered"));
189             int evs = w.events();
190             double timeout = w.timeout();
191             boolean hastime = timeout < Double.POSITIVE_INFINITY;
192             if((evs == 0) && !hastime) {
193                 remove(w);
194                 return;
195             }
196             wc.interestOps(evs);
197             if(hastime)
198                 timeheap.set(w, timeout);
199             else
200                 timeheap.remove(w);
201             poll.wakeup();
202         }
203     }
204
205     private SelectPool pool(SelectorProvider provider) {
206         SelectPool pool = selectors.get(provider);
207         if(pool == null) {
208             pool = new SelectPool(provider);
209             selectors.put(provider, pool);
210             pool.start();
211         }
212         return(pool);
213     }
214
215     public void add(Watcher w) {
216         SelectableChannel ch = w.channel();
217         synchronized(selectors) {
218             pool(ch.provider()).add(w, ch);
219         }
220     }
221
222     public void remove(Watcher w) {
223         SelectableChannel ch = w.channel();
224         synchronized(selectors) {
225             pool(ch.provider()).remove(w);
226         }
227     }
228
229     public void update(Watcher w) {
230         SelectableChannel ch = w.channel();
231         synchronized(selectors) {
232             pool(ch.provider()).update(w);
233         }
234     }
235
236     public double time() {
237         return(rtime());
238     }
239
240     private static final long rtimeoff = System.nanoTime();
241     public static double rtime() {
242         return((System.nanoTime() - rtimeoff) / 1e9);
243     }
244
245     private static Driver global = null;
246     public static Driver get() {
247         if(global == null) {
248             synchronized(Driver.class) {
249                 if(global == null)
250                     global = new Driver();
251             }
252         }
253         return(global);
254     }
255
256     public static Driver current() {
257         Driver ret = current.get();
258         if(ret == null)
259             throw(new IllegalStateException("no current driver"));
260         return(ret);
261     }
262 }