+package jagi.event;
+
+import java.util.*;
+import java.util.logging.*;
+import java.util.concurrent.*;
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.*;
+import java.nio.channels.spi.*;
+
+public class Driver {
+ private static final Logger log = Logger.getLogger("jagi.event");
+ private static final Logger hlog = Logger.getLogger("jagi.event.handler");
+ private static final ThreadLocal<Driver> current = new ThreadLocal<>();
+ private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
+ private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
+ 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
+ this::thread);
+
+ protected Thread thread(Runnable tgt) {
+ return(new Thread(tgt));
+ }
+
+ protected void handle(Watcher w, int evs) {
+ try {
+ current.set(this);
+ w.handle(evs);
+ } catch(Throwable t) {
+ error(w, t);
+ } finally {
+ current.remove();
+ }
+ }
+
+ protected void submit(Runnable task) {
+ worker.submit(task);
+ }
+
+ protected void error(Watcher w, Throwable t) {
+ hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
+ remove(w);
+ }
+
+ class SelectPool implements Runnable {
+ final SelectorProvider provider;
+ final Selector poll;
+ final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
+ final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
+ final Map<Watcher, Object> paused = new IdentityHashMap<>();
+
+ SelectPool(SelectorProvider provider) {
+ this.provider = provider;
+ try {
+ this.poll = provider.openSelector();
+ } catch(IOException e) {
+ /* I think this counts more as an assertion error. */
+ throw(new RuntimeException(e));
+ }
+ }
+
+ void handle(Watcher w, int evs) {
+ try {
+ pause(w);
+ submit(() -> {
+ try {
+ Driver.this.handle(w, evs);
+ } finally {
+ resume(w);
+ }
+ });
+ } catch(Throwable t) {
+ try {
+ synchronized(selectors) {
+ remove(w);
+ }
+ } catch(Exception e) {
+ t.addSuppressed(e);
+ }
+ log.log(Level.SEVERE, "unexpected error when submitting event", t);
+ }
+ }
+
+ void start() {
+ thread(this).start();
+ }
+
+ public void run() {
+ boolean quit = false;
+ Throwable error = null;
+ try {
+ while(true) {
+ double now = time();
+ long timeout = 0;
+ synchronized(selectors) {
+ Double first = timeheap.keypeek();
+ if((first == null) && watching.isEmpty()) {
+ quit = true;
+ selectors.remove(provider);
+ return;
+ }
+ if(first != null)
+ timeout = (long)Math.ceil((first - now) * 1000);
+ }
+ poll.selectedKeys().clear();
+ try {
+ poll.select(timeout);
+ } catch(IOException e) {
+ throw(new RuntimeException(e));
+ }
+ for(SelectionKey key : poll.selectedKeys())
+ handle((Watcher)key.attachment(), key.readyOps());
+ }
+ } catch(Throwable t) {
+ error = t;
+ throw(t);
+ } finally {
+ if(!quit)
+ log.log(Level.SEVERE, "selector exited abnormally", error);
+ }
+ }
+
+ void pause(Watcher w) {
+ if(paused.containsKey(w))
+ throw(new IllegalStateException(w + ": already paused"));
+ SelectionKey wc = watching.get(w);
+ Object tc = timeheap.remove(w);
+ if((wc == null) && (tc == null))
+ throw(new IllegalStateException(w + ": not registered"));
+ if(wc != null)
+ wc.interestOps(0);
+ paused.put(w, this);
+ }
+
+ void resume(Watcher w) {
+ if(paused.remove(w) == null)
+ return;
+ SelectionKey wc = watching.get(w);
+ int evs = w.events();
+ double timeout = w.timeout();
+ boolean hastime = timeout < Double.POSITIVE_INFINITY;
+ if((evs == 0) && !hastime) {
+ remove(w);
+ return;
+ }
+ wc.interestOps(evs);
+ if(hastime)
+ timeheap.add(w, timeout);
+ poll.wakeup();
+ }
+
+ void add(Watcher w, SelectableChannel ch) {
+ if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
+ throw(new IllegalStateException(w + ": already registered"));
+ int evs = w.events();
+ double timeout = w.timeout();
+ boolean hastime = timeout < Double.POSITIVE_INFINITY;
+ if((evs == 0) && !hastime) {
+ w.close();
+ return;
+ }
+ try {
+ watching.put(w, ch.register(poll, evs, w));
+ } catch(ClosedChannelException e) {
+ throw(new RuntimeException("attempted to watch closed channel", e));
+ }
+ if(hastime)
+ timeheap.add(w, timeout);
+ poll.wakeup();
+ }
+
+ void remove(Watcher w) {
+ SelectionKey wc = watching.remove(w);
+ Object tc = timeheap.remove(w);
+ Object pc = paused.remove(w);
+ if(wc != null)
+ wc.cancel();
+ if(((wc != null) || (tc != null)) && (pc != null))
+ throw(new RuntimeException(w + ": inconsistent internal state"));
+ if(wc == null)
+ throw(new IllegalStateException(w + ": not registered"));
+ w.close();
+ poll.wakeup();
+ }
+
+ void update(Watcher w) {
+ SelectionKey wc = watching.get(w);
+ if(wc == null)
+ throw(new IllegalStateException(w + ": not registered"));
+ int evs = w.events();
+ double timeout = w.timeout();
+ boolean hastime = timeout < Double.POSITIVE_INFINITY;
+ if((evs == 0) && !hastime) {
+ remove(w);
+ return;
+ }
+ wc.interestOps(evs);
+ if(hastime)
+ timeheap.set(w, timeout);
+ else
+ timeheap.remove(w);
+ poll.wakeup();
+ }
+ }
+
+ private SelectPool pool(SelectorProvider provider) {
+ SelectPool pool = selectors.get(provider);
+ if(pool == null) {
+ pool = new SelectPool(provider);
+ selectors.put(provider, pool);
+ pool.start();
+ }
+ return(pool);
+ }
+
+ public void add(Watcher w) {
+ SelectableChannel ch = w.channel();
+ synchronized(selectors) {
+ pool(ch.provider()).add(w, ch);
+ }
+ }
+
+ public void remove(Watcher w) {
+ SelectableChannel ch = w.channel();
+ synchronized(selectors) {
+ pool(ch.provider()).remove(w);
+ }
+ }
+
+ public void update(Watcher w) {
+ SelectableChannel ch = w.channel();
+ synchronized(selectors) {
+ pool(ch.provider()).update(w);
+ }
+ }
+
+ public double time() {
+ return(rtime());
+ }
+
+ private static final long rtimeoff = System.nanoTime();
+ public static double rtime() {
+ return((System.nanoTime() - rtimeoff) / 1e9);
+ }
+
+ private static Driver global = null;
+ public static Driver get() {
+ if(global == null) {
+ synchronized(Driver.class) {
+ if(global == null)
+ global = new Driver();
+ }
+ }
+ return(global);
+ }
+
+ public static Driver current() {
+ Driver ret = current.get();
+ if(ret == null)
+ throw(new IllegalStateException("no current driver"));
+ return(ret);
+ }
+}