|
1 r"""File-like objects that read from or write to a string buffer. |
|
2 |
|
3 This implements (nearly) all stdio methods. |
|
4 |
|
5 f = StringIO() # ready for writing |
|
6 f = StringIO(buf) # ready for reading |
|
7 f.close() # explicitly release resources held |
|
8 flag = f.isatty() # always false |
|
9 pos = f.tell() # get current position |
|
10 f.seek(pos) # set current position |
|
11 f.seek(pos, mode) # mode 0: absolute; 1: relative; 2: relative to EOF |
|
12 buf = f.read() # read until EOF |
|
13 buf = f.read(n) # read up to n bytes |
|
14 buf = f.readline() # read until end of line ('\n') or EOF |
|
15 list = f.readlines()# list of f.readline() results until EOF |
|
16 f.truncate([size]) # truncate file to at most size (default: current pos) |
|
17 f.write(buf) # write at current position |
|
18 f.writelines(list) # for line in list: f.write(line) |
|
19 f.getvalue() # return whole file's contents as a string |
|
20 |
|
21 Notes: |
|
22 - Using a real file is often faster (but less convenient). |
|
23 - There's also a much faster implementation in C, called cStringIO, but |
|
24 it's not subclassable. |
|
25 - fileno() is left unimplemented so that code which uses it triggers |
|
26 an exception early. |
|
27 - Seeking far beyond EOF and then writing will insert real null |
|
28 bytes that occupy space in the buffer. |
|
29 - There's a simple test set (see end of this file). |
|
30 """ |
|
31 try: |
|
32 from errno import EINVAL |
|
33 except ImportError: |
|
34 EINVAL = 22 |
|
35 |
|
36 __all__ = ["StringIO"] |
|
37 |
|
38 def _complain_ifclosed(closed): |
|
39 if closed: |
|
40 raise ValueError, "I/O operation on closed file" |
|
41 |
|
class StringIO:
    """class StringIO([buffer])

    When a StringIO object is created, it can be initialized to an existing
    string by passing the string to the constructor. If no string is given,
    the StringIO will start empty.

    The StringIO object can accept either Unicode or 8-bit strings, but
    mixing the two may take some care. If both are used, 8-bit strings that
    cannot be interpreted as 7-bit ASCII (that use the 8th bit) will cause
    a UnicodeError to be raised when getvalue() is called.

    Internal state:
      buf      -- the consolidated contents
      buflist  -- pieces written since the last consolidation; they are
                  joined onto buf lazily
      len      -- logical length of the contents (buf plus buflist)
      pos      -- current file position
      closed   -- True once close() has been called
    """

    def __init__(self, buf = ''):
        # Force self.buf to be a string or unicode
        if not isinstance(buf, basestring):
            buf = str(buf)
        self.buf = buf
        self.len = len(buf)
        self.buflist = []
        self.pos = 0
        self.closed = False
        self.softspace = 0

    def _merge_pending(self):
        """Join any pending pieces in buflist onto buf and empty buflist."""
        if self.buflist:
            self.buf += ''.join(self.buflist)
            self.buflist = []

    def __iter__(self):
        return self

    def next(self):
        """A file object is its own iterator, for example iter(f) returns f
        (unless f is closed). When a file is used as an iterator, typically
        in a for loop (for example, for line in f: print line), the next()
        method is called repeatedly. This method returns the next input line,
        or raises StopIteration when EOF is hit.
        """
        _complain_ifclosed(self.closed)
        r = self.readline()
        if not r:
            raise StopIteration
        return r

    def close(self):
        """Free the memory buffer.

        Calling close() more than once is harmless.
        """
        if not self.closed:
            self.closed = True
            del self.buf, self.pos

    def isatty(self):
        """Returns False because StringIO objects are not connected to a
        tty-like device.
        """
        _complain_ifclosed(self.closed)
        return False

    def seek(self, pos, mode = 0):
        """Set the file's current position.

        The mode argument is optional and defaults to 0 (absolute file
        positioning); other values are 1 (seek relative to the current
        position) and 2 (seek relative to the file's end).

        A resulting position below zero is clamped to zero.

        There is no return value.
        """
        _complain_ifclosed(self.closed)
        self._merge_pending()
        if mode == 1:
            pos += self.pos
        elif mode == 2:
            pos += self.len
        self.pos = max(0, pos)

    def tell(self):
        """Return the file's current position."""
        _complain_ifclosed(self.closed)
        return self.pos

    def read(self, n = -1):
        """Read at most size bytes from the file
        (less if the read hits EOF before obtaining size bytes).

        If the size argument is negative, None or omitted, read all data
        until EOF is reached. The bytes are returned as a string object. An
        empty string is returned when EOF is encountered immediately.
        """
        _complain_ifclosed(self.closed)
        self._merge_pending()
        # Test for None explicitly instead of relying on how None compares
        # with integers.
        if n is None or n < 0:
            newpos = self.len
        else:
            newpos = min(self.pos+n, self.len)
        r = self.buf[self.pos:newpos]
        self.pos = newpos
        return r

    def readline(self, length=None):
        r"""Read one entire line from the file.

        A trailing newline character is kept in the string (but may be absent
        when a file ends with an incomplete line). If the size argument is
        present and non-negative, it is a maximum byte count (including the
        trailing newline) and an incomplete line may be returned.

        An empty string is returned only when EOF is encountered immediately.

        Note: Unlike stdio's fgets(), the returned string contains null
        characters ('\0') if they occurred in the input.
        """
        _complain_ifclosed(self.closed)
        self._merge_pending()
        i = self.buf.find('\n', self.pos)
        if i < 0:
            newpos = self.len
        else:
            newpos = i+1
        # Only a non-negative length limits the read.  A negative value
        # used to yield newpos < pos, returning '' and moving the position
        # backwards.
        if length is not None and length >= 0:
            if self.pos + length < newpos:
                newpos = self.pos + length
        r = self.buf[self.pos:newpos]
        self.pos = newpos
        return r

    def readlines(self, sizehint = 0):
        """Read until EOF using readline() and return a list containing the
        lines thus read.

        If the optional sizehint argument is present and positive, instead
        of reading up to EOF, whole lines totalling approximately sizehint
        bytes (or more to accommodate a final whole line) are read.
        """
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def truncate(self, size=None):
        """Truncate the file's size.

        If the optional size argument is present, the file is truncated to
        (at most) that size. The size defaults to the current position.
        The current file position is not changed unless the position
        is beyond the new file size.

        If the specified size exceeds the file's current size, the
        file remains unchanged.

        Raises IOError(EINVAL) if size is negative.
        """
        _complain_ifclosed(self.closed)
        if size is None:
            size = self.pos
        elif size < 0:
            raise IOError(EINVAL, "Negative size not allowed")
        elif size < self.pos:
            self.pos = size
        self.buf = self.getvalue()[:size]
        # Record the length actually kept.  Storing ``size`` here would
        # claim more data than the buffer holds whenever size exceeds the
        # current contents.
        self.len = len(self.buf)

    def write(self, s):
        """Write a string to the file.

        Writing at a position beyond the current end first pads the gap
        with null bytes. An empty (false) argument is ignored.

        There is no return value.
        """
        _complain_ifclosed(self.closed)
        if not s: return
        # Force s to be a string or unicode
        if not isinstance(s, basestring):
            s = str(s)
        spos = self.pos
        slen = self.len
        if spos == slen:
            # Common case: appending at EOF; just queue the piece.
            self.buflist.append(s)
            self.len = self.pos = spos + len(s)
            return
        if spos > slen:
            # Position is past EOF: fill the gap with real null bytes.
            self.buflist.append('\0'*(spos - slen))
            slen = spos
        newpos = spos + len(s)
        if spos < slen:
            # Overwriting inside existing data: consolidate, then rebuild
            # the contents as prefix + s + whatever follows the overwritten
            # span (empty when the write runs past the old end).
            self._merge_pending()
            self.buflist = [self.buf[:spos], s, self.buf[newpos:]]
            self.buf = ''
            if newpos > slen:
                slen = newpos
        else:
            # Gap was just padded, so s lands exactly at the new end.
            self.buflist.append(s)
            slen = newpos
        self.len = slen
        self.pos = newpos

    def writelines(self, iterable):
        """Write a sequence of strings to the file. The sequence can be any
        iterable object producing strings, typically a list of strings. There
        is no return value.

        (The name is intended to match readlines(); writelines() does not add
        line separators.)
        """
        write = self.write
        for line in iterable:
            write(line)

    def flush(self):
        """Flush the internal buffer.

        Nothing needs flushing; this only checks that the object is open.
        """
        _complain_ifclosed(self.closed)

    def getvalue(self):
        """
        Retrieve the entire contents of the "file" at any time before
        the StringIO object's close() method is called.

        The StringIO object can accept either Unicode or 8-bit strings,
        but mixing the two may take some care. If both are used, 8-bit
        strings that cannot be interpreted as 7-bit ASCII (that use the
        8th bit) will cause a UnicodeError to be raised when getvalue()
        is called.

        Raises ValueError if the object has been closed.
        """
        # close() deletes self.buf, so fail with the same ValueError the
        # other methods raise rather than an incidental AttributeError.
        _complain_ifclosed(self.closed)
        self._merge_pending()
        return self.buf
|
273 |
|
274 |
|
275 # A little test suite |
|
276 |
|
def test():
    """Exercise StringIO against the contents of a real text file.

    The file is taken from the first command-line argument and defaults
    to '/etc/passwd'; it needs at least two lines.  Progress is printed to
    standard output and RuntimeError is raised on the first mismatch.
    """
    import sys
    if sys.argv[1:]:
        path = sys.argv[1]
    else:
        path = '/etc/passwd'
    # Read the file both as a list of lines and as one string, and close
    # the handle deterministically instead of leaking it.
    src = open(path, 'r')
    try:
        lines = src.readlines()
        src.seek(0)
        text = src.read()
    finally:
        src.close()
    f = StringIO()
    for line in lines[:-2]:
        f.write(line)
    f.writelines(lines[-2:])
    if f.getvalue() != text:
        raise RuntimeError('write failed')
    length = f.tell()
    print('File length = %s' % length)
    # Overwrite the second line in place with itself; contents must not
    # change.
    f.seek(len(lines[0]))
    f.write(lines[1])
    f.seek(0)
    print('First line = %s' % repr(f.readline()))
    print('Position = %s' % f.tell())
    line = f.readline()
    print('Second line = %s' % repr(line))
    # Relative seek back over the line just read, then re-read it.
    f.seek(-len(line), 1)
    line2 = f.read(len(line))
    if line != line2:
        raise RuntimeError('bad result after seek back')
    f.seek(len(line2), 1)
    remaining = f.readlines()
    line = remaining[-1]
    f.seek(f.tell() - len(line))
    line2 = f.read()
    if line != line2:
        raise RuntimeError('bad result after seek back from EOF')
    print('Read %s more lines' % len(remaining))
    print('File length = %s' % f.tell())
    if f.tell() != length:
        raise RuntimeError('bad length')
    # Explicit floor division: same value as the original int/int here.
    half = length // 2
    f.truncate(half)
    f.seek(0, 2)
    print('Truncated length = %s' % f.tell())
    if f.tell() != half:
        raise RuntimeError('truncate did not adjust length')
    f.close()
|
321 |
|
# Run the self-test when this file is executed as a script.
if __name__ == '__main__':
    test()