Configure channel blocking as part of adding to the event-loop.
[jagi.git] / src / jagi / event / Driver.java
CommitLineData
aac2f975
FT
1package jagi.event;
2
3import java.util.*;
4import java.util.logging.*;
5import java.util.concurrent.*;
6import java.io.*;
7import java.nio.*;
8import java.nio.channels.*;
9import java.nio.channels.spi.*;
10
11public class Driver {
12 private static final Logger log = Logger.getLogger("jagi.event");
13 private static final Logger hlog = Logger.getLogger("jagi.event.handler");
14 private static final ThreadLocal<Driver> current = new ThreadLocal<>();
15 private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
16 private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
17 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
18 this::thread);
19
20 protected Thread thread(Runnable tgt) {
21 return(new Thread(tgt));
22 }
23
24 protected void handle(Watcher w, int evs) {
25 try {
26 current.set(this);
27 w.handle(evs);
28 } catch(Throwable t) {
7a3bde5c
FT
29 error(w, t, "handling event");
30 } finally {
31 current.remove();
32 }
33 }
34
35 protected void close(Watcher w) {
36 try {
37 current.set(this);
38 w.close();
39 } catch(Throwable t) {
40 error(w, t, "closing");
aac2f975
FT
41 } finally {
42 current.remove();
43 }
44 }
45
46 protected void submit(Runnable task) {
47 worker.submit(task);
48 }
49
7a3bde5c
FT
50 protected void error(Watcher w, Throwable t, String thing) {
51 hlog.log(Level.WARNING, w + ": uncaught error when " + thing, t);
aac2f975
FT
52 remove(w);
53 }
54
55 class SelectPool implements Runnable {
56 final SelectorProvider provider;
57 final Selector poll;
58 final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
59 final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
60 final Map<Watcher, Object> paused = new IdentityHashMap<>();
e6788877 61 final Collection<SelectionKey> cancelled = new HashSet<>();
aac2f975
FT
62
63 SelectPool(SelectorProvider provider) {
64 this.provider = provider;
65 try {
66 this.poll = provider.openSelector();
67 } catch(IOException e) {
68 /* I think this counts more as an assertion error. */
69 throw(new RuntimeException(e));
70 }
71 }
72
73 void handle(Watcher w, int evs) {
2a11bb22
FT
74 if(!watching.containsKey(w))
75 return;
aac2f975
FT
76 try {
77 pause(w);
78 submit(() -> {
79 try {
80 Driver.this.handle(w, evs);
81 } finally {
82 resume(w);
83 }
84 });
85 } catch(Throwable t) {
86 try {
87 synchronized(selectors) {
88 remove(w);
89 }
90 } catch(Exception e) {
91 t.addSuppressed(e);
92 }
93 log.log(Level.SEVERE, "unexpected error when submitting event", t);
94 }
95 }
96
97 void start() {
98 thread(this).start();
99 }
100
101 public void run() {
102 boolean quit = false;
103 Throwable error = null;
104 try {
2a11bb22 105 double now = time();
aac2f975 106 while(true) {
aac2f975
FT
107 long timeout = 0;
108 synchronized(selectors) {
109 Double first = timeheap.keypeek();
110 if((first == null) && watching.isEmpty()) {
111 quit = true;
112 selectors.remove(provider);
113 return;
114 }
115 if(first != null)
e6788877 116 timeout = Math.max((long)Math.ceil((first - now) * 1000), 1);
aac2f975 117 }
e6788877
FT
118 Collection<SelectionKey> precancelled;
119 synchronized(cancelled) {
120 precancelled = new ArrayList<>(cancelled);
121 }
122 if(!precancelled.isEmpty())
123 timeout = 1;
aac2f975
FT
124 poll.selectedKeys().clear();
125 try {
126 poll.select(timeout);
127 } catch(IOException e) {
128 throw(new RuntimeException(e));
129 }
e6788877
FT
130 if(!precancelled.isEmpty()) {
131 synchronized(cancelled) {
132 cancelled.removeAll(precancelled);
133 cancelled.notifyAll();
134 }
135 }
aac2f975
FT
136 for(SelectionKey key : poll.selectedKeys())
137 handle((Watcher)key.attachment(), key.readyOps());
2a11bb22
FT
138 now = time();
139 while(true) {
140 Double first = timeheap.keypeek();
141 if((first == null) || (first > now))
142 break;
143 handle(timeheap.remove(), 0);
144 }
aac2f975
FT
145 }
146 } catch(Throwable t) {
147 error = t;
148 throw(t);
149 } finally {
150 if(!quit)
151 log.log(Level.SEVERE, "selector exited abnormally", error);
152 }
153 }
154
155 void pause(Watcher w) {
156 if(paused.containsKey(w))
157 throw(new IllegalStateException(w + ": already paused"));
158 SelectionKey wc = watching.get(w);
159 Object tc = timeheap.remove(w);
160 if((wc == null) && (tc == null))
161 throw(new IllegalStateException(w + ": not registered"));
162 if(wc != null)
163 wc.interestOps(0);
164 paused.put(w, this);
165 }
166
167 void resume(Watcher w) {
168 if(paused.remove(w) == null)
169 return;
170 SelectionKey wc = watching.get(w);
171 int evs = w.events();
172 double timeout = w.timeout();
173 boolean hastime = timeout < Double.POSITIVE_INFINITY;
2c1781f3 174 if(evs < 0) {
aac2f975
FT
175 remove(w);
176 return;
177 }
178 wc.interestOps(evs);
179 if(hastime)
180 timeheap.add(w, timeout);
181 poll.wakeup();
182 }
183
184 void add(Watcher w, SelectableChannel ch) {
185 if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
186 throw(new IllegalStateException(w + ": already registered"));
1ee6412b
FT
187 try {
188 ch.configureBlocking(false);
189 } catch(IOException e) {
190 throw(new RuntimeException(ch + ": could not make non-blocking", e));
191 }
aac2f975
FT
192 int evs = w.events();
193 double timeout = w.timeout();
194 boolean hastime = timeout < Double.POSITIVE_INFINITY;
2c1781f3 195 if(evs < 0) {
7a3bde5c 196 submit(() -> close(w));
aac2f975
FT
197 return;
198 }
2c1781f3 199 w.added(Driver.this);
aac2f975
FT
200 try {
201 watching.put(w, ch.register(poll, evs, w));
202 } catch(ClosedChannelException e) {
203 throw(new RuntimeException("attempted to watch closed channel", e));
204 }
205 if(hastime)
206 timeheap.add(w, timeout);
207 poll.wakeup();
208 }
209
210 void remove(Watcher w) {
211 SelectionKey wc = watching.remove(w);
212 Object tc = timeheap.remove(w);
213 Object pc = paused.remove(w);
e6788877
FT
214 if(wc != null) {
215 synchronized(cancelled) {
216 cancelled.add(wc);
217 wc.cancel();
218 poll.wakeup();
219 boolean irq = false;
220 while(cancelled.contains(wc)) {
221 try {
222 cancelled.wait();
223 } catch(InterruptedException e) {
224 irq = true;
225 }
226 }
227 if(irq)
228 Thread.currentThread().interrupt();
229 }
230 }
aac2f975
FT
231 if(((wc != null) || (tc != null)) && (pc != null))
232 throw(new RuntimeException(w + ": inconsistent internal state"));
233 if(wc == null)
234 throw(new IllegalStateException(w + ": not registered"));
7a3bde5c 235 submit(() -> close(w));
aac2f975
FT
236 }
237
238 void update(Watcher w) {
239 SelectionKey wc = watching.get(w);
240 if(wc == null)
241 throw(new IllegalStateException(w + ": not registered"));
242 int evs = w.events();
243 double timeout = w.timeout();
244 boolean hastime = timeout < Double.POSITIVE_INFINITY;
2c1781f3 245 if(evs < 0) {
aac2f975
FT
246 remove(w);
247 return;
248 }
249 wc.interestOps(evs);
250 if(hastime)
251 timeheap.set(w, timeout);
252 else
253 timeheap.remove(w);
254 poll.wakeup();
255 }
256 }
257
258 private SelectPool pool(SelectorProvider provider) {
259 SelectPool pool = selectors.get(provider);
260 if(pool == null) {
261 pool = new SelectPool(provider);
262 selectors.put(provider, pool);
263 pool.start();
264 }
265 return(pool);
266 }
267
268 public void add(Watcher w) {
269 SelectableChannel ch = w.channel();
270 synchronized(selectors) {
271 pool(ch.provider()).add(w, ch);
272 }
273 }
274
275 public void remove(Watcher w) {
276 SelectableChannel ch = w.channel();
277 synchronized(selectors) {
278 pool(ch.provider()).remove(w);
279 }
280 }
281
282 public void update(Watcher w) {
283 SelectableChannel ch = w.channel();
284 synchronized(selectors) {
285 pool(ch.provider()).update(w);
286 }
287 }
288
289 public double time() {
290 return(rtime());
291 }
292
293 private static final long rtimeoff = System.nanoTime();
294 public static double rtime() {
295 return((System.nanoTime() - rtimeoff) / 1e9);
296 }
297
298 private static Driver global = null;
299 public static Driver get() {
300 if(global == null) {
301 synchronized(Driver.class) {
302 if(global == null)
303 global = new Driver();
304 }
305 }
306 return(global);
307 }
308
309 public static Driver current() {
310 Driver ret = current.get();
311 if(ret == null)
312 throw(new IllegalStateException("no current driver"));
313 return(ret);
314 }
315}