private String aspec;
private String state;
private Set<ConnectListener> connls = new HashSet<ConnectListener>();
+ private Set<NotifyListener> notls = new HashSet<NotifyListener>();
private Exception error;
public interface ConnectListener {
public void connected() throws Exception;
public void error(Exception cause);
}
-
+
public Connection(String aspec) {
this.aspec = aspec;
state = "idle";
}
public void connect() throws ConnectException {
- state = "connecting";
+ synchronized(this) {
+ if(state != "idle")
+ throw(new IllegalStateException("Already connected"));
+ state = "connecting";
+ }
try {
s = new Socket(aspec, 1500);
} catch(java.net.UnknownHostException e) {
}
pending = new LinkedList<Command>();
Command ccmd = new Command(".connect");
- ccmd.addListener(new Command.Listener() {
+ ccmd.new Listener() {
public void done(Response resp) throws Exception {
try {
checkver(resp);
error(e);
throw(e);
}
- synchronized(connls) {
+ synchronized(Connection.this) {
state = "connected";
+ }
+ synchronized(connls) {
try {
for(ConnectListener l : connls)
l.connected();
}
}
}
- });
- pending.offer(ccmd);
- reader = new Reader(s, pending);
- writer = new Writer(s, queue, pending);
- Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
- public void uncaughtException(Thread t, Throwable c) {
- boolean n = false;
- if(c instanceof StopCondition) {
- StopCondition s = (StopCondition)c;
- n = s.normal;
- c = s.getCause();
- }
- Exception e;
- if(c instanceof Exception)
- e = (Exception)c;
- else
- e = new Exception(c);
- if(!n) {
- close();
- error = e;
- }
- synchronized(pending) {
- Command cmd;
- while((cmd = pending.poll()) != null) {
- cmd.error(e);
- }
- }
- synchronized(queue) {
- Command cmd;
- while((cmd = queue.poll()) != null) {
- cmd.error(e);
- }
- }
- }
};
- reader.setUncaughtExceptionHandler(h);
- writer.setUncaughtExceptionHandler(h);
+ pending.offer(ccmd);
+ reader = new Reader();
+ writer = new Writer();
reader.start();
writer.start();
}
+ private void error(Throwable c) {
+ boolean n = false;
+ if(c instanceof StopCondition) {
+ StopCondition s = (StopCondition)c;
+ n = s.normal;
+ c = s.getCause();
+ }
+ Exception e;
+ if(c instanceof Exception)
+ e = (Exception)c;
+ else
+ e = new Exception(c);
+ if(!n) {
+ close();
+ error = e;
+ }
+ synchronized(queue) {
+ Command cmd;
+ while((cmd = pending.poll()) != null) {
+ cmd.error(e);
+ }
+ while((cmd = queue.poll()) != null) {
+ cmd.error(e);
+ }
+ }
+ }
+
private void checkthread() {
if(Thread.currentThread() == reader)
throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
}
- public void syncConnect() throws ConnectException, ClosedException, InterruptedException {
+ public void syncConnect() throws ConnectException, InterruptedException {
checkthread();
final boolean[] donep = new boolean[] {false};
final Exception[] errp = new Exception[] {null};
}
}
if(errp[0] != null)
- throw(new ClosedException(errp[0]));
+ throw(new ConnectException("DC connection has been closed", errp[0]));
}
public void expectVersion(int reqver) {
return(error);
}
- public void addConnectListener(ConnectListener l) {
+ public void addNotifyListener(NotifyListener l) {
+ synchronized(notls) {
+ notls.add(l);
+ }
+ }
+
+ public void removeNotifyListener(NotifyListener l) {
+ synchronized(notls) {
+ notls.remove(l);
+ }
+ }
+
+ public synchronized void addConnectListener(ConnectListener l) {
+ if((state != "idle") && (state != "connecting"))
+ throw(new IllegalStateException("Already connected"));
synchronized(connls) {
- if((state != "idle") && (state != "connecting"))
- throw(new IllegalStateException("Already connected"));
connls.add(l);
}
}
- private void qcmd(Command cmd) {
+ public void qcmd(Command... cmds) {
synchronized(queue) {
- queue.offer(cmd);
+ for(Command cmd : cmds)
+ queue.offer(cmd);
queue.notifyAll();
}
}
+ public void qcmd(String... tokens) {
+ qcmd(new Command(tokens));
+ }
+
+ public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
+ checkthread();
+ final boolean[] donep = new boolean[] {false};
+ final Response[] resp = new Response[] {null};
+ final Exception[] errp = new Exception[] {null};
+ Object l = cmd.new Listener() {
+ public synchronized void done(Response rsp) {
+ resp[0] = rsp;
+ donep[0] = true;
+ notifyAll();
+ }
+
+ public synchronized void error(Exception e) {
+ errp[0] = e;
+ donep[0] = true;
+ notifyAll();
+ }
+ };
+ qcmd(cmd);
+ synchronized(l) {
+ while(!donep[0]) {
+ l.wait();
+ }
+ }
+ if(errp[0] != null)
+ throw(new ClosedException(errp[0]));
+ return(resp[0]);
+ }
+
+ public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
+ return(ecmd(new Command(tokens)));
+ }
+
static private class StopCondition extends Error {
final boolean normal;
}
}
- static private class Writer extends Thread {
- Socket s;
- Queue<Command> queue, pending;
-
- public Writer(Socket s, Queue<Command> queue, Queue<Command> pending) {
- this.s = s;
- this.queue = queue;
- this.pending = pending;
+ private class Writer extends Thread {
+ public Writer() {
setDaemon(true);
}
return(sb.toString());
}
- public void run() {
+ private void guarded() {
try {
java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
while(true) {
Command cmd;
try {
- synchronized(pending) {
- while(pending.size() > 0)
- pending.wait();
- }
synchronized(queue) {
- do {
- if((cmd = queue.poll()) != null)
- break;
+ while(pending.size() > 0)
queue.wait();
- } while(true);
+ while((cmd = queue.poll()) == null)
+ queue.wait();
+ pending.offer(cmd);
}
} catch(InterruptedException e) {
throw(new StopCondition(e, true));
out.append(' ');
out.append(quote(s));
}
+ out.append("\r\n");
w.write(out.toString());
+ w.flush();
}
} catch(IOException e) {
throw(new StopCondition(e, false));
}
}
- }
-
- static private class Reader extends Thread {
- Socket s;
- Queue<Command> pending;
- public Reader(Socket s, Queue<Command> pending) {
- this.s = s;
- this.pending = pending;
+ public void run() {
+ try {
+ guarded();
+ } catch(Throwable t) {
+ error(t);
+ }
}
-
+ }
+
+ private class Reader extends Thread {
private void dispatch(Response resp) throws Exception {
if(resp.code < 600) {
- synchronized(pending) {
- resp.cmd = pending.remove();
- pending.notifyAll();
+ synchronized(queue) {
+ try {
+ resp.cmd = pending.remove();
+ } catch(NoSuchElementException e) {
+ throw(new RuntimeException("DC server sent reply without a pending command"));
+ }
+ queue.notifyAll();
}
resp.cmd.done(resp);
+ } else {
+ synchronized(notls) {
+ for(NotifyListener l : notls) {
+ l.notified(resp);
+ }
+ }
}
}
- public void run() {
+ private void guarded() {
try {
java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
String state = "start";
code = Integer.parseInt(ct.toString());
ct.setLength(0);
state = "start";
- continue eat;
} else {
ct.append(c);
}
throw(new StopCondition(e, false));
}
}
+
+ public void run() {
+ try {
+ guarded();
+ } catch(Throwable t) {
+ error(t);
+ }
+ }
}
public void close() {
try {
s.close();
- } catch(IOException e) {
- }
+ } catch(IOException e) {}
reader.interrupt();
writer.interrupt();
}
-
- protected void finalize() {
- close();
- }
}