Added a basic event-loop driver.
authorFredrik Tolf <fredrik@dolda2000.com>
Sun, 13 Feb 2022 17:00:18 +0000 (18:00 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Sun, 13 Feb 2022 17:00:18 +0000 (18:00 +0100)
src/jagi/event/Driver.java [new file with mode: 0644]
src/jagi/event/Heap.java [new file with mode: 0644]
src/jagi/event/Watcher.java [new file with mode: 0644]

diff --git a/src/jagi/event/Driver.java b/src/jagi/event/Driver.java
new file mode 100644 (file)
index 0000000..c9ca638
--- /dev/null
@@ -0,0 +1,262 @@
+package jagi.event;
+
+import java.util.*;
+import java.util.logging.*;
+import java.util.concurrent.*;
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.*;
+import java.nio.channels.spi.*;
+
+public class Driver {
+    private static final Logger log = Logger.getLogger("jagi.event");
+    private static final Logger hlog = Logger.getLogger("jagi.event.handler");
+    private static final ThreadLocal<Driver> current = new ThreadLocal<>();
+    private final Map<SelectorProvider, SelectPool> selectors = new HashMap<>();
+    private final ExecutorService worker = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(),
+                                                                 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(128),
+                                                                 this::thread);
+
+    protected Thread thread(Runnable tgt) {
+       return(new Thread(tgt));
+    }
+
+    protected void handle(Watcher w, int evs) {
+       try {
+           current.set(this);
+           w.handle(evs);
+       } catch(Throwable t) {
+           error(w, t);
+       } finally {
+           current.remove();
+       }
+    }
+
+    protected void submit(Runnable task) {
+       worker.submit(task);
+    }
+
+    protected void error(Watcher w, Throwable t) {
+       hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
+       remove(w);
+    }
+
+    class SelectPool implements Runnable {
+       final SelectorProvider provider;
+       final Selector poll;
+       final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
+       final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
+       final Map<Watcher, Object> paused = new IdentityHashMap<>();
+
+       SelectPool(SelectorProvider provider) {
+           this.provider = provider;
+           try {
+               this.poll = provider.openSelector();
+           } catch(IOException e) {
+               /* I think this counts more as an assertion error. */
+               throw(new RuntimeException(e));
+           }
+       }
+
+       void handle(Watcher w, int evs) {
+           try {
+               pause(w);
+               submit(() -> {
+                       try {
+                           Driver.this.handle(w, evs);
+                       } finally {
+                           resume(w);
+                       }
+                   });
+           } catch(Throwable t) {
+               try {
+                   synchronized(selectors) {
+                       remove(w);
+                   }
+               } catch(Exception e) {
+                   t.addSuppressed(e);
+               }
+               log.log(Level.SEVERE, "unexpected error when submitting event", t);
+           }
+       }
+
+       void start() {
+           thread(this).start();
+       }
+
+       public void run() {
+           boolean quit = false;
+           Throwable error = null;
+           try {
+               while(true) {
+                   double now = time();
+                   long timeout = 0;
+                   synchronized(selectors) {
+                       Double first = timeheap.keypeek();
+                       if((first == null) && watching.isEmpty()) {
+                           quit = true;
+                           selectors.remove(provider);
+                           return;
+                       }
+                       if(first != null)
+                           timeout = (long)Math.ceil((first - now) * 1000);
+                   }
+                   poll.selectedKeys().clear();
+                   try {
+                       poll.select(timeout);
+                   } catch(IOException e) {
+                       throw(new RuntimeException(e));
+                   }
+                   for(SelectionKey key : poll.selectedKeys())
+                       handle((Watcher)key.attachment(), key.readyOps());
+               }
+           } catch(Throwable t) {
+               error = t;
+               throw(t);
+           } finally {
+               if(!quit)
+                   log.log(Level.SEVERE, "selector exited abnormally", error);
+           }
+       }
+
+       void pause(Watcher w) {
+           if(paused.containsKey(w))
+               throw(new IllegalStateException(w + ": already paused"));
+           SelectionKey wc = watching.get(w);
+           Object tc = timeheap.remove(w);
+           if((wc == null) && (tc == null))
+               throw(new IllegalStateException(w + ": not registered"));
+           if(wc != null)
+               wc.interestOps(0);
+           paused.put(w, this);
+       }
+
+       void resume(Watcher w) {
+           if(paused.remove(w) == null)
+               return;
+           SelectionKey wc = watching.get(w);
+           int evs = w.events();
+           double timeout = w.timeout();
+           boolean hastime = timeout < Double.POSITIVE_INFINITY;
+           if((evs == 0) && !hastime) {
+               remove(w);
+               return;
+           }
+           wc.interestOps(evs);
+           if(hastime)
+               timeheap.add(w, timeout);
+           poll.wakeup();
+       }
+
+       void add(Watcher w, SelectableChannel ch) {
+           if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w))
+               throw(new IllegalStateException(w + ": already registered"));
+           int evs = w.events();
+           double timeout = w.timeout();
+           boolean hastime = timeout < Double.POSITIVE_INFINITY;
+           if((evs == 0) && !hastime) {
+               w.close();
+               return;
+           }
+           try {
+               watching.put(w, ch.register(poll, evs, w));
+           } catch(ClosedChannelException e) {
+               throw(new RuntimeException("attempted to watch closed channel", e));
+           }
+           if(hastime)
+               timeheap.add(w, timeout);
+           poll.wakeup();
+       }
+
+       void remove(Watcher w) {
+           SelectionKey wc = watching.remove(w);
+           Object tc = timeheap.remove(w);
+           Object pc = paused.remove(w);
+           if(wc != null)
+               wc.cancel();
+           if(((wc != null) || (tc != null)) && (pc != null))
+               throw(new RuntimeException(w + ": inconsistent internal state"));
+           if(wc == null)
+               throw(new IllegalStateException(w + ": not registered"));
+           w.close();
+           poll.wakeup();
+       }
+
+       void update(Watcher w) {
+           SelectionKey wc = watching.get(w);
+           if(wc == null)
+               throw(new IllegalStateException(w + ": not registered"));
+           int evs = w.events();
+           double timeout = w.timeout();
+           boolean hastime = timeout < Double.POSITIVE_INFINITY;
+           if((evs == 0) && !hastime) {
+               remove(w);
+               return;
+           }
+           wc.interestOps(evs);
+           if(hastime)
+               timeheap.set(w, timeout);
+           else
+               timeheap.remove(w);
+           poll.wakeup();
+       }
+    }
+
+    private SelectPool pool(SelectorProvider provider) {
+       SelectPool pool = selectors.get(provider);
+       if(pool == null) {
+           pool = new SelectPool(provider);
+           selectors.put(provider, pool);
+           pool.start();
+       }
+       return(pool);
+    }
+
+    public void add(Watcher w) {
+       SelectableChannel ch = w.channel();
+       synchronized(selectors) {
+           pool(ch.provider()).add(w, ch);
+       }
+    }
+
+    public void remove(Watcher w) {
+       SelectableChannel ch = w.channel();
+       synchronized(selectors) {
+           pool(ch.provider()).remove(w);
+       }
+    }
+
+    public void update(Watcher w) {
+       SelectableChannel ch = w.channel();
+       synchronized(selectors) {
+           pool(ch.provider()).update(w);
+       }
+    }
+
+    public double time() {
+       return(rtime());
+    }
+
+    private static final long rtimeoff = System.nanoTime();
+    public static double rtime() {
+       return((System.nanoTime() - rtimeoff) / 1e9);
+    }
+
+    private static Driver global = null;
+    public static Driver get() {
+       if(global == null) {
+           synchronized(Driver.class) {
+               if(global == null)
+                   global = new Driver();
+           }
+       }
+       return(global);
+    }
+
+    public static Driver current() {
+       Driver ret = current.get();
+       if(ret == null)
+           throw(new IllegalStateException("no current driver"));
+       return(ret);
+    }
+}
diff --git a/src/jagi/event/Heap.java b/src/jagi/event/Heap.java
new file mode 100644 (file)
index 0000000..aec2991
--- /dev/null
@@ -0,0 +1,159 @@
+package jagi.event;
+
+import java.util.*;
+
+public class Heap<V, K> {
+    private static final Object[] empty = {};
+    private final Comparator<? super K> cmp;
+    private final Map<V, Integer> index = new IdentityHashMap<>();
+    private Object[] vbuf = empty, kbuf = empty;
+    private int size;
+
+    public Heap(Comparator<? super K> cmp) {
+       this.cmp = cmp;
+    }
+
+    @SuppressWarnings("unchecked")
+    private V val(int i) {return((V)vbuf[i]);}
+    @SuppressWarnings("unchecked")
+    private K key(int i) {return((K)kbuf[i]);}
+
+    private void raise(V val, K key, int i) {
+       while(i > 0) {
+           int p = (i - 1) >>> 1;
+           if(cmp.compare(key(p), key) <= 0)
+               break;
+           vbuf[i] = vbuf[p];
+           kbuf[i] = kbuf[p];
+           index.put(val(i), i);
+           i = p;
+       }
+       vbuf[i] = val;
+       kbuf[i] = key;
+       index.put(val, i);
+    }
+
+    private void lower(V val, K key, int i) {
+       while(true) {
+           int c1 = (i << 1) + 1, c2 = c1 + 1;
+           if(c1 >= size)
+               break;
+           int c = ((c2 < size) && (cmp.compare(key(c1), key(c2)) > 0)) ? c2 : c1;
+           if(cmp.compare(key(c), key) > 0)
+               break;
+           vbuf[i] = vbuf[c];
+           kbuf[i] = kbuf[c];
+           index.put(val(i), i);
+           i = c;
+       }
+       vbuf[i] = val;
+       kbuf[i] = key;
+       index.put(val, i);
+    }
+
+    private void adjust(V val, K key, int i) {
+       if((i > 0) && cmp.compare(key((i - 1) >> 1), key) > 0)
+           raise(val, key, i);
+       else
+           lower(val, key, i);
+    }
+
+    public int size() {
+       return(size);
+    }
+
+    public V peek() {
+       return((size > 0) ? val(0) : null);
+    }
+
+    public V poll() {
+       if(size == 0)
+           return(null);
+       V ret = val(0);
+       remove(0);
+       return(ret);
+    }
+
+    public V remove() {
+       if(size == 0)
+           throw(new NoSuchElementException());
+       V ret = val(0);
+       remove(0);
+       return(ret);
+    }
+
+    public K keypeek() {
+       return((size > 0) ? key(0) : null);
+    }
+
+    public void add(V val, K key) {
+       if(index.containsKey(val))
+           throw(new IllegalStateException());
+       int p = size++;
+       if(p >= vbuf.length) {
+           int n = Math.max(vbuf.length * 2, 16);
+           vbuf = Arrays.copyOf(vbuf, n);
+           kbuf = Arrays.copyOf(kbuf, n);
+       }
+       raise(val, key, p);
+    }
+
+    public K update(V val, K key) {
+       Integer p = index.get(val);
+       if(p == null)
+           throw(new NoSuchElementException());
+       K ret = key(p);
+       adjust(val, key, p);
+       return(ret);
+    }
+
+    public K set(V val, K key) {
+       Integer p = index.get(val);
+       if(p == null) {
+           add(val, key);
+           return(null);
+       }
+       K ret = key(p);
+       adjust(val, key, p);
+       return(ret);
+    }
+
+    private K remove(int p) {
+       K ret = key(p);
+       size--;
+       if(p == size) {
+       } else if(p < size) {
+           adjust(val(size), key(size), p);
+       } else {
+           throw(new AssertionError());
+       }
+       vbuf[size] = null;
+       kbuf[size] = null;
+       return(ret);
+    }
+
+    public K remove(V val) {
+       Integer p = index.remove(val);
+       if(p == null)
+           return(null);
+       return(remove(p));
+    }
+
+    public boolean contains(V val) {
+       return(index.containsKey(val));
+    }
+
+    public String toString() {
+       StringBuilder buf = new StringBuilder();
+       buf.append('[');
+       for(int i = 0; i < size; i++) {
+           if(i > 0)
+               buf.append(", ");
+           buf.append(String.valueOf(kbuf[i]));
+           buf.append('=');
+           buf.append(String.valueOf(vbuf[i]));
+       }
+       buf.append(']');
+       return(buf.toString());
+    }
+}
diff --git a/src/jagi/event/Watcher.java b/src/jagi/event/Watcher.java
new file mode 100644 (file)
index 0000000..4b85c80
--- /dev/null
@@ -0,0 +1,12 @@
+package jagi.event;
+
+import java.io.*;
+import java.nio.channels.*;
+
+public interface Watcher {
+    public SelectableChannel channel();
+    public int events();
+    public void handle(int events) throws Exception;
+    public default void close() {}
+    public default double timeout() {return(Double.POSITIVE_INFINITY);}
+}