Added a basic event-loop driver.
[jagi.git] / src / jagi / event / Driver.java
CommitLineData
aac2f975
FT
1package jagi.event;
2
3import java.util.*;
4import java.util.logging.*;
5import java.util.concurrent.*;
6import java.io.*;
7import java.nio.*;
8import java.nio.channels.*;
9import java.nio.channels.spi.*;
10
11public class Driver {
12 private static final Logger log = Logger.getLogger("jagi.event");
13 private static final Logger hlog = Logger.getLogger("jagi.event.handler");
14 private static final ThreadLocal<Driver> current = new ThreadLocal<>();
15 private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
16 private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
17 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
18 this::thread);
19
20 protected Thread thread(Runnable tgt) {
21 return(new Thread(tgt));
22 }
23
24 protected void handle(Watcher w, int evs) {
25 try {
26 current.set(this);
27 w.handle(evs);
28 } catch(Throwable t) {
29 error(w, t);
30 } finally {
31 current.remove();
32 }
33 }
34
35 protected void submit(Runnable task) {
36 worker.submit(task);
37 }
38
39 protected void error(Watcher w, Throwable t) {
40 hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
41 remove(w);
42 }
43
44 class SelectPool implements Runnable {
45 final SelectorProvider provider;
46 final Selector poll;
47 final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
48 final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
49 final Map<Watcher, Object> paused = new IdentityHashMap<>();
50
51 SelectPool(SelectorProvider provider) {
52 this.provider = provider;
53 try {
54 this.poll = provider.openSelector();
55 } catch(IOException e) {
56 /* I think this counts more as an assertion error. */
57 throw(new RuntimeException(e));
58 }
59 }
60
61 void handle(Watcher w, int evs) {
62 try {
63 pause(w);
64 submit(() -> {
65 try {
66 Driver.this.handle(w, evs);
67 } finally {
68 resume(w);
69 }
70 });
71 } catch(Throwable t) {
72 try {
73 synchronized(selectors) {
74 remove(w);
75 }
76 } catch(Exception e) {
77 t.addSuppressed(e);
78 }
79 log.log(Level.SEVERE, "unexpected error when submitting event", t);
80 }
81 }
82
83 void start() {
84 thread(this).start();
85 }
86
87 public void run() {
88 boolean quit = false;
89 Throwable error = null;
90 try {
91 while(true) {
92 double now = time();
93 long timeout = 0;
94 synchronized(selectors) {
95 Double first = timeheap.keypeek();
96 if((first == null) && watching.isEmpty()) {
97 quit = true;
98 selectors.remove(provider);
99 return;
100 }
101 if(first != null)
102 timeout = (long)Math.ceil((first - now) * 1000);
103 }
104 poll.selectedKeys().clear();
105 try {
106 poll.select(timeout);
107 } catch(IOException e) {
108 throw(new RuntimeException(e));
109 }
110 for(SelectionKey key : poll.selectedKeys())
111 handle((Watcher)key.attachment(), key.readyOps());
112 }
113 } catch(Throwable t) {
114 error = t;
115 throw(t);
116 } finally {
117 if(!quit)
118 log.log(Level.SEVERE, "selector exited abnormally", error);
119 }
120 }
121
122 void pause(Watcher w) {
123 if(paused.containsKey(w))
124 throw(new IllegalStateException(w + ": already paused"));
125 SelectionKey wc = watching.get(w);
126 Object tc = timeheap.remove(w);
127 if((wc == null) && (tc == null))
128 throw(new IllegalStateException(w + ": not registered"));
129 if(wc != null)
130 wc.interestOps(0);
131 paused.put(w, this);
132 }
133
134 void resume(Watcher w) {
135 if(paused.remove(w) == null)
136 return;
137 SelectionKey wc = watching.get(w);
138 int evs = w.events();
139 double timeout = w.timeout();
140 boolean hastime = timeout < Double.POSITIVE_INFINITY;
141 if((evs == 0) && !hastime) {
142 remove(w);
143 return;
144 }
145 wc.interestOps(evs);
146 if(hastime)
147 timeheap.add(w, timeout);
148 poll.wakeup();
149 }
150
151 void add(Watcher w, SelectableChannel ch) {
152 if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
153 throw(new IllegalStateException(w + ": already registered"));
154 int evs = w.events();
155 double timeout = w.timeout();
156 boolean hastime = timeout < Double.POSITIVE_INFINITY;
157 if((evs == 0) && !hastime) {
158 w.close();
159 return;
160 }
161 try {
162 watching.put(w, ch.register(poll, evs, w));
163 } catch(ClosedChannelException e) {
164 throw(new RuntimeException("attempted to watch closed channel", e));
165 }
166 if(hastime)
167 timeheap.add(w, timeout);
168 poll.wakeup();
169 }
170
171 void remove(Watcher w) {
172 SelectionKey wc = watching.remove(w);
173 Object tc = timeheap.remove(w);
174 Object pc = paused.remove(w);
175 if(wc != null)
176 wc.cancel();
177 if(((wc != null) || (tc != null)) && (pc != null))
178 throw(new RuntimeException(w + ": inconsistent internal state"));
179 if(wc == null)
180 throw(new IllegalStateException(w + ": not registered"));
181 w.close();
182 poll.wakeup();
183 }
184
185 void update(Watcher w) {
186 SelectionKey wc = watching.get(w);
187 if(wc == null)
188 throw(new IllegalStateException(w + ": not registered"));
189 int evs = w.events();
190 double timeout = w.timeout();
191 boolean hastime = timeout < Double.POSITIVE_INFINITY;
192 if((evs == 0) && !hastime) {
193 remove(w);
194 return;
195 }
196 wc.interestOps(evs);
197 if(hastime)
198 timeheap.set(w, timeout);
199 else
200 timeheap.remove(w);
201 poll.wakeup();
202 }
203 }
204
205 private SelectPool pool(SelectorProvider provider) {
206 SelectPool pool = selectors.get(provider);
207 if(pool == null) {
208 pool = new SelectPool(provider);
209 selectors.put(provider, pool);
210 pool.start();
211 }
212 return(pool);
213 }
214
215 public void add(Watcher w) {
216 SelectableChannel ch = w.channel();
217 synchronized(selectors) {
218 pool(ch.provider()).add(w, ch);
219 }
220 }
221
222 public void remove(Watcher w) {
223 SelectableChannel ch = w.channel();
224 synchronized(selectors) {
225 pool(ch.provider()).remove(w);
226 }
227 }
228
229 public void update(Watcher w) {
230 SelectableChannel ch = w.channel();
231 synchronized(selectors) {
232 pool(ch.provider()).update(w);
233 }
234 }
235
236 public double time() {
237 return(rtime());
238 }
239
240 private static final long rtimeoff = System.nanoTime();
241 public static double rtime() {
242 return((System.nanoTime() - rtimeoff) / 1e9);
243 }
244
245 private static Driver global = null;
246 public static Driver get() {
247 if(global == null) {
248 synchronized(Driver.class) {
249 if(global == null)
250 global = new Driver();
251 }
252 }
253 return(global);
254 }
255
256 public static Driver current() {
257 Driver ret = current.get();
258 if(ret == null)
259 throw(new IllegalStateException("no current driver"));
260 return(ret);
261 }
262}