Attempt to make the session database properly parallel.
[wrw.git] / wrw / session.py
CommitLineData
b409a338 1import threading, time, pickle, random, os
1f61bf31 2import cookie, env
b409a338
FT
3
4__all__ = ["db", "get"]
5
6def hexencode(str):
7 ret = ""
8 for byte in str:
9 ret += "%02X" % (ord(byte),)
10 return ret
11
12def gennonce(length):
13 nonce = ""
14 for i in xrange(length):
15 nonce += chr(random.randint(0, 255))
16 return nonce
17
18class session(object):
b65f311b 19 def __init__(self, lock, expire = 86400 * 7):
b409a338
FT
20 self.id = hexencode(gennonce(16))
21 self.dict = {}
b65f311b 22 self.lock = lock
b409a338
FT
23 self.ctime = self.atime = self.mtime = int(time.time())
24 self.expire = expire
25 self.dctl = set()
26 self.dirtyp = False
27
28 def dirty(self):
29 for d in self.dctl:
30 if d.sessdirty():
31 return True
32 return self.dirtyp
33
34 def frozen(self):
35 for d in self.dctl:
36 d.sessfrozen()
37 self.dirtyp = False
38
39 def __getitem__(self, key):
40 return self.dict[key]
41
42 def get(self, key, default = None):
43 return self.dict.get(key, default)
44
45 def __setitem__(self, key, value):
46 self.dict[key] = value
47 if hasattr(value, "sessdirty"):
48 self.dctl.add(value)
49 else:
50 self.dirtyp = True
51
52 def __delitem__(self, key):
53 old = self.dict.pop(key)
54 if old in self.dctl:
55 self.dctl.remove(old)
56 self.dirtyp = True
57
58 def __contains__(self, key):
59 return key in self.dict
60
61 def __getstate__(self):
62 ret = []
63 for k, v in self.__dict__.items():
64 if k == "lock": continue
65 ret.append((k, v))
66 return ret
67
68 def __setstate__(self, st):
69 for k, v in st:
70 self.__dict__[k] = v
b65f311b 71 # The proper lock is set by the thawer
b409a338
FT
72
73class db(object):
f84a3f10 74 def __init__(self, backdb = None, cookiename = "wrwsess", path = "/"):
b409a338
FT
75 self.live = {}
76 self.cookiename = cookiename
77 self.path = path
78 self.lock = threading.Lock()
b409a338
FT
79 self.cthread = None
80 self.freezetime = 3600
f84a3f10 81 self.backdb = backdb
b409a338
FT
82
83 def clean(self):
84 now = int(time.time())
85 with self.lock:
b65f311b
FT
86 clist = self.live.keys()
87 for sessid in clist:
88 with self.lock:
89 try:
90 entry = self.live[sessid]
91 except KeyError:
92 continue
93 with entry[0]:
94 rm = False
95 if entry[1] == "retired":
96 pass
97 elif entry[1] is None:
98 pass
99 else:
100 sess = entry[1]
101 if sess.atime + self.freezetime < now:
102 try:
103 if sess.dirty():
104 self.freeze(sess)
105 except:
106 if sess.atime + sess.expire < now:
107 rm = True
108 else:
109 rm = True
110 if rm:
111 entry[1] = "retired"
112 with self.lock:
113 del self.live[sessid]
b409a338
FT
114
115 def cleanloop(self):
116 try:
188da534 117 while True:
b409a338
FT
118 time.sleep(300)
119 self.clean()
188da534
FT
120 if len(self.live) == 0:
121 break
b409a338
FT
122 finally:
123 with self.lock:
124 self.cthread = None
125
b65f311b
FT
126 def _fetch(self, sessid):
127 while True:
128 now = int(time.time())
129 with self.lock:
130 if sessid in self.live:
131 entry = self.live[sessid]
132 else:
133 entry = self.live[sessid] = [threading.RLock(), None]
134 with entry[0]:
135 if isinstance(entry[1], session):
136 entry[1].atime = now
137 return entry[1]
138 elif entry[1] == "retired":
139 continue
140 elif entry[1] is None:
141 try:
142 thawed = self.thaw(sessid)
143 if thawed.atime + thawed.expire < now:
144 raise KeyError()
145 thawed.lock = entry[0]
146 thawed.atime = now
147 entry[1] = thawed
148 return thawed
149 finally:
150 if entry[1] is None:
151 entry[1] = "retired"
152 with self.lock:
153 del self.live[sessid]
154 else:
155 raise Exception("Illegal session entry: " + repr(entry[1]))
156
b409a338
FT
157 def fetch(self, req):
158 now = int(time.time())
b409a338 159 sessid = cookie.get(req, self.cookiename)
e70341b2 160 new = False
b409a338
FT
161 with self.lock:
162 if self.cthread is None:
163 self.cthread = threading.Thread(target = self.cleanloop)
164 self.cthread.setDaemon(True)
165 self.cthread.start()
b65f311b
FT
166 try:
167 if sessid is None:
168 raise KeyError()
169 sess = self._fetch(sessid)
170 except KeyError:
171 sess = session(threading.RLock())
172 new = True
e70341b2
FT
173
174 def ckfreeze(req):
175 if sess.dirty():
bce33109
FT
176 if new:
177 cookie.add(req, self.cookiename, sess.id, self.path)
178 with self.lock:
b65f311b 179 self.live[sess.id] = [sess.lock, sess]
e70341b2 180 try:
e70341b2
FT
181 self.freeze(sess)
182 except:
183 pass
184 req.oncommit(ckfreeze)
b409a338
FT
185 return sess
186
b409a338 187 def thaw(self, sessid):
f84a3f10
FT
188 if self.backdb is None:
189 raise KeyError()
b409a338
FT
190 data = self.backdb[sessid]
191 try:
192 return pickle.loads(data)
193 except Exception, e:
194 raise KeyError()
195
196 def freeze(self, sess):
f84a3f10
FT
197 if self.backdb is None:
198 raise TypeError()
b65f311b
FT
199 with sess.lock:
200 data = pickle.dumps(sess, -1)
201 self.backdb[sess.id] = data
b409a338
FT
202 sess.frozen()
203
f84a3f10
FT
204 def get(self, req):
205 return req.item(self.fetch)
206
b409a338
FT
207class dirback(object):
208 def __init__(self, path):
209 self.path = path
210
211 def __getitem__(self, key):
212 try:
213 with open(os.path.join(self.path, key)) as inf:
214 return inf.read()
215 except IOError:
216 raise KeyError(key)
217
218 def __setitem__(self, key, value):
219 if not os.path.exists(self.path):
220 os.makedirs(self.path)
221 with open(os.path.join(self.path, key), "w") as out:
222 out.write(value)
223
1f61bf31 224default = env.var(db(backdb = dirback(os.path.join("/tmp", "wrwsess-" + str(os.getuid())))))
b409a338
FT
225
226def get(req):
1f61bf31 227 return default.val.get(req)