More Java work.
[doldaconnect.git] / lib / java / dolda / dolcon / Connection.java
CommitLineData
1b37400b
FT
1package dolda.dolcon;
2
3import java.io.*;
4import java.net.Socket;
5import java.util.*;
6
7public class Connection {
e78d9ca3
FT
8 private Socket s;
9 private Reader reader;
10 private Writer writer;
11 private Queue<Command> queue = new LinkedList<Command>();
12 private Queue<Command> pending = new LinkedList<Command>();
13 private int reqver = 2, revlo, revhi;
14 private String aspec;
15 private String state;
16 private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17 private Exception error;
1b37400b 18
e78d9ca3
FT
/**
 * Callback notified about the outcome of a connection attempt.
 */
public interface ConnectListener {
    /**
     * Invoked after the server's reply to the initial connect command
     * has passed the version check.
     */
    void connected() throws Exception;

    /**
     * Invoked when the initial connect command reports an error.
     *
     * @param cause the exception describing the failure
     */
    void error(Exception cause);
}
23
/**
 * Creates an unconnected instance in the "idle" state.
 *
 * @param aspec host name that connect() passes to the socket
 */
public Connection(String aspec) {
    state = "idle";
    this.aspec = aspec;
}
28
29 public void connect() throws ConnectException {
30 state = "connecting";
1b37400b
FT
31 try {
32 s = new Socket(aspec, 1500);
33 } catch(java.net.UnknownHostException e) {
34 throw(new ConnectException("Could not resolve host " + aspec, e));
35 } catch(IOException e) {
36 throw(new ConnectException("Could not connect to host " + aspec, e));
37 }
e78d9ca3
FT
38 pending = new LinkedList<Command>();
39 Command ccmd = new Command(".connect");
40 ccmd.addListener(new Command.Listener() {
41 public void done(Response resp) throws Exception {
42 try {
43 checkver(resp);
44 } catch(VersionException e) {
45 error(e);
46 throw(e);
47 }
48 synchronized(connls) {
49 state = "connected";
50 try {
51 for(ConnectListener l : connls)
52 l.connected();
53 } finally {
54 connls.clear();
55 }
56 }
57 }
58
59 public void error(Exception cause) {
60 synchronized(connls) {
61 try {
62 for(ConnectListener l : connls)
63 l.error(cause);
64 } finally {
65 connls.clear();
66 }
67 }
68 }
69 });
70 pending.offer(ccmd);
71 reader = new Reader(s, pending);
72 writer = new Writer(s, queue, pending);
73 Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
74 public void uncaughtException(Thread t, Throwable c) {
75 boolean n = false;
76 if(c instanceof StopCondition) {
77 StopCondition s = (StopCondition)c;
78 n = s.normal;
79 c = s.getCause();
80 }
81 Exception e;
82 if(c instanceof Exception)
83 e = (Exception)c;
84 else
85 e = new Exception(c);
86 if(!n) {
87 close();
88 error = e;
89 }
90 synchronized(pending) {
91 Command cmd;
92 while((cmd = pending.poll()) != null) {
93 cmd.error(e);
94 }
95 }
96 synchronized(queue) {
97 Command cmd;
98 while((cmd = queue.poll()) != null) {
99 cmd.error(e);
100 }
101 }
102 }
103 };
104 reader.setUncaughtExceptionHandler(h);
105 writer.setUncaughtExceptionHandler(h);
1b37400b 106 reader.start();
e78d9ca3 107 writer.start();
1b37400b
FT
108 }
109
e78d9ca3
FT
110 private void checkthread() {
111 if(Thread.currentThread() == reader)
112 throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
113 }
114
115 public void syncConnect() throws ConnectException, ClosedException, InterruptedException {
116 checkthread();
117 final boolean[] donep = new boolean[] {false};
118 final Exception[] errp = new Exception[] {null};
119 ConnectListener l = new ConnectListener() {
120 public void connected() {
121 donep[0] = true;
122 synchronized(this) {
123 notifyAll();
124 }
125 }
126
127 public void error(Exception cause) {
128 donep[0] = true;
129 errp[0] = cause;
130 synchronized(this) {
131 notifyAll();
132 }
133 }
134 };
135 addConnectListener(l);
136 connect();
137 while(!donep[0]) {
138 synchronized(l) {
139 l.wait();
140 }
141 }
142 if(errp[0] != null)
143 throw(new ClosedException(errp[0]));
144 }
145
146 public void expectVersion(int reqver) {
147 this.reqver = reqver;
148 }
149
150 private void checkver(Response resp) throws VersionException {
151 revlo = Integer.parseInt(resp.token(0, 0));
152 revhi = Integer.parseInt(resp.token(0, 1));
153 if((reqver < revlo) || (reqver > revhi))
154 throw(new VersionException(reqver, revlo, revhi));
155 }
156
157 public Exception join() throws InterruptedException {
158 while(reader.isAlive()) {
159 reader.join();
160 }
161 close();
162 return(error);
163 }
164
165 public void addConnectListener(ConnectListener l) {
166 synchronized(connls) {
167 if((state != "idle") && (state != "connecting"))
168 throw(new IllegalStateException("Already connected"));
169 connls.add(l);
170 }
171 }
172
173 private void qcmd(Command cmd) {
174 synchronized(queue) {
175 queue.offer(cmd);
176 queue.notifyAll();
177 }
178 }
179
180 static private class StopCondition extends Error {
181 final boolean normal;
182
183 public StopCondition(Exception cause, boolean normal) {
184 super(cause);
185 this.normal = normal;
186 }
187 }
188
189 static private class Writer extends Thread {
1b37400b 190 Socket s;
e78d9ca3 191 Queue<Command> queue, pending;
1b37400b 192
e78d9ca3 193 public Writer(Socket s, Queue<Command> queue, Queue<Command> pending) {
1b37400b 194 this.s = s;
e78d9ca3
FT
195 this.queue = queue;
196 this.pending = pending;
1b37400b
FT
197 setDaemon(true);
198 }
199
e78d9ca3
FT
200 private String quote(String t) {
201 if(t.length() == 0)
202 return("\"\"");
203 StringBuilder sb = new StringBuilder();
204 boolean quote = false;
205 for(int i = 0; i < t.length(); i++) {
206 char c = t.charAt(i);
207 if(c == '\"') {
208 sb.append("\\\"");
209 } else if(Character.isWhitespace(c)) {
210 quote = true;
211 sb.append(c);
212 } else {
213 sb.append(c);
1b37400b 214 }
1b37400b 215 }
e78d9ca3
FT
216 if(quote)
217 return("\"" + sb.toString() + "\"");
218 else
219 return(sb.toString());
220 }
221
222 public void run() {
223 try {
224 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
225 while(true) {
226 Command cmd;
1b37400b 227 try {
e78d9ca3
FT
228 synchronized(pending) {
229 while(pending.size() > 0)
230 pending.wait();
1b37400b 231 }
e78d9ca3
FT
232 synchronized(queue) {
233 do {
234 if((cmd = queue.poll()) != null)
235 break;
236 queue.wait();
237 } while(true);
1b37400b 238 }
e78d9ca3
FT
239 } catch(InterruptedException e) {
240 throw(new StopCondition(e, true));
1b37400b 241 }
e78d9ca3
FT
242 StringBuilder out = new StringBuilder();
243 for(String s : cmd.tokens) {
244 if(out.length() > 0)
245 out.append(' ');
246 out.append(quote(s));
247 }
248 w.write(out.toString());
1b37400b 249 }
e78d9ca3
FT
250 } catch(IOException e) {
251 throw(new StopCondition(e, false));
252 }
253 }
254 }
255
256 static private class Reader extends Thread {
257 Socket s;
258 Queue<Command> pending;
259
260 public Reader(Socket s, Queue<Command> pending) {
261 this.s = s;
262 this.pending = pending;
263 }
264
265 private void dispatch(Response resp) throws Exception {
266 if(resp.code < 600) {
267 synchronized(pending) {
268 resp.cmd = pending.remove();
269 pending.notifyAll();
270 }
271 resp.cmd.done(resp);
272 }
273 }
274
275 public void run() {
276 try {
277 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
278 String state = "start";
279 StringBuilder ct = new StringBuilder();
280 int code = -1;
281 boolean last = true;
282 List<List<String>> lines = new LinkedList<List<String>>();
283 List<String> tokens = new LinkedList<String>();
284 while(true) {
285 char c;
286 {
287 int i;
288 try {
289 if((i = r.read()) < 0) {
290 throw(new IOException("The server closed the connection"));
291 }
292 } catch(java.nio.channels.ClosedByInterruptException e) {
293 throw(new StopCondition(e, true));
1b37400b 294 }
e78d9ca3
FT
295 c = (char)i;
296 }
297 eat: do {
298 if(state == "start") {
299 if(c == '\r') {
300 state = "nl";
301 } else if(Character.isWhitespace(c)) {
302 } else {
303 if(code == -1)
304 state = "code";
305 else
306 state = "token";
307 continue eat;
1b37400b 308 }
e78d9ca3
FT
309 } else if(state == "nl") {
310 if(c == '\n') {
311 if((code < 100) || (code >= 1000)) {
312 throw(new IOException("Illegal response code " + code + " from the server"));
1b37400b 313 }
e78d9ca3
FT
314 lines.add(tokens);
315 tokens = new LinkedList<String>();
316 if(last) {
317 dispatch(new Response(code, lines));
318 lines = new LinkedList<List<String>>();
319 }
320 code = -1;
321 state = "start";
322 } else {
323 state = "start";
324 continue eat;
1b37400b 325 }
e78d9ca3
FT
326 } else if(state == "code") {
327 if((c == '-') || Character.isWhitespace(c)) {
328 last = c != '-';
329 code = Integer.parseInt(ct.toString());
330 ct.setLength(0);
331 state = "start";
332 continue eat;
333 } else {
334 ct.append(c);
335 }
336 } else if(state == "token") {
337 if(Character.isWhitespace(c)) {
338 tokens.add(ct.toString());
339 ct.setLength(0);
340 state = "start";
341 continue eat;
342 } else if(c == '\\') {
343 state = "bs";
344 } else if(c == '"') {
345 state = "cited";
346 } else {
347 ct.append(c);
348 }
349 } else if(state == "bs") {
350 ct.append(c);
351 state = "token";
352 } else if(state == "cited") {
353 if(c == '\\')
354 state = "cbs";
355 else if(c == '"')
356 state = "token";
357 else
358 ct.append(c);
359 } else if(state == "cbs") {
1b37400b 360 ct.append(c);
1b37400b
FT
361 state = "cited";
362 } else {
e78d9ca3 363 throw(new Error("invalid state " + state));
1b37400b 364 }
e78d9ca3
FT
365 break;
366 } while(true);
367 }
368 } catch(Exception e) {
369 throw(new StopCondition(e, false));
1b37400b
FT
370 }
371 }
372 }
373
e78d9ca3 374 public void close() {
1b37400b
FT
375 try {
376 s.close();
377 } catch(IOException e) {
378 }
379 reader.interrupt();
e78d9ca3 380 writer.interrupt();
1b37400b
FT
381 }
382
e78d9ca3
FT
383 protected void finalize() {
384 close();
1b37400b
FT
385 }
386}