Handle cancelled selection keys... "properly"?
[jagi.git] / src / jagi / event / Driver.java
index 84238d2..6ee920f 100644 (file)
@@ -26,7 +26,18 @@ public class Driver {
            current.set(this);
            w.handle(evs);
        } catch(Throwable t) {
-           error(w, t);
+           error(w, t, "handling event");
+       } finally {
+           current.remove();
+       }
+    }
+
+    protected void close(Watcher w) {
+       try {
+           current.set(this);
+           w.close();
+       } catch(Throwable t) {
+           error(w, t, "closing");
        } finally {
            current.remove();
        }
@@ -36,8 +47,8 @@ public class Driver {
        worker.submit(task);
     }
 
-    protected void error(Watcher w, Throwable t) {
-       hlog.log(Level.WARNING, w + ": uncaught error when handling event", t);
+    protected void error(Watcher w, Throwable t, String thing) {
+       hlog.log(Level.WARNING, w + ": uncaught error when " + thing, t);
        remove(w);
     }
 
@@ -47,6 +58,7 @@ public class Driver {
        final Map<Watcher, SelectionKey> watching = new IdentityHashMap<>();
        final Heap<Watcher, Double> timeheap = new Heap<>(Comparator.naturalOrder());
        final Map<Watcher, Object> paused = new IdentityHashMap<>();
+       final Collection<SelectionKey> cancelled = new HashSet<>();
 
        SelectPool(SelectorProvider provider) {
            this.provider = provider;
@@ -59,6 +71,8 @@ public class Driver {
        }
 
        void handle(Watcher w, int evs) {
+           if(!watching.containsKey(w))
+               return;
            try {
                pause(w);
                submit(() -> {
@@ -88,8 +102,8 @@ public class Driver {
            boolean quit = false;
            Throwable error = null;
            try {
+               double now = time();
                while(true) {
-                   double now = time();
                    long timeout = 0;
                    synchronized(selectors) {
                        Double first = timeheap.keypeek();
@@ -99,16 +113,35 @@ public class Driver {
                            return;
                        }
                        if(first != null)
-                           timeout = (long)Math.ceil((first - now) * 1000);
+                           timeout = Math.max((long)Math.ceil((first - now) * 1000), 1);
                    }
+                   Collection<SelectionKey> precancelled;
+                   synchronized(cancelled) {
+                       precancelled = new ArrayList<>(cancelled);
+                   }
+                   if(!precancelled.isEmpty())
+                       timeout = 1;
                    poll.selectedKeys().clear();
                    try {
                        poll.select(timeout);
                    } catch(IOException e) {
                        throw(new RuntimeException(e));
                    }
+                   if(!precancelled.isEmpty()) {
+                       synchronized(cancelled) {
+                           cancelled.removeAll(precancelled);
+                           cancelled.notifyAll();
+                       }
+                   }
                    for(SelectionKey key : poll.selectedKeys())
                        handle((Watcher)key.attachment(), key.readyOps());
+                   now = time();
+                   while(true) {
+                       Double first = timeheap.keypeek();
+                       if((first == null) || (first > now))
+                           break;
+                       handle(timeheap.remove(), 0);
+                   }
                }
            } catch(Throwable t) {
                error = t;
@@ -155,7 +188,7 @@ public class Driver {
            double timeout = w.timeout();
            boolean hastime = timeout < Double.POSITIVE_INFINITY;
            if(evs < 0) {
-               submit(w::close);
+               submit(() -> close(w));
                return;
            }
            w.added(Driver.this);
@@ -173,14 +206,28 @@ public class Driver {
            SelectionKey wc = watching.remove(w);
            Object tc = timeheap.remove(w);
            Object pc = paused.remove(w);
-           if(wc != null)
-               wc.cancel();
+           if(wc != null) {
+               synchronized(cancelled) {
+                   cancelled.add(wc);
+                   wc.cancel();
+                   poll.wakeup();
+                   boolean irq = false;
+                   while(cancelled.contains(wc)) {
+                       try {
+                           cancelled.wait();
+                       } catch(InterruptedException e) {
+                           irq = true;
+                       }
+                   }
+                   if(irq)
+                       Thread.currentThread().interrupt();
+               }
+           }
            if(((wc != null) || (tc != null)) && (pc != null))
                throw(new RuntimeException(w + ": inconsistent internal state"));
            if(wc == null)
                throw(new IllegalStateException(w + ": not registered"));
-           submit(w::close);
-           poll.wakeup();
+           submit(() -> close(w));
        }
 
        void update(Watcher w) {