Java: Added a session handler with authentication capability.
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
CommitLineData
1335284e 1package dolda.dolcon.protocol;
1b37400b
FT
2
3import java.io.*;
4import java.net.Socket;
5import java.util.*;
6
7public class Connection {
e78d9ca3
FT
8 private Socket s;
9 private Reader reader;
10 private Writer writer;
11 private Queue<Command> queue = new LinkedList<Command>();
12 private Queue<Command> pending = new LinkedList<Command>();
13 private int reqver = 2, revlo, revhi;
14 private String aspec;
15 private String state;
16 private Set<ConnectListener> connls = new HashSet<ConnectListener>();
1505a392 17 private Set<NotifyListener> notls = new HashSet<NotifyListener>();
e78d9ca3 18 private Exception error;
1b37400b 19
e78d9ca3
FT
/**
 * Callback for the outcome of a connection attempt.
 */
public interface ConnectListener {
    /**
     * Called once the connection has been established.
     *
     * @throws Exception if the listener cannot handle the event
     */
    void connected() throws Exception;

    /**
     * Called when the connection attempt fails.
     *
     * @param cause the reason for the failure
     */
    void error(Exception cause);
}
1505a392 24
e78d9ca3
FT
/**
 * Creates an unconnected connection object in the "idle" state.
 *
 * @param aspec the host to connect to when connect() is called
 */
public Connection(String aspec) {
    state = "idle";
    this.aspec = aspec;
}
29
30 public void connect() throws ConnectException {
4b987871
FT
31 synchronized(this) {
32 if(state != "idle")
33 throw(new IllegalStateException("Already connected"));
34 state = "connecting";
35 }
1b37400b
FT
36 try {
37 s = new Socket(aspec, 1500);
38 } catch(java.net.UnknownHostException e) {
39 throw(new ConnectException("Could not resolve host " + aspec, e));
40 } catch(IOException e) {
41 throw(new ConnectException("Could not connect to host " + aspec, e));
42 }
e78d9ca3
FT
43 pending = new LinkedList<Command>();
44 Command ccmd = new Command(".connect");
6bc193f2 45 ccmd.new Listener() {
e78d9ca3
FT
46 public void done(Response resp) throws Exception {
47 try {
48 checkver(resp);
49 } catch(VersionException e) {
50 error(e);
51 throw(e);
52 }
4b987871 53 synchronized(Connection.this) {
e78d9ca3 54 state = "connected";
4b987871
FT
55 }
56 synchronized(connls) {
e78d9ca3
FT
57 try {
58 for(ConnectListener l : connls)
59 l.connected();
60 } finally {
61 connls.clear();
62 }
63 }
64 }
65
66 public void error(Exception cause) {
67 synchronized(connls) {
68 try {
69 for(ConnectListener l : connls)
70 l.error(cause);
71 } finally {
72 connls.clear();
73 }
74 }
75 }
6bc193f2 76 };
e78d9ca3 77 pending.offer(ccmd);
4b987871
FT
78 reader = new Reader();
79 writer = new Writer();
1b37400b 80 reader.start();
e78d9ca3 81 writer.start();
1b37400b
FT
82 }
83
4b987871
FT
84 private void error(Throwable c) {
85 boolean n = false;
86 if(c instanceof StopCondition) {
87 StopCondition s = (StopCondition)c;
88 n = s.normal;
89 c = s.getCause();
90 }
91 Exception e;
92 if(c instanceof Exception)
93 e = (Exception)c;
94 else
95 e = new Exception(c);
96 if(!n) {
97 close();
98 error = e;
99 }
100 synchronized(queue) {
101 Command cmd;
102 while((cmd = pending.poll()) != null) {
103 cmd.error(e);
104 }
105 while((cmd = queue.poll()) != null) {
106 cmd.error(e);
107 }
108 }
109 }
110
e78d9ca3
FT
111 private void checkthread() {
112 if(Thread.currentThread() == reader)
113 throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
114 }
115
6bc193f2 116 public void syncConnect() throws ConnectException, InterruptedException {
e78d9ca3
FT
117 checkthread();
118 final boolean[] donep = new boolean[] {false};
119 final Exception[] errp = new Exception[] {null};
120 ConnectListener l = new ConnectListener() {
121 public void connected() {
122 donep[0] = true;
123 synchronized(this) {
124 notifyAll();
125 }
126 }
127
128 public void error(Exception cause) {
129 donep[0] = true;
130 errp[0] = cause;
131 synchronized(this) {
132 notifyAll();
133 }
134 }
135 };
136 addConnectListener(l);
137 connect();
138 while(!donep[0]) {
139 synchronized(l) {
140 l.wait();
141 }
142 }
143 if(errp[0] != null)
6bc193f2 144 throw(new ConnectException("DC connection has been closed", errp[0]));
e78d9ca3
FT
145 }
146
147 public void expectVersion(int reqver) {
148 this.reqver = reqver;
149 }
150
151 private void checkver(Response resp) throws VersionException {
152 revlo = Integer.parseInt(resp.token(0, 0));
153 revhi = Integer.parseInt(resp.token(0, 1));
154 if((reqver < revlo) || (reqver > revhi))
155 throw(new VersionException(reqver, revlo, revhi));
156 }
157
158 public Exception join() throws InterruptedException {
159 while(reader.isAlive()) {
160 reader.join();
161 }
162 close();
163 return(error);
164 }
165
1505a392
FT
166 public void addNotifyListener(NotifyListener l) {
167 synchronized(notls) {
168 notls.add(l);
169 }
170 }
171
172 public void removeNotifyListener(NotifyListener l) {
173 synchronized(notls) {
174 notls.remove(l);
175 }
176 }
177
4b987871
FT
178 public synchronized void addConnectListener(ConnectListener l) {
179 if((state != "idle") && (state != "connecting"))
180 throw(new IllegalStateException("Already connected"));
e78d9ca3 181 synchronized(connls) {
e78d9ca3
FT
182 connls.add(l);
183 }
184 }
185
6bc193f2 186 public void qcmd(Command cmd) {
e78d9ca3
FT
187 synchronized(queue) {
188 queue.offer(cmd);
189 queue.notifyAll();
190 }
191 }
192
6bc193f2
FT
193 public void qcmd(String... tokens) {
194 qcmd(new Command(tokens));
195 }
196
197 public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
198 checkthread();
199 final boolean[] donep = new boolean[] {false};
200 final Response[] resp = new Response[] {null};
201 final Exception[] errp = new Exception[] {null};
202 Object l = cmd.new Listener() {
203 public synchronized void done(Response rsp) {
204 resp[0] = rsp;
205 donep[0] = true;
206 notifyAll();
207 }
208
209 public synchronized void error(Exception e) {
210 errp[0] = e;
211 donep[0] = true;
212 notifyAll();
213 }
214 };
59b214d6 215 qcmd(cmd);
6bc193f2
FT
216 synchronized(l) {
217 while(!donep[0]) {
218 l.wait();
219 }
220 }
221 if(errp[0] != null)
222 throw(new ClosedException(errp[0]));
223 return(resp[0]);
224 }
225
226 public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
227 return(ecmd(new Command(tokens)));
228 }
229
e78d9ca3
FT
230 static private class StopCondition extends Error {
231 final boolean normal;
232
233 public StopCondition(Exception cause, boolean normal) {
234 super(cause);
235 this.normal = normal;
236 }
237 }
238
4b987871
FT
239 private class Writer extends Thread {
240 public Writer() {
1b37400b
FT
241 setDaemon(true);
242 }
243
e78d9ca3
FT
244 private String quote(String t) {
245 if(t.length() == 0)
246 return("\"\"");
247 StringBuilder sb = new StringBuilder();
248 boolean quote = false;
249 for(int i = 0; i < t.length(); i++) {
250 char c = t.charAt(i);
251 if(c == '\"') {
252 sb.append("\\\"");
253 } else if(Character.isWhitespace(c)) {
254 quote = true;
255 sb.append(c);
256 } else {
257 sb.append(c);
1b37400b 258 }
1b37400b 259 }
e78d9ca3
FT
260 if(quote)
261 return("\"" + sb.toString() + "\"");
262 else
263 return(sb.toString());
264 }
265
4b987871 266 private void guarded() {
e78d9ca3
FT
267 try {
268 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
269 while(true) {
270 Command cmd;
1b37400b 271 try {
e78d9ca3 272 synchronized(queue) {
4b987871
FT
273 while(pending.size() > 0)
274 queue.wait();
275 while((cmd = queue.poll()) == null)
e78d9ca3 276 queue.wait();
4b987871 277 pending.offer(cmd);
1b37400b 278 }
e78d9ca3
FT
279 } catch(InterruptedException e) {
280 throw(new StopCondition(e, true));
1b37400b 281 }
e78d9ca3
FT
282 StringBuilder out = new StringBuilder();
283 for(String s : cmd.tokens) {
284 if(out.length() > 0)
285 out.append(' ');
286 out.append(quote(s));
287 }
59b214d6 288 out.append("\r\n");
e78d9ca3 289 w.write(out.toString());
59b214d6 290 w.flush();
1b37400b 291 }
e78d9ca3
FT
292 } catch(IOException e) {
293 throw(new StopCondition(e, false));
294 }
295 }
e78d9ca3 296
4b987871
FT
297 public void run() {
298 try {
299 guarded();
300 } catch(Throwable t) {
301 error(t);
302 }
e78d9ca3 303 }
4b987871
FT
304 }
305
306 private class Reader extends Thread {
e78d9ca3
FT
307 private void dispatch(Response resp) throws Exception {
308 if(resp.code < 600) {
4b987871
FT
309 synchronized(queue) {
310 try {
311 resp.cmd = pending.remove();
312 } catch(NoSuchElementException e) {
313 throw(new RuntimeException("DC server sent reply without a pending command"));
314 }
315 queue.notifyAll();
e78d9ca3
FT
316 }
317 resp.cmd.done(resp);
1505a392
FT
318 } else {
319 synchronized(notls) {
320 for(NotifyListener l : notls) {
321 l.notified(resp);
322 }
323 }
e78d9ca3
FT
324 }
325 }
326
4b987871 327 private void guarded() {
e78d9ca3
FT
328 try {
329 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
330 String state = "start";
331 StringBuilder ct = new StringBuilder();
332 int code = -1;
333 boolean last = true;
334 List<List<String>> lines = new LinkedList<List<String>>();
335 List<String> tokens = new LinkedList<String>();
336 while(true) {
337 char c;
338 {
339 int i;
340 try {
341 if((i = r.read()) < 0) {
342 throw(new IOException("The server closed the connection"));
343 }
344 } catch(java.nio.channels.ClosedByInterruptException e) {
345 throw(new StopCondition(e, true));
1b37400b 346 }
e78d9ca3
FT
347 c = (char)i;
348 }
349 eat: do {
350 if(state == "start") {
351 if(c == '\r') {
352 state = "nl";
353 } else if(Character.isWhitespace(c)) {
354 } else {
355 if(code == -1)
356 state = "code";
357 else
358 state = "token";
359 continue eat;
1b37400b 360 }
e78d9ca3
FT
361 } else if(state == "nl") {
362 if(c == '\n') {
363 if((code < 100) || (code >= 1000)) {
364 throw(new IOException("Illegal response code " + code + " from the server"));
1b37400b 365 }
e78d9ca3
FT
366 lines.add(tokens);
367 tokens = new LinkedList<String>();
368 if(last) {
369 dispatch(new Response(code, lines));
370 lines = new LinkedList<List<String>>();
371 }
372 code = -1;
373 state = "start";
374 } else {
375 state = "start";
376 continue eat;
1b37400b 377 }
e78d9ca3
FT
378 } else if(state == "code") {
379 if((c == '-') || Character.isWhitespace(c)) {
380 last = c != '-';
381 code = Integer.parseInt(ct.toString());
382 ct.setLength(0);
383 state = "start";
e78d9ca3
FT
384 } else {
385 ct.append(c);
386 }
387 } else if(state == "token") {
388 if(Character.isWhitespace(c)) {
389 tokens.add(ct.toString());
390 ct.setLength(0);
391 state = "start";
392 continue eat;
393 } else if(c == '\\') {
394 state = "bs";
395 } else if(c == '"') {
396 state = "cited";
397 } else {
398 ct.append(c);
399 }
400 } else if(state == "bs") {
401 ct.append(c);
402 state = "token";
403 } else if(state == "cited") {
404 if(c == '\\')
405 state = "cbs";
406 else if(c == '"')
407 state = "token";
408 else
409 ct.append(c);
410 } else if(state == "cbs") {
1b37400b 411 ct.append(c);
1b37400b
FT
412 state = "cited";
413 } else {
e78d9ca3 414 throw(new Error("invalid state " + state));
1b37400b 415 }
e78d9ca3
FT
416 break;
417 } while(true);
418 }
419 } catch(Exception e) {
420 throw(new StopCondition(e, false));
1b37400b
FT
421 }
422 }
4b987871
FT
423
424 public void run() {
425 try {
426 guarded();
427 } catch(Throwable t) {
428 error(t);
429 }
430 }
1b37400b
FT
431 }
432
e78d9ca3 433 public void close() {
1b37400b
FT
434 try {
435 s.close();
4b987871 436 } catch(IOException e) {}
1b37400b 437 reader.interrupt();
e78d9ca3 438 writer.interrupt();
1b37400b 439 }
1b37400b 440}