Actually implement event-driver timeouts.
[jagi.git] / src / jagi / event / Driver.java
CommitLineData
aac2f975
FT
1package jagi.event;
2
3import java.util.*;
4import java.util.logging.*;
5import java.util.concurrent.*;
6import java.io.*;
7import java.nio.*;
8import java.nio.channels.*;
9import java.nio.channels.spi.*;
10
11public class Driver {
12 private static final Logger log = Logger.getLogger("jagi.event");
13 private static final Logger hlog = Logger.getLogger("jagi.event.handler");
14 private static final ThreadLocal<Driver> current = new ThreadLocal<>();
15 private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
16 private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
17 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
18 this::thread);
19
20 protected Thread thread(Runnable tgt) {
21 return(new Thread(tgt));
22 }
23
24 protected void handle(Watcher w, int evs) {
25 try {
26 current.set(this);
27 w.handle(evs);
28 } catch(Throwable t) {
29 error(w, t);
30 } finally {
31 current.remove();
32 }
33 }
34
35 protected void submit(Runnable task) {
36 worker.submit(task);
37 }
38
39 protected void error(Watcher w, Throwable t) {
40 hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
41 remove(w);
42 }
43
44 class SelectPool implements Runnable {
45 final SelectorProvider provider;
46 final Selector poll;
47 final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
48 final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
49 final Map<Watcher, Object> paused = new IdentityHashMap<>();
50
51 SelectPool(SelectorProvider provider) {
52 this.provider = provider;
53 try {
54 this.poll = provider.openSelector();
55 } catch(IOException e) {
56 /* I think this counts more as an assertion error. */
57 throw(new RuntimeException(e));
58 }
59 }
60
61 void handle(Watcher w, int evs) {
2a11bb22
FT
62 if(!watching.containsKey(w))
63 return;
aac2f975
FT
64 try {
65 pause(w);
66 submit(() -> {
67 try {
68 Driver.this.handle(w, evs);
69 } finally {
70 resume(w);
71 }
72 });
73 } catch(Throwable t) {
74 try {
75 synchronized(selectors) {
76 remove(w);
77 }
78 } catch(Exception e) {
79 t.addSuppressed(e);
80 }
81 log.log(Level.SEVERE, "unexpected error when submitting event", t);
82 }
83 }
84
85 void start() {
86 thread(this).start();
87 }
88
89 public void run() {
90 boolean quit = false;
91 Throwable error = null;
92 try {
2a11bb22 93 double now = time();
aac2f975 94 while(true) {
aac2f975
FT
95 long timeout = 0;
96 synchronized(selectors) {
97 Double first = timeheap.keypeek();
98 if((first == null) && watching.isEmpty()) {
99 quit = true;
100 selectors.remove(provider);
101 return;
102 }
103 if(first != null)
104 timeout = (long)Math.ceil((first - now) * 1000);
105 }
106 poll.selectedKeys().clear();
107 try {
108 poll.select(timeout);
109 } catch(IOException e) {
110 throw(new RuntimeException(e));
111 }
112 for(SelectionKey key : poll.selectedKeys())
113 handle((Watcher)key.attachment(), key.readyOps());
2a11bb22
FT
114 now = time();
115 while(true) {
116 Double first = timeheap.keypeek();
117 if((first == null) || (first > now))
118 break;
119 handle(timeheap.remove(), 0);
120 }
aac2f975
FT
121 }
122 } catch(Throwable t) {
123 error = t;
124 throw(t);
125 } finally {
126 if(!quit)
127 log.log(Level.SEVERE, "selector exited abnormally", error);
128 }
129 }
130
131 void pause(Watcher w) {
132 if(paused.containsKey(w))
133 throw(new IllegalStateException(w + ": already paused"));
134 SelectionKey wc = watching.get(w);
135 Object tc = timeheap.remove(w);
136 if((wc == null) && (tc == null))
137 throw(new IllegalStateException(w + ": not registered"));
138 if(wc != null)
139 wc.interestOps(0);
140 paused.put(w, this);
141 }
142
143 void resume(Watcher w) {
144 if(paused.remove(w) == null)
145 return;
146 SelectionKey wc = watching.get(w);
147 int evs = w.events();
148 double timeout = w.timeout();
149 boolean hastime = timeout < Double.POSITIVE_INFINITY;
2c1781f3 150 if(evs < 0) {
aac2f975
FT
151 remove(w);
152 return;
153 }
154 wc.interestOps(evs);
155 if(hastime)
156 timeheap.add(w, timeout);
157 poll.wakeup();
158 }
159
160 void add(Watcher w, SelectableChannel ch) {
161 if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
162 throw(new IllegalStateException(w + ": already registered"));
163 int evs = w.events();
164 double timeout = w.timeout();
165 boolean hastime = timeout < Double.POSITIVE_INFINITY;
2c1781f3
FT
166 if(evs < 0) {
167 submit(w::close);
aac2f975
FT
168 return;
169 }
2c1781f3 170 w.added(Driver.this);
aac2f975
FT
171 try {
172 watching.put(w, ch.register(poll, evs, w));
173 } catch(ClosedChannelException e) {
174 throw(new RuntimeException("attempted to watch closed channel", e));
175 }
176 if(hastime)
177 timeheap.add(w, timeout);
178 poll.wakeup();
179 }
180
181 void remove(Watcher w) {
182 SelectionKey wc = watching.remove(w);
183 Object tc = timeheap.remove(w);
184 Object pc = paused.remove(w);
185 if(wc != null)
186 wc.cancel();
187 if(((wc != null) || (tc != null)) && (pc != null))
188 throw(new RuntimeException(w + ": inconsistent internal state"));
189 if(wc == null)
190 throw(new IllegalStateException(w + ": not registered"));
2c1781f3 191 submit(w::close);
aac2f975
FT
192 poll.wakeup();
193 }
194
195 void update(Watcher w) {
196 SelectionKey wc = watching.get(w);
197 if(wc == null)
198 throw(new IllegalStateException(w + ": not registered"));
199 int evs = w.events();
200 double timeout = w.timeout();
201 boolean hastime = timeout < Double.POSITIVE_INFINITY;
2c1781f3 202 if(evs < 0) {
aac2f975
FT
203 remove(w);
204 return;
205 }
206 wc.interestOps(evs);
207 if(hastime)
208 timeheap.set(w, timeout);
209 else
210 timeheap.remove(w);
211 poll.wakeup();
212 }
213 }
214
215 private SelectPool pool(SelectorProvider provider) {
216 SelectPool pool = selectors.get(provider);
217 if(pool == null) {
218 pool = new SelectPool(provider);
219 selectors.put(provider, pool);
220 pool.start();
221 }
222 return(pool);
223 }
224
225 public void add(Watcher w) {
226 SelectableChannel ch = w.channel();
227 synchronized(selectors) {
228 pool(ch.provider()).add(w, ch);
229 }
230 }
231
232 public void remove(Watcher w) {
233 SelectableChannel ch = w.channel();
234 synchronized(selectors) {
235 pool(ch.provider()).remove(w);
236 }
237 }
238
239 public void update(Watcher w) {
240 SelectableChannel ch = w.channel();
241 synchronized(selectors) {
242 pool(ch.provider()).update(w);
243 }
244 }
245
246 public double time() {
247 return(rtime());
248 }
249
250 private static final long rtimeoff = System.nanoTime();
251 public static double rtime() {
252 return((System.nanoTime() - rtimeoff) / 1e9);
253 }
254
255 private static Driver global = null;
256 public static Driver get() {
257 if(global == null) {
258 synchronized(Driver.class) {
259 if(global == null)
260 global = new Driver();
261 }
262 }
263 return(global);
264 }
265
266 public static Driver current() {
267 Driver ret = current.get();
268 if(ret == null)
269 throw(new IllegalStateException("no current driver"));
270 return(ret);
271 }
272}