symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_ssl.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 # Test the support for SSL and sockets
       
     2 
       
     3 import sys
       
     4 import unittest
       
     5 from test import test_support
       
     6 import asyncore
       
     7 import socket
       
     8 import select
       
     9 import errno
       
    10 import subprocess
       
    11 import time
       
    12 import os
       
    13 import pprint
       
    14 import urllib, urlparse
       
    15 import shutil
       
    16 import traceback
       
    17 
       
    18 from BaseHTTPServer import HTTPServer
       
    19 from SimpleHTTPServer import SimpleHTTPRequestHandler
       
    20 
       
    21 # Optionally test SSL support, if we have it in the tested platform
       
    22 skip_expected = False
       
    23 try:
       
    24     import ssl
       
    25 except ImportError:
       
    26     skip_expected = True
       
    27 
       
    28 HOST = test_support.HOST
       
    29 CERTFILE = None
       
    30 SVN_PYTHON_ORG_ROOT_CERT = None
       
    31 
       
    32 def handle_error(prefix):
       
    33     exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
       
    34     if test_support.verbose:
       
    35         sys.stdout.write(prefix + exc_format)
       
    36 
       
    37     def testSimpleSSLwrap(self):
       
    38         try:
       
    39             ssl.sslwrap_simple(socket.socket(socket.AF_INET))
       
    40         except IOError, e:
       
    41             if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
       
    42                 pass
       
    43             else:
       
    44                 raise
       
    45         try:
       
    46             ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
       
    47         except IOError, e:
       
    48             if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
       
    49                 pass
       
    50             else:
       
    51                 raise
       
    52 
       
    53 class BasicTests(unittest.TestCase):
       
    54 
       
    55     def testSSLconnect(self):
       
    56         if not test_support.is_resource_enabled('network'):
       
    57             return
       
    58         s = ssl.wrap_socket(socket.socket(socket.AF_INET),
       
    59                             cert_reqs=ssl.CERT_NONE)
       
    60         s.connect(("svn.python.org", 443))
       
    61         c = s.getpeercert()
       
    62         if c:
       
    63             raise test_support.TestFailed("Peer cert %s shouldn't be here!")
       
    64         s.close()
       
    65 
       
    66         # this should fail because we have no verification certs
       
    67         s = ssl.wrap_socket(socket.socket(socket.AF_INET),
       
    68                             cert_reqs=ssl.CERT_REQUIRED)
       
    69         try:
       
    70             s.connect(("svn.python.org", 443))
       
    71         except ssl.SSLError:
       
    72             pass
       
    73         finally:
       
    74             s.close()
       
    75 
       
    76     def testCrucialConstants(self):
       
    77         ssl.PROTOCOL_SSLv2
       
    78         ssl.PROTOCOL_SSLv23
       
    79         ssl.PROTOCOL_SSLv3
       
    80         ssl.PROTOCOL_TLSv1
       
    81         ssl.CERT_NONE
       
    82         ssl.CERT_OPTIONAL
       
    83         ssl.CERT_REQUIRED
       
    84 
       
    85     def testRAND(self):
       
    86         v = ssl.RAND_status()
       
    87         if test_support.verbose:
       
    88             sys.stdout.write("\n RAND_status is %d (%s)\n"
       
    89                              % (v, (v and "sufficient randomness") or
       
    90                                 "insufficient randomness"))
       
    91         try:
       
    92             ssl.RAND_egd(1)
       
    93         except TypeError:
       
    94             pass
       
    95         else:
       
    96             print "didn't raise TypeError"
       
    97         ssl.RAND_add("this is a random string", 75.0)
       
    98 
       
    99     def testParseCert(self):
       
   100         # note that this uses an 'unofficial' function in _ssl.c,
       
   101         # provided solely for this test, to exercise the certificate
       
   102         # parsing code
       
   103         p = ssl._ssl._test_decode_cert(CERTFILE, False)
       
   104         if test_support.verbose:
       
   105             sys.stdout.write("\n" + pprint.pformat(p) + "\n")
       
   106 
       
   107     def testDERtoPEM(self):
       
   108 
       
   109         pem = open(SVN_PYTHON_ORG_ROOT_CERT, 'r').read()
       
   110         d1 = ssl.PEM_cert_to_DER_cert(pem)
       
   111         p2 = ssl.DER_cert_to_PEM_cert(d1)
       
   112         d2 = ssl.PEM_cert_to_DER_cert(p2)
       
   113         if (d1 != d2):
       
   114             raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
       
   115 
       
   116 class NetworkedTests(unittest.TestCase):
       
   117 
       
   118     def testConnect(self):
       
   119         s = ssl.wrap_socket(socket.socket(socket.AF_INET),
       
   120                             cert_reqs=ssl.CERT_NONE)
       
   121         s.connect(("svn.python.org", 443))
       
   122         c = s.getpeercert()
       
   123         if c:
       
   124             raise test_support.TestFailed("Peer cert %s shouldn't be here!")
       
   125         s.close()
       
   126 
       
   127         # this should fail because we have no verification certs
       
   128         s = ssl.wrap_socket(socket.socket(socket.AF_INET),
       
   129                             cert_reqs=ssl.CERT_REQUIRED)
       
   130         try:
       
   131             s.connect(("svn.python.org", 443))
       
   132         except ssl.SSLError:
       
   133             pass
       
   134         finally:
       
   135             s.close()
       
   136 
       
   137         # this should succeed because we specify the root cert
       
   138         s = ssl.wrap_socket(socket.socket(socket.AF_INET),
       
   139                             cert_reqs=ssl.CERT_REQUIRED,
       
   140                             ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
       
   141         try:
       
   142             s.connect(("svn.python.org", 443))
       
   143         except ssl.SSLError, x:
       
   144             raise test_support.TestFailed("Unexpected exception %s" % x)
       
   145         finally:
       
   146             s.close()
       
   147 
       
   148 
       
   149     def testNonBlockingHandshake(self):
       
   150         s = socket.socket(socket.AF_INET)
       
   151         s.connect(("svn.python.org", 443))
       
   152         s.setblocking(False)
       
   153         s = ssl.wrap_socket(s,
       
   154                             cert_reqs=ssl.CERT_NONE,
       
   155                             do_handshake_on_connect=False)
       
   156         count = 0
       
   157         while True:
       
   158             try:
       
   159                 count += 1
       
   160                 s.do_handshake()
       
   161                 break
       
   162             except ssl.SSLError, err:
       
   163                 if err.args[0] == ssl.SSL_ERROR_WANT_READ:
       
   164                     select.select([s], [], [])
       
   165                 elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
       
   166                     select.select([], [s], [])
       
   167                 else:
       
   168                     raise
       
   169         s.close()
       
   170         if test_support.verbose:
       
   171             sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
       
   172 
       
   173     def testFetchServerCert(self):
       
   174 
       
   175         pem = ssl.get_server_certificate(("svn.python.org", 443))
       
   176         if not pem:
       
   177             raise test_support.TestFailed("No server certificate on svn.python.org:443!")
       
   178 
       
   179         try:
       
   180             pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
       
   181         except ssl.SSLError:
       
   182             #should fail
       
   183             pass
       
   184         else:
       
   185             raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
       
   186 
       
   187         pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
       
   188         if not pem:
       
   189             raise test_support.TestFailed("No server certificate on svn.python.org:443!")
       
   190         if test_support.verbose:
       
   191             sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
       
   192 
       
   193 
       
   194 try:
       
   195     import threading
       
   196 except ImportError:
       
   197     _have_threads = False
       
   198 else:
       
   199 
       
   200     _have_threads = True
       
   201 
       
   202     class ThreadedEchoServer(threading.Thread):
       
   203 
       
   204         class ConnectionHandler(threading.Thread):
       
   205 
       
   206             """A mildly complicated class, because we want it to work both
       
   207             with and without the SSL wrapper around the socket connection, so
       
   208             that we can test the STARTTLS functionality."""
       
   209 
       
   210             def __init__(self, server, connsock):
       
   211                 self.server = server
       
   212                 self.running = False
       
   213                 self.sock = connsock
       
   214                 self.sock.setblocking(1)
       
   215                 self.sslconn = None
       
   216                 threading.Thread.__init__(self)
       
   217                 self.daemon = True
       
   218 
       
   219             def show_conn_details(self):
       
   220                 if self.server.certreqs == ssl.CERT_REQUIRED:
       
   221                     cert = self.sslconn.getpeercert()
       
   222                     if test_support.verbose and self.server.chatty:
       
   223                         sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
       
   224                     cert_binary = self.sslconn.getpeercert(True)
       
   225                     if test_support.verbose and self.server.chatty:
       
   226                         sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
       
   227                 cipher = self.sslconn.cipher()
       
   228                 if test_support.verbose and self.server.chatty:
       
   229                     sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
       
   230 
       
   231             def wrap_conn (self):
       
   232                 try:
       
   233                     self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
       
   234                                                    certfile=self.server.certificate,
       
   235                                                    ssl_version=self.server.protocol,
       
   236                                                    ca_certs=self.server.cacerts,
       
   237                                                    cert_reqs=self.server.certreqs)
       
   238                 except:
       
   239                     if self.server.chatty:
       
   240                         handle_error("\n server:  bad connection attempt from " +
       
   241                                      str(self.sock.getpeername()) + ":\n")
       
   242                     self.close()
       
   243                     if not self.server.expect_bad_connects:
       
   244                         # here, we want to stop the server, because this shouldn't
       
   245                         # happen in the context of our test case
       
   246                         self.running = False
       
   247                         # normally, we'd just stop here, but for the test
       
   248                         # harness, we want to stop the server
       
   249                         self.server.stop()
       
   250                     return False
       
   251 
       
   252                 else:
       
   253                     return True
       
   254 
       
   255             def read(self):
       
   256                 if self.sslconn:
       
   257                     return self.sslconn.read()
       
   258                 else:
       
   259                     return self.sock.recv(1024)
       
   260 
       
   261             def write(self, bytes):
       
   262                 if self.sslconn:
       
   263                     return self.sslconn.write(bytes)
       
   264                 else:
       
   265                     return self.sock.send(bytes)
       
   266 
       
   267             def close(self):
       
   268                 if self.sslconn:
       
   269                     self.sslconn.close()
       
   270                 else:
       
   271                     self.sock._sock.close()
       
   272 
       
   273             def run (self):
       
   274                 self.running = True
       
   275                 if not self.server.starttls_server:
       
   276                     if isinstance(self.sock, ssl.SSLSocket):
       
   277                         self.sslconn = self.sock
       
   278                     elif not self.wrap_conn():
       
   279                         return
       
   280                     self.show_conn_details()
       
   281                 while self.running:
       
   282                     try:
       
   283                         msg = self.read()
       
   284                         if not msg:
       
   285                             # eof, so quit this handler
       
   286                             self.running = False
       
   287                             self.close()
       
   288                         elif msg.strip() == 'over':
       
   289                             if test_support.verbose and self.server.connectionchatty:
       
   290                                 sys.stdout.write(" server: client closed connection\n")
       
   291                             self.close()
       
   292                             return
       
   293                         elif self.server.starttls_server and msg.strip() == 'STARTTLS':
       
   294                             if test_support.verbose and self.server.connectionchatty:
       
   295                                 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
       
   296                             self.write("OK\n")
       
   297                             if not self.wrap_conn():
       
   298                                 return
       
   299                         elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS':
       
   300                             if test_support.verbose and self.server.connectionchatty:
       
   301                                 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
       
   302                             self.write("OK\n")
       
   303                             self.sslconn.unwrap()
       
   304                             self.sslconn = None
       
   305                             if test_support.verbose and self.server.connectionchatty:
       
   306                                 sys.stdout.write(" server: connection is now unencrypted...\n")
       
   307                         else:
       
   308                             if (test_support.verbose and
       
   309                                 self.server.connectionchatty):
       
   310                                 ctype = (self.sslconn and "encrypted") or "unencrypted"
       
   311                                 sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
       
   312                                                  % (repr(msg), ctype, repr(msg.lower()), ctype))
       
   313                             self.write(msg.lower())
       
   314                     except ssl.SSLError:
       
   315                         if self.server.chatty:
       
   316                             handle_error("Test server failure:\n")
       
   317                         self.close()
       
   318                         self.running = False
       
   319                         # normally, we'd just stop here, but for the test
       
   320                         # harness, we want to stop the server
       
   321                         self.server.stop()
       
   322                     except:
       
   323                         handle_error('')
       
   324 
       
   325         def __init__(self, certificate, ssl_version=None,
       
   326                      certreqs=None, cacerts=None, expect_bad_connects=False,
       
   327                      chatty=True, connectionchatty=False, starttls_server=False,
       
   328                      wrap_accepting_socket=False):
       
   329 
       
   330             if ssl_version is None:
       
   331                 ssl_version = ssl.PROTOCOL_TLSv1
       
   332             if certreqs is None:
       
   333                 certreqs = ssl.CERT_NONE
       
   334             self.certificate = certificate
       
   335             self.protocol = ssl_version
       
   336             self.certreqs = certreqs
       
   337             self.cacerts = cacerts
       
   338             self.expect_bad_connects = expect_bad_connects
       
   339             self.chatty = chatty
       
   340             self.connectionchatty = connectionchatty
       
   341             self.starttls_server = starttls_server
       
   342             self.sock = socket.socket()
       
   343             self.flag = None
       
   344             if wrap_accepting_socket:
       
   345                 self.sock = ssl.wrap_socket(self.sock, server_side=True,
       
   346                                             certfile=self.certificate,
       
   347                                             cert_reqs = self.certreqs,
       
   348                                             ca_certs = self.cacerts,
       
   349                                             ssl_version = self.protocol)
       
   350                 if test_support.verbose and self.chatty:
       
   351                     sys.stdout.write(' server:  wrapped server socket as %s\n' % str(self.sock))
       
   352             self.port = test_support.bind_port(self.sock)
       
   353             self.active = False
       
   354             threading.Thread.__init__(self)
       
   355             self.daemon = True
       
   356 
       
   357         def start (self, flag=None):
       
   358             self.flag = flag
       
   359             threading.Thread.start(self)
       
   360 
       
   361         def run (self):
       
   362             self.sock.settimeout(0.5)
       
   363             self.sock.listen(5)
       
   364             self.active = True
       
   365             if self.flag:
       
   366                 # signal an event
       
   367                 self.flag.set()
       
   368             while self.active:
       
   369                 try:
       
   370                     newconn, connaddr = self.sock.accept()
       
   371                     if test_support.verbose and self.chatty:
       
   372                         sys.stdout.write(' server:  new connection from '
       
   373                                          + str(connaddr) + '\n')
       
   374                     handler = self.ConnectionHandler(self, newconn)
       
   375                     handler.start()
       
   376                 except socket.timeout:
       
   377                     pass
       
   378                 except KeyboardInterrupt:
       
   379                     self.stop()
       
   380                 except:
       
   381                     if self.chatty:
       
   382                         handle_error("Test server failure:\n")
       
   383             self.sock.close()
       
   384 
       
   385         def stop (self):
       
   386             self.active = False
       
   387 
       
   388     class AsyncoreEchoServer(threading.Thread):
       
   389 
       
   390         class EchoServer (asyncore.dispatcher):
       
   391 
       
   392             class ConnectionHandler (asyncore.dispatcher_with_send):
       
   393 
       
   394                 def __init__(self, conn, certfile):
       
   395                     asyncore.dispatcher_with_send.__init__(self, conn)
       
   396                     self.socket = ssl.wrap_socket(conn, server_side=True,
       
   397                                                   certfile=certfile,
       
   398                                                   do_handshake_on_connect=True)
       
   399 
       
   400                 def readable(self):
       
   401                     if isinstance(self.socket, ssl.SSLSocket):
       
   402                         while self.socket.pending() > 0:
       
   403                             self.handle_read_event()
       
   404                     return True
       
   405 
       
   406                 def handle_read(self):
       
   407                     data = self.recv(1024)
       
   408                     self.send(data.lower())
       
   409 
       
   410                 def handle_close(self):
       
   411                     self.close()
       
   412                     if test_support.verbose:
       
   413                         sys.stdout.write(" server:  closed connection %s\n" % self.socket)
       
   414 
       
   415                 def handle_error(self):
       
   416                     raise
       
   417 
       
   418             def __init__(self, certfile):
       
   419                 self.certfile = certfile
       
   420                 asyncore.dispatcher.__init__(self)
       
   421                 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
       
   422                 self.port = test_support.bind_port(self.socket)
       
   423                 self.listen(5)
       
   424 
       
   425             def handle_accept(self):
       
   426                 sock_obj, addr = self.accept()
       
   427                 if test_support.verbose:
       
   428                     sys.stdout.write(" server:  new connection from %s:%s\n" %addr)
       
   429                 self.ConnectionHandler(sock_obj, self.certfile)
       
   430 
       
   431             def handle_error(self):
       
   432                 raise
       
   433 
       
   434         def __init__(self, certfile):
       
   435             self.flag = None
       
   436             self.active = False
       
   437             self.server = self.EchoServer(certfile)
       
   438             self.port = self.server.port
       
   439             threading.Thread.__init__(self)
       
   440             self.daemon = True
       
   441 
       
   442         def __str__(self):
       
   443             return "<%s %s>" % (self.__class__.__name__, self.server)
       
   444 
       
   445         def start (self, flag=None):
       
   446             self.flag = flag
       
   447             threading.Thread.start(self)
       
   448 
       
   449         def run (self):
       
   450             self.active = True
       
   451             if self.flag:
       
   452                 self.flag.set()
       
   453             while self.active:
       
   454                 try:
       
   455                     asyncore.loop(1)
       
   456                 except:
       
   457                     pass
       
   458 
       
   459         def stop (self):
       
   460             self.active = False
       
   461             self.server.close()
       
   462 
       
   463     class SocketServerHTTPSServer(threading.Thread):
       
   464 
       
   465         class HTTPSServer(HTTPServer):
       
   466 
       
   467             def __init__(self, server_address, RequestHandlerClass, certfile):
       
   468 
       
   469                 HTTPServer.__init__(self, server_address, RequestHandlerClass)
       
   470                 # we assume the certfile contains both private key and certificate
       
   471                 self.certfile = certfile
       
   472                 self.active = False
       
   473                 self.active_lock = threading.Lock()
       
   474                 self.allow_reuse_address = True
       
   475 
       
   476             def __str__(self):
       
   477                 return ('<%s %s:%s>' %
       
   478                         (self.__class__.__name__,
       
   479                          self.server_name,
       
   480                          self.server_port))
       
   481 
       
   482             def get_request (self):
       
   483                 # override this to wrap socket with SSL
       
   484                 sock, addr = self.socket.accept()
       
   485                 sslconn = ssl.wrap_socket(sock, server_side=True,
       
   486                                           certfile=self.certfile)
       
   487                 return sslconn, addr
       
   488 
       
   489             # The methods overridden below this are mainly so that we
       
   490             # can run it in a thread and be able to stop it from another
       
   491             # You probably wouldn't need them in other uses.
       
   492 
       
   493             def server_activate(self):
       
   494                 # We want to run this in a thread for testing purposes,
       
   495                 # so we override this to set timeout, so that we get
       
   496                 # a chance to stop the server
       
   497                 self.socket.settimeout(0.5)
       
   498                 HTTPServer.server_activate(self)
       
   499 
       
   500             def serve_forever(self):
       
   501                 # We want this to run in a thread, so we use a slightly
       
   502                 # modified version of "forever".
       
   503                 self.active = True
       
   504                 while 1:
       
   505                     try:
       
   506                         # We need to lock while handling the request.
       
   507                         # Another thread can close the socket after self.active
       
   508                         # has been checked and before the request is handled.
       
   509                         # This causes an exception when using the closed socket.
       
   510                         with self.active_lock:
       
   511                             if not self.active:
       
   512                                 break
       
   513                             self.handle_request()
       
   514                     except socket.timeout:
       
   515                         pass
       
   516                     except KeyboardInterrupt:
       
   517                         self.server_close()
       
   518                         return
       
   519                     except:
       
   520                         sys.stdout.write(''.join(traceback.format_exception(*sys.exc_info())))
       
   521                         break
       
   522                     time.sleep(0.1)
       
   523 
       
   524             def server_close(self):
       
   525                 # Again, we want this to run in a thread, so we need to override
       
   526                 # close to clear the "active" flag, so that serve_forever() will
       
   527                 # terminate.
       
   528                 with self.active_lock:
       
   529                     HTTPServer.server_close(self)
       
   530                     self.active = False
       
   531 
       
   532         class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
       
   533 
       
   534             # need to override translate_path to get a known root,
       
   535             # instead of using os.curdir, since the test could be
       
   536             # run from anywhere
       
   537 
       
   538             server_version = "TestHTTPS/1.0"
       
   539 
       
   540             root = None
       
   541 
       
   542             def translate_path(self, path):
       
   543                 """Translate a /-separated PATH to the local filename syntax.
       
   544 
       
   545                 Components that mean special things to the local file system
       
   546                 (e.g. drive or directory names) are ignored.  (XXX They should
       
   547                 probably be diagnosed.)
       
   548 
       
   549                 """
       
   550                 # abandon query parameters
       
   551                 path = urlparse.urlparse(path)[2]
       
   552                 path = os.path.normpath(urllib.unquote(path))
       
   553                 words = path.split('/')
       
   554                 words = filter(None, words)
       
   555                 path = self.root
       
   556                 for word in words:
       
   557                     drive, word = os.path.splitdrive(word)
       
   558                     head, word = os.path.split(word)
       
   559                     if word in self.root: continue
       
   560                     path = os.path.join(path, word)
       
   561                 return path
       
   562 
       
   563             def log_message(self, format, *args):
       
   564 
       
   565                 # we override this to suppress logging unless "verbose"
       
   566 
       
   567                 if test_support.verbose:
       
   568                     sys.stdout.write(" server (%s:%d %s):\n   [%s] %s\n" %
       
   569                                      (self.server.server_address,
       
   570                                       self.server.server_port,
       
   571                                       self.request.cipher(),
       
   572                                       self.log_date_time_string(),
       
   573                                       format%args))
       
   574 
       
   575 
       
   576         def __init__(self, certfile):
       
   577             self.flag = None
       
   578             self.active = False
       
   579             self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
       
   580             self.port = test_support.find_unused_port()
       
   581             self.server = self.HTTPSServer(
       
   582                 (HOST, self.port), self.RootedHTTPRequestHandler, certfile)
       
   583             threading.Thread.__init__(self)
       
   584             self.daemon = True
       
   585 
       
   586         def __str__(self):
       
   587             return "<%s %s>" % (self.__class__.__name__, self.server)
       
   588 
       
   589         def start (self, flag=None):
       
   590             self.flag = flag
       
   591             threading.Thread.start(self)
       
   592 
       
   593         def run (self):
       
   594             self.active = True
       
   595             if self.flag:
       
   596                 self.flag.set()
       
   597             self.server.serve_forever()
       
   598             self.active = False
       
   599 
       
   600         def stop (self):
       
   601             self.active = False
       
   602             self.server.server_close()
       
   603 
       
   604 
       
   605     def badCertTest (certfile):
       
   606         server = ThreadedEchoServer(CERTFILE,
       
   607                                     certreqs=ssl.CERT_REQUIRED,
       
   608                                     cacerts=CERTFILE, chatty=False)
       
   609         flag = threading.Event()
       
   610         server.start(flag)
       
   611         # wait for it to start
       
   612         flag.wait()
       
   613         # try to connect
       
   614         try:
       
   615             try:
       
   616                 s = ssl.wrap_socket(socket.socket(),
       
   617                                     certfile=certfile,
       
   618                                     ssl_version=ssl.PROTOCOL_TLSv1)
       
   619                 s.connect((HOST, server.port))
       
   620             except ssl.SSLError, x:
       
   621                 if test_support.verbose:
       
   622                     sys.stdout.write("\nSSLError is %s\n" % x[1])
       
   623             except socket.error, x:
       
   624                 if test_support.verbose:
       
   625                     sys.stdout.write("\nsocket.error is %s\n" % x[1])
       
   626             else:
       
   627                 raise test_support.TestFailed(
       
   628                     "Use of invalid cert should have failed!")
       
   629         finally:
       
   630             server.stop()
       
   631             server.join()
       
   632 
       
   633     def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
       
   634                           client_certfile, client_protocol=None, indata="FOO\n",
       
   635                           chatty=True, connectionchatty=False,
       
   636                           wrap_accepting_socket=False):
       
   637 
       
   638         server = ThreadedEchoServer(certfile,
       
   639                                     certreqs=certreqs,
       
   640                                     ssl_version=protocol,
       
   641                                     cacerts=cacertsfile,
       
   642                                     chatty=chatty,
       
   643                                     connectionchatty=connectionchatty,
       
   644                                     wrap_accepting_socket=wrap_accepting_socket)
       
   645         flag = threading.Event()
       
   646         server.start(flag)
       
   647         # wait for it to start
       
   648         flag.wait()
       
   649         # try to connect
       
   650         if client_protocol is None:
       
   651             client_protocol = protocol
       
   652         try:
       
   653             try:
       
   654                 s = ssl.wrap_socket(socket.socket(),
       
   655                                     certfile=client_certfile,
       
   656                                     ca_certs=cacertsfile,
       
   657                                     cert_reqs=certreqs,
       
   658                                     ssl_version=client_protocol)
       
   659                 s.connect((HOST, server.port))
       
   660             except ssl.SSLError, x:
       
   661                 raise test_support.TestFailed("Unexpected SSL error:  " + str(x))
       
   662             except Exception, x:
       
   663                 raise test_support.TestFailed("Unexpected exception:  " + str(x))
       
   664             else:
       
   665                 if connectionchatty:
       
   666                     if test_support.verbose:
       
   667                         sys.stdout.write(
       
   668                             " client:  sending %s...\n" % (repr(indata)))
       
   669                 s.write(indata)
       
   670                 outdata = s.read()
       
   671                 if connectionchatty:
       
   672                     if test_support.verbose:
       
   673                         sys.stdout.write(" client:  read %s\n" % repr(outdata))
       
   674                 if outdata != indata.lower():
       
   675                     raise test_support.TestFailed(
       
   676                         "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
       
   677                         % (outdata[:min(len(outdata),20)], len(outdata),
       
   678                            indata[:min(len(indata),20)].lower(), len(indata)))
       
   679                 s.write("over\n")
       
   680                 if connectionchatty:
       
   681                     if test_support.verbose:
       
   682                         sys.stdout.write(" client:  closing connection.\n")
       
   683                 s.close()
       
   684         finally:
       
   685             server.stop()
       
   686             server.join()
       
   687 
       
   688     def tryProtocolCombo (server_protocol,
       
   689                           client_protocol,
       
   690                           expectedToWork,
       
   691                           certsreqs=None):
       
   692 
       
   693         if certsreqs is None:
       
   694             certsreqs = ssl.CERT_NONE
       
   695 
       
   696         if certsreqs == ssl.CERT_NONE:
       
   697             certtype = "CERT_NONE"
       
   698         elif certsreqs == ssl.CERT_OPTIONAL:
       
   699             certtype = "CERT_OPTIONAL"
       
   700         elif certsreqs == ssl.CERT_REQUIRED:
       
   701             certtype = "CERT_REQUIRED"
       
   702         if test_support.verbose:
       
   703             formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
       
   704             sys.stdout.write(formatstr %
       
   705                              (ssl.get_protocol_name(client_protocol),
       
   706                               ssl.get_protocol_name(server_protocol),
       
   707                               certtype))
       
   708         try:
       
   709             serverParamsTest(CERTFILE, server_protocol, certsreqs,
       
   710                              CERTFILE, CERTFILE, client_protocol, chatty=False)
       
   711         except test_support.TestFailed:
       
   712             if expectedToWork:
       
   713                 raise
       
   714         else:
       
   715             if not expectedToWork:
       
   716                 raise test_support.TestFailed(
       
   717                     "Client protocol %s succeeded with server protocol %s!"
       
   718                     % (ssl.get_protocol_name(client_protocol),
       
   719                        ssl.get_protocol_name(server_protocol)))
       
   720 
       
   721 
       
   722     class ThreadedTests(unittest.TestCase):
       
   723 
       
   724         def testRudeShutdown(self):
       
   725 
       
   726             listener_ready = threading.Event()
       
   727             listener_gone = threading.Event()
       
   728             port = test_support.find_unused_port()
       
   729 
       
   730             # `listener` runs in a thread.  It opens a socket listening on
       
   731             # PORT, and sits in an accept() until the main thread connects.
       
   732             # Then it rudely closes the socket, and sets Event `listener_gone`
       
   733             # to let the main thread know the socket is gone.
       
   734             def listener():
       
   735                 s = socket.socket()
       
   736                 s.bind((HOST, port))
       
   737                 s.listen(5)
       
   738                 listener_ready.set()
       
   739                 s.accept()
       
   740                 s = None # reclaim the socket object, which also closes it
       
   741                 listener_gone.set()
       
   742 
       
   743             def connector():
       
   744                 listener_ready.wait()
       
   745                 s = socket.socket()
       
   746                 s.connect((HOST, port))
       
   747                 listener_gone.wait()
       
   748                 try:
       
   749                     ssl_sock = ssl.wrap_socket(s)
       
   750                 except IOError:
       
   751                     pass
       
   752                 else:
       
   753                     raise test_support.TestFailed(
       
   754                           'connecting to closed SSL socket should have failed')
       
   755 
       
   756             t = threading.Thread(target=listener)
       
   757             t.start()
       
   758             connector()
       
   759             t.join()
       
   760 
       
   761         def testEcho (self):
       
   762 
       
   763             if test_support.verbose:
       
   764                 sys.stdout.write("\n")
       
   765             serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
       
   766                              CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
       
   767                              chatty=True, connectionchatty=True)
       
   768 
       
   769         def testReadCert(self):
       
   770 
       
   771             if test_support.verbose:
       
   772                 sys.stdout.write("\n")
       
   773             s2 = socket.socket()
       
   774             server = ThreadedEchoServer(CERTFILE,
       
   775                                         certreqs=ssl.CERT_NONE,
       
   776                                         ssl_version=ssl.PROTOCOL_SSLv23,
       
   777                                         cacerts=CERTFILE,
       
   778                                         chatty=False)
       
   779             flag = threading.Event()
       
   780             server.start(flag)
       
   781             # wait for it to start
       
   782             flag.wait()
       
   783             # try to connect
       
   784             try:
       
   785                 try:
       
   786                     s = ssl.wrap_socket(socket.socket(),
       
   787                                         certfile=CERTFILE,
       
   788                                         ca_certs=CERTFILE,
       
   789                                         cert_reqs=ssl.CERT_REQUIRED,
       
   790                                         ssl_version=ssl.PROTOCOL_SSLv23)
       
   791                     s.connect((HOST, server.port))
       
   792                 except ssl.SSLError, x:
       
   793                     raise test_support.TestFailed(
       
   794                         "Unexpected SSL error:  " + str(x))
       
   795                 except Exception, x:
       
   796                     raise test_support.TestFailed(
       
   797                         "Unexpected exception:  " + str(x))
       
   798                 else:
       
   799                     if not s:
       
   800                         raise test_support.TestFailed(
       
   801                             "Can't SSL-handshake with test server")
       
   802                     cert = s.getpeercert()
       
   803                     if not cert:
       
   804                         raise test_support.TestFailed(
       
   805                             "Can't get peer certificate.")
       
   806                     cipher = s.cipher()
       
   807                     if test_support.verbose:
       
   808                         sys.stdout.write(pprint.pformat(cert) + '\n')
       
   809                         sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
       
   810                     if not cert.has_key('subject'):
       
   811                         raise test_support.TestFailed(
       
   812                             "No subject field in certificate: %s." %
       
   813                             pprint.pformat(cert))
       
   814                     if ((('organizationName', 'Python Software Foundation'),)
       
   815                         not in cert['subject']):
       
   816                         raise test_support.TestFailed(
       
   817                             "Missing or invalid 'organizationName' field in certificate subject; "
       
   818                             "should be 'Python Software Foundation'.")
       
   819                     s.close()
       
   820             finally:
       
   821                 server.stop()
       
   822                 server.join()
       
   823 
       
   824         def testNULLcert(self):
       
   825             badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
       
   826                                      "nullcert.pem"))
       
   827         def testMalformedCert(self):
       
   828             badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
       
   829                                      "badcert.pem"))
       
   830         def testWrongCert(self):
       
   831             badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
       
   832                                      "wrongcert.pem"))
       
   833         def testMalformedKey(self):
       
   834             badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
       
   835                                      "badkey.pem"))
       
   836 
       
   837         def testProtocolSSL2(self):
       
   838             if test_support.verbose:
       
   839                 sys.stdout.write("\n")
       
   840             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
       
   841             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
       
   842             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
       
   843             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
       
   844             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
       
   845             tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
       
   846 
       
   847         def testProtocolSSL23(self):
       
   848             if test_support.verbose:
       
   849                 sys.stdout.write("\n")
       
   850             try:
       
   851                 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
       
   852             except test_support.TestFailed, x:
       
   853                 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
       
   854                 if test_support.verbose:
       
   855                     sys.stdout.write(
       
   856                         " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
       
   857                         % str(x))
       
   858             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
       
   859             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
       
   860             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
       
   861 
       
   862             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
       
   863             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
       
   864             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
       
   865 
       
   866             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
       
   867             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
       
   868             tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
       
   869 
       
   870         def testProtocolSSL3(self):
       
   871             if test_support.verbose:
       
   872                 sys.stdout.write("\n")
       
   873             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
       
   874             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
       
   875             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
       
   876             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
       
   877             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
       
   878             tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
       
   879 
       
   880         def testProtocolTLS1(self):
       
   881             if test_support.verbose:
       
   882                 sys.stdout.write("\n")
       
   883             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
       
   884             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
       
   885             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
       
   886             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
       
   887             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
       
   888             tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
       
   889 
       
   890         def testSTARTTLS (self):
       
   891 
       
   892             msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
       
   893 
       
   894             server = ThreadedEchoServer(CERTFILE,
       
   895                                         ssl_version=ssl.PROTOCOL_TLSv1,
       
   896                                         starttls_server=True,
       
   897                                         chatty=True,
       
   898                                         connectionchatty=True)
       
   899             flag = threading.Event()
       
   900             server.start(flag)
       
   901             # wait for it to start
       
   902             flag.wait()
       
   903             # try to connect
       
   904             wrapped = False
       
   905             try:
       
   906                 try:
       
   907                     s = socket.socket()
       
   908                     s.setblocking(1)
       
   909                     s.connect((HOST, server.port))
       
   910                 except Exception, x:
       
   911                     raise test_support.TestFailed("Unexpected exception:  " + str(x))
       
   912                 else:
       
   913                     if test_support.verbose:
       
   914                         sys.stdout.write("\n")
       
   915                     for indata in msgs:
       
   916                         if test_support.verbose:
       
   917                             sys.stdout.write(
       
   918                                 " client:  sending %s...\n" % repr(indata))
       
   919                         if wrapped:
       
   920                             conn.write(indata)
       
   921                             outdata = conn.read()
       
   922                         else:
       
   923                             s.send(indata)
       
   924                             outdata = s.recv(1024)
       
   925                         if (indata == "STARTTLS" and
       
   926                             outdata.strip().lower().startswith("ok")):
       
   927                             if test_support.verbose:
       
   928                                 sys.stdout.write(
       
   929                                     " client:  read %s from server, starting TLS...\n"
       
   930                                     % repr(outdata))
       
   931                             conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
       
   932                             wrapped = True
       
   933                         elif (indata == "ENDTLS" and
       
   934                             outdata.strip().lower().startswith("ok")):
       
   935                             if test_support.verbose:
       
   936                                 sys.stdout.write(
       
   937                                     " client:  read %s from server, ending TLS...\n"
       
   938                                     % repr(outdata))
       
   939                             s = conn.unwrap()
       
   940                             wrapped = False
       
   941                         else:
       
   942                             if test_support.verbose:
       
   943                                 sys.stdout.write(
       
   944                                     " client:  read %s from server\n" % repr(outdata))
       
   945                     if test_support.verbose:
       
   946                         sys.stdout.write(" client:  closing connection.\n")
       
   947                     if wrapped:
       
   948                         conn.write("over\n")
       
   949                     else:
       
   950                         s.send("over\n")
       
   951                     s.close()
       
   952             finally:
       
   953                 server.stop()
       
   954                 server.join()
       
   955 
       
   956         def testSocketServer(self):
       
   957 
       
   958             server = SocketServerHTTPSServer(CERTFILE)
       
   959             flag = threading.Event()
       
   960             server.start(flag)
       
   961             # wait for it to start
       
   962             flag.wait()
       
   963             # try to connect
       
   964             try:
       
   965                 if test_support.verbose:
       
   966                     sys.stdout.write('\n')
       
   967                 d1 = open(CERTFILE, 'rb').read()
       
   968                 d2 = ''
       
   969                 # now fetch the same data from the HTTPS server
       
   970                 url = 'https://127.0.0.1:%d/%s' % (
       
   971                     server.port, os.path.split(CERTFILE)[1])
       
   972                 f = urllib.urlopen(url)
       
   973                 dlen = f.info().getheader("content-length")
       
   974                 if dlen and (int(dlen) > 0):
       
   975                     d2 = f.read(int(dlen))
       
   976                     if test_support.verbose:
       
   977                         sys.stdout.write(
       
   978                             " client: read %d bytes from remote server '%s'\n"
       
   979                             % (len(d2), server))
       
   980                 f.close()
       
   981             except:
       
   982                 msg = ''.join(traceback.format_exception(*sys.exc_info()))
       
   983                 if test_support.verbose:
       
   984                     sys.stdout.write('\n' + msg)
       
   985                 raise test_support.TestFailed(msg)
       
   986             else:
       
   987                 if not (d1 == d2):
       
   988                     raise test_support.TestFailed(
       
   989                         "Couldn't fetch data from HTTPS server")
       
   990             finally:
       
   991                 server.stop()
       
   992                 server.join()
       
   993 
       
   994         def testWrappedAccept (self):
       
   995 
       
   996             if test_support.verbose:
       
   997                 sys.stdout.write("\n")
       
   998             serverParamsTest(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED,
       
   999                              CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23,
       
  1000                              chatty=True, connectionchatty=True,
       
  1001                              wrap_accepting_socket=True)
       
  1002 
       
  1003 
       
  1004         def testAsyncoreServer (self):
       
  1005 
       
  1006             indata = "TEST MESSAGE of mixed case\n"
       
  1007 
       
  1008             if test_support.verbose:
       
  1009                 sys.stdout.write("\n")
       
  1010             server = AsyncoreEchoServer(CERTFILE)
       
  1011             flag = threading.Event()
       
  1012             server.start(flag)
       
  1013             # wait for it to start
       
  1014             flag.wait()
       
  1015             # try to connect
       
  1016             try:
       
  1017                 try:
       
  1018                     s = ssl.wrap_socket(socket.socket())
       
  1019                     s.connect(('127.0.0.1', server.port))
       
  1020                 except ssl.SSLError, x:
       
  1021                     raise test_support.TestFailed("Unexpected SSL error:  " + str(x))
       
  1022                 except Exception, x:
       
  1023                     raise test_support.TestFailed("Unexpected exception:  " + str(x))
       
  1024                 else:
       
  1025                     if test_support.verbose:
       
  1026                         sys.stdout.write(
       
  1027                             " client:  sending %s...\n" % (repr(indata)))
       
  1028                     s.write(indata)
       
  1029                     outdata = s.read()
       
  1030                     if test_support.verbose:
       
  1031                         sys.stdout.write(" client:  read %s\n" % repr(outdata))
       
  1032                     if outdata != indata.lower():
       
  1033                         raise test_support.TestFailed(
       
  1034                             "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
       
  1035                             % (outdata[:min(len(outdata),20)], len(outdata),
       
  1036                                indata[:min(len(indata),20)].lower(), len(indata)))
       
  1037                     s.write("over\n")
       
  1038                     if test_support.verbose:
       
  1039                         sys.stdout.write(" client:  closing connection.\n")
       
  1040                     s.close()
       
  1041             finally:
       
  1042                 server.stop()
       
  1043                 # wait for server thread to end
       
  1044                 server.join()
       
  1045 
       
  1046 
       
  1047         def testAllRecvAndSendMethods(self):
       
  1048 
       
  1049             if test_support.verbose:
       
  1050                 sys.stdout.write("\n")
       
  1051 
       
  1052             server = ThreadedEchoServer(CERTFILE,
       
  1053                                         certreqs=ssl.CERT_NONE,
       
  1054                                         ssl_version=ssl.PROTOCOL_TLSv1,
       
  1055                                         cacerts=CERTFILE,
       
  1056                                         chatty=True,
       
  1057                                         connectionchatty=False)
       
  1058             flag = threading.Event()
       
  1059             server.start(flag)
       
  1060             # wait for it to start
       
  1061             flag.wait()
       
  1062             # try to connect
       
  1063             try:
       
  1064                 s = ssl.wrap_socket(socket.socket(),
       
  1065                                     server_side=False,
       
  1066                                     certfile=CERTFILE,
       
  1067                                     ca_certs=CERTFILE,
       
  1068                                     cert_reqs=ssl.CERT_NONE,
       
  1069                                     ssl_version=ssl.PROTOCOL_TLSv1)
       
  1070                 s.connect((HOST, server.port))
       
  1071             except ssl.SSLError as x:
       
  1072                 raise support.TestFailed("Unexpected SSL error:  " + str(x))
       
  1073             except Exception as x:
       
  1074                 raise support.TestFailed("Unexpected exception:  " + str(x))
       
  1075             else:
       
  1076                 # helper methods for standardising recv* method signatures
       
  1077                 def _recv_into():
       
  1078                     b = bytearray("\0"*100)
       
  1079                     count = s.recv_into(b)
       
  1080                     return b[:count]
       
  1081 
       
  1082                 def _recvfrom_into():
       
  1083                     b = bytearray("\0"*100)
       
  1084                     count, addr = s.recvfrom_into(b)
       
  1085                     return b[:count]
       
  1086 
       
  1087                 # (name, method, whether to expect success, *args)
       
  1088                 send_methods = [
       
  1089                     ('send', s.send, True, []),
       
  1090                     ('sendto', s.sendto, False, ["some.address"]),
       
  1091                     ('sendall', s.sendall, True, []),
       
  1092                 ]
       
  1093                 recv_methods = [
       
  1094                     ('recv', s.recv, True, []),
       
  1095                     ('recvfrom', s.recvfrom, False, ["some.address"]),
       
  1096                     ('recv_into', _recv_into, True, []),
       
  1097                     ('recvfrom_into', _recvfrom_into, False, []),
       
  1098                 ]
       
  1099                 data_prefix = u"PREFIX_"
       
  1100 
       
  1101                 for meth_name, send_meth, expect_success, args in send_methods:
       
  1102                     indata = data_prefix + meth_name
       
  1103                     try:
       
  1104                         send_meth(indata.encode('ASCII', 'strict'), *args)
       
  1105                         outdata = s.read()
       
  1106                         outdata = outdata.decode('ASCII', 'strict')
       
  1107                         if outdata != indata.lower():
       
  1108                             raise support.TestFailed(
       
  1109                                 "While sending with <<%s>> bad data "
       
  1110                                 "<<%r>> (%d) received; "
       
  1111                                 "expected <<%r>> (%d)\n" % (
       
  1112                                     meth_name, outdata[:20], len(outdata),
       
  1113                                     indata[:20], len(indata)
       
  1114                                 )
       
  1115                             )
       
  1116                     except ValueError as e:
       
  1117                         if expect_success:
       
  1118                             raise support.TestFailed(
       
  1119                                 "Failed to send with method <<%s>>; "
       
  1120                                 "expected to succeed.\n" % (meth_name,)
       
  1121                             )
       
  1122                         if not str(e).startswith(meth_name):
       
  1123                             raise support.TestFailed(
       
  1124                                 "Method <<%s>> failed with unexpected "
       
  1125                                 "exception message: %s\n" % (
       
  1126                                     meth_name, e
       
  1127                                 )
       
  1128                             )
       
  1129 
       
  1130                 for meth_name, recv_meth, expect_success, args in recv_methods:
       
  1131                     indata = data_prefix + meth_name
       
  1132                     try:
       
  1133                         s.send(indata.encode('ASCII', 'strict'))
       
  1134                         outdata = recv_meth(*args)
       
  1135                         outdata = outdata.decode('ASCII', 'strict')
       
  1136                         if outdata != indata.lower():
       
  1137                             raise support.TestFailed(
       
  1138                                 "While receiving with <<%s>> bad data "
       
  1139                                 "<<%r>> (%d) received; "
       
  1140                                 "expected <<%r>> (%d)\n" % (
       
  1141                                     meth_name, outdata[:20], len(outdata),
       
  1142                                     indata[:20], len(indata)
       
  1143                                 )
       
  1144                             )
       
  1145                     except ValueError as e:
       
  1146                         if expect_success:
       
  1147                             raise support.TestFailed(
       
  1148                                 "Failed to receive with method <<%s>>; "
       
  1149                                 "expected to succeed.\n" % (meth_name,)
       
  1150                             )
       
  1151                         if not str(e).startswith(meth_name):
       
  1152                             raise support.TestFailed(
       
  1153                                 "Method <<%s>> failed with unexpected "
       
  1154                                 "exception message: %s\n" % (
       
  1155                                     meth_name, e
       
  1156                                 )
       
  1157                             )
       
  1158                         # consume data
       
  1159                         s.read()
       
  1160 
       
  1161                 s.write("over\n".encode("ASCII", "strict"))
       
  1162                 s.close()
       
  1163             finally:
       
  1164                 server.stop()
       
  1165                 server.join()
       
  1166 
       
  1167 
       
  1168 def test_main(verbose=False):
       
  1169     if skip_expected:
       
  1170         raise test_support.TestSkipped("No SSL support")
       
  1171 
       
  1172     global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
       
  1173     CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
       
  1174                             "keycert.pem")
       
  1175     SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
       
  1176         os.path.dirname(__file__) or os.curdir,
       
  1177         "https_svn_python_org_root.pem")
       
  1178 
       
  1179     if (not os.path.exists(CERTFILE) or
       
  1180         not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
       
  1181         raise test_support.TestFailed("Can't read certificate files!")
       
  1182 
       
  1183     TESTPORT = test_support.find_unused_port()
       
  1184     if not TESTPORT:
       
  1185         raise test_support.TestFailed("Can't find open port to test servers on!")
       
  1186 
       
  1187     tests = [BasicTests]
       
  1188 
       
  1189     if test_support.is_resource_enabled('network'):
       
  1190         tests.append(NetworkedTests)
       
  1191 
       
  1192     if _have_threads:
       
  1193         thread_info = test_support.threading_setup()
       
  1194         if thread_info and test_support.is_resource_enabled('network'):
       
  1195             tests.append(ThreadedTests)
       
  1196 
       
  1197     test_support.run_unittest(*tests)
       
  1198 
       
  1199     if _have_threads:
       
  1200         test_support.threading_cleanup(*thread_info)
       
  1201 
       
  1202 if __name__ == "__main__":
       
  1203     test_main()