4 import java.util.logging.*;
5 import java.util.concurrent.*;
8 import java.nio.channels.*;
9 import java.nio.channels.spi.*;
/* Logger for the event driver itself; used for the SEVERE reports in this file. */
12 private static final Logger log = Logger.getLogger("jagi.event");
/* Separate logger for errors raised while handling a watcher's events (see error()). */
13 private static final Logger hlog = Logger.getLogger("jagi.event.handler");
/* Per-thread Driver slot; current() reads it. Where it is set is not visible in this excerpt. */
14 private static final ThreadLocal<Driver> current = new ThreadLocal<>();
/* One SelectPool per SelectorProvider. The map object itself also serves as the
 * monitor for the synchronized(selectors) sections in this file. */
15 private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
/* Thread pool: core size 0, at most one thread per available processor, 5 second
 * keep-alive, work queue bounded at 128 entries. The remaining constructor
 * arguments are not visible here.
 * NOTE(review): presumably this executor backs submit() -- confirm. */
16 private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
17 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
/* Thread factory hook: wraps tgt in a plain Thread and returns it without starting it.
 * Being protected, a subclass may override it to supply differently configured threads. */
20 protected Thread thread(Runnable tgt) {
21 return(new Thread(tgt));
/* Handles the event mask evs for watcher w.
 * NOTE(review): only the signature and the catch clause are visible in this excerpt;
 * presumably the try body invokes the watcher and the catch forwards to error(w, t)
 * -- confirm against the full file. */
24 protected void handle(Watcher w, int evs) {
/* Anything thrown by the (not visible) try body, including Errors, is intercepted here. */
28 } catch(Throwable t) {
/* Accepts a task for execution.
 * NOTE(review): the body is not visible in this excerpt; presumably it hands the
 * task to the worker executor -- confirm. */
35 protected void submit(Runnable task) {
/* Reports a throwable t associated with watcher w: logs it at WARNING level on the
 * handler logger, with the watcher's string form prefixed to the message and t
 * attached as the logged exception. */
39 protected void error(Watcher w, Throwable t) {
40 hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
/* Inner (non-static) class holding the selector state for one SelectorProvider.
 * It is a Runnable; the selector loop further down is presumably its run() body. */
44 class SelectPool implements Runnable {
/* Provider this pool was created for; also its key in the outer selectors map. */
45 final SelectorProvider provider;
/* Watchers with a live selector registration, keyed by identity, mapped to their SelectionKey. */
47 final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
/* Watchers that have a finite timeout, keyed by that timeout and ordered by natural
 * Double order. NOTE(review): Heap is a project type; presumably keypeek() yields the
 * smallest key -- confirm. */
48 final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
/* Watchers currently paused, keyed by identity. Only presence and non-null-ness of the
 * value are tested in the visible code. */
49 final Map<Watcher, Object> paused = new IdentityHashMap<>();
/* Creates a pool for the given provider and opens its Selector.
 * A failure to open the selector is rethrown as an unchecked RuntimeException
 * that keeps the IOException as its cause. */
51 SelectPool(SelectorProvider provider) {
52 this.provider = provider;
/* The selector that the loop below blocks on. */
54 this.poll = provider.openSelector();
55 } catch(IOException e) {
56 /* I think this counts more as an assertion error. */
57 throw(new RuntimeException(e));
/* Dispatches the event mask evs for watcher w to the enclosing Driver.
 * NOTE(review): several lines of this method are not visible in this excerpt;
 * judging by the log message below, the Driver.handle call is presumably wrapped
 * in a task that is submitted for execution -- confirm. */
61 void handle(Watcher w, int evs) {
/* Guard for watchers without a live registration; the statement it controls is not visible. */
62 if(!watching.containsKey(w))
/* Delegates to the outer Driver's handle(), not to this method. */
68 Driver.this.handle(w, evs);
73 } catch(Throwable t) {
/* Pool state is touched under the shared selectors monitor (guarded body not visible). */
75 synchronized(selectors) {
78 } catch(Exception e) {
/* The throwable logged is t from the outer catch, not the inner e. */
81 log.log(Level.SEVERE, "unexpected error when submitting event", t);
/* NOTE(review): the enclosing method header is not visible in this excerpt; since
 * SelectPool implements Runnable this is presumably the body of run() -- confirm. */
/* Passed to the SEVERE log call at the end; where it is assigned is not visible. */
91 Throwable error = null;
/* Pool bookkeeping is inspected while holding the selectors monitor. */
96 synchronized(selectors) {
/* Head key of the timeout heap; null (tested below) when no timeout is queued. */
97 Double first = timeheap.keypeek();
/* Nothing left to wait for: no timeouts and no watched channels. */
98 if((first == null) && watching.isEmpty()) {
/* Unregister this pool from the driver's provider map. */
100 selectors.remove(provider);
/* Time until the earliest deadline, scaled by 1000 and rounded up; it is handed to
 * select() below, which takes milliseconds.
 * NOTE(review): Selector.select(0) blocks indefinitely and a negative argument is
 * rejected -- verify that the lines not visible here cover first <= now. */
104 timeout = (long)Math.ceil((first - now) * 1000);
/* Drop keys left over from the previous pass before selecting again. */
106 poll.selectedKeys().clear();
108 poll.select(timeout);
109 } catch(IOException e) {
/* Rethrown unchecked, keeping the IOException as the cause. */
110 throw(new RuntimeException(e));
/* Dispatch every ready key; the attachment is the Watcher passed to register() in add(). */
112 for(SelectionKey key : poll.selectedKeys())
113 handle((Watcher)key.attachment(), key.readyOps());
/* Timeout processing: while the head key is not later than now, take the watcher off
 * the heap and dispatch it with an event mask of 0.
 * NOTE(review): the loop construct and the statement taken when the test below is
 * true are not visible in this excerpt. */
116 Double first = timeheap.keypeek();
117 if((first == null) || (first > now))
119 handle(timeheap.remove(), 0);
122 } catch(Throwable t) {
/* Reports the recorded failure; the condition guarding this call is not visible. */
127 log.log(Level.SEVERE, "selector exited abnormally", error);
/* Pauses watcher w.
 * Throws IllegalStateException if w is already paused, or if it has neither a
 * selector registration nor a queued timeout.
 * NOTE(review): the lines that record w in paused and adjust its selection key are
 * not visible in this excerpt. */
131 void pause(Watcher w) {
132 if(paused.containsKey(w))
133 throw(new IllegalStateException(w + ": already paused"));
/* The selection key is only looked up, while the timeout entry is removed right away,
 * i.e. before the registration check below. */
134 SelectionKey wc = watching.get(w);
135 Object tc = timeheap.remove(w);
136 if((wc == null) && (tc == null))
137 throw(new IllegalStateException(w + ": not registered"));
/* Resumes a paused watcher w, re-reading its event mask and timeout from the watcher.
 * NOTE(review): the statements between the visible lines are missing from this
 * excerpt; presumably the first test rejects a watcher that was not paused and the
 * timeout is re-queued only when hastime is true -- confirm. */
143 void resume(Watcher w) {
/* remove() both clears the paused mark and reveals whether one existed. */
144 if(paused.remove(w) == null)
146 SelectionKey wc = watching.get(w);
147 int evs = w.events();
148 double timeout = w.timeout();
/* True only for a timeout strictly below +infinity (so also false for NaN). */
149 boolean hastime = timeout < Double.POSITIVE_INFINITY;
156 timeheap.add(w, timeout);
/* Registers watcher w on channel ch with this pool.
 * Throws IllegalStateException if w is already known to any of the three pool
 * structures, and RuntimeException (wrapping ClosedChannelException) if ch is closed.
 * NOTE(review): the conditions around the register and timeheap.add calls are not
 * visible in this excerpt. */
160 void add(Watcher w, SelectableChannel ch) {
161 if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
162 throw(new IllegalStateException(w + ": already registered"));
163 int evs = w.events();
164 double timeout = w.timeout();
/* True only for a timeout strictly below +infinity. */
165 boolean hastime = timeout < Double.POSITIVE_INFINITY;
/* Callback telling the watcher which Driver it now belongs to. */
170 w.added(Driver.this);
/* Register with the selector using w as the key attachment, and remember the key. */
172 watching.put(w, ch.register(poll, evs, w));
173 } catch(ClosedChannelException e) {
174 throw(new RuntimeException("attempted to watch closed channel", e));
/* Queue the watcher under its timeout value. */
177 timeheap.add(w, timeout);
/* Unregisters watcher w from this pool.
 * The watcher is taken out of all three structures first; the results are then used
 * to validate the previous state.
 * NOTE(review): the condition guarding the final "not registered" throw and any
 * cleanup of the selection key are not visible in this excerpt. */
181 void remove(Watcher w) {
182 SelectionKey wc = watching.remove(w);
183 Object tc = timeheap.remove(w);
184 Object pc = paused.remove(w);
/* A watcher must not be active (key or timeout present) and paused at the same time. */
187 if(((wc != null) || (tc != null)) && (pc != null))
188 throw(new RuntimeException(w + ": inconsistent internal state"));
190 throw(new IllegalStateException(w + ": not registered"));
/* Re-reads the event mask and timeout of watcher w and applies them to this pool.
 * NOTE(review): the condition guarding the "not registered" throw and the statements
 * applying evs are not visible in this excerpt. */
195 void update(Watcher w) {
196 SelectionKey wc = watching.get(w);
198 throw(new IllegalStateException(w + ": not registered"));
199 int evs = w.events();
200 double timeout = w.timeout();
/* True only for a timeout strictly below +infinity. */
201 boolean hastime = timeout < Double.POSITIVE_INFINITY;
/* Set the heap key of w to the new timeout. */
208 timeheap.set(w, timeout);
/* Looks up the SelectPool for provider, creating and storing a new one.
 * NOTE(review): the test guarding the creation and the return statement are not
 * visible; presumably a pool is created only when the lookup yields null -- confirm.
 * The method does not lock by itself; the callers visible in this file invoke it
 * inside synchronized(selectors). */
215 private SelectPool pool(SelectorProvider provider) {
216 SelectPool pool = selectors.get(provider);
218 pool = new SelectPool(provider);
219 selectors.put(provider, pool);
/* Starts watching w: takes the watcher's channel and, under the selectors monitor,
 * registers it with the pool belonging to that channel's SelectorProvider. */
225 public void add(Watcher w) {
226 SelectableChannel ch = w.channel();
227 synchronized(selectors) {
228 pool(ch.provider()).add(w, ch);
/* Stops watching w: under the selectors monitor, removes it from the pool belonging
 * to its channel's SelectorProvider. The pool is obtained through pool(), the same
 * lookup helper used by add(). */
232 public void remove(Watcher w) {
233 SelectableChannel ch = w.channel();
234 synchronized(selectors) {
235 pool(ch.provider()).remove(w);
/* Tells the driver that w's event mask or timeout may have changed: under the
 * selectors monitor, forwards to update() on the pool belonging to the watcher's
 * channel provider. */
239 public void update(Watcher w) {
240 SelectableChannel ch = w.channel();
241 synchronized(selectors) {
242 pool(ch.provider()).update(w);
/* Returns a time value as a double.
 * NOTE(review): the body is not visible in this excerpt; presumably this is the
 * clock that watcher timeouts are compared against -- confirm. */
246 public double time() {
/* System.nanoTime() reading captured once, when this static field is initialized. */
250 private static final long rtimeoff = System.nanoTime();
/* Returns the time elapsed since rtimeoff was captured, in seconds as a double
 * (nanosecond difference divided by 1e9). */
251 public static double rtime() {
252 return((System.nanoTime() - rtimeoff) / 1e9);
/* Shared Driver instance; starts out null and is assigned inside get(). */
255 private static Driver global = null;
/* Returns the shared Driver, constructing it with new Driver() while holding the
 * Driver.class monitor.
 * NOTE(review): the null checks and the return statement are not visible in this
 * excerpt -- confirm that construction happens only once. */
256 public static Driver get() {
258 synchronized(Driver.class) {
260 global = new Driver();
/* Returns the Driver stored in the calling thread's thread-local slot.
 * Throws IllegalStateException when there is none.
 * NOTE(review): the test guarding the throw and the return statement are not visible;
 * presumably the throw happens when the slot holds null -- confirm. */
266 public static Driver current() {
267 Driver ret = current.get();
269 throw(new IllegalStateException("no current driver"));