|
1 # Test the support for SSL and sockets |
|
2 |
|
3 import sys |
|
4 import unittest |
|
5 from test import test_support |
|
6 import asyncore |
|
7 import socket |
|
8 import select |
|
9 import errno |
|
10 import subprocess |
|
11 import time |
|
12 import os |
|
13 import pprint |
|
14 import urllib, urlparse |
|
15 import shutil |
|
16 import traceback |
|
17 |
|
18 from BaseHTTPServer import HTTPServer |
|
19 from SimpleHTTPServer import SimpleHTTPRequestHandler |
|
20 |
|
# Optionally test SSL support, if we have it in the tested platform.
# skip_expected ends up True exactly when the ssl module cannot be imported.
try:
    import ssl
except ImportError:
    skip_expected = True
else:
    skip_expected = False

# Host name used for the local test servers, taken from test_support.
HOST = test_support.HOST
# Both paths start out as None here; nothing visible in this part of the
# file assigns them (presumably filled in before the tests run -- TODO confirm).
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
|
31 |
|
def handle_error(prefix):
    """Report the exception currently being handled, in verbose mode only.

    The formatted traceback lines are joined with a single space and
    written to stdout after *prefix*.  Nothing is written unless
    test_support.verbose is set.
    """
    trace_lines = traceback.format_exception(*sys.exc_info())
    report = ' '.join(trace_lines)
    if not test_support.verbose:
        return
    sys.stdout.write(prefix + report)
|
36 |
|
def testSimpleSSLwrap(self):
    """Check that ssl.sslwrap_simple() accepts both socket flavours.

    The call is made once with a socket.socket object and once with its
    underlying ``_sock`` object.  A broken-pipe IOError raised during the
    handshake is ignored because this test does not care about it; any
    other IOError propagates.

    NOTE(review): this function takes ``self`` but, in the view available
    here, is not visibly inside a TestCase class -- confirm its intended
    placement.
    """
    for use_raw_sock in (False, True):
        sock = socket.socket(socket.AF_INET)
        try:
            if use_raw_sock:
                ssl.sslwrap_simple(sock._sock)
            else:
                ssl.sslwrap_simple(sock)
        except IOError as e:
            # broken pipe when ssl_sock.do_handshake(), this test doesn't
            # care about that
            if e.errno != errno.EPIPE:
                raise
        finally:
            # The original never closed the sockets it created.
            sock.close()
|
52 |
|
class BasicTests(unittest.TestCase):
    """Basic checks of the ssl module that do not start a local server."""

    def testSSLconnect(self):
        """Connect to svn.python.org:443 without and with cert checking.

        Returns immediately unless the 'network' test resource is enabled.
        Raises test_support.TestFailed if a peer certificate is reported
        on a CERT_NONE connection.
        """
        if not test_support.is_resource_enabled('network'):
            return
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE)
        try:
            s.connect(("svn.python.org", 443))
            c = s.getpeercert()
            if c:
                # The original message left its %s placeholder unfilled.
                raise test_support.TestFailed(
                    "Peer cert %s shouldn't be here!" % pprint.pformat(c))
        finally:
            s.close()

        # this should fail because we have no verification certs
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError:
            pass
        finally:
            s.close()

    def testCrucialConstants(self):
        """Touch each constant; a missing one raises AttributeError."""
        ssl.PROTOCOL_SSLv2
        ssl.PROTOCOL_SSLv23
        ssl.PROTOCOL_SSLv3
        ssl.PROTOCOL_TLSv1
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED

    def testRAND(self):
        """Exercise RAND_status(), RAND_egd() and RAND_add().

        Raises test_support.TestFailed if RAND_egd(1) does not raise
        TypeError.
        """
        v = ssl.RAND_status()
        if test_support.verbose:
            if v:
                verdict = "sufficient randomness"
            else:
                verdict = "insufficient randomness"
            sys.stdout.write("\n RAND_status is %d (%s)\n" % (v, verdict))
        try:
            ssl.RAND_egd(1)
        except TypeError:
            pass
        else:
            # Previously this was only printed, so the test could not fail.
            raise test_support.TestFailed("didn't raise TypeError")
        ssl.RAND_add("this is a random string", 75.0)

    def testParseCert(self):
        """Decode CERTFILE and, in verbose mode, pretty-print the result."""
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE, False)
        if test_support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")

    def testDERtoPEM(self):
        """Round-trip a certificate PEM -> DER -> PEM -> DER.

        Raises test_support.TestFailed if the two DER forms differ.
        """
        # Use a context manager so the file is closed deterministically.
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as pem_file:
            pem = pem_file.read()
        d1 = ssl.PEM_cert_to_DER_cert(pem)
        p2 = ssl.DER_cert_to_PEM_cert(d1)
        d2 = ssl.PEM_cert_to_DER_cert(p2)
        if d1 != d2:
            raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
|
115 |
|
class NetworkedTests(unittest.TestCase):
    """Tests that connect to svn.python.org on port 443."""

    def testConnect(self):
        """Connect with CERT_NONE, CERT_REQUIRED without and with CA certs.

        Raises test_support.TestFailed if a peer certificate is reported
        on the CERT_NONE connection, or if the connection that supplies
        the root certificate raises ssl.SSLError.
        """
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE)
        try:
            s.connect(("svn.python.org", 443))
            c = s.getpeercert()
            if c:
                # The original message left its %s placeholder unfilled.
                raise test_support.TestFailed(
                    "Peer cert %s shouldn't be here!" % pprint.pformat(c))
        finally:
            s.close()

        # this should fail because we have no verification certs
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError:
            pass
        finally:
            s.close()

        # this should succeed because we specify the root cert
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError as x:
            raise test_support.TestFailed("Unexpected exception %s" % x)
        finally:
            s.close()

    def testNonBlockingHandshake(self):
        """Drive do_handshake() by hand on a non-blocking socket.

        The handshake is retried, waiting with select() for readability
        or writability as the SSL error requests, until it completes.
        Any other ssl.SSLError propagates.
        """
        s = socket.socket(socket.AF_INET)
        s.connect(("svn.python.org", 443))
        s.setblocking(False)
        s = ssl.wrap_socket(s,
                            cert_reqs=ssl.CERT_NONE,
                            do_handshake_on_connect=False)
        count = 0
        try:
            while True:
                try:
                    count += 1
                    s.do_handshake()
                    break
                except ssl.SSLError as err:
                    if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                        select.select([s], [], [])
                    elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                        select.select([], [s], [])
                    else:
                        raise
        finally:
            # Close even when the handshake fails with an unexpected error.
            s.close()
        if test_support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)

    def testFetchServerCert(self):
        """Exercise ssl.get_server_certificate() with various CA files.

        Raises test_support.TestFailed when no certificate is returned,
        or when verification against CERTFILE does not raise ssl.SSLError.
        """
        pem = ssl.get_server_certificate(("svn.python.org", 443))
        if not pem:
            raise test_support.TestFailed("No server certificate on svn.python.org:443!")

        try:
            pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
        except ssl.SSLError:
            #should fail
            pass
        else:
            raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)

        pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        if not pem:
            raise test_support.TestFailed("No server certificate on svn.python.org:443!")
        if test_support.verbose:
            sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
|
192 |
|
193 |
|
194 try: |
|
195 import threading |
|
196 except ImportError: |
|
197 _have_threads = False |
|
198 else: |
|
199 |
|
200 _have_threads = True |
|
201 |
|
class ThreadedEchoServer(threading.Thread):
    """Test server thread that hands each accepted connection to a handler.

    Each ConnectionHandler echoes received data back lower-cased, and
    understands the 'over', 'STARTTLS' and 'ENDTLS' control messages.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock):
            """Remember the owning server and put *connsock* in blocking mode."""
            self.server = server
            self.running = False
            self.sock = connsock
            self.sock.setblocking(1)
            # Set once the connection has been wrapped with SSL.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def show_conn_details(self):
            """Fetch peer cert (if required) and cipher; print them when chatty."""
            if self.server.certreqs == ssl.CERT_REQUIRED:
                cert = self.sslconn.getpeercert()
                if test_support.verbose and self.server.chatty:
                    sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                cert_binary = self.sslconn.getpeercert(True)
                if test_support.verbose and self.server.chatty:
                    sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
            cipher = self.sslconn.cipher()
            if test_support.verbose and self.server.chatty:
                sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")

        def wrap_conn (self):
            """Wrap the connection in SSL using the server's settings.

            Returns True on success.  On failure the connection is closed,
            the whole server is stopped unless bad connects are expected,
            and False is returned.
            """
            try:
                self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
                                               certfile=self.server.certificate,
                                               ssl_version=self.server.protocol,
                                               ca_certs=self.server.cacerts,
                                               cert_reqs=self.server.certreqs)
            except Exception:
                # Narrowed from a bare except so that KeyboardInterrupt and
                # SystemExit are not swallowed here.
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " +
                                 str(self.sock.getpeername()) + ":\n")
                self.close()
                if not self.server.expect_bad_connects:
                    # here, we want to stop the server, because this shouldn't
                    # happen in the context of our test case
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()
                return False

            else:
                return True

        def read(self):
            """Read from the SSL connection if wrapped, else from the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write *bytes* to the SSL connection if wrapped, else the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the SSL connection if wrapped, else the underlying _sock."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock._sock.close()

        def run (self):
            """Serve one connection until EOF, 'over', or an SSL error."""
            self.running = True
            if not self.server.starttls_server:
                # Without STARTTLS the connection is encrypted from the start.
                if isinstance(self.sock, ssl.SSLSocket):
                    self.sslconn = self.sock
                elif not self.wrap_conn():
                    return
                self.show_conn_details()
            while self.running:
                try:
                    msg = self.read()
                    if not msg:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                    elif msg.strip() == 'over':
                        if test_support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif self.server.starttls_server and msg.strip() == 'STARTTLS':
                        if test_support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write("OK\n")
                        if not self.wrap_conn():
                            return
                    elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS':
                        if test_support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write("OK\n")
                        self.sslconn.unwrap()
                        self.sslconn = None
                        if test_support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    else:
                        if (test_support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
                                             % (repr(msg), ctype, repr(msg.lower()), ctype))
                        self.write(msg.lower())
                except ssl.SSLError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()
                except Exception:
                    # Narrowed from a bare except; the error is reported and
                    # the loop carries on, as before.
                    handle_error('')

    def __init__(self, certificate, ssl_version=None,
                 certreqs=None, cacerts=None, expect_bad_connects=False,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 wrap_accepting_socket=False):
        """Create the listening socket and bind it to a port.

        certificate -- passed as ``certfile`` to ssl.wrap_socket().
        ssl_version -- protocol constant; defaults to ssl.PROTOCOL_TLSv1.
        certreqs -- cert requirement; defaults to ssl.CERT_NONE.
        cacerts -- passed as ``ca_certs`` to ssl.wrap_socket().
        expect_bad_connects -- if false, a failed SSL wrap stops the server.
        chatty, connectionchatty -- enable verbose-mode progress output.
        starttls_server -- start unencrypted and wrap only on 'STARTTLS'.
        wrap_accepting_socket -- wrap the listening socket itself in SSL.
        """
        if ssl_version is None:
            ssl_version = ssl.PROTOCOL_TLSv1
        if certreqs is None:
            certreqs = ssl.CERT_NONE
        self.certificate = certificate
        self.protocol = ssl_version
        self.certreqs = certreqs
        self.cacerts = cacerts
        self.expect_bad_connects = expect_bad_connects
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.flag = None
        if wrap_accepting_socket:
            self.sock = ssl.wrap_socket(self.sock, server_side=True,
                                        certfile=self.certificate,
                                        cert_reqs = self.certreqs,
                                        ca_certs = self.cacerts,
                                        ssl_version = self.protocol)
            if test_support.verbose and self.chatty:
                sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock))
        self.port = test_support.bind_port(self.sock)
        self.active = False
        threading.Thread.__init__(self)
        self.daemon = True

    def start (self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run (self):
        """Accept connections until stop() clears ``active``, then close."""
        # The accept timeout lets the loop notice stop() being called.
        self.sock.settimeout(0.5)
        self.sock.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if test_support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + str(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn)
                handler.start()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except Exception:
                # Narrowed from a bare except.
                if self.chatty:
                    handle_error("Test server failure:\n")
        self.sock.close()

    def stop (self):
        """Ask the accept loop in run() to finish."""
        self.active = False
|
387 |
|
class AsyncoreEchoServer(threading.Thread):
    """Thread running an asyncore-based SSL server that echoes lower-cased data."""

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher that creates a ConnectionHandler per client."""

        class ConnectionHandler (asyncore.dispatcher_with_send):
            """Per-connection dispatcher whose socket is wrapped in SSL."""

            def __init__(self, conn, certfile):
                asyncore.dispatcher_with_send.__init__(self, conn)
                # Replace the plain socket with a server-side SSL wrapper.
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=True)

            def readable(self):
                """Handle data already pending in the SSL layer, then return True."""
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def handle_read(self):
                """Echo up to 1024 received bytes back, lower-cased."""
                data = self.recv(1024)
                self.send(data.lower())

            def handle_close(self):
                self.close()
                if test_support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            """Create the listening socket and bind it via test_support.bind_port()."""
            self.certfile = certfile
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = test_support.bind_port(self.socket)
            self.listen(5)

        def handle_accept(self):
            sock_obj, addr = self.accept()
            if test_support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        """Build the EchoServer for *certfile* and record its port."""
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def start (self, flag=None):
        """Start the thread; *flag*, if given, is set when run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run (self):
        """Run asyncore.loop() repeatedly until stop() clears ``active``."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except Exception:
                # Deliberately ignored, as before; narrowed from a bare
                # except so KeyboardInterrupt/SystemExit are not swallowed.
                pass

    def stop (self):
        """Clear ``active`` and close the listening dispatcher."""
        self.active = False
        self.server.close()
|
462 |
|
class SocketServerHTTPSServer(threading.Thread):
    """Thread running an HTTPS file server rooted at CERTFILE's directory."""

    class HTTPSServer(HTTPServer):
        """HTTPServer that wraps each accepted connection in SSL."""

        def __init__(self, server_address, RequestHandlerClass, certfile):
            HTTPServer.__init__(self, server_address, RequestHandlerClass)
            # we assume the certfile contains both private key and certificate
            self.certfile = certfile
            self.active = False
            # Guards ``active`` and the request handling in serve_forever().
            self.active_lock = threading.Lock()
            self.allow_reuse_address = True

        def __str__(self):
            return ('<%s %s:%s>' %
                    (self.__class__.__name__,
                     self.server_name,
                     self.server_port))

        def get_request (self):
            # override this to wrap socket with SSL
            sock, addr = self.socket.accept()
            sslconn = ssl.wrap_socket(sock, server_side=True,
                                      certfile=self.certfile)
            return sslconn, addr

        # The methods overridden below this are mainly so that we
        # can run it in a thread and be able to stop it from another
        # You probably wouldn't need them in other uses.

        def server_activate(self):
            # We want to run this in a thread for testing purposes,
            # so we override this to set timeout, so that we get
            # a chance to stop the server
            self.socket.settimeout(0.5)
            HTTPServer.server_activate(self)

        def serve_forever(self):
            """Handle requests until server_close() clears ``active``."""
            # We want this to run in a thread, so we use a slightly
            # modified version of "forever".
            self.active = True
            while True:
                try:
                    # We need to lock while handling the request.
                    # Another thread can close the socket after self.active
                    # has been checked and before the request is handled.
                    # This causes an exception when using the closed socket.
                    with self.active_lock:
                        if not self.active:
                            break
                        self.handle_request()
                except socket.timeout:
                    pass
                except KeyboardInterrupt:
                    self.server_close()
                    return
                except Exception:
                    # Narrowed from a bare except: print the traceback and
                    # leave the serving loop.
                    sys.stdout.write(''.join(traceback.format_exception(*sys.exc_info())))
                    break
                time.sleep(0.1)

        def server_close(self):
            # Again, we want this to run in a thread, so we need to override
            # close to clear the "active" flag, so that serve_forever() will
            # terminate.
            with self.active_lock:
                HTTPServer.server_close(self)
                self.active = False

    class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):

        # need to override translate_path to get a known root,
        # instead of using os.curdir, since the test could be
        # run from anywhere

        server_version = "TestHTTPS/1.0"

        # Directory that request paths are resolved against; assigned by
        # SocketServerHTTPSServer.__init__.
        root = None

        def translate_path(self, path):
            """Translate a /-separated PATH to the local filename syntax.

            Components that mean special things to the local file system
            (e.g. drive or directory names) are ignored. (XXX They should
            probably be diagnosed.)

            """
            # abandon query parameters
            path = urlparse.urlparse(path)[2]
            path = os.path.normpath(urllib.unquote(path))
            words = path.split('/')
            words = filter(None, words)
            path = self.root
            for word in words:
                drive, word = os.path.splitdrive(word)
                head, word = os.path.split(word)
                # NOTE(review): this is a substring test against the root
                # string, not a comparison with path components -- confirm
                # that this is intended.
                if word in self.root: continue
                path = os.path.join(path, word)
            return path

        def log_message(self, format, *args):

            # we override this to suppress logging unless "verbose"

            if test_support.verbose:
                sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
                                 (self.server.server_address,
                                  self.server.server_port,
                                  self.request.cipher(),
                                  self.log_date_time_string(),
                                  format%args))


    def __init__(self, certfile):
        """Pick an unused port and build the HTTPS server for *certfile*.

        The handler root is the directory part of the module-level CERTFILE.
        """
        self.flag = None
        self.active = False
        self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
        self.port = test_support.find_unused_port()
        self.server = self.HTTPSServer(
            (HOST, self.port), self.RootedHTTPRequestHandler, certfile)
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def start (self, flag=None):
        """Start the thread; *flag*, if given, is set when run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run (self):
        self.active = True
        if self.flag:
            self.flag.set()
        self.server.serve_forever()
        self.active = False

    def stop (self):
        """Clear ``active`` and close the HTTPS server."""
        self.active = False
        self.server.server_close()
|
603 |
|
604 |
|
def badCertTest(certfile):
    """Check that a client using *certfile* cannot connect to the test server.

    A ThreadedEchoServer requiring client certificates is started, and a
    TLSv1 client connection using *certfile* is attempted.  An
    ssl.SSLError or socket.error is the expected outcome; a successful
    connect raises test_support.TestFailed.  The server is always stopped
    and joined afterwards.
    """
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False)
    started = threading.Event()
    server.start(started)
    # Block until the server thread reports that it is listening.
    started.wait()
    try:
        client = ssl.wrap_socket(socket.socket(),
                                 certfile=certfile,
                                 ssl_version=ssl.PROTOCOL_TLSv1)
        client.connect((HOST, server.port))
    except ssl.SSLError as err:
        if test_support.verbose:
            sys.stdout.write("\nSSLError is %s\n" % err[1])
    except socket.error as err:
        if test_support.verbose:
            sys.stdout.write("\nsocket.error is %s\n" % err[1])
    else:
        raise test_support.TestFailed(
            "Use of invalid cert should have failed!")
    finally:
        server.stop()
        server.join()
|
632 |
|
def serverParamsTest(certfile, protocol, certreqs, cacertsfile,
                     client_certfile, client_protocol=None, indata="FOO\n",
                     chatty=True, connectionchatty=False,
                     wrap_accepting_socket=False):
    """Run one echo round-trip against a freshly started ThreadedEchoServer.

    The server is built from certfile/protocol/certreqs/cacertsfile; the
    client uses client_certfile and client_protocol (defaulting to the
    server's protocol).  *indata* is sent and the reply must equal
    ``indata.lower()``.

    Raises test_support.TestFailed if connecting fails or the echoed data
    is wrong.  The server is always stopped and joined afterwards.
    """
    server = ThreadedEchoServer(certfile,
                                certreqs=certreqs,
                                ssl_version=protocol,
                                cacerts=cacertsfile,
                                chatty=chatty,
                                connectionchatty=connectionchatty,
                                wrap_accepting_socket=wrap_accepting_socket)
    started = threading.Event()
    server.start(started)
    # Block until the server thread reports that it is listening.
    started.wait()
    if client_protocol is None:
        client_protocol = protocol

    def narrate(text):
        # Client progress is shown only when both switches are on.
        if connectionchatty and test_support.verbose:
            sys.stdout.write(text)

    try:
        try:
            s = ssl.wrap_socket(socket.socket(),
                                certfile=client_certfile,
                                ca_certs=cacertsfile,
                                cert_reqs=certreqs,
                                ssl_version=client_protocol)
            s.connect((HOST, server.port))
        except ssl.SSLError as err:
            raise test_support.TestFailed("Unexpected SSL error: " + str(err))
        except Exception as err:
            raise test_support.TestFailed("Unexpected exception: " + str(err))
        # Only reached when the connection was established.
        narrate(" client: sending %s...\n" % (repr(indata)))
        s.write(indata)
        outdata = s.read()
        narrate(" client: read %s\n" % repr(outdata))
        if outdata != indata.lower():
            # Show at most the first 20 characters of each string.
            raise test_support.TestFailed(
                "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write("over\n")
        narrate(" client: closing connection.\n")
        s.close()
    finally:
        server.stop()
        server.join()
|
687 |
|
def tryProtocolCombo (server_protocol,
                      client_protocol,
                      expectedToWork,
                      certsreqs=None):
    """Check whether a client/server protocol pair can talk as expected.

    server_protocol, client_protocol -- ssl.PROTOCOL_* constants.
    expectedToWork -- whether serverParamsTest() should succeed.
    certsreqs -- ssl.CERT_* constant; defaults to ssl.CERT_NONE.

    Raises test_support.TestFailed if the combination fails although it
    was expected to work, or works although it was expected to fail.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE

    if test_support.verbose:
        # Fall back to str() for an unrecognised value; the original left
        # ``certtype`` unbound in that case and hit a NameError here.
        certtype = {
            ssl.CERT_NONE: "CERT_NONE",
            ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
            ssl.CERT_REQUIRED: "CERT_REQUIRED",
        }.get(certsreqs, str(certsreqs))
        # Combinations expected to fail are shown in braces.
        formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    try:
        serverParamsTest(CERTFILE, server_protocol, certsreqs,
                         CERTFILE, CERTFILE, client_protocol, chatty=False)
    except test_support.TestFailed:
        if expectedToWork:
            raise
    else:
        if not expectedToWork:
            raise test_support.TestFailed(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
|
720 |
|
721 |
|
722 class ThreadedTests(unittest.TestCase): |
|
723 |
|
def testRudeShutdown(self):
    """Wrapping a socket whose peer has already gone away must fail.

    Raises test_support.TestFailed if ssl.wrap_socket() does not raise
    IOError.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()
    port = test_support.find_unused_port()

    # `listener` runs in a thread. It opens a socket listening on
    # PORT, and sits in an accept() until the main thread connects.
    # Then it rudely closes the socket, and sets Event `listener_gone`
    # to let the main thread know the socket is gone.
    def listener():
        srv = socket.socket()
        srv.bind((HOST, port))
        srv.listen(5)
        listener_ready.set()
        srv.accept()
        # Dropping the last reference reclaims the socket object, which
        # also closes it.
        del srv
        listener_gone.set()

    def connector():
        listener_ready.wait()
        client = socket.socket()
        client.connect((HOST, port))
        listener_gone.wait()
        try:
            ssl.wrap_socket(client)
        except IOError:
            return
        raise test_support.TestFailed(
            'connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=listener)
    worker.start()
    connector()
    worker.join()
|
760 |
|
def testEcho (self):
    """Run a chatty TLSv1 echo round-trip with CERT_NONE."""
    if test_support.verbose:
        sys.stdout.write("\n")
    tls = ssl.PROTOCOL_TLSv1
    serverParamsTest(CERTFILE, tls, ssl.CERT_NONE,
                     CERTFILE, CERTFILE, tls,
                     chatty=True, connectionchatty=True)
|
768 |
|
def testReadCert(self):
    """Connect to the test server and inspect the certificate it presents.

    Raises test_support.TestFailed if the connection fails, no peer
    certificate is available, it has no 'subject' field, or the subject
    lacks organizationName 'Python Software Foundation'.
    """
    if test_support.verbose:
        sys.stdout.write("\n")
    # (An unused socket that was created here and never closed has been
    # removed.)
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_SSLv23,
                                cacerts=CERTFILE,
                                chatty=False)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    try:
        try:
            s = ssl.wrap_socket(socket.socket(),
                                certfile=CERTFILE,
                                ca_certs=CERTFILE,
                                cert_reqs=ssl.CERT_REQUIRED,
                                ssl_version=ssl.PROTOCOL_SSLv23)
            s.connect((HOST, server.port))
        except ssl.SSLError as x:
            raise test_support.TestFailed(
                "Unexpected SSL error: " + str(x))
        except Exception as x:
            raise test_support.TestFailed(
                "Unexpected exception: " + str(x))
        # Only reached when the connection was established.
        try:
            if not s:
                raise test_support.TestFailed(
                    "Can't SSL-handshake with test server")
            cert = s.getpeercert()
            if not cert:
                raise test_support.TestFailed(
                    "Can't get peer certificate.")
            cipher = s.cipher()
            if test_support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                raise test_support.TestFailed(
                    "No subject field in certificate: %s." %
                    pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                not in cert['subject']):
                raise test_support.TestFailed(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
        finally:
            # Close the client socket on failure as well as on success.
            s.close()
    finally:
        server.stop()
        server.join()
|
823 |
|
def testNULLcert(self):
    """Run badCertTest() with nullcert.pem from this file's directory."""
    here = os.path.dirname(__file__) or os.curdir
    badCertTest(os.path.join(here, "nullcert.pem"))
|
def testMalformedCert(self):
    """Run badCertTest() with badcert.pem from this file's directory."""
    here = os.path.dirname(__file__) or os.curdir
    badCertTest(os.path.join(here, "badcert.pem"))
|
def testWrongCert(self):
    """Run badCertTest() with wrongcert.pem from this file's directory."""
    here = os.path.dirname(__file__) or os.curdir
    badCertTest(os.path.join(here, "wrongcert.pem"))
|
def testMalformedKey(self):
    """Run badCertTest() with badkey.pem from this file's directory."""
    here = os.path.dirname(__file__) or os.curdir
    badCertTest(os.path.join(here, "badkey.pem"))
|
836 |
|
def testProtocolSSL2(self):
    """Try each client protocol against an SSLv2 server."""
    if test_support.verbose:
        sys.stdout.write("\n")
    # (client protocol, expected to work, cert requirement); None selects
    # tryProtocolCombo's default for the requirement.
    combos = (
        (ssl.PROTOCOL_SSLv2, True, None),
        (ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL),
        (ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED),
        (ssl.PROTOCOL_SSLv23, True, None),
        (ssl.PROTOCOL_SSLv3, False, None),
        (ssl.PROTOCOL_TLSv1, False, None),
    )
    for client, should_work, reqs in combos:
        tryProtocolCombo(ssl.PROTOCOL_SSLv2, client, should_work, reqs)
|
846 |
|
def testProtocolSSL23(self):
    """Try each client protocol against an SSLv23 server.

    A failure of the SSLv2 client is tolerated and only reported in
    verbose mode; every other combination must work.
    """
    if test_support.verbose:
        sys.stdout.write("\n")
    server = ssl.PROTOCOL_SSLv23
    try:
        tryProtocolCombo(server, ssl.PROTOCOL_SSLv2, True)
    except test_support.TestFailed as x:
        # this fails on some older versions of OpenSSL (0.9.7l, for instance)
        if test_support.verbose:
            sys.stdout.write(
                " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                % str(x))
    clients = (ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1)
    # None selects tryProtocolCombo's default cert requirement.
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        for client in clients:
            tryProtocolCombo(server, client, True, reqs)
|
869 |
|
def testProtocolSSL3(self):
    """Try each client protocol against an SSLv3 server."""
    if test_support.verbose:
        sys.stdout.write("\n")
    # (client protocol, expected to work, cert requirement); None selects
    # tryProtocolCombo's default for the requirement.
    combos = (
        (ssl.PROTOCOL_SSLv3, True, None),
        (ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL),
        (ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED),
        (ssl.PROTOCOL_SSLv2, False, None),
        (ssl.PROTOCOL_SSLv23, False, None),
        (ssl.PROTOCOL_TLSv1, False, None),
    )
    for client, should_work, reqs in combos:
        tryProtocolCombo(ssl.PROTOCOL_SSLv3, client, should_work, reqs)
|
879 |
|
def testProtocolTLS1(self):
    """Try each client protocol against a TLSv1 server."""
    if test_support.verbose:
        sys.stdout.write("\n")
    # (client protocol, expected to work, cert requirement); None selects
    # tryProtocolCombo's default for the requirement.
    combos = (
        (ssl.PROTOCOL_TLSv1, True, None),
        (ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL),
        (ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED),
        (ssl.PROTOCOL_SSLv2, False, None),
        (ssl.PROTOCOL_SSLv3, False, None),
        (ssl.PROTOCOL_SSLv23, False, None),
    )
    for client, should_work, reqs in combos:
        tryProtocolCombo(ssl.PROTOCOL_TLSv1, client, should_work, reqs)
|
889 |
|
def testSTARTTLS (self):
    """Switch a single connection into TLS and back out again.

    Messages are sent one by one to a STARTTLS-enabled
    ThreadedEchoServer.  When the server answers "ok" to STARTTLS the
    socket is wrapped in TLSv1; when it answers "ok" to ENDTLS the
    connection is unwrapped again.  Raises test_support.TestFailed if the
    initial connection cannot be made.
    """
    msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")

    def say(text):
        # Progress output appears only in verbose mode.
        if test_support.verbose:
            sys.stdout.write(text)

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    started = threading.Event()
    server.start(started)
    # Block until the server thread reports that it is listening.
    started.wait()
    # True while traffic goes through the TLS wrapper `conn`.
    wrapped = False
    try:
        try:
            s = socket.socket()
            s.setblocking(1)
            s.connect((HOST, server.port))
        except Exception as x:
            raise test_support.TestFailed("Unexpected exception: " + str(x))
        # Only reached when the connection was established.
        say("\n")
        for indata in msgs:
            say(" client: sending %s...\n" % repr(indata))
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            acknowledged = outdata.strip().lower().startswith("ok")
            if acknowledged and indata == "STARTTLS":
                say(" client: read %s from server, starting TLS...\n"
                    % repr(outdata))
                conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                wrapped = True
            elif acknowledged and indata == "ENDTLS":
                say(" client: read %s from server, ending TLS...\n"
                    % repr(outdata))
                s = conn.unwrap()
                wrapped = False
            else:
                say(" client: read %s from server\n" % repr(outdata))
        say(" client: closing connection.\n")
        if wrapped:
            conn.write("over\n")
        else:
            s.send("over\n")
        s.close()
    finally:
        server.stop()
        server.join()
|
955 |
|
def testSocketServer(self):
    """Fetch the certificate file over HTTPS and compare it with the disk copy.

    Starts a SocketServerHTTPSServer built from CERTFILE, requests the
    file named like CERTFILE from https://127.0.0.1:<server.port>/ and
    reads as many bytes as the Content-Length header announces.

    Raises test_support.TestFailed if anything goes wrong while fetching
    (the formatted traceback is the message) or if the fetched bytes
    differ from the file contents.  The server is always stopped and
    joined.
    """
    server = SocketServerHTTPSServer(CERTFILE)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    try:
        if test_support.verbose:
            sys.stdout.write('\n')
        # Close the local file deterministically instead of leaking the
        # handle until garbage collection.
        with open(CERTFILE, 'rb') as cert_file:
            d1 = cert_file.read()
        d2 = ''
        # now fetch the same data from the HTTPS server
        url = 'https://127.0.0.1:%d/%s' % (
            server.port, os.path.split(CERTFILE)[1])
        f = urllib.urlopen(url)
        try:
            dlen = f.info().getheader("content-length")
            if dlen and (int(dlen) > 0):
                d2 = f.read(int(dlen))
                if test_support.verbose:
                    sys.stdout.write(
                        " client: read %d bytes from remote server '%s'\n"
                        % (len(d2), server))
        finally:
            # Release the HTTP connection even when reading fails.
            f.close()
    except Exception:
        # Report any fetch problem as a test failure carrying the full
        # traceback.  KeyboardInterrupt/SystemExit are deliberately left
        # to propagate unchanged.
        msg = ''.join(traceback.format_exception(*sys.exc_info()))
        if test_support.verbose:
            sys.stdout.write('\n' + msg)
        raise test_support.TestFailed(msg)
    else:
        if not (d1 == d2):
            raise test_support.TestFailed(
                "Couldn't fetch data from HTTPS server")
    finally:
        server.stop()
        server.join()
|
993 |
|
def testWrappedAccept(self):
    """Call serverParamsTest with wrap_accepting_socket=True.

    CERTFILE is supplied for all three certificate arguments,
    PROTOCOL_SSLv23 for both protocol arguments and CERT_REQUIRED as the
    certificate requirement; chatty output is enabled.
    NOTE(review): the meaning of each positional argument is defined by
    serverParamsTest elsewhere in this file; confirm there.
    """
    if test_support.verbose:
        sys.stdout.write("\n")

    protocol = ssl.PROTOCOL_SSLv23
    options = dict(chatty=True,
                   connectionchatty=True,
                   wrap_accepting_socket=True)
    serverParamsTest(CERTFILE, protocol, ssl.CERT_REQUIRED,
                     CERTFILE, CERTFILE, protocol,
                     **options)
|
1002 |
|
1003 |
|
def testAsyncoreServer(self):
    """Send one message to an AsyncoreEchoServer and check the echoed reply.

    The client connects over SSL to 127.0.0.1 on the server's port,
    writes a mixed-case message and expects to read back the same
    message lower-cased.  It then writes "over\n" and closes.

    Raises test_support.TestFailed if connecting fails or if the reply
    differs from the lower-cased message.  The server is always stopped
    and joined.
    """
    indata = "TEST MESSAGE of mixed case\n"

    if test_support.verbose:
        sys.stdout.write("\n")
    server = AsyncoreEchoServer(CERTFILE)
    started = threading.Event()
    server.start(started)
    # Block until the server thread signals that it has started.
    started.wait()

    try:
        try:
            client = ssl.wrap_socket(socket.socket())
            client.connect(('127.0.0.1', server.port))
        except ssl.SSLError as err:
            raise test_support.TestFailed("Unexpected SSL error: " + str(err))
        except Exception as err:
            raise test_support.TestFailed("Unexpected exception: " + str(err))

        # Both handlers above raise, so this point is reached only after
        # a successful connect.
        if test_support.verbose:
            sys.stdout.write(
                " client: sending %s...\n" % (repr(indata)))
        client.write(indata)
        outdata = client.read()
        if test_support.verbose:
            sys.stdout.write(" client: read %s\n" % repr(outdata))

        if outdata != indata.lower():
            # Only the first 20 characters of each string are shown;
            # slicing already stops at the end of a shorter string.
            raise test_support.TestFailed(
                "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))

        client.write("over\n")
        if test_support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        client.close()
    finally:
        server.stop()
        # wait for server thread to end
        server.join()
|
1045 |
|
1046 |
|
def testAllRecvAndSendMethods(self):
    """Exercise every send*/recv* method of an SSL-wrapped socket.

    A ThreadedEchoServer is started and a TLSv1 client connects to it.
    Each send method is called with "PREFIX_<method name>" and the reply
    must equal the lower-cased payload; each receive method is then used
    to read the reply to an ordinary send().  Methods flagged as
    "not expected to succeed" must raise a ValueError whose message
    starts with the method name.

    Raises test_support.TestFailed on a connection failure, on a data
    mismatch, when a method expected to succeed raises ValueError, or
    when the ValueError message does not start with the method name.
    The server is always stopped and joined.
    """
    # Failures are reported via test_support.TestFailed: this module
    # imports `test_support`, and no name `support` appears in its imports.
    if test_support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    try:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
    except ssl.SSLError as x:
        raise test_support.TestFailed("Unexpected SSL error: " + str(x))
    except Exception as x:
        raise test_support.TestFailed("Unexpected exception: " + str(x))
    else:
        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray("\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray("\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, whether to expect success, *args)
        send_methods = [
            ('send', s.send, True, []),
            ('sendto', s.sendto, False, ["some.address"]),
            ('sendall', s.sendall, True, []),
        ]
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = u"PREFIX_"

        for meth_name, send_meth, expect_success, args in send_methods:
            indata = data_prefix + meth_name
            try:
                send_meth(indata.encode('ASCII', 'strict'), *args)
                outdata = s.read()
                outdata = outdata.decode('ASCII', 'strict')
                if outdata != indata.lower():
                    raise test_support.TestFailed(
                        "While sending with <<%s>> bad data "
                        "<<%r>> (%d) received; "
                        "expected <<%r>> (%d)\n" % (
                            meth_name, outdata[:20], len(outdata),
                            indata[:20], len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    raise test_support.TestFailed(
                        "Failed to send with method <<%s>>; "
                        "expected to succeed.\n" % (meth_name,)
                    )
                if not str(e).startswith(meth_name):
                    raise test_support.TestFailed(
                        "Method <<%s>> failed with unexpected "
                        "exception message: %s\n" % (
                            meth_name, e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = data_prefix + meth_name
            try:
                s.send(indata.encode('ASCII', 'strict'))
                outdata = recv_meth(*args)
                outdata = outdata.decode('ASCII', 'strict')
                if outdata != indata.lower():
                    raise test_support.TestFailed(
                        "While receiving with <<%s>> bad data "
                        "<<%r>> (%d) received; "
                        "expected <<%r>> (%d)\n" % (
                            meth_name, outdata[:20], len(outdata),
                            indata[:20], len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    raise test_support.TestFailed(
                        "Failed to receive with method <<%s>>; "
                        "expected to succeed.\n" % (meth_name,)
                    )
                if not str(e).startswith(meth_name):
                    raise test_support.TestFailed(
                        "Method <<%s>> failed with unexpected "
                        "exception message: %s\n" % (
                            meth_name, e
                        )
                    )
                # consume data: the send above went through, so the
                # echoed reply is still pending and must not leak into
                # the next iteration.
                s.read()

        s.write("over\n".encode("ASCII", "strict"))
        s.close()
    finally:
        server.stop()
        server.join()
|
1166 |
|
1167 |
|
def test_main(verbose=False):
    """Locate the certificate files, pick the test classes, and run them.

    Sets the module globals CERTFILE and SVN_PYTHON_ORG_ROOT_CERT to
    "keycert.pem" and "https_svn_python_org_root.pem" in this module's
    directory (or the current directory when that is empty).

    BasicTests always runs.  NetworkedTests is added when the 'network'
    resource is enabled.  ThreadedTests is added when threads are
    available, threading_setup() returns a true value and the 'network'
    resource is enabled.

    The `verbose` parameter is accepted but not used in this function.

    Raises test_support.TestSkipped when the ssl module is unavailable,
    and test_support.TestFailed when a certificate file is missing or no
    unused port can be found.
    """
    if skip_expected:
        raise test_support.TestSkipped("No SSL support")

    global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
                            "keycert.pem")
    SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
        os.path.dirname(__file__) or os.curdir,
        "https_svn_python_org_root.pem")

    if (not os.path.exists(CERTFILE) or
            not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
        raise test_support.TestFailed("Can't read certificate files!")

    # Only used as a sanity check that a port is available at all.
    TESTPORT = test_support.find_unused_port()
    if not TESTPORT:
        raise test_support.TestFailed("Can't find open port to test servers on!")

    tests = [BasicTests]

    if test_support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = test_support.threading_setup()
        if thread_info and test_support.is_resource_enabled('network'):
            tests.append(ThreadedTests)

    try:
        test_support.run_unittest(*tests)
    finally:
        # Run the thread cleanup even when the test run raises, so the
        # state captured by threading_setup() is always handed back.
        if _have_threads:
            test_support.threading_cleanup(*thread_info)
|
1201 |
|
# Run the suite through test_main() when this file is executed directly.
if __name__ == "__main__":
    test_main()