|
1 from test.test_support import verify, vereq, TESTFN |
|
2 import mmap |
|
3 import os, re |
|
4 |
|
5 PAGESIZE = mmap.PAGESIZE |
|
6 |
|
7 def test_both(): |
|
8 "Test mmap module on Unix systems and Windows" |
|
9 |
|
10 # Create a file to be mmap'ed. |
|
11 if os.path.exists(TESTFN): |
|
12 os.unlink(TESTFN) |
|
13 f = open(TESTFN, 'w+') |
|
14 |
|
15 try: # unlink TESTFN no matter what |
|
16 # Write 2 pages worth of data to the file |
|
17 f.write('\0'* PAGESIZE) |
|
18 f.write('foo') |
|
19 f.write('\0'* (PAGESIZE-3) ) |
|
20 f.flush() |
|
21 m = mmap.mmap(f.fileno(), 2 * PAGESIZE) |
|
22 f.close() |
|
23 |
|
24 # Simple sanity checks |
|
25 |
|
26 print type(m) # SF bug 128713: segfaulted on Linux |
|
27 print ' Position of foo:', m.find('foo') / float(PAGESIZE), 'pages' |
|
28 vereq(m.find('foo'), PAGESIZE) |
|
29 |
|
30 print ' Length of file:', len(m) / float(PAGESIZE), 'pages' |
|
31 vereq(len(m), 2*PAGESIZE) |
|
32 |
|
33 print ' Contents of byte 0:', repr(m[0]) |
|
34 vereq(m[0], '\0') |
|
35 print ' Contents of first 3 bytes:', repr(m[0:3]) |
|
36 vereq(m[0:3], '\0\0\0') |
|
37 |
|
38 # Modify the file's content |
|
39 print "\n Modifying file's content..." |
|
40 m[0] = '3' |
|
41 m[PAGESIZE +3: PAGESIZE +3+3] = 'bar' |
|
42 |
|
43 # Check that the modification worked |
|
44 print ' Contents of byte 0:', repr(m[0]) |
|
45 vereq(m[0], '3') |
|
46 print ' Contents of first 3 bytes:', repr(m[0:3]) |
|
47 vereq(m[0:3], '3\0\0') |
|
48 print ' Contents of second page:', repr(m[PAGESIZE-1 : PAGESIZE + 7]) |
|
49 vereq(m[PAGESIZE-1 : PAGESIZE + 7], '\0foobar\0') |
|
50 |
|
51 m.flush() |
|
52 |
|
53 # Test doing a regular expression match in an mmap'ed file |
|
54 match = re.search('[A-Za-z]+', m) |
|
55 if match is None: |
|
56 print ' ERROR: regex match on mmap failed!' |
|
57 else: |
|
58 start, end = match.span(0) |
|
59 length = end - start |
|
60 |
|
61 print ' Regex match on mmap (page start, length of match):', |
|
62 print start / float(PAGESIZE), length |
|
63 |
|
64 vereq(start, PAGESIZE) |
|
65 vereq(end, PAGESIZE + 6) |
|
66 |
|
67 # test seeking around (try to overflow the seek implementation) |
|
68 m.seek(0,0) |
|
69 print ' Seek to zeroth byte' |
|
70 vereq(m.tell(), 0) |
|
71 m.seek(42,1) |
|
72 print ' Seek to 42nd byte' |
|
73 vereq(m.tell(), 42) |
|
74 m.seek(0,2) |
|
75 print ' Seek to last byte' |
|
76 vereq(m.tell(), len(m)) |
|
77 |
|
78 print ' Try to seek to negative position...' |
|
79 try: |
|
80 m.seek(-1) |
|
81 except ValueError: |
|
82 pass |
|
83 else: |
|
84 verify(0, 'expected a ValueError but did not get it') |
|
85 |
|
86 print ' Try to seek beyond end of mmap...' |
|
87 try: |
|
88 m.seek(1,2) |
|
89 except ValueError: |
|
90 pass |
|
91 else: |
|
92 verify(0, 'expected a ValueError but did not get it') |
|
93 |
|
94 print ' Try to seek to negative position...' |
|
95 try: |
|
96 m.seek(-len(m)-1,2) |
|
97 except ValueError: |
|
98 pass |
|
99 else: |
|
100 verify(0, 'expected a ValueError but did not get it') |
|
101 |
|
102 # Try resizing map |
|
103 print ' Attempting resize()' |
|
104 try: |
|
105 m.resize(512) |
|
106 except SystemError: |
|
107 # resize() not supported |
|
108 # No messages are printed, since the output of this test suite |
|
109 # would then be different across platforms. |
|
110 pass |
|
111 else: |
|
112 # resize() is supported |
|
113 verify(len(m) == 512, |
|
114 "len(m) is %d, but expecting 512" % (len(m),) ) |
|
115 # Check that we can no longer seek beyond the new size. |
|
116 try: |
|
117 m.seek(513,0) |
|
118 except ValueError: |
|
119 pass |
|
120 else: |
|
121 verify(0, 'Could seek beyond the new size') |
|
122 |
|
123 # Check that the underlying file is truncated too |
|
124 # (bug #728515) |
|
125 f = open(TESTFN) |
|
126 f.seek(0, 2) |
|
127 verify(f.tell() == 512, 'Underlying file not truncated') |
|
128 f.close() |
|
129 verify(m.size() == 512, 'New size not reflected in file') |
|
130 |
|
131 m.close() |
|
132 |
|
133 finally: |
|
134 try: |
|
135 f.close() |
|
136 except OSError: |
|
137 pass |
|
138 try: |
|
139 os.unlink(TESTFN) |
|
140 except OSError: |
|
141 pass |
|
142 |
|
143 # Test for "access" keyword parameter |
|
144 try: |
|
145 mapsize = 10 |
|
146 print " Creating", mapsize, "byte test data file." |
|
147 open(TESTFN, "wb").write("a"*mapsize) |
|
148 print " Opening mmap with access=ACCESS_READ" |
|
149 f = open(TESTFN, "rb") |
|
150 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ) |
|
151 verify(m[:] == 'a'*mapsize, "Readonly memory map data incorrect.") |
|
152 |
|
153 print " Ensuring that readonly mmap can't be slice assigned." |
|
154 try: |
|
155 m[:] = 'b'*mapsize |
|
156 except TypeError: |
|
157 pass |
|
158 else: |
|
159 verify(0, "Able to write to readonly memory map") |
|
160 |
|
161 print " Ensuring that readonly mmap can't be item assigned." |
|
162 try: |
|
163 m[0] = 'b' |
|
164 except TypeError: |
|
165 pass |
|
166 else: |
|
167 verify(0, "Able to write to readonly memory map") |
|
168 |
|
169 print " Ensuring that readonly mmap can't be write() to." |
|
170 try: |
|
171 m.seek(0,0) |
|
172 m.write('abc') |
|
173 except TypeError: |
|
174 pass |
|
175 else: |
|
176 verify(0, "Able to write to readonly memory map") |
|
177 |
|
178 print " Ensuring that readonly mmap can't be write_byte() to." |
|
179 try: |
|
180 m.seek(0,0) |
|
181 m.write_byte('d') |
|
182 except TypeError: |
|
183 pass |
|
184 else: |
|
185 verify(0, "Able to write to readonly memory map") |
|
186 |
|
187 print " Ensuring that readonly mmap can't be resized." |
|
188 try: |
|
189 m.resize(2*mapsize) |
|
190 except SystemError: # resize is not universally supported |
|
191 pass |
|
192 except TypeError: |
|
193 pass |
|
194 else: |
|
195 verify(0, "Able to resize readonly memory map") |
|
196 del m, f |
|
197 verify(open(TESTFN, "rb").read() == 'a'*mapsize, |
|
198 "Readonly memory map data file was modified") |
|
199 |
|
200 print " Opening mmap with size too big" |
|
201 import sys |
|
202 f = open(TESTFN, "r+b") |
|
203 try: |
|
204 m = mmap.mmap(f.fileno(), mapsize+1) |
|
205 except ValueError: |
|
206 # we do not expect a ValueError on Windows |
|
207 # CAUTION: This also changes the size of the file on disk, and |
|
208 # later tests assume that the length hasn't changed. We need to |
|
209 # repair that. |
|
210 if sys.platform.startswith('win'): |
|
211 verify(0, "Opening mmap with size+1 should work on Windows.") |
|
212 else: |
|
213 # we expect a ValueError on Unix, but not on Windows |
|
214 if not sys.platform.startswith('win'): |
|
215 verify(0, "Opening mmap with size+1 should raise ValueError.") |
|
216 m.close() |
|
217 f.close() |
|
218 if sys.platform.startswith('win'): |
|
219 # Repair damage from the resizing test. |
|
220 f = open(TESTFN, 'r+b') |
|
221 f.truncate(mapsize) |
|
222 f.close() |
|
223 |
|
224 print " Opening mmap with access=ACCESS_WRITE" |
|
225 f = open(TESTFN, "r+b") |
|
226 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE) |
|
227 print " Modifying write-through memory map." |
|
228 m[:] = 'c'*mapsize |
|
229 verify(m[:] == 'c'*mapsize, |
|
230 "Write-through memory map memory not updated properly.") |
|
231 m.flush() |
|
232 m.close() |
|
233 f.close() |
|
234 f = open(TESTFN, 'rb') |
|
235 stuff = f.read() |
|
236 f.close() |
|
237 verify(stuff == 'c'*mapsize, |
|
238 "Write-through memory map data file not updated properly.") |
|
239 |
|
240 print " Opening mmap with access=ACCESS_COPY" |
|
241 f = open(TESTFN, "r+b") |
|
242 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY) |
|
243 print " Modifying copy-on-write memory map." |
|
244 m[:] = 'd'*mapsize |
|
245 verify(m[:] == 'd' * mapsize, |
|
246 "Copy-on-write memory map data not written correctly.") |
|
247 m.flush() |
|
248 verify(open(TESTFN, "rb").read() == 'c'*mapsize, |
|
249 "Copy-on-write test data file should not be modified.") |
|
250 try: |
|
251 print " Ensuring copy-on-write maps cannot be resized." |
|
252 m.resize(2*mapsize) |
|
253 except TypeError: |
|
254 pass |
|
255 else: |
|
256 verify(0, "Copy-on-write mmap resize did not raise exception.") |
|
257 del m, f |
|
258 try: |
|
259 print " Ensuring invalid access parameter raises exception." |
|
260 f = open(TESTFN, "r+b") |
|
261 m = mmap.mmap(f.fileno(), mapsize, access=4) |
|
262 except ValueError: |
|
263 pass |
|
264 else: |
|
265 verify(0, "Invalid access code should have raised exception.") |
|
266 |
|
267 if os.name == "posix": |
|
268 # Try incompatible flags, prot and access parameters. |
|
269 f = open(TESTFN, "r+b") |
|
270 try: |
|
271 m = mmap.mmap(f.fileno(), mapsize, flags=mmap.MAP_PRIVATE, |
|
272 prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE) |
|
273 except ValueError: |
|
274 pass |
|
275 else: |
|
276 verify(0, "Incompatible parameters should raise ValueError.") |
|
277 f.close() |
|
278 finally: |
|
279 try: |
|
280 os.unlink(TESTFN) |
|
281 except OSError: |
|
282 pass |
|
283 |
|
284 print ' Try opening a bad file descriptor...' |
|
285 try: |
|
286 mmap.mmap(-2, 4096) |
|
287 except mmap.error: |
|
288 pass |
|
289 else: |
|
290 verify(0, 'expected a mmap.error but did not get it') |
|
291 |
|
292 # Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2, |
|
293 # searching for data with embedded \0 bytes didn't work. |
|
294 f = open(TESTFN, 'w+') |
|
295 |
|
296 try: # unlink TESTFN no matter what |
|
297 data = 'aabaac\x00deef\x00\x00aa\x00' |
|
298 n = len(data) |
|
299 f.write(data) |
|
300 f.flush() |
|
301 m = mmap.mmap(f.fileno(), n) |
|
302 f.close() |
|
303 |
|
304 for start in range(n+1): |
|
305 for finish in range(start, n+1): |
|
306 slice = data[start : finish] |
|
307 vereq(m.find(slice), data.find(slice)) |
|
308 vereq(m.find(slice + 'x'), -1) |
|
309 m.close() |
|
310 |
|
311 finally: |
|
312 os.unlink(TESTFN) |
|
313 |
|
314 # make sure a double close doesn't crash on Solaris (Bug# 665913) |
|
315 f = open(TESTFN, 'w+') |
|
316 |
|
317 try: # unlink TESTFN no matter what |
|
318 f.write(2**16 * 'a') # Arbitrary character |
|
319 f.close() |
|
320 |
|
321 f = open(TESTFN) |
|
322 mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ) |
|
323 mf.close() |
|
324 mf.close() |
|
325 f.close() |
|
326 |
|
327 finally: |
|
328 os.unlink(TESTFN) |
|
329 |
|
330 # test mapping of entire file by passing 0 for map length |
|
331 if hasattr(os, "stat"): |
|
332 print " Ensuring that passing 0 as map length sets map size to current file size." |
|
333 f = open(TESTFN, "w+") |
|
334 |
|
335 try: |
|
336 f.write(2**16 * 'm') # Arbitrary character |
|
337 f.close() |
|
338 |
|
339 f = open(TESTFN, "rb+") |
|
340 mf = mmap.mmap(f.fileno(), 0) |
|
341 verify(len(mf) == 2**16, "Map size should equal file size.") |
|
342 vereq(mf.read(2**16), 2**16 * "m") |
|
343 mf.close() |
|
344 f.close() |
|
345 |
|
346 finally: |
|
347 os.unlink(TESTFN) |
|
348 |
|
349 # test mapping of entire file by passing 0 for map length |
|
350 if hasattr(os, "stat"): |
|
351 print " Ensuring that passing 0 as map length sets map size to current file size." |
|
352 f = open(TESTFN, "w+") |
|
353 try: |
|
354 f.write(2**16 * 'm') # Arbitrary character |
|
355 f.close() |
|
356 |
|
357 f = open(TESTFN, "rb+") |
|
358 mf = mmap.mmap(f.fileno(), 0) |
|
359 verify(len(mf) == 2**16, "Map size should equal file size.") |
|
360 vereq(mf.read(2**16), 2**16 * "m") |
|
361 mf.close() |
|
362 f.close() |
|
363 |
|
364 finally: |
|
365 os.unlink(TESTFN) |
|
366 |
|
367 # make move works everywhere (64-bit format problem earlier) |
|
368 f = open(TESTFN, 'w+') |
|
369 |
|
370 try: # unlink TESTFN no matter what |
|
371 f.write("ABCDEabcde") # Arbitrary character |
|
372 f.flush() |
|
373 |
|
374 mf = mmap.mmap(f.fileno(), 10) |
|
375 mf.move(5, 0, 5) |
|
376 verify(mf[:] == "ABCDEABCDE", "Map move should have duplicated front 5") |
|
377 mf.close() |
|
378 f.close() |
|
379 |
|
380 finally: |
|
381 os.unlink(TESTFN) |
|
382 |
|
383 def test_anon(): |
|
384 print " anonymous mmap.mmap(-1, PAGESIZE)..." |
|
385 m = mmap.mmap(-1, PAGESIZE) |
|
386 for x in xrange(PAGESIZE): |
|
387 verify(m[x] == '\0', "anonymously mmap'ed contents should be zero") |
|
388 |
|
389 for x in xrange(PAGESIZE): |
|
390 m[x] = ch = chr(x & 255) |
|
391 vereq(m[x], ch) |
|
392 |
|
# Run both test functions when this module is executed; the final message is
# only printed if neither of them raised.
test_both()
test_anon()
print ' Test passed'