Java: Added NotifyListeners.
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
CommitLineData
1335284e 1package dolda.dolcon.protocol;
1b37400b
FT
2
3import java.io.*;
4import java.net.Socket;
5import java.util.*;
6
7public class Connection {
e78d9ca3
FT
8 private Socket s;
9 private Reader reader;
10 private Writer writer;
11 private Queue<Command> queue = new LinkedList<Command>();
12 private Queue<Command> pending = new LinkedList<Command>();
13 private int reqver = 2, revlo, revhi;
14 private String aspec;
15 private String state;
16 private Set<ConnectListener> connls = new HashSet<ConnectListener>();
1505a392 17 private Set<NotifyListener> notls = new HashSet<NotifyListener>();
e78d9ca3 18 private Exception error;
1b37400b 19
e78d9ca3
FT
20 public interface ConnectListener {
21 public void connected() throws Exception;
22 public void error(Exception cause);
23 }
1505a392 24
e78d9ca3
FT
25 public Connection(String aspec) {
26 this.aspec = aspec;
27 state = "idle";
28 }
29
30 public void connect() throws ConnectException {
4b987871
FT
31 synchronized(this) {
32 if(state != "idle")
33 throw(new IllegalStateException("Already connected"));
34 state = "connecting";
35 }
1b37400b
FT
36 try {
37 s = new Socket(aspec, 1500);
38 } catch(java.net.UnknownHostException e) {
39 throw(new ConnectException("Could not resolve host " + aspec, e));
40 } catch(IOException e) {
41 throw(new ConnectException("Could not connect to host " + aspec, e));
42 }
e78d9ca3
FT
43 pending = new LinkedList<Command>();
44 Command ccmd = new Command(".connect");
6bc193f2 45 ccmd.new Listener() {
e78d9ca3
FT
46 public void done(Response resp) throws Exception {
47 try {
48 checkver(resp);
49 } catch(VersionException e) {
50 error(e);
51 throw(e);
52 }
4b987871 53 synchronized(Connection.this) {
e78d9ca3 54 state = "connected";
4b987871
FT
55 }
56 synchronized(connls) {
e78d9ca3
FT
57 try {
58 for(ConnectListener l : connls)
59 l.connected();
60 } finally {
61 connls.clear();
62 }
63 }
64 }
65
66 public void error(Exception cause) {
67 synchronized(connls) {
68 try {
69 for(ConnectListener l : connls)
70 l.error(cause);
71 } finally {
72 connls.clear();
73 }
74 }
75 }
6bc193f2 76 };
e78d9ca3 77 pending.offer(ccmd);
4b987871
FT
78 reader = new Reader();
79 writer = new Writer();
1b37400b 80 reader.start();
e78d9ca3 81 writer.start();
1b37400b
FT
82 }
83
4b987871
FT
84 private void error(Throwable c) {
85 boolean n = false;
86 if(c instanceof StopCondition) {
87 StopCondition s = (StopCondition)c;
88 n = s.normal;
89 c = s.getCause();
90 }
91 Exception e;
92 if(c instanceof Exception)
93 e = (Exception)c;
94 else
95 e = new Exception(c);
96 if(!n) {
97 close();
98 error = e;
99 }
100 synchronized(queue) {
101 Command cmd;
102 while((cmd = pending.poll()) != null) {
103 cmd.error(e);
104 }
105 while((cmd = queue.poll()) != null) {
106 cmd.error(e);
107 }
108 }
109 }
110
e78d9ca3
FT
111 private void checkthread() {
112 if(Thread.currentThread() == reader)
113 throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
114 }
115
6bc193f2 116 public void syncConnect() throws ConnectException, InterruptedException {
e78d9ca3
FT
117 checkthread();
118 final boolean[] donep = new boolean[] {false};
119 final Exception[] errp = new Exception[] {null};
120 ConnectListener l = new ConnectListener() {
121 public void connected() {
122 donep[0] = true;
123 synchronized(this) {
124 notifyAll();
125 }
126 }
127
128 public void error(Exception cause) {
129 donep[0] = true;
130 errp[0] = cause;
131 synchronized(this) {
132 notifyAll();
133 }
134 }
135 };
136 addConnectListener(l);
137 connect();
138 while(!donep[0]) {
139 synchronized(l) {
140 l.wait();
141 }
142 }
143 if(errp[0] != null)
6bc193f2 144 throw(new ConnectException("DC connection has been closed", errp[0]));
e78d9ca3
FT
145 }
146
147 public void expectVersion(int reqver) {
148 this.reqver = reqver;
149 }
150
151 private void checkver(Response resp) throws VersionException {
152 revlo = Integer.parseInt(resp.token(0, 0));
153 revhi = Integer.parseInt(resp.token(0, 1));
154 if((reqver < revlo) || (reqver > revhi))
155 throw(new VersionException(reqver, revlo, revhi));
156 }
157
158 public Exception join() throws InterruptedException {
159 while(reader.isAlive()) {
160 reader.join();
161 }
162 close();
163 return(error);
164 }
165
1505a392
FT
166 public void addNotifyListener(NotifyListener l) {
167 synchronized(notls) {
168 notls.add(l);
169 }
170 }
171
172 public void removeNotifyListener(NotifyListener l) {
173 synchronized(notls) {
174 notls.remove(l);
175 }
176 }
177
4b987871
FT
178 public synchronized void addConnectListener(ConnectListener l) {
179 if((state != "idle") && (state != "connecting"))
180 throw(new IllegalStateException("Already connected"));
e78d9ca3 181 synchronized(connls) {
e78d9ca3
FT
182 connls.add(l);
183 }
184 }
185
6bc193f2 186 public void qcmd(Command cmd) {
e78d9ca3
FT
187 synchronized(queue) {
188 queue.offer(cmd);
189 queue.notifyAll();
190 }
191 }
192
6bc193f2
FT
193 public void qcmd(String... tokens) {
194 qcmd(new Command(tokens));
195 }
196
197 public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
198 checkthread();
199 final boolean[] donep = new boolean[] {false};
200 final Response[] resp = new Response[] {null};
201 final Exception[] errp = new Exception[] {null};
202 Object l = cmd.new Listener() {
203 public synchronized void done(Response rsp) {
204 resp[0] = rsp;
205 donep[0] = true;
206 notifyAll();
207 }
208
209 public synchronized void error(Exception e) {
210 errp[0] = e;
211 donep[0] = true;
212 notifyAll();
213 }
214 };
215 synchronized(l) {
216 while(!donep[0]) {
217 l.wait();
218 }
219 }
220 if(errp[0] != null)
221 throw(new ClosedException(errp[0]));
222 return(resp[0]);
223 }
224
225 public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
226 return(ecmd(new Command(tokens)));
227 }
228
e78d9ca3
FT
229 static private class StopCondition extends Error {
230 final boolean normal;
231
232 public StopCondition(Exception cause, boolean normal) {
233 super(cause);
234 this.normal = normal;
235 }
236 }
237
4b987871
FT
238 private class Writer extends Thread {
239 public Writer() {
1b37400b
FT
240 setDaemon(true);
241 }
242
e78d9ca3
FT
243 private String quote(String t) {
244 if(t.length() == 0)
245 return("\"\"");
246 StringBuilder sb = new StringBuilder();
247 boolean quote = false;
248 for(int i = 0; i < t.length(); i++) {
249 char c = t.charAt(i);
250 if(c == '\"') {
251 sb.append("\\\"");
252 } else if(Character.isWhitespace(c)) {
253 quote = true;
254 sb.append(c);
255 } else {
256 sb.append(c);
1b37400b 257 }
1b37400b 258 }
e78d9ca3
FT
259 if(quote)
260 return("\"" + sb.toString() + "\"");
261 else
262 return(sb.toString());
263 }
264
4b987871 265 private void guarded() {
e78d9ca3
FT
266 try {
267 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
268 while(true) {
269 Command cmd;
1b37400b 270 try {
e78d9ca3 271 synchronized(queue) {
4b987871
FT
272 while(pending.size() > 0)
273 queue.wait();
274 while((cmd = queue.poll()) == null)
e78d9ca3 275 queue.wait();
4b987871 276 pending.offer(cmd);
1b37400b 277 }
e78d9ca3
FT
278 } catch(InterruptedException e) {
279 throw(new StopCondition(e, true));
1b37400b 280 }
e78d9ca3
FT
281 StringBuilder out = new StringBuilder();
282 for(String s : cmd.tokens) {
283 if(out.length() > 0)
284 out.append(' ');
285 out.append(quote(s));
286 }
287 w.write(out.toString());
1b37400b 288 }
e78d9ca3
FT
289 } catch(IOException e) {
290 throw(new StopCondition(e, false));
291 }
292 }
e78d9ca3 293
4b987871
FT
294 public void run() {
295 try {
296 guarded();
297 } catch(Throwable t) {
298 error(t);
299 }
e78d9ca3 300 }
4b987871
FT
301 }
302
303 private class Reader extends Thread {
e78d9ca3
FT
304 private void dispatch(Response resp) throws Exception {
305 if(resp.code < 600) {
4b987871
FT
306 synchronized(queue) {
307 try {
308 resp.cmd = pending.remove();
309 } catch(NoSuchElementException e) {
310 throw(new RuntimeException("DC server sent reply without a pending command"));
311 }
312 queue.notifyAll();
e78d9ca3
FT
313 }
314 resp.cmd.done(resp);
1505a392
FT
315 } else {
316 synchronized(notls) {
317 for(NotifyListener l : notls) {
318 l.notified(resp);
319 }
320 }
e78d9ca3
FT
321 }
322 }
323
4b987871 324 private void guarded() {
e78d9ca3
FT
325 try {
326 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
327 String state = "start";
328 StringBuilder ct = new StringBuilder();
329 int code = -1;
330 boolean last = true;
331 List<List<String>> lines = new LinkedList<List<String>>();
332 List<String> tokens = new LinkedList<String>();
333 while(true) {
334 char c;
335 {
336 int i;
337 try {
338 if((i = r.read()) < 0) {
339 throw(new IOException("The server closed the connection"));
340 }
341 } catch(java.nio.channels.ClosedByInterruptException e) {
342 throw(new StopCondition(e, true));
1b37400b 343 }
e78d9ca3
FT
344 c = (char)i;
345 }
346 eat: do {
347 if(state == "start") {
348 if(c == '\r') {
349 state = "nl";
350 } else if(Character.isWhitespace(c)) {
351 } else {
352 if(code == -1)
353 state = "code";
354 else
355 state = "token";
356 continue eat;
1b37400b 357 }
e78d9ca3
FT
358 } else if(state == "nl") {
359 if(c == '\n') {
360 if((code < 100) || (code >= 1000)) {
361 throw(new IOException("Illegal response code " + code + " from the server"));
1b37400b 362 }
e78d9ca3
FT
363 lines.add(tokens);
364 tokens = new LinkedList<String>();
365 if(last) {
366 dispatch(new Response(code, lines));
367 lines = new LinkedList<List<String>>();
368 }
369 code = -1;
370 state = "start";
371 } else {
372 state = "start";
373 continue eat;
1b37400b 374 }
e78d9ca3
FT
375 } else if(state == "code") {
376 if((c == '-') || Character.isWhitespace(c)) {
377 last = c != '-';
378 code = Integer.parseInt(ct.toString());
379 ct.setLength(0);
380 state = "start";
381 continue eat;
382 } else {
383 ct.append(c);
384 }
385 } else if(state == "token") {
386 if(Character.isWhitespace(c)) {
387 tokens.add(ct.toString());
388 ct.setLength(0);
389 state = "start";
390 continue eat;
391 } else if(c == '\\') {
392 state = "bs";
393 } else if(c == '"') {
394 state = "cited";
395 } else {
396 ct.append(c);
397 }
398 } else if(state == "bs") {
399 ct.append(c);
400 state = "token";
401 } else if(state == "cited") {
402 if(c == '\\')
403 state = "cbs";
404 else if(c == '"')
405 state = "token";
406 else
407 ct.append(c);
408 } else if(state == "cbs") {
1b37400b 409 ct.append(c);
1b37400b
FT
410 state = "cited";
411 } else {
e78d9ca3 412 throw(new Error("invalid state " + state));
1b37400b 413 }
e78d9ca3
FT
414 break;
415 } while(true);
416 }
417 } catch(Exception e) {
418 throw(new StopCondition(e, false));
1b37400b
FT
419 }
420 }
4b987871
FT
421
422 public void run() {
423 try {
424 guarded();
425 } catch(Throwable t) {
426 error(t);
427 }
428 }
1b37400b
FT
429 }
430
e78d9ca3 431 public void close() {
1b37400b
FT
432 try {
433 s.close();
4b987871 434 } catch(IOException e) {}
1b37400b 435 reader.interrupt();
e78d9ca3 436 writer.interrupt();
1b37400b 437 }
1b37400b 438}