Reworked the Java connection handler a bit.
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
CommitLineData
1335284e 1package dolda.dolcon.protocol;
1b37400b
FT
2
3import java.io.*;
4import java.net.Socket;
5import java.util.*;
6
7public class Connection {
e78d9ca3
FT
8 private Socket s;
9 private Reader reader;
10 private Writer writer;
11 private Queue<Command> queue = new LinkedList<Command>();
12 private Queue<Command> pending = new LinkedList<Command>();
13 private int reqver = 2, revlo, revhi;
14 private String aspec;
15 private String state;
16 private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17 private Exception error;
1b37400b 18
e78d9ca3
FT
19 public interface ConnectListener {
20 public void connected() throws Exception;
21 public void error(Exception cause);
22 }
23
24 public Connection(String aspec) {
25 this.aspec = aspec;
26 state = "idle";
27 }
28
29 public void connect() throws ConnectException {
4b987871
FT
30 synchronized(this) {
31 if(state != "idle")
32 throw(new IllegalStateException("Already connected"));
33 state = "connecting";
34 }
1b37400b
FT
35 try {
36 s = new Socket(aspec, 1500);
37 } catch(java.net.UnknownHostException e) {
38 throw(new ConnectException("Could not resolve host " + aspec, e));
39 } catch(IOException e) {
40 throw(new ConnectException("Could not connect to host " + aspec, e));
41 }
e78d9ca3
FT
42 pending = new LinkedList<Command>();
43 Command ccmd = new Command(".connect");
44 ccmd.addListener(new Command.Listener() {
45 public void done(Response resp) throws Exception {
46 try {
47 checkver(resp);
48 } catch(VersionException e) {
49 error(e);
50 throw(e);
51 }
4b987871 52 synchronized(Connection.this) {
e78d9ca3 53 state = "connected";
4b987871
FT
54 }
55 synchronized(connls) {
e78d9ca3
FT
56 try {
57 for(ConnectListener l : connls)
58 l.connected();
59 } finally {
60 connls.clear();
61 }
62 }
63 }
64
65 public void error(Exception cause) {
66 synchronized(connls) {
67 try {
68 for(ConnectListener l : connls)
69 l.error(cause);
70 } finally {
71 connls.clear();
72 }
73 }
74 }
75 });
76 pending.offer(ccmd);
4b987871
FT
77 reader = new Reader();
78 writer = new Writer();
1b37400b 79 reader.start();
e78d9ca3 80 writer.start();
1b37400b
FT
81 }
82
4b987871
FT
83 private void error(Throwable c) {
84 boolean n = false;
85 if(c instanceof StopCondition) {
86 StopCondition s = (StopCondition)c;
87 n = s.normal;
88 c = s.getCause();
89 }
90 Exception e;
91 if(c instanceof Exception)
92 e = (Exception)c;
93 else
94 e = new Exception(c);
95 if(!n) {
96 close();
97 error = e;
98 }
99 synchronized(queue) {
100 Command cmd;
101 while((cmd = pending.poll()) != null) {
102 cmd.error(e);
103 }
104 while((cmd = queue.poll()) != null) {
105 cmd.error(e);
106 }
107 }
108 }
109
e78d9ca3
FT
110 private void checkthread() {
111 if(Thread.currentThread() == reader)
112 throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
113 }
114
115 public void syncConnect() throws ConnectException, ClosedException, InterruptedException {
116 checkthread();
117 final boolean[] donep = new boolean[] {false};
118 final Exception[] errp = new Exception[] {null};
119 ConnectListener l = new ConnectListener() {
120 public void connected() {
121 donep[0] = true;
122 synchronized(this) {
123 notifyAll();
124 }
125 }
126
127 public void error(Exception cause) {
128 donep[0] = true;
129 errp[0] = cause;
130 synchronized(this) {
131 notifyAll();
132 }
133 }
134 };
135 addConnectListener(l);
136 connect();
137 while(!donep[0]) {
138 synchronized(l) {
139 l.wait();
140 }
141 }
142 if(errp[0] != null)
143 throw(new ClosedException(errp[0]));
144 }
145
146 public void expectVersion(int reqver) {
147 this.reqver = reqver;
148 }
149
150 private void checkver(Response resp) throws VersionException {
151 revlo = Integer.parseInt(resp.token(0, 0));
152 revhi = Integer.parseInt(resp.token(0, 1));
153 if((reqver < revlo) || (reqver > revhi))
154 throw(new VersionException(reqver, revlo, revhi));
155 }
156
157 public Exception join() throws InterruptedException {
158 while(reader.isAlive()) {
159 reader.join();
160 }
161 close();
162 return(error);
163 }
164
4b987871
FT
165 public synchronized void addConnectListener(ConnectListener l) {
166 if((state != "idle") && (state != "connecting"))
167 throw(new IllegalStateException("Already connected"));
e78d9ca3 168 synchronized(connls) {
e78d9ca3
FT
169 connls.add(l);
170 }
171 }
172
173 private void qcmd(Command cmd) {
174 synchronized(queue) {
175 queue.offer(cmd);
176 queue.notifyAll();
177 }
178 }
179
180 static private class StopCondition extends Error {
181 final boolean normal;
182
183 public StopCondition(Exception cause, boolean normal) {
184 super(cause);
185 this.normal = normal;
186 }
187 }
188
4b987871
FT
189 private class Writer extends Thread {
190 public Writer() {
1b37400b
FT
191 setDaemon(true);
192 }
193
e78d9ca3
FT
194 private String quote(String t) {
195 if(t.length() == 0)
196 return("\"\"");
197 StringBuilder sb = new StringBuilder();
198 boolean quote = false;
199 for(int i = 0; i < t.length(); i++) {
200 char c = t.charAt(i);
201 if(c == '\"') {
202 sb.append("\\\"");
203 } else if(Character.isWhitespace(c)) {
204 quote = true;
205 sb.append(c);
206 } else {
207 sb.append(c);
1b37400b 208 }
1b37400b 209 }
e78d9ca3
FT
210 if(quote)
211 return("\"" + sb.toString() + "\"");
212 else
213 return(sb.toString());
214 }
215
4b987871 216 private void guarded() {
e78d9ca3
FT
217 try {
218 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
219 while(true) {
220 Command cmd;
1b37400b 221 try {
e78d9ca3 222 synchronized(queue) {
4b987871
FT
223 while(pending.size() > 0)
224 queue.wait();
225 while((cmd = queue.poll()) == null)
e78d9ca3 226 queue.wait();
4b987871 227 pending.offer(cmd);
1b37400b 228 }
e78d9ca3
FT
229 } catch(InterruptedException e) {
230 throw(new StopCondition(e, true));
1b37400b 231 }
e78d9ca3
FT
232 StringBuilder out = new StringBuilder();
233 for(String s : cmd.tokens) {
234 if(out.length() > 0)
235 out.append(' ');
236 out.append(quote(s));
237 }
238 w.write(out.toString());
1b37400b 239 }
e78d9ca3
FT
240 } catch(IOException e) {
241 throw(new StopCondition(e, false));
242 }
243 }
e78d9ca3 244
4b987871
FT
245 public void run() {
246 try {
247 guarded();
248 } catch(Throwable t) {
249 error(t);
250 }
e78d9ca3 251 }
4b987871
FT
252 }
253
254 private class Reader extends Thread {
e78d9ca3
FT
255 private void dispatch(Response resp) throws Exception {
256 if(resp.code < 600) {
4b987871
FT
257 synchronized(queue) {
258 try {
259 resp.cmd = pending.remove();
260 } catch(NoSuchElementException e) {
261 throw(new RuntimeException("DC server sent reply without a pending command"));
262 }
263 queue.notifyAll();
e78d9ca3
FT
264 }
265 resp.cmd.done(resp);
266 }
267 }
268
4b987871 269 private void guarded() {
e78d9ca3
FT
270 try {
271 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
272 String state = "start";
273 StringBuilder ct = new StringBuilder();
274 int code = -1;
275 boolean last = true;
276 List<List<String>> lines = new LinkedList<List<String>>();
277 List<String> tokens = new LinkedList<String>();
278 while(true) {
279 char c;
280 {
281 int i;
282 try {
283 if((i = r.read()) < 0) {
284 throw(new IOException("The server closed the connection"));
285 }
286 } catch(java.nio.channels.ClosedByInterruptException e) {
287 throw(new StopCondition(e, true));
1b37400b 288 }
e78d9ca3
FT
289 c = (char)i;
290 }
291 eat: do {
292 if(state == "start") {
293 if(c == '\r') {
294 state = "nl";
295 } else if(Character.isWhitespace(c)) {
296 } else {
297 if(code == -1)
298 state = "code";
299 else
300 state = "token";
301 continue eat;
1b37400b 302 }
e78d9ca3
FT
303 } else if(state == "nl") {
304 if(c == '\n') {
305 if((code < 100) || (code >= 1000)) {
306 throw(new IOException("Illegal response code " + code + " from the server"));
1b37400b 307 }
e78d9ca3
FT
308 lines.add(tokens);
309 tokens = new LinkedList<String>();
310 if(last) {
311 dispatch(new Response(code, lines));
312 lines = new LinkedList<List<String>>();
313 }
314 code = -1;
315 state = "start";
316 } else {
317 state = "start";
318 continue eat;
1b37400b 319 }
e78d9ca3
FT
320 } else if(state == "code") {
321 if((c == '-') || Character.isWhitespace(c)) {
322 last = c != '-';
323 code = Integer.parseInt(ct.toString());
324 ct.setLength(0);
325 state = "start";
326 continue eat;
327 } else {
328 ct.append(c);
329 }
330 } else if(state == "token") {
331 if(Character.isWhitespace(c)) {
332 tokens.add(ct.toString());
333 ct.setLength(0);
334 state = "start";
335 continue eat;
336 } else if(c == '\\') {
337 state = "bs";
338 } else if(c == '"') {
339 state = "cited";
340 } else {
341 ct.append(c);
342 }
343 } else if(state == "bs") {
344 ct.append(c);
345 state = "token";
346 } else if(state == "cited") {
347 if(c == '\\')
348 state = "cbs";
349 else if(c == '"')
350 state = "token";
351 else
352 ct.append(c);
353 } else if(state == "cbs") {
1b37400b 354 ct.append(c);
1b37400b
FT
355 state = "cited";
356 } else {
e78d9ca3 357 throw(new Error("invalid state " + state));
1b37400b 358 }
e78d9ca3
FT
359 break;
360 } while(true);
361 }
362 } catch(Exception e) {
363 throw(new StopCondition(e, false));
1b37400b
FT
364 }
365 }
4b987871
FT
366
367 public void run() {
368 try {
369 guarded();
370 } catch(Throwable t) {
371 error(t);
372 }
373 }
1b37400b
FT
374 }
375
e78d9ca3 376 public void close() {
1b37400b
FT
377 try {
378 s.close();
4b987871 379 } catch(IOException e) {}
1b37400b 380 reader.interrupt();
e78d9ca3 381 writer.interrupt();
1b37400b 382 }
1b37400b 383}