|
""" Python 'utf-8-sig' Codec

This works similarly to UTF-8, with the following changes:

* On encoding/writing, a UTF-8 encoded BOM will be prepended/written as the
  first three bytes.

* On decoding/reading, if the first three bytes are a UTF-8 encoded BOM, these
  bytes will be skipped.
"""
|
10 import codecs |
|
11 |
|
12 ### Codec APIs |
|
13 |
|
def encode(input, errors='strict'):
    """Encode *input* as UTF-8 and prepend the UTF-8 BOM.

    *errors* is passed through unchanged to ``codecs.utf_8_encode``.

    Returns a ``(data, length)`` tuple where ``data`` is
    ``codecs.BOM_UTF8`` followed by the encoded bytes and ``length`` is
    ``len(input)``.
    """
    payload, _ = codecs.utf_8_encode(input, errors)
    return (codecs.BOM_UTF8 + payload, len(input))
|
16 |
|
def decode(input, errors='strict'):
    """Decode UTF-8 *input*, skipping a leading UTF-8 BOM if there is one.

    The input is always decoded as complete data (the ``final`` argument of
    ``codecs.utf_8_decode`` is ``True``).  *errors* is passed through
    unchanged.

    Returns ``(text, consumed)``; ``consumed`` includes the three BOM bytes
    when a BOM was skipped.
    """
    if input[:3] == codecs.BOM_UTF8:
        skipped = 3
        payload = input[3:]
    else:
        skipped = 0
        payload = input
    text, used = codecs.utf_8_decode(payload, errors, True)
    return (text, used + skipped)
|
24 |
|
class IncrementalEncoder(codecs.IncrementalEncoder):
    """Incremental UTF-8 encoder that writes the BOM once per stream.

    The only state is ``self.first``: true until the first ``encode()`` call
    has emitted the BOM.  ``getstate()``/``setstate()`` expose that flag so
    the "BOM already written" state can be saved and restored; without them
    a restored encoder would emit a second BOM.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalEncoder.__init__(self, errors)
        self.first = True

    def encode(self, input, final=False):
        """Return the UTF-8 bytes for *input*, BOM-prefixed on the first call.

        *final* is accepted for interface compatibility and is not used.
        """
        if self.first:
            # Clear the flag before encoding, so the BOM is attempted only
            # once even if encoding *input* raises.
            self.first = False
            return codecs.BOM_UTF8 + codecs.utf_8_encode(input, self.errors)[0]
        return codecs.utf_8_encode(input, self.errors)[0]

    def reset(self):
        """Return to the initial state: the next encode() emits a BOM."""
        codecs.IncrementalEncoder.reset(self)
        self.first = True

    def getstate(self):
        """Return 1 if the BOM is still to be written, otherwise 0."""
        return int(self.first)

    def setstate(self, state):
        """Restore a value from getstate(); a false value suppresses the BOM."""
        self.first = bool(state)
|
40 |
|
class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
    """Incremental UTF-8 decoder that skips one leading BOM per stream.

    ``self.first`` is true while the start of the stream has not yet been
    classified as "BOM" or "no BOM".  ``getstate()``/``setstate()`` carry
    that flag together with the base class's byte buffer, so that restoring
    a saved state does not cause a later ``EF BB BF`` sequence to be
    mistaken for a leading BOM (or a real leading BOM to be kept).
    """

    def __init__(self, errors='strict'):
        codecs.BufferedIncrementalDecoder.__init__(self, errors)
        self.first = True

    def _buffer_decode(self, input, errors, final):
        """Decode *input*, returning ``(text, bytes_consumed)``.

        While ``self.first`` is set, a leading BOM is counted as consumed
        but produces no output.
        """
        if self.first:
            if len(input) < 3:
                if codecs.BOM_UTF8.startswith(input):
                    # not enough data to decide if this really is a BOM
                    # => try again on the next call
                    # NOTE: this return is taken regardless of *final*, so
                    # input that ends inside a BOM prefix yields no output
                    # and no error from this method.
                    return (u"", 0)
                # Short input that cannot be a BOM prefix: no BOM present.
                self.first = False
            else:
                self.first = False
                if input[:3] == codecs.BOM_UTF8:
                    (output, consumed) = codecs.utf_8_decode(input[3:], errors, final)
                    # Report the three skipped BOM bytes as consumed too.
                    return (output, consumed + 3)
        return codecs.utf_8_decode(input, errors, final)

    def reset(self):
        """Return to the initial state: a leading BOM will be skipped again."""
        codecs.BufferedIncrementalDecoder.reset(self)
        self.first = True

    def getstate(self):
        """Return ``(buffered_bytes, flag)``; flag is 1 while BOM detection is pending."""
        buffered = codecs.BufferedIncrementalDecoder.getstate(self)[0]
        return (buffered, int(bool(self.first)))

    def setstate(self, state):
        """Restore a ``(buffered_bytes, flag)`` pair as returned by getstate()."""
        # The base class restores the byte buffer from state[0].
        codecs.BufferedIncrementalDecoder.setstate(self, state)
        self.first = bool(state[1])
|
65 |
|
class StreamWriter(codecs.StreamWriter):
    """Stream writer whose first encode() call after a reset prepends the BOM.

    The BOM-writing ``encode`` method below replaces itself on the instance
    with ``codecs.utf_8_encode`` the first time it runs; ``reset()`` removes
    that instance attribute so the method is used again.
    """

    def encode(self, input, errors='strict'):
        # Shadow this method on the instance: later calls go straight to
        # the plain UTF-8 encoder and so write no BOM.
        self.encode = codecs.utf_8_encode
        return encode(input, errors)

    def reset(self):
        codecs.StreamWriter.reset(self)
        # Drop the instance-level override, if any, so the BOM-writing
        # method above becomes visible again.
        if 'encode' in vars(self):
            del self.encode
|
77 |
|
class StreamReader(codecs.StreamReader):
    """Stream reader that skips a leading UTF-8 BOM.

    Once ``decode`` has decided whether the data starts with a BOM, it
    replaces itself on the instance with ``codecs.utf_8_decode``;
    ``reset()`` removes that instance attribute so detection runs again.
    """

    def decode(self, input, errors='strict'):
        bom = codecs.BOM_UTF8
        if len(input) < 3 and bom.startswith(input):
            # not enough data to decide if this is a BOM
            # => try again on the next call
            return (u"", 0)

        # The decision can be made now; every later call on this instance
        # goes directly to the plain UTF-8 decoder.
        self.decode = codecs.utf_8_decode

        # A slice shorter than three bytes can never equal the BOM, so no
        # separate length check is needed here.
        if input[:3] == bom:
            text, used = codecs.utf_8_decode(input[3:], errors)
            return (text, used + 3)

        # no BOM present
        return codecs.utf_8_decode(input, errors)

    def reset(self):
        codecs.StreamReader.reset(self)
        # Drop the instance-level override, if any, to re-enable BOM detection.
        if 'decode' in vars(self):
            del self.decode
|
99 |
|
100 ### encodings module API |
|
101 |
|
def getregentry():
    """Return the ``codecs.CodecInfo`` record for the 'utf-8-sig' codec.

    The record bundles this module's stateless ``encode``/``decode``
    functions with its incremental and stream codec classes.
    """
    codec_parts = dict(
        encode=encode,
        decode=decode,
        incrementalencoder=IncrementalEncoder,
        incrementaldecoder=IncrementalDecoder,
        streamreader=StreamReader,
        streamwriter=StreamWriter,
    )
    return codecs.CodecInfo(name='utf-8-sig', **codec_parts)