1 package dolda.dolcon.protocol;
4 import java.net.Socket;
7 public class Connection {
10 private Writer writer;
11 private Queue<Command> queue = new LinkedList<Command>();
12 private Queue<Command> pending = new LinkedList<Command>();
13 private int reqver = 2, revlo, revhi;
16 private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17 private Exception error;
19 public interface ConnectListener {
20 public void connected() throws Exception;
21 public void error(Exception cause);
24 public Connection(String aspec) {
29 public void connect() throws ConnectException {
32 throw(new IllegalStateException("Already connected"));
36 s = new Socket(aspec, 1500);
37 } catch(java.net.UnknownHostException e) {
38 throw(new ConnectException("Could not resolve host " + aspec, e));
39 } catch(IOException e) {
40 throw(new ConnectException("Could not connect to host " + aspec, e));
42 pending = new LinkedList<Command>();
43 Command ccmd = new Command(".connect");
45 public void done(Response resp) throws Exception {
48 } catch(VersionException e) {
52 synchronized(Connection.this) {
55 synchronized(connls) {
57 for(ConnectListener l : connls)
65 public void error(Exception cause) {
66 synchronized(connls) {
68 for(ConnectListener l : connls)
77 reader = new Reader();
78 writer = new Writer();
83 private void error(Throwable c) {
85 if(c instanceof StopCondition) {
86 StopCondition s = (StopCondition)c;
91 if(c instanceof Exception)
101 while((cmd = pending.poll()) != null) {
104 while((cmd = queue.poll()) != null) {
110 private void checkthread() {
111 if(Thread.currentThread() == reader)
112 throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
115 public void syncConnect() throws ConnectException, InterruptedException {
117 final boolean[] donep = new boolean[] {false};
118 final Exception[] errp = new Exception[] {null};
119 ConnectListener l = new ConnectListener() {
120 public void connected() {
127 public void error(Exception cause) {
135 addConnectListener(l);
143 throw(new ConnectException("DC connection has been closed", errp[0]));
146 public void expectVersion(int reqver) {
147 this.reqver = reqver;
150 private void checkver(Response resp) throws VersionException {
151 revlo = Integer.parseInt(resp.token(0, 0));
152 revhi = Integer.parseInt(resp.token(0, 1));
153 if((reqver < revlo) || (reqver > revhi))
154 throw(new VersionException(reqver, revlo, revhi));
157 public Exception join() throws InterruptedException {
158 while(reader.isAlive()) {
165 public synchronized void addConnectListener(ConnectListener l) {
166 if((state != "idle") && (state != "connecting"))
167 throw(new IllegalStateException("Already connected"));
168 synchronized(connls) {
173 public void qcmd(Command cmd) {
174 synchronized(queue) {
180 public void qcmd(String... tokens) {
181 qcmd(new Command(tokens));
184 public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
186 final boolean[] donep = new boolean[] {false};
187 final Response[] resp = new Response[] {null};
188 final Exception[] errp = new Exception[] {null};
189 Object l = cmd.new Listener() {
190 public synchronized void done(Response rsp) {
196 public synchronized void error(Exception e) {
208 throw(new ClosedException(errp[0]));
212 public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
213 return(ecmd(new Command(tokens)));
216 static private class StopCondition extends Error {
217 final boolean normal;
219 public StopCondition(Exception cause, boolean normal) {
221 this.normal = normal;
225 private class Writer extends Thread {
230 private String quote(String t) {
233 StringBuilder sb = new StringBuilder();
234 boolean quote = false;
235 for(int i = 0; i < t.length(); i++) {
236 char c = t.charAt(i);
239 } else if(Character.isWhitespace(c)) {
247 return("\"" + sb.toString() + "\"");
249 return(sb.toString());
252 private void guarded() {
254 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
258 synchronized(queue) {
259 while(pending.size() > 0)
261 while((cmd = queue.poll()) == null)
265 } catch(InterruptedException e) {
266 throw(new StopCondition(e, true));
268 StringBuilder out = new StringBuilder();
269 for(String s : cmd.tokens) {
272 out.append(quote(s));
274 w.write(out.toString());
276 } catch(IOException e) {
277 throw(new StopCondition(e, false));
284 } catch(Throwable t) {
290 private class Reader extends Thread {
291 private void dispatch(Response resp) throws Exception {
292 if(resp.code < 600) {
293 synchronized(queue) {
295 resp.cmd = pending.remove();
296 } catch(NoSuchElementException e) {
297 throw(new RuntimeException("DC server sent reply without a pending command"));
305 private void guarded() {
307 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
308 String state = "start";
309 StringBuilder ct = new StringBuilder();
312 List<List<String>> lines = new LinkedList<List<String>>();
313 List<String> tokens = new LinkedList<String>();
319 if((i = r.read()) < 0) {
320 throw(new IOException("The server closed the connection"));
322 } catch(java.nio.channels.ClosedByInterruptException e) {
323 throw(new StopCondition(e, true));
328 if(state == "start") {
331 } else if(Character.isWhitespace(c)) {
339 } else if(state == "nl") {
341 if((code < 100) || (code >= 1000)) {
342 throw(new IOException("Illegal response code " + code + " from the server"));
345 tokens = new LinkedList<String>();
347 dispatch(new Response(code, lines));
348 lines = new LinkedList<List<String>>();
356 } else if(state == "code") {
357 if((c == '-') || Character.isWhitespace(c)) {
359 code = Integer.parseInt(ct.toString());
366 } else if(state == "token") {
367 if(Character.isWhitespace(c)) {
368 tokens.add(ct.toString());
372 } else if(c == '\\') {
374 } else if(c == '"') {
379 } else if(state == "bs") {
382 } else if(state == "cited") {
389 } else if(state == "cbs") {
393 throw(new Error("invalid state " + state));
398 } catch(Exception e) {
399 throw(new StopCondition(e, false));
406 } catch(Throwable t) {
412 public void close() {
415 } catch(IOException e) {}