Java: Added NotifyListeners.
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
index 5d805e6..03152bc 100644 (file)
@@ -14,20 +14,25 @@ public class Connection {
     private String aspec;
     private String state;
     private Set<ConnectListener> connls = new HashSet<ConnectListener>();
+    private Set<NotifyListener> notls = new HashSet<NotifyListener>();
     private Exception error;
     
     public interface ConnectListener {
        public void connected() throws Exception;
        public void error(Exception cause);
     }
-
+    
     public Connection(String aspec) {
        this.aspec = aspec;
        state = "idle";
     }
     
     public void connect() throws ConnectException {
-       state = "connecting";
+       synchronized(this) {
+           if(state != "idle")
+               throw(new IllegalStateException("Already connected"));
+           state = "connecting";
+       }
        try {
            s = new Socket(aspec, 1500);
        } catch(java.net.UnknownHostException e) {
@@ -37,7 +42,7 @@ public class Connection {
        }
        pending = new LinkedList<Command>();
        Command ccmd = new Command(".connect");
-       ccmd.addListener(new Command.Listener() {
+       ccmd.new Listener() {
                public void done(Response resp) throws Exception {
                    try {
                        checkver(resp);
@@ -45,8 +50,10 @@ public class Connection {
                        error(e);
                        throw(e);
                    }
-                   synchronized(connls) {
+                   synchronized(Connection.this) {
                        state = "connected";
+                   }
+                   synchronized(connls) {
                        try {
                            for(ConnectListener l : connls)
                                l.connected();
@@ -66,53 +73,47 @@ public class Connection {
                        }
                    }
                }
-           });
-       pending.offer(ccmd);
-       reader = new Reader(s, pending);
-       writer = new Writer(s, queue, pending);
-       Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
-               public void uncaughtException(Thread t, Throwable c) {
-                   boolean n = false;
-                   if(c instanceof StopCondition) {
-                       StopCondition s = (StopCondition)c;
-                       n = s.normal;
-                       c = s.getCause();
-                   }
-                   Exception e;
-                   if(c instanceof Exception)
-                       e = (Exception)c;
-                   else
-                       e = new Exception(c);
-                   if(!n) {
-                       close();
-                       error = e;
-                   }
-                   synchronized(pending) {
-                       Command cmd;
-                       while((cmd = pending.poll()) != null) {
-                           cmd.error(e);
-                       }
-                   }
-                   synchronized(queue) {
-                       Command cmd;
-                       while((cmd = queue.poll()) != null) {
-                           cmd.error(e);
-                       }
-                   }
-               }
            };
-       reader.setUncaughtExceptionHandler(h);
-       writer.setUncaughtExceptionHandler(h);
+       pending.offer(ccmd);
+       reader = new Reader();
+       writer = new Writer();
        reader.start();
        writer.start();
     }
     
+    private void error(Throwable c) {
+       boolean n = false;
+       if(c instanceof StopCondition) {
+           StopCondition s = (StopCondition)c;
+           n = s.normal;
+           c = s.getCause();
+       }
+       Exception e;
+       if(c instanceof Exception)
+           e = (Exception)c;
+       else
+           e = new Exception(c);
+       if(!n) {
+           close();
+           error = e;
+       }
+       synchronized(queue) {
+           Command cmd;
+           while((cmd = pending.poll()) != null) {
+               cmd.error(e);
+           }
+           while((cmd = queue.poll()) != null) {
+               cmd.error(e);
+           }
+       }
+    }
+    
     private void checkthread() {
        if(Thread.currentThread() == reader)
            throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
     }
         
-    public void syncConnect() throws ConnectException, ClosedException, InterruptedException {
+    public void syncConnect() throws ConnectException, InterruptedException {
        checkthread();
        final boolean[] donep = new boolean[] {false};
        final Exception[] errp = new Exception[] {null};
@@ -140,7 +141,7 @@ public class Connection {
            }
        }
        if(errp[0] != null)
-           throw(new ClosedException(errp[0]));
+           throw(new ConnectException("DC connection has been closed", errp[0]));
     }
 
     public void expectVersion(int reqver) {
@@ -162,21 +163,69 @@ public class Connection {
        return(error);
     }
 
-    public void addConnectListener(ConnectListener l) {
+    public void addNotifyListener(NotifyListener l) {
+       synchronized(notls) {
+           notls.add(l);
+       }
+    }
+
+    public void removeNotifyListener(NotifyListener l) {
+       synchronized(notls) {
+           notls.remove(l);
+       }
+    }
+
+    public synchronized void addConnectListener(ConnectListener l) {
+       if((state != "idle") && (state != "connecting"))
+           throw(new IllegalStateException("Already connected"));
        synchronized(connls) {
-           if((state != "idle") && (state != "connecting"))
-               throw(new IllegalStateException("Already connected"));
            connls.add(l);
        }
     }
 
-    private void qcmd(Command cmd) {
+    public void qcmd(Command cmd) {
        synchronized(queue) {
            queue.offer(cmd);
            queue.notifyAll();
        }
     }
     
+    public void qcmd(String... tokens) {
+       qcmd(new Command(tokens));
+    }
+    
+    public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
+       checkthread();
+       final boolean[] donep = new boolean[] {false};
+       final Response[] resp = new Response[] {null};
+       final Exception[] errp = new Exception[] {null};
+       Object l = cmd.new Listener() {
+               public synchronized void done(Response rsp) {
+                   resp[0] = rsp;
+                   donep[0] = true;
+                   notifyAll();
+               }
+               
+               public synchronized void error(Exception e) {
+                   errp[0] = e;
+                   donep[0] = true;
+                   notifyAll();
+               }
+           };
+       synchronized(l) {
+           while(!donep[0]) {
+               l.wait();
+           }
+       }
+       if(errp[0] != null)
+           throw(new ClosedException(errp[0]));
+       return(resp[0]);
+    }
+    
+    public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
+       return(ecmd(new Command(tokens)));
+    }
+    
     static private class StopCondition extends Error {
        final boolean normal;
        
@@ -186,14 +235,8 @@ public class Connection {
        }
     }
     
-    static private class Writer extends Thread {
-       Socket s;
-       Queue<Command> queue, pending;
-       
-       public Writer(Socket s, Queue<Command> queue, Queue<Command> pending) {
-           this.s = s;
-           this.queue = queue;
-           this.pending = pending;
+    private class Writer extends Thread {
+       public Writer() {
            setDaemon(true);
        }
        
@@ -219,22 +262,18 @@ public class Connection {
                return(sb.toString());
        }
 
-       public void run() {
+       private void guarded() {
            try {
                java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
                while(true) {
                    Command cmd;
                    try {
-                       synchronized(pending) {
-                           while(pending.size() > 0)
-                               pending.wait();
-                       }
                        synchronized(queue) {
-                           do {
-                               if((cmd = queue.poll()) != null)
-                                   break;
+                           while(pending.size() > 0)
+                               queue.wait();
+                           while((cmd = queue.poll()) == null)
                                queue.wait();
-                           } while(true);
+                           pending.offer(cmd);
                        }
                    } catch(InterruptedException e) {
                        throw(new StopCondition(e, true));
@@ -251,28 +290,38 @@ public class Connection {
                throw(new StopCondition(e, false));
            }
        }
-    }
-
-    static private class Reader extends Thread {
-       Socket s;
-       Queue<Command> pending;
        
-       public Reader(Socket s, Queue<Command> pending) {
-           this.s = s;
-           this.pending = pending;
+       public void run() {
+           try {
+               guarded();
+           } catch(Throwable t) {
+               error(t);
+           }
        }
-       
+    }
+
+    private class Reader extends Thread {
        private void dispatch(Response resp) throws Exception {
            if(resp.code < 600) {
-               synchronized(pending) {
-                   resp.cmd = pending.remove();
-                   pending.notifyAll();
+               synchronized(queue) {
+                   try {
+                       resp.cmd = pending.remove();
+                   } catch(NoSuchElementException e) {
+                       throw(new RuntimeException("DC server sent reply without a pending command"));
+                   }
+                   queue.notifyAll();
                }
                resp.cmd.done(resp);
+           } else {
+               synchronized(notls) {
+                   for(NotifyListener l : notls) {
+                       l.notified(resp);
+                   }
+               }
            }
        }
 
-       public void run() {
+       private void guarded() {
            try {
                java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
                String state = "start";
@@ -369,18 +418,21 @@ public class Connection {
                throw(new StopCondition(e, false));
            }
        }
+       
+       public void run() {
+           try {
+               guarded();
+           } catch(Throwable t) {
+               error(t);
+           }
+       }
     }
 
     public void close() {
        try {
            s.close();
-       } catch(IOException e) {
-       }
+       } catch(IOException e) {}
        reader.interrupt();
        writer.interrupt();
     }
-
-    protected void finalize() {
-       close();
-    }
 }