X-Git-Url: http://dolda2000.com/gitweb/?a=blobdiff_plain;f=src%2Fjagi%2Fevent%2FDriver.java;h=8fe83a7e4b55bc7011bad467c7957f0d984e1c0a;hb=1ee6412bc01b15aa8fad07a8bbce694ca099a8cb;hp=84238d2c9578b4c3dab92df2ea862bd798b1a07c;hpb=2c1781f3f6ef1f48af308b2a28ede6ae5c411d9a;p=jagi.git diff --git a/src/jagi/event/Driver.java b/src/jagi/event/Driver.java index 84238d2..8fe83a7 100644 --- a/src/jagi/event/Driver.java +++ b/src/jagi/event/Driver.java @@ -26,7 +26,18 @@ public class Driver { current.set(this); w.handle(evs); } catch(Throwable t) { - error(w, t); + error(w, t, "handling event"); + } finally { + current.remove(); + } + } + + protected void close(Watcher w) { + try { + current.set(this); + w.close(); + } catch(Throwable t) { + error(w, t, "closing"); } finally { current.remove(); } @@ -36,8 +47,8 @@ public class Driver { worker.submit(task); } - protected void error(Watcher w, Throwable t) { - hlog.log(Level.WARNING, w + ": uncaught error when handling event", t); + protected void error(Watcher w, Throwable t, String thing) { + hlog.log(Level.WARNING, w + ": uncaught error when " + thing, t); remove(w); } @@ -47,6 +58,7 @@ public class Driver { final Map watching = new IdentityHashMap<>(); final Heap timeheap = new Heap<>(Comparator.naturalOrder()); final Map paused = new IdentityHashMap<>(); + final Collection cancelled = new HashSet<>(); SelectPool(SelectorProvider provider) { this.provider = provider; @@ -59,6 +71,8 @@ public class Driver { } void handle(Watcher w, int evs) { + if(!watching.containsKey(w)) + return; try { pause(w); submit(() -> { @@ -88,8 +102,8 @@ public class Driver { boolean quit = false; Throwable error = null; try { + double now = time(); while(true) { - double now = time(); long timeout = 0; synchronized(selectors) { Double first = timeheap.keypeek(); @@ -99,16 +113,35 @@ public class Driver { return; } if(first != null) - timeout = (long)Math.ceil((first - now) * 1000); + timeout = Math.max((long)Math.ceil((first - now) * 1000), 1); } + Collection precancelled; + synchronized(cancelled) { + precancelled = new ArrayList<>(cancelled); + } + if(!precancelled.isEmpty()) + timeout = 1; poll.selectedKeys().clear(); try { poll.select(timeout); } catch(IOException e) { throw(new RuntimeException(e)); } + if(!precancelled.isEmpty()) { + synchronized(cancelled) { + cancelled.removeAll(precancelled); + cancelled.notifyAll(); + } + } for(SelectionKey key : poll.selectedKeys()) handle((Watcher)key.attachment(), key.readyOps()); + now = time(); + while(true) { + Double first = timeheap.keypeek(); + if((first == null) || (first > now)) + break; + handle(timeheap.remove(), 0); + } } } catch(Throwable t) { error = t; @@ -151,11 +184,16 @@ public class Driver { void add(Watcher w, SelectableChannel ch) { if(watching.containsKey(w) || paused.containsKey(w) || timeheap.contains(w)) throw(new IllegalStateException(w + ": already registered")); + try { + ch.configureBlocking(false); + } catch(IOException e) { + throw(new RuntimeException(ch + ": could not make non-blocking", e)); + } int evs = w.events(); double timeout = w.timeout(); boolean hastime = timeout < Double.POSITIVE_INFINITY; if(evs < 0) { - submit(w::close); + submit(() -> close(w)); return; } w.added(Driver.this); @@ -173,14 +211,28 @@ public class Driver { SelectionKey wc = watching.remove(w); Object tc = timeheap.remove(w); Object pc = paused.remove(w); - if(wc != null) - wc.cancel(); + if(wc != null) { + synchronized(cancelled) { + cancelled.add(wc); + wc.cancel(); + poll.wakeup(); + boolean irq = false; + while(cancelled.contains(wc)) { + try { + cancelled.wait(); + } catch(InterruptedException e) { + irq = true; + } + } + if(irq) + Thread.currentThread().interrupt(); + } + } if(((wc != null) || (tc != null)) && (pc != null)) throw(new RuntimeException(w + ": inconsistent internal state")); if(wc == null) throw(new IllegalStateException(w + ": not registered")); - submit(w::close); - poll.wakeup(); + submit(() -> close(w)); } void update(Watcher w) {