From: Fredrik Tolf Date: Sun, 18 Nov 2007 04:05:58 +0000 (+0100) Subject: More Java work. X-Git-Url: http://dolda2000.com/gitweb/?p=doldaconnect.git;a=commitdiff_plain;h=e78d9ca3a7ae1bcf61849b02c816d6659a477657 More Java work. --- diff --git a/lib/java/dolda/dolcon/ClosedException.java b/lib/java/dolda/dolcon/ClosedException.java new file mode 100644 index 0000000..e1944c0 --- /dev/null +++ b/lib/java/dolda/dolcon/ClosedException.java @@ -0,0 +1,11 @@ +package dolda.dolcon; + +public class ClosedException extends Exception { + public ClosedException(Throwable cause) { + super("DC connection has been closed", cause); + } + + public ClosedException() { + this(null); + } +} diff --git a/lib/java/dolda/dolcon/Command.java b/lib/java/dolda/dolcon/Command.java new file mode 100644 index 0000000..fa72357 --- /dev/null +++ b/lib/java/dolda/dolcon/Command.java @@ -0,0 +1,37 @@ +package dolda.dolcon; + +import java.util.*; + +public class Command { + List tokens; + Set listeners = new HashSet(); + Response resp; + + public interface Listener { + public void done(Response resp) throws Exception; + public void error(Exception cause); + } + + public Command(List tokens) { + this.tokens = tokens; + } + + public Command(String... tokens) { + this(Arrays.asList(tokens)); + } + + public void addListener(Listener l) { + listeners.add(l); + } + + public void done(Response resp) throws Exception { + this.resp = resp; + for(Listener l : listeners) + l.done(resp); + } + + public void error(Exception cause) { + for(Listener l : listeners) + l.error(cause); + } +} diff --git a/lib/java/dolda/dolcon/Connection.java b/lib/java/dolda/dolcon/Connection.java index 0d85f76..b31151e 100644 --- a/lib/java/dolda/dolcon/Connection.java +++ b/lib/java/dolda/dolcon/Connection.java @@ -5,11 +5,29 @@ import java.net.Socket; import java.util.*; public class Connection { - Socket s; - Reader reader; - LinkedList resps = new LinkedList(); + private Socket s; + private Reader reader; + private Writer writer; + private Queue queue = new LinkedList(); + private Queue pending = new LinkedList(); + private int reqver = 2, revlo, revhi; + private String aspec; + private String state; + private Set connls = new HashSet(); + private Exception error; - public Connection(String aspec) throws ConnectException { + public interface ConnectListener { + public void connected() throws Exception; + public void error(Exception cause); + } + + public Connection(String aspec) { + this.aspec = aspec; + state = "idle"; + } + + public void connect() throws ConnectException { + state = "connecting"; try { s = new Socket(aspec, 1500); } catch(java.net.UnknownHostException e) { @@ -17,157 +35,352 @@ public class Connection { } catch(IOException e) { throw(new ConnectException("Could not connect to host " + aspec, e)); } - reader = new Reader(s, resps); + pending = new LinkedList(); + Command ccmd = new Command(".connect"); + ccmd.addListener(new Command.Listener() { + public void done(Response resp) throws Exception { + try { + checkver(resp); + } catch(VersionException e) { + error(e); + throw(e); + } + synchronized(connls) { + state = "connected"; + try { + for(ConnectListener l : connls) + l.connected(); + } finally { + connls.clear(); + } + } + } + + public void error(Exception cause) { + synchronized(connls) { + try { + for(ConnectListener l : connls) + l.error(cause); + } finally { + connls.clear(); + } + } + } + }); + pending.offer(ccmd); + reader = new Reader(s, pending); + writer = new Writer(s, queue, pending); + Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread t, Throwable c) { + boolean n = false; + if(c instanceof StopCondition) { + StopCondition s = (StopCondition)c; + n = s.normal; + c = s.getCause(); + } + Exception e; + if(c instanceof Exception) + e = (Exception)c; + else + e = new Exception(c); + if(!n) { + close(); + error = e; + } + synchronized(pending) { + Command cmd; + while((cmd = pending.poll()) != null) { + cmd.error(e); + } + } + synchronized(queue) { + Command cmd; + while((cmd = queue.poll()) != null) { + cmd.error(e); + } + } + } + }; + reader.setUncaughtExceptionHandler(h); + writer.setUncaughtExceptionHandler(h); reader.start(); + writer.start(); } - static private class Reader extends Thread { - Exception error = null; + private void checkthread() { + if(Thread.currentThread() == reader) + throw(new RuntimeException("Cannot call synchronous method with dispatch thread!")); + } + + public void syncConnect() throws ConnectException, ClosedException, InterruptedException { + checkthread(); + final boolean[] donep = new boolean[] {false}; + final Exception[] errp = new Exception[] {null}; + ConnectListener l = new ConnectListener() { + public void connected() { + donep[0] = true; + synchronized(this) { + notifyAll(); + } + } + + public void error(Exception cause) { + donep[0] = true; + errp[0] = cause; + synchronized(this) { + notifyAll(); + } + } + }; + addConnectListener(l); + connect(); + while(!donep[0]) { + synchronized(l) { + l.wait(); + } + } + if(errp[0] != null) + throw(new ClosedException(errp[0])); + } + + public void expectVersion(int reqver) { + this.reqver = reqver; + } + + private void checkver(Response resp) throws VersionException { + revlo = Integer.parseInt(resp.token(0, 0)); + revhi = Integer.parseInt(resp.token(0, 1)); + if((reqver < revlo) || (reqver > revhi)) + throw(new VersionException(reqver, revlo, revhi)); + } + + public Exception join() throws InterruptedException { + while(reader.isAlive()) { + reader.join(); + } + close(); + return(error); + } + + public void addConnectListener(ConnectListener l) { + synchronized(connls) { + if((state != "idle") && (state != "connecting")) + throw(new IllegalStateException("Already connected")); + connls.add(l); + } + } + + private void qcmd(Command cmd) { + synchronized(queue) { + queue.offer(cmd); + queue.notifyAll(); + } + } + + static private class StopCondition extends Error { + final boolean normal; + + public StopCondition(Exception cause, boolean normal) { + super(cause); + this.normal = normal; + } + } + + static private class Writer extends Thread { Socket s; - Collection resps; + Queue queue, pending; - public Reader(Socket s, Collection resps) { + public Writer(Socket s, Queue queue, Queue pending) { this.s = s; - this.resps = resps; + this.queue = queue; + this.pending = pending; setDaemon(true); } - public void run() { - java.io.Reader r; - try { - r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8")); - } catch(IOException e) { - synchronized(resps) { - resps.notifyAll(); - error = e; + private String quote(String t) { + if(t.length() == 0) + return("\"\""); + StringBuilder sb = new StringBuilder(); + boolean quote = false; + for(int i = 0; i < t.length(); i++) { + char c = t.charAt(i); + if(c == '\"') { + sb.append("\\\""); + } else if(Character.isWhitespace(c)) { + quote = true; + sb.append(c); + } else { + sb.append(c); } - return; } - String state = "start"; - StringBuilder ct = new StringBuilder(); - int code = -1; - boolean last = true; - List>lines = new LinkedList>(); - Listtokens = new LinkedList(); - while(true) { - char c; - { - int i; + if(quote) + return("\"" + sb.toString() + "\""); + else + return(sb.toString()); + } + + public void run() { + try { + java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8"); + while(true) { + Command cmd; try { - if((i = r.read()) < 0) { - throw(new IOException("The server closed the connection")); + synchronized(pending) { + while(pending.size() > 0) + pending.wait(); } - } catch(IOException e) { - synchronized(resps) { - resps.notifyAll(); - error = e; + synchronized(queue) { + do { + if((cmd = queue.poll()) != null) + break; + queue.wait(); + } while(true); } - return; + } catch(InterruptedException e) { + throw(new StopCondition(e, true)); } - c = (char)i; + StringBuilder out = new StringBuilder(); + for(String s : cmd.tokens) { + if(out.length() > 0) + out.append(' '); + out.append(quote(s)); + } + w.write(out.toString()); } - eat: do { - if(state == "start") { - if(c == '\r') { - state = "nl"; - } else if(Character.isWhitespace(c)) { - } else { - if(code == -1) - state = "code"; - else - state = "token"; - continue eat; + } catch(IOException e) { + throw(new StopCondition(e, false)); + } + } + } + + static private class Reader extends Thread { + Socket s; + Queue pending; + + public Reader(Socket s, Queue pending) { + this.s = s; + this.pending = pending; + } + + private void dispatch(Response resp) throws Exception { + if(resp.code < 600) { + synchronized(pending) { + resp.cmd = pending.remove(); + pending.notifyAll(); + } + resp.cmd.done(resp); + } + } + + public void run() { + try { + java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8")); + String state = "start"; + StringBuilder ct = new StringBuilder(); + int code = -1; + boolean last = true; + List> lines = new LinkedList>(); + List tokens = new LinkedList(); + while(true) { + char c; + { + int i; + try { + if((i = r.read()) < 0) { + throw(new IOException("The server closed the connection")); + } + } catch(java.nio.channels.ClosedByInterruptException e) { + throw(new StopCondition(e, true)); } - } else if(state == "nl") { - if(c == '\n') { - if(code == -1) { - synchronized(resps) { - resps.notifyAll(); - try { - throw(new IOException("Illegal response code " + code + " from the server")); - } catch(IOException e) { - error = e; - } - } - return; + c = (char)i; + } + eat: do { + if(state == "start") { + if(c == '\r') { + state = "nl"; + } else if(Character.isWhitespace(c)) { + } else { + if(code == -1) + state = "code"; + else + state = "token"; + continue eat; } - lines.add(tokens); - tokens = new LinkedList(); - if(last) { - synchronized(resps) { - resps.add(new Response(code, lines)); - resps.notifyAll(); + } else if(state == "nl") { + if(c == '\n') { + if((code < 100) || (code >= 1000)) { + throw(new IOException("Illegal response code " + code + " from the server")); } - lines = new LinkedList>(); + lines.add(tokens); + tokens = new LinkedList(); + if(last) { + dispatch(new Response(code, lines)); + lines = new LinkedList>(); + } + code = -1; + state = "start"; + } else { + state = "start"; + continue eat; } - state = "start"; - } else { - state = "start"; - continue eat; - } - } else if(state == "code") { - if((c == '-') || Character.isWhitespace(c)) { - last = c != '-'; - code = Integer.parseInt(ct.toString()); - ct.setLength(0); - state = "start"; - continue eat; - } else { + } else if(state == "code") { + if((c == '-') || Character.isWhitespace(c)) { + last = c != '-'; + code = Integer.parseInt(ct.toString()); + ct.setLength(0); + state = "start"; + continue eat; + } else { + ct.append(c); + } + } else if(state == "token") { + if(Character.isWhitespace(c)) { + tokens.add(ct.toString()); + ct.setLength(0); + state = "start"; + continue eat; + } else if(c == '\\') { + state = "bs"; + } else if(c == '"') { + state = "cited"; + } else { + ct.append(c); + } + } else if(state == "bs") { + ct.append(c); + state = "token"; + } else if(state == "cited") { + if(c == '\\') + state = "cbs"; + else if(c == '"') + state = "token"; + else + ct.append(c); + } else if(state == "cbs") { ct.append(c); - } - } else if(state == "token") { - if(Character.isWhitespace(c)) { - tokens.add(ct.toString()); - ct.setLength(0); - state = "start"; - code = -1; - continue eat; - } else if(c == '\\') { - state = "bs"; - } else if(c == '"') { state = "cited"; } else { - ct.append(c); + throw(new Error("invalid state " + state)); } - } else if(state == "bs") { - ct.append(c); - state = "token"; - } else if(state == "cited") { - if(c == '\\') - state = "cbs"; - else if(c == '"') - state = "token"; - else - ct.append(c); - } else if(state == "cbs") { - ct.append(c); - state = "cited"; - } else { - throw(new Error("invalid state " + state)); - } - break; - } while(true); + break; + } while(true); + } + } catch(Exception e) { + throw(new StopCondition(e, false)); } } } - protected void finalize() { + public void close() { try { s.close(); } catch(IOException e) { } reader.interrupt(); + writer.interrupt(); } - public static void main(String[] args) throws Exception { - Connection c = new Connection("pc18"); - while(true) { - while(c.resps.size() > 0) { - System.out.println(c.resps.remove(0)); - } - synchronized(c.resps) { - c.resps.wait(); - } - } + protected void finalize() { + close(); } } diff --git a/lib/java/dolda/dolcon/Response.java b/lib/java/dolda/dolcon/Response.java index 5ce1115..d5ee921 100644 --- a/lib/java/dolda/dolcon/Response.java +++ b/lib/java/dolda/dolcon/Response.java @@ -3,10 +3,11 @@ package dolda.dolcon; import java.util.*; public class Response { - List>lines; + List> lines; + Command cmd; int code; - public Response(int code, List>lines) { + public Response(int code, List> lines) { this.code = code; this.lines = lines; } @@ -14,4 +15,8 @@ public class Response { public String toString() { return("Response " + code + ": " + lines.toString()); } + + public String token(int line, int token) { + return(lines.get(line).get(token)); + } } diff --git a/lib/java/dolda/dolcon/VersionException.java b/lib/java/dolda/dolcon/VersionException.java new file mode 100644 index 0000000..ca3bc74 --- /dev/null +++ b/lib/java/dolda/dolcon/VersionException.java @@ -0,0 +1,12 @@ +package dolda.dolcon; + +public class VersionException extends Exception { + public final int r, l, h; + + public VersionException(int r, int l, int h) { + super("Unexpected protocol revision: " + l + "-" + h + ", wanted " + r); + this.r = r; + this.l = l; + this.h = h; + } +}