Reworked the Java connection handler a bit.
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
1 package dolda.dolcon.protocol;
2
3 import java.io.*;
4 import java.net.Socket;
5 import java.util.*;
6
7 public class Connection {
8     private Socket s;
9     private Reader reader;
10     private Writer writer;
11     private Queue<Command> queue = new LinkedList<Command>();
12     private Queue<Command> pending = new LinkedList<Command>();
13     private int reqver = 2, revlo, revhi;
14     private String aspec;
15     private String state;
16     private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17     private Exception error;
18     
19     public interface ConnectListener {
20         public void connected() throws Exception;
21         public void error(Exception cause);
22     }
23
24     public Connection(String aspec) {
25         this.aspec = aspec;
26         state = "idle";
27     }
28     
29     public void connect() throws ConnectException {
30         synchronized(this) {
31             if(state != "idle")
32                 throw(new IllegalStateException("Already connected"));
33             state = "connecting";
34         }
35         try {
36             s = new Socket(aspec, 1500);
37         } catch(java.net.UnknownHostException e) {
38             throw(new ConnectException("Could not resolve host " + aspec, e));
39         } catch(IOException e) {
40             throw(new ConnectException("Could not connect to host " + aspec, e));
41         }
42         pending = new LinkedList<Command>();
43         Command ccmd = new Command(".connect");
44         ccmd.addListener(new Command.Listener() {
45                 public void done(Response resp) throws Exception {
46                     try {
47                         checkver(resp);
48                     } catch(VersionException e) {
49                         error(e);
50                         throw(e);
51                     }
52                     synchronized(Connection.this) {
53                         state = "connected";
54                     }
55                     synchronized(connls) {
56                         try {
57                             for(ConnectListener l : connls)
58                                 l.connected();
59                         } finally {
60                             connls.clear();
61                         }
62                     }
63                 }
64                 
65                 public void error(Exception cause) {
66                     synchronized(connls) {
67                         try {
68                             for(ConnectListener l : connls)
69                                 l.error(cause);
70                         } finally {
71                             connls.clear();
72                         }
73                     }
74                 }
75             });
76         pending.offer(ccmd);
77         reader = new Reader();
78         writer = new Writer();
79         reader.start();
80         writer.start();
81     }
82     
83     private void error(Throwable c) {
84         boolean n = false;
85         if(c instanceof StopCondition) {
86             StopCondition s = (StopCondition)c;
87             n = s.normal;
88             c = s.getCause();
89         }
90         Exception e;
91         if(c instanceof Exception)
92             e = (Exception)c;
93         else
94             e = new Exception(c);
95         if(!n) {
96             close();
97             error = e;
98         }
99         synchronized(queue) {
100             Command cmd;
101             while((cmd = pending.poll()) != null) {
102                 cmd.error(e);
103             }
104             while((cmd = queue.poll()) != null) {
105                 cmd.error(e);
106             }
107         }
108     }
109     
110     private void checkthread() {
111         if(Thread.currentThread() == reader)
112             throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
113     }
114         
115     public void syncConnect() throws ConnectException, ClosedException, InterruptedException {
116         checkthread();
117         final boolean[] donep = new boolean[] {false};
118         final Exception[] errp = new Exception[] {null};
119         ConnectListener l = new ConnectListener() {
120                 public void connected() {
121                     donep[0] = true;
122                     synchronized(this) {
123                         notifyAll();
124                     }
125                 }
126                 
127                 public void error(Exception cause) {
128                     donep[0] = true;
129                     errp[0] = cause;
130                     synchronized(this) {
131                         notifyAll();
132                     }
133                 }
134             };
135         addConnectListener(l);
136         connect();
137         while(!donep[0]) {
138             synchronized(l) {
139                 l.wait();
140             }
141         }
142         if(errp[0] != null)
143             throw(new ClosedException(errp[0]));
144     }
145
146     public void expectVersion(int reqver) {
147         this.reqver = reqver;
148     }
149     
150     private void checkver(Response resp) throws VersionException {
151         revlo = Integer.parseInt(resp.token(0, 0));
152         revhi = Integer.parseInt(resp.token(0, 1));
153         if((reqver < revlo) || (reqver > revhi))
154             throw(new VersionException(reqver, revlo, revhi));
155     }
156
157     public Exception join() throws InterruptedException {
158         while(reader.isAlive()) {
159             reader.join();
160         }
161         close();
162         return(error);
163     }
164
165     public synchronized void addConnectListener(ConnectListener l) {
166         if((state != "idle") && (state != "connecting"))
167             throw(new IllegalStateException("Already connected"));
168         synchronized(connls) {
169             connls.add(l);
170         }
171     }
172
173     private void qcmd(Command cmd) {
174         synchronized(queue) {
175             queue.offer(cmd);
176             queue.notifyAll();
177         }
178     }
179     
180     static private class StopCondition extends Error {
181         final boolean normal;
182         
183         public StopCondition(Exception cause, boolean normal) {
184             super(cause);
185             this.normal = normal;
186         }
187     }
188     
189     private class Writer extends Thread {
190         public Writer() {
191             setDaemon(true);
192         }
193         
194         private String quote(String t) {
195             if(t.length() == 0)
196                 return("\"\"");
197             StringBuilder sb = new StringBuilder();
198             boolean quote = false;
199             for(int i = 0; i < t.length(); i++) {
200                 char c = t.charAt(i);
201                 if(c == '\"') {
202                     sb.append("\\\"");
203                 } else if(Character.isWhitespace(c)) {
204                     quote = true;
205                     sb.append(c);
206                 } else {
207                     sb.append(c);
208                 }
209             }
210             if(quote)
211                 return("\"" + sb.toString() + "\"");
212             else
213                 return(sb.toString());
214         }
215
216         private void guarded() {
217             try {
218                 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
219                 while(true) {
220                     Command cmd;
221                     try {
222                         synchronized(queue) {
223                             while(pending.size() > 0)
224                                 queue.wait();
225                             while((cmd = queue.poll()) == null)
226                                 queue.wait();
227                             pending.offer(cmd);
228                         }
229                     } catch(InterruptedException e) {
230                         throw(new StopCondition(e, true));
231                     }
232                     StringBuilder out = new StringBuilder();
233                     for(String s : cmd.tokens) {
234                         if(out.length() > 0)
235                             out.append(' ');
236                         out.append(quote(s));
237                     }
238                     w.write(out.toString());
239                 }
240             } catch(IOException e) {
241                 throw(new StopCondition(e, false));
242             }
243         }
244         
245         public void run() {
246             try {
247                 guarded();
248             } catch(Throwable t) {
249                 error(t);
250             }
251         }
252     }
253
254     private class Reader extends Thread {
255         private void dispatch(Response resp) throws Exception {
256             if(resp.code < 600) {
257                 synchronized(queue) {
258                     try {
259                         resp.cmd = pending.remove();
260                     } catch(NoSuchElementException e) {
261                         throw(new RuntimeException("DC server sent reply without a pending command"));
262                     }
263                     queue.notifyAll();
264                 }
265                 resp.cmd.done(resp);
266             }
267         }
268
269         private void guarded() {
270             try {
271                 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
272                 String state = "start";
273                 StringBuilder ct = new StringBuilder();
274                 int code = -1;
275                 boolean last = true;
276                 List<List<String>> lines = new LinkedList<List<String>>();
277                 List<String> tokens = new LinkedList<String>();
278                 while(true) {
279                     char c;
280                     {
281                         int i;
282                         try {
283                             if((i = r.read()) < 0) {
284                                 throw(new IOException("The server closed the connection"));
285                             }
286                         } catch(java.nio.channels.ClosedByInterruptException e) {
287                             throw(new StopCondition(e, true));
288                         }
289                         c = (char)i;
290                     }
291                     eat: do {
292                         if(state == "start") {
293                             if(c == '\r') {
294                                 state = "nl";
295                             } else if(Character.isWhitespace(c)) {
296                             } else {
297                                 if(code == -1)
298                                     state = "code";
299                                 else
300                                     state = "token";
301                                 continue eat;
302                             }
303                         } else if(state == "nl") {
304                             if(c == '\n') {
305                                 if((code < 100) || (code >= 1000)) {
306                                     throw(new IOException("Illegal response code " + code + " from the server"));
307                                 }
308                                 lines.add(tokens);
309                                 tokens = new LinkedList<String>();
310                                 if(last) {
311                                     dispatch(new Response(code, lines));
312                                     lines = new LinkedList<List<String>>();
313                                 }
314                                 code = -1;
315                                 state = "start";
316                             } else {
317                                 state = "start";
318                                 continue eat;
319                             }
320                         } else if(state == "code") {
321                             if((c == '-') || Character.isWhitespace(c)) {
322                                 last = c != '-';
323                                 code = Integer.parseInt(ct.toString());
324                                 ct.setLength(0);
325                                 state = "start";
326                                 continue eat;
327                             } else {
328                                 ct.append(c);
329                             }
330                         } else if(state == "token") {
331                             if(Character.isWhitespace(c)) {
332                                 tokens.add(ct.toString());
333                                 ct.setLength(0);
334                                 state = "start";
335                                 continue eat;
336                             } else if(c == '\\') {
337                                 state = "bs";
338                             } else if(c == '"') {
339                                 state = "cited";
340                             } else {
341                                 ct.append(c);
342                             }
343                         } else if(state == "bs") {
344                             ct.append(c);
345                             state = "token";
346                         } else if(state == "cited") {
347                             if(c == '\\')
348                                 state = "cbs";
349                             else if(c == '"')
350                                 state = "token";
351                             else
352                                 ct.append(c);
353                         } else if(state == "cbs") {
354                             ct.append(c);
355                             state = "cited";
356                         } else {
357                             throw(new Error("invalid state " + state));
358                         }
359                         break;
360                     } while(true);
361                 }
362             } catch(Exception e) {
363                 throw(new StopCondition(e, false));
364             }
365         }
366         
367         public void run() {
368             try {
369                 guarded();
370             } catch(Throwable t) {
371                 error(t);
372             }
373         }
374     }
375
376     public void close() {
377         try {
378             s.close();
379         } catch(IOException e) {}
380         reader.interrupt();
381         writer.interrupt();
382     }
383 }