b31151e496a6f0184a3b5fb3d8ebb4ba32c4ace8
[doldaconnect.git] / lib / java / dolda / dolcon / Connection.java
1 package dolda.dolcon;
2
3 import java.io.*;
4 import java.net.Socket;
5 import java.util.*;
6
7 public class Connection {
8     private Socket s;
9     private Reader reader;
10     private Writer writer;
11     private Queue<Command> queue = new LinkedList<Command>();
12     private Queue<Command> pending = new LinkedList<Command>();
13     private int reqver = 2, revlo, revhi;
14     private String aspec;
15     private String state;
16     private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17     private Exception error;
18     
19     public interface ConnectListener {
20         public void connected() throws Exception;
21         public void error(Exception cause);
22     }
23
24     public Connection(String aspec) {
25         this.aspec = aspec;
26         state = "idle";
27     }
28     
29     public void connect() throws ConnectException {
30         state = "connecting";
31         try {
32             s = new Socket(aspec, 1500);
33         } catch(java.net.UnknownHostException e) {
34             throw(new ConnectException("Could not resolve host " + aspec, e));
35         } catch(IOException e) {
36             throw(new ConnectException("Could not connect to host " + aspec, e));
37         }
38         pending = new LinkedList<Command>();
39         Command ccmd = new Command(".connect");
40         ccmd.addListener(new Command.Listener() {
41                 public void done(Response resp) throws Exception {
42                     try {
43                         checkver(resp);
44                     } catch(VersionException e) {
45                         error(e);
46                         throw(e);
47                     }
48                     synchronized(connls) {
49                         state = "connected";
50                         try {
51                             for(ConnectListener l : connls)
52                                 l.connected();
53                         } finally {
54                             connls.clear();
55                         }
56                     }
57                 }
58                 
59                 public void error(Exception cause) {
60                     synchronized(connls) {
61                         try {
62                             for(ConnectListener l : connls)
63                                 l.error(cause);
64                         } finally {
65                             connls.clear();
66                         }
67                     }
68                 }
69             });
70         pending.offer(ccmd);
71         reader = new Reader(s, pending);
72         writer = new Writer(s, queue, pending);
73         Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
74                 public void uncaughtException(Thread t, Throwable c) {
75                     boolean n = false;
76                     if(c instanceof StopCondition) {
77                         StopCondition s = (StopCondition)c;
78                         n = s.normal;
79                         c = s.getCause();
80                     }
81                     Exception e;
82                     if(c instanceof Exception)
83                         e = (Exception)c;
84                     else
85                         e = new Exception(c);
86                     if(!n) {
87                         close();
88                         error = e;
89                     }
90                     synchronized(pending) {
91                         Command cmd;
92                         while((cmd = pending.poll()) != null) {
93                             cmd.error(e);
94                         }
95                     }
96                     synchronized(queue) {
97                         Command cmd;
98                         while((cmd = queue.poll()) != null) {
99                             cmd.error(e);
100                         }
101                     }
102                 }
103             };
104         reader.setUncaughtExceptionHandler(h);
105         writer.setUncaughtExceptionHandler(h);
106         reader.start();
107         writer.start();
108     }
109     
110     private void checkthread() {
111         if(Thread.currentThread() == reader)
112             throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
113     }
114         
115     public void syncConnect() throws ConnectException, ClosedException, InterruptedException {
116         checkthread();
117         final boolean[] donep = new boolean[] {false};
118         final Exception[] errp = new Exception[] {null};
119         ConnectListener l = new ConnectListener() {
120                 public void connected() {
121                     donep[0] = true;
122                     synchronized(this) {
123                         notifyAll();
124                     }
125                 }
126                 
127                 public void error(Exception cause) {
128                     donep[0] = true;
129                     errp[0] = cause;
130                     synchronized(this) {
131                         notifyAll();
132                     }
133                 }
134             };
135         addConnectListener(l);
136         connect();
137         while(!donep[0]) {
138             synchronized(l) {
139                 l.wait();
140             }
141         }
142         if(errp[0] != null)
143             throw(new ClosedException(errp[0]));
144     }
145
146     public void expectVersion(int reqver) {
147         this.reqver = reqver;
148     }
149     
150     private void checkver(Response resp) throws VersionException {
151         revlo = Integer.parseInt(resp.token(0, 0));
152         revhi = Integer.parseInt(resp.token(0, 1));
153         if((reqver < revlo) || (reqver > revhi))
154             throw(new VersionException(reqver, revlo, revhi));
155     }
156
157     public Exception join() throws InterruptedException {
158         while(reader.isAlive()) {
159             reader.join();
160         }
161         close();
162         return(error);
163     }
164
165     public void addConnectListener(ConnectListener l) {
166         synchronized(connls) {
167             if((state != "idle") && (state != "connecting"))
168                 throw(new IllegalStateException("Already connected"));
169             connls.add(l);
170         }
171     }
172
173     private void qcmd(Command cmd) {
174         synchronized(queue) {
175             queue.offer(cmd);
176             queue.notifyAll();
177         }
178     }
179     
180     static private class StopCondition extends Error {
181         final boolean normal;
182         
183         public StopCondition(Exception cause, boolean normal) {
184             super(cause);
185             this.normal = normal;
186         }
187     }
188     
189     static private class Writer extends Thread {
190         Socket s;
191         Queue<Command> queue, pending;
192         
193         public Writer(Socket s, Queue<Command> queue, Queue<Command> pending) {
194             this.s = s;
195             this.queue = queue;
196             this.pending = pending;
197             setDaemon(true);
198         }
199         
200         private String quote(String t) {
201             if(t.length() == 0)
202                 return("\"\"");
203             StringBuilder sb = new StringBuilder();
204             boolean quote = false;
205             for(int i = 0; i < t.length(); i++) {
206                 char c = t.charAt(i);
207                 if(c == '\"') {
208                     sb.append("\\\"");
209                 } else if(Character.isWhitespace(c)) {
210                     quote = true;
211                     sb.append(c);
212                 } else {
213                     sb.append(c);
214                 }
215             }
216             if(quote)
217                 return("\"" + sb.toString() + "\"");
218             else
219                 return(sb.toString());
220         }
221
222         public void run() {
223             try {
224                 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
225                 while(true) {
226                     Command cmd;
227                     try {
228                         synchronized(pending) {
229                             while(pending.size() > 0)
230                                 pending.wait();
231                         }
232                         synchronized(queue) {
233                             do {
234                                 if((cmd = queue.poll()) != null)
235                                     break;
236                                 queue.wait();
237                             } while(true);
238                         }
239                     } catch(InterruptedException e) {
240                         throw(new StopCondition(e, true));
241                     }
242                     StringBuilder out = new StringBuilder();
243                     for(String s : cmd.tokens) {
244                         if(out.length() > 0)
245                             out.append(' ');
246                         out.append(quote(s));
247                     }
248                     w.write(out.toString());
249                 }
250             } catch(IOException e) {
251                 throw(new StopCondition(e, false));
252             }
253         }
254     }
255
256     static private class Reader extends Thread {
257         Socket s;
258         Queue<Command> pending;
259         
260         public Reader(Socket s, Queue<Command> pending) {
261             this.s = s;
262             this.pending = pending;
263         }
264         
265         private void dispatch(Response resp) throws Exception {
266             if(resp.code < 600) {
267                 synchronized(pending) {
268                     resp.cmd = pending.remove();
269                     pending.notifyAll();
270                 }
271                 resp.cmd.done(resp);
272             }
273         }
274
275         public void run() {
276             try {
277                 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
278                 String state = "start";
279                 StringBuilder ct = new StringBuilder();
280                 int code = -1;
281                 boolean last = true;
282                 List<List<String>> lines = new LinkedList<List<String>>();
283                 List<String> tokens = new LinkedList<String>();
284                 while(true) {
285                     char c;
286                     {
287                         int i;
288                         try {
289                             if((i = r.read()) < 0) {
290                                 throw(new IOException("The server closed the connection"));
291                             }
292                         } catch(java.nio.channels.ClosedByInterruptException e) {
293                             throw(new StopCondition(e, true));
294                         }
295                         c = (char)i;
296                     }
297                     eat: do {
298                         if(state == "start") {
299                             if(c == '\r') {
300                                 state = "nl";
301                             } else if(Character.isWhitespace(c)) {
302                             } else {
303                                 if(code == -1)
304                                     state = "code";
305                                 else
306                                     state = "token";
307                                 continue eat;
308                             }
309                         } else if(state == "nl") {
310                             if(c == '\n') {
311                                 if((code < 100) || (code >= 1000)) {
312                                     throw(new IOException("Illegal response code " + code + " from the server"));
313                                 }
314                                 lines.add(tokens);
315                                 tokens = new LinkedList<String>();
316                                 if(last) {
317                                     dispatch(new Response(code, lines));
318                                     lines = new LinkedList<List<String>>();
319                                 }
320                                 code = -1;
321                                 state = "start";
322                             } else {
323                                 state = "start";
324                                 continue eat;
325                             }
326                         } else if(state == "code") {
327                             if((c == '-') || Character.isWhitespace(c)) {
328                                 last = c != '-';
329                                 code = Integer.parseInt(ct.toString());
330                                 ct.setLength(0);
331                                 state = "start";
332                                 continue eat;
333                             } else {
334                                 ct.append(c);
335                             }
336                         } else if(state == "token") {
337                             if(Character.isWhitespace(c)) {
338                                 tokens.add(ct.toString());
339                                 ct.setLength(0);
340                                 state = "start";
341                                 continue eat;
342                             } else if(c == '\\') {
343                                 state = "bs";
344                             } else if(c == '"') {
345                                 state = "cited";
346                             } else {
347                                 ct.append(c);
348                             }
349                         } else if(state == "bs") {
350                             ct.append(c);
351                             state = "token";
352                         } else if(state == "cited") {
353                             if(c == '\\')
354                                 state = "cbs";
355                             else if(c == '"')
356                                 state = "token";
357                             else
358                                 ct.append(c);
359                         } else if(state == "cbs") {
360                             ct.append(c);
361                             state = "cited";
362                         } else {
363                             throw(new Error("invalid state " + state));
364                         }
365                         break;
366                     } while(true);
367                 }
368             } catch(Exception e) {
369                 throw(new StopCondition(e, false));
370             }
371         }
372     }
373
374     public void close() {
375         try {
376             s.close();
377         } catch(IOException e) {
378         }
379         reader.interrupt();
380         writer.interrupt();
381     }
382
383     protected void finalize() {
384         close();
385     }
386 }