symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_file.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 import sys
       
     2 import os
       
     3 import unittest
       
     4 import itertools
       
     5 import time
       
     6 import threading
       
     7 from array import array
       
     8 from weakref import proxy
       
     9 
       
    10 from test import test_support
       
    11 from test.test_support import TESTFN, findfile, run_unittest
       
    12 from UserList import UserList
       
    13 
       
    14 class AutoFileTests(unittest.TestCase):
       
    15     # file tests for which a test file is automatically set up
       
    16 
       
    17     def setUp(self):
       
    18         self.f = open(TESTFN, 'wb')
       
    19 
       
    20     def tearDown(self):
       
    21         if self.f:
       
    22             self.f.close()
       
    23         os.remove(TESTFN)
       
    24 
       
    25     def testWeakRefs(self):
       
    26         # verify weak references
       
    27         p = proxy(self.f)
       
    28         p.write('teststring')
       
    29         self.assertEquals(self.f.tell(), p.tell())
       
    30         self.f.close()
       
    31         self.f = None
       
    32         self.assertRaises(ReferenceError, getattr, p, 'tell')
       
    33 
       
    34     def testAttributes(self):
       
    35         # verify expected attributes exist
       
    36         f = self.f
       
    37         softspace = f.softspace
       
    38         f.name     # merely shouldn't blow up
       
    39         f.mode     # ditto
       
    40         f.closed   # ditto
       
    41 
       
    42         # verify softspace is writable
       
    43         f.softspace = softspace    # merely shouldn't blow up
       
    44 
       
    45         # verify the others aren't
       
    46         for attr in 'name', 'mode', 'closed':
       
    47             self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
       
    48 
       
    49     def testReadinto(self):
       
    50         # verify readinto
       
    51         self.f.write('12')
       
    52         self.f.close()
       
    53         a = array('c', 'x'*10)
       
    54         self.f = open(TESTFN, 'rb')
       
    55         n = self.f.readinto(a)
       
    56         self.assertEquals('12', a.tostring()[:n])
       
    57 
       
    58     def testWritelinesUserList(self):
       
    59         # verify writelines with instance sequence
       
    60         l = UserList(['1', '2'])
       
    61         self.f.writelines(l)
       
    62         self.f.close()
       
    63         self.f = open(TESTFN, 'rb')
       
    64         buf = self.f.read()
       
    65         self.assertEquals(buf, '12')
       
    66 
       
    67     def testWritelinesIntegers(self):
       
    68         # verify writelines with integers
       
    69         self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
       
    70 
       
    71     def testWritelinesIntegersUserList(self):
       
    72         # verify writelines with integers in UserList
       
    73         l = UserList([1,2,3])
       
    74         self.assertRaises(TypeError, self.f.writelines, l)
       
    75 
       
    76     def testWritelinesNonString(self):
       
    77         # verify writelines with non-string object
       
    78         class NonString:
       
    79             pass
       
    80 
       
    81         self.assertRaises(TypeError, self.f.writelines,
       
    82                           [NonString(), NonString()])
       
    83 
       
    84     def testRepr(self):
       
    85         # verify repr works
       
    86         self.assert_(repr(self.f).startswith("<open file '" + TESTFN))
       
    87 
       
    88     def testErrors(self):
       
    89         f = self.f
       
    90         self.assertEquals(f.name, TESTFN)
       
    91         self.assert_(not f.isatty())
       
    92         self.assert_(not f.closed)
       
    93 
       
    94         self.assertRaises(TypeError, f.readinto, "")
       
    95         f.close()
       
    96         self.assert_(f.closed)
       
    97 
       
    98     def testMethods(self):
       
    99         methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
       
   100                    'readline', 'readlines', 'seek', 'tell', 'truncate',
       
   101                    'write', 'xreadlines', '__iter__']
       
   102         if sys.platform.startswith('atheos'):
       
   103             methods.remove('truncate')
       
   104 
       
   105         # __exit__ should close the file
       
   106         self.f.__exit__(None, None, None)
       
   107         self.assert_(self.f.closed)
       
   108 
       
   109         for methodname in methods:
       
   110             method = getattr(self.f, methodname)
       
   111             # should raise on closed file
       
   112             self.assertRaises(ValueError, method)
       
   113         self.assertRaises(ValueError, self.f.writelines, [])
       
   114 
       
   115         # file is closed, __exit__ shouldn't do anything
       
   116         self.assertEquals(self.f.__exit__(None, None, None), None)
       
   117         # it must also return None if an exception was given
       
   118         try:
       
   119             1/0
       
   120         except:
       
   121             self.assertEquals(self.f.__exit__(*sys.exc_info()), None)
       
   122 
       
   123 
       
   124 class OtherFileTests(unittest.TestCase):
       
   125 
       
   126     def testModeStrings(self):
       
   127         # check invalid mode strings
       
   128         for mode in ("", "aU", "wU+"):
       
   129             try:
       
   130                 f = open(TESTFN, mode)
       
   131             except ValueError:
       
   132                 pass
       
   133             else:
       
   134                 f.close()
       
   135                 self.fail('%r is an invalid file mode' % mode)
       
   136 
       
   137         # Some invalid modes fail on Windows, but pass on Unix
       
   138         # Issue3965: avoid a crash on Windows when filename is unicode
       
   139         for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
       
   140             try:
       
   141                 f = open(name, "rr")
       
   142             except IOError:
       
   143                 pass
       
   144             else:
       
   145                 f.close()
       
   146 
       
   147     def testStdin(self):
       
   148         # This causes the interpreter to exit on OSF1 v5.1.
       
   149         if sys.platform != 'osf1V5':
       
   150             self.assertRaises(IOError, sys.stdin.seek, -1)
       
   151         else:
       
   152             print >>sys.__stdout__, (
       
   153                 '  Skipping sys.stdin.seek(-1), it may crash the interpreter.'
       
   154                 ' Test manually.')
       
   155         self.assertRaises(IOError, sys.stdin.truncate)
       
   156 
       
   157     def testUnicodeOpen(self):
       
   158         # verify repr works for unicode too
       
   159         f = open(unicode(TESTFN), "w")
       
   160         self.assert_(repr(f).startswith("<open file u'" + TESTFN))
       
   161         f.close()
       
   162         os.unlink(TESTFN)
       
   163 
       
   164     def testBadModeArgument(self):
       
   165         # verify that we get a sensible error message for bad mode argument
       
   166         bad_mode = "qwerty"
       
   167         try:
       
   168             f = open(TESTFN, bad_mode)
       
   169         except ValueError, msg:
       
   170             if msg[0] != 0:
       
   171                 s = str(msg)
       
   172                 if s.find(TESTFN) != -1 or s.find(bad_mode) == -1:
       
   173                     self.fail("bad error message for invalid mode: %s" % s)
       
   174             # if msg[0] == 0, we're probably on Windows where there may be
       
   175             # no obvious way to discover why open() failed.
       
   176         else:
       
   177             f.close()
       
   178             self.fail("no error for invalid mode: %s" % bad_mode)
       
   179 
       
   180     def testSetBufferSize(self):
       
   181         # make sure that explicitly setting the buffer size doesn't cause
       
   182         # misbehaviour especially with repeated close() calls
       
   183         for s in (-1, 0, 1, 512):
       
   184             try:
       
   185                 f = open(TESTFN, 'w', s)
       
   186                 f.write(str(s))
       
   187                 f.close()
       
   188                 f.close()
       
   189                 f = open(TESTFN, 'r', s)
       
   190                 d = int(f.read())
       
   191                 f.close()
       
   192                 f.close()
       
   193             except IOError, msg:
       
   194                 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
       
   195             self.assertEquals(d, s)
       
   196 
       
   197     def testTruncateOnWindows(self):
       
   198         os.unlink(TESTFN)
       
   199 
       
   200         def bug801631():
       
   201             # SF bug <http://www.python.org/sf/801631>
       
   202             # "file.truncate fault on windows"
       
   203             f = open(TESTFN, 'wb')
       
   204             f.write('12345678901')   # 11 bytes
       
   205             f.close()
       
   206 
       
   207             f = open(TESTFN,'rb+')
       
   208             data = f.read(5)
       
   209             if data != '12345':
       
   210                 self.fail("Read on file opened for update failed %r" % data)
       
   211             if f.tell() != 5:
       
   212                 self.fail("File pos after read wrong %d" % f.tell())
       
   213 
       
   214             f.truncate()
       
   215             if f.tell() != 5:
       
   216                 self.fail("File pos after ftruncate wrong %d" % f.tell())
       
   217 
       
   218             f.close()
       
   219             size = os.path.getsize(TESTFN)
       
   220             if size != 5:
       
   221                 self.fail("File size after ftruncate wrong %d" % size)
       
   222 
       
   223         try:
       
   224             bug801631()
       
   225         finally:
       
   226             os.unlink(TESTFN)
       
   227 
       
   228     def testIteration(self):
       
   229         # Test the complex interaction when mixing file-iteration and the
       
   230         # various read* methods. Ostensibly, the mixture could just be tested
       
   231         # to work when it should work according to the Python language,
       
   232         # instead of fail when it should fail according to the current CPython
       
   233         # implementation.  People don't always program Python the way they
       
   234         # should, though, and the implemenation might change in subtle ways,
       
   235         # so we explicitly test for errors, too; the test will just have to
       
   236         # be updated when the implementation changes.
       
   237         dataoffset = 16384
       
   238         filler = "ham\n"
       
   239         assert not dataoffset % len(filler), \
       
   240             "dataoffset must be multiple of len(filler)"
       
   241         nchunks = dataoffset // len(filler)
       
   242         testlines = [
       
   243             "spam, spam and eggs\n",
       
   244             "eggs, spam, ham and spam\n",
       
   245             "saussages, spam, spam and eggs\n",
       
   246             "spam, ham, spam and eggs\n",
       
   247             "spam, spam, spam, spam, spam, ham, spam\n",
       
   248             "wonderful spaaaaaam.\n"
       
   249         ]
       
   250         methods = [("readline", ()), ("read", ()), ("readlines", ()),
       
   251                    ("readinto", (array("c", " "*100),))]
       
   252 
       
   253         try:
       
   254             # Prepare the testfile
       
   255             bag = open(TESTFN, "w")
       
   256             bag.write(filler * nchunks)
       
   257             bag.writelines(testlines)
       
   258             bag.close()
       
   259             # Test for appropriate errors mixing read* and iteration
       
   260             for methodname, args in methods:
       
   261                 f = open(TESTFN)
       
   262                 if f.next() != filler:
       
   263                     self.fail, "Broken testfile"
       
   264                 meth = getattr(f, methodname)
       
   265                 try:
       
   266                     meth(*args)
       
   267                 except ValueError:
       
   268                     pass
       
   269                 else:
       
   270                     self.fail("%s%r after next() didn't raise ValueError" %
       
   271                                      (methodname, args))
       
   272                 f.close()
       
   273 
       
   274             # Test to see if harmless (by accident) mixing of read* and
       
   275             # iteration still works. This depends on the size of the internal
       
   276             # iteration buffer (currently 8192,) but we can test it in a
       
   277             # flexible manner.  Each line in the bag o' ham is 4 bytes
       
   278             # ("h", "a", "m", "\n"), so 4096 lines of that should get us
       
   279             # exactly on the buffer boundary for any power-of-2 buffersize
       
   280             # between 4 and 16384 (inclusive).
       
   281             f = open(TESTFN)
       
   282             for i in range(nchunks):
       
   283                 f.next()
       
   284             testline = testlines.pop(0)
       
   285             try:
       
   286                 line = f.readline()
       
   287             except ValueError:
       
   288                 self.fail("readline() after next() with supposedly empty "
       
   289                           "iteration-buffer failed anyway")
       
   290             if line != testline:
       
   291                 self.fail("readline() after next() with empty buffer "
       
   292                           "failed. Got %r, expected %r" % (line, testline))
       
   293             testline = testlines.pop(0)
       
   294             buf = array("c", "\x00" * len(testline))
       
   295             try:
       
   296                 f.readinto(buf)
       
   297             except ValueError:
       
   298                 self.fail("readinto() after next() with supposedly empty "
       
   299                           "iteration-buffer failed anyway")
       
   300             line = buf.tostring()
       
   301             if line != testline:
       
   302                 self.fail("readinto() after next() with empty buffer "
       
   303                           "failed. Got %r, expected %r" % (line, testline))
       
   304 
       
   305             testline = testlines.pop(0)
       
   306             try:
       
   307                 line = f.read(len(testline))
       
   308             except ValueError:
       
   309                 self.fail("read() after next() with supposedly empty "
       
   310                           "iteration-buffer failed anyway")
       
   311             if line != testline:
       
   312                 self.fail("read() after next() with empty buffer "
       
   313                           "failed. Got %r, expected %r" % (line, testline))
       
   314             try:
       
   315                 lines = f.readlines()
       
   316             except ValueError:
       
   317                 self.fail("readlines() after next() with supposedly empty "
       
   318                           "iteration-buffer failed anyway")
       
   319             if lines != testlines:
       
   320                 self.fail("readlines() after next() with empty buffer "
       
   321                           "failed. Got %r, expected %r" % (line, testline))
       
   322             # Reading after iteration hit EOF shouldn't hurt either
       
   323             f = open(TESTFN)
       
   324             try:
       
   325                 for line in f:
       
   326                     pass
       
   327                 try:
       
   328                     f.readline()
       
   329                     f.readinto(buf)
       
   330                     f.read()
       
   331                     f.readlines()
       
   332                 except ValueError:
       
   333                     self.fail("read* failed after next() consumed file")
       
   334             finally:
       
   335                 f.close()
       
   336         finally:
       
   337             os.unlink(TESTFN)
       
   338 
       
   339 class FileSubclassTests(unittest.TestCase):
       
   340 
       
   341     def testExit(self):
       
   342         # test that exiting with context calls subclass' close
       
   343         class C(file):
       
   344             def __init__(self, *args):
       
   345                 self.subclass_closed = False
       
   346                 file.__init__(self, *args)
       
   347             def close(self):
       
   348                 self.subclass_closed = True
       
   349                 file.close(self)
       
   350 
       
   351         with C(TESTFN, 'w') as f:
       
   352             pass
       
   353         self.failUnless(f.subclass_closed)
       
   354 
       
   355 
       
   356 class FileThreadingTests(unittest.TestCase):
       
   357     # These tests check the ability to call various methods of file objects
       
   358     # (including close()) concurrently without crashing the Python interpreter.
       
   359     # See #815646, #595601
       
   360 
       
   361     def setUp(self):
       
   362         self.f = None
       
   363         self.filename = TESTFN
       
   364         with open(self.filename, "w") as f:
       
   365             f.write("\n".join("0123456789"))
       
   366         self._count_lock = threading.Lock()
       
   367         self.close_count = 0
       
   368         self.close_success_count = 0
       
   369 
       
   370     def tearDown(self):
       
   371         if self.f:
       
   372             try:
       
   373                 self.f.close()
       
   374             except (EnvironmentError, ValueError):
       
   375                 pass
       
   376         try:
       
   377             os.remove(self.filename)
       
   378         except EnvironmentError:
       
   379             pass
       
   380 
       
   381     def _create_file(self):
       
   382         self.f = open(self.filename, "w+")
       
   383 
       
   384     def _close_file(self):
       
   385         with self._count_lock:
       
   386             self.close_count += 1
       
   387         self.f.close()
       
   388         with self._count_lock:
       
   389             self.close_success_count += 1
       
   390 
       
   391     def _close_and_reopen_file(self):
       
   392         self._close_file()
       
   393         # if close raises an exception thats fine, self.f remains valid so
       
   394         # we don't need to reopen.
       
   395         self._create_file()
       
   396 
       
   397     def _run_workers(self, func, nb_workers, duration=0.2):
       
   398         with self._count_lock:
       
   399             self.close_count = 0
       
   400             self.close_success_count = 0
       
   401         self.do_continue = True
       
   402         threads = []
       
   403         try:
       
   404             for i in range(nb_workers):
       
   405                 t = threading.Thread(target=func)
       
   406                 t.start()
       
   407                 threads.append(t)
       
   408             for _ in xrange(100):
       
   409                 time.sleep(duration/100)
       
   410                 with self._count_lock:
       
   411                     if self.close_count-self.close_success_count > nb_workers+1:
       
   412                         if test_support.verbose:
       
   413                             print 'Q',
       
   414                         break
       
   415             time.sleep(duration)
       
   416         finally:
       
   417             self.do_continue = False
       
   418             for t in threads:
       
   419                 t.join()
       
   420 
       
   421     def _test_close_open_io(self, io_func, nb_workers=5):
       
   422         def worker():
       
   423             self._create_file()
       
   424             funcs = itertools.cycle((
       
   425                 lambda: io_func(),
       
   426                 lambda: self._close_and_reopen_file(),
       
   427             ))
       
   428             for f in funcs:
       
   429                 if not self.do_continue:
       
   430                     break
       
   431                 try:
       
   432                     f()
       
   433                 except (IOError, ValueError):
       
   434                     pass
       
   435         self._run_workers(worker, nb_workers)
       
   436         if test_support.verbose:
       
   437             # Useful verbose statistics when tuning this test to take
       
   438             # less time to run but still ensuring that its still useful.
       
   439             #
       
   440             # the percent of close calls that raised an error
       
   441             percent = 100. - 100.*self.close_success_count/self.close_count
       
   442             print self.close_count, ('%.4f ' % percent),
       
   443 
       
   444     def test_close_open(self):
       
   445         def io_func():
       
   446             pass
       
   447         self._test_close_open_io(io_func)
       
   448 
       
   449     def test_close_open_flush(self):
       
   450         def io_func():
       
   451             self.f.flush()
       
   452         self._test_close_open_io(io_func)
       
   453 
       
   454     def test_close_open_iter(self):
       
   455         def io_func():
       
   456             list(iter(self.f))
       
   457         self._test_close_open_io(io_func)
       
   458 
       
   459     def test_close_open_isatty(self):
       
   460         def io_func():
       
   461             self.f.isatty()
       
   462         self._test_close_open_io(io_func)
       
   463 
       
   464     def test_close_open_print(self):
       
   465         def io_func():
       
   466             print >> self.f, ''
       
   467         self._test_close_open_io(io_func)
       
   468 
       
   469     def test_close_open_read(self):
       
   470         def io_func():
       
   471             self.f.read(0)
       
   472         self._test_close_open_io(io_func)
       
   473 
       
   474     def test_close_open_readinto(self):
       
   475         def io_func():
       
   476             a = array('c', 'xxxxx')
       
   477             self.f.readinto(a)
       
   478         self._test_close_open_io(io_func)
       
   479 
       
   480     def test_close_open_readline(self):
       
   481         def io_func():
       
   482             self.f.readline()
       
   483         self._test_close_open_io(io_func)
       
   484 
       
   485     def test_close_open_readlines(self):
       
   486         def io_func():
       
   487             self.f.readlines()
       
   488         self._test_close_open_io(io_func)
       
   489 
       
   490     def test_close_open_seek(self):
       
   491         def io_func():
       
   492             self.f.seek(0, 0)
       
   493         self._test_close_open_io(io_func)
       
   494 
       
   495     def test_close_open_tell(self):
       
   496         def io_func():
       
   497             self.f.tell()
       
   498         self._test_close_open_io(io_func)
       
   499 
       
   500     def test_close_open_truncate(self):
       
   501         def io_func():
       
   502             self.f.truncate()
       
   503         self._test_close_open_io(io_func)
       
   504 
       
   505     def test_close_open_write(self):
       
   506         def io_func():
       
   507             self.f.write('')
       
   508         self._test_close_open_io(io_func)
       
   509 
       
   510     def test_close_open_writelines(self):
       
   511         def io_func():
       
   512             self.f.writelines('')
       
   513         self._test_close_open_io(io_func)
       
   514 
       
   515 
       
   516 class StdoutTests(unittest.TestCase):
       
   517 
       
   518     def test_move_stdout_on_write(self):
       
   519         # Issue 3242: sys.stdout can be replaced (and freed) during a
       
   520         # print statement; prevent a segfault in this case
       
   521         save_stdout = sys.stdout
       
   522 
       
   523         class File:
       
   524             def write(self, data):
       
   525                 if '\n' in data:
       
   526                     sys.stdout = save_stdout
       
   527 
       
   528         try:
       
   529             sys.stdout = File()
       
   530             print "some text"
       
   531         finally:
       
   532             sys.stdout = save_stdout
       
   533 
       
   534 
       
   535 def test_main():
       
   536     # Historically, these tests have been sloppy about removing TESTFN.
       
   537     # So get rid of it no matter what.
       
   538     try:
       
   539         run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
       
   540             FileThreadingTests, StdoutTests)
       
   541     finally:
       
   542         if os.path.exists(TESTFN):
       
   543             os.unlink(TESTFN)
       
   544 
       
   545 if __name__ == '__main__':
       
   546     test_main()