|
1 # Copyright (C) 2002-2006 Python Software Foundation |
|
2 # Author: Ben Gertzfield, Barry Warsaw |
|
3 # Contact: email-sig@python.org |
|
4 |
|
5 """Header encoding and decoding functionality.""" |
|
6 |
|
# Names exported by ``from <this module> import *``: the Header class and the
# two module-level helper functions defined below.
__all__ = [
    'Header',
    'decode_header',
    'make_header',
    ]
|
12 |
|
13 import re |
|
14 import binascii |
|
15 |
|
16 import email.quoprimime |
|
17 import email.base64mime |
|
18 |
|
19 from email.errors import HeaderParseError |
|
20 from email.charset import Charset |
|
21 |
|
# String constants used when joining header pieces.  The U-prefixed variants
# are the unicode counterparts used by Header.__unicode__().
NL = '\n'
SPACE = ' '
USPACE = u' '
# A hard tab in the continuation whitespace is measured as this many columns.
SPACE8 = ' ' * 8
UEMPTYSTRING = u''

# Line length used by Header when the caller does not pass maxlinelen.
MAXLINELEN = 76

# Shared Charset instances: USASCII is Header's default charset, and both are
# tried by Header.append() when encoding a unicode string.
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')
|
32 |
|
# Match encoded-word strings in the form =?charset?q?Hello_World?=
#
# The pattern has exactly three groups (charset, encoding, encoded), so
# ecre.split() yields unencoded text at every fourth index with the three
# group values in between; decode_header() relies on that layout.
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  (?=[ \t]|$)           # whitespace or the end of the string
  ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)
|
44 |
|
# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark (octal 041)
# to tilde (octal 176), i.e. the printable non-space ASCII characters.
# For use with .match()
fcre = re.compile(r'[\041-\176]+:$')
|
49 |
|
50 |
|
51 |
|
# Helpers
# Module-level alias for the helper in email.quoprimime.  It is called by
# Header._encode_chunks() with the list of output lines, the next encoded
# chunk, the maximum line length and the separator string.
_max_append = email.quoprimime._max_append
|
54 |
|
55 |
|
56 |
|
def decode_header(header):
    """Decode a message header value without converting charset.

    The value is converted with str() and processed one line at a time.  The
    result is a list of (decoded_string, charset) pairs.  charset is None for
    text that was not an encoded word; for an encoded word it is the
    lower-cased charset name found in that word.

    A line containing no encoded word is appended as its own pair.  Within a
    line that does contain encoded words, stripped plain text is joined with a
    single space onto a preceding charset-less pair, and a decoded word is
    concatenated onto a preceding pair that has the same charset.

    An email.Errors.HeaderParseError is raised when the base64 payload of an
    encoded word cannot be decoded.
    """
    header = str(header)
    # Nothing that looks like an encoded word: hand the value back untouched.
    if ecre.search(header) is None:
        return [(header, None)]
    pairs = []
    for line in header.splitlines():
        if ecre.search(line) is None:
            # This particular line carries no encoded word.
            pairs.append((line, None))
            continue
        # ecre has three groups, so split() produces plain text at indexes
        # 0, 4, 8, ... each followed by (charset, encoding, encoded).
        fields = ecre.split(line)
        for start in range(0, len(fields), 4):
            plain = fields[start].strip()
            if plain:
                # Continue a preceding unencoded run rather than starting a
                # new pair.
                if pairs and pairs[-1][1] is None:
                    pairs[-1] = (pairs[-1][0] + SPACE + plain, None)
                else:
                    pairs.append((plain, None))
            word = fields[start + 1:start + 4]
            if not word:
                # Trailing plain text; no encoded word follows it.
                continue
            charset, encoding = [s.lower() for s in word[0:2]]
            encoded = word[2]
            payload = None
            if encoding == 'q':
                payload = email.quoprimime.header_decode(encoded)
            elif encoding == 'b':
                try:
                    payload = email.base64mime.decode(encoded)
                except binascii.Error:
                    # Surface the low-level failure as the package's own
                    # parse error; the original exception is discarded.
                    raise HeaderParseError
            if payload is None:
                # The decoder produced nothing usable; keep the raw text.
                payload = encoded

            if pairs and pairs[-1][1] == charset:
                pairs[-1] = (pairs[-1][0] + payload, pairs[-1][1])
            else:
                pairs.append((payload, charset))
    return pairs
|
111 |
|
112 |
|
113 |
|
def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Build a Header from (decoded_string, charset) pairs.

    decoded_seq is a sequence of pairs in the format produced by
    decode_header().  Each charset may be None, a Charset instance, or a
    charset name; names are wrapped in a Charset before being appended.

    maxlinelen, header_name and continuation_ws are handed to the Header
    constructor unchanged.  Returns the populated Header instance.
    """
    header = Header(maxlinelen=maxlinelen, header_name=header_name,
                    continuation_ws=continuation_ws)
    for text, cs in decoded_seq:
        # None is passed straight through; Header.append() substitutes the
        # header's own default charset for it.
        if not (cs is None or isinstance(cs, Charset)):
            cs = Charset(cs)
        header.append(text, cs)
    return header
|
134 |
|
135 |
|
136 |
|
class Header:
    """A message header assembled from chunks in possibly different charsets.

    The header is stored in self._chunks as a list of (string, Charset)
    pairs, in the order they were appended.  encode() splits those pairs to
    the configured line lengths, encodes each piece according to its charset
    and joins the resulting lines with a newline plus the continuation
    whitespace.
    """

    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Create a MIME-compliant header that can contain many character sets.

        Optional s is the initial header value.  If None, the initial header
        value is not set.  You can later append to the header with .append()
        method calls.  s may be a byte string or a Unicode string, but see the
        .append() documentation for semantics.

        Optional charset serves two purposes: it has the same meaning as the
        charset argument to the .append() method.  It also sets the default
        character set for all subsequent .append() calls that omit the charset
        argument.  If charset is not provided in the constructor, the us-ascii
        charset is used both as s's initial charset and as the default for
        subsequent .append() calls.

        The maximum line length can be specified explicitly via maxlinelen.
        For splitting the first line to a shorter value (to account for the
        field header which isn't included in s, e.g. `Subject') pass in the
        name of the field in header_name.  The default maxlinelen is 76.

        continuation_ws must be RFC 2822 compliant folding whitespace (usually
        either a space or a hard tab) which will be prepended to continuation
        lines.

        errors is passed through to the .append() call.
        """
        if charset is None:
            charset = USASCII
        if not isinstance(charset, Charset):
            charset = Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        # Width of the continuation whitespace with each tab counted as
        # eight columns.
        cws_expanded_len = len(continuation_ws.replace('\t', SPACE8))
        # BAW: I believe `chunks' and `maxlinelen' should be non-public.
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        if maxlinelen is None:
            maxlinelen = MAXLINELEN
        if header_name is None:
            # We don't know anything about the field header so the first line
            # is the same length as subsequent lines.
            self._firstlinelen = maxlinelen
        else:
            # The first line should be shorter to take into account the field
            # header.  Also subtract off 2 extra for the colon and space.
            self._firstlinelen = maxlinelen - len(header_name) - 2
        # Second and subsequent lines should subtract off the length in
        # columns of the continuation whitespace prefix.
        self._maxlinelen = maxlinelen - cws_expanded_len

    def __str__(self):
        """A synonym for self.encode()."""
        return self.encode()

    def __unicode__(self):
        """Helper for the built-in unicode function.

        Each chunk is decoded with its charset name and the results are
        concatenated, with a single space inserted wherever the text switches
        between us-ascii/None and any other charset.
        """
        uchunks = []
        lastcs = None
        for s, charset in self._chunks:
            # We must preserve spaces between encoded and non-encoded word
            # boundaries, which means for us we need to add a space when we go
            # from a charset to None/us-ascii, or from None/us-ascii to a
            # charset.  Only do this for the second and subsequent chunks.
            nextcs = charset
            if uchunks:
                if lastcs not in (None, 'us-ascii'):
                    if nextcs in (None, 'us-ascii'):
                        uchunks.append(USPACE)
                        nextcs = None
                elif nextcs not in (None, 'us-ascii'):
                    uchunks.append(USPACE)
            lastcs = nextcs
            uchunks.append(unicode(s, str(charset)))
        return UEMPTYSTRING.join(uchunks)

    # Rich comparison operators for equality only.  BAW: does it make sense to
    # have or explicitly disable <, <=, >, >= operators?
    def __eq__(self, other):
        """Compare the encoded form of this header with other."""
        # other may be a Header or a string.  Both are fine so coerce
        # ourselves to a string, swap the args and do another comparison.
        return other == self.encode()

    def __ne__(self, other):
        """Inverse of __eq__()."""
        return not self == other

    def append(self, s, charset=None, errors='strict'):
        """Append a string to the MIME header.

        Optional charset, if given, should be a Charset instance or the name
        of a character set (which will be converted to a Charset instance).  A
        value of None (the default) means that the charset given in the
        constructor is used.

        s may be a byte string or a Unicode string.  If it is a byte string
        (i.e. isinstance(s, str) is true), then charset is the encoding of
        that byte string, and a UnicodeError will be raised if the string
        cannot be decoded with that charset.  If s is a Unicode string, then
        charset is a hint specifying the character set of the characters in
        the string.  In this case, when producing an RFC 2822 compliant header
        using RFC 2047 rules, the Unicode string will be encoded using the
        following charsets in order: us-ascii, the charset hint, utf-8.  The
        first character set not to provoke a UnicodeError is used.

        Optional `errors' is passed as the third argument to any unicode() or
        ustr.encode() call.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        # If the charset is our faux 8bit charset, leave the string unchanged
        if charset != '8bit':
            # We need to test that the string can be converted to unicode and
            # back to a byte string, given the input and output codecs of the
            # charset.
            if isinstance(s, str):
                # Possibly raise UnicodeError if the byte string can't be
                # converted to a unicode with the input codec of the charset.
                incodec = charset.input_codec or 'us-ascii'
                ustr = unicode(s, incodec, errors)
                # Now make sure that the unicode could be converted back to a
                # byte string with the output codec, which may be different
                # than the input codec.  Still, use the original byte string.
                outcodec = charset.output_codec or 'us-ascii'
                ustr.encode(outcodec, errors)
            elif isinstance(s, unicode):
                # Now we have to be sure the unicode string can be converted
                # to a byte string with a reasonable output codec.  We want to
                # use the byte string in the chunk.  Note that the loop
                # rebinds charset to whichever candidate succeeded, so that is
                # the charset recorded with the chunk.
                for charset in USASCII, charset, UTF8:
                    try:
                        outcodec = charset.output_codec or 'us-ascii'
                        s = s.encode(outcodec, errors)
                        break
                    except UnicodeError:
                        pass
                else:
                    assert False, 'utf-8 conversion failed'
        self._chunks.append((s, charset))

    def _split(self, s, charset, maxlinelen, splitchars):
        """Split one chunk into pieces that fit the available line lengths.

        s is the chunk text and charset its Charset.  maxlinelen is the room
        available for the first piece; every later piece is measured against
        self._maxlinelen.  splitchars is forwarded to the ASCII splitter.

        Returns a list of (string, charset) pairs.  A piece may still exceed
        the limit when it cannot be split: '8bit' data is never split, and a
        chunk of which not even a minimal prefix fits is returned whole.
        """
        # Split up a header safely for use with encode_chunks.
        splittable = charset.to_splittable(s)
        encoded = charset.from_splittable(splittable, True)
        elen = charset.encoded_header_len(encoded)
        # If the line's encoded length fits, just return it
        if elen <= maxlinelen:
            return [(encoded, charset)]
        # If we have undetermined raw 8bit characters sitting in a byte
        # string, we really don't know what the right thing to do is.  We
        # can't really split it because it might be multibyte data which we
        # could break if we split it between pairs.  The least harm seems to
        # be to not split the header at all, but that means they could go out
        # longer than maxlinelen.
        if charset == '8bit':
            return [(s, charset)]
        # BAW: I'm not sure what the right test here is.  What we're trying to
        # do is be faithful to RFC 2822's recommendation that ($2.2.3):
        #
        # "Note: Though structured field bodies are defined in such a way that
        #  folding can take place between many of the lexical tokens (and even
        #  within some of the lexical tokens), folding SHOULD be limited to
        #  placing the CRLF at higher-level syntactic breaks."
        #
        # For now, I can only imagine doing this when the charset is us-ascii,
        # although it's possible that other charsets may also benefit from the
        # higher-level syntactic breaks.
        elif charset == 'us-ascii':
            return self._split_ascii(s, charset, maxlinelen, splitchars)
        # BAW: should we use encoded?
        elif elen == len(s):
            # We can split on _maxlinelen boundaries because we know that the
            # encoding won't change the size of the string
            splitpnt = maxlinelen
            first = charset.from_splittable(splittable[:splitpnt], False)
            last = charset.from_splittable(splittable[splitpnt:], False)
        else:
            # Binary search for split point
            first, last = _binsplit(splittable, charset, maxlinelen)
        if not first and maxlinelen >= self._maxlinelen:
            # Nothing fits in the room we were given, and the recursive call
            # below would get no more room than that, so it would be handed
            # the very same string and recurse without end.  As with 8bit
            # data, the least harm is to emit the chunk unsplit even though
            # it is over-long.
            return [(encoded, charset)]
        # first is of the proper length so just wrap it in the appropriate
        # chrome.  last must be recursively split.  (first may be empty when
        # only the remainder of the current line was too short; empty pieces
        # are skipped by _encode_chunks().)
        fsplittable = charset.to_splittable(first)
        fencoded = charset.from_splittable(fsplittable, True)
        chunk = [(fencoded, charset)]
        return chunk + self._split(last, charset, self._maxlinelen, splitchars)

    def _split_ascii(self, s, charset, firstlen, splitchars):
        """Split ASCII text via the module-level _split_ascii() helper.

        Returns a list pairing every resulting line with charset.
        """
        chunks = _split_ascii(s, firstlen, self._maxlinelen,
                              self._continuation_ws, splitchars)
        # Build a real list so callers can concatenate and index the result.
        return [(chunk, charset) for chunk in chunks]

    def _encode_chunks(self, newchunks, maxlinelen):
        """Encode (string, charset) pairs and join them into header text.

        Empty strings are skipped.  A pair whose charset is None, or whose
        charset has no header encoding, is used as is; any other pair is
        passed through charset.header_encode().  The pieces are accumulated
        with _max_append() and the resulting lines joined with a newline
        followed by the continuation whitespace.
        """
        # MIME-encode a header with many different charsets and/or encodings.
        #
        # Given a list of pairs (string, charset), return a MIME-encoded
        # string suitable for use in a header field.  Each pair may have
        # different charsets and/or encodings, and the resulting header will
        # accurately reflect each setting.
        #
        # Each encoding can be email.Utils.QP (quoted-printable, for
        # ASCII-like character sets like iso-8859-1), email.Utils.BASE64
        # (Base64, for non-ASCII like character sets like KOI8-R and
        # iso-2022-jp), or None (no encoding).
        #
        # Each pair will be represented on a separate line; the resulting
        # string will be in the format:
        #
        # =?charset1?q?Mar=EDa_Gonz=E1lez_Alonso?=\n
        #  =?charset2?b?SvxyZ2VuIEL2aW5n?="
        chunks = []
        for header, charset in newchunks:
            if not header:
                continue
            if charset is None or charset.header_encoding is None:
                s = header
            else:
                s = charset.header_encode(header)
            # Don't add more folding whitespace than necessary
            if chunks and chunks[-1].endswith(' '):
                extra = ''
            else:
                extra = ' '
            _max_append(chunks, s, maxlinelen, extra)
        joiner = NL + self._continuation_ws
        return joiner.join(chunks)

    def encode(self, splitchars=';, '):
        """Encode a message header into an RFC-compliant format.

        There are many issues involved in converting a given string for use in
        an email header.  Only certain character sets are readable in most
        email clients, and as header strings can only contain a subset of
        7-bit ASCII, care must be taken to properly convert and encode (with
        Base64 or quoted-printable) header strings.  In addition, there is a
        75-character length limit on any given encoded header field, so
        line-wrapping must be performed, even with double-byte character sets.

        This method will do its best to convert the string to the correct
        character set used in email, and encode and line wrap it safely with
        the appropriate scheme for that character set.

        Optional splitchars is a string containing characters to split long
        ASCII lines on, in rough support of RFC 2822's `highest level
        syntactic breaks'.  This doesn't affect RFC 2047 encoded lines.
        """
        # NOTE(review): this docstring used to say the header is returned
        # untouched when the charset is unknown or a conversion error occurs;
        # nothing in this method catches such errors -- confirm and document.
        newchunks = []
        maxlinelen = self._firstlinelen
        lastlen = 0
        for s, charset in self._chunks:
            # The first bit of the next chunk should be just long enough to
            # fill the next line.  Don't forget the space separating the
            # encoded words.
            targetlen = maxlinelen - lastlen - 1
            if targetlen < charset.encoded_header_len(''):
                # Stick it on the next line
                targetlen = maxlinelen
            newchunks += self._split(s, charset, targetlen, splitchars)
            lastchunk, lastcharset = newchunks[-1]
            lastlen = lastcharset.encoded_header_len(lastchunk)
        return self._encode_chunks(newchunks, maxlinelen)
|
404 |
|
405 |
|
406 |
|
def _split_ascii(s, firstlen, restlen, continuation_ws, splitchars):
    """Split ASCII header text into lines at the best available break.

    s is the text to split.  Existing line breaks in s are kept as breaks and
    each existing line has its leading whitespace stripped before processing.
    firstlen is the length allowed for the first output line and restlen the
    length allowed for every later one.  continuation_ws is only measured
    here (each tab counts as eight columns); it is not added to the returned
    lines.  splitchars holds the candidate split characters in order of
    preference; the first one present in a line is used for that line, and
    every character is matched literally.

    When the split character is a semicolon or comma it is kept at the end of
    each line it terminated.  Returns the list of line strings.
    """
    lines = []
    maxlen = firstlen
    # Same for every line, so measure the continuation whitespace once.
    wslen = len(continuation_ws.replace('\t', SPACE8))
    for line in s.splitlines():
        # Ignore any leading whitespace (i.e. continuation whitespace) already
        # on the line, since we'll be adding our own.
        line = line.lstrip()
        if len(line) < maxlen:
            lines.append(line)
            maxlen = restlen
            continue
        # Attempt to split the line at the highest-level syntactic break
        # possible.  Note that we don't have a lot of smarts about field
        # syntax; we just try to break on semi-colons, then commas, then
        # whitespace.
        for ch in splitchars:
            if ch in line:
                break
        else:
            # There's nothing useful to split the line on, not even spaces, so
            # just append this line unchanged
            lines.append(line)
            maxlen = restlen
            continue
        # Now split the line on the character plus trailing whitespace.  The
        # character is escaped so that one which is special in a regular
        # expression (e.g. '.' or '|') is still matched literally.
        cre = re.compile(r'%s\s*' % re.escape(ch))
        if ch in ';,':
            eol = ch
        else:
            eol = ''
        joiner = eol + ' '
        joinlen = len(joiner)
        this = []
        linelen = 0
        for part in cre.split(line):
            curlen = linelen + max(0, len(this)-1) * joinlen
            partlen = len(part)
            onfirstline = not lines
            # We don't want to split after the field name, if we're on the
            # first line and the field name is present in the header string.
            if ch == ' ' and onfirstline and \
                   len(this) == 1 and fcre.match(this[0]):
                this.append(part)
                linelen += partlen
            elif curlen + partlen > maxlen:
                if this:
                    lines.append(joiner.join(this) + eol)
                # If this part is longer than maxlen and we aren't already
                # splitting on whitespace, try to recursively split this line
                # on whitespace.
                if partlen > maxlen and ch != ' ':
                    subl = _split_ascii(part, maxlen, restlen,
                                        continuation_ws, ' ')
                    lines.extend(subl[:-1])
                    this = [subl[-1]]
                else:
                    this = [part]
                linelen = wslen + len(this[-1])
                maxlen = restlen
            else:
                this.append(part)
                linelen += partlen
        # Put any left over parts on a line by themselves
        if this:
            lines.append(joiner.join(this))
    return lines
|
474 |
|
475 |
|
476 |
|
477 def _binsplit(splittable, charset, maxlinelen): |
|
478 i = 0 |
|
479 j = len(splittable) |
|
480 while i < j: |
|
481 # Invariants: |
|
482 # 1. splittable[:k] fits for all k <= i (note that we *assume*, |
|
483 # at the start, that splittable[:0] fits). |
|
484 # 2. splittable[:k] does not fit for any k > j (at the start, |
|
485 # this means we shouldn't look at any k > len(splittable)). |
|
486 # 3. We don't know about splittable[:k] for k in i+1..j. |
|
487 # 4. We want to set i to the largest k that fits, with i <= k <= j. |
|
488 # |
|
489 m = (i+j+1) >> 1 # ceiling((i+j)/2); i < m <= j |
|
490 chunk = charset.from_splittable(splittable[:m], True) |
|
491 chunklen = charset.encoded_header_len(chunk) |
|
492 if chunklen <= maxlinelen: |
|
493 # m is acceptable, so is a new lower bound. |
|
494 i = m |
|
495 else: |
|
496 # m is not acceptable, so final i must be < m. |
|
497 j = m - 1 |
|
498 # i == j. Invariant #1 implies that splittable[:i] fits, and |
|
499 # invariant #2 implies that splittable[:i+1] does not fit, so i |
|
500 # is what we're looking for. |
|
501 first = charset.from_splittable(splittable[:i], False) |
|
502 last = charset.from_splittable(splittable[i:], False) |
|
503 return first, last |