b6d9677a049b65e8e12f33d4b1e683b8d57efec1
[jsvc.git] / src / dolda / jsvc / ThreadContext.java
1 package dolda.jsvc;
2
3 import dolda.jsvc.util.Misc;
4 import java.util.logging.*;
5 import java.lang.reflect.*;
6 import java.util.*;
7
8 public class ThreadContext extends ThreadGroup {
9     private Logger logger = Logger.getLogger("dolda.jsvc.context");
10     private ThreadGroup workers;
11     private long reqs = 0;
12     private final ServerContext ctx;
13     public final Responder root;
14     private int timelimit = 0;
15     private boolean forcelimit = false;
16     
17     public ThreadContext(ThreadGroup parent, String name, ServerContext ctx, Class<?> bootclass) {
18         super((parent == null)?(Thread.currentThread().getThreadGroup()):parent, name);
19         this.ctx = ctx;
20         workers = new ThreadGroup(this, "Worker threads") {
21                 public void uncaughtException(Thread t, Throwable e) {
22                     logger.log(Level.SEVERE, "Worker thread terminated with an uncaught exception", e);
23                 }
24             };
25         
26         int tl;
27         tl = Integer.parseInt(ctx.sysconfig("jsvc.timelimit", "0"));
28         if((tl > 0) && ((timelimit == 0) || (tl < timelimit)))
29             timelimit = tl;
30         tl = Integer.parseInt(ctx.libconfig("jsvc.timelimit", "0"));
31         if((tl > 0) && ((timelimit == 0) || (tl < timelimit)))
32             timelimit = tl;
33         forcelimit |= Misc.boolval(ctx.sysconfig("jsvc.forcelimit", "0"));
34         forcelimit |= Misc.boolval(ctx.libconfig("jsvc.forcelimit", "0"));
35         
36         root = bootstrap(bootclass);
37         
38         if(timelimit > 0)
39             (new WatchDog()).start();
40     }
41     
42     private class WatchDog extends Thread {
43         private Map<RequestThread, State> state = new WeakHashMap<RequestThread, State>();
44         
45         private class State {
46             String st = "running";
47             long lastkill;
48         }
49         
50         private WatchDog() {
51             super(ThreadContext.this, "Worker watchdog");
52             setDaemon(true);
53         }
54         
55         @SuppressWarnings("deprecation")
56         private long ckthread(long now, RequestThread rt) {
57             State st = state.get(rt);
58             if(st == null) {
59                 st = new State();
60                 state.put(rt, st);
61             }
62             if(st.st == "running") {
63                 if(now - rt.stime() > timelimit) {
64                     rt.interrupt();
65                     st.st = "interrupted";
66                     st.lastkill = now;
67                     return(5000);
68                 } else {
69                     return(timelimit - (now - rt.stime()));
70                 }
71             } else if((st.st == "interrupted") || (st.st == "killed")) {
72                 if(st.st == "killed")
73                     logger.log(Level.WARNING, "Thread " + rt + " refused to die; killing again");
74                 if(now - st.lastkill > 5000) {
75                     if(forcelimit)
76                         rt.stop();
77                     else
78                         rt.interrupt();
79                     st.st = "killed";
80                     st.lastkill = now;
81                 } else {
82                     return(5000 - (now - st.lastkill));
83                 }
84             }
85             return(timelimit);
86         }
87
88         public void run() {
89             try {
90                 while(true) {
91                     long next = timelimit;
92                     long now = System.currentTimeMillis();
93                     Thread[] w = new Thread[workers.activeCount() + 5];
94                     int num = workers.enumerate(w);
95                     for(int i = 0; i < num; i++) {
96                         if(w[i] instanceof RequestThread){
97                             RequestThread rt = (RequestThread)w[i];
98                             if(rt.stime() > 0) {
99                                 long n = ckthread(now, rt);
100                                 if(n < next)
101                                     next = n;
102                             }
103                         }
104                     }
105                     Thread.sleep(next);
106                 }
107             } catch(InterruptedException e) {
108             }
109         }
110     }
111     
112     public void uncaughtException(Thread t, Throwable e) {
113         logger.log(Level.SEVERE, "Service thread " + t.toString() + " terminated with an uncaught exception", e);
114     }
115     
116     public ServerContext server() {
117         return(ctx);
118     }
119     
120     public void shutdown() {
121         if(root instanceof ContextResponder)
122             ((ContextResponder)root).destroy();
123         try {
124             long last = 0;
125             while(true) {
126                 long now = System.currentTimeMillis();
127                 if(now - last > 10000) {
128                     interrupt();
129                     last = now;
130                 }
131                 Thread[] th = new Thread[1];
132                 if(enumerate(th) < 1)
133                     break;
134                 th[0].join(10000);
135             }
136         } catch(InterruptedException e) {
137             logger.log(Level.WARNING, "Interrupted while trying to shut down all service threads. Some may remain.", e);
138         }
139         destroy();
140     }
141     
142     public RequestThread respond(Request req) {
143         return(ctx.worker(root, req, workers, "Worker thread " + reqs++));
144     }
145     
146     private Responder bootstrap(final Class<?> bootclass) {
147         final Throwable[] err = new Throwable[1];
148         final Responder[] res = new Responder[1];
149         Thread boot = new Thread(this, "JSvc boot thread") {
150                 public void run() {
151                     try {
152                         Method cm = bootclass.getMethod("responder");
153                         Object resp = cm.invoke(null);
154                         if(!(resp instanceof Responder))
155                             throw(new ClassCastException("JSvc bootstrapper did not return a responder"));
156                         res[0] = (Responder)resp;
157                     } catch(NoSuchMethodException e) {
158                         logger.log(Level.SEVERE, "Invalid JSvc bootstrapper specified", e);
159                         err[0] = e;
160                     } catch(IllegalAccessException e) {
161                         logger.log(Level.SEVERE, "Invalid JSvc bootstrapper specified", e);
162                         err[0] = e;
163                     } catch(InvocationTargetException e) {
164                         logger.log(Level.SEVERE, "JSvc bootstrapper failed", e);
165                         err[0] = e;
166                     }
167                 }
168             };
169         boot.start();
170         try {
171             boot.join();
172         } catch(InterruptedException e) {
173             logger.log(Level.WARNING, "Interrupted during bootstrapping", e);
174             boot.interrupt();
175             Thread.currentThread().interrupt();
176         }
177         if(err[0] != null) {
178             destroy();
179             throw(new RuntimeException(err[0]));
180         }
181         if(res[0] == null) {
182             destroy();
183             logger.log(Level.SEVERE, "No responder returned in spite of no error having happened.");
184             throw(new NullPointerException("No responder returned in spite of no error having happened."));
185         }
186         return(res[0]);
187     }
188
189     public static ThreadContext current() {
190         for(ThreadGroup tg = Thread.currentThread().getThreadGroup(); tg != null; tg = tg.getParent()) {
191             if(tg instanceof ThreadContext)
192                 return((ThreadContext)tg);
193         }
194         return(null);
195     }
196     
197     public static class CreateException extends Exception {
198         public CreateException(String message) {
199             super(message);
200         }
201
202         public CreateException(String message, Throwable cause) {
203             super(message, cause);
204         }
205     }
206
207     public static ThreadContext create(ServerContext ctx, ClassLoader cl) throws CreateException {
208         String nm = "JSvc Service";
209         if(ctx.name() != null)
210             nm = "JSvc Service for " + ctx.name();
211         
212         String clnm = ctx.libconfig("jsvc.bootstrap", null);
213         if(clnm == null)
214             throw(new CreateException("No JSvc bootstrapper specified"));
215         Class<?> bc;
216         try {
217             bc = cl.loadClass(clnm);
218         } catch(ClassNotFoundException e) {
219             throw(new CreateException("Invalid JSvc bootstrapper specified", e));
220         }
221         return(new ThreadContext(null, nm, ctx, bc));
222     }
223 }