|
1 import os |
|
2 import sys |
|
3 import time |
|
4 import stat |
|
5 import socket |
|
6 import email |
|
7 import email.message |
|
8 import rfc822 |
|
9 import re |
|
10 import StringIO |
|
11 from test import test_support |
|
12 import unittest |
|
13 import mailbox |
|
14 import glob |
|
15 try: |
|
16 import fcntl |
|
17 except ImportError: |
|
18 pass |
|
19 |
|
20 |
|
class TestBase(unittest.TestCase):
    # Helper methods shared by the mailbox test cases in this file.

    def _check_sample(self, msg):
        # Verify that msg is a mailbox.Message whose headers and MIME parts
        # match the module-level _sample_headers and _sample_payloads.
        for expected_type in (email.message.Message, mailbox.Message):
            self.assert_(isinstance(msg, expected_type))
        for header, expected in _sample_headers.iteritems():
            self.assert_(expected in msg.get_all(header))
        self.assert_(msg.is_multipart())
        self.assert_(len(msg.get_payload()) == len(_sample_payloads))
        for index, expected_body in enumerate(_sample_payloads):
            part = msg.get_payload(index)
            # Sub-parts are plain email messages, not mailbox.Message.
            self.assert_(isinstance(part, email.message.Message))
            self.assert_(not isinstance(part, mailbox.Message))
            self.assert_(part.get_payload() == expected_body)

    def _delete_recursively(self, target):
        # Remove target from disk, whether it is a single file or a whole
        # directory tree.  A target that does not exist is left alone.
        if not os.path.isdir(target):
            if os.path.exists(target):
                os.remove(target)
            return
        # Walk bottom-up so every directory is already empty when removed.
        for parent, subdirs, filenames in os.walk(target, topdown=False):
            for entry in filenames:
                os.remove(os.path.join(parent, entry))
            for entry in subdirs:
                os.rmdir(os.path.join(parent, entry))
        os.rmdir(target)
|
48 |
|
49 |
|
class TestMailbox(TestBase):
    # Tests of the generic Mailbox interface.  Subclasses set _factory to
    # the mailbox class under test and inherit every test defined here.

    _factory = None     # Overridden by subclasses to reuse tests
    _template = 'From: foo\n\n%s'

    def setUp(self):
        # Start each test with a freshly created mailbox at TESTFN.
        self._path = test_support.TESTFN
        self._delete_recursively(self._path)
        self._box = self._factory(self._path)

    def tearDown(self):
        # Close the mailbox and remove whatever it left on disk.
        self._box.close()
        self._delete_recursively(self._path)

    def test_add(self):
        # Add copies of a sample message
        keys = []
        keys.append(self._box.add(self._template % 0))
        self.assertEqual(len(self._box), 1)
        keys.append(self._box.add(mailbox.Message(_sample_message)))
        self.assertEqual(len(self._box), 2)
        keys.append(self._box.add(email.message_from_string(_sample_message)))
        self.assertEqual(len(self._box), 3)
        keys.append(self._box.add(StringIO.StringIO(_sample_message)))
        self.assertEqual(len(self._box), 4)
        keys.append(self._box.add(_sample_message))
        self.assertEqual(len(self._box), 5)
        self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
        for i in (1, 2, 3, 4):
            self._check_sample(self._box[keys[i]])

    def test_remove(self):
        # Remove messages using remove()
        self._test_remove_or_delitem(self._box.remove)

    def test_delitem(self):
        # Remove messages using __delitem__()
        self._test_remove_or_delitem(self._box.__delitem__)

    def _test_remove_or_delitem(self, method):
        # (Used by test_remove() and test_delitem().)
        # method is the bound removal callable; it must raise KeyError when
        # asked to remove a key that is no longer present.
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(self._template % 1)
        self.assertEqual(len(self._box), 2)
        method(key0)
        remaining = len(self._box)
        self.assert_(remaining == 1, "actual l: %s" % remaining)
        self.assertRaises(KeyError, lambda: self._box[key0])
        self.assertRaises(KeyError, lambda: method(key0))
        self.assertEqual(self._box.get_string(key1), self._template % 1)
        key2 = self._box.add(self._template % 2)
        self.assertEqual(len(self._box), 2)
        method(key2)
        remaining = len(self._box)
        self.assert_(remaining == 1, "actual l: %s" % remaining)
        self.assertRaises(KeyError, lambda: self._box[key2])
        self.assertRaises(KeyError, lambda: method(key2))
        self.assertEqual(self._box.get_string(key1), self._template % 1)
        method(key1)
        self.assertEqual(len(self._box), 0)
        self.assertRaises(KeyError, lambda: self._box[key1])
        self.assertRaises(KeyError, lambda: method(key1))

    def test_discard(self, repetitions=10):
        # Discard messages
        # (repetitions is accepted for signature compatibility; it is unused.)
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(self._template % 1)
        self.assertEqual(len(self._box), 2)
        self._box.discard(key0)
        self.assertEqual(len(self._box), 1)
        self.assertRaises(KeyError, lambda: self._box[key0])
        # Discarding an already-removed key must not raise.
        self._box.discard(key0)
        self.assertEqual(len(self._box), 1)
        self.assertRaises(KeyError, lambda: self._box[key0])

    def test_get(self):
        # Retrieve messages using get()
        key0 = self._box.add(self._template % 0)
        msg = self._box.get(key0)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0')
        self.assert_(self._box.get('foo') is None)
        self.assert_(self._box.get('foo', False) is False)
        self._box.close()
        # Reopen with an explicit message factory and check it is honoured.
        self._box = self._factory(self._path, factory=rfc822.Message)
        key1 = self._box.add(self._template % 1)
        msg = self._box.get(key1)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.fp.read(), '1')

    def test_getitem(self):
        # Retrieve message using __getitem__()
        key0 = self._box.add(self._template % 0)
        msg = self._box[key0]
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0')
        self.assertRaises(KeyError, lambda: self._box['foo'])
        self._box.discard(key0)
        self.assertRaises(KeyError, lambda: self._box[key0])

    def test_get_message(self):
        # Get Message representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        msg0 = self._box.get_message(key0)
        self.assert_(isinstance(msg0, mailbox.Message))
        self.assertEqual(msg0['from'], 'foo')
        self.assertEqual(msg0.get_payload(), '0')
        self._check_sample(self._box.get_message(key1))

    def test_get_string(self):
        # Get string representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        self.assertEqual(self._box.get_string(key0), self._template % 0)
        self.assertEqual(self._box.get_string(key1), _sample_message)

    def test_get_file(self):
        # Get file representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        # Line endings are normalised before comparing with the originals.
        self.assertEqual(
            self._box.get_file(key0).read().replace(os.linesep, '\n'),
            self._template % 0)
        self.assertEqual(
            self._box.get_file(key1).read().replace(os.linesep, '\n'),
            _sample_message)

    def test_iterkeys(self):
        # Get keys using iterkeys()
        self._check_iteration(self._box.iterkeys, do_keys=True,
                              do_values=False)

    def test_keys(self):
        # Get keys using keys()
        self._check_iteration(self._box.keys, do_keys=True, do_values=False)

    def test_itervalues(self):
        # Get values using itervalues()
        self._check_iteration(self._box.itervalues, do_keys=False,
                              do_values=True)

    def test_iter(self):
        # Get values using __iter__()
        self._check_iteration(self._box.__iter__, do_keys=False,
                              do_values=True)

    def test_values(self):
        # Get values using values()
        self._check_iteration(self._box.values, do_keys=False, do_values=True)

    def test_iteritems(self):
        # Get keys and values using iteritems()
        self._check_iteration(self._box.iteritems, do_keys=True,
                              do_values=True)

    def test_items(self):
        # Get keys and values using items()
        self._check_iteration(self._box.items, do_keys=True, do_values=True)

    def _check_iteration(self, method, do_keys, do_values, repetitions=10):
        # Exercise one of the iteration/listing methods.
        #   method      -- bound callable returning keys, values or pairs
        #   do_keys     -- True if method yields keys
        #   do_values   -- True if method yields messages
        #   repetitions -- number of messages added before iterating
        for value in method():
            self.fail("Not empty")
        keys, values = [], []
        for i in xrange(repetitions):
            keys.append(self._box.add(self._template % i))
            values.append(self._template % i)
        if do_keys and not do_values:
            returned_keys = list(method())
        elif do_values and not do_keys:
            returned_values = list(method())
        else:
            returned_keys, returned_values = [], []
            for key, value in method():
                returned_keys.append(key)
                returned_values.append(value)
        if do_keys:
            self.assertEqual(len(keys), len(returned_keys))
            self.assertEqual(set(keys), set(returned_keys))
        if do_values:
            count = 0
            for value in returned_values:
                self.assertEqual(value['from'], 'foo')
                self.assert_(int(value.get_payload()) < repetitions)
                count += 1
            self.assertEqual(len(values), count)

    def test_has_key(self):
        # Check existence of keys using has_key()
        self._test_has_key_or_contains(self._box.has_key)

    def test_contains(self):
        # Check existence of keys using __contains__()
        self._test_has_key_or_contains(self._box.__contains__)

    def _test_has_key_or_contains(self, method):
        # (Used by test_has_key() and test_contains().)
        self.assert_(not method('foo'))
        key0 = self._box.add(self._template % 0)
        self.assert_(method(key0))
        self.assert_(not method('foo'))
        key1 = self._box.add(self._template % 1)
        self.assert_(method(key1))
        self.assert_(method(key0))
        self.assert_(not method('foo'))
        self._box.remove(key0)
        self.assert_(not method(key0))
        self.assert_(method(key1))
        self.assert_(not method('foo'))
        self._box.remove(key1)
        self.assert_(not method(key1))
        self.assert_(not method(key0))
        self.assert_(not method('foo'))

    def test_len(self, repetitions=10):
        # Get message count
        keys = []
        for i in xrange(repetitions):
            self.assertEqual(len(self._box), i)
            keys.append(self._box.add(self._template % i))
            self.assertEqual(len(self._box), i + 1)
        for i in xrange(repetitions):
            self.assertEqual(len(self._box), repetitions - i)
            self._box.remove(keys[i])
            self.assertEqual(len(self._box), repetitions - i - 1)

    def test_set_item(self):
        # Modify messages using __setitem__()
        key0 = self._box.add(self._template % 'original 0')
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'original 0')
        key1 = self._box.add(self._template % 'original 1')
        self.assertEqual(self._box.get_string(key1),
                         self._template % 'original 1')
        self._box[key0] = self._template % 'changed 0'
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'changed 0')
        self._box[key1] = self._template % 'changed 1'
        self.assertEqual(self._box.get_string(key1),
                         self._template % 'changed 1')
        self._box[key0] = _sample_message
        self._check_sample(self._box[key0])
        self._box[key1] = self._box[key0]
        self._check_sample(self._box[key1])
        self._box[key0] = self._template % 'original 0'
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'original 0')
        self._check_sample(self._box[key1])
        # Assigning to an unknown key must fail and add nothing.
        self.assertRaises(KeyError,
                          lambda: self._box.__setitem__('foo', 'bar'))
        self.assertRaises(KeyError, lambda: self._box['foo'])
        self.assertEqual(len(self._box), 2)

    def test_clear(self, iterations=10):
        # Remove all messages using clear()
        keys = []
        for i in xrange(iterations):
            # Record each key; without this the checks below see an empty
            # list and verify nothing.
            keys.append(self._box.add(self._template % i))
        for i, key in enumerate(keys):
            self.assertEqual(self._box.get_string(key), self._template % i)
        self._box.clear()
        self.assertEqual(len(self._box), 0)
        for key in keys:
            self.assertRaises(KeyError, lambda: self._box.get_string(key))

    def test_pop(self):
        # Get and remove a message using pop()
        key0 = self._box.add(self._template % 0)
        self.assert_(key0 in self._box)
        key1 = self._box.add(self._template % 1)
        self.assert_(key1 in self._box)
        self.assertEqual(self._box.pop(key0).get_payload(), '0')
        self.assert_(key0 not in self._box)
        self.assert_(key1 in self._box)
        key2 = self._box.add(self._template % 2)
        self.assert_(key2 in self._box)
        self.assertEqual(self._box.pop(key2).get_payload(), '2')
        self.assert_(key2 not in self._box)
        self.assert_(key1 in self._box)
        self.assertEqual(self._box.pop(key1).get_payload(), '1')
        self.assert_(key1 not in self._box)
        self.assertEqual(len(self._box), 0)

    def test_popitem(self, iterations=10):
        # Get and remove an arbitrary (key, message) using popitem()
        keys = []
        for i in xrange(iterations):
            keys.append(self._box.add(self._template % i))
        seen = []
        for i in xrange(iterations):
            key, msg = self._box.popitem()
            self.assert_(key in keys)
            self.assert_(key not in seen)
            seen.append(key)
            # The payload is the index at which the message was added.
            self.assertEqual(int(msg.get_payload()), keys.index(key))
        self.assertEqual(len(self._box), 0)
        for key in keys:
            self.assertRaises(KeyError, lambda: self._box[key])

    def test_update(self):
        # Modify multiple messages using update()
        key0 = self._box.add(self._template % 'original 0')
        key1 = self._box.add(self._template % 'original 1')
        key2 = self._box.add(self._template % 'original 2')
        self._box.update({key0: self._template % 'changed 0',
                          key2: _sample_message})
        self.assertEqual(len(self._box), 3)
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'changed 0')
        self.assertEqual(self._box.get_string(key1),
                         self._template % 'original 1')
        self._check_sample(self._box[key2])
        self._box.update([(key2, self._template % 'changed 2'),
                          (key1, self._template % 'changed 1'),
                          (key0, self._template % 'original 0')])
        self.assertEqual(len(self._box), 3)
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'original 0')
        self.assertEqual(self._box.get_string(key1),
                         self._template % 'changed 1')
        self.assertEqual(self._box.get_string(key2),
                         self._template % 'changed 2')
        # An unknown key raises KeyError, yet the valid entry in the same
        # call is still expected to have been applied (checked below).
        self.assertRaises(KeyError,
                          lambda: self._box.update({'foo': 'bar',
                                          key0: self._template % "changed 0"}))
        self.assertEqual(len(self._box), 3)
        self.assertEqual(self._box.get_string(key0),
                         self._template % "changed 0")
        self.assertEqual(self._box.get_string(key1),
                         self._template % "changed 1")
        self.assertEqual(self._box.get_string(key2),
                         self._template % "changed 2")

    def test_flush(self):
        # Write changes to disk
        self._test_flush_or_close(self._box.flush, True)

    def test_lock_unlock(self):
        # Lock and unlock the mailbox
        self.assert_(not os.path.exists(self._get_lock_path()))
        self._box.lock()
        self.assert_(os.path.exists(self._get_lock_path()))
        self._box.unlock()
        self.assert_(not os.path.exists(self._get_lock_path()))

    def test_close(self):
        # Close mailbox and flush changes to disk
        self._test_flush_or_close(self._box.close, False)

    def _test_flush_or_close(self, method, should_call_close):
        # (Used by test_flush() and test_close().)
        # Add three messages, invoke method, then reopen the mailbox and
        # check that all three were persisted.  should_call_close is True
        # when method itself does not close the mailbox.
        contents = [self._template % i for i in xrange(3)]
        self._box.add(contents[0])
        self._box.add(contents[1])
        self._box.add(contents[2])
        method()
        if should_call_close:
            self._box.close()
        self._box = self._factory(self._path)
        keys = self._box.keys()
        self.assertEqual(len(keys), 3)
        for key in keys:
            self.assert_(self._box.get_string(key) in contents)

    def test_dump_message(self):
        # Write message representations to disk
        for source in (email.message_from_string(_sample_message),
                       _sample_message, StringIO.StringIO(_sample_message)):
            output = StringIO.StringIO()
            self._box._dump_message(source, output)
            self.assertEqual(output.getvalue(),
                             _sample_message.replace('\n', os.linesep))
        output = StringIO.StringIO()
        self.assertRaises(TypeError,
                          lambda: self._box._dump_message(None, output))

    def _get_lock_path(self):
        # Return the path of the dot lock file. May be overridden.
        return self._path + '.lock'
|
425 |
|
426 |
|
class TestMailboxSuperclass(TestBase):

    def test_notimplemented(self):
        # Check that each Mailbox base-class operation listed below raises
        # NotImplementedError.
        box = mailbox.Mailbox('path')
        unimplemented_calls = (
            lambda: box.add(''),
            lambda: box.remove(''),
            lambda: box.__delitem__(''),
            lambda: box.discard(''),
            lambda: box.__setitem__('', ''),
            lambda: box.iterkeys(),
            lambda: box.keys(),
            # Generator-based methods only fail once iteration starts.
            lambda: box.itervalues().next(),
            lambda: box.__iter__().next(),
            lambda: box.values(),
            lambda: box.iteritems().next(),
            lambda: box.items(),
            lambda: box.get(''),
            lambda: box.__getitem__(''),
            lambda: box.get_message(''),
            lambda: box.get_string(''),
            lambda: box.get_file(''),
            lambda: box.has_key(''),
            lambda: box.__contains__(''),
            lambda: box.__len__(),
            lambda: box.clear(),
            lambda: box.pop(''),
            lambda: box.popitem(),
            lambda: box.update((('', ''),)),
            lambda: box.flush(),
            lambda: box.lock(),
            lambda: box.unlock(),
            lambda: box.close(),
        )
        for call in unimplemented_calls:
            self.assertRaises(NotImplementedError, call)
|
460 |
|
461 |
|
class TestMaildir(TestMailbox):
    # Runs the generic TestMailbox tests against mailbox.Maildir and adds
    # Maildir-specific tests (subdirs, flags, folders, tmp files, TOC).

    _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)

    # All three execute permission bits (same value as octal 0111).
    _EXEC_BITS = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

    def setUp(self):
        TestMailbox.setUp(self)
        # On these platforms use '!' instead of the default separator
        # between the unique name and the info part of a file name.
        if os.name in ('nt', 'os2') or sys.platform == 'cygwin':
            self._box.colon = '!'

    def test_add_MM(self):
        # Add a MaildirMessage instance
        msg = mailbox.MaildirMessage(self._template % 0)
        msg.set_subdir('cur')
        msg.set_info('foo')
        key = self._box.add(msg)
        expected_name = '%s%sfoo' % (key, self._box.colon)
        self.assert_(os.path.exists(os.path.join(self._path, 'cur',
                                                 expected_name)))

    def test_get_MM(self):
        # Get a MaildirMessage instance
        msg = mailbox.MaildirMessage(self._template % 0)
        msg.set_subdir('cur')
        msg.set_flags('RF')
        key = self._box.add(msg)
        msg_returned = self._box.get_message(key)
        self.assert_(isinstance(msg_returned, mailbox.MaildirMessage))
        self.assertEqual(msg_returned.get_subdir(), 'cur')
        # Flags set as 'RF' are expected back as 'FR'.
        self.assertEqual(msg_returned.get_flags(), 'FR')

    def test_set_MM(self):
        # Set with a MaildirMessage instance
        msg0 = mailbox.MaildirMessage(self._template % 0)
        msg0.set_flags('TP')
        key = self._box.add(msg0)
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'PT')
        msg1 = mailbox.MaildirMessage(self._template % 1)
        self._box[key] = msg1
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), '')
        self.assertEqual(msg_returned.get_payload(), '1')
        msg2 = mailbox.MaildirMessage(self._template % 2)
        msg2.set_info('2,S')
        self._box[key] = msg2
        # Replacing with a plain string must keep the stored info ('S').
        self._box[key] = self._template % 3
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'S')
        self.assertEqual(msg_returned.get_payload(), '3')

    def test_consistent_factory(self):
        # Add a message.
        msg = mailbox.MaildirMessage(self._template % 0)
        msg.set_subdir('cur')
        msg.set_flags('RF')
        key = self._box.add(msg)

        # Create a new mailbox over the same path with a custom message
        # factory and check that get_message() uses it.
        class FakeMessage(mailbox.MaildirMessage):
            pass
        box = mailbox.Maildir(self._path, factory=FakeMessage)
        box.colon = self._box.colon
        msg2 = box.get_message(key)
        self.assert_(isinstance(msg2, FakeMessage))

    def test_initialize_new(self):
        # Initialize a non-existent mailbox
        self.tearDown()
        self._box = mailbox.Maildir(self._path)
        self._check_basics(factory=rfc822.Message)
        self._delete_recursively(self._path)
        self._box = self._factory(self._path, factory=None)
        self._check_basics()

    def test_initialize_existing(self):
        # Initialize an existing mailbox
        self.tearDown()
        for subdir in '', 'tmp', 'new', 'cur':
            os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))
        self._box = mailbox.Maildir(self._path)
        self._check_basics(factory=rfc822.Message)
        self._box = mailbox.Maildir(self._path, factory=None)
        self._check_basics()

    def _check_basics(self, factory=None):
        # (Used by test_initialize_new() and test_initialize_existing().)
        # Check the stored path and factory, and that the mailbox directory
        # and its tmp/new/cur subdirectories exist.
        self.assertEqual(self._box._path, os.path.abspath(self._path))
        self.assertEqual(self._box._factory, factory)
        for subdir in '', 'tmp', 'new', 'cur':
            path = os.path.join(self._path, subdir)
            mode = os.stat(path)[stat.ST_MODE]
            self.assert_(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)

    def test_list_folders(self):
        # List folders
        self._box.add_folder('one')
        self._box.add_folder('two')
        self._box.add_folder('three')
        self.assertEqual(len(self._box.list_folders()), 3)
        self.assertEqual(set(self._box.list_folders()),
                         set(('one', 'two', 'three')))

    def test_get_folder(self):
        # Open folders
        self._box.add_folder('foo.bar')
        folder0 = self._box.get_folder('foo.bar')
        folder0.add(self._template % 'bar')
        # The folder is stored on disk with a leading dot.
        self.assert_(os.path.isdir(os.path.join(self._path, '.foo.bar')))
        folder1 = self._box.get_folder('foo.bar')
        self.assertEqual(folder1.get_string(folder1.keys()[0]),
                         self._template % 'bar')

    def test_add_and_remove_folders(self):
        # Delete folders
        self._box.add_folder('one')
        self._box.add_folder('two')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
        self._box.remove_folder('one')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), set(('two',)))
        self._box.add_folder('three')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()),
                         set(('two', 'three')))
        self._box.remove_folder('three')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), set(('two',)))
        self._box.remove_folder('two')
        self.assertEqual(len(self._box.list_folders()), 0)
        self.assertEqual(self._box.list_folders(), [])

    def test_clean(self):
        # Remove old files from 'tmp'
        foo_path = os.path.join(self._path, 'tmp', 'foo')
        bar_path = os.path.join(self._path, 'tmp', 'bar')
        for tmp_path in (foo_path, bar_path):
            f = open(tmp_path, 'w')
            try:
                f.write("@")
            finally:
                f.close()
        # Freshly written files must survive a clean().
        self._box.clean()
        self.assert_(os.path.exists(foo_path))
        self.assert_(os.path.exists(bar_path))
        # Backdate foo's access time to just over 36 hours (129600 s) ago,
        # leaving its modification time alone; clean() should now delete it.
        foo_stat = os.stat(foo_path)
        os.utime(foo_path, (time.time() - 129600 - 2,
                            foo_stat.st_mtime))
        self._box.clean()
        self.assert_(not os.path.exists(foo_path))
        self.assert_(os.path.exists(bar_path))

    def test_create_tmp(self, repetitions=10):
        # Create files in tmp directory
        hostname = socket.gethostname()
        if '/' in hostname:
            hostname = hostname.replace('/', r'\057')
        if ':' in hostname:
            hostname = hostname.replace(':', r'\072')
        pid = os.getpid()
        pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
                             r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)")
        tmp_dir = os.path.abspath(os.path.join(self._path, "tmp"))
        previous_groups = None
        for x in xrange(repetitions):
            tmp_file = self._box._create_tmp()
            head, tail = os.path.split(tmp_file.name)
            self.assertEqual(head, tmp_dir,
                             "File in wrong location: '%s'" % head)
            match = pattern.match(tail)
            self.assert_(match is not None, "Invalid file name: '%s'" % tail)
            groups = match.groups()
            if previous_groups is not None:
                # Compare numerically: the fields are variable-width digit
                # strings, so comparing them as text would be lexical.
                self.assert_(int(groups[0]) >= int(previous_groups[0]),
                             "Non-monotonic seconds: '%s' before '%s'" %
                             (previous_groups[0], groups[0]))
                # The sub-second field only has to be monotonic while the
                # seconds field is unchanged from the previous file.
                self.assert_(int(groups[1]) >= int(previous_groups[1]) or
                             groups[0] != previous_groups[0],
                             "Non-monotonic milliseconds: '%s' before '%s'" %
                             (previous_groups[1], groups[1]))
                self.assert_(int(groups[2]) == pid,
                             "Process ID mismatch: '%s' should be '%s'" %
                             (groups[2], pid))
                self.assert_(int(groups[3]) == int(previous_groups[3]) + 1,
                             "Non-sequential counter: '%s' before '%s'" %
                             (previous_groups[3], groups[3]))
                self.assert_(groups[4] == hostname,
                             "Host name mismatch: '%s' should be '%s'" %
                             (groups[4], hostname))
            previous_groups = groups
            tmp_file.write(_sample_message)
            tmp_file.seek(0)
            self.assert_(tmp_file.read() == _sample_message)
            tmp_file.close()
        file_count = len(os.listdir(os.path.join(self._path, "tmp")))
        self.assert_(file_count == repetitions,
                     "Wrong file count: '%s' should be '%s'" %
                     (file_count, repetitions))

    def test_refresh(self):
        # Update the table of contents
        self.assertEqual(self._box._toc, {})
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(self._template % 1)
        # add() alone does not update the TOC; _refresh() does.
        self.assertEqual(self._box._toc, {})
        self._box._refresh()
        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
                                          key1: os.path.join('new', key1)})
        key2 = self._box.add(self._template % 2)
        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
                                          key1: os.path.join('new', key1)})
        self._box._refresh()
        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
                                          key1: os.path.join('new', key1),
                                          key2: os.path.join('new', key2)})

    def test_lookup(self):
        # Look up message subpaths in the TOC
        self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
        key0 = self._box.add(self._template % 0)
        self.assertEqual(self._box._lookup(key0), os.path.join('new', key0))
        # Delete the file behind the mailbox's back: the stale TOC entry
        # stays until the next lookup, which must raise and drop it.
        os.remove(os.path.join(self._path, 'new', key0))
        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)})
        self.assertRaises(KeyError, lambda: self._box._lookup(key0))
        self.assertEqual(self._box._toc, {})

    def test_lock_unlock(self):
        # Lock and unlock the mailbox. For Maildir, this does nothing.
        self._box.lock()
        self._box.unlock()

    def test_folder(self):
        # Test for bug #1569790: verify that folders returned by .get_folder()
        # use the same factory function.
        def dummy_factory(s):
            return None
        box = self._factory(self._path, factory=dummy_factory)
        folder = box.add_folder('folder1')
        self.assert_(folder._factory is dummy_factory)

        folder1_alias = box.get_folder('folder1')
        self.assert_(folder1_alias._factory is dummy_factory)

    def test_directory_in_folder(self):
        # Test that mailboxes still work if there's a stray extra directory
        # in a folder.
        for i in range(10):
            self._box.add(mailbox.Message(_sample_message))

        # Create a stray directory
        os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))

        # Check that looping still works with the directory present.
        for msg in self._box:
            pass

    def test_file_permissions(self):
        # Verify that message files are created without execute permissions
        if not hasattr(os, "stat") or not hasattr(os, "umask"):
            return
        msg = mailbox.MaildirMessage(self._template % 0)
        # Clear the umask so it cannot hide execute bits set by add().
        orig_umask = os.umask(0)
        try:
            key = self._box.add(msg)
        finally:
            os.umask(orig_umask)
        path = os.path.join(self._path, self._box._lookup(key))
        mode = os.stat(path).st_mode
        self.assertEqual(mode & self._EXEC_BITS, 0)

    def test_folder_file_perms(self):
        # From bug #3228, we want to verify that the file created inside a
        # Maildir subfolder isn't marked as executable.
        if not hasattr(os, "stat") or not hasattr(os, "umask"):
            return

        orig_umask = os.umask(0)
        try:
            subfolder = self._box.add_folder('subfolder')
        finally:
            os.umask(orig_umask)

        path = os.path.join(subfolder._path, 'maildirfolder')
        perms = os.stat(path).st_mode
        # Execute bits should all be off.
        self.assertFalse(perms & self._EXEC_BITS)
|
749 |
|
750 |
|
751 class _TestMboxMMDF(TestMailbox): |
|
752 |
|
def tearDown(self):
    # Close and delete the mailbox, then remove any leftover sibling
    # files whose names match "<path>.*".
    self._box.close()
    self._delete_recursively(self._path)
    leftovers = glob.glob(self._path + '.*')
    for leftover in leftovers:
        test_support.unlink(leftover)
|
758 |
|
def test_add_from_string(self):
    # A string beginning with a 'From ' line is added; that line must come
    # back through get_from() and not leak into the payload.
    raw_message = 'From foo@bar blah\nFrom: foo\n\n0'
    key = self._box.add(raw_message)
    envelope_sender = self._box[key].get_from()
    self.assert_(envelope_sender == 'foo@bar blah')
    body = self._box[key].get_payload()
    self.assert_(body == '0')
|
764 |
|
def test_add_mbox_or_mmdf_message(self):
    # Adding either an mboxMessage or an MMDFMessage must not raise.
    message_classes = (mailbox.mboxMessage, mailbox.MMDFMessage)
    for class_ in message_classes:
        self._box.add(class_('From foo@bar blah\nFrom: foo\n\n0'))
|
770 |
|
def test_open_close_open(self):
    # Reopen a mailbox created earlier, inspect it, and check that merely
    # reading it leaves the file's modification time unchanged.
    expected = []
    for i in xrange(3):
        expected.append(self._template % i)
    for text in expected:
        self._box.add(text)
    self._box.close()
    mtime_before = os.path.getmtime(self._path)
    self._box = self._factory(self._path)
    self.assert_(len(self._box) == 3)
    for key in self._box.iterkeys():
        stored = self._box.get_string(key)
        self.assert_(stored in expected)
    self._box.close()
    mtime_after = os.path.getmtime(self._path)
    self.assert_(mtime_before == mtime_after)
|
784 |
|
785 def test_add_and_close(self): |
|
786 # Verifying that closing a mailbox doesn't change added items |
|
787 self._box.add(_sample_message) |
|
788 for i in xrange(3): |
|
789 self._box.add(self._template % i) |
|
790 self._box.add(_sample_message) |
|
791 self._box._file.flush() |
|
792 self._box._file.seek(0) |
|
793 contents = self._box._file.read() |
|
794 self._box.close() |
|
795 self.assert_(contents == open(self._path, 'rb').read()) |
|
796 self._box = self._factory(self._path) |
|
797 |
|
798 def test_lock_conflict(self): |
|
799 # Fork off a subprocess that will lock the file for 2 seconds, |
|
800 # unlock it, and then exit. |
|
801 if not hasattr(os, 'fork'): |
|
802 return |
|
803 pid = os.fork() |
|
804 if pid == 0: |
|
805 # In the child, lock the mailbox. |
|
806 self._box.lock() |
|
807 time.sleep(2) |
|
808 self._box.unlock() |
|
809 os._exit(0) |
|
810 |
|
811 # In the parent, sleep a bit to give the child time to acquire |
|
812 # the lock. |
|
813 time.sleep(0.5) |
|
814 try: |
|
815 self.assertRaises(mailbox.ExternalClashError, |
|
816 self._box.lock) |
|
817 finally: |
|
818 # Wait for child to exit. Locking should now succeed. |
|
819 exited_pid, status = os.waitpid(pid, 0) |
|
820 |
|
821 self._box.lock() |
|
822 self._box.unlock() |
|
823 |
|
824 def test_relock(self): |
|
825 # Test case for bug #1575506: the mailbox class was locking the |
|
826 # wrong file object in its flush() method. |
|
827 msg = "Subject: sub\n\nbody\n" |
|
828 key1 = self._box.add(msg) |
|
829 self._box.flush() |
|
830 self._box.close() |
|
831 |
|
832 self._box = self._factory(self._path) |
|
833 self._box.lock() |
|
834 key2 = self._box.add(msg) |
|
835 self._box.flush() |
|
836 self.assert_(self._box._locked) |
|
837 self._box.close() |
|
838 |
|
839 |
|
class TestMbox(_TestMboxMMDF):
    """Runs the inherited mailbox tests against mailbox.mbox."""

    def _factory(self, path, factory=None):
        # Build the mailbox under test.
        return mailbox.mbox(path, factory)

    def test_file_perms(self):
        # From bug #3228, we want to verify that the mailbox file isn't
        # executable, even if the umask is set to something that would leave
        # executable bits set.  We only run this test on platforms that
        # support umask.
        if not (hasattr(os, 'umask') and hasattr(os, 'stat')):
            return

        # Change the umask before entering the try block so that the
        # finally clause always has a saved value to restore.
        # S_IRWXG | S_IRWXO is octal 077.
        old_umask = os.umask(stat.S_IRWXG | stat.S_IRWXO)
        try:
            # Recreate the mailbox file from scratch under the new umask.
            self._box.close()
            os.unlink(self._path)
            self._box = mailbox.mbox(self._path, create=True)
            self._box.add('')
            self._box.close()
        finally:
            os.umask(old_umask)

        # Execute bits (octal 111) should all be off.
        exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
        self.assertFalse(os.stat(self._path).st_mode & exec_bits)
|
862 |
|
class TestMMDF(_TestMboxMMDF):
    """Runs the inherited mailbox tests against mailbox.MMDF."""

    def _factory(self, path, factory=None):
        # Build the mailbox under test.
        return mailbox.MMDF(path, factory)
|
866 |
|
867 |
|
class TestMH(TestMailbox):
    """Tests for mailbox.MH: folders, sequences and packing."""

    def _factory(self, path, factory=None):
        # Build the mailbox under test.
        return mailbox.MH(path, factory)

    def test_list_folders(self):
        # List folders
        self._box.add_folder('one')
        self._box.add_folder('two')
        self._box.add_folder('three')
        self.assertEqual(len(self._box.list_folders()), 3)
        self.assertEqual(set(self._box.list_folders()),
                         set(('one', 'two', 'three')))

    def test_get_folder(self):
        # Open folders
        def dummy_factory(s):
            return None
        self._box = self._factory(self._path, dummy_factory)

        new_folder = self._box.add_folder('foo.bar')
        folder0 = self._box.get_folder('foo.bar')
        folder0.add(self._template % 'bar')
        self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar')))
        folder1 = self._box.get_folder('foo.bar')
        self.assertEqual(folder1.get_string(folder1.keys()[0]),
                         self._template % 'bar')

        # Test for bug #1569790: verify that folders returned by .get_folder()
        # use the same factory function.
        self.assertTrue(new_folder._factory is self._box._factory)
        self.assertTrue(folder0._factory is self._box._factory)

    def test_add_and_remove_folders(self):
        # Delete folders
        self._box.add_folder('one')
        self._box.add_folder('two')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
        self._box.remove_folder('one')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), set(('two',)))
        self._box.add_folder('three')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()),
                         set(('two', 'three')))
        self._box.remove_folder('three')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), set(('two',)))
        self._box.remove_folder('two')
        self.assertEqual(len(self._box.list_folders()), 0)
        self.assertEqual(self._box.list_folders(), [])

    def test_sequences(self):
        # Get and set sequences
        self.assertEqual(self._box.get_sequences(), {})
        msg0 = mailbox.MHMessage(self._template % 0)
        msg0.add_sequence('foo')
        key0 = self._box.add(msg0)
        self.assertEqual(self._box.get_sequences(), {'foo': [key0]})
        msg1 = mailbox.MHMessage(self._template % 1)
        msg1.set_sequences(['bar', 'replied', 'foo'])
        key1 = self._box.add(msg1)
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key0, key1], 'bar': [key1],
                          'replied': [key1]})
        # Storing msg0 again under its key replaces its sequences.
        msg0.set_sequences(['flagged'])
        self._box[key0] = msg0
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key1], 'bar': [key1], 'replied': [key1],
                          'flagged': [key0]})
        self._box.remove(key1)
        self.assertEqual(self._box.get_sequences(), {'flagged': [key0]})

    def test_pack(self):
        # Pack the contents of the mailbox
        msg0 = mailbox.MHMessage(self._template % 0)
        msg1 = mailbox.MHMessage(self._template % 1)
        msg2 = mailbox.MHMessage(self._template % 2)
        msg3 = mailbox.MHMessage(self._template % 3)
        msg0.set_sequences(['foo', 'unseen'])
        msg1.set_sequences(['foo'])
        msg2.set_sequences(['foo', 'flagged'])
        msg3.set_sequences(['foo', 'bar', 'replied'])
        key0 = self._box.add(msg0)
        key1 = self._box.add(msg1)
        key2 = self._box.add(msg2)
        key3 = self._box.add(msg3)
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key0, key1, key2, key3], 'unseen': [key0],
                          'flagged': [key2], 'bar': [key3],
                          'replied': [key3]})
        self._box.remove(key2)
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key0, key1, key3], 'unseen': [key0],
                          'bar': [key3], 'replied': [key3]})
        # After packing, the remaining keys are expected to be 1, 2, 3 and
        # the sequences to refer to those keys.
        self._box.pack()
        self.assertEqual(self._box.keys(), [1, 2, 3])
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [1, 2, 3], 'unseen': [1], 'bar': [3],
                          'replied': [3]})

        # Test case for packing while holding the mailbox locked.
        key0 = self._box.add(msg1)
        self._box.add(msg1)
        key2 = self._box.add(msg1)
        self._box.add(msg1)

        self._box.remove(key0)
        self._box.remove(key2)
        self._box.lock()
        self._box.pack()
        self._box.unlock()
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [1, 2, 3, 4, 5],
                          'unseen': [1], 'bar': [3], 'replied': [3]})

    def _get_lock_path(self):
        # Path of the '.mh_sequences.lock' file inside the mailbox directory.
        return os.path.join(self._path, '.mh_sequences.lock')
|
985 |
|
986 |
|
class TestBabyl(TestMailbox):
    """Tests for mailbox.Babyl, including mailbox-level label tracking."""

    def _factory(self, path, factory=None):
        # Build the mailbox under test.
        return mailbox.Babyl(path, factory)

    def tearDown(self):
        self._box.close()
        self._delete_recursively(self._path)
        # Remove anything left beside the mailbox under '<path>.*'.
        for lock_remnant in glob.glob(self._path + '.*'):
            test_support.unlink(lock_remnant)

    def test_labels(self):
        # Get labels from the mailbox
        self.assertEqual(self._box.get_labels(), [])
        msg0 = mailbox.BabylMessage(self._template % 0)
        msg0.add_label('foo')
        key0 = self._box.add(msg0)
        self.assertEqual(self._box.get_labels(), ['foo'])
        msg1 = mailbox.BabylMessage(self._template % 1)
        msg1.set_labels(['bar', 'answered', 'foo'])
        key1 = self._box.add(msg1)
        # 'answered' was set on msg1 but is not expected in the result.
        self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar']))
        msg0.set_labels(['blah', 'filed'])
        self._box[key0] = msg0
        # Likewise 'filed' is not expected in the result.
        self.assertEqual(set(self._box.get_labels()),
                         set(['foo', 'bar', 'blah']))
        self._box.remove(key1)
        self.assertEqual(set(self._box.get_labels()), set(['blah']))
|
1014 |
|
1015 |
|
class TestMessage(TestBase):
    """Tests for mailbox.Message.

    Subclasses rebind ``_factory`` (and may override
    ``_post_initialize_hook``) to rerun these tests against another
    message class.
    """

    _factory = mailbox.Message      # Overridden by subclasses to reuse tests

    def setUp(self):
        self._path = test_support.TESTFN

    def tearDown(self):
        self._delete_recursively(self._path)

    def test_initialize_with_eMM(self):
        # Initialize based on email.message.Message instance
        eMM = email.message_from_string(_sample_message)
        msg = self._factory(eMM)
        self._post_initialize_hook(msg)
        self._check_sample(msg)

    def test_initialize_with_string(self):
        # Initialize based on string
        msg = self._factory(_sample_message)
        self._post_initialize_hook(msg)
        self._check_sample(msg)

    def test_initialize_with_file(self):
        # Initialize based on contents of file
        f = open(self._path, 'w+')
        try:
            f.write(_sample_message)
            f.seek(0)
            msg = self._factory(f)
            self._post_initialize_hook(msg)
            self._check_sample(msg)
        finally:
            # Always release the handle, even when a check above fails.
            f.close()

    def test_initialize_with_nothing(self):
        # Initialize without arguments
        msg = self._factory()
        self._post_initialize_hook(msg)
        self.assertTrue(isinstance(msg, email.message.Message))
        self.assertTrue(isinstance(msg, mailbox.Message))
        self.assertTrue(isinstance(msg, self._factory))
        self.assertEqual(msg.keys(), [])
        self.assertFalse(msg.is_multipart())
        self.assertTrue(msg.get_payload() is None)

    def test_initialize_incorrectly(self):
        # Initialize with invalid argument
        self.assertRaises(TypeError, self._factory, object())

    def test_become_message(self):
        # Take on the state of another message
        eMM = email.message_from_string(_sample_message)
        msg = self._factory()
        msg._become_message(eMM)
        self._check_sample(msg)

    def test_explain_to(self):
        # Copy self's format-specific data to other message formats.
        # This test is superficial; better ones are in TestMessageConversion.
        msg = self._factory()
        for class_ in (mailbox.Message, mailbox.MaildirMessage,
                       mailbox.mboxMessage, mailbox.MHMessage,
                       mailbox.BabylMessage, mailbox.MMDFMessage):
            msg._explain_to(class_())
        # A plain email.message.Message must be rejected.
        other_msg = email.message.Message()
        self.assertRaises(TypeError, msg._explain_to, other_msg)

    def _post_initialize_hook(self, msg):
        # Overridden by subclasses to check extra things after initialization
        pass
|
1086 |
|
1087 |
|
class TestMaildirMessage(TestMessage):
    """Reruns the inherited message tests with mailbox.MaildirMessage and
    adds tests for its subdir, flags, date and info accessors."""

    _factory = mailbox.MaildirMessage

    def _post_initialize_hook(self, msg):
        # A freshly built message is expected in 'new' with empty info.
        self.assertEqual(msg._subdir, 'new')
        self.assertEqual(msg._info, '')

    def test_subdir(self):
        # Use get_subdir() and set_subdir()
        msg = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(msg.get_subdir(), 'new')
        msg.set_subdir('cur')
        self.assertEqual(msg.get_subdir(), 'cur')
        msg.set_subdir('new')
        self.assertEqual(msg.get_subdir(), 'new')
        # 'tmp' must be rejected and leave the current value untouched.
        self.assertRaises(ValueError, msg.set_subdir, 'tmp')
        self.assertEqual(msg.get_subdir(), 'new')
        msg.set_subdir('new')
        self.assertEqual(msg.get_subdir(), 'new')
        self._check_sample(msg)

    def test_flags(self):
        # Use get_flags(), set_flags(), add_flag(), remove_flag()
        msg = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(msg.get_flags(), '')
        self.assertEqual(msg.get_subdir(), 'new')
        msg.set_flags('F')
        self.assertEqual(msg.get_subdir(), 'new')
        self.assertEqual(msg.get_flags(), 'F')
        msg.set_flags('SDTP')
        self.assertEqual(msg.get_flags(), 'DPST')
        msg.add_flag('FT')
        self.assertEqual(msg.get_flags(), 'DFPST')
        msg.remove_flag('TDRP')
        self.assertEqual(msg.get_flags(), 'FS')
        self.assertEqual(msg.get_subdir(), 'new')
        self._check_sample(msg)

    def test_date(self):
        # Use get_date() and set_date()
        msg = mailbox.MaildirMessage(_sample_message)
        # The initial date is expected to be within a minute of "now".
        self.assertTrue(abs(msg.get_date() - time.time()) < 60)
        msg.set_date(0.0)
        self.assertEqual(msg.get_date(), 0.0)

    def test_info(self):
        # Use get_info() and set_info()
        msg = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(msg.get_info(), '')
        msg.set_info('1,foo=bar')
        self.assertEqual(msg.get_info(), '1,foo=bar')
        self.assertRaises(TypeError, msg.set_info, None)
        self._check_sample(msg)

    def test_info_and_flags(self):
        # Test interaction of info and flag methods
        msg = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(msg.get_info(), '')
        msg.set_flags('SF')
        self.assertEqual(msg.get_flags(), 'FS')
        self.assertEqual(msg.get_info(), '2,FS')
        msg.set_info('1,')
        self.assertEqual(msg.get_flags(), '')
        self.assertEqual(msg.get_info(), '1,')
        msg.remove_flag('RPT')
        self.assertEqual(msg.get_flags(), '')
        self.assertEqual(msg.get_info(), '1,')
        msg.add_flag('D')
        self.assertEqual(msg.get_flags(), 'D')
        self.assertEqual(msg.get_info(), '2,D')
        self._check_sample(msg)
|
1160 |
|
1161 |
|
class _TestMboxMMDFMessage(TestMessage):
    """Tests shared by the mboxMessage and MMDFMessage test classes:
    the "From " line and the flag accessors."""

    _factory = mailbox._mboxMMDFMessage

    def _post_initialize_hook(self, msg):
        self._check_from(msg)

    def test_initialize_with_unixfrom(self):
        # Initialize with a message that already has a _unixfrom attribute
        msg = mailbox.Message(_sample_message)
        msg.set_unixfrom('From foo@bar blah')
        msg = mailbox.mboxMessage(msg)
        self.assertEqual(msg.get_from(), 'foo@bar blah')

    def test_from(self):
        # Get and set "From " line
        msg = mailbox.mboxMessage(_sample_message)
        self._check_from(msg)
        msg.set_from('foo bar')
        self.assertEqual(msg.get_from(), 'foo bar')
        msg.set_from('foo@bar', True)
        self._check_from(msg, 'foo@bar')
        msg.set_from('blah@temp', time.localtime())
        self._check_from(msg, 'blah@temp')

    def test_flags(self):
        # Use get_flags(), set_flags(), add_flag(), remove_flag()
        msg = mailbox.mboxMessage(_sample_message)
        self.assertEqual(msg.get_flags(), '')
        msg.set_flags('F')
        self.assertEqual(msg.get_flags(), 'F')
        msg.set_flags('XODR')
        self.assertEqual(msg.get_flags(), 'RODX')
        msg.add_flag('FA')
        self.assertEqual(msg.get_flags(), 'RODFAX')
        msg.remove_flag('FDXA')
        self.assertEqual(msg.get_flags(), 'RO')
        self._check_sample(msg)

    def _check_from(self, msg, sender=None):
        # Check contents of "From " line: *sender* (default
        # "MAILER-DAEMON") followed by a timestamp of the form
        # "Www Mmm dd hh:mm:ss yyyy".
        if sender is None:
            sender = "MAILER-DAEMON"
        # Escape the sender so it is always matched literally, whatever
        # characters it contains.
        pattern = (re.escape(sender) +
                   r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}")
        self.assertTrue(re.match(pattern, msg.get_from()) is not None)
|
1207 |
|
1208 |
|
class TestMboxMessage(_TestMboxMMDFMessage):
    """Reruns the inherited message tests with mailbox.mboxMessage."""

    _factory = mailbox.mboxMessage
|
1212 |
|
1213 |
|
class TestMHMessage(TestMessage):
    """Reruns the inherited message tests with mailbox.MHMessage and adds
    a test for its sequence accessors."""

    _factory = mailbox.MHMessage

    def _post_initialize_hook(self, msg):
        # A freshly built message is expected to have no sequences.
        self.assertEqual(msg._sequences, [])

    def test_sequences(self):
        # Get, set, join, and leave sequences
        msg = mailbox.MHMessage(_sample_message)
        self.assertEqual(msg.get_sequences(), [])
        msg.set_sequences(['foobar'])
        self.assertEqual(msg.get_sequences(), ['foobar'])
        msg.set_sequences([])
        self.assertEqual(msg.get_sequences(), [])
        msg.add_sequence('unseen')
        self.assertEqual(msg.get_sequences(), ['unseen'])
        msg.add_sequence('flagged')
        self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
        # Adding a sequence twice must not duplicate it.
        msg.add_sequence('flagged')
        self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
        msg.remove_sequence('unseen')
        self.assertEqual(msg.get_sequences(), ['flagged'])
        msg.add_sequence('foobar')
        self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
        # Removing a sequence the message is not in must change nothing.
        msg.remove_sequence('replied')
        self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
        msg.set_sequences(['foobar', 'replied'])
        self.assertEqual(msg.get_sequences(), ['foobar', 'replied'])
|
1243 |
|
1244 |
|
class TestBabylMessage(TestMessage):
    """Reruns the inherited message tests with mailbox.BabylMessage and
    adds tests for its label and visible-header accessors."""

    _factory = mailbox.BabylMessage

    def _post_initialize_hook(self, msg):
        # A freshly built message is expected to have no labels.
        self.assertEqual(msg._labels, [])

    def test_labels(self):
        # Get, set, join, and leave labels
        msg = mailbox.BabylMessage(_sample_message)
        self.assertEqual(msg.get_labels(), [])
        msg.set_labels(['foobar'])
        self.assertEqual(msg.get_labels(), ['foobar'])
        msg.set_labels([])
        self.assertEqual(msg.get_labels(), [])
        msg.add_label('filed')
        self.assertEqual(msg.get_labels(), ['filed'])
        msg.add_label('resent')
        self.assertEqual(msg.get_labels(), ['filed', 'resent'])
        # Adding a label twice must not duplicate it.
        msg.add_label('resent')
        self.assertEqual(msg.get_labels(), ['filed', 'resent'])
        msg.remove_label('filed')
        self.assertEqual(msg.get_labels(), ['resent'])
        msg.add_label('foobar')
        self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
        # Removing a label the message does not carry must change nothing.
        msg.remove_label('unseen')
        self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
        msg.set_labels(['foobar', 'answered'])
        self.assertEqual(msg.get_labels(), ['foobar', 'answered'])

    def test_visible(self):
        # Get, set, and update visible headers
        msg = mailbox.BabylMessage(_sample_message)
        visible = msg.get_visible()
        self.assertEqual(visible.keys(), [])
        self.assertTrue(visible.get_payload() is None)
        visible['User-Agent'] = 'FooBar 1.0'
        visible['X-Whatever'] = 'Blah'
        # Editing the returned object must not affect the message until
        # set_visible() is called.
        self.assertEqual(msg.get_visible().keys(), [])
        msg.set_visible(visible)
        visible = msg.get_visible()
        self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
        self.assertEqual(visible['User-Agent'], 'FooBar 1.0')
        self.assertEqual(visible['X-Whatever'], 'Blah')
        self.assertTrue(visible.get_payload() is None)
        msg.update_visible()
        # The object fetched before update_visible() must be unchanged.
        self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
        self.assertTrue(visible.get_payload() is None)
        visible = msg.get_visible()
        self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',
                                          'Subject'])
        for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
            self.assertEqual(visible[header], msg[header])
|
1298 |
|
1299 |
|
class TestMMDFMessage(_TestMboxMMDFMessage):
    """Reruns the inherited message tests with mailbox.MMDFMessage."""

    _factory = mailbox.MMDFMessage
|
1303 |
|
1304 |
|
class TestMessageConversion(TestBase):
    """Tests converting messages between the mailbox message classes by
    passing an instance of one class to the constructor of another."""

    # Every message class exercised by the generic conversion tests.
    _all_classes = (mailbox.Message, mailbox.MaildirMessage,
                    mailbox.mboxMessage, mailbox.MHMessage,
                    mailbox.BabylMessage, mailbox.MMDFMessage)
    # The tests apply the same expectations to both of these classes.
    _mboxmmdf_classes = (mailbox.mboxMessage, mailbox.MMDFMessage)
    # Sequences / labels applied together in the "everything set" checks.
    _all_sequences = ('unseen', 'replied', 'flagged')
    _all_labels = ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
                   'edited', 'resent')

    def _mh_message(self, sequences):
        # Return a sample MHMessage added to each of *sequences*, in order.
        msg = mailbox.MHMessage(_sample_message)
        for name in sequences:
            msg.add_sequence(name)
        return msg

    def _babyl_message(self, labels):
        # Return a sample BabylMessage given each of *labels*, in order.
        msg = mailbox.BabylMessage(_sample_message)
        for label in labels:
            msg.add_label(label)
        return msg

    def test_plain_to_x(self):
        # Convert Message to all formats
        for class_ in self._all_classes:
            msg_plain = mailbox.Message(_sample_message)
            msg = class_(msg_plain)
            self._check_sample(msg)

    def test_x_to_plain(self):
        # Convert all formats to Message
        for class_ in self._all_classes:
            msg = class_(_sample_message)
            msg_plain = mailbox.Message(msg)
            self._check_sample(msg_plain)

    def test_x_to_invalid(self):
        # Convert all formats to an invalid format
        for class_ in self._all_classes:
            self.assertRaises(TypeError, class_, False)

    def test_maildir_to_maildir(self):
        # Convert MaildirMessage to MaildirMessage
        msg_maildir = mailbox.MaildirMessage(_sample_message)
        msg_maildir.set_flags('DFPRST')
        msg_maildir.set_subdir('cur')
        date = msg_maildir.get_date()
        msg = mailbox.MaildirMessage(msg_maildir)
        self._check_sample(msg)
        self.assertEqual(msg.get_flags(), 'DFPRST')
        self.assertEqual(msg.get_subdir(), 'cur')
        self.assertEqual(msg.get_date(), date)

    def test_maildir_to_mboxmmdf(self):
        # Convert MaildirMessage to mboxmessage and MMDFMessage
        pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
                 ('T', 'D'), ('DFPRST', 'RDFA'))
        for class_ in self._mboxmmdf_classes:
            msg_maildir = mailbox.MaildirMessage(_sample_message)
            msg_maildir.set_date(0.0)
            for setting, result in pairs:
                msg_maildir.set_flags(setting)
                msg = class_(msg_maildir)
                self.assertEqual(msg.get_flags(), result)
                self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' %
                                 time.asctime(time.gmtime(0.0)))
            # With the message in 'cur', an extra 'O' flag is expected.
            msg_maildir.set_subdir('cur')
            self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA')

    def test_maildir_to_mh(self):
        # Convert MaildirMessage to MHMessage
        msg_maildir = mailbox.MaildirMessage(_sample_message)
        pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']),
                 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),
                 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))
        for setting, result in pairs:
            msg_maildir.set_flags(setting)
            self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(),
                             result)

    def test_maildir_to_babyl(self):
        # Convert MaildirMessage to Babyl
        msg_maildir = mailbox.MaildirMessage(_sample_message)
        pairs = (('D', ['unseen']), ('F', ['unseen']),
                 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),
                 ('S', []), ('T', ['unseen', 'deleted']),
                 ('DFPRST', ['deleted', 'answered', 'forwarded']))
        for setting, result in pairs:
            msg_maildir.set_flags(setting)
            self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(),
                             result)

    def test_mboxmmdf_to_maildir(self):
        # Convert mboxMessage and MMDFMessage to MaildirMessage
        for class_ in self._mboxmmdf_classes:
            msg_mboxMMDF = class_(_sample_message)
            msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0))
            pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),
                     ('RODFA', 'FRST'))
            for setting, result in pairs:
                msg_mboxMMDF.set_flags(setting)
                msg = mailbox.MaildirMessage(msg_mboxMMDF)
                self.assertEqual(msg.get_flags(), result)
                self.assertEqual(msg.get_date(), 0.0)
            msg_mboxMMDF.set_flags('O')
            self.assertEqual(
                mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(), 'cur')

    def test_mboxmmdf_to_mboxmmdf(self):
        # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
        for class_ in self._mboxmmdf_classes:
            msg_mboxMMDF = class_(_sample_message)
            msg_mboxMMDF.set_flags('RODFA')
            msg_mboxMMDF.set_from('foo@bar')
            for class2_ in self._mboxmmdf_classes:
                msg2 = class2_(msg_mboxMMDF)
                self.assertEqual(msg2.get_flags(), 'RODFA')
                self.assertEqual(msg2.get_from(), 'foo@bar')

    def test_mboxmmdf_to_mh(self):
        # Convert mboxMessage and MMDFMessage to MHMessage
        for class_ in self._mboxmmdf_classes:
            msg_mboxMMDF = class_(_sample_message)
            pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']),
                     ('F', ['unseen', 'flagged']),
                     ('A', ['unseen', 'replied']),
                     ('RODFA', ['replied', 'flagged']))
            for setting, result in pairs:
                msg_mboxMMDF.set_flags(setting)
                self.assertEqual(
                    mailbox.MHMessage(msg_mboxMMDF).get_sequences(), result)

    def test_mboxmmdf_to_babyl(self):
        # Convert mboxMessage and MMDFMessage to BabylMessage
        for class_ in self._mboxmmdf_classes:
            msg = class_(_sample_message)
            pairs = (('R', []), ('O', ['unseen']),
                     ('D', ['unseen', 'deleted']), ('F', ['unseen']),
                     ('A', ['unseen', 'answered']),
                     ('RODFA', ['deleted', 'answered']))
            for setting, result in pairs:
                msg.set_flags(setting)
                self.assertEqual(mailbox.BabylMessage(msg).get_labels(),
                                 result)

    def test_mh_to_maildir(self):
        # Convert MHMessage to MaildirMessage
        pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
        for setting, result in pairs:
            msg = self._mh_message((setting,))
            self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
            self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
        msg = self._mh_message(self._all_sequences)
        self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR')
        self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')

    def test_mh_to_mboxmmdf(self):
        # Convert MHMessage to mboxMessage and MMDFMessage
        pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
        for setting, result in pairs:
            msg = self._mh_message((setting,))
            for class_ in self._mboxmmdf_classes:
                self.assertEqual(class_(msg).get_flags(), result)
        msg = self._mh_message(self._all_sequences)
        for class_ in self._mboxmmdf_classes:
            self.assertEqual(class_(msg).get_flags(), 'OFA')

    def test_mh_to_mh(self):
        # Convert MHMessage to MHMessage
        msg = self._mh_message(self._all_sequences)
        self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
                         ['unseen', 'replied', 'flagged'])

    def test_mh_to_babyl(self):
        # Convert MHMessage to BabylMessage
        pairs = (('unseen', ['unseen']), ('replied', ['answered']),
                 ('flagged', []))
        for setting, result in pairs:
            msg = self._mh_message((setting,))
            self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
        msg = self._mh_message(self._all_sequences)
        self.assertEqual(mailbox.BabylMessage(msg).get_labels(),
                         ['unseen', 'answered'])

    def test_babyl_to_maildir(self):
        # Convert BabylMessage to MaildirMessage
        pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
                 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
                 ('resent', 'PS'))
        for setting, result in pairs:
            msg = self._babyl_message((setting,))
            self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
            self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
        msg = self._babyl_message(self._all_labels)
        self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT')
        self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')

    def test_babyl_to_mboxmmdf(self):
        # Convert BabylMessage to mboxMessage and MMDFMessage
        pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
                 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),
                 ('resent', 'RO'))
        for setting, result in pairs:
            for class_ in self._mboxmmdf_classes:
                msg = self._babyl_message((setting,))
                self.assertEqual(class_(msg).get_flags(), result)
        msg = self._babyl_message(self._all_labels)
        for class_ in self._mboxmmdf_classes:
            self.assertEqual(class_(msg).get_flags(), 'ODA')

    def test_babyl_to_mh(self):
        # Convert BabylMessage to MHMessage
        pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []),
                 ('answered', ['replied']), ('forwarded', []), ('edited', []),
                 ('resent', []))
        for setting, result in pairs:
            msg = self._babyl_message((setting,))
            self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result)
        msg = self._babyl_message(self._all_labels)
        self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
                         ['unseen', 'replied'])

    def test_babyl_to_babyl(self):
        # Convert BabylMessage to BabylMessage
        msg = mailbox.BabylMessage(_sample_message)
        msg.update_visible()
        for label in self._all_labels:
            msg.add_label(label)
        msg2 = mailbox.BabylMessage(msg)
        self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed',
                                             'answered', 'forwarded',
                                             'edited', 'resent'])
        # The visible headers must survive the conversion as well.
        self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys())
        for key in msg.get_visible().keys():
            self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key])
|
1553 |
|
1554 |
|
class TestProxyFileBase(TestBase):
    """Shared assertions for the file proxies in the mailbox module.

    The ``_test_*`` helpers take an already constructed proxy; the concrete
    test classes in this file (TestProxyFile, TestPartialFile) write the
    content each helper expects and then call it.  Comparisons use
    assertEqual so that a failure reports both the actual and the expected
    value instead of a bare "False is not true".
    """

    def _test_read(self, proxy):
        # Read by byte; the proxy is expected to expose exactly 'bar'.
        proxy.seek(0)
        self.assertEqual(proxy.read(), 'bar')
        proxy.seek(1)
        self.assertEqual(proxy.read(), 'ar')
        proxy.seek(0)
        self.assertEqual(proxy.read(2), 'ba')
        proxy.seek(1)
        # A negative size is expected to read through to the end.
        self.assertEqual(proxy.read(-1), 'ar')
        proxy.seek(2)
        # Asking for more than remains must return only what remains.
        self.assertEqual(proxy.read(1000), 'r')

    def _test_readline(self, proxy):
        # Read by line; the proxy is expected to expose 'foo', 'bar', 'fred'
        # and 'bob' separated by os.linesep, with no trailing separator.
        sep = os.linesep
        proxy.seek(0)
        self.assertEqual(proxy.readline(), 'foo' + sep)
        self.assertEqual(proxy.readline(), 'bar' + sep)
        self.assertEqual(proxy.readline(), 'fred' + sep)
        self.assertEqual(proxy.readline(), 'bob')
        proxy.seek(2)
        self.assertEqual(proxy.readline(), 'o' + sep)
        # 6 + 2 * len(sep) is the offset of 'fred' ('foo' + sep + 'bar' + sep).
        proxy.seek(6 + 2 * len(sep))
        self.assertEqual(proxy.readline(), 'fred' + sep)
        proxy.seek(6 + 2 * len(sep))
        self.assertEqual(proxy.readline(2), 'fr')
        # A negative size is expected to read the rest of the line.
        self.assertEqual(proxy.readline(-10), 'ed' + sep)

    def _test_readlines(self, proxy):
        # Read multiple lines; same content as for _test_readline.
        sep = os.linesep
        proxy.seek(0)
        self.assertEqual(proxy.readlines(),
                         ['foo' + sep, 'bar' + sep, 'fred' + sep, 'bob'])
        proxy.seek(0)
        self.assertEqual(proxy.readlines(2), ['foo' + sep])
        # 3 + len(sep) is the offset of 'bar'.
        proxy.seek(3 + len(sep))
        self.assertEqual(proxy.readlines(4 + len(sep)),
                         ['bar' + sep, 'fred' + sep])
        proxy.seek(3)
        self.assertEqual(proxy.readlines(1000),
                         [sep, 'bar' + sep, 'fred' + sep, 'bob'])

    def _test_iteration(self, proxy):
        # Iterate by line; same content as for _test_readline.
        sep = os.linesep
        proxy.seek(0)
        iterator = iter(proxy)
        self.assertEqual(iterator.next(), 'foo' + sep)
        self.assertEqual(iterator.next(), 'bar' + sep)
        self.assertEqual(iterator.next(), 'fred' + sep)
        self.assertEqual(iterator.next(), 'bob')
        # The bound method is passed directly; no lambda wrapper is needed.
        self.assertRaises(StopIteration, iterator.next)

    def _test_seek_and_tell(self, proxy):
        # Seek and use tell to check position; the proxy is expected to
        # expose 'foo' + os.linesep + 'bar' + os.linesep.
        sep = os.linesep
        proxy.seek(3)
        self.assertEqual(proxy.tell(), 3)
        self.assertEqual(proxy.read(len(sep)), sep)
        # whence=1: relative to the current position (skips 'ba').
        proxy.seek(2, 1)
        self.assertEqual(proxy.read(1 + len(sep)), 'r' + sep)
        # whence=2: relative to the end (lands on the start of 'bar').
        proxy.seek(-3 - len(sep), 2)
        self.assertEqual(proxy.read(3), 'bar')
        proxy.seek(2, 0)
        self.assertEqual(proxy.read(), 'o' + sep + 'bar' + sep)
        # Seeking past the end must leave nothing to read.
        proxy.seek(100)
        self.assertEqual(proxy.read(), '')

    def _test_close(self, proxy):
        # Close a file; a second close is expected to raise AttributeError.
        proxy.close()
        self.assertRaises(AttributeError, proxy.close)
|
1628 |
|
1629 |
|
class TestProxyFile(TestProxyFileBase):
    """Run the shared proxy checks against mailbox._ProxyFile."""

    def setUp(self):
        self._path = test_support.TESTFN
        self._file = open(self._path, 'wb+')

    def tearDown(self):
        self._file.close()
        self._delete_recursively(self._path)

    def _make_proxy(self, content):
        # Write the content to the scratch file, then wrap that file.
        self._file.write(content)
        return mailbox._ProxyFile(self._file)

    def test_initialize(self):
        # Initialize and check position
        self._file.write('foo')
        end = self._file.tell()
        # Without an explicit position the proxy starts where the file is.
        default_proxy = mailbox._ProxyFile(self._file)
        self.assert_(default_proxy.tell() == end)
        self.assert_(self._file.tell() == end)
        # With an explicit position the underlying file must not move.
        explicit_proxy = mailbox._ProxyFile(self._file, 0)
        self.assert_(explicit_proxy.tell() == 0)
        self.assert_(self._file.tell() == end)

    def test_read(self):
        proxy = self._make_proxy('bar')
        self._test_read(proxy)

    def test_readline(self):
        proxy = self._make_proxy('foo%sbar%sfred%sbob' % ((os.linesep,) * 3))
        self._test_readline(proxy)

    def test_readlines(self):
        proxy = self._make_proxy('foo%sbar%sfred%sbob' % ((os.linesep,) * 3))
        self._test_readlines(proxy)

    def test_iteration(self):
        proxy = self._make_proxy('foo%sbar%sfred%sbob' % ((os.linesep,) * 3))
        self._test_iteration(proxy)

    def test_seek_and_tell(self):
        proxy = self._make_proxy('foo%sbar%s' % ((os.linesep,) * 2))
        self._test_seek_and_tell(proxy)

    def test_close(self):
        proxy = self._make_proxy('foo%sbar%s' % ((os.linesep,) * 2))
        self._test_close(proxy)
|
1677 |
|
1678 |
|
class TestPartialFile(TestProxyFileBase):
    """Run the shared proxy checks against mailbox._PartialFile.

    Each test surrounds the interesting content with padding characters and
    exposes only the content through the partial file.  The window offsets
    are derived from the lengths of the padding and the content rather than
    hand-computed, so they stay correct whatever len(os.linesep) is.  (The
    previous hand-written stop offset in test_close, 6 + 3 * len(os.linesep),
    only matched the content when the separator was a single character.)
    """

    # Content expected by the readline/readlines/iteration helpers.
    _FOUR_LINES = os.linesep.join(('foo', 'bar', 'fred', 'bob'))
    # Content expected by the seek/tell and close helpers.
    _TWO_LINES = 'foo' + os.linesep + 'bar' + os.linesep

    def setUp(self):
        self._path = test_support.TESTFN
        self._file = open(self._path, 'wb+')

    def tearDown(self):
        self._file.close()
        self._delete_recursively(self._path)

    def _make_partial(self, prefix, content, suffix):
        # Write prefix + content + suffix to the scratch file and return a
        # _PartialFile whose [start, stop) window covers exactly `content`.
        self._file.write(prefix + content + suffix)
        start = len(prefix)
        return mailbox._PartialFile(self._file, start, start + len(content))

    def test_initialize(self):
        # Initialize and check position
        self._file.write('foo' + os.linesep + 'bar')
        pos = self._file.tell()
        proxy = mailbox._PartialFile(self._file, 2, 5)
        # The proxy reports positions relative to its own start, and
        # creating it must not move the underlying file.
        self.assertEqual(proxy.tell(), 0)
        self.assertEqual(self._file.tell(), pos)

    def test_read(self):
        self._test_read(self._make_partial('***', 'bar', '***'))

    def test_readline(self):
        self._test_readline(
            self._make_partial('!!!!!', self._FOUR_LINES, '!!!!!'))

    def test_readlines(self):
        self._test_readlines(
            self._make_partial('', self._FOUR_LINES, '?????'))

    def test_iteration(self):
        self._test_iteration(
            self._make_partial('____', self._FOUR_LINES, '####'))

    def test_seek_and_tell(self):
        self._test_seek_and_tell(
            self._make_partial('(((', self._TWO_LINES, '$$$'))

    def test_close(self):
        self._test_close(self._make_partial('&', self._TWO_LINES, '^'))
|
1728 |
|
1729 |
|
1730 ## Start: tests from the original module (for backward compatibility). |
|
1731 |
|
1732 FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n" |
|
1733 DUMMY_MESSAGE = """\ |
|
1734 From: some.body@dummy.domain |
|
1735 To: me@my.domain |
|
1736 Subject: Simple Test |
|
1737 |
|
1738 This is a dummy message. |
|
1739 """ |
|
1740 |
|
class MaildirTestCase(unittest.TestCase):
    """Tests carried over from the original module (backward compatibility).

    setUp builds an empty maildir layout (cur/tmp/new) under
    test_support.TESTFN; every file created through createMessage is
    recorded in self._msgfiles so that tearDown can remove it again.
    """

    _SUBDIRS = ("cur", "tmp", "new")

    def setUp(self):
        # create a new maildir mailbox to work with:
        self._dir = test_support.TESTFN
        os.mkdir(self._dir)
        for subdir in self._SUBDIRS:
            os.mkdir(os.path.join(self._dir, subdir))
        self._counter = 1
        self._msgfiles = []

    def tearDown(self):
        # An explicit loop instead of map(): the calls are made purely for
        # their side effect, and a lazy map() would never perform them.
        for name in self._msgfiles:
            os.unlink(name)
        for subdir in self._SUBDIRS:
            os.rmdir(os.path.join(self._dir, subdir))
        os.rmdir(self._dir)

    def createMessage(self, dir, mbox=False):
        # Create one message file in tmp/ and make it appear in the `dir`
        # subdirectory as well.  With mbox=True the content starts with the
        # FROM_ separator line.  Returns the path of the tmp/ file.
        t = int(time.time() % 1000000)
        pid = self._counter
        self._counter += 1
        filename = os.extsep.join((str(t), str(pid), "myhostname", "mydomain"))
        tmpname = os.path.join(self._dir, "tmp", filename)
        newname = os.path.join(self._dir, dir, filename)
        if mbox:
            content = FROM_ + DUMMY_MESSAGE
        else:
            content = DUMMY_MESSAGE
        self._write_file(tmpname, content)
        if hasattr(os, "link"):
            os.link(tmpname, newname)
        else:
            # No hard links available: write a copy with the same content
            # the link would have exposed (including FROM_ when mbox=True).
            self._write_file(newname, content)
        self._msgfiles.append(newname)
        return tmpname

    def _write_file(self, name, content):
        # Write `content` to `name`, registering the file for cleanup as
        # soon as it exists and closing the handle even if the write fails.
        fp = open(name, "w")
        if name not in self._msgfiles:
            self._msgfiles.append(name)
        try:
            fp.write(content)
        finally:
            fp.close()

    def _check_next_sequence(self, expected):
        # Open the maildir and check that next() yields `expected` messages
        # and afterwards keeps returning None.
        self.mbox = mailbox.Maildir(test_support.TESTFN)
        for _ in range(expected):
            self.assertTrue(self.mbox.next() is not None)
        self.assertTrue(self.mbox.next() is None)
        self.assertTrue(self.mbox.next() is None)

    def test_empty_maildir(self):
        """Test an empty maildir mailbox"""
        # Test for regression on bug #117490:
        # (The former assertions on the `boxes` attribute are disabled;
        # only the behaviour of next() is checked.)
        self._check_next_sequence(0)

    def test_nonempty_maildir_cur(self):
        self.createMessage("cur")
        self._check_next_sequence(1)

    def test_nonempty_maildir_new(self):
        self.createMessage("new")
        self._check_next_sequence(1)

    def test_nonempty_maildir_both(self):
        self.createMessage("cur")
        self.createMessage("new")
        self._check_next_sequence(2)

    def test_unix_mbox(self):
        ### should be better!
        import email.parser
        fname = self.createMessage("cur", True)
        n = 0
        fp = open(fname)
        # Close the handle explicitly: an open file would otherwise leak and
        # can prevent tearDown from unlinking it on some platforms.
        try:
            for msg in mailbox.PortableUnixMailbox(fp,
                                                   email.parser.Parser().parse):
                n += 1
                self.assertEqual(msg["subject"], "Simple Test")
                self.assertEqual(len(str(msg)),
                                 len(FROM_) + len(DUMMY_MESSAGE))
        finally:
            fp.close()
        self.assertEqual(n, 1)
|
1829 |
|
1830 ## End: classes from the original module (for backward compatibility). |
|
1831 |
|
1832 |
|
1833 _sample_message = """\ |
|
1834 Return-Path: <gkj@gregorykjohnson.com> |
|
1835 X-Original-To: gkj+person@localhost |
|
1836 Delivered-To: gkj+person@localhost |
|
1837 Received: from localhost (localhost [127.0.0.1]) |
|
1838 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 |
|
1839 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT) |
|
1840 Delivered-To: gkj@sundance.gregorykjohnson.com |
|
1841 Received: from localhost [127.0.0.1] |
|
1842 by localhost with POP3 (fetchmail-6.2.5) |
|
1843 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT) |
|
1844 Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) |
|
1845 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 |
|
1846 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) |
|
1847 Received: by andy.gregorykjohnson.com (Postfix, from userid 1000) |
|
1848 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) |
|
1849 Date: Wed, 13 Jul 2005 17:23:11 -0400 |
|
1850 From: "Gregory K. Johnson" <gkj@gregorykjohnson.com> |
|
1851 To: gkj@gregorykjohnson.com |
|
1852 Subject: Sample message |
|
1853 Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com> |
|
1854 Mime-Version: 1.0 |
|
1855 Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+" |
|
1856 Content-Disposition: inline |
|
1857 User-Agent: Mutt/1.5.9i |
|
1858 |
|
1859 |
|
1860 --NMuMz9nt05w80d4+ |
|
1861 Content-Type: text/plain; charset=us-ascii |
|
1862 Content-Disposition: inline |
|
1863 |
|
1864 This is a sample message. |
|
1865 |
|
1866 -- |
|
1867 Gregory K. Johnson |
|
1868 |
|
1869 --NMuMz9nt05w80d4+ |
|
1870 Content-Type: application/octet-stream |
|
1871 Content-Disposition: attachment; filename="text.gz" |
|
1872 Content-Transfer-Encoding: base64 |
|
1873 |
|
1874 H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs |
|
1875 3FYlAAAA |
|
1876 |
|
1877 --NMuMz9nt05w80d4+-- |
|
1878 """ |
|
1879 |
|
1880 _sample_headers = { |
|
1881 "Return-Path":"<gkj@gregorykjohnson.com>", |
|
1882 "X-Original-To":"gkj+person@localhost", |
|
1883 "Delivered-To":"gkj+person@localhost", |
|
1884 "Received":"""from localhost (localhost [127.0.0.1]) |
|
1885 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 |
|
1886 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", |
|
1887 "Delivered-To":"gkj@sundance.gregorykjohnson.com", |
|
1888 "Received":"""from localhost [127.0.0.1] |
|
1889 by localhost with POP3 (fetchmail-6.2.5) |
|
1890 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", |
|
1891 "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) |
|
1892 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 |
|
1893 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", |
|
1894 "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000) |
|
1895 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", |
|
1896 "Date":"Wed, 13 Jul 2005 17:23:11 -0400", |
|
1897 "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""", |
|
1898 "To":"gkj@gregorykjohnson.com", |
|
1899 "Subject":"Sample message", |
|
1900 "Mime-Version":"1.0", |
|
1901 "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""", |
|
1902 "Content-Disposition":"inline", |
|
1903 "User-Agent": "Mutt/1.5.9i" } |
|
1904 |
|
1905 _sample_payloads = ("""This is a sample message. |
|
1906 |
|
1907 -- |
|
1908 Gregory K. Johnson |
|
1909 """, |
|
1910 """H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs |
|
1911 3FYlAAAA |
|
1912 """) |
|
1913 |
|
1914 |
|
def test_main():
    # Hand the listed test case classes to test_support.run_unittest, then
    # call test_support.reap_children() once they have run.
    suites = [
        TestMailboxSuperclass,
        TestMaildir,
        TestMbox,
        TestMMDF,
        TestMH,
        TestBabyl,
        TestMessage,
        TestMaildirMessage,
        TestMboxMessage,
        TestMHMessage,
        TestBabylMessage,
        TestMMDFMessage,
        TestMessageConversion,
        TestProxyFile,
        TestPartialFile,
        MaildirTestCase,
    ]
    test_support.run_unittest(*suites)
    test_support.reap_children()


# Allow the module to be run directly as a script.
if __name__ == '__main__':
    test_main()