|
1 import asyncore |
|
2 import email.utils |
|
3 import socket |
|
4 import threading |
|
5 import smtpd |
|
6 import smtplib |
|
7 import StringIO |
|
8 import sys |
|
9 import time |
|
10 import select |
|
11 |
|
12 from unittest import TestCase |
|
13 from test import test_support |
|
14 |
|
15 HOST = test_support.HOST |
|
16 |
|
17 def server(evt, buf, serv): |
|
18 serv.listen(5) |
|
19 evt.set() |
|
20 try: |
|
21 conn, addr = serv.accept() |
|
22 except socket.timeout: |
|
23 pass |
|
24 else: |
|
25 n = 500 |
|
26 while buf and n > 0: |
|
27 r, w, e = select.select([], [conn], []) |
|
28 if w: |
|
29 sent = conn.send(buf) |
|
30 buf = buf[sent:] |
|
31 |
|
32 n -= 1 |
|
33 |
|
34 conn.close() |
|
35 finally: |
|
36 serv.close() |
|
37 evt.set() |
|
38 |
|
39 class GeneralTests(TestCase): |
|
40 |
|
41 def setUp(self): |
|
42 self.evt = threading.Event() |
|
43 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
44 self.sock.settimeout(15) |
|
45 self.port = test_support.bind_port(self.sock) |
|
46 servargs = (self.evt, "220 Hola mundo\n", self.sock) |
|
47 threading.Thread(target=server, args=servargs).start() |
|
48 self.evt.wait() |
|
49 self.evt.clear() |
|
50 |
|
51 def tearDown(self): |
|
52 self.evt.wait() |
|
53 |
|
54 def testBasic1(self): |
|
55 # connects |
|
56 smtp = smtplib.SMTP(HOST, self.port) |
|
57 smtp.close() |
|
58 |
|
59 def testBasic2(self): |
|
60 # connects, include port in host name |
|
61 smtp = smtplib.SMTP("%s:%s" % (HOST, self.port)) |
|
62 smtp.close() |
|
63 |
|
64 def testLocalHostName(self): |
|
65 # check that supplied local_hostname is used |
|
66 smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost") |
|
67 self.assertEqual(smtp.local_hostname, "testhost") |
|
68 smtp.close() |
|
69 |
|
70 def testTimeoutDefault(self): |
|
71 self.assertTrue(socket.getdefaulttimeout() is None) |
|
72 socket.setdefaulttimeout(30) |
|
73 try: |
|
74 smtp = smtplib.SMTP(HOST, self.port) |
|
75 finally: |
|
76 socket.setdefaulttimeout(None) |
|
77 self.assertEqual(smtp.sock.gettimeout(), 30) |
|
78 smtp.close() |
|
79 |
|
80 def testTimeoutNone(self): |
|
81 self.assertTrue(socket.getdefaulttimeout() is None) |
|
82 socket.setdefaulttimeout(30) |
|
83 try: |
|
84 smtp = smtplib.SMTP(HOST, self.port, timeout=None) |
|
85 finally: |
|
86 socket.setdefaulttimeout(None) |
|
87 self.assertTrue(smtp.sock.gettimeout() is None) |
|
88 smtp.close() |
|
89 |
|
90 def testTimeoutValue(self): |
|
91 smtp = smtplib.SMTP(HOST, self.port, timeout=30) |
|
92 self.assertEqual(smtp.sock.gettimeout(), 30) |
|
93 smtp.close() |
|
94 |
|
95 |
|
96 # Test server thread using the specified SMTP server class |
|
97 def debugging_server(serv, serv_evt, client_evt): |
|
98 serv_evt.set() |
|
99 |
|
100 try: |
|
101 if hasattr(select, 'poll'): |
|
102 poll_fun = asyncore.poll2 |
|
103 else: |
|
104 poll_fun = asyncore.poll |
|
105 |
|
106 n = 1000 |
|
107 while asyncore.socket_map and n > 0: |
|
108 poll_fun(0.01, asyncore.socket_map) |
|
109 |
|
110 # when the client conversation is finished, it will |
|
111 # set client_evt, and it's then ok to kill the server |
|
112 if client_evt.is_set(): |
|
113 serv.close() |
|
114 break |
|
115 |
|
116 n -= 1 |
|
117 |
|
118 except socket.timeout: |
|
119 pass |
|
120 finally: |
|
121 if not client_evt.is_set(): |
|
122 # allow some time for the client to read the result |
|
123 time.sleep(0.5) |
|
124 serv.close() |
|
125 asyncore.close_all() |
|
126 serv_evt.set() |
|
127 |
|
128 MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n' |
|
129 MSG_END = '------------ END MESSAGE ------------\n' |
|
130 |
|
131 # NOTE: Some SMTP objects in the tests below are created with a non-default |
|
132 # local_hostname argument to the constructor, since (on some systems) the FQDN |
|
133 # lookup caused by the default local_hostname sometimes takes so long that the |
|
134 # test server times out, causing the test to fail. |
|
135 |
|
136 # Test behavior of smtpd.DebuggingServer |
|
137 class DebuggingServerTests(TestCase): |
|
138 |
|
139 def setUp(self): |
|
140 # temporarily replace sys.stdout to capture DebuggingServer output |
|
141 self.old_stdout = sys.stdout |
|
142 self.output = StringIO.StringIO() |
|
143 sys.stdout = self.output |
|
144 |
|
145 self.serv_evt = threading.Event() |
|
146 self.client_evt = threading.Event() |
|
147 self.port = test_support.find_unused_port() |
|
148 self.serv = smtpd.DebuggingServer((HOST, self.port), ('nowhere', -1)) |
|
149 serv_args = (self.serv, self.serv_evt, self.client_evt) |
|
150 threading.Thread(target=debugging_server, args=serv_args).start() |
|
151 |
|
152 # wait until server thread has assigned a port number |
|
153 self.serv_evt.wait() |
|
154 self.serv_evt.clear() |
|
155 |
|
156 def tearDown(self): |
|
157 # indicate that the client is finished |
|
158 self.client_evt.set() |
|
159 # wait for the server thread to terminate |
|
160 self.serv_evt.wait() |
|
161 # restore sys.stdout |
|
162 sys.stdout = self.old_stdout |
|
163 |
|
164 def testBasic(self): |
|
165 # connect |
|
166 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) |
|
167 smtp.quit() |
|
168 |
|
169 def testNOOP(self): |
|
170 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) |
|
171 expected = (250, 'Ok') |
|
172 self.assertEqual(smtp.noop(), expected) |
|
173 smtp.quit() |
|
174 |
|
175 def testRSET(self): |
|
176 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) |
|
177 expected = (250, 'Ok') |
|
178 self.assertEqual(smtp.rset(), expected) |
|
179 smtp.quit() |
|
180 |
|
181 def testNotImplemented(self): |
|
182 # EHLO isn't implemented in DebuggingServer |
|
183 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) |
|
184 expected = (502, 'Error: command "EHLO" not implemented') |
|
185 self.assertEqual(smtp.ehlo(), expected) |
|
186 smtp.quit() |
|
187 |
|
188 def testVRFY(self): |
|
189 # VRFY isn't implemented in DebuggingServer |
|
190 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) |
|
191 expected = (502, 'Error: command "VRFY" not implemented') |
|
192 self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected) |
|
193 self.assertEqual(smtp.verify('nobody@nowhere.com'), expected) |
|
194 smtp.quit() |
|
195 |
|
196 def testSecondHELO(self): |
|
197 # check that a second HELO returns a message that it's a duplicate |
|
198 # (this behavior is specific to smtpd.SMTPChannel) |
|
199 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) |
|
200 smtp.helo() |
|
201 expected = (503, 'Duplicate HELO/EHLO') |
|
202 self.assertEqual(smtp.helo(), expected) |
|
203 smtp.quit() |
|
204 |
|
205 def testHELP(self): |
|
206 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) |
|
207 self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented') |
|
208 smtp.quit() |
|
209 |
|
210 def testSend(self): |
|
211 # connect and send mail |
|
212 m = 'A test message' |
|
213 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) |
|
214 smtp.sendmail('John', 'Sally', m) |
|
215 # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor |
|
216 # in asyncore. This sleep might help, but should really be fixed |
|
217 # properly by using an Event variable. |
|
218 time.sleep(0.01) |
|
219 smtp.quit() |
|
220 |
|
221 self.client_evt.set() |
|
222 self.serv_evt.wait() |
|
223 self.output.flush() |
|
224 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) |
|
225 self.assertEqual(self.output.getvalue(), mexpect) |
|
226 |
|
227 |
|
228 class NonConnectingTests(TestCase): |
|
229 |
|
230 def testNotConnected(self): |
|
231 # Test various operations on an unconnected SMTP object that |
|
232 # should raise exceptions (at present the attempt in SMTP.send |
|
233 # to reference the nonexistent 'sock' attribute of the SMTP object |
|
234 # causes an AttributeError) |
|
235 smtp = smtplib.SMTP() |
|
236 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo) |
|
237 self.assertRaises(smtplib.SMTPServerDisconnected, |
|
238 smtp.send, 'test msg') |
|
239 |
|
240 def testNonnumericPort(self): |
|
241 # check that non-numeric port raises socket.error |
|
242 self.assertRaises(socket.error, smtplib.SMTP, |
|
243 "localhost", "bogus") |
|
244 self.assertRaises(socket.error, smtplib.SMTP, |
|
245 "localhost:bogus") |
|
246 |
|
247 |
|
248 # test response of client to a non-successful HELO message |
|
249 class BadHELOServerTests(TestCase): |
|
250 |
|
251 def setUp(self): |
|
252 self.old_stdout = sys.stdout |
|
253 self.output = StringIO.StringIO() |
|
254 sys.stdout = self.output |
|
255 |
|
256 self.evt = threading.Event() |
|
257 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
258 self.sock.settimeout(15) |
|
259 self.port = test_support.bind_port(self.sock) |
|
260 servargs = (self.evt, "199 no hello for you!\n", self.sock) |
|
261 threading.Thread(target=server, args=servargs).start() |
|
262 self.evt.wait() |
|
263 self.evt.clear() |
|
264 |
|
265 def tearDown(self): |
|
266 self.evt.wait() |
|
267 sys.stdout = self.old_stdout |
|
268 |
|
269 def testFailingHELO(self): |
|
270 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, |
|
271 HOST, self.port, 'localhost', 3) |
|
272 |
|
273 |
|
274 sim_users = {'Mr.A@somewhere.com':'John A', |
|
275 'Ms.B@somewhere.com':'Sally B', |
|
276 'Mrs.C@somewhereesle.com':'Ruth C', |
|
277 } |
|
278 |
|
279 sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], |
|
280 'list-2':['Ms.B@somewhere.com',], |
|
281 } |
|
282 |
|
283 # Simulated SMTP channel & server |
|
284 class SimSMTPChannel(smtpd.SMTPChannel): |
|
285 def smtp_EHLO(self, arg): |
|
286 resp = '250-testhost\r\n' \ |
|
287 '250-EXPN\r\n' \ |
|
288 '250-SIZE 20000000\r\n' \ |
|
289 '250-STARTTLS\r\n' \ |
|
290 '250-DELIVERBY\r\n' \ |
|
291 '250 HELP' |
|
292 self.push(resp) |
|
293 |
|
294 def smtp_VRFY(self, arg): |
|
295 # print '\nsmtp_VRFY(%r)\n' % arg |
|
296 |
|
297 raw_addr = email.utils.parseaddr(arg)[1] |
|
298 quoted_addr = smtplib.quoteaddr(arg) |
|
299 if raw_addr in sim_users: |
|
300 self.push('250 %s %s' % (sim_users[raw_addr], quoted_addr)) |
|
301 else: |
|
302 self.push('550 No such user: %s' % arg) |
|
303 |
|
304 def smtp_EXPN(self, arg): |
|
305 # print '\nsmtp_EXPN(%r)\n' % arg |
|
306 |
|
307 list_name = email.utils.parseaddr(arg)[1].lower() |
|
308 if list_name in sim_lists: |
|
309 user_list = sim_lists[list_name] |
|
310 for n, user_email in enumerate(user_list): |
|
311 quoted_addr = smtplib.quoteaddr(user_email) |
|
312 if n < len(user_list) - 1: |
|
313 self.push('250-%s %s' % (sim_users[user_email], quoted_addr)) |
|
314 else: |
|
315 self.push('250 %s %s' % (sim_users[user_email], quoted_addr)) |
|
316 else: |
|
317 self.push('550 No access for you!') |
|
318 |
|
319 |
|
320 class SimSMTPServer(smtpd.SMTPServer): |
|
321 def handle_accept(self): |
|
322 conn, addr = self.accept() |
|
323 channel = SimSMTPChannel(self, conn, addr) |
|
324 |
|
325 def process_message(self, peer, mailfrom, rcpttos, data): |
|
326 pass |
|
327 |
|
328 |
|
329 # Test various SMTP & ESMTP commands/behaviors that require a simulated server |
|
330 # (i.e., something with more features than DebuggingServer) |
|
331 class SMTPSimTests(TestCase): |
|
332 |
|
333 def setUp(self): |
|
334 self.serv_evt = threading.Event() |
|
335 self.client_evt = threading.Event() |
|
336 self.port = test_support.find_unused_port() |
|
337 self.serv = SimSMTPServer((HOST, self.port), ('nowhere', -1)) |
|
338 serv_args = (self.serv, self.serv_evt, self.client_evt) |
|
339 threading.Thread(target=debugging_server, args=serv_args).start() |
|
340 |
|
341 # wait until server thread has assigned a port number |
|
342 self.serv_evt.wait() |
|
343 self.serv_evt.clear() |
|
344 |
|
345 def tearDown(self): |
|
346 # indicate that the client is finished |
|
347 self.client_evt.set() |
|
348 # wait for the server thread to terminate |
|
349 self.serv_evt.wait() |
|
350 |
|
351 def testBasic(self): |
|
352 # smoke test |
|
353 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) |
|
354 smtp.quit() |
|
355 |
|
356 def testEHLO(self): |
|
357 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) |
|
358 |
|
359 # no features should be present before the EHLO |
|
360 self.assertEqual(smtp.esmtp_features, {}) |
|
361 |
|
362 # features expected from the test server |
|
363 expected_features = {'expn':'', |
|
364 'size': '20000000', |
|
365 'starttls': '', |
|
366 'deliverby': '', |
|
367 'help': '', |
|
368 } |
|
369 |
|
370 smtp.ehlo() |
|
371 self.assertEqual(smtp.esmtp_features, expected_features) |
|
372 for k in expected_features: |
|
373 self.assertTrue(smtp.has_extn(k)) |
|
374 self.assertFalse(smtp.has_extn('unsupported-feature')) |
|
375 smtp.quit() |
|
376 |
|
377 def testVRFY(self): |
|
378 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) |
|
379 |
|
380 for email, name in sim_users.items(): |
|
381 expected_known = (250, '%s %s' % (name, smtplib.quoteaddr(email))) |
|
382 self.assertEqual(smtp.vrfy(email), expected_known) |
|
383 |
|
384 u = 'nobody@nowhere.com' |
|
385 expected_unknown = (550, 'No such user: %s' % smtplib.quoteaddr(u)) |
|
386 self.assertEqual(smtp.vrfy(u), expected_unknown) |
|
387 smtp.quit() |
|
388 |
|
389 def testEXPN(self): |
|
390 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) |
|
391 |
|
392 for listname, members in sim_lists.items(): |
|
393 users = [] |
|
394 for m in members: |
|
395 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m))) |
|
396 expected_known = (250, '\n'.join(users)) |
|
397 self.assertEqual(smtp.expn(listname), expected_known) |
|
398 |
|
399 u = 'PSU-Members-List' |
|
400 expected_unknown = (550, 'No access for you!') |
|
401 self.assertEqual(smtp.expn(u), expected_unknown) |
|
402 smtp.quit() |
|
403 |
|
404 |
|
405 |
|
406 def test_main(verbose=None): |
|
407 test_support.run_unittest(GeneralTests, DebuggingServerTests, |
|
408 NonConnectingTests, |
|
409 BadHELOServerTests, SMTPSimTests) |
|
410 |
|
411 if __name__ == '__main__': |
|
412 test_main() |