import java.util.*;
import dolda.dolcon.protocol.*;
-public class Session implements NotifyListener {
- private Connection conn;
+public class Session {
+ Connection conn;
private String state;
- private Set<HubListener> hubls = new HashSet<HubListener>();
private boolean listening = false;
- private String[] hubstate = {"none"};
- private String[][] states = {hubstate};
- private Map<Integer, Hub> hubs = new TreeMap<Integer, Hub>();
+ private Dispatcher dispatcher;
+ HubManager hm = null;
public Session(String aspec, String username, List<Authenticator> auth) throws AuthException, ProtocolException, InterruptedException {
state = "connecting";
state = "auth";
authenticate(username, auth);
state = "";
+ dispatcher = new Dispatcher();
+ dispatcher.start();
}
public Session(String aspec, String username, Authenticator... auth) throws AuthException, ProtocolException, InterruptedException {
String use = null;
Authenticator au = null;
for(Authenticator a : auth) {
- System.out.println(a);
use = a.handles(mechs);
if(use != null) {
au = a;
}
}
- private void checkstates() {
- boolean active = false;
- for(String[] sp : states) {
- if(sp[0] != "none") {
- active = true;
- break;
- }
+ private HubManager gethm() {
+ if(hm == null) {
+ hm = new HubManager(this);
}
- if(listening && !active)
- conn.removeNotifyListener(this);
- else if(!listening && active)
- conn.addNotifyListener(this);
- }
-
- private int atoi(String a) {
- return(Integer.parseInt(a));
- }
-
- private void fetchhubs() {
- synchronized(hubstate) {
- if(hubstate[0] != "none")
- return;
- hubstate[0] = "fetch";
- }
- Command cmd = new Command("lsnodes");
- cmd.new Listener() {
- public void done(Response r) {
- if(r.code != 200)
- return;
- for(List<String> line : r.lines) {
- Hub h = new Hub(atoi(line.get(0)));
- h.fnet = line.get(1).intern();
- h.name = line.get(2);
- h.numpeers = atoi(line.get(3));
- h.state = new String[] {"syn", "hs", "est", "dead"}[atoi(line.get(4))];
- h.gid = line.get(5);
- hubs.put(h.id, h);
- }
- }
-
- public void error(Exception e) {
- }
- };
- conn.qcmd(new Command("notify fn:act on"), cmd);
+ return(hm);
}
- public void addHubListener(HubListener hl, boolean addexisting) {
- fetchhubs();
- synchronized(hubls) {
- hubls.add(hl);
- }
+ public synchronized void addHubListener(HubListener hl, boolean addexisting) {
+ gethm().addls(hl, addexisting);
}
- public void removeHubListener(HubListener hl) {
- synchronized(hubls) {
- hubls.remove(hl);
- if(hubls.isEmpty()) {
- hubs.clear();
- hubstate[0] = "none";
- checkstates();
- }
- }
+ public synchronized void removeHubListener(HubListener hl) {
+ gethm().rmls(hl);
}
-
- public void notified(Response resp) {
+
+ public synchronized Collection<Hub> getHubs() throws InterruptedException {
+ return(gethm().gethubs());
}
public void close() {
protected void finalize() {
if(state != "closed")
close();
+ dispatcher.interrupt();
+ }
+
+ void dispatch(Runnable ev) {
+ dispatcher.dispatch(ev);
+ }
+
+ private static class Dispatcher extends Thread {
+ private Queue<Runnable> q = new LinkedList<Runnable>();
+
+ private Dispatcher() {
+ setDaemon(true);
+ }
+
+ public void dispatch(Runnable ev) {
+ synchronized(q) {
+ q.offer(ev);
+ q.notifyAll();
+ }
+ }
+
+ public void run() {
+ while(true) {
+ try {
+ Runnable r;
+ synchronized(q) {
+ while((r = q.poll()) == null)
+ q.wait();
+ }
+ r.run();
+ } catch(Throwable t) {
+ t.printStackTrace();
+ }
+ }
+ }
}
}