Use -1 from Watcher.events() to signal closure, rather than 0.
[jagi.git] / src / jagi / event / Driver.java
CommitLineData
aac2f975
FT
1package jagi.event;
2
3import java.util.*;
4import java.util.logging.*;
5import java.util.concurrent.*;
6import java.io.*;
7import java.nio.*;
8import java.nio.channels.*;
9import java.nio.channels.spi.*;
10
11public class Driver {
12 private static final Logger log = Logger.getLogger("jagi.event");
13 private static final Logger hlog = Logger.getLogger("jagi.event.handler");
14 private static final ThreadLocal<Driver> current = new ThreadLocal<>();
15 private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
16 private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
17 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
18 this::thread);
19
20 protected Thread thread(Runnable tgt) {
21 return(new Thread(tgt));
22 }
23
24 protected void handle(Watcher w, int evs) {
25 try {
26 current.set(this);
27 w.handle(evs);
28 } catch(Throwable t) {
29 error(w, t);
30 } finally {
31 current.remove();
32 }
33 }
34
35 protected void submit(Runnable task) {
36 worker.submit(task);
37 }
38
39 protected void error(Watcher w, Throwable t) {
40 hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
41 remove(w);
42 }
43
44 class SelectPool implements Runnable {
45 final SelectorProvider provider;
46 final Selector poll;
47 final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
48 final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
49 final Map<Watcher, Object> paused = new IdentityHashMap<>();
50
51 SelectPool(SelectorProvider provider) {
52 this.provider = provider;
53 try {
54 this.poll = provider.openSelector();
55 } catch(IOException e) {
56 /* I think this counts more as an assertion error. */
57 throw(new RuntimeException(e));
58 }
59 }
60
61 void handle(Watcher w, int evs) {
62 try {
63 pause(w);
64 submit(() -> {
65 try {
66 Driver.this.handle(w, evs);
67 } finally {
68 resume(w);
69 }
70 });
71 } catch(Throwable t) {
72 try {
73 synchronized(selectors) {
74 remove(w);
75 }
76 } catch(Exception e) {
77 t.addSuppressed(e);
78 }
79 log.log(Level.SEVERE, "unexpected error when submitting event", t);
80 }
81 }
82
83 void start() {
84 thread(this).start();
85 }
86
87 public void run() {
88 boolean quit = false;
89 Throwable error = null;
90 try {
91 while(true) {
92 double now = time();
93 long timeout = 0;
94 synchronized(selectors) {
95 Double first = timeheap.keypeek();
96 if((first == null) && watching.isEmpty()) {
97 quit = true;
98 selectors.remove(provider);
99 return;
100 }
101 if(first != null)
102 timeout = (long)Math.ceil((first - now) * 1000);
103 }
104 poll.selectedKeys().clear();
105 try {
106 poll.select(timeout);
107 } catch(IOException e) {
108 throw(new RuntimeException(e));
109 }
110 for(SelectionKey key : poll.selectedKeys())
111 handle((Watcher)key.attachment(), key.readyOps());
112 }
113 } catch(Throwable t) {
114 error = t;
115 throw(t);
116 } finally {
117 if(!quit)
118 log.log(Level.SEVERE, "selector exited abnormally", error);
119 }
120 }
121
122 void pause(Watcher w) {
123 if(paused.containsKey(w))
124 throw(new IllegalStateException(w + ": already paused"));
125 SelectionKey wc = watching.get(w);
126 Object tc = timeheap.remove(w);
127 if((wc == null) && (tc == null))
128 throw(new IllegalStateException(w + ": not registered"));
129 if(wc != null)
130 wc.interestOps(0);
131 paused.put(w, this);
132 }
133
134 void resume(Watcher w) {
135 if(paused.remove(w) == null)
136 return;
137 SelectionKey wc = watching.get(w);
138 int evs = w.events();
139 double timeout = w.timeout();
140 boolean hastime = timeout < Double.POSITIVE_INFINITY;
2c1781f3 141 if(evs < 0) {
aac2f975
FT
142 remove(w);
143 return;
144 }
145 wc.interestOps(evs);
146 if(hastime)
147 timeheap.add(w, timeout);
148 poll.wakeup();
149 }
150
151 void add(Watcher w, SelectableChannel ch) {
152 if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
153 throw(new IllegalStateException(w + ": already registered"));
154 int evs = w.events();
155 double timeout = w.timeout();
156 boolean hastime = timeout < Double.POSITIVE_INFINITY;
2c1781f3
FT
157 if(evs < 0) {
158 submit(w::close);
aac2f975
FT
159 return;
160 }
2c1781f3 161 w.added(Driver.this);
aac2f975
FT
162 try {
163 watching.put(w, ch.register(poll, evs, w));
164 } catch(ClosedChannelException e) {
165 throw(new RuntimeException("attempted to watch closed channel", e));
166 }
167 if(hastime)
168 timeheap.add(w, timeout);
169 poll.wakeup();
170 }
171
172 void remove(Watcher w) {
173 SelectionKey wc = watching.remove(w);
174 Object tc = timeheap.remove(w);
175 Object pc = paused.remove(w);
176 if(wc != null)
177 wc.cancel();
178 if(((wc != null) || (tc != null)) && (pc != null))
179 throw(new RuntimeException(w + ": inconsistent internal state"));
180 if(wc == null)
181 throw(new IllegalStateException(w + ": not registered"));
2c1781f3 182 submit(w::close);
aac2f975
FT
183 poll.wakeup();
184 }
185
186 void update(Watcher w) {
187 SelectionKey wc = watching.get(w);
188 if(wc == null)
189 throw(new IllegalStateException(w + ": not registered"));
190 int evs = w.events();
191 double timeout = w.timeout();
192 boolean hastime = timeout < Double.POSITIVE_INFINITY;
2c1781f3 193 if(evs < 0) {
aac2f975
FT
194 remove(w);
195 return;
196 }
197 wc.interestOps(evs);
198 if(hastime)
199 timeheap.set(w, timeout);
200 else
201 timeheap.remove(w);
202 poll.wakeup();
203 }
204 }
205
206 private SelectPool pool(SelectorProvider provider) {
207 SelectPool pool = selectors.get(provider);
208 if(pool == null) {
209 pool = new SelectPool(provider);
210 selectors.put(provider, pool);
211 pool.start();
212 }
213 return(pool);
214 }
215
216 public void add(Watcher w) {
217 SelectableChannel ch = w.channel();
218 synchronized(selectors) {
219 pool(ch.provider()).add(w, ch);
220 }
221 }
222
223 public void remove(Watcher w) {
224 SelectableChannel ch = w.channel();
225 synchronized(selectors) {
226 pool(ch.provider()).remove(w);
227 }
228 }
229
230 public void update(Watcher w) {
231 SelectableChannel ch = w.channel();
232 synchronized(selectors) {
233 pool(ch.provider()).update(w);
234 }
235 }
236
237 public double time() {
238 return(rtime());
239 }
240
241 private static final long rtimeoff = System.nanoTime();
242 public static double rtime() {
243 return((System.nanoTime() - rtimeoff) / 1e9);
244 }
245
246 private static Driver global = null;
247 public static Driver get() {
248 if(global == null) {
249 synchronized(Driver.class) {
250 if(global == null)
251 global = new Driver();
252 }
253 }
254 return(global);
255 }
256
257 public static Driver current() {
258 Driver ret = current.get();
259 if(ret == null)
260 throw(new IllegalStateException("no current driver"));
261 return(ret);
262 }
263}