Java: Hopefully working HubListeners. java
authorFredrik Tolf <fredrik@dolda2000.com>
Wed, 6 Feb 2008 04:57:08 +0000 (05:57 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Wed, 6 Feb 2008 04:57:08 +0000 (05:57 +0100)
lib/java/dolda/dolcon/Hub.java
lib/java/dolda/dolcon/HubManager.java [new file with mode: 0644]
lib/java/dolda/dolcon/PasswordAuth.java
lib/java/dolda/dolcon/Session.java
lib/java/dolda/dolcon/Test.java [new file with mode: 0644]

index 125f172..b538053 100644 (file)
@@ -1,12 +1,32 @@
 package dolda.dolcon;
 
+import java.util.*;
+
 public class Hub {
-    int id, numpeers;
-    String fnet, name, gid;
-    String state;
+    int id, numpeers = 0;
+    final String fnet;
+    String name = "", gid = "";
+    String state = "syn";
+    Set<Listener> ls = new HashSet<Listener>();
     
-    public Hub(int id) {
+    public Hub(int id, String fnet) {
        this.id = id;
+       this.fnet = fnet.intern();
+    }
+    
+    public interface Listener {
+       public void chName(Hub hub);
+       public void chNumPeers(Hub hub);
+       public void chState(Hub hub);
+    }
+    
+    public Hub copy() {
+       Hub ret = new Hub(id, fnet);
+       ret.numpeers = numpeers;
+       ret.gid = gid;
+       ret.state = state;
+       ret.name = name;
+       return(ret);
     }
     
     public int getId() {
@@ -32,4 +52,14 @@ public class Hub {
     public String getState() {
        return(state);
     }
+    
+    public void addListener(Listener ls) {
+       synchronized(this.ls) {
+           this.ls.add(ls);
+       }
+    }
+    
+    public String toString() {
+       return("Hub (" + id + ", " + fnet + ", \"" + name + "\")");
+    }
 }
diff --git a/lib/java/dolda/dolcon/HubManager.java b/lib/java/dolda/dolcon/HubManager.java
new file mode 100644 (file)
index 0000000..d0e4344
--- /dev/null
@@ -0,0 +1,195 @@
+package dolda.dolcon;
+
+import java.util.*;
+import dolda.dolcon.protocol.*;
+
+class HubManager implements NotifyListener {
+    private Set<HubListener> hubls = new HashSet<HubListener>();
+    private Set<HubListener> delayed = new HashSet<HubListener>();
+    private Map<Integer, Hub> hubs = new TreeMap<Integer, Hub>();
+    private String state = "none";
+    private Session sess;
+    
+    HubManager(Session sess) {
+       this.sess = sess;
+    }
+    
+    private int atoi(String a) {
+       return(Integer.parseInt(a));
+    }
+
+    private void addall(final HubListener ls) {
+       for(final Hub hub : hubs.values()) {
+           sess.dispatch(new Runnable() {
+                   public void run() {
+                       ls.added(hub);
+                   }
+               });
+       }
+    }
+    
+    private void fetchhubs() {
+       synchronized(this) {
+           if(state != "none")
+               return;
+           state = "fetch";
+       }
+       Command cmd = new Command("lsnodes");
+       cmd.new Listener() {
+               public void done(Response r) {
+                   if(r.code != 200)
+                       return;
+                   for(List<String> line : r.lines) {
+                       Hub h = new Hub(atoi(line.get(0)), line.get(1));
+                       h.name = line.get(2);
+                       h.numpeers = atoi(line.get(3));
+                       h.state = new String[] {"syn", "hs", "est", "dead"}[atoi(line.get(4))];
+                       h.gid = line.get(5);
+                       hubs.put(h.id, h);
+                   }
+                   synchronized(HubManager.this) {
+                       state = "";
+                       HubManager.this.notifyAll();
+                       for(HubListener ls : delayed) {
+                           addall(ls);
+                       }
+                   }
+               }
+               
+               public void error(Exception e) {
+                   synchronized(HubManager.this) {
+                       state = "closed";
+                   }
+               }
+           };
+       sess.conn.qcmd(new Command("notify", "fn:act", "on"), cmd);
+       sess.conn.addNotifyListener(this);
+    }
+    
+    public Collection<Hub> gethubs() throws InterruptedException {
+       fetchhubs();
+       synchronized(this) {
+           while((state != "") && (state != "closed"))
+               wait();
+       }
+       Collection<Hub> ret = new LinkedList<Hub>();
+       synchronized(hubs) {
+           for(Hub h : hubs.values())
+               ret.add(h.copy());
+       }
+       return(ret);
+    }
+    
+    public void addls(HubListener hl, boolean addexisting) {
+       fetchhubs();
+       synchronized(hubls) {
+           hubls.add(hl);
+       }
+       if(addexisting) {
+           synchronized(this) {
+               if(state != "")
+                   delayed.add(hl);
+               else
+                   addall(hl);
+           }
+       }
+    }
+    
+    public void rmls(HubListener hl) {
+       synchronized(sess) {
+           synchronized(hubls) {
+               hubls.remove(hl);
+               if(hubls.isEmpty()) {
+                   synchronized(hubs) {
+                       hubs.clear();
+                   }
+                   state = "closed";
+                   sess.conn.removeNotifyListener(this);
+                   sess.hm = null;
+               }
+           }
+       }
+    }
+    
+    public void notified(Response resp) {
+       synchronized(this) {
+           if(state != "")
+               return;
+       }
+       if(resp.code == 604) {
+           final Hub h = new Hub(atoi(resp.token(0, 0)), resp.token(0, 1));
+           synchronized(hubs) {
+               hubs.put(h.id, h);
+           }
+           sess.dispatch(new Runnable() {
+                   public void run() {
+                       synchronized(hubls) {
+                           for(HubListener ls : hubls)
+                               ls.added(h);
+                       }
+                   }
+               });
+       } else if(resp.code == 603) {
+           final Hub h;
+           synchronized(hubs) {
+               h = hubs.remove(atoi(resp.token(0, 0)));
+           }
+           sess.dispatch(new Runnable() {
+                   public void run() {
+                       synchronized(hubls) {
+                           for(HubListener ls : hubls)
+                               ls.removed(h);
+                       }
+                   }
+               });
+       } else if(resp.code == 601) {
+           final Hub h;
+           final String state = new String[] {"syn", "hs", "est", "dead"}[atoi(resp.token(0, 1))];
+           synchronized(hubs) {
+               h = hubs.get(atoi(resp.token(0, 0)));
+           }
+           h.state = state;
+           sess.dispatch(new Runnable() {
+                   public void run() {
+                       synchronized(h.ls) {
+                           for(Hub.Listener ls : h.ls) {
+                               ls.chState(h);
+                           }
+                       }
+                   }
+               });
+       } else if(resp.code == 602) {
+           final Hub h;
+           final String name = resp.token(0, 1);
+           synchronized(hubs) {
+               h = hubs.get(atoi(resp.token(0, 0)));
+           }
+           h.name = name;
+           sess.dispatch(new Runnable() {
+                   public void run() {
+                       synchronized(h.ls) {
+                           for(Hub.Listener ls : h.ls) {
+                               ls.chName(h);
+                           }
+                       }
+                   }
+               });
+       } else if(resp.code == 605) {
+           final Hub h;
+           final int np = atoi(resp.token(0, 1));
+           synchronized(hubs) {
+               h = hubs.get(atoi(resp.token(0, 0)));
+           }
+           h.numpeers = np;
+           sess.dispatch(new Runnable() {
+                   public void run() {
+                       synchronized(h.ls) {
+                           for(Hub.Listener ls : h.ls) {
+                               ls.chNumPeers(h);
+                           }
+                       }
+                   }
+               });
+       }
+    }
+}
index 9f62d7a..7c9191c 100644 (file)
@@ -12,7 +12,6 @@ public class PasswordAuth implements Authenticator {
     }
     
     public String handles(List<String> name) {
-       System.out.println(name);
        if(name.contains("pam"))
            return("pam");
        return(null);
index de6ea18..b2ff07b 100644 (file)
@@ -3,14 +3,12 @@ package dolda.dolcon;
 import java.util.*;
 import dolda.dolcon.protocol.*;
 
-public class Session implements NotifyListener {
-    private Connection conn;
+public class Session {
+    Connection conn;
     private String state;
-    private Set<HubListener> hubls = new HashSet<HubListener>();
     private boolean listening = false;
-    private String[] hubstate = {"none"};
-    private String[][] states = {hubstate};
-    private Map<Integer, Hub> hubs = new TreeMap<Integer, Hub>();
+    private Dispatcher dispatcher;
+    HubManager hm = null;
     
     public Session(String aspec, String username, List<Authenticator> auth) throws AuthException, ProtocolException, InterruptedException {
        state = "connecting";
@@ -24,6 +22,8 @@ public class Session implements NotifyListener {
        state = "auth";
        authenticate(username, auth);
        state = "";
+       dispatcher = new Dispatcher();
+       dispatcher.start();
     }
     
     public Session(String aspec, String username, Authenticator... auth) throws AuthException, ProtocolException, InterruptedException {
@@ -41,7 +41,6 @@ public class Session implements NotifyListener {
            String use = null;
            Authenticator au = null;
            for(Authenticator a : auth) {
-               System.out.println(a);
                use = a.handles(mechs);
                if(use != null) {
                    au = a;
@@ -67,71 +66,23 @@ public class Session implements NotifyListener {
        }
     }
     
-    private void checkstates() {
-       boolean active = false;
-       for(String[] sp : states) {
-           if(sp[0] != "none") {
-               active = true;
-               break;
-           }
+    private HubManager gethm() {
+       if(hm == null) {
+           hm = new HubManager(this);
        }
-       if(listening && !active)
-           conn.removeNotifyListener(this);
-       else if(!listening && active)
-           conn.addNotifyListener(this);
-    }
-
-    private int atoi(String a) {
-       return(Integer.parseInt(a));
-    }
-
-    private void fetchhubs() {
-       synchronized(hubstate) {
-           if(hubstate[0] != "none")
-               return;
-           hubstate[0] = "fetch";
-       }
-       Command cmd = new Command("lsnodes");
-       cmd.new Listener() {
-               public void done(Response r) {
-                   if(r.code != 200)
-                       return;
-                   for(List<String> line : r.lines) {
-                       Hub h = new Hub(atoi(line.get(0)));
-                       h.fnet = line.get(1).intern();
-                       h.name = line.get(2);
-                       h.numpeers = atoi(line.get(3));
-                       h.state = new String[] {"syn", "hs", "est", "dead"}[atoi(line.get(4))];
-                       h.gid = line.get(5);
-                       hubs.put(h.id, h);
-                   }
-               }
-               
-               public void error(Exception e) {
-               }
-           };
-       conn.qcmd(new Command("notify fn:act on"), cmd);
+       return(hm);
     }
     
-    public void addHubListener(HubListener hl, boolean addexisting) {
-       fetchhubs();
-       synchronized(hubls) {
-           hubls.add(hl);
-       }
+    public synchronized void addHubListener(HubListener hl, boolean addexisting) {
+       gethm().addls(hl, addexisting);
     }
     
-    public void removeHubListener(HubListener hl) {
-       synchronized(hubls) {
-           hubls.remove(hl);
-           if(hubls.isEmpty()) {
-               hubs.clear();
-               hubstate[0] = "none";
-               checkstates();
-           }
-       }
+    public synchronized void removeHubListener(HubListener hl) {
+       gethm().rmls(hl);
     }
-
-    public void notified(Response resp) {
+    
+    public synchronized Collection<Hub> getHubs() throws InterruptedException {
+       return(gethm().gethubs());
     }
     
     public void close() {
@@ -142,5 +93,40 @@ public class Session implements NotifyListener {
     protected void finalize() {
        if(state != "closed")
            close();
+       dispatcher.interrupt();
+    }
+    
+    void dispatch(Runnable ev) {
+       dispatcher.dispatch(ev);
+    }
+
+    private static class Dispatcher extends Thread {
+       private Queue<Runnable> q = new LinkedList<Runnable>();
+       
+       private Dispatcher() {
+           setDaemon(true);
+       }
+       
+       public void dispatch(Runnable ev) {
+           synchronized(q) {
+               q.offer(ev);
+               q.notifyAll();
+           }
+       }
+       
+       public void run() {
+           while(true) {
+               try {
+                   Runnable r;
+                   synchronized(q) {
+                       while((r = q.poll()) == null)
+                           q.wait();
+                   }
+                   r.run();
+               } catch(Throwable t) {
+                   t.printStackTrace();
+               }
+           }
+       }
     }
 }
diff --git a/lib/java/dolda/dolcon/Test.java b/lib/java/dolda/dolcon/Test.java
new file mode 100644 (file)
index 0000000..ae661fb
--- /dev/null
@@ -0,0 +1,37 @@
+package dolda.dolcon;
+
+import java.util.*;
+
+class Test {
+    public static void main(String[] args) throws Exception {
+       System.out.print("Password: ");
+       PasswordAuth auth = new PasswordAuth(new Scanner(System.in).nextLine());
+       long st = System.currentTimeMillis();
+       Session sess = new Session(args[0], args[1], auth);
+       sess.addHubListener(new HubListener() {
+               public void added(Hub h) {
+                   h.addListener(new Hub.Listener() {
+                           public void chState(Hub h) {
+                               System.out.println(h.getId() + ": " + h.getState());
+                           }
+                           
+                           public void chNumPeers(Hub h) {
+                               System.out.println(h.getId() + ": " + h.getNumPeers());
+                           }
+                           
+                           public void chName(Hub h) {
+                               System.out.println(h.getId() + ": " + h.getName());
+                           }
+                       });
+               }
+               
+               public void removed(Hub h) {
+               }
+           }, true);
+       /*
+       System.out.println(sess.getHubs());
+       sess.close();
+       System.out.println(System.currentTimeMillis() - st);
+       */
+    }
+}