From: Fredrik Tolf Date: Wed, 6 Feb 2008 04:57:08 +0000 (+0100) Subject: Java: Hopefully working HubListeners. X-Git-Url: http://dolda2000.com/gitweb/?p=doldaconnect.git;a=commitdiff_plain;h=refs%2Fheads%2Fjava Java: Hopefully working HubListeners. --- diff --git a/lib/java/dolda/dolcon/Hub.java b/lib/java/dolda/dolcon/Hub.java index 125f172..b538053 100644 --- a/lib/java/dolda/dolcon/Hub.java +++ b/lib/java/dolda/dolcon/Hub.java @@ -1,12 +1,32 @@ package dolda.dolcon; +import java.util.*; + public class Hub { - int id, numpeers; - String fnet, name, gid; - String state; + int id, numpeers = 0; + final String fnet; + String name = "", gid = ""; + String state = "syn"; + Set ls = new HashSet(); - public Hub(int id) { + public Hub(int id, String fnet) { this.id = id; + this.fnet = fnet.intern(); + } + + public interface Listener { + public void chName(Hub hub); + public void chNumPeers(Hub hub); + public void chState(Hub hub); + } + + public Hub copy() { + Hub ret = new Hub(id, fnet); + ret.numpeers = numpeers; + ret.gid = gid; + ret.state = state; + ret.name = name; + return(ret); } public int getId() { @@ -32,4 +52,14 @@ public class Hub { public String getState() { return(state); } + + public void addListener(Listener ls) { + synchronized(this.ls) { + this.ls.add(ls); + } + } + + public String toString() { + return("Hub (" + id + ", " + fnet + ", \"" + name + "\")"); + } } diff --git a/lib/java/dolda/dolcon/HubManager.java b/lib/java/dolda/dolcon/HubManager.java new file mode 100644 index 0000000..d0e4344 --- /dev/null +++ b/lib/java/dolda/dolcon/HubManager.java @@ -0,0 +1,195 @@ +package dolda.dolcon; + +import java.util.*; +import dolda.dolcon.protocol.*; + +class HubManager implements NotifyListener { + private Set hubls = new HashSet(); + private Set delayed = new HashSet(); + private Map hubs = new TreeMap(); + private String state = "none"; + private Session sess; + + HubManager(Session sess) { + this.sess = sess; + } + + private int atoi(String a) { + return(Integer.parseInt(a)); + } + + private void addall(final HubListener ls) { + for(final Hub hub : hubs.values()) { + sess.dispatch(new Runnable() { + public void run() { + ls.added(hub); + } + }); + } + } + + private void fetchhubs() { + synchronized(this) { + if(state != "none") + return; + state = "fetch"; + } + Command cmd = new Command("lsnodes"); + cmd.new Listener() { + public void done(Response r) { + if(r.code != 200) + return; + for(List line : r.lines) { + Hub h = new Hub(atoi(line.get(0)), line.get(1)); + h.name = line.get(2); + h.numpeers = atoi(line.get(3)); + h.state = new String[] {"syn", "hs", "est", "dead"}[atoi(line.get(4))]; + h.gid = line.get(5); + hubs.put(h.id, h); + } + synchronized(HubManager.this) { + state = ""; + HubManager.this.notifyAll(); + for(HubListener ls : delayed) { + addall(ls); + } + } + } + + public void error(Exception e) { + synchronized(HubManager.this) { + state = "closed"; + } + } + }; + sess.conn.qcmd(new Command("notify", "fn:act", "on"), cmd); + sess.conn.addNotifyListener(this); + } + + public Collection gethubs() throws InterruptedException { + fetchhubs(); + synchronized(this) { + while((state != "") && (state != "closed")) + wait(); + } + Collection ret = new LinkedList(); + synchronized(hubs) { + for(Hub h : hubs.values()) + ret.add(h.copy()); + } + return(ret); + } + + public void addls(HubListener hl, boolean addexisting) { + fetchhubs(); + synchronized(hubls) { + hubls.add(hl); + } + if(addexisting) { + synchronized(this) { + if(state != "") + delayed.add(hl); + else + addall(hl); + } + } + } + + public void rmls(HubListener hl) { + synchronized(sess) { + synchronized(hubls) { + hubls.remove(hl); + if(hubls.isEmpty()) { + synchronized(hubs) { + hubs.clear(); + } + state = "closed"; + sess.conn.removeNotifyListener(this); + sess.hm = null; + } + } + } + } + + public void notified(Response resp) { + synchronized(this) { + if(state != "") + return; + } + if(resp.code == 604) { + final Hub h = new Hub(atoi(resp.token(0, 0)), resp.token(0, 1)); + synchronized(hubs) { + hubs.put(h.id, h); + } + sess.dispatch(new Runnable() { + public void run() { + synchronized(hubls) { + for(HubListener ls : hubls) + ls.added(h); + } + } + }); + } else if(resp.code == 603) { + final Hub h; + synchronized(hubs) { + h = hubs.remove(atoi(resp.token(0, 0))); + } + sess.dispatch(new Runnable() { + public void run() { + synchronized(hubls) { + for(HubListener ls : hubls) + ls.removed(h); + } + } + }); + } else if(resp.code == 601) { + final Hub h; + final String state = new String[] {"syn", "hs", "est", "dead"}[atoi(resp.token(0, 1))]; + synchronized(hubs) { + h = hubs.get(atoi(resp.token(0, 0))); + } + h.state = state; + sess.dispatch(new Runnable() { + public void run() { + synchronized(h.ls) { + for(Hub.Listener ls : h.ls) { + ls.chState(h); + } + } + } + }); + } else if(resp.code == 602) { + final Hub h; + final String name = resp.token(0, 1); + synchronized(hubs) { + h = hubs.get(atoi(resp.token(0, 0))); + } + h.name = name; + sess.dispatch(new Runnable() { + public void run() { + synchronized(h.ls) { + for(Hub.Listener ls : h.ls) { + ls.chName(h); + } + } + } + }); + } else if(resp.code == 605) { + final Hub h; + final int np = atoi(resp.token(0, 1)); + synchronized(hubs) { + h = hubs.get(atoi(resp.token(0, 0))); + } + h.numpeers = np; + sess.dispatch(new Runnable() { + public void run() { + synchronized(h.ls) { + for(Hub.Listener ls : h.ls) { + ls.chNumPeers(h); + } + } + } + }); + } + } +} diff --git a/lib/java/dolda/dolcon/PasswordAuth.java b/lib/java/dolda/dolcon/PasswordAuth.java index 9f62d7a..7c9191c 100644 --- a/lib/java/dolda/dolcon/PasswordAuth.java +++ b/lib/java/dolda/dolcon/PasswordAuth.java @@ -12,7 +12,6 @@ public class PasswordAuth implements Authenticator { } public String handles(List name) { - System.out.println(name); if(name.contains("pam")) return("pam"); return(null); diff --git a/lib/java/dolda/dolcon/Session.java b/lib/java/dolda/dolcon/Session.java index de6ea18..b2ff07b 100644 --- a/lib/java/dolda/dolcon/Session.java +++ b/lib/java/dolda/dolcon/Session.java @@ -3,14 +3,12 @@ package dolda.dolcon; import java.util.*; import dolda.dolcon.protocol.*; -public class Session implements NotifyListener { - private Connection conn; +public class Session { + Connection conn; private String state; - private Set hubls = new HashSet(); private boolean listening = false; - private String[] hubstate = {"none"}; - private String[][] states = {hubstate}; - private Map hubs = new TreeMap(); + private Dispatcher dispatcher; + HubManager hm = null; public Session(String aspec, String username, List auth) throws AuthException, ProtocolException, InterruptedException { state = "connecting"; @@ -24,6 +22,8 @@ public class Session implements NotifyListener { state = "auth"; authenticate(username, auth); state = ""; + dispatcher = new Dispatcher(); + dispatcher.start(); } public Session(String aspec, String username, Authenticator... auth) throws AuthException, ProtocolException, InterruptedException { @@ -41,7 +41,6 @@ public class Session implements NotifyListener { String use = null; Authenticator au = null; for(Authenticator a : auth) { - System.out.println(a); use = a.handles(mechs); if(use != null) { au = a; @@ -67,71 +66,23 @@ public class Session implements NotifyListener { } } - private void checkstates() { - boolean active = false; - for(String[] sp : states) { - if(sp[0] != "none") { - active = true; - break; - } + private HubManager gethm() { + if(hm == null) { + hm = new HubManager(this); } - if(listening && !active) - conn.removeNotifyListener(this); - else if(!listening && active) - conn.addNotifyListener(this); - } - - private int atoi(String a) { - return(Integer.parseInt(a)); - } - - private void fetchhubs() { - synchronized(hubstate) { - if(hubstate[0] != "none") - return; - hubstate[0] = "fetch"; - } - Command cmd = new Command("lsnodes"); - cmd.new Listener() { - public void done(Response r) { - if(r.code != 200) - return; - for(List line : r.lines) { - Hub h = new Hub(atoi(line.get(0))); - h.fnet = line.get(1).intern(); - h.name = line.get(2); - h.numpeers = atoi(line.get(3)); - h.state = new String[] {"syn", "hs", "est", "dead"}[atoi(line.get(4))]; - h.gid = line.get(5); - hubs.put(h.id, h); - } - } - - public void error(Exception e) { - } - }; - conn.qcmd(new Command("notify fn:act on"), cmd); + return(hm); } - public void addHubListener(HubListener hl, boolean addexisting) { - fetchhubs(); - synchronized(hubls) { - hubls.add(hl); - } + public synchronized void addHubListener(HubListener hl, boolean addexisting) { + gethm().addls(hl, addexisting); } - public void removeHubListener(HubListener hl) { - synchronized(hubls) { - hubls.remove(hl); - if(hubls.isEmpty()) { - hubs.clear(); - hubstate[0] = "none"; - checkstates(); - } - } + public synchronized void removeHubListener(HubListener hl) { + gethm().rmls(hl); } - - public void notified(Response resp) { + + public synchronized Collection getHubs() throws InterruptedException { + return(gethm().gethubs()); } public void close() { @@ -142,5 +93,40 @@ public class Session implements NotifyListener { protected void finalize() { if(state != "closed") close(); + dispatcher.interrupt(); + } + + void dispatch(Runnable ev) { + dispatcher.dispatch(ev); + } + + private static class Dispatcher extends Thread { + private Queue q = new LinkedList(); + + private Dispatcher() { + setDaemon(true); + } + + public void dispatch(Runnable ev) { + synchronized(q) { + q.offer(ev); + q.notifyAll(); + } + } + + public void run() { + while(true) { + try { + Runnable r; + synchronized(q) { + while((r = q.poll()) == null) + q.wait(); + } + r.run(); + } catch(Throwable t) { + t.printStackTrace(); + } + } + } } } diff --git a/lib/java/dolda/dolcon/Test.java b/lib/java/dolda/dolcon/Test.java new file mode 100644 index 0000000..ae661fb --- /dev/null +++ b/lib/java/dolda/dolcon/Test.java @@ -0,0 +1,37 @@ +package dolda.dolcon; + +import java.util.*; + +class Test { + public static void main(String[] args) throws Exception { + System.out.print("Password: "); + PasswordAuth auth = new PasswordAuth(new Scanner(System.in).nextLine()); + long st = System.currentTimeMillis(); + Session sess = new Session(args[0], args[1], auth); + sess.addHubListener(new HubListener() { + public void added(Hub h) { + h.addListener(new Hub.Listener() { + public void chState(Hub h) { + System.out.println(h.getId() + ": " + h.getState()); + } + + public void chNumPeers(Hub h) { + System.out.println(h.getId() + ": " + h.getNumPeers()); + } + + public void chName(Hub h) { + System.out.println(h.getId() + ": " + h.getName()); + } + }); + } + + public void removed(Hub h) { + } + }, true); + /* + System.out.println(sess.getHubs()); + sess.close(); + System.out.println(System.currentTimeMillis() - st); + */ + } +}