symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_smtplib.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 import asyncore
       
     2 import email.utils
       
     3 import socket
       
     4 import threading
       
     5 import smtpd
       
     6 import smtplib
       
     7 import StringIO
       
     8 import sys
       
     9 import time
       
    10 import select
       
    11 
       
    12 from unittest import TestCase
       
    13 from test import test_support
       
    14 
       
    15 HOST = test_support.HOST
       
    16 
       
    17 def server(evt, buf, serv):
       
    18     serv.listen(5)
       
    19     evt.set()
       
    20     try:
       
    21         conn, addr = serv.accept()
       
    22     except socket.timeout:
       
    23         pass
       
    24     else:
       
    25         n = 500
       
    26         while buf and n > 0:
       
    27             r, w, e = select.select([], [conn], [])
       
    28             if w:
       
    29                 sent = conn.send(buf)
       
    30                 buf = buf[sent:]
       
    31 
       
    32             n -= 1
       
    33 
       
    34         conn.close()
       
    35     finally:
       
    36         serv.close()
       
    37         evt.set()
       
    38 
       
    39 class GeneralTests(TestCase):
       
    40 
       
    41     def setUp(self):
       
    42         self.evt = threading.Event()
       
    43         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
    44         self.sock.settimeout(15)
       
    45         self.port = test_support.bind_port(self.sock)
       
    46         servargs = (self.evt, "220 Hola mundo\n", self.sock)
       
    47         threading.Thread(target=server, args=servargs).start()
       
    48         self.evt.wait()
       
    49         self.evt.clear()
       
    50 
       
    51     def tearDown(self):
       
    52         self.evt.wait()
       
    53 
       
    54     def testBasic1(self):
       
    55         # connects
       
    56         smtp = smtplib.SMTP(HOST, self.port)
       
    57         smtp.close()
       
    58 
       
    59     def testBasic2(self):
       
    60         # connects, include port in host name
       
    61         smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
       
    62         smtp.close()
       
    63 
       
    64     def testLocalHostName(self):
       
    65         # check that supplied local_hostname is used
       
    66         smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
       
    67         self.assertEqual(smtp.local_hostname, "testhost")
       
    68         smtp.close()
       
    69 
       
    70     def testTimeoutDefault(self):
       
    71         self.assertTrue(socket.getdefaulttimeout() is None)
       
    72         socket.setdefaulttimeout(30)
       
    73         try:
       
    74             smtp = smtplib.SMTP(HOST, self.port)
       
    75         finally:
       
    76             socket.setdefaulttimeout(None)
       
    77         self.assertEqual(smtp.sock.gettimeout(), 30)
       
    78         smtp.close()
       
    79 
       
    80     def testTimeoutNone(self):
       
    81         self.assertTrue(socket.getdefaulttimeout() is None)
       
    82         socket.setdefaulttimeout(30)
       
    83         try:
       
    84             smtp = smtplib.SMTP(HOST, self.port, timeout=None)
       
    85         finally:
       
    86             socket.setdefaulttimeout(None)
       
    87         self.assertTrue(smtp.sock.gettimeout() is None)
       
    88         smtp.close()
       
    89 
       
    90     def testTimeoutValue(self):
       
    91         smtp = smtplib.SMTP(HOST, self.port, timeout=30)
       
    92         self.assertEqual(smtp.sock.gettimeout(), 30)
       
    93         smtp.close()
       
    94 
       
    95 
       
    96 # Test server thread using the specified SMTP server class
       
    97 def debugging_server(serv, serv_evt, client_evt):
       
    98     serv_evt.set()
       
    99 
       
   100     try:
       
   101         if hasattr(select, 'poll'):
       
   102             poll_fun = asyncore.poll2
       
   103         else:
       
   104             poll_fun = asyncore.poll
       
   105 
       
   106         n = 1000
       
   107         while asyncore.socket_map and n > 0:
       
   108             poll_fun(0.01, asyncore.socket_map)
       
   109 
       
   110             # when the client conversation is finished, it will
       
   111             # set client_evt, and it's then ok to kill the server
       
   112             if client_evt.is_set():
       
   113                 serv.close()
       
   114                 break
       
   115 
       
   116             n -= 1
       
   117 
       
   118     except socket.timeout:
       
   119         pass
       
   120     finally:
       
   121         if not client_evt.is_set():
       
   122             # allow some time for the client to read the result
       
   123             time.sleep(0.5)
       
   124             serv.close()
       
   125         asyncore.close_all()
       
   126         serv_evt.set()
       
   127 
       
   128 MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
       
   129 MSG_END = '------------ END MESSAGE ------------\n'
       
   130 
       
   131 # NOTE: Some SMTP objects in the tests below are created with a non-default
       
   132 # local_hostname argument to the constructor, since (on some systems) the FQDN
       
   133 # lookup caused by the default local_hostname sometimes takes so long that the
       
   134 # test server times out, causing the test to fail.
       
   135 
       
   136 # Test behavior of smtpd.DebuggingServer
       
   137 class DebuggingServerTests(TestCase):
       
   138 
       
   139     def setUp(self):
       
   140         # temporarily replace sys.stdout to capture DebuggingServer output
       
   141         self.old_stdout = sys.stdout
       
   142         self.output = StringIO.StringIO()
       
   143         sys.stdout = self.output
       
   144 
       
   145         self.serv_evt = threading.Event()
       
   146         self.client_evt = threading.Event()
       
   147         self.port = test_support.find_unused_port()
       
   148         self.serv = smtpd.DebuggingServer((HOST, self.port), ('nowhere', -1))
       
   149         serv_args = (self.serv, self.serv_evt, self.client_evt)
       
   150         threading.Thread(target=debugging_server, args=serv_args).start()
       
   151 
       
   152         # wait until server thread has assigned a port number
       
   153         self.serv_evt.wait()
       
   154         self.serv_evt.clear()
       
   155 
       
   156     def tearDown(self):
       
   157         # indicate that the client is finished
       
   158         self.client_evt.set()
       
   159         # wait for the server thread to terminate
       
   160         self.serv_evt.wait()
       
   161         # restore sys.stdout
       
   162         sys.stdout = self.old_stdout
       
   163 
       
   164     def testBasic(self):
       
   165         # connect
       
   166         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
       
   167         smtp.quit()
       
   168 
       
   169     def testNOOP(self):
       
   170         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
       
   171         expected = (250, 'Ok')
       
   172         self.assertEqual(smtp.noop(), expected)
       
   173         smtp.quit()
       
   174 
       
   175     def testRSET(self):
       
   176         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
       
   177         expected = (250, 'Ok')
       
   178         self.assertEqual(smtp.rset(), expected)
       
   179         smtp.quit()
       
   180 
       
   181     def testNotImplemented(self):
       
   182         # EHLO isn't implemented in DebuggingServer
       
   183         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
       
   184         expected = (502, 'Error: command "EHLO" not implemented')
       
   185         self.assertEqual(smtp.ehlo(), expected)
       
   186         smtp.quit()
       
   187 
       
   188     def testVRFY(self):
       
   189         # VRFY isn't implemented in DebuggingServer
       
   190         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
       
   191         expected = (502, 'Error: command "VRFY" not implemented')
       
   192         self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
       
   193         self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
       
   194         smtp.quit()
       
   195 
       
   196     def testSecondHELO(self):
       
   197         # check that a second HELO returns a message that it's a duplicate
       
   198         # (this behavior is specific to smtpd.SMTPChannel)
       
   199         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
       
   200         smtp.helo()
       
   201         expected = (503, 'Duplicate HELO/EHLO')
       
   202         self.assertEqual(smtp.helo(), expected)
       
   203         smtp.quit()
       
   204 
       
   205     def testHELP(self):
       
   206         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
       
   207         self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
       
   208         smtp.quit()
       
   209 
       
   210     def testSend(self):
       
   211         # connect and send mail
       
   212         m = 'A test message'
       
   213         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
       
   214         smtp.sendmail('John', 'Sally', m)
       
   215         # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
       
   216         # in asyncore.  This sleep might help, but should really be fixed
       
   217         # properly by using an Event variable.
       
   218         time.sleep(0.01)
       
   219         smtp.quit()
       
   220 
       
   221         self.client_evt.set()
       
   222         self.serv_evt.wait()
       
   223         self.output.flush()
       
   224         mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
       
   225         self.assertEqual(self.output.getvalue(), mexpect)
       
   226 
       
   227 
       
   228 class NonConnectingTests(TestCase):
       
   229 
       
   230     def testNotConnected(self):
       
   231         # Test various operations on an unconnected SMTP object that
       
   232         # should raise exceptions (at present the attempt in SMTP.send
       
   233         # to reference the nonexistent 'sock' attribute of the SMTP object
       
   234         # causes an AttributeError)
       
   235         smtp = smtplib.SMTP()
       
   236         self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
       
   237         self.assertRaises(smtplib.SMTPServerDisconnected,
       
   238                           smtp.send, 'test msg')
       
   239 
       
   240     def testNonnumericPort(self):
       
   241         # check that non-numeric port raises socket.error
       
   242         self.assertRaises(socket.error, smtplib.SMTP,
       
   243                           "localhost", "bogus")
       
   244         self.assertRaises(socket.error, smtplib.SMTP,
       
   245                           "localhost:bogus")
       
   246 
       
   247 
       
   248 # test response of client to a non-successful HELO message
       
   249 class BadHELOServerTests(TestCase):
       
   250 
       
   251     def setUp(self):
       
   252         self.old_stdout = sys.stdout
       
   253         self.output = StringIO.StringIO()
       
   254         sys.stdout = self.output
       
   255 
       
   256         self.evt = threading.Event()
       
   257         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   258         self.sock.settimeout(15)
       
   259         self.port = test_support.bind_port(self.sock)
       
   260         servargs = (self.evt, "199 no hello for you!\n", self.sock)
       
   261         threading.Thread(target=server, args=servargs).start()
       
   262         self.evt.wait()
       
   263         self.evt.clear()
       
   264 
       
   265     def tearDown(self):
       
   266         self.evt.wait()
       
   267         sys.stdout = self.old_stdout
       
   268 
       
   269     def testFailingHELO(self):
       
   270         self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
       
   271                             HOST, self.port, 'localhost', 3)
       
   272 
       
   273 
       
   274 sim_users = {'Mr.A@somewhere.com':'John A',
       
   275              'Ms.B@somewhere.com':'Sally B',
       
   276              'Mrs.C@somewhereesle.com':'Ruth C',
       
   277             }
       
   278 
       
   279 sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
       
   280              'list-2':['Ms.B@somewhere.com',],
       
   281             }
       
   282 
       
   283 # Simulated SMTP channel & server
       
   284 class SimSMTPChannel(smtpd.SMTPChannel):
       
   285     def smtp_EHLO(self, arg):
       
   286         resp = '250-testhost\r\n' \
       
   287                '250-EXPN\r\n' \
       
   288                '250-SIZE 20000000\r\n' \
       
   289                '250-STARTTLS\r\n' \
       
   290                '250-DELIVERBY\r\n' \
       
   291                '250 HELP'
       
   292         self.push(resp)
       
   293 
       
   294     def smtp_VRFY(self, arg):
       
   295 #        print '\nsmtp_VRFY(%r)\n' % arg
       
   296 
       
   297         raw_addr = email.utils.parseaddr(arg)[1]
       
   298         quoted_addr = smtplib.quoteaddr(arg)
       
   299         if raw_addr in sim_users:
       
   300             self.push('250 %s %s' % (sim_users[raw_addr], quoted_addr))
       
   301         else:
       
   302             self.push('550 No such user: %s' % arg)
       
   303 
       
   304     def smtp_EXPN(self, arg):
       
   305 #        print '\nsmtp_EXPN(%r)\n' % arg
       
   306 
       
   307         list_name = email.utils.parseaddr(arg)[1].lower()
       
   308         if list_name in sim_lists:
       
   309             user_list = sim_lists[list_name]
       
   310             for n, user_email in enumerate(user_list):
       
   311                 quoted_addr = smtplib.quoteaddr(user_email)
       
   312                 if n < len(user_list) - 1:
       
   313                     self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
       
   314                 else:
       
   315                     self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
       
   316         else:
       
   317             self.push('550 No access for you!')
       
   318 
       
   319 
       
   320 class SimSMTPServer(smtpd.SMTPServer):
       
   321     def handle_accept(self):
       
   322         conn, addr = self.accept()
       
   323         channel = SimSMTPChannel(self, conn, addr)
       
   324 
       
   325     def process_message(self, peer, mailfrom, rcpttos, data):
       
   326         pass
       
   327 
       
   328 
       
   329 # Test various SMTP & ESMTP commands/behaviors that require a simulated server
       
   330 # (i.e., something with more features than DebuggingServer)
       
   331 class SMTPSimTests(TestCase):
       
   332 
       
   333     def setUp(self):
       
   334         self.serv_evt = threading.Event()
       
   335         self.client_evt = threading.Event()
       
   336         self.port = test_support.find_unused_port()
       
   337         self.serv = SimSMTPServer((HOST, self.port), ('nowhere', -1))
       
   338         serv_args = (self.serv, self.serv_evt, self.client_evt)
       
   339         threading.Thread(target=debugging_server, args=serv_args).start()
       
   340 
       
   341         # wait until server thread has assigned a port number
       
   342         self.serv_evt.wait()
       
   343         self.serv_evt.clear()
       
   344 
       
   345     def tearDown(self):
       
   346         # indicate that the client is finished
       
   347         self.client_evt.set()
       
   348         # wait for the server thread to terminate
       
   349         self.serv_evt.wait()
       
   350 
       
   351     def testBasic(self):
       
   352         # smoke test
       
   353         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
       
   354         smtp.quit()
       
   355 
       
   356     def testEHLO(self):
       
   357         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
       
   358 
       
   359         # no features should be present before the EHLO
       
   360         self.assertEqual(smtp.esmtp_features, {})
       
   361 
       
   362         # features expected from the test server
       
   363         expected_features = {'expn':'',
       
   364                              'size': '20000000',
       
   365                              'starttls': '',
       
   366                              'deliverby': '',
       
   367                              'help': '',
       
   368                              }
       
   369 
       
   370         smtp.ehlo()
       
   371         self.assertEqual(smtp.esmtp_features, expected_features)
       
   372         for k in expected_features:
       
   373             self.assertTrue(smtp.has_extn(k))
       
   374         self.assertFalse(smtp.has_extn('unsupported-feature'))
       
   375         smtp.quit()
       
   376 
       
   377     def testVRFY(self):
       
   378         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
       
   379 
       
   380         for email, name in sim_users.items():
       
   381             expected_known = (250, '%s %s' % (name, smtplib.quoteaddr(email)))
       
   382             self.assertEqual(smtp.vrfy(email), expected_known)
       
   383 
       
   384         u = 'nobody@nowhere.com'
       
   385         expected_unknown = (550, 'No such user: %s' % smtplib.quoteaddr(u))
       
   386         self.assertEqual(smtp.vrfy(u), expected_unknown)
       
   387         smtp.quit()
       
   388 
       
   389     def testEXPN(self):
       
   390         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
       
   391 
       
   392         for listname, members in sim_lists.items():
       
   393             users = []
       
   394             for m in members:
       
   395                 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
       
   396             expected_known = (250, '\n'.join(users))
       
   397             self.assertEqual(smtp.expn(listname), expected_known)
       
   398 
       
   399         u = 'PSU-Members-List'
       
   400         expected_unknown = (550, 'No access for you!')
       
   401         self.assertEqual(smtp.expn(u), expected_unknown)
       
   402         smtp.quit()
       
   403 
       
   404 
       
   405 
       
   406 def test_main(verbose=None):
       
   407     test_support.run_unittest(GeneralTests, DebuggingServerTests,
       
   408                               NonConnectingTests,
       
   409                               BadHELOServerTests, SMTPSimTests)
       
   410 
       
   411 if __name__ == '__main__':
       
   412     test_main()