Added a basic event-loop driver.
[jagi.git] / src / jagi / event / Driver.java
diff --git a/src/jagi/event/Driver.java b/src/jagi/event/Driver.java
new file mode 100644 (file)
index 0000000..c9ca638
--- /dev/null
@@ -0,0 +1,262 @@
+package jagi.event;
+
+import java.util.*;
+import java.util.logging.*;
+import java.util.concurrent.*;
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.*;
+import java.nio.channels.spi.*;
+
+public class Driver {
+    private static final Logger log = Logger.getLogger("jagi.event");
+    private static final Logger hlog = Logger.getLogger("jagi.event.handler");
+    private static final ThreadLocal<Driver> current = new ThreadLocal<>();
+    private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
+    private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
+                                                                 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
+                                                                 this::thread);
+
+    protected Thread thread(Runnable tgt) {
+       return(new Thread(tgt));
+    }
+
+    protected void handle(Watcher w, int evs) {
+       try {
+           current.set(this);
+           w.handle(evs);
+       } catch(Throwable t) {
+           error(w, t);
+       } finally {
+           current.remove();
+       }
+    }
+
+    protected void submit(Runnable task) {
+       worker.submit(task);
+    }
+
+    protected void error(Watcher w, Throwable t) {
+       hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
+       remove(w);
+    }
+
+    class SelectPool implements Runnable {
+       final SelectorProvider provider;
+       final Selector poll;
+       final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
+       final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
+       final Map<Watcher, Object> paused = new IdentityHashMap<>();
+
+       SelectPool(SelectorProvider provider) {
+           this.provider = provider;
+           try {
+               this.poll = provider.openSelector();
+           } catch(IOException e) {
+               /* I think this counts more as an assertion error. */
+               throw(new RuntimeException(e));
+           }
+       }
+
+       void handle(Watcher w, int evs) {
+           try {
+               pause(w);
+               submit(() -> {
+                       try {
+                           Driver.this.handle(w, evs);
+                       } finally {
+                           resume(w);
+                       }
+                   });
+           } catch(Throwable t) {
+               try {
+                   synchronized(selectors) {
+                       remove(w);
+                   }
+               } catch(Exception e) {
+                   t.addSuppressed(e);
+               }
+               log.log(Level.SEVERE, "unexpected error when submitting event", t);
+           }
+       }
+
+       void start() {
+           thread(this).start();
+       }
+
+       public void run() {
+           boolean quit = false;
+           Throwable error = null;
+           try {
+               while(true) {
+                   double now = time();
+                   long timeout = 0;
+                   synchronized(selectors) {
+                       Double first = timeheap.keypeek();
+                       if((first == null) && watching.isEmpty()) {
+                           quit = true;
+                           selectors.remove(provider);
+                           return;
+                       }
+                       if(first != null)
+                           timeout = (long)Math.ceil((first - now) * 1000);
+                   }
+                   poll.selectedKeys().clear();
+                   try {
+                       poll.select(timeout);
+                   } catch(IOException e) {
+                       throw(new RuntimeException(e));
+                   }
+                   for(SelectionKey key : poll.selectedKeys())
+                       handle((Watcher)key.attachment(), key.readyOps());
+               }
+           } catch(Throwable t) {
+               error = t;
+               throw(t);
+           } finally {
+               if(!quit)
+                   log.log(Level.SEVERE, "selector exited abnormally", error);
+           }
+       }
+
+       void pause(Watcher w) {
+           if(paused.containsKey(w))
+               throw(new IllegalStateException(w + ": already paused"));
+           SelectionKey wc = watching.get(w);
+           Object tc = timeheap.remove(w);
+           if((wc == null) && (tc == null))
+               throw(new IllegalStateException(w + ": not registered"));
+           if(wc != null)
+               wc.interestOps(0);
+           paused.put(w, this);
+       }
+
+       void resume(Watcher w) {
+           if(paused.remove(w) == null)
+               return;
+           SelectionKey wc = watching.get(w);
+           int evs = w.events();
+           double timeout = w.timeout();
+           boolean hastime = timeout < Double.POSITIVE_INFINITY;
+           if((evs == 0) && !hastime) {
+               remove(w);
+               return;
+           }
+           wc.interestOps(evs);
+           if(hastime)
+               timeheap.add(w, timeout);
+           poll.wakeup();
+       }
+
+       void add(Watcher w, SelectableChannel ch) {
+           if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
+               throw(new IllegalStateException(w + ": already registered"));
+           int evs = w.events();
+           double timeout = w.timeout();
+           boolean hastime = timeout < Double.POSITIVE_INFINITY;
+           if((evs == 0) && !hastime) {
+               w.close();
+               return;
+           }
+           try {
+               watching.put(w, ch.register(poll, evs, w));
+           } catch(ClosedChannelException e) {
+               throw(new RuntimeException("attempted to watch closed channel", e));
+           }
+           if(hastime)
+               timeheap.add(w, timeout);
+           poll.wakeup();
+       }
+
+       void remove(Watcher w) {
+           SelectionKey wc = watching.remove(w);
+           Object tc = timeheap.remove(w);
+           Object pc = paused.remove(w);
+           if(wc != null)
+               wc.cancel();
+           if(((wc != null) || (tc != null)) && (pc != null))
+               throw(new RuntimeException(w + ": inconsistent internal state"));
+           if(wc == null)
+               throw(new IllegalStateException(w + ": not registered"));
+           w.close();
+           poll.wakeup();
+       }
+
+       void update(Watcher w) {
+           SelectionKey wc = watching.get(w);
+           if(wc == null)
+               throw(new IllegalStateException(w + ": not registered"));
+           int evs = w.events();
+           double timeout = w.timeout();
+           boolean hastime = timeout < Double.POSITIVE_INFINITY;
+           if((evs == 0) && !hastime) {
+               remove(w);
+               return;
+           }
+           wc.interestOps(evs);
+           if(hastime)
+               timeheap.set(w, timeout);
+           else
+               timeheap.remove(w);
+           poll.wakeup();
+       }
+    }
+
+    private SelectPool pool(SelectorProvider provider) {
+       SelectPool pool = selectors.get(provider);
+       if(pool == null) {
+           pool = new SelectPool(provider);
+           selectors.put(provider, pool);
+           pool.start();
+       }
+       return(pool);
+    }
+
+    public void add(Watcher w) {
+       SelectableChannel ch = w.channel();
+       synchronized(selectors) {
+           pool(ch.provider()).add(w, ch);
+       }
+    }
+
+    public void remove(Watcher w) {
+       SelectableChannel ch = w.channel();
+       synchronized(selectors) {
+           pool(ch.provider()).remove(w);
+       }
+    }
+
+    public void update(Watcher w) {
+       SelectableChannel ch = w.channel();
+       synchronized(selectors) {
+           pool(ch.provider()).update(w);
+       }
+    }
+
+    public double time() {
+       return(rtime());
+    }
+
+    private static final long rtimeoff = System.nanoTime();
+    public static double rtime() {
+       return((System.nanoTime() - rtimeoff) / 1e9);
+    }
+
+    private static Driver global = null;
+    public static Driver get() {
+       if(global == null) {
+           synchronized(Driver.class) {
+               if(global == null)
+                   global = new Driver();
+           }
+       }
+       return(global);
+    }
+
+    public static Driver current() {
+       Driver ret = current.get();
+       if(ret == null)
+           throw(new IllegalStateException("no current driver"));
+       return(ret);
+    }
+}