1 package dolda.dolcon.protocol;
4 import java.net.Socket;
7 public class Connection {
10 private Writer writer;
11 private Queue<Command> queue = new LinkedList<Command>();
12 private Queue<Command> pending = new LinkedList<Command>();
13 private int reqver = 2, revlo, revhi;
16 private Set<ConnectListener> connls = new HashSet<ConnectListener>();
17 private Set<NotifyListener> notls = new HashSet<NotifyListener>();
18 private Exception error;
/**
 * Callback interface for observers of the connection attempt.
 * Instances are passed to {@code addConnectListener}; the class keeps
 * a {@code Set<ConnectListener>} named {@code connls} for them.
 */
20 public interface ConnectListener {
/**
 * Presumably invoked once the connection has been established
 * (the call site is not visible here -- confirm).
 * Implementations are allowed to throw any {@link Exception}.
 */
21 public void connected() throws Exception;
/**
 * Reports a failure to the listener.
 *
 * @param cause the exception describing the failure
 */
22 public void error(Exception cause);
25 public Connection(String aspec) {
30 public void connect() throws ConnectException {
33 throw(new IllegalStateException("Already connected"));
37 s = new Socket(aspec, 1500);
38 } catch(java.net.UnknownHostException e) {
39 throw(new ConnectException("Could not resolve host " + aspec, e));
40 } catch(IOException e) {
41 throw(new ConnectException("Could not connect to host " + aspec, e));
43 pending = new LinkedList<Command>();
44 Command ccmd = new Command(".connect");
46 public void done(Response resp) throws Exception {
49 } catch(VersionException e) {
53 synchronized(Connection.this) {
56 synchronized(connls) {
58 for(ConnectListener l : connls)
66 public void error(Exception cause) {
67 synchronized(connls) {
69 for(ConnectListener l : connls)
78 reader = new Reader();
79 writer = new Writer();
84 private void error(Throwable c) {
86 if(c instanceof StopCondition) {
87 StopCondition s = (StopCondition)c;
92 if(c instanceof Exception)
100 synchronized(queue) {
102 while((cmd = pending.poll()) != null) {
105 while((cmd = queue.poll()) != null) {
/**
 * Refuses to proceed when called from the {@code reader} thread.
 * Per the error message, this is meant to protect synchronous methods
 * from being run on the dispatch thread.
 *
 * @throws RuntimeException if the calling thread is {@code reader}
 */
111 private void checkthread() {
// Identity check: is the caller the reader thread object itself?
112 if(Thread.currentThread() == reader)
113 throw(new RuntimeException("Cannot call synchronous method with dispatch thread!"));
116 public void syncConnect() throws ConnectException, InterruptedException {
118 final boolean[] donep = new boolean[] {false};
119 final Exception[] errp = new Exception[] {null};
120 ConnectListener l = new ConnectListener() {
121 public void connected() {
128 public void error(Exception cause) {
136 addConnectListener(l);
144 throw(new ConnectException("DC connection has been closed", errp[0]));
/**
 * Sets the version that this connection requires.
 * The value is stored in the {@code reqver} field (initialised to 2 in
 * its declaration), which {@code checkver} compares against the
 * {@code revlo}..{@code revhi} range.
 *
 * @param reqver the required version number
 */
147 public void expectVersion(int reqver) {
148 this.reqver = reqver;
/**
 * Reads the version range from a response and checks that the required
 * version lies inside it.
 * Side effect: overwrites the {@code revlo} and {@code revhi} fields.
 *
 * @param resp response whose tokens (0, 0) and (0, 1) carry the lower and
 *             upper bound; presumably "line 0, word 0/1" -- confirm
 *             against {@code Response.token}
 * @throws VersionException if {@code reqver} is below {@code revlo} or
 *                          above {@code revhi}
 */
151 private void checkver(Response resp) throws VersionException {
// NOTE(review): Integer.parseInt throws NumberFormatException on a
// non-numeric token; nothing in this method catches it.
152 revlo = Integer.parseInt(resp.token(0, 0));
153 revhi = Integer.parseInt(resp.token(0, 1));
// Both bounds are inclusive: only values strictly outside are rejected.
154 if((reqver < revlo) || (reqver > revhi))
155 throw(new VersionException(reqver, revlo, revhi));
158 public Exception join() throws InterruptedException {
159 while(reader.isAlive()) {
166 public void addNotifyListener(NotifyListener l) {
167 synchronized(notls) {
172 public void removeNotifyListener(NotifyListener l) {
173 synchronized(notls) {
/**
 * Accepts a connect listener, but only while {@code state} is "idle" or
 * "connecting". The method holds this object's monitor (it is declared
 * {@code synchronized}) and then also locks {@code connls}.
 * The statements inside the {@code connls} block are not visible here;
 * presumably they add {@code l} to the set -- confirm.
 *
 * @param l the listener to register
 * @throws IllegalStateException if {@code state} is neither "idle" nor
 *                               "connecting"
 */
178 public synchronized void addConnectListener(ConnectListener l) {
// NOTE(review): state is compared to the literals by reference (!=), not
// with equals(). That only behaves as intended while state always holds
// these same interned literal instances -- verify every assignment.
179 if((state != "idle") && (state != "connecting"))
180 throw(new IllegalStateException("Already connected"));
// The listener set has its own lock, as in the other synchronized(connls) blocks.
181 synchronized(connls) {
186 public void qcmd(Command cmd) {
187 synchronized(queue) {
/**
 * Convenience overload: wraps the given tokens in a new {@code Command}
 * and hands it to {@code qcmd(Command)}.
 *
 * @param tokens the tokens passed to the {@code Command} constructor
 */
193 public void qcmd(String... tokens) {
194 qcmd(new Command(tokens));
197 public Response ecmd(Command cmd) throws ClosedException, InterruptedException {
199 final boolean[] donep = new boolean[] {false};
200 final Response[] resp = new Response[] {null};
201 final Exception[] errp = new Exception[] {null};
202 Object l = cmd.new Listener() {
203 public synchronized void done(Response rsp) {
209 public synchronized void error(Exception e) {
221 throw(new ClosedException(errp[0]));
/**
 * Convenience overload: wraps the given tokens in a new {@code Command}
 * and returns whatever {@code ecmd(Command)} returns for it.
 *
 * @param tokens the tokens passed to the {@code Command} constructor
 * @return the response produced by {@code ecmd(Command)}
 * @throws ClosedException propagated from {@code ecmd(Command)}
 * @throws InterruptedException propagated from {@code ecmd(Command)}
 */
225 public Response ecmd(String... tokens) throws ClosedException, InterruptedException {
226 return(ecmd(new Command(tokens)));
229 static private class StopCondition extends Error {
230 final boolean normal;
232 public StopCondition(Exception cause, boolean normal) {
234 this.normal = normal;
238 private class Writer extends Thread {
243 private String quote(String t) {
246 StringBuilder sb = new StringBuilder();
247 boolean quote = false;
248 for(int i = 0; i < t.length(); i++) {
249 char c = t.charAt(i);
252 } else if(Character.isWhitespace(c)) {
260 return("\"" + sb.toString() + "\"");
262 return(sb.toString());
265 private void guarded() {
267 java.io.Writer w = new OutputStreamWriter(s.getOutputStream(), "UTF-8");
271 synchronized(queue) {
272 while(pending.size() > 0)
274 while((cmd = queue.poll()) == null)
278 } catch(InterruptedException e) {
279 throw(new StopCondition(e, true));
281 StringBuilder out = new StringBuilder();
282 for(String s : cmd.tokens) {
285 out.append(quote(s));
287 w.write(out.toString());
289 } catch(IOException e) {
290 throw(new StopCondition(e, false));
297 } catch(Throwable t) {
303 private class Reader extends Thread {
304 private void dispatch(Response resp) throws Exception {
305 if(resp.code < 600) {
306 synchronized(queue) {
308 resp.cmd = pending.remove();
309 } catch(NoSuchElementException e) {
310 throw(new RuntimeException("DC server sent reply without a pending command"));
316 synchronized(notls) {
317 for(NotifyListener l : notls) {
324 private void guarded() {
326 java.io.Reader r = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
327 String state = "start";
328 StringBuilder ct = new StringBuilder();
331 List<List<String>> lines = new LinkedList<List<String>>();
332 List<String> tokens = new LinkedList<String>();
338 if((i = r.read()) < 0) {
339 throw(new IOException("The server closed the connection"));
341 } catch(java.nio.channels.ClosedByInterruptException e) {
342 throw(new StopCondition(e, true));
347 if(state == "start") {
350 } else if(Character.isWhitespace(c)) {
358 } else if(state == "nl") {
360 if((code < 100) || (code >= 1000)) {
361 throw(new IOException("Illegal response code " + code + " from the server"));
364 tokens = new LinkedList<String>();
366 dispatch(new Response(code, lines));
367 lines = new LinkedList<List<String>>();
375 } else if(state == "code") {
376 if((c == '-') || Character.isWhitespace(c)) {
378 code = Integer.parseInt(ct.toString());
385 } else if(state == "token") {
386 if(Character.isWhitespace(c)) {
387 tokens.add(ct.toString());
391 } else if(c == '\\') {
393 } else if(c == '"') {
398 } else if(state == "bs") {
401 } else if(state == "cited") {
408 } else if(state == "cbs") {
412 throw(new Error("invalid state " + state));
417 } catch(Exception e) {
418 throw(new StopCondition(e, false));
425 } catch(Throwable t) {
431 public void close() {
434 } catch(IOException e) {}