From 4b9878719cd3f8ee3279da419119ccb2fecd4266 Mon Sep 17 00:00:00 2001 From: Fredrik Tolf Date: Sun, 27 Jan 2008 19:21:28 +0100 Subject: [PATCH] Reworked the Java connection handler a bit. --- lib/java/dolda/dolcon/protocol/Connection.java | 149 ++++++++++++------------- 1 file changed, 73 insertions(+), 76 deletions(-) diff --git a/lib/java/dolda/dolcon/protocol/Connection.java b/lib/java/dolda/dolcon/protocol/Connection.java index 5d805e6..20c06d6 100644 --- a/lib/java/dolda/dolcon/protocol/Connection.java +++ b/lib/java/dolda/dolcon/protocol/Connection.java @@ -27,7 +27,11 @@ public class Connection { } public void connect() throws ConnectException { - state = "connecting"; + synchronized(this) { + if(state != "idle") + throw(new IllegalStateException("Already connected")); + state = "connecting"; + } try { s = new Socket(aspec, 1500); } catch(java.net.UnknownHostException e) { @@ -45,8 +49,10 @@ public class Connection { error(e); throw(e); } - synchronized(connls) { + synchronized(Connection.this) { state = "connected"; + } + synchronized(connls) { try { for(ConnectListener l : connls) l.connected(); @@ -68,45 +74,39 @@ public class Connection { } }); pending.offer(ccmd); - reader = new Reader(s, pending); - writer = new Writer(s, queue, pending); - Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() { - public void uncaughtException(Thread t, Throwable c) { - boolean n = false; - if(c instanceof StopCondition) { - StopCondition s = (StopCondition)c; - n = s.normal; - c = s.getCause(); - } - Exception e; - if(c instanceof Exception) - e = (Exception)c; - else - e = new Exception(c); - if(!n) { - close(); - error = e; - } - synchronized(pending) { - Command cmd; - while((cmd = pending.poll()) != null) { - cmd.error(e); - } - } - synchronized(queue) { - Command cmd; - while((cmd = queue.poll()) != null) { - cmd.error(e); - } - } - } - }; - reader.setUncaughtExceptionHandler(h); - writer.setUncaughtExceptionHandler(h); + reader = new Reader(); + writer = new Writer(); reader.start(); writer.start(); } + private void error(Throwable c) { + boolean n = false; + if(c instanceof StopCondition) { + StopCondition s = (StopCondition)c; + n = s.normal; + c = s.getCause(); + } + Exception e; + if(c instanceof Exception) + e = (Exception)c; + else + e = new Exception(c); + if(!n) { + close(); + error = e; + } + synchronized(queue) { + Command cmd; + while((cmd = pending.poll()) != null) { + cmd.error(e); + } + while((cmd = queue.poll()) != null) { + cmd.error(e); + } + } + } + private void checkthread() { if(Thread.currentThread() == reader) throw(new RuntimeException("Cannot call synchronous method with dispatch thread!")); @@ -162,10 +162,10 @@ public class Connection { return(error); } - public void addConnectListener(ConnectListener l) { + public synchronized void addConnectListener(ConnectListener l) { + if((state != "idle") && (state != "connecting")) + throw(new IllegalStateException("Already connected")); synchronized(connls) { - if((state != "idle") && (state != "connecting")) - throw(new IllegalStateException("Already connected")); connls.add(l); } } @@ -186,14 +186,8 @@ public class Connection { } } - static private class Writer extends Thread { - Socket s; - Queue queue, pending; - - public Writer(Socket s, Queue queue, Queue pending) { - this.s = s; - this.queue = queue; - this.pending = pending; + private class Writer extends Thread { + public Writer() { setDaemon(true); } @@ -219,22 +213,18 @@ public class Connection { return(sb.toString()); } - public void run() { + private void guarded() { try { java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8"); while(true) { Command cmd; try { - synchronized(pending) { - while(pending.size() > 0) - pending.wait(); - } synchronized(queue) { - do { - if((cmd = queue.poll()) != null) - break; + while(pending.size() > 0) + queue.wait(); + while((cmd = queue.poll()) == null) queue.wait(); - } while(true); + pending.offer(cmd); } } catch(InterruptedException e) { throw(new StopCondition(e, true)); @@ -251,28 +241,32 @@ public class Connection { throw(new StopCondition(e, false)); } } - } - - static private class Reader extends Thread { - Socket s; - Queue pending; - public Reader(Socket s, Queue pending) { - this.s = s; - this.pending = pending; + public void run() { + try { + guarded(); + } catch(Throwable t) { + error(t); + } } - + } + + private class Reader extends Thread { private void dispatch(Response resp) throws Exception { if(resp.code < 600) { - synchronized(pending) { - resp.cmd = pending.remove(); - pending.notifyAll(); + synchronized(queue) { + try { + resp.cmd = pending.remove(); + } catch(NoSuchElementException e) { + throw(new RuntimeException("DC server sent reply without a pending command")); + } + queue.notifyAll(); } resp.cmd.done(resp); } } - public void run() { + private void guarded() { try { java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8")); String state = "start"; @@ -369,18 +363,21 @@ public class Connection { throw(new StopCondition(e, false)); } } + + public void run() { + try { + guarded(); + } catch(Throwable t) { + error(t); + } + } } public void close() { try { s.close(); - } catch(IOException e) { - } + } catch(IOException e) {} reader.interrupt(); writer.interrupt(); } - - protected void finalize() { - close(); - } } -- 2.11.0