Java: Began work on hub listeners.
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
CommitLineData
1335284e 1package dolda.dolcon.protocol;
1b37400b
FT
2
3import java.io.*;
4import java.net.Socket;
5import java.util.*;
6
7public class Connection {
e78d9ca3
FT
8 private Socket s;
9 private Reader reader;
10 private Writer writer;
11 private Queue<Command> queue = new LinkedList<Command>();
12 private Queue<Command> pending = new LinkedList<Command>();
13 private int reqver = 2, revlo, revhi;
14 private String aspec;
15 private String state;
16 private Set<ConnectListener> connls = new HashSet<ConnectListener>();
1505a392 17 private Set<NotifyListener> notls = new HashSet<NotifyListener>();
e78d9ca3 18 private Exception error;
1b37400b 19
e78d9ca3
FT
/**
 * Callback for the outcome of a connection attempt.
 */
public interface ConnectListener {
    /**
     * Called once the connection has been established.
     *
     * @throws Exception if the listener fails to handle the event
     */
    void connected() throws Exception;

    /**
     * Called when the connection attempt failed.
     *
     * @param cause the exception describing the failure
     */
    void error(Exception cause);
}
1505a392 24
e78d9ca3
FT
/**
 * Creates an unconnected instance in the "idle" state.
 *
 * @param aspec host to connect to when connect() is called
 */
public Connection(String aspec) {
    state = "idle";
    this.aspec = aspec;
}
29
30 public void connect() throws ConnectException {
4b987871
FT
31 synchronized(this) {
32 if(state != "idle")
33 throw(new IllegalStateException("Already connected"));
34 state = "connecting";
35 }
1b37400b
FT
36 try {
37 s = new Socket(aspec, 1500);
38 } catch(java.net.UnknownHostException e) {
39 throw(new ConnectException("Could not resolve host " + aspec, e));
40 } catch(IOException e) {
41 throw(new ConnectException("Could not connect to host " + aspec, e));
42 }
e78d9ca3
FT
43 pending = new LinkedList<Command>();
44 Command ccmd = new Command(".connect");
6bc193f2 45 ccmd.new Listener() {
e78d9ca3
FT
46 public void done(Response resp) throws Exception {
47 try {
48 checkver(resp);
49 } catch(VersionException e) {
50 error(e);
51 throw(e);
52 }
4b987871 53 synchronized(Connection.this) {
e78d9ca3 54 state = "connected";
4b987871
FT
55 }
56 synchronized(connls) {
e78d9ca3
FT
57 try {
58 for(ConnectListener l : connls)
59 l.connected();
60 } finally {
61 connls.clear();
62 }
63 }
64 }
65
66 public void error(Exception cause) {
67 synchronized(connls) {
68 try {
69 for(ConnectListener l : connls)
70 l.error(cause);
71 } finally {
72 connls.clear();
73 }
74 }
75 }
6bc193f2 76 };
e78d9ca3 77 pending.offer(ccmd);
4b987871
FT
78 reader = new Reader();
79 writer = new Writer();
1b37400b 80 reader.start();
e78d9ca3 81 writer.start();
1b37400b
FT
82 }
83
4b987871
FT
84 private void error(Throwable c) {
85 boolean n = false;
86 if(c instanceof StopCondition) {
87 StopCondition s = (StopCondition)c;
88 n = s.normal;
89 c = s.getCause();
90 }
91 Exception e;
92 if(c instanceof Exception)
93 e = (Exception)c;
94 else
95 e = new Exception(c);
96 if(!n) {
97 close();
98 error = e;
99 }
100 synchronized(queue) {
101 Command cmd;
102 while((cmd = pending.poll()) != null) {
103 cmd.error(e);
104 }
105 while((cmd = queue.poll()) != null) {
106 cmd.error(e);
107 }
108 }
109 }
110
e78d9ca3
FT
111 private void checkthread() {
112 if(Thread.currentThread() == reader)
113 throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
114 }
115
6bc193f2 116 public void syncConnect() throws ConnectException, InterruptedException {
e78d9ca3
FT
117 checkthread();
118 final boolean[] donep = new boolean[] {false};
119 final Exception[] errp = new Exception[] {null};
120 ConnectListener l = new ConnectListener() {
121 public void connected() {
122 donep[0] = true;
123 synchronized(this) {
124 notifyAll();
125 }
126 }
127
128 public void error(Exception cause) {
129 donep[0] = true;
130 errp[0] = cause;
131 synchronized(this) {
132 notifyAll();
133 }
134 }
135 };
136 addConnectListener(l);
137 connect();
138 while(!donep[0]) {
139 synchronized(l) {
140 l.wait();
141 }
142 }
143 if(errp[0] != null)
6bc193f2 144 throw(new ConnectException("DC connection has been closed", errp[0]));
e78d9ca3
FT
145 }
146
147 public void expectVersion(int reqver) {
148 this.reqver = reqver;
149 }
150
151 private void checkver(Response resp) throws VersionException {
152 revlo = Integer.parseInt(resp.token(0, 0));
153 revhi = Integer.parseInt(resp.token(0, 1));
154 if((reqver < revlo) || (reqver > revhi))
155 throw(new VersionException(reqver, revlo, revhi));
156 }
157
158 public Exception join() throws InterruptedException {
159 while(reader.isAlive()) {
160 reader.join();
161 }
162 close();
163 return(error);
164 }
165
1505a392
FT
166 public void addNotifyListener(NotifyListener l) {
167 synchronized(notls) {
168 notls.add(l);
169 }
170 }
171
172 public void removeNotifyListener(NotifyListener l) {
173 synchronized(notls) {
174 notls.remove(l);
175 }
176 }
177
4b987871
FT
178 public synchronized void addConnectListener(ConnectListener l) {
179 if((state != "idle") && (state != "connecting"))
180 throw(new IllegalStateException("Already connected"));
e78d9ca3 181 synchronized(connls) {
e78d9ca3
FT
182 connls.add(l);
183 }
184 }
185
e90ca845 186 public void qcmd(Command... cmds) {
e78d9ca3 187 synchronized(queue) {
e90ca845
FT
188 for(Command cmd : cmds)
189 queue.offer(cmd);
e78d9ca3
FT
190 queue.notifyAll();
191 }
192 }
193
6bc193f2
FT
194 public void qcmd(String... tokens) {
195 qcmd(new Command(tokens));
196 }
197
198 public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
199 checkthread();
200 final boolean[] donep = new boolean[] {false};
201 final Response[] resp = new Response[] {null};
202 final Exception[] errp = new Exception[] {null};
203 Object l = cmd.new Listener() {
204 public synchronized void done(Response rsp) {
205 resp[0] = rsp;
206 donep[0] = true;
207 notifyAll();
208 }
209
210 public synchronized void error(Exception e) {
211 errp[0] = e;
212 donep[0] = true;
213 notifyAll();
214 }
215 };
59b214d6 216 qcmd(cmd);
6bc193f2
FT
217 synchronized(l) {
218 while(!donep[0]) {
219 l.wait();
220 }
221 }
222 if(errp[0] != null)
223 throw(new ClosedException(errp[0]));
224 return(resp[0]);
225 }
226
227 public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
228 return(ecmd(new Command(tokens)));
229 }
230
e78d9ca3
FT
231 static private class StopCondition extends Error {
232 final boolean normal;
233
234 public StopCondition(Exception cause, boolean normal) {
235 super(cause);
236 this.normal = normal;
237 }
238 }
239
4b987871
FT
240 private class Writer extends Thread {
241 public Writer() {
1b37400b
FT
242 setDaemon(true);
243 }
244
e78d9ca3
FT
245 private String quote(String t) {
246 if(t.length() == 0)
247 return("\"\"");
248 StringBuilder sb = new StringBuilder();
249 boolean quote = false;
250 for(int i = 0; i < t.length(); i++) {
251 char c = t.charAt(i);
252 if(c == '\"') {
253 sb.append("\\\"");
254 } else if(Character.isWhitespace(c)) {
255 quote = true;
256 sb.append(c);
257 } else {
258 sb.append(c);
1b37400b 259 }
1b37400b 260 }
e78d9ca3
FT
261 if(quote)
262 return("\"" + sb.toString() + "\"");
263 else
264 return(sb.toString());
265 }
266
4b987871 267 private void guarded() {
e78d9ca3
FT
268 try {
269 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
270 while(true) {
271 Command cmd;
1b37400b 272 try {
e78d9ca3 273 synchronized(queue) {
4b987871
FT
274 while(pending.size() > 0)
275 queue.wait();
276 while((cmd = queue.poll()) == null)
e78d9ca3 277 queue.wait();
4b987871 278 pending.offer(cmd);
1b37400b 279 }
e78d9ca3
FT
280 } catch(InterruptedException e) {
281 throw(new StopCondition(e, true));
1b37400b 282 }
e78d9ca3
FT
283 StringBuilder out = new StringBuilder();
284 for(String s : cmd.tokens) {
285 if(out.length() > 0)
286 out.append(' ');
287 out.append(quote(s));
288 }
59b214d6 289 out.append("\r\n");
e78d9ca3 290 w.write(out.toString());
59b214d6 291 w.flush();
1b37400b 292 }
e78d9ca3
FT
293 } catch(IOException e) {
294 throw(new StopCondition(e, false));
295 }
296 }
e78d9ca3 297
4b987871
FT
298 public void run() {
299 try {
300 guarded();
301 } catch(Throwable t) {
302 error(t);
303 }
e78d9ca3 304 }
4b987871
FT
305 }
306
307 private class Reader extends Thread {
e78d9ca3
FT
308 private void dispatch(Response resp) throws Exception {
309 if(resp.code < 600) {
4b987871
FT
310 synchronized(queue) {
311 try {
312 resp.cmd = pending.remove();
313 } catch(NoSuchElementException e) {
314 throw(new RuntimeException("DC server sent reply without a pending command"));
315 }
316 queue.notifyAll();
e78d9ca3
FT
317 }
318 resp.cmd.done(resp);
1505a392
FT
319 } else {
320 synchronized(notls) {
321 for(NotifyListener l : notls) {
322 l.notified(resp);
323 }
324 }
e78d9ca3
FT
325 }
326 }
327
4b987871 328 private void guarded() {
e78d9ca3
FT
329 try {
330 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
331 String state = "start";
332 StringBuilder ct = new StringBuilder();
333 int code = -1;
334 boolean last = true;
335 List<List<String>> lines = new LinkedList<List<String>>();
336 List<String> tokens = new LinkedList<String>();
337 while(true) {
338 char c;
339 {
340 int i;
341 try {
342 if((i = r.read()) < 0) {
343 throw(new IOException("The server closed the connection"));
344 }
345 } catch(java.nio.channels.ClosedByInterruptException e) {
346 throw(new StopCondition(e, true));
1b37400b 347 }
e78d9ca3
FT
348 c = (char)i;
349 }
350 eat: do {
351 if(state == "start") {
352 if(c == '\r') {
353 state = "nl";
354 } else if(Character.isWhitespace(c)) {
355 } else {
356 if(code == -1)
357 state = "code";
358 else
359 state = "token";
360 continue eat;
1b37400b 361 }
e78d9ca3
FT
362 } else if(state == "nl") {
363 if(c == '\n') {
364 if((code < 100) || (code >= 1000)) {
365 throw(new IOException("Illegal response code " + code + " from the server"));
1b37400b 366 }
e78d9ca3
FT
367 lines.add(tokens);
368 tokens = new LinkedList<String>();
369 if(last) {
370 dispatch(new Response(code, lines));
371 lines = new LinkedList<List<String>>();
372 }
373 code = -1;
374 state = "start";
375 } else {
376 state = "start";
377 continue eat;
1b37400b 378 }
e78d9ca3
FT
379 } else if(state == "code") {
380 if((c == '-') || Character.isWhitespace(c)) {
381 last = c != '-';
382 code = Integer.parseInt(ct.toString());
383 ct.setLength(0);
384 state = "start";
e78d9ca3
FT
385 } else {
386 ct.append(c);
387 }
388 } else if(state == "token") {
389 if(Character.isWhitespace(c)) {
390 tokens.add(ct.toString());
391 ct.setLength(0);
392 state = "start";
393 continue eat;
394 } else if(c == '\\') {
395 state = "bs";
396 } else if(c == '"') {
397 state = "cited";
398 } else {
399 ct.append(c);
400 }
401 } else if(state == "bs") {
402 ct.append(c);
403 state = "token";
404 } else if(state == "cited") {
405 if(c == '\\')
406 state = "cbs";
407 else if(c == '"')
408 state = "token";
409 else
410 ct.append(c);
411 } else if(state == "cbs") {
1b37400b 412 ct.append(c);
1b37400b
FT
413 state = "cited";
414 } else {
e78d9ca3 415 throw(new Error("invalid state " + state));
1b37400b 416 }
e78d9ca3
FT
417 break;
418 } while(true);
419 }
420 } catch(Exception e) {
421 throw(new StopCondition(e, false));
1b37400b
FT
422 }
423 }
4b987871
FT
424
425 public void run() {
426 try {
427 guarded();
428 } catch(Throwable t) {
429 error(t);
430 }
431 }
1b37400b
FT
432 }
433
e78d9ca3 434 public void close() {
1b37400b
FT
435 try {
436 s.close();
4b987871 437 } catch(IOException e) {}
1b37400b 438 reader.interrupt();
e78d9ca3 439 writer.interrupt();
1b37400b 440 }
1b37400b 441}