Added timelimits to threads.
[jsvc.git] / src / dolda / jsvc / ThreadContext.java
CommitLineData
c9837b5e
FT
1package dolda.jsvc;
2
5cdd61df 3import dolda.jsvc.util.Misc;
c9837b5e
FT
4import java.util.logging.*;
5import java.lang.reflect.*;
5cdd61df 6import java.util.*;
c9837b5e
FT
7
8public class ThreadContext extends ThreadGroup {
9 private Logger logger = Logger.getLogger("dolda.jsvc.context");
10 private ThreadGroup workers;
11 private long reqs = 0;
4b8346e1 12 private final ServerContext ctx;
c9837b5e 13 public final Responder root;
5cdd61df
FT
14 private int timelimit = 0;
15 private boolean forcelimit = false;
c9837b5e 16
4b8346e1 17 public ThreadContext(ThreadGroup parent, String name, ServerContext ctx, Class<?> bootclass) {
c9837b5e 18 super((parent == null)?(Thread.currentThread().getThreadGroup()):parent, name);
4b8346e1 19 this.ctx = ctx;
c9837b5e
FT
20 workers = new ThreadGroup(this, "Worker threads") {
21 public void uncaughtException(Thread t, Throwable e) {
22 logger.log(Level.SEVERE, "Worker thread terminated with an uncaught exception", e);
23 }
24 };
5cdd61df
FT
25
26 int tl;
27 tl = Integer.parseInt(ctx.sysconfig("jsvc.timelimit", "0"));
28 if((tl > 0) && ((timelimit == 0) || (tl < timelimit)))
29 timelimit = tl;
30 tl = Integer.parseInt(ctx.libconfig("jsvc.timelimit", "0"));
31 if((tl > 0) && ((timelimit == 0) || (tl < timelimit)))
32 timelimit = tl;
33 forcelimit |= Misc.boolval(ctx.sysconfig("jsvc.forcelimit", "0"));
34 forcelimit |= Misc.boolval(ctx.libconfig("jsvc.forcelimit", "0"));
35
c9837b5e 36 root = bootstrap(bootclass);
5cdd61df
FT
37
38 if(timelimit > 0)
39 (new WatchDog()).start();
40 }
41
42 private class WatchDog extends Thread {
43 private Map<RequestThread, State> state = new WeakHashMap<RequestThread, State>();
44
45 private class State {
46 String st = "running";
47 long lastkill;
48 }
49
50 private WatchDog() {
51 super(ThreadContext.this, "Worker watchdog");
52 setDaemon(true);
53 }
54
55 @SuppressWarnings("deprecation")
56 private long ckthread(long now, RequestThread rt) {
57 State st = state.get(rt);
58 if(st == null) {
59 st = new State();
60 state.put(rt, st);
61 }
62 if(st.st == "running") {
63 if(now - rt.stime() > timelimit) {
64 rt.interrupt();
65 st.st = "interrupted";
66 st.lastkill = now;
67 return(5000);
68 } else {
69 return(timelimit - (now - rt.stime()));
70 }
71 } else if((st.st == "interrupted") || (st.st == "killed")) {
72 if(st.st == "killed")
73 logger.log(Level.WARNING, "Thread " + rt + " refused to die; killing again");
74 if(now - st.lastkill > 5000) {
75 rt.stop();
76 st.st = "killed";
77 st.lastkill = now;
78 } else {
79 return(5000 - (now - st.lastkill));
80 }
81 }
82 return(timelimit);
83 }
84
85 public void run() {
86 try {
87 while(true) {
88 long next = timelimit;
89 long now = System.currentTimeMillis();
90 Thread[] w = new Thread[workers.activeCount() + 5];
91 int num = workers.enumerate(w);
92 for(int i = 0; i < num; i++) {
93 if(w[i] instanceof RequestThread){
94 RequestThread rt = (RequestThread)w[i];
95 if(rt.stime() > 0) {
96 long n = ckthread(now, rt);
97 if(n < next)
98 next = n;
99 }
100 }
101 }
102 Thread.sleep(next);
103 }
104 } catch(InterruptedException e) {
105 }
106 }
c9837b5e
FT
107 }
108
109 public void uncaughtException(Thread t, Throwable e) {
110 logger.log(Level.SEVERE, "Service thread " + t.toString() + " terminated with an uncaught exception", e);
111 }
112
4b8346e1
FT
113 public ServerContext server() {
114 return(ctx);
115 }
116
c9837b5e 117 public void shutdown() {
c9837b5e
FT
118 if(root instanceof ContextResponder)
119 ((ContextResponder)root).destroy();
a0b186f8
FT
120 try {
121 long last = 0;
122 while(true) {
123 long now = System.currentTimeMillis();
124 if(now - last > 10000) {
125 interrupt();
126 last = now;
127 }
128 Thread[] th = new Thread[1];
129 if(enumerate(th) < 1)
130 break;
131 th[0].join(10000);
132 }
133 } catch(InterruptedException e) {
134 logger.log(Level.WARNING, "Interrupted while trying to shut down all service threads. Some may remain.", e);
135 }
136 destroy();
c9837b5e
FT
137 }
138
139 public RequestThread respond(Request req) {
140 return(new RequestThread(root, req, workers, "Worker thread " + reqs++));
141 }
142
143 private Responder bootstrap(final Class<?> bootclass) {
144 final Throwable[] err = new Throwable[1];
145 final Responder[] res = new Responder[1];
146 Thread boot = new Thread(this, "JSvc boot thread") {
147 public void run() {
148 try {
149 Method cm = bootclass.getMethod("responder");
150 Object resp = cm.invoke(null);
151 if(!(resp instanceof Responder))
152 throw(new ClassCastException("JSvc bootstrapper did not return a responder"));
153 res[0] = (Responder)resp;
154 } catch(NoSuchMethodException e) {
155 logger.log(Level.SEVERE, "Invalid JSvc bootstrapper specified", e);
156 err[0] = e;
157 } catch(IllegalAccessException e) {
158 logger.log(Level.SEVERE, "Invalid JSvc bootstrapper specified", e);
159 err[0] = e;
160 } catch(InvocationTargetException e) {
161 logger.log(Level.SEVERE, "JSvc bootstrapper failed", e);
162 err[0] = e;
163 }
164 }
165 };
166 boot.start();
167 try {
168 boot.join();
169 } catch(InterruptedException e) {
170 logger.log(Level.WARNING, "Interrupted during bootstrapping", e);
171 boot.interrupt();
172 Thread.currentThread().interrupt();
173 }
83f55da4
FT
174 if(err[0] != null) {
175 destroy();
c9837b5e 176 throw(new RuntimeException(err[0]));
83f55da4 177 }
c9837b5e 178 if(res[0] == null) {
83f55da4 179 destroy();
c9837b5e
FT
180 logger.log(Level.SEVERE, "No responder returned in spite of no error having happened.");
181 throw(new NullPointerException("No responder returned in spite of no error having happened."));
182 }
183 return(res[0]);
184 }
4b8346e1
FT
185
186 public static ThreadContext current() {
187 for(ThreadGroup tg = Thread.currentThread().getThreadGroup(); tg != null; tg = tg.getParent()) {
188 if(tg instanceof ThreadContext)
189 return((ThreadContext)tg);
190 }
191 return(null);
192 }
c9837b5e 193}