X-Git-Url: http://dolda2000.com/gitweb/?a=blobdiff_plain;f=lib%2Fjava%2Fdolda%2Fdolcon%2FHubManager.java;fp=lib%2Fjava%2Fdolda%2Fdolcon%2FHubManager.java;h=d0e4344ff8c1a56c31e6601959332aecb02b0d36;hb=7ff32e0f0a14400336fbbbf28401a9dab3f35192;hp=0000000000000000000000000000000000000000;hpb=e90ca845da9e7104c8c1cf88964bdc1880561e44;p=doldaconnect.git diff --git a/lib/java/dolda/dolcon/HubManager.java b/lib/java/dolda/dolcon/HubManager.java new file mode 100644 index 0000000..d0e4344 --- /dev/null +++ b/lib/java/dolda/dolcon/HubManager.java @@ -0,0 +1,195 @@ +package dolda.dolcon; + +import java.util.*; +import dolda.dolcon.protocol.*; + +class HubManager implements NotifyListener { + private Set hubls = new HashSet(); + private Set delayed = new HashSet(); + private Map hubs = new TreeMap(); + private String state = "none"; + private Session sess; + + HubManager(Session sess) { + this.sess = sess; + } + + private int atoi(String a) { + return(Integer.parseInt(a)); + } + + private void addall(final HubListener ls) { + for(final Hub hub : hubs.values()) { + sess.dispatch(new Runnable() { + public void run() { + ls.added(hub); + } + }); + } + } + + private void fetchhubs() { + synchronized(this) { + if(state != "none") + return; + state = "fetch"; + } + Command cmd = new Command("lsnodes"); + cmd.new Listener() { + public void done(Response r) { + if(r.code != 200) + return; + for(List line : r.lines) { + Hub h = new Hub(atoi(line.get(0)), line.get(1)); + h.name = line.get(2); + h.numpeers = atoi(line.get(3)); + h.state = new String[] {"syn", "hs", "est", "dead"}[atoi(line.get(4))]; + h.gid = line.get(5); + hubs.put(h.id, h); + } + synchronized(HubManager.this) { + state = ""; + HubManager.this.notifyAll(); + for(HubListener ls : delayed) { + addall(ls); + } + } + } + + public void error(Exception e) { + synchronized(HubManager.this) { + state = "closed"; + } + } + }; + sess.conn.qcmd(new Command("notify", "fn:act", "on"), cmd); + sess.conn.addNotifyListener(this); + } + + public Collection gethubs() throws InterruptedException { + fetchhubs(); + synchronized(this) { + while((state != "") && (state != "closed")) + wait(); + } + Collection ret = new LinkedList(); + synchronized(hubs) { + for(Hub h : hubs.values()) + ret.add(h.copy()); + } + return(ret); + } + + public void addls(HubListener hl, boolean addexisting) { + fetchhubs(); + synchronized(hubls) { + hubls.add(hl); + } + if(addexisting) { + synchronized(this) { + if(state != "") + delayed.add(hl); + else + addall(hl); + } + } + } + + public void rmls(HubListener hl) { + synchronized(sess) { + synchronized(hubls) { + hubls.remove(hl); + if(hubls.isEmpty()) { + synchronized(hubs) { + hubs.clear(); + } + state = "closed"; + sess.conn.removeNotifyListener(this); + sess.hm = null; + } + } + } + } + + public void notified(Response resp) { + synchronized(this) { + if(state != "") + return; + } + if(resp.code == 604) { + final Hub h = new Hub(atoi(resp.token(0, 0)), resp.token(0, 1)); + synchronized(hubs) { + hubs.put(h.id, h); + } + sess.dispatch(new Runnable() { + public void run() { + synchronized(hubls) { + for(HubListener ls : hubls) + ls.added(h); + } + } + }); + } else if(resp.code == 603) { + final Hub h; + synchronized(hubs) { + h = hubs.remove(atoi(resp.token(0, 0))); + } + sess.dispatch(new Runnable() { + public void run() { + synchronized(hubls) { + for(HubListener ls : hubls) + ls.removed(h); + } + } + }); + } else if(resp.code == 601) { + final Hub h; + final String state = new String[] {"syn", "hs", "est", "dead"}[atoi(resp.token(0, 1))]; + synchronized(hubs) { + h = hubs.get(atoi(resp.token(0, 0))); + } + h.state = state; + sess.dispatch(new Runnable() { + public void run() { + synchronized(h.ls) { + for(Hub.Listener ls : h.ls) { + ls.chState(h); + } + } + } + }); + } else if(resp.code == 602) { + final Hub h; + final String name = resp.token(0, 1); + synchronized(hubs) { + h = hubs.get(atoi(resp.token(0, 0))); + } + h.name = name; + sess.dispatch(new Runnable() { + public void run() { + synchronized(h.ls) { + for(Hub.Listener ls : h.ls) { + ls.chName(h); + } + } + } + }); + } else if(resp.code == 605) { + final Hub h; + final int np = atoi(resp.token(0, 1)); + synchronized(hubs) { + h = hubs.get(atoi(resp.token(0, 0))); + } + h.numpeers = np; + sess.dispatch(new Runnable() { + public void run() { + synchronized(h.ls) { + for(Hub.Listener ls : h.ls) { + ls.chNumPeers(h); + } + } + } + }); + } + } +}