4 import java.util.logging.*;
5 import java.util.concurrent.*;
8 import java.nio.channels.*;
9 import java.nio.channels.spi.*;
/* Logger for the driver's own failures (event submission, selector loop). */
12 private static final Logger log = Logger.getLogger("jagi.event");
/* Logger for errors that escape a watcher's event handling; written to by error(). */
13 private static final Logger hlog = Logger.getLogger("jagi.event.handler");
/* Per-thread Driver reference; current() reads it back. */
14 private static final ThreadLocal<Driver> current = new ThreadLocal<>();
/* One SelectPool per SelectorProvider. This map is also the monitor that
 * the public add()/remove()/update() methods and the pool code synchronize on. */
15 private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
/* Worker executor: zero core threads, at most one thread per available
 * processor, a 5 second keep-alive and a work queue bounded at 128 entries. */
16 private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
17 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
/**
 * Creates the thread that is to run {@code tgt}. This implementation
 * returns a plain {@link Thread} and does not start it.
 *
 * @param tgt the code the new thread should execute
 * @return a new thread wrapping {@code tgt}
 */
20 protected Thread thread(Runnable tgt) {
21 return(new Thread(tgt));
/**
 * Processes the events {@code evs} reported for watcher {@code w}.
 * A {@link Throwable} raised while doing so is caught by this method.
 * NOTE(review): presumably the caught error is forwarded to error(w, t) -- confirm.
 *
 * @param w   the watcher the events belong to
 * @param evs the event mask being delivered
 */
24 protected void handle(Watcher w, int evs) {
28 } catch(Throwable t) {
/**
 * NOTE(review): presumably schedules {@code task} on the worker executor --
 * confirm against the method body.
 *
 * @param task the unit of work to run
 */
35 protected void submit(Runnable task) {
/**
 * Reports an error that went uncaught while an event for {@code w} was
 * being handled. This implementation logs {@code t} at WARNING level on
 * the handler logger, prefixed with the watcher's string form.
 *
 * @param w the watcher whose event was being handled
 * @param t the uncaught error
 */
39 protected void error(Watcher w, Throwable t) {
40 hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
/* Selector state for a single SelectorProvider: keeps track of the watchers
 * registered with it, and its loop waits on the selector and dispatches the
 * ready events to those watchers. */
44 class SelectPool implements Runnable {
/* The provider this pool was created for; also its key in the driver's selectors map. */
45 final SelectorProvider provider;
/* Selection key of every watcher registered on the selector, compared by identity. */
47 final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
/* Watchers keyed by their timeout value; keypeek() supplies the value used
 * to bound the select wait. */
48 final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
/* Watchers that are currently paused, compared by identity. */
49 final Map<Watcher, Object> paused = new IdentityHashMap<>();
/* Creates the pool for `provider' and opens its selector. An IOException
 * from openSelector() is rethrown wrapped in a RuntimeException. */
51 SelectPool(SelectorProvider provider) {
52 this.provider = provider;
54 this.poll = provider.openSelector();
55 } catch(IOException e) {
56 /* I think this counts more as an assertion error. */
57 throw(new RuntimeException(e));
/* Routes the events `evs' for `w' to Driver.this.handle(). A Throwable on
 * this path is logged at SEVERE level on the driver logger; the same
 * handler also takes the selectors lock.
 * NOTE(review): presumably the locked section unregisters `w' -- confirm. */
61 void handle(Watcher w, int evs) {
66 Driver.this.handle(w, evs);
71 } catch(Throwable t) {
73 synchronized(selectors) {
76 } catch(Exception e) {
79 log.log(Level.SEVERE, "unexpected error when submitting event", t);
/* Logged at the end as the reason the selector exited.
 * NOTE(review): presumably assigned in the catch(Throwable) below -- confirm. */
89 Throwable error = null;
/* Pool bookkeeping is inspected under the driver-wide selectors lock. */
94 synchronized(selectors) {
95 Double first = timeheap.keypeek();
/* Nothing timed and nothing watched. */
96 if((first == null) && watching.isEmpty()) {
/* Unregister this pool from the driver's provider map. */
98 selectors.remove(provider);
/* Distance from now to the first timeout, scaled by 1000 and rounded up,
 * for use as the select() timeout. */
102 timeout = (long)Math.ceil((first - now) * 1000);
/* Discard the keys selected in the previous round before selecting again. */
104 poll.selectedKeys().clear();
106 poll.select(timeout);
107 } catch(IOException e) {
108 throw(new RuntimeException(e));
/* Hand each ready key's operations to the watcher stored as its attachment. */
110 for(SelectionKey key : poll.selectedKeys())
111 handle((Watcher)key.attachment(), key.readyOps());
113 } catch(Throwable t) {
118 log.log(Level.SEVERE, "selector exited abnormally", error);
/* Pauses `w'. Throws IllegalStateException if `w' is already paused.
 * Otherwise looks up its selection key and takes it off the timeout heap;
 * if it had neither, throws IllegalStateException as not registered. */
122 void pause(Watcher w) {
123 if(paused.containsKey(w))
124 throw(new IllegalStateException(w + ": already paused"));
125 SelectionKey wc = watching.get(w);
126 Object tc = timeheap.remove(w);
127 if((wc == null) && (tc == null))
128 throw(new IllegalStateException(w + ": not registered"));
/* Resumes `w': takes it out of the paused set, then re-reads its event
 * mask and timeout from the watcher. A timeout below positive infinity
 * counts as the watcher having a timeout.
 * NOTE(review): presumably a watcher that was not paused is rejected with
 * an exception, and timeheap.add() runs only when `hastime' is set -- confirm. */
134 void resume(Watcher w) {
135 if(paused.remove(w) == null)
137 SelectionKey wc = watching.get(w);
138 int evs = w.events();
139 double timeout = w.timeout();
140 boolean hastime = timeout < Double.POSITIVE_INFINITY;
147 timeheap.add(w, timeout);
/* Registers `w' for channel `ch'. Throws IllegalStateException if `w' is
 * already watched, paused or on the timeout heap. Notifies the watcher via
 * w.added(), then registers `ch' on the selector with the watcher's event
 * mask and the watcher itself as attachment, keeping the resulting key in
 * `watching'. A ClosedChannelException is rethrown as a RuntimeException.
 * NOTE(review): presumably timeheap.add() runs only when `hastime' is
 * set -- confirm. */
151 void add(Watcher w, SelectableChannel ch) {
152 if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
153 throw(new IllegalStateException(w + ": already registered"));
154 int evs = w.events();
155 double timeout = w.timeout();
/* An infinite timeout means the watcher has no timeout at all. */
156 boolean hastime = timeout < Double.POSITIVE_INFINITY;
161 w.added(Driver.this);
163 watching.put(w, ch.register(poll, evs, w));
164 } catch(ClosedChannelException e) {
165 throw(new RuntimeException("attempted to watch closed channel", e));
168 timeheap.add(w, timeout);
/* Drops `w' from the watching map, the timeout heap and the paused set.
 * A watcher found both active (watched or timed) and paused is reported
 * as inconsistent internal state.
 * NOTE(review): the "not registered" error is presumably raised when `w'
 * was found in none of the three -- confirm. */
172 void remove(Watcher w) {
173 SelectionKey wc = watching.remove(w);
174 Object tc = timeheap.remove(w);
175 Object pc = paused.remove(w);
178 if(((wc != null) || (tc != null)) && (pc != null))
179 throw(new RuntimeException(w + ": inconsistent internal state"));
181 throw(new IllegalStateException(w + ": not registered"));
/* Refreshes the pool's view of `w': looks up its selection key, re-reads
 * the watcher's event mask and timeout, and stores the timeout on the heap
 * with timeheap.set().
 * NOTE(review): the "not registered" error presumably depends on the key
 * lookup, and timeheap.set() on `hastime' -- confirm. */
186 void update(Watcher w) {
187 SelectionKey wc = watching.get(w);
189 throw(new IllegalStateException(w + ": not registered"));
190 int evs = w.events();
191 double timeout = w.timeout();
/* An infinite timeout means the watcher has no timeout at all. */
192 boolean hastime = timeout < Double.POSITIVE_INFINITY;
199 timeheap.set(w, timeout);
/* Looks up the SelectPool for `provider' in the selectors map; a new pool
 * is constructed and stored under `provider'.
 * NOTE(review): presumably the pool is created only when the lookup found
 * none -- confirm. The add(), remove() and update() callers invoke this
 * while holding the selectors lock. */
206 private SelectPool pool(SelectorProvider provider) {
207 SelectPool pool = selectors.get(provider);
209 pool = new SelectPool(provider);
210 selectors.put(provider, pool);
/**
 * Starts watching {@code w}. The watcher's channel determines the selector
 * provider; the watcher is registered with that provider's pool while the
 * selectors lock is held.
 *
 * @param w the watcher to register
 */
216 public void add(Watcher w) {
217 SelectableChannel ch = w.channel();
218 synchronized(selectors) {
219 pool(ch.provider()).add(w, ch);
/**
 * Stops watching {@code w}. The pool is found through the provider of the
 * watcher's channel, and the removal runs while the selectors lock is held.
 *
 * @param w the watcher to unregister
 */
223 public void remove(Watcher w) {
224 SelectableChannel ch = w.channel();
225 synchronized(selectors) {
226 pool(ch.provider()).remove(w);
/**
 * Asks the pool responsible for {@code w} to refresh its registration.
 * The pool is found through the provider of the watcher's channel, and the
 * update runs while the selectors lock is held.
 *
 * @param w the watcher whose registration should be refreshed
 */
230 public void update(Watcher w) {
231 SelectableChannel ch = w.channel();
232 synchronized(selectors) {
233 pool(ch.provider()).update(w);
/**
 * NOTE(review): presumably returns this driver's notion of the current
 * time, on the same scale as watcher timeouts -- confirm against the body.
 */
237 public double time() {
/* System.nanoTime() reading taken when the class is initialized; the zero point of rtime(). */
241 private static final long rtimeoff = System.nanoTime();
/**
 * Returns the time elapsed since {@code rtimeoff} was captured, in seconds.
 * The value is derived from {@link System#nanoTime()}.
 *
 * @return seconds elapsed since class initialization
 */
242 public static double rtime() {
243 return((System.nanoTime() - rtimeoff) / 1e9);
/* Shared Driver instance, created on demand by get(). */
246 private static Driver global = null;
/**
 * Returns the shared Driver. The instance is created inside a block
 * synchronized on {@code Driver.class}.
 * NOTE(review): {@code global} is not volatile; if it is also read outside
 * the synchronized block (double-checked style), that read is not safely
 * published -- confirm.
 */
247 public static Driver get() {
249 synchronized(Driver.class) {
251 global = new Driver();
/**
 * Looks up the Driver stored in the thread-local {@code current} for the
 * calling thread.
 * NOTE(review): the IllegalStateException is presumably thrown when no
 * driver has been set for this thread -- confirm.
 *
 * @throws IllegalStateException with the message "no current driver"
 */
257 public static Driver current() {
258 Driver ret = current.get();
260 throw(new IllegalStateException("no current driver"));