Java: Added ecmd to the connection manager.
[doldaconnect.git] / lib / java / dolda / dolcon / protocol / Connection.java
CommitLineData
1335284e 1package dolda.dolcon.protocol;
1b37400b
FT
2
3import java.io.*;
4import java.net.Socket;
5import java.util.*;
6
7public class Connection {
e78d9ca3
FT
8 private Socket s;
9 private Reader reader;
10 private Writer writer;
11 private Queue<Command> queue = new LinkedList<Command>();
12 private Queue<Command> pending = new LinkedList<Command>();
13 private int reqver = 2, revlo, revhi;
14 private String aspec;
15 private String state;
16 private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17 private Exception error;
1b37400b 18
e78d9ca3
FT
19 public interface ConnectListener {
20 public void connected() throws Exception;
21 public void error(Exception cause);
22 }
23
24 public Connection(String aspec) {
25 this.aspec = aspec;
26 state = "idle";
27 }
28
29 public void connect() throws ConnectException {
4b987871
FT
30 synchronized(this) {
31 if(state != "idle")
32 throw(new IllegalStateException("Already connected"));
33 state = "connecting";
34 }
1b37400b
FT
35 try {
36 s = new Socket(aspec, 1500);
37 } catch(java.net.UnknownHostException e) {
38 throw(new ConnectException("Could not resolve host " + aspec, e));
39 } catch(IOException e) {
40 throw(new ConnectException("Could not connect to host " + aspec, e));
41 }
e78d9ca3
FT
42 pending = new LinkedList<Command>();
43 Command ccmd = new Command(".connect");
6bc193f2 44 ccmd.new Listener() {
e78d9ca3
FT
45 public void done(Response resp) throws Exception {
46 try {
47 checkver(resp);
48 } catch(VersionException e) {
49 error(e);
50 throw(e);
51 }
4b987871 52 synchronized(Connection.this) {
e78d9ca3 53 state = "connected";
4b987871
FT
54 }
55 synchronized(connls) {
e78d9ca3
FT
56 try {
57 for(ConnectListener l : connls)
58 l.connected();
59 } finally {
60 connls.clear();
61 }
62 }
63 }
64
65 public void error(Exception cause) {
66 synchronized(connls) {
67 try {
68 for(ConnectListener l : connls)
69 l.error(cause);
70 } finally {
71 connls.clear();
72 }
73 }
74 }
6bc193f2 75 };
e78d9ca3 76 pending.offer(ccmd);
4b987871
FT
77 reader = new Reader();
78 writer = new Writer();
1b37400b 79 reader.start();
e78d9ca3 80 writer.start();
1b37400b
FT
81 }
82
4b987871
FT
83 private void error(Throwable c) {
84 boolean n = false;
85 if(c instanceof StopCondition) {
86 StopCondition s = (StopCondition)c;
87 n = s.normal;
88 c = s.getCause();
89 }
90 Exception e;
91 if(c instanceof Exception)
92 e = (Exception)c;
93 else
94 e = new Exception(c);
95 if(!n) {
96 close();
97 error = e;
98 }
99 synchronized(queue) {
100 Command cmd;
101 while((cmd = pending.poll()) != null) {
102 cmd.error(e);
103 }
104 while((cmd = queue.poll()) != null) {
105 cmd.error(e);
106 }
107 }
108 }
109
e78d9ca3
FT
110 private void checkthread() {
111 if(Thread.currentThread() == reader)
112 throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
113 }
114
6bc193f2 115 public void syncConnect() throws ConnectException, InterruptedException {
e78d9ca3
FT
116 checkthread();
117 final boolean[] donep = new boolean[] {false};
118 final Exception[] errp = new Exception[] {null};
119 ConnectListener l = new ConnectListener() {
120 public void connected() {
121 donep[0] = true;
122 synchronized(this) {
123 notifyAll();
124 }
125 }
126
127 public void error(Exception cause) {
128 donep[0] = true;
129 errp[0] = cause;
130 synchronized(this) {
131 notifyAll();
132 }
133 }
134 };
135 addConnectListener(l);
136 connect();
137 while(!donep[0]) {
138 synchronized(l) {
139 l.wait();
140 }
141 }
142 if(errp[0] != null)
6bc193f2 143 throw(new ConnectException("DC connection has been closed", errp[0]));
e78d9ca3
FT
144 }
145
146 public void expectVersion(int reqver) {
147 this.reqver = reqver;
148 }
149
150 private void checkver(Response resp) throws VersionException {
151 revlo = Integer.parseInt(resp.token(0, 0));
152 revhi = Integer.parseInt(resp.token(0, 1));
153 if((reqver < revlo) || (reqver > revhi))
154 throw(new VersionException(reqver, revlo, revhi));
155 }
156
157 public Exception join() throws InterruptedException {
158 while(reader.isAlive()) {
159 reader.join();
160 }
161 close();
162 return(error);
163 }
164
4b987871
FT
165 public synchronized void addConnectListener(ConnectListener l) {
166 if((state != "idle") && (state != "connecting"))
167 throw(new IllegalStateException("Already connected"));
e78d9ca3 168 synchronized(connls) {
e78d9ca3
FT
169 connls.add(l);
170 }
171 }
172
6bc193f2 173 public void qcmd(Command cmd) {
e78d9ca3
FT
174 synchronized(queue) {
175 queue.offer(cmd);
176 queue.notifyAll();
177 }
178 }
179
6bc193f2
FT
180 public void qcmd(String... tokens) {
181 qcmd(new Command(tokens));
182 }
183
184 public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
185 checkthread();
186 final boolean[] donep = new boolean[] {false};
187 final Response[] resp = new Response[] {null};
188 final Exception[] errp = new Exception[] {null};
189 Object l = cmd.new Listener() {
190 public synchronized void done(Response rsp) {
191 resp[0] = rsp;
192 donep[0] = true;
193 notifyAll();
194 }
195
196 public synchronized void error(Exception e) {
197 errp[0] = e;
198 donep[0] = true;
199 notifyAll();
200 }
201 };
202 synchronized(l) {
203 while(!donep[0]) {
204 l.wait();
205 }
206 }
207 if(errp[0] != null)
208 throw(new ClosedException(errp[0]));
209 return(resp[0]);
210 }
211
212 public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
213 return(ecmd(new Command(tokens)));
214 }
215
e78d9ca3
FT
216 static private class StopCondition extends Error {
217 final boolean normal;
218
219 public StopCondition(Exception cause, boolean normal) {
220 super(cause);
221 this.normal = normal;
222 }
223 }
224
4b987871
FT
225 private class Writer extends Thread {
226 public Writer() {
1b37400b
FT
227 setDaemon(true);
228 }
229
e78d9ca3
FT
230 private String quote(String t) {
231 if(t.length() == 0)
232 return("\"\"");
233 StringBuilder sb = new StringBuilder();
234 boolean quote = false;
235 for(int i = 0; i < t.length(); i++) {
236 char c = t.charAt(i);
237 if(c == '\"') {
238 sb.append("\\\"");
239 } else if(Character.isWhitespace(c)) {
240 quote = true;
241 sb.append(c);
242 } else {
243 sb.append(c);
1b37400b 244 }
1b37400b 245 }
e78d9ca3
FT
246 if(quote)
247 return("\"" + sb.toString() + "\"");
248 else
249 return(sb.toString());
250 }
251
4b987871 252 private void guarded() {
e78d9ca3
FT
253 try {
254 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
255 while(true) {
256 Command cmd;
1b37400b 257 try {
e78d9ca3 258 synchronized(queue) {
4b987871
FT
259 while(pending.size() > 0)
260 queue.wait();
261 while((cmd = queue.poll()) == null)
e78d9ca3 262 queue.wait();
4b987871 263 pending.offer(cmd);
1b37400b 264 }
e78d9ca3
FT
265 } catch(InterruptedException e) {
266 throw(new StopCondition(e, true));
1b37400b 267 }
e78d9ca3
FT
268 StringBuilder out = new StringBuilder();
269 for(String s : cmd.tokens) {
270 if(out.length() > 0)
271 out.append(' ');
272 out.append(quote(s));
273 }
274 w.write(out.toString());
1b37400b 275 }
e78d9ca3
FT
276 } catch(IOException e) {
277 throw(new StopCondition(e, false));
278 }
279 }
e78d9ca3 280
4b987871
FT
281 public void run() {
282 try {
283 guarded();
284 } catch(Throwable t) {
285 error(t);
286 }
e78d9ca3 287 }
4b987871
FT
288 }
289
290 private class Reader extends Thread {
e78d9ca3
FT
291 private void dispatch(Response resp) throws Exception {
292 if(resp.code < 600) {
4b987871
FT
293 synchronized(queue) {
294 try {
295 resp.cmd = pending.remove();
296 } catch(NoSuchElementException e) {
297 throw(new RuntimeException("DC server sent reply without a pending command"));
298 }
299 queue.notifyAll();
e78d9ca3
FT
300 }
301 resp.cmd.done(resp);
302 }
303 }
304
4b987871 305 private void guarded() {
e78d9ca3
FT
306 try {
307 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
308 String state = "start";
309 StringBuilder ct = new StringBuilder();
310 int code = -1;
311 boolean last = true;
312 List<List<String>> lines = new LinkedList<List<String>>();
313 List<String> tokens = new LinkedList<String>();
314 while(true) {
315 char c;
316 {
317 int i;
318 try {
319 if((i = r.read()) < 0) {
320 throw(new IOException("The server closed the connection"));
321 }
322 } catch(java.nio.channels.ClosedByInterruptException e) {
323 throw(new StopCondition(e, true));
1b37400b 324 }
e78d9ca3
FT
325 c = (char)i;
326 }
327 eat: do {
328 if(state == "start") {
329 if(c == '\r') {
330 state = "nl";
331 } else if(Character.isWhitespace(c)) {
332 } else {
333 if(code == -1)
334 state = "code";
335 else
336 state = "token";
337 continue eat;
1b37400b 338 }
e78d9ca3
FT
339 } else if(state == "nl") {
340 if(c == '\n') {
341 if((code < 100) || (code >= 1000)) {
342 throw(new IOException("Illegal response code " + code + " from the server"));
1b37400b 343 }
e78d9ca3
FT
344 lines.add(tokens);
345 tokens = new LinkedList<String>();
346 if(last) {
347 dispatch(new Response(code, lines));
348 lines = new LinkedList<List<String>>();
349 }
350 code = -1;
351 state = "start";
352 } else {
353 state = "start";
354 continue eat;
1b37400b 355 }
e78d9ca3
FT
356 } else if(state == "code") {
357 if((c == '-') || Character.isWhitespace(c)) {
358 last = c != '-';
359 code = Integer.parseInt(ct.toString());
360 ct.setLength(0);
361 state = "start";
362 continue eat;
363 } else {
364 ct.append(c);
365 }
366 } else if(state == "token") {
367 if(Character.isWhitespace(c)) {
368 tokens.add(ct.toString());
369 ct.setLength(0);
370 state = "start";
371 continue eat;
372 } else if(c == '\\') {
373 state = "bs";
374 } else if(c == '"') {
375 state = "cited";
376 } else {
377 ct.append(c);
378 }
379 } else if(state == "bs") {
380 ct.append(c);
381 state = "token";
382 } else if(state == "cited") {
383 if(c == '\\')
384 state = "cbs";
385 else if(c == '"')
386 state = "token";
387 else
388 ct.append(c);
389 } else if(state == "cbs") {
1b37400b 390 ct.append(c);
1b37400b
FT
391 state = "cited";
392 } else {
e78d9ca3 393 throw(new Error("invalid state " + state));
1b37400b 394 }
e78d9ca3
FT
395 break;
396 } while(true);
397 }
398 } catch(Exception e) {
399 throw(new StopCondition(e, false));
1b37400b
FT
400 }
401 }
4b987871
FT
402
403 public void run() {
404 try {
405 guarded();
406 } catch(Throwable t) {
407 error(t);
408 }
409 }
1b37400b
FT
410 }
411
e78d9ca3 412 public void close() {
1b37400b
FT
413 try {
414 s.close();
4b987871 415 } catch(IOException e) {}
1b37400b 416 reader.interrupt();
e78d9ca3 417 writer.interrupt();
1b37400b 418 }
1b37400b 419}