Java: Hopefully working HubListeners.
[doldaconnect.git] / lib / java / dolda / dolcon / Session.java
index de6ea18..b2ff07b 100644 (file)
@@ -3,14 +3,12 @@ package dolda.dolcon;
 import java.util.*;
 import dolda.dolcon.protocol.*;
 
-public class Session implements NotifyListener {
-    private Connection conn;
+public class Session {
+    Connection conn;
     private String state;
-    private Set<HubListener> hubls = new HashSet<HubListener>();
     private boolean listening = false;
-    private String[] hubstate = {"none"};
-    private String[][] states = {hubstate};
-    private Map<Integer, Hub> hubs = new TreeMap<Integer, Hub>();
+    private Dispatcher dispatcher;
+    HubManager hm = null;
     
     public Session(String aspec, String username, List<Authenticator> auth) throws AuthException, ProtocolException, InterruptedException {
        state = "connecting";
@@ -24,6 +22,8 @@ public class Session implements NotifyListener {
        state = "auth";
        authenticate(username, auth);
        state = "";
+       dispatcher = new Dispatcher();
+       dispatcher.start();
     }
     
     public Session(String aspec, String username, Authenticator... auth) throws AuthException, ProtocolException, InterruptedException {
@@ -41,7 +41,6 @@ public class Session implements NotifyListener {
            String use = null;
            Authenticator au = null;
            for(Authenticator a : auth) {
-               System.out.println(a);
                use = a.handles(mechs);
                if(use != null) {
                    au = a;
@@ -67,71 +66,23 @@ public class Session implements NotifyListener {
        }
     }
     
-    private void checkstates() {
-       boolean active = false;
-       for(String[] sp : states) {
-           if(sp[0] != "none") {
-               active = true;
-               break;
-           }
+    private HubManager gethm() {
+       if(hm == null) {
+           hm = new HubManager(this);
        }
-       if(listening && !active)
-           conn.removeNotifyListener(this);
-       else if(!listening && active)
-           conn.addNotifyListener(this);
-    }
-
-    private int atoi(String a) {
-       return(Integer.parseInt(a));
-    }
-
-    private void fetchhubs() {
-       synchronized(hubstate) {
-           if(hubstate[0] != "none")
-               return;
-           hubstate[0] = "fetch";
-       }
-       Command cmd = new Command("lsnodes");
-       cmd.new Listener() {
-               public void done(Response r) {
-                   if(r.code != 200)
-                       return;
-                   for(List<String> line : r.lines) {
-                       Hub h = new Hub(atoi(line.get(0)));
-                       h.fnet = line.get(1).intern();
-                       h.name = line.get(2);
-                       h.numpeers = atoi(line.get(3));
-                       h.state = new String[] {"syn", "hs", "est", "dead"}[atoi(line.get(4))];
-                       h.gid = line.get(5);
-                       hubs.put(h.id, h);
-                   }
-               }
-               
-               public void error(Exception e) {
-               }
-           };
-       conn.qcmd(new Command("notify fn:act on"), cmd);
+       return(hm);
     }
     
-    public void addHubListener(HubListener hl, boolean addexisting) {
-       fetchhubs();
-       synchronized(hubls) {
-           hubls.add(hl);
-       }
+    public synchronized void addHubListener(HubListener hl, boolean addexisting) {
+       gethm().addls(hl, addexisting);
     }
     
-    public void removeHubListener(HubListener hl) {
-       synchronized(hubls) {
-           hubls.remove(hl);
-           if(hubls.isEmpty()) {
-               hubs.clear();
-               hubstate[0] = "none";
-               checkstates();
-           }
-       }
+    public synchronized void removeHubListener(HubListener hl) {
+       gethm().rmls(hl);
     }
-
-    public void notified(Response resp) {
+    
+    public synchronized Collection<Hub> getHubs() throws InterruptedException {
+       return(gethm().gethubs());
     }
     
     public void close() {
@@ -142,5 +93,40 @@ public class Session implements NotifyListener {
     protected void finalize() {
        if(state != "closed")
            close();
+       dispatcher.interrupt();
+    }
+    
+    void dispatch(Runnable ev) {
+       dispatcher.dispatch(ev);
+    }
+
+    private static class Dispatcher extends Thread {
+       private Queue<Runnable> q = new LinkedList<Runnable>();
+       
+       private Dispatcher() {
+           setDaemon(true);
+       }
+       
+       public void dispatch(Runnable ev) {
+           synchronized(q) {
+               q.offer(ev);
+               q.notifyAll();
+           }
+       }
+       
+       public void run() {
+           while(true) {
+               try {
+                   Runnable r;
+                   synchronized(q) {
+                       while((r = q.poll()) == null)
+                           q.wait();
+                   }
+                   r.run();
+               } catch(Throwable t) {
+                   t.printStackTrace();
+               }
+           }
+       }
     }
 }