Java: Began work on hub listeners.
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
1 package dolda.dolcon.protocol;
2
3 import java.io.*;
4 import java.net.Socket;
5 import java.util.*;
6
7 public class Connection {
8     private Socket s;
9     private Reader reader;
10     private Writer writer;
11     private Queue<Command> queue = new LinkedList<Command>();
12     private Queue<Command> pending = new LinkedList<Command>();
13     private int reqver = 2, revlo, revhi;
14     private String aspec;
15     private String state;
16     private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17     private Set<NotifyListener> notls = new HashSet<NotifyListener>();
18     private Exception error;
19     
20     public interface ConnectListener {
21         public void connected() throws Exception;
22         public void error(Exception cause);
23     }
24     
25     public Connection(String aspec) {
26         this.aspec = aspec;
27         state = "idle";
28     }
29     
30     public void connect() throws ConnectException {
31         synchronized(this) {
32             if(state != "idle")
33                 throw(new IllegalStateException("Already connected"));
34             state = "connecting";
35         }
36         try {
37             s = new Socket(aspec, 1500);
38         } catch(java.net.UnknownHostException e) {
39             throw(new ConnectException("Could not resolve host " + aspec, e));
40         } catch(IOException e) {
41             throw(new ConnectException("Could not connect to host " + aspec, e));
42         }
43         pending = new LinkedList<Command>();
44         Command ccmd = new Command(".connect");
45         ccmd.new Listener() {
46                 public void done(Response resp) throws Exception {
47                     try {
48                         checkver(resp);
49                     } catch(VersionException e) {
50                         error(e);
51                         throw(e);
52                     }
53                     synchronized(Connection.this) {
54                         state = "connected";
55                     }
56                     synchronized(connls) {
57                         try {
58                             for(ConnectListener l : connls)
59                                 l.connected();
60                         } finally {
61                             connls.clear();
62                         }
63                     }
64                 }
65                 
66                 public void error(Exception cause) {
67                     synchronized(connls) {
68                         try {
69                             for(ConnectListener l : connls)
70                                 l.error(cause);
71                         } finally {
72                             connls.clear();
73                         }
74                     }
75                 }
76             };
77         pending.offer(ccmd);
78         reader = new Reader();
79         writer = new Writer();
80         reader.start();
81         writer.start();
82     }
83     
84     private void error(Throwable c) {
85         boolean n = false;
86         if(c instanceof StopCondition) {
87             StopCondition s = (StopCondition)c;
88             n = s.normal;
89             c = s.getCause();
90         }
91         Exception e;
92         if(c instanceof Exception)
93             e = (Exception)c;
94         else
95             e = new Exception(c);
96         if(!n) {
97             close();
98             error = e;
99         }
100         synchronized(queue) {
101             Command cmd;
102             while((cmd = pending.poll()) != null) {
103                 cmd.error(e);
104             }
105             while((cmd = queue.poll()) != null) {
106                 cmd.error(e);
107             }
108         }
109     }
110     
111     private void checkthread() {
112         if(Thread.currentThread() == reader)
113             throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
114     }
115         
116     public void syncConnect() throws ConnectException, InterruptedException {
117         checkthread();
118         final boolean[] donep = new boolean[] {false};
119         final Exception[] errp = new Exception[] {null};
120         ConnectListener l = new ConnectListener() {
121                 public void connected() {
122                     donep[0] = true;
123                     synchronized(this) {
124                         notifyAll();
125                     }
126                 }
127                 
128                 public void error(Exception cause) {
129                     donep[0] = true;
130                     errp[0] = cause;
131                     synchronized(this) {
132                         notifyAll();
133                     }
134                 }
135             };
136         addConnectListener(l);
137         connect();
138         while(!donep[0]) {
139             synchronized(l) {
140                 l.wait();
141             }
142         }
143         if(errp[0] != null)
144             throw(new ConnectException("DC connection has been closed", errp[0]));
145     }
146
147     public void expectVersion(int reqver) {
148         this.reqver = reqver;
149     }
150     
151     private void checkver(Response resp) throws VersionException {
152         revlo = Integer.parseInt(resp.token(0, 0));
153         revhi = Integer.parseInt(resp.token(0, 1));
154         if((reqver < revlo) || (reqver > revhi))
155             throw(new VersionException(reqver, revlo, revhi));
156     }
157
158     public Exception join() throws InterruptedException {
159         while(reader.isAlive()) {
160             reader.join();
161         }
162         close();
163         return(error);
164     }
165
166     public void addNotifyListener(NotifyListener l) {
167         synchronized(notls) {
168             notls.add(l);
169         }
170     }
171
172     public void removeNotifyListener(NotifyListener l) {
173         synchronized(notls) {
174             notls.remove(l);
175         }
176     }
177
178     public synchronized void addConnectListener(ConnectListener l) {
179         if((state != "idle") && (state != "connecting"))
180             throw(new IllegalStateException("Already connected"));
181         synchronized(connls) {
182             connls.add(l);
183         }
184     }
185
186     public void qcmd(Command... cmds) {
187         synchronized(queue) {
188             for(Command cmd : cmds)
189                 queue.offer(cmd);
190             queue.notifyAll();
191         }
192     }
193     
194     public void qcmd(String... tokens) {
195         qcmd(new Command(tokens));
196     }
197     
198     public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
199         checkthread();
200         final boolean[] donep = new boolean[] {false};
201         final Response[] resp = new Response[] {null};
202         final Exception[] errp = new Exception[] {null};
203         Object l = cmd.new Listener() {
204                 public synchronized void done(Response rsp) {
205                     resp[0] = rsp;
206                     donep[0] = true;
207                     notifyAll();
208                 }
209                 
210                 public synchronized void error(Exception e) {
211                     errp[0] = e;
212                     donep[0] = true;
213                     notifyAll();
214                 }
215             };
216         qcmd(cmd);
217         synchronized(l) {
218             while(!donep[0]) {
219                 l.wait();
220             }
221         }
222         if(errp[0] != null)
223             throw(new ClosedException(errp[0]));
224         return(resp[0]);
225     }
226     
227     public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
228         return(ecmd(new Command(tokens)));
229     }
230     
231     static private class StopCondition extends Error {
232         final boolean normal;
233         
234         public StopCondition(Exception cause, boolean normal) {
235             super(cause);
236             this.normal = normal;
237         }
238     }
239     
240     private class Writer extends Thread {
241         public Writer() {
242             setDaemon(true);
243         }
244         
245         private String quote(String t) {
246             if(t.length() == 0)
247                 return("\"\"");
248             StringBuilder sb = new StringBuilder();
249             boolean quote = false;
250             for(int i = 0; i < t.length(); i++) {
251                 char c = t.charAt(i);
252                 if(c == '\"') {
253                     sb.append("\\\"");
254                 } else if(Character.isWhitespace(c)) {
255                     quote = true;
256                     sb.append(c);
257                 } else {
258                     sb.append(c);
259                 }
260             }
261             if(quote)
262                 return("\"" + sb.toString() + "\"");
263             else
264                 return(sb.toString());
265         }
266
267         private void guarded() {
268             try {
269                 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
270                 while(true) {
271                     Command cmd;
272                     try {
273                         synchronized(queue) {
274                             while(pending.size() > 0)
275                                 queue.wait();
276                             while((cmd = queue.poll()) == null)
277                                 queue.wait();
278                             pending.offer(cmd);
279                         }
280                     } catch(InterruptedException e) {
281                         throw(new StopCondition(e, true));
282                     }
283                     StringBuilder out = new StringBuilder();
284                     for(String s : cmd.tokens) {
285                         if(out.length() > 0)
286                             out.append(' ');
287                         out.append(quote(s));
288                     }
289                     out.append("\r\n");
290                     w.write(out.toString());
291                     w.flush();
292                 }
293             } catch(IOException e) {
294                 throw(new StopCondition(e, false));
295             }
296         }
297         
298         public void run() {
299             try {
300                 guarded();
301             } catch(Throwable t) {
302                 error(t);
303             }
304         }
305     }
306
307     private class Reader extends Thread {
308         private void dispatch(Response resp) throws Exception {
309             if(resp.code < 600) {
310                 synchronized(queue) {
311                     try {
312                         resp.cmd = pending.remove();
313                     } catch(NoSuchElementException e) {
314                         throw(new RuntimeException("DC server sent reply without a pending command"));
315                     }
316                     queue.notifyAll();
317                 }
318                 resp.cmd.done(resp);
319             } else {
320                 synchronized(notls) {
321                     for(NotifyListener l : notls) {
322                         l.notified(resp);
323                     }
324                 }
325             }
326         }
327
328         private void guarded() {
329             try {
330                 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
331                 String state = "start";
332                 StringBuilder ct = new StringBuilder();
333                 int code = -1;
334                 boolean last = true;
335                 List<List<String>> lines = new LinkedList<List<String>>();
336                 List<String> tokens = new LinkedList<String>();
337                 while(true) {
338                     char c;
339                     {
340                         int i;
341                         try {
342                             if((i = r.read()) < 0) {
343                                 throw(new IOException("The server closed the connection"));
344                             }
345                         } catch(java.nio.channels.ClosedByInterruptException e) {
346                             throw(new StopCondition(e, true));
347                         }
348                         c = (char)i;
349                     }
350                     eat: do {
351                         if(state == "start") {
352                             if(c == '\r') {
353                                 state = "nl";
354                             } else if(Character.isWhitespace(c)) {
355                             } else {
356                                 if(code == -1)
357                                     state = "code";
358                                 else
359                                     state = "token";
360                                 continue eat;
361                             }
362                         } else if(state == "nl") {
363                             if(c == '\n') {
364                                 if((code < 100) || (code >= 1000)) {
365                                     throw(new IOException("Illegal response code " + code + " from the server"));
366                                 }
367                                 lines.add(tokens);
368                                 tokens = new LinkedList<String>();
369                                 if(last) {
370                                     dispatch(new Response(code, lines));
371                                     lines = new LinkedList<List<String>>();
372                                 }
373                                 code = -1;
374                                 state = "start";
375                             } else {
376                                 state = "start";
377                                 continue eat;
378                             }
379                         } else if(state == "code") {
380                             if((c == '-') || Character.isWhitespace(c)) {
381                                 last = c != '-';
382                                 code = Integer.parseInt(ct.toString());
383                                 ct.setLength(0);
384                                 state = "start";
385                             } else {
386                                 ct.append(c);
387                             }
388                         } else if(state == "token") {
389                             if(Character.isWhitespace(c)) {
390                                 tokens.add(ct.toString());
391                                 ct.setLength(0);
392                                 state = "start";
393                                 continue eat;
394                             } else if(c == '\\') {
395                                 state = "bs";
396                             } else if(c == '"') {
397                                 state = "cited";
398                             } else {
399                                 ct.append(c);
400                             }
401                         } else if(state == "bs") {
402                             ct.append(c);
403                             state = "token";
404                         } else if(state == "cited") {
405                             if(c == '\\')
406                                 state = "cbs";
407                             else if(c == '"')
408                                 state = "token";
409                             else
410                                 ct.append(c);
411                         } else if(state == "cbs") {
412                             ct.append(c);
413                             state = "cited";
414                         } else {
415                             throw(new Error("invalid state " + state));
416                         }
417                         break;
418                     } while(true);
419                 }
420             } catch(Exception e) {
421                 throw(new StopCondition(e, false));
422             }
423         }
424         
425         public void run() {
426             try {
427                 guarded();
428             } catch(Throwable t) {
429                 error(t);
430             }
431         }
432     }
433
434     public void close() {
435         try {
436             s.close();
437         } catch(IOException e) {}
438         reader.interrupt();
439         writer.interrupt();
440     }
441 }