Package cherrypy :: Package test :: Module test_session
[hide private]
[frames] | no frames]

Source Code for Module cherrypy.test.test_session

  1  import os 
  2  localDir = os.path.dirname(__file__) 
  3  import sys 
  4  import threading 
  5  import time 
  6   
  7  import cherrypy 
  8  from cherrypy._cpcompat import copykeys, HTTPConnection, HTTPSConnection 
  9  from cherrypy.lib import sessions 
 10  from cherrypy.lib.httputil import response_codes 
 11   
12 -def http_methods_allowed(methods=['GET', 'HEAD']):
13 method = cherrypy.request.method.upper() 14 if method not in methods: 15 cherrypy.response.headers['Allow'] = ", ".join(methods) 16 raise cherrypy.HTTPError(405)
17 18 cherrypy.tools.allow = cherrypy.Tool('on_start_resource', http_methods_allowed) 19 20
21 -def setup_server():
22 23 class Root: 24 25 _cp_config = {'tools.sessions.on': True, 26 'tools.sessions.storage_type' : 'ram', 27 'tools.sessions.storage_path' : localDir, 28 'tools.sessions.timeout': (1.0 / 60), 29 'tools.sessions.clean_freq': (1.0 / 60), 30 } 31 32 def clear(self): 33 cherrypy.session.cache.clear()
34 clear.exposed = True 35 36 def data(self): 37 cherrypy.session['aha'] = 'foo' 38 return repr(cherrypy.session._data) 39 data.exposed = True 40 41 def testGen(self): 42 counter = cherrypy.session.get('counter', 0) + 1 43 cherrypy.session['counter'] = counter 44 yield str(counter) 45 testGen.exposed = True 46 47 def testStr(self): 48 counter = cherrypy.session.get('counter', 0) + 1 49 cherrypy.session['counter'] = counter 50 return str(counter) 51 testStr.exposed = True 52 53 def setsessiontype(self, newtype): 54 self.__class__._cp_config.update({'tools.sessions.storage_type': newtype}) 55 if hasattr(cherrypy, "session"): 56 del cherrypy.session 57 cls = getattr(sessions, newtype.title() + 'Session') 58 if cls.clean_thread: 59 cls.clean_thread.stop() 60 cls.clean_thread.unsubscribe() 61 del cls.clean_thread 62 setsessiontype.exposed = True 63 setsessiontype._cp_config = {'tools.sessions.on': False} 64 65 def index(self): 66 sess = cherrypy.session 67 c = sess.get('counter', 0) + 1 68 time.sleep(0.01) 69 sess['counter'] = c 70 return str(c) 71 index.exposed = True 72 73 def keyin(self, key): 74 return str(key in cherrypy.session) 75 keyin.exposed = True 76 77 def delete(self): 78 cherrypy.session.delete() 79 sessions.expire() 80 return "done" 81 delete.exposed = True 82 83 def delkey(self, key): 84 del cherrypy.session[key] 85 return "OK" 86 delkey.exposed = True 87 88 def blah(self): 89 return self._cp_config['tools.sessions.storage_type'] 90 blah.exposed = True 91 92 def iredir(self): 93 raise cherrypy.InternalRedirect('/blah') 94 iredir.exposed = True 95 96 def restricted(self): 97 return cherrypy.request.method 98 restricted.exposed = True 99 restricted._cp_config = {'tools.allow.on': True, 100 'tools.allow.methods': ['GET']} 101 102 def regen(self): 103 cherrypy.tools.sessions.regenerate() 104 return "logged in" 105 regen.exposed = True 106 107 def length(self): 108 return str(len(cherrypy.session)) 109 length.exposed = True 110 111 def session_cookie(self): 112 # Must load() to start the clean thread. 113 cherrypy.session.load() 114 return cherrypy.session.id 115 session_cookie.exposed = True 116 session_cookie._cp_config = { 117 'tools.sessions.path': '/session_cookie', 118 'tools.sessions.name': 'temp', 119 'tools.sessions.persistent': False} 120 121 cherrypy.tree.mount(Root()) 122 123 124 from cherrypy.test import helper 125
126 -class SessionTest(helper.CPWebCase):
127 setup_server = staticmethod(setup_server) 128
129 - def tearDown(self):
130 # Clean up sessions. 131 for fname in os.listdir(localDir): 132 if fname.startswith(sessions.FileSession.SESSION_PREFIX): 133 os.unlink(os.path.join(localDir, fname))
134
135 - def test_0_Session(self):
136 self.getPage('/setsessiontype/ram') 137 self.getPage('/clear') 138 139 # Test that a normal request gets the same id in the cookies. 140 # Note: this wouldn't work if /data didn't load the session. 141 self.getPage('/data') 142 self.assertBody("{'aha': 'foo'}") 143 c = self.cookies[0] 144 self.getPage('/data', self.cookies) 145 self.assertEqual(self.cookies[0], c) 146 147 self.getPage('/testStr') 148 self.assertBody('1') 149 cookie_parts = dict([p.strip().split('=') 150 for p in self.cookies[0][1].split(";")]) 151 # Assert there is an 'expires' param 152 self.assertEqual(set(cookie_parts.keys()), 153 set(['session_id', 'expires', 'Path'])) 154 self.getPage('/testGen', self.cookies) 155 self.assertBody('2') 156 self.getPage('/testStr', self.cookies) 157 self.assertBody('3') 158 self.getPage('/data', self.cookies) 159 self.assertBody("{'aha': 'foo', 'counter': 3}") 160 self.getPage('/length', self.cookies) 161 self.assertBody('2') 162 self.getPage('/delkey?key=counter', self.cookies) 163 self.assertStatus(200) 164 165 self.getPage('/setsessiontype/file') 166 self.getPage('/testStr') 167 self.assertBody('1') 168 self.getPage('/testGen', self.cookies) 169 self.assertBody('2') 170 self.getPage('/testStr', self.cookies) 171 self.assertBody('3') 172 self.getPage('/delkey?key=counter', self.cookies) 173 self.assertStatus(200) 174 175 # Wait for the session.timeout (1 second) 176 time.sleep(2) 177 self.getPage('/') 178 self.assertBody('1') 179 self.getPage('/length', self.cookies) 180 self.assertBody('1') 181 182 # Test session __contains__ 183 self.getPage('/keyin?key=counter', self.cookies) 184 self.assertBody("True") 185 cookieset1 = self.cookies 186 187 # Make a new session and test __len__ again 188 self.getPage('/') 189 self.getPage('/length', self.cookies) 190 self.assertBody('2') 191 192 # Test session delete 193 self.getPage('/delete', self.cookies) 194 self.assertBody("done") 195 self.getPage('/delete', cookieset1) 196 self.assertBody("done") 197 f = lambda: [x for x in os.listdir(localDir) if x.startswith('session-')] 198 self.assertEqual(f(), []) 199 200 # Wait for the cleanup thread to delete remaining session files 201 self.getPage('/') 202 f = lambda: [x for x in os.listdir(localDir) if x.startswith('session-')] 203 self.assertNotEqual(f(), []) 204 time.sleep(2) 205 self.assertEqual(f(), [])
206
207 - def test_1_Ram_Concurrency(self):
208 self.getPage('/setsessiontype/ram') 209 self._test_Concurrency()
210
211 - def test_2_File_Concurrency(self):
212 self.getPage('/setsessiontype/file') 213 self._test_Concurrency()
214
215 - def _test_Concurrency(self):
216 client_thread_count = 5 217 request_count = 30 218 219 # Get initial cookie 220 self.getPage("/") 221 self.assertBody("1") 222 cookies = self.cookies 223 224 data_dict = {} 225 errors = [] 226 227 def request(index): 228 if self.scheme == 'https': 229 c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT)) 230 else: 231 c = HTTPConnection('%s:%s' % (self.interface(), self.PORT)) 232 for i in range(request_count): 233 c.putrequest('GET', '/') 234 for k, v in cookies: 235 c.putheader(k, v) 236 c.endheaders() 237 response = c.getresponse() 238 body = response.read() 239 if response.status != 200 or not body.isdigit(): 240 errors.append((response.status, body)) 241 else: 242 data_dict[index] = max(data_dict[index], int(body))
243 # Uncomment the following line to prove threads overlap. 244 ## sys.stdout.write("%d " % index) 245 246 # Start <request_count> requests from each of 247 # <client_thread_count> concurrent clients 248 ts = [] 249 for c in range(client_thread_count): 250 data_dict[c] = 0 251 t = threading.Thread(target=request, args=(c,)) 252 ts.append(t) 253 t.start() 254 255 for t in ts: 256 t.join() 257 258 hitcount = max(data_dict.values()) 259 expected = 1 + (client_thread_count * request_count) 260 261 for e in errors: 262 print(e) 263 self.assertEqual(hitcount, expected)
264
265 - def test_3_Redirect(self):
266 # Start a new session 267 self.getPage('/testStr') 268 self.getPage('/iredir', self.cookies) 269 self.assertBody("file")
270
271 - def test_4_File_deletion(self):
272 # Start a new session 273 self.getPage('/testStr') 274 # Delete the session file manually and retry. 275 id = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1] 276 path = os.path.join(localDir, "session-" + id) 277 os.unlink(path) 278 self.getPage('/testStr', self.cookies)
279
280 - def test_5_Error_paths(self):
281 self.getPage('/unknown/page') 282 self.assertErrorPage(404, "The path '/unknown/page' was not found.") 283 284 # Note: this path is *not* the same as above. The above 285 # takes a normal route through the session code; this one 286 # skips the session code's before_handler and only calls 287 # before_finalize (save) and on_end (close). So the session 288 # code has to survive calling save/close without init. 289 self.getPage('/restricted', self.cookies, method='POST') 290 self.assertErrorPage(405, response_codes[405][1])
291
292 - def test_6_regenerate(self):
293 self.getPage('/testStr') 294 # grab the cookie ID 295 id1 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1] 296 self.getPage('/regen') 297 self.assertBody('logged in') 298 id2 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1] 299 self.assertNotEqual(id1, id2) 300 301 self.getPage('/testStr') 302 # grab the cookie ID 303 id1 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1] 304 self.getPage('/testStr', 305 headers=[('Cookie', 306 'session_id=maliciousid; ' 307 'expires=Sat, 27 Oct 2017 04:18:28 GMT; Path=/;')]) 308 id2 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1] 309 self.assertNotEqual(id1, id2) 310 self.assertNotEqual(id2, 'maliciousid')
311
312 - def test_7_session_cookies(self):
313 self.getPage('/setsessiontype/ram') 314 self.getPage('/clear') 315 self.getPage('/session_cookie') 316 # grab the cookie ID 317 cookie_parts = dict([p.strip().split('=') for p in self.cookies[0][1].split(";")]) 318 # Assert there is no 'expires' param 319 self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path'])) 320 id1 = cookie_parts['temp'] 321 self.assertEqual(copykeys(sessions.RamSession.cache), [id1]) 322 323 # Send another request in the same "browser session". 324 self.getPage('/session_cookie', self.cookies) 325 cookie_parts = dict([p.strip().split('=') for p in self.cookies[0][1].split(";")]) 326 # Assert there is no 'expires' param 327 self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path'])) 328 self.assertBody(id1) 329 self.assertEqual(copykeys(sessions.RamSession.cache), [id1]) 330 331 # Simulate a browser close by just not sending the cookies 332 self.getPage('/session_cookie') 333 # grab the cookie ID 334 cookie_parts = dict([p.strip().split('=') for p in self.cookies[0][1].split(";")]) 335 # Assert there is no 'expires' param 336 self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path'])) 337 # Assert a new id has been generated... 338 id2 = cookie_parts['temp'] 339 self.assertNotEqual(id1, id2) 340 self.assertEqual(set(sessions.RamSession.cache.keys()), set([id1, id2])) 341 342 # Wait for the session.timeout on both sessions 343 time.sleep(2.5) 344 cache = copykeys(sessions.RamSession.cache) 345 if cache: 346 if cache == [id2]: 347 self.fail("The second session did not time out.") 348 else: 349 self.fail("Unknown session id in cache: %r", cache)
350 351 352 import socket 353 try: 354 import memcache 355 356 host, port = '127.0.0.1', 11211 357 for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, 358 socket.SOCK_STREAM): 359 af, socktype, proto, canonname, sa = res 360 s = None 361 try: 362 s = socket.socket(af, socktype, proto) 363 # See http://groups.google.com/group/cherrypy-users/ 364 # browse_frm/thread/bbfe5eb39c904fe0 365 s.settimeout(1.0) 366 s.connect((host, port)) 367 s.close() 368 except socket.error: 369 if s: 370 s.close() 371 raise 372 break 373 except (ImportError, socket.error):
374 - class MemcachedSessionTest(helper.CPWebCase):
375 setup_server = staticmethod(setup_server) 376
377 - def test(self):
378 return self.skip("memcached not reachable ")
379 else:
380 - class MemcachedSessionTest(helper.CPWebCase):
381 setup_server = staticmethod(setup_server) 382
383 - def test_0_Session(self):
384 self.getPage('/setsessiontype/memcached') 385 386 self.getPage('/testStr') 387 self.assertBody('1') 388 self.getPage('/testGen', self.cookies) 389 self.assertBody('2') 390 self.getPage('/testStr', self.cookies) 391 self.assertBody('3') 392 self.getPage('/length', self.cookies) 393 self.assertErrorPage(500) 394 self.assertInBody("NotImplementedError") 395 self.getPage('/delkey?key=counter', self.cookies) 396 self.assertStatus(200) 397 398 # Wait for the session.timeout (1 second) 399 time.sleep(1.25) 400 self.getPage('/') 401 self.assertBody('1') 402 403 # Test session __contains__ 404 self.getPage('/keyin?key=counter', self.cookies) 405 self.assertBody("True") 406 407 # Test session delete 408 self.getPage('/delete', self.cookies) 409 self.assertBody("done")
410
411 - def test_1_Concurrency(self):
412 client_thread_count = 5 413 request_count = 30 414 415 # Get initial cookie 416 self.getPage("/") 417 self.assertBody("1") 418 cookies = self.cookies 419 420 data_dict = {} 421 422 def request(index): 423 for i in range(request_count): 424 self.getPage("/", cookies) 425 # Uncomment the following line to prove threads overlap. 426 ## sys.stdout.write("%d " % index) 427 if not self.body.isdigit(): 428 self.fail(self.body) 429 data_dict[index] = v = int(self.body)
430 431 # Start <request_count> concurrent requests from 432 # each of <client_thread_count> clients 433 ts = [] 434 for c in range(client_thread_count): 435 data_dict[c] = 0 436 t = threading.Thread(target=request, args=(c,)) 437 ts.append(t) 438 t.start() 439 440 for t in ts: 441 t.join() 442 443 hitcount = max(data_dict.values()) 444 expected = 1 + (client_thread_count * request_count) 445 self.assertEqual(hitcount, expected)
446
447 - def test_3_Redirect(self):
448 # Start a new session 449 self.getPage('/testStr') 450 self.getPage('/iredir', self.cookies) 451 self.assertBody("memcached")
452
453 - def test_5_Error_paths(self):
454 self.getPage('/unknown/page') 455 self.assertErrorPage(404, "The path '/unknown/page' was not found.") 456 457 # Note: this path is *not* the same as above. The above 458 # takes a normal route through the session code; this one 459 # skips the session code's before_handler and only calls 460 # before_finalize (save) and on_end (close). So the session 461 # code has to survive calling save/close without init. 462 self.getPage('/restricted', self.cookies, method='POST') 463 self.assertErrorPage(405, response_codes[405][1])
464