python-2.5.2/win32/Lib/test/test_csv.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 # -*- coding: iso-8859-1 -*-
       
     2 # Copyright (C) 2001,2002 Python Software Foundation
       
     3 # csv package unit tests
       
     4 
       
     5 import sys
       
     6 import os
       
     7 import unittest
       
     8 from StringIO import StringIO
       
     9 import tempfile
       
    10 import csv
       
    11 import gc
       
    12 from test import test_support
       
    13 
       
    14 class Test_Csv(unittest.TestCase):
       
    15     """
       
    16     Test the underlying C csv parser in ways that are not appropriate
       
    17     from the high level interface. Further tests of this nature are done
       
    18     in TestDialectRegistry.
       
    19     """
       
    20     def _test_arg_valid(self, ctor, arg):
       
    21         self.assertRaises(TypeError, ctor)
       
    22         self.assertRaises(TypeError, ctor, None)
       
    23         self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
       
    24         self.assertRaises(TypeError, ctor, arg, delimiter = 0)
       
    25         self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
       
    26         self.assertRaises(csv.Error, ctor, arg, 'foo')
       
    27         self.assertRaises(TypeError, ctor, arg, delimiter=None)
       
    28         self.assertRaises(TypeError, ctor, arg, delimiter=1)
       
    29         self.assertRaises(TypeError, ctor, arg, quotechar=1)
       
    30         self.assertRaises(TypeError, ctor, arg, lineterminator=None)
       
    31         self.assertRaises(TypeError, ctor, arg, lineterminator=1)
       
    32         self.assertRaises(TypeError, ctor, arg, quoting=None)
       
    33         self.assertRaises(TypeError, ctor, arg,
       
    34                           quoting=csv.QUOTE_ALL, quotechar='')
       
    35         self.assertRaises(TypeError, ctor, arg,
       
    36                           quoting=csv.QUOTE_ALL, quotechar=None)
       
    37 
       
    38     def test_reader_arg_valid(self):
       
    39         self._test_arg_valid(csv.reader, [])
       
    40 
       
    41     def test_writer_arg_valid(self):
       
    42         self._test_arg_valid(csv.writer, StringIO())
       
    43 
       
    44     def _test_default_attrs(self, ctor, *args):
       
    45         obj = ctor(*args)
       
    46         # Check defaults
       
    47         self.assertEqual(obj.dialect.delimiter, ',')
       
    48         self.assertEqual(obj.dialect.doublequote, True)
       
    49         self.assertEqual(obj.dialect.escapechar, None)
       
    50         self.assertEqual(obj.dialect.lineterminator, "\r\n")
       
    51         self.assertEqual(obj.dialect.quotechar, '"')
       
    52         self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
       
    53         self.assertEqual(obj.dialect.skipinitialspace, False)
       
    54         self.assertEqual(obj.dialect.strict, False)
       
    55         # Try deleting or changing attributes (they are read-only)
       
    56         self.assertRaises(TypeError, delattr, obj.dialect, 'delimiter')
       
    57         self.assertRaises(TypeError, setattr, obj.dialect, 'delimiter', ':')
       
    58         self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
       
    59         self.assertRaises(AttributeError, setattr, obj.dialect,
       
    60                           'quoting', None)
       
    61 
       
    62     def test_reader_attrs(self):
       
    63         self._test_default_attrs(csv.reader, [])
       
    64 
       
    65     def test_writer_attrs(self):
       
    66         self._test_default_attrs(csv.writer, StringIO())
       
    67 
       
    68     def _test_kw_attrs(self, ctor, *args):
       
    69         # Now try with alternate options
       
    70         kwargs = dict(delimiter=':', doublequote=False, escapechar='\\',
       
    71                       lineterminator='\r', quotechar='*',
       
    72                       quoting=csv.QUOTE_NONE, skipinitialspace=True,
       
    73                       strict=True)
       
    74         obj = ctor(*args, **kwargs)
       
    75         self.assertEqual(obj.dialect.delimiter, ':')
       
    76         self.assertEqual(obj.dialect.doublequote, False)
       
    77         self.assertEqual(obj.dialect.escapechar, '\\')
       
    78         self.assertEqual(obj.dialect.lineterminator, "\r")
       
    79         self.assertEqual(obj.dialect.quotechar, '*')
       
    80         self.assertEqual(obj.dialect.quoting, csv.QUOTE_NONE)
       
    81         self.assertEqual(obj.dialect.skipinitialspace, True)
       
    82         self.assertEqual(obj.dialect.strict, True)
       
    83 
       
    84     def test_reader_kw_attrs(self):
       
    85         self._test_kw_attrs(csv.reader, [])
       
    86 
       
    87     def test_writer_kw_attrs(self):
       
    88         self._test_kw_attrs(csv.writer, StringIO())
       
    89 
       
    90     def _test_dialect_attrs(self, ctor, *args):
       
    91         # Now try with dialect-derived options
       
    92         class dialect:
       
    93             delimiter='-'
       
    94             doublequote=False
       
    95             escapechar='^'
       
    96             lineterminator='$'
       
    97             quotechar='#'
       
    98             quoting=csv.QUOTE_ALL
       
    99             skipinitialspace=True
       
   100             strict=False
       
   101         args = args + (dialect,)
       
   102         obj = ctor(*args)
       
   103         self.assertEqual(obj.dialect.delimiter, '-')
       
   104         self.assertEqual(obj.dialect.doublequote, False)
       
   105         self.assertEqual(obj.dialect.escapechar, '^')
       
   106         self.assertEqual(obj.dialect.lineterminator, "$")
       
   107         self.assertEqual(obj.dialect.quotechar, '#')
       
   108         self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
       
   109         self.assertEqual(obj.dialect.skipinitialspace, True)
       
   110         self.assertEqual(obj.dialect.strict, False)
       
   111 
       
   112     def test_reader_dialect_attrs(self):
       
   113         self._test_dialect_attrs(csv.reader, [])
       
   114 
       
   115     def test_writer_dialect_attrs(self):
       
   116         self._test_dialect_attrs(csv.writer, StringIO())
       
   117 
       
   118 
       
   119     def _write_test(self, fields, expect, **kwargs):
       
   120         fd, name = tempfile.mkstemp()
       
   121         fileobj = os.fdopen(fd, "w+b")
       
   122         try:
       
   123             writer = csv.writer(fileobj, **kwargs)
       
   124             writer.writerow(fields)
       
   125             fileobj.seek(0)
       
   126             self.assertEqual(fileobj.read(),
       
   127                              expect + writer.dialect.lineterminator)
       
   128         finally:
       
   129             fileobj.close()
       
   130             os.unlink(name)
       
   131 
       
   132     def test_write_arg_valid(self):
       
   133         self.assertRaises(csv.Error, self._write_test, None, '')
       
   134         self._write_test((), '')
       
   135         self._write_test([None], '""')
       
   136         self.assertRaises(csv.Error, self._write_test,
       
   137                           [None], None, quoting = csv.QUOTE_NONE)
       
   138         # Check that exceptions are passed up the chain
       
   139         class BadList:
       
   140             def __len__(self):
       
   141                 return 10;
       
   142             def __getitem__(self, i):
       
   143                 if i > 2:
       
   144                     raise IOError
       
   145         self.assertRaises(IOError, self._write_test, BadList(), '')
       
   146         class BadItem:
       
   147             def __str__(self):
       
   148                 raise IOError
       
   149         self.assertRaises(IOError, self._write_test, [BadItem()], '')
       
   150 
       
   151     def test_write_bigfield(self):
       
   152         # This exercises the buffer realloc functionality
       
   153         bigstring = 'X' * 50000
       
   154         self._write_test([bigstring,bigstring], '%s,%s' % \
       
   155                          (bigstring, bigstring))
       
   156 
       
   157     def test_write_quoting(self):
       
   158         self._write_test(['a',1,'p,q'], 'a,1,"p,q"')
       
   159         self.assertRaises(csv.Error,
       
   160                           self._write_test,
       
   161                           ['a',1,'p,q'], 'a,1,p,q',
       
   162                           quoting = csv.QUOTE_NONE)
       
   163         self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
       
   164                          quoting = csv.QUOTE_MINIMAL)
       
   165         self._write_test(['a',1,'p,q'], '"a",1,"p,q"',
       
   166                          quoting = csv.QUOTE_NONNUMERIC)
       
   167         self._write_test(['a',1,'p,q'], '"a","1","p,q"',
       
   168                          quoting = csv.QUOTE_ALL)
       
   169 
       
   170     def test_write_escape(self):
       
   171         self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
       
   172                          escapechar='\\')
       
   173         self.assertRaises(csv.Error,
       
   174                           self._write_test,
       
   175                           ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
       
   176                           escapechar=None, doublequote=False)
       
   177         self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
       
   178                          escapechar='\\', doublequote = False)
       
   179         self._write_test(['"'], '""""',
       
   180                          escapechar='\\', quoting = csv.QUOTE_MINIMAL)
       
   181         self._write_test(['"'], '\\"',
       
   182                          escapechar='\\', quoting = csv.QUOTE_MINIMAL,
       
   183                          doublequote = False)
       
   184         self._write_test(['"'], '\\"',
       
   185                          escapechar='\\', quoting = csv.QUOTE_NONE)
       
   186         self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
       
   187                          escapechar='\\', quoting = csv.QUOTE_NONE)
       
   188 
       
   189     def test_writerows(self):
       
   190         class BrokenFile:
       
   191             def write(self, buf):
       
   192                 raise IOError
       
   193         writer = csv.writer(BrokenFile())
       
   194         self.assertRaises(IOError, writer.writerows, [['a']])
       
   195         fd, name = tempfile.mkstemp()
       
   196         fileobj = os.fdopen(fd, "w+b")
       
   197         try:
       
   198             writer = csv.writer(fileobj)
       
   199             self.assertRaises(TypeError, writer.writerows, None)
       
   200             writer.writerows([['a','b'],['c','d']])
       
   201             fileobj.seek(0)
       
   202             self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")
       
   203         finally:
       
   204             fileobj.close()
       
   205             os.unlink(name)
       
   206 
       
   207     def _read_test(self, input, expect, **kwargs):
       
   208         reader = csv.reader(input, **kwargs)
       
   209         result = list(reader)
       
   210         self.assertEqual(result, expect)
       
   211 
       
   212     def test_read_oddinputs(self):
       
   213         self._read_test([], [])
       
   214         self._read_test([''], [[]])
       
   215         self.assertRaises(csv.Error, self._read_test,
       
   216                           ['"ab"c'], None, strict = 1)
       
   217         # cannot handle null bytes for the moment
       
   218         self.assertRaises(csv.Error, self._read_test,
       
   219                           ['ab\0c'], None, strict = 1)
       
   220         self._read_test(['"ab"c'], [['abc']], doublequote = 0)
       
   221 
       
   222     def test_read_eol(self):
       
   223         self._read_test(['a,b'], [['a','b']])
       
   224         self._read_test(['a,b\n'], [['a','b']])
       
   225         self._read_test(['a,b\r\n'], [['a','b']])
       
   226         self._read_test(['a,b\r'], [['a','b']])
       
   227         self.assertRaises(csv.Error, self._read_test, ['a,b\rc,d'], [])
       
   228         self.assertRaises(csv.Error, self._read_test, ['a,b\nc,d'], [])
       
   229         self.assertRaises(csv.Error, self._read_test, ['a,b\r\nc,d'], [])
       
   230 
       
   231     def test_read_escape(self):
       
   232         self._read_test(['a,\\b,c'], [['a', 'b', 'c']], escapechar='\\')
       
   233         self._read_test(['a,b\\,c'], [['a', 'b,c']], escapechar='\\')
       
   234         self._read_test(['a,"b\\,c"'], [['a', 'b,c']], escapechar='\\')
       
   235         self._read_test(['a,"b,\\c"'], [['a', 'b,c']], escapechar='\\')
       
   236         self._read_test(['a,"b,c\\""'], [['a', 'b,c"']], escapechar='\\')
       
   237         self._read_test(['a,"b,c"\\'], [['a', 'b,c\\']], escapechar='\\')
       
   238 
       
   239     def test_read_quoting(self):
       
   240         self._read_test(['1,",3,",5'], [['1', ',3,', '5']])
       
   241         self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
       
   242                         quotechar=None, escapechar='\\')
       
   243         self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
       
   244                         quoting=csv.QUOTE_NONE, escapechar='\\')
       
   245         # will this fail where locale uses comma for decimals?
       
   246         self._read_test([',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
       
   247                         quoting=csv.QUOTE_NONNUMERIC)
       
   248         self.assertRaises(ValueError, self._read_test,
       
   249                           ['abc,3'], [[]],
       
   250                           quoting=csv.QUOTE_NONNUMERIC)
       
   251 
       
   252     def test_read_bigfield(self):
       
   253         # This exercises the buffer realloc functionality and field size
       
   254         # limits.
       
   255         limit = csv.field_size_limit()
       
   256         try:
       
   257             size = 50000
       
   258             bigstring = 'X' * size
       
   259             bigline = '%s,%s' % (bigstring, bigstring)
       
   260             self._read_test([bigline], [[bigstring, bigstring]])
       
   261             csv.field_size_limit(size)
       
   262             self._read_test([bigline], [[bigstring, bigstring]])
       
   263             self.assertEqual(csv.field_size_limit(), size)
       
   264             csv.field_size_limit(size-1)
       
   265             self.assertRaises(csv.Error, self._read_test, [bigline], [])
       
   266             self.assertRaises(TypeError, csv.field_size_limit, None)
       
   267             self.assertRaises(TypeError, csv.field_size_limit, 1, None)
       
   268         finally:
       
   269             csv.field_size_limit(limit)
       
   270 
       
   271     def test_read_linenum(self):
       
   272         r = csv.reader(['line,1', 'line,2', 'line,3'])
       
   273         self.assertEqual(r.line_num, 0)
       
   274         r.next()
       
   275         self.assertEqual(r.line_num, 1)
       
   276         r.next()
       
   277         self.assertEqual(r.line_num, 2)
       
   278         r.next()
       
   279         self.assertEqual(r.line_num, 3)
       
   280         self.assertRaises(StopIteration, r.next)
       
   281         self.assertEqual(r.line_num, 3)
       
   282 
       
   283 class TestDialectRegistry(unittest.TestCase):
       
   284     def test_registry_badargs(self):
       
   285         self.assertRaises(TypeError, csv.list_dialects, None)
       
   286         self.assertRaises(TypeError, csv.get_dialect)
       
   287         self.assertRaises(csv.Error, csv.get_dialect, None)
       
   288         self.assertRaises(csv.Error, csv.get_dialect, "nonesuch")
       
   289         self.assertRaises(TypeError, csv.unregister_dialect)
       
   290         self.assertRaises(csv.Error, csv.unregister_dialect, None)
       
   291         self.assertRaises(csv.Error, csv.unregister_dialect, "nonesuch")
       
   292         self.assertRaises(TypeError, csv.register_dialect, None)
       
   293         self.assertRaises(TypeError, csv.register_dialect, None, None)
       
   294         self.assertRaises(TypeError, csv.register_dialect, "nonesuch", 0, 0)
       
   295         self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
       
   296                           badargument=None)
       
   297         self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
       
   298                           quoting=None)
       
   299         self.assertRaises(TypeError, csv.register_dialect, [])
       
   300 
       
   301     def test_registry(self):
       
   302         class myexceltsv(csv.excel):
       
   303             delimiter = "\t"
       
   304         name = "myexceltsv"
       
   305         expected_dialects = csv.list_dialects() + [name]
       
   306         expected_dialects.sort()
       
   307         csv.register_dialect(name, myexceltsv)
       
   308         try:
       
   309             self.failUnless(csv.get_dialect(name).delimiter, '\t')
       
   310             got_dialects = csv.list_dialects()
       
   311             got_dialects.sort()
       
   312             self.assertEqual(expected_dialects, got_dialects)
       
   313         finally:
       
   314             csv.unregister_dialect(name)
       
   315 
       
   316     def test_register_kwargs(self):
       
   317         name = 'fedcba'
       
   318         csv.register_dialect(name, delimiter=';')
       
   319         try:
       
   320             self.failUnless(csv.get_dialect(name).delimiter, '\t')
       
   321             self.failUnless(list(csv.reader('X;Y;Z', name)), ['X', 'Y', 'Z'])
       
   322         finally:
       
   323             csv.unregister_dialect(name)
       
   324 
       
   325     def test_incomplete_dialect(self):
       
   326         class myexceltsv(csv.Dialect):
       
   327             delimiter = "\t"
       
   328         self.assertRaises(csv.Error, myexceltsv)
       
   329 
       
   330     def test_space_dialect(self):
       
   331         class space(csv.excel):
       
   332             delimiter = " "
       
   333             quoting = csv.QUOTE_NONE
       
   334             escapechar = "\\"
       
   335 
       
   336         fd, name = tempfile.mkstemp()
       
   337         fileobj = os.fdopen(fd, "w+b")
       
   338         try:
       
   339             fileobj.write("abc def\nc1ccccc1 benzene\n")
       
   340             fileobj.seek(0)
       
   341             rdr = csv.reader(fileobj, dialect=space())
       
   342             self.assertEqual(rdr.next(), ["abc", "def"])
       
   343             self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"])
       
   344         finally:
       
   345             fileobj.close()
       
   346             os.unlink(name)
       
   347 
       
   348     def test_dialect_apply(self):
       
   349         class testA(csv.excel):
       
   350             delimiter = "\t"
       
   351         class testB(csv.excel):
       
   352             delimiter = ":"
       
   353         class testC(csv.excel):
       
   354             delimiter = "|"
       
   355 
       
   356         csv.register_dialect('testC', testC)
       
   357         try:
       
   358             fd, name = tempfile.mkstemp()
       
   359             fileobj = os.fdopen(fd, "w+b")
       
   360             try:
       
   361                 writer = csv.writer(fileobj)
       
   362                 writer.writerow([1,2,3])
       
   363                 fileobj.seek(0)
       
   364                 self.assertEqual(fileobj.read(), "1,2,3\r\n")
       
   365             finally:
       
   366                 fileobj.close()
       
   367                 os.unlink(name)
       
   368 
       
   369             fd, name = tempfile.mkstemp()
       
   370             fileobj = os.fdopen(fd, "w+b")
       
   371             try:
       
   372                 writer = csv.writer(fileobj, testA)
       
   373                 writer.writerow([1,2,3])
       
   374                 fileobj.seek(0)
       
   375                 self.assertEqual(fileobj.read(), "1\t2\t3\r\n")
       
   376             finally:
       
   377                 fileobj.close()
       
   378                 os.unlink(name)
       
   379 
       
   380             fd, name = tempfile.mkstemp()
       
   381             fileobj = os.fdopen(fd, "w+b")
       
   382             try:
       
   383                 writer = csv.writer(fileobj, dialect=testB())
       
   384                 writer.writerow([1,2,3])
       
   385                 fileobj.seek(0)
       
   386                 self.assertEqual(fileobj.read(), "1:2:3\r\n")
       
   387             finally:
       
   388                 fileobj.close()
       
   389                 os.unlink(name)
       
   390 
       
   391             fd, name = tempfile.mkstemp()
       
   392             fileobj = os.fdopen(fd, "w+b")
       
   393             try:
       
   394                 writer = csv.writer(fileobj, dialect='testC')
       
   395                 writer.writerow([1,2,3])
       
   396                 fileobj.seek(0)
       
   397                 self.assertEqual(fileobj.read(), "1|2|3\r\n")
       
   398             finally:
       
   399                 fileobj.close()
       
   400                 os.unlink(name)
       
   401 
       
   402             fd, name = tempfile.mkstemp()
       
   403             fileobj = os.fdopen(fd, "w+b")
       
   404             try:
       
   405                 writer = csv.writer(fileobj, dialect=testA, delimiter=';')
       
   406                 writer.writerow([1,2,3])
       
   407                 fileobj.seek(0)
       
   408                 self.assertEqual(fileobj.read(), "1;2;3\r\n")
       
   409             finally:
       
   410                 fileobj.close()
       
   411                 os.unlink(name)
       
   412 
       
   413         finally:
       
   414             csv.unregister_dialect('testC')
       
   415 
       
   416     def test_bad_dialect(self):
       
   417         # Unknown parameter
       
   418         self.assertRaises(TypeError, csv.reader, [], bad_attr = 0)
       
   419         # Bad values
       
   420         self.assertRaises(TypeError, csv.reader, [], delimiter = None)
       
   421         self.assertRaises(TypeError, csv.reader, [], quoting = -1)
       
   422         self.assertRaises(TypeError, csv.reader, [], quoting = 100)
       
   423 
       
   424 class TestCsvBase(unittest.TestCase):
       
   425     def readerAssertEqual(self, input, expected_result):
       
   426         fd, name = tempfile.mkstemp()
       
   427         fileobj = os.fdopen(fd, "w+b")
       
   428         try:
       
   429             fileobj.write(input)
       
   430             fileobj.seek(0)
       
   431             reader = csv.reader(fileobj, dialect = self.dialect)
       
   432             fields = list(reader)
       
   433             self.assertEqual(fields, expected_result)
       
   434         finally:
       
   435             fileobj.close()
       
   436             os.unlink(name)
       
   437 
       
   438     def writerAssertEqual(self, input, expected_result):
       
   439         fd, name = tempfile.mkstemp()
       
   440         fileobj = os.fdopen(fd, "w+b")
       
   441         try:
       
   442             writer = csv.writer(fileobj, dialect = self.dialect)
       
   443             writer.writerows(input)
       
   444             fileobj.seek(0)
       
   445             self.assertEqual(fileobj.read(), expected_result)
       
   446         finally:
       
   447             fileobj.close()
       
   448             os.unlink(name)
       
   449 
       
   450 class TestDialectExcel(TestCsvBase):
       
   451     dialect = 'excel'
       
   452 
       
   453     def test_single(self):
       
   454         self.readerAssertEqual('abc', [['abc']])
       
   455 
       
   456     def test_simple(self):
       
   457         self.readerAssertEqual('1,2,3,4,5', [['1','2','3','4','5']])
       
   458 
       
   459     def test_blankline(self):
       
   460         self.readerAssertEqual('', [])
       
   461 
       
   462     def test_empty_fields(self):
       
   463         self.readerAssertEqual(',', [['', '']])
       
   464 
       
   465     def test_singlequoted(self):
       
   466         self.readerAssertEqual('""', [['']])
       
   467 
       
   468     def test_singlequoted_left_empty(self):
       
   469         self.readerAssertEqual('"",', [['','']])
       
   470 
       
   471     def test_singlequoted_right_empty(self):
       
   472         self.readerAssertEqual(',""', [['','']])
       
   473 
       
   474     def test_single_quoted_quote(self):
       
   475         self.readerAssertEqual('""""', [['"']])
       
   476 
       
   477     def test_quoted_quotes(self):
       
   478         self.readerAssertEqual('""""""', [['""']])
       
   479 
       
   480     def test_inline_quote(self):
       
   481         self.readerAssertEqual('a""b', [['a""b']])
       
   482 
       
   483     def test_inline_quotes(self):
       
   484         self.readerAssertEqual('a"b"c', [['a"b"c']])
       
   485 
       
   486     def test_quotes_and_more(self):
       
   487         self.readerAssertEqual('"a"b', [['ab']])
       
   488 
       
   489     def test_lone_quote(self):
       
   490         self.readerAssertEqual('a"b', [['a"b']])
       
   491 
       
   492     def test_quote_and_quote(self):
       
   493         self.readerAssertEqual('"a" "b"', [['a "b"']])
       
   494 
       
   495     def test_space_and_quote(self):
       
   496         self.readerAssertEqual(' "a"', [[' "a"']])
       
   497 
       
   498     def test_quoted(self):
       
   499         self.readerAssertEqual('1,2,3,"I think, therefore I am",5,6',
       
   500                                [['1', '2', '3',
       
   501                                  'I think, therefore I am',
       
   502                                  '5', '6']])
       
   503 
       
   504     def test_quoted_quote(self):
       
   505         self.readerAssertEqual('1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"',
       
   506                                [['1', '2', '3',
       
   507                                  '"I see," said the blind man',
       
   508                                  'as he picked up his hammer and saw']])
       
   509 
       
   510     def test_quoted_nl(self):
       
   511         input = '''\
       
   512 1,2,3,"""I see,""
       
   513 said the blind man","as he picked up his
       
   514 hammer and saw"
       
   515 9,8,7,6'''
       
   516         self.readerAssertEqual(input,
       
   517                                [['1', '2', '3',
       
   518                                    '"I see,"\nsaid the blind man',
       
   519                                    'as he picked up his\nhammer and saw'],
       
   520                                 ['9','8','7','6']])
       
   521 
       
   522     def test_dubious_quote(self):
       
   523         self.readerAssertEqual('12,12,1",', [['12', '12', '1"', '']])
       
   524 
       
   525     def test_null(self):
       
   526         self.writerAssertEqual([], '')
       
   527 
       
   528     def test_single(self):
       
   529         self.writerAssertEqual([['abc']], 'abc\r\n')
       
   530 
       
   531     def test_simple(self):
       
   532         self.writerAssertEqual([[1, 2, 'abc', 3, 4]], '1,2,abc,3,4\r\n')
       
   533 
       
   534     def test_quotes(self):
       
   535         self.writerAssertEqual([[1, 2, 'a"bc"', 3, 4]], '1,2,"a""bc""",3,4\r\n')
       
   536 
       
   537     def test_quote_fieldsep(self):
       
   538         self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')
       
   539 
       
   540     def test_newlines(self):
       
   541         self.writerAssertEqual([[1, 2, 'a\nbc', 3, 4]], '1,2,"a\nbc",3,4\r\n')
       
   542 
       
   543 class EscapedExcel(csv.excel):
       
   544     quoting = csv.QUOTE_NONE
       
   545     escapechar = '\\'
       
   546 
       
   547 class TestEscapedExcel(TestCsvBase):
       
   548     dialect = EscapedExcel()
       
   549 
       
   550     def test_escape_fieldsep(self):
       
   551         self.writerAssertEqual([['abc,def']], 'abc\\,def\r\n')
       
   552 
       
   553     def test_read_escape_fieldsep(self):
       
   554         self.readerAssertEqual('abc\\,def\r\n', [['abc,def']])
       
   555 
       
   556 class QuotedEscapedExcel(csv.excel):
       
   557     quoting = csv.QUOTE_NONNUMERIC
       
   558     escapechar = '\\'
       
   559 
       
   560 class TestQuotedEscapedExcel(TestCsvBase):
       
   561     dialect = QuotedEscapedExcel()
       
   562 
       
   563     def test_write_escape_fieldsep(self):
       
   564         self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')
       
   565 
       
   566     def test_read_escape_fieldsep(self):
       
   567         self.readerAssertEqual('"abc\\,def"\r\n', [['abc,def']])
       
   568 
       
   569 class TestDictFields(unittest.TestCase):
       
   570     ### "long" means the row is longer than the number of fieldnames
       
   571     ### "short" means there are fewer elements in the row than fieldnames
       
   572     def test_write_simple_dict(self):
       
   573         fd, name = tempfile.mkstemp()
       
   574         fileobj = os.fdopen(fd, "w+b")
       
   575         try:
       
   576             writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
       
   577             writer.writerow({"f1": 10, "f3": "abc"})
       
   578             fileobj.seek(0)
       
   579             self.assertEqual(fileobj.read(), "10,,abc\r\n")
       
   580         finally:
       
   581             fileobj.close()
       
   582             os.unlink(name)
       
   583 
       
   584     def test_write_no_fields(self):
       
   585         fileobj = StringIO()
       
   586         self.assertRaises(TypeError, csv.DictWriter, fileobj)
       
   587 
       
   588     def test_read_dict_fields(self):
       
   589         fd, name = tempfile.mkstemp()
       
   590         fileobj = os.fdopen(fd, "w+b")
       
   591         try:
       
   592             fileobj.write("1,2,abc\r\n")
       
   593             fileobj.seek(0)
       
   594             reader = csv.DictReader(fileobj,
       
   595                                     fieldnames=["f1", "f2", "f3"])
       
   596             self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
       
   597         finally:
       
   598             fileobj.close()
       
   599             os.unlink(name)
       
   600 
       
   601     def test_read_dict_no_fieldnames(self):
       
   602         fd, name = tempfile.mkstemp()
       
   603         fileobj = os.fdopen(fd, "w+b")
       
   604         try:
       
   605             fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
       
   606             fileobj.seek(0)
       
   607             reader = csv.DictReader(fileobj)
       
   608             self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
       
   609         finally:
       
   610             fileobj.close()
       
   611             os.unlink(name)
       
   612 
       
   613     def test_read_long(self):
       
   614         fd, name = tempfile.mkstemp()
       
   615         fileobj = os.fdopen(fd, "w+b")
       
   616         try:
       
   617             fileobj.write("1,2,abc,4,5,6\r\n")
       
   618             fileobj.seek(0)
       
   619             reader = csv.DictReader(fileobj,
       
   620                                     fieldnames=["f1", "f2"])
       
   621             self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
       
   622                                              None: ["abc", "4", "5", "6"]})
       
   623         finally:
       
   624             fileobj.close()
       
   625             os.unlink(name)
       
   626 
       
   627     def test_read_long_with_rest(self):
       
   628         fd, name = tempfile.mkstemp()
       
   629         fileobj = os.fdopen(fd, "w+b")
       
   630         try:
       
   631             fileobj.write("1,2,abc,4,5,6\r\n")
       
   632             fileobj.seek(0)
       
   633             reader = csv.DictReader(fileobj,
       
   634                                     fieldnames=["f1", "f2"], restkey="_rest")
       
   635             self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
       
   636                                              "_rest": ["abc", "4", "5", "6"]})
       
   637         finally:
       
   638             fileobj.close()
       
   639             os.unlink(name)
       
   640 
       
   641     def test_read_long_with_rest_no_fieldnames(self):
       
   642         fd, name = tempfile.mkstemp()
       
   643         fileobj = os.fdopen(fd, "w+b")
       
   644         try:
       
   645             fileobj.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
       
   646             fileobj.seek(0)
       
   647             reader = csv.DictReader(fileobj, restkey="_rest")
       
   648             self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
       
   649                                              "_rest": ["abc", "4", "5", "6"]})
       
   650         finally:
       
   651             fileobj.close()
       
   652             os.unlink(name)
       
   653 
       
   654     def test_read_short(self):
       
   655         fd, name = tempfile.mkstemp()
       
   656         fileobj = os.fdopen(fd, "w+b")
       
   657         try:
       
   658             fileobj.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
       
   659             fileobj.seek(0)
       
   660             reader = csv.DictReader(fileobj,
       
   661                                     fieldnames="1 2 3 4 5 6".split(),
       
   662                                     restval="DEFAULT")
       
   663             self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
       
   664                                              "4": '4', "5": '5', "6": '6'})
       
   665             self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
       
   666                                              "4": 'DEFAULT', "5": 'DEFAULT',
       
   667                                              "6": 'DEFAULT'})
       
   668         finally:
       
   669             fileobj.close()
       
   670             os.unlink(name)
       
   671 
       
   672     def test_read_multi(self):
       
   673         sample = [
       
   674             '2147483648,43.0e12,17,abc,def\r\n',
       
   675             '147483648,43.0e2,17,abc,def\r\n',
       
   676             '47483648,43.0,170,abc,def\r\n'
       
   677             ]
       
   678 
       
   679         reader = csv.DictReader(sample,
       
   680                                 fieldnames="i1 float i2 s1 s2".split())
       
   681         self.assertEqual(reader.next(), {"i1": '2147483648',
       
   682                                          "float": '43.0e12',
       
   683                                          "i2": '17',
       
   684                                          "s1": 'abc',
       
   685                                          "s2": 'def'})
       
   686 
       
   687     def test_read_with_blanks(self):
       
   688         reader = csv.DictReader(["1,2,abc,4,5,6\r\n","\r\n",
       
   689                                  "1,2,abc,4,5,6\r\n"],
       
   690                                 fieldnames="1 2 3 4 5 6".split())
       
   691         self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
       
   692                                          "4": '4', "5": '5', "6": '6'})
       
   693         self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
       
   694                                          "4": '4', "5": '5', "6": '6'})
       
   695 
       
   696     def test_read_semi_sep(self):
       
   697         reader = csv.DictReader(["1;2;abc;4;5;6\r\n"],
       
   698                                 fieldnames="1 2 3 4 5 6".split(),
       
   699                                 delimiter=';')
       
   700         self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
       
   701                                          "4": '4', "5": '5', "6": '6'})
       
   702 
       
   703 class TestArrayWrites(unittest.TestCase):
       
   704     def test_int_write(self):
       
   705         import array
       
   706         contents = [(20-i) for i in range(20)]
       
   707         a = array.array('i', contents)
       
   708 
       
   709         fd, name = tempfile.mkstemp()
       
   710         fileobj = os.fdopen(fd, "w+b")
       
   711         try:
       
   712             writer = csv.writer(fileobj, dialect="excel")
       
   713             writer.writerow(a)
       
   714             expected = ",".join([str(i) for i in a])+"\r\n"
       
   715             fileobj.seek(0)
       
   716             self.assertEqual(fileobj.read(), expected)
       
   717         finally:
       
   718             fileobj.close()
       
   719             os.unlink(name)
       
   720 
       
   721     def test_double_write(self):
       
   722         import array
       
   723         contents = [(20-i)*0.1 for i in range(20)]
       
   724         a = array.array('d', contents)
       
   725         fd, name = tempfile.mkstemp()
       
   726         fileobj = os.fdopen(fd, "w+b")
       
   727         try:
       
   728             writer = csv.writer(fileobj, dialect="excel")
       
   729             writer.writerow(a)
       
   730             expected = ",".join([str(i) for i in a])+"\r\n"
       
   731             fileobj.seek(0)
       
   732             self.assertEqual(fileobj.read(), expected)
       
   733         finally:
       
   734             fileobj.close()
       
   735             os.unlink(name)
       
   736 
       
   737     def test_float_write(self):
       
   738         import array
       
   739         contents = [(20-i)*0.1 for i in range(20)]
       
   740         a = array.array('f', contents)
       
   741         fd, name = tempfile.mkstemp()
       
   742         fileobj = os.fdopen(fd, "w+b")
       
   743         try:
       
   744             writer = csv.writer(fileobj, dialect="excel")
       
   745             writer.writerow(a)
       
   746             expected = ",".join([str(i) for i in a])+"\r\n"
       
   747             fileobj.seek(0)
       
   748             self.assertEqual(fileobj.read(), expected)
       
   749         finally:
       
   750             fileobj.close()
       
   751             os.unlink(name)
       
   752 
       
   753     def test_char_write(self):
       
   754         import array, string
       
   755         a = array.array('c', string.letters)
       
   756         fd, name = tempfile.mkstemp()
       
   757         fileobj = os.fdopen(fd, "w+b")
       
   758         try:
       
   759             writer = csv.writer(fileobj, dialect="excel")
       
   760             writer.writerow(a)
       
   761             expected = ",".join(a)+"\r\n"
       
   762             fileobj.seek(0)
       
   763             self.assertEqual(fileobj.read(), expected)
       
   764         finally:
       
   765             fileobj.close()
       
   766             os.unlink(name)
       
   767 
       
   768 class TestDialectValidity(unittest.TestCase):
       
   769     def test_quoting(self):
       
   770         class mydialect(csv.Dialect):
       
   771             delimiter = ";"
       
   772             escapechar = '\\'
       
   773             doublequote = False
       
   774             skipinitialspace = True
       
   775             lineterminator = '\r\n'
       
   776             quoting = csv.QUOTE_NONE
       
   777         d = mydialect()
       
   778 
       
   779         mydialect.quoting = None
       
   780         self.assertRaises(csv.Error, mydialect)
       
   781 
       
   782         mydialect.doublequote = True
       
   783         mydialect.quoting = csv.QUOTE_ALL
       
   784         mydialect.quotechar = '"'
       
   785         d = mydialect()
       
   786 
       
   787         mydialect.quotechar = "''"
       
   788         self.assertRaises(csv.Error, mydialect)
       
   789 
       
   790         mydialect.quotechar = 4
       
   791         self.assertRaises(csv.Error, mydialect)
       
   792 
       
   793     def test_delimiter(self):
       
   794         class mydialect(csv.Dialect):
       
   795             delimiter = ";"
       
   796             escapechar = '\\'
       
   797             doublequote = False
       
   798             skipinitialspace = True
       
   799             lineterminator = '\r\n'
       
   800             quoting = csv.QUOTE_NONE
       
   801         d = mydialect()
       
   802 
       
   803         mydialect.delimiter = ":::"
       
   804         self.assertRaises(csv.Error, mydialect)
       
   805 
       
   806         mydialect.delimiter = 4
       
   807         self.assertRaises(csv.Error, mydialect)
       
   808 
       
   809     def test_lineterminator(self):
       
   810         class mydialect(csv.Dialect):
       
   811             delimiter = ";"
       
   812             escapechar = '\\'
       
   813             doublequote = False
       
   814             skipinitialspace = True
       
   815             lineterminator = '\r\n'
       
   816             quoting = csv.QUOTE_NONE
       
   817         d = mydialect()
       
   818 
       
   819         mydialect.lineterminator = ":::"
       
   820         d = mydialect()
       
   821 
       
   822         mydialect.lineterminator = 4
       
   823         self.assertRaises(csv.Error, mydialect)
       
   824 
       
   825 
       
   826 class TestSniffer(unittest.TestCase):
       
   827     sample1 = """\
       
   828 Harry's, Arlington Heights, IL, 2/1/03, Kimi Hayes
       
   829 Shark City, Glendale Heights, IL, 12/28/02, Prezence
       
   830 Tommy's Place, Blue Island, IL, 12/28/02, Blue Sunday/White Crow
       
   831 Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back
       
   832 """
       
   833     sample2 = """\
       
   834 'Harry''s':'Arlington Heights':'IL':'2/1/03':'Kimi Hayes'
       
   835 'Shark City':'Glendale Heights':'IL':'12/28/02':'Prezence'
       
   836 'Tommy''s Place':'Blue Island':'IL':'12/28/02':'Blue Sunday/White Crow'
       
   837 'Stonecutters Seafood and Chop House':'Lemont':'IL':'12/19/02':'Week Back'
       
   838 """
       
   839     header = '''\
       
   840 "venue","city","state","date","performers"
       
   841 '''
       
   842     sample3 = '''\
       
   843 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
       
   844 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
       
   845 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
       
   846 '''
       
   847 
       
   848     sample4 = '''\
       
   849 2147483648;43.0e12;17;abc;def
       
   850 147483648;43.0e2;17;abc;def
       
   851 47483648;43.0;170;abc;def
       
   852 '''
       
   853 
       
   854     sample5 = "aaa\tbbb\r\nAAA\t\r\nBBB\t\r\n"
       
   855     sample6 = "a|b|c\r\nd|e|f\r\n"
       
   856     sample7 = "'a'|'b'|'c'\r\n'd'|e|f\r\n"
       
   857 
       
   858     def test_has_header(self):
       
   859         sniffer = csv.Sniffer()
       
   860         self.assertEqual(sniffer.has_header(self.sample1), False)
       
   861         self.assertEqual(sniffer.has_header(self.header+self.sample1), True)
       
   862 
       
   863     def test_sniff(self):
       
   864         sniffer = csv.Sniffer()
       
   865         dialect = sniffer.sniff(self.sample1)
       
   866         self.assertEqual(dialect.delimiter, ",")
       
   867         self.assertEqual(dialect.quotechar, '"')
       
   868         self.assertEqual(dialect.skipinitialspace, True)
       
   869 
       
   870         dialect = sniffer.sniff(self.sample2)
       
   871         self.assertEqual(dialect.delimiter, ":")
       
   872         self.assertEqual(dialect.quotechar, "'")
       
   873         self.assertEqual(dialect.skipinitialspace, False)
       
   874 
       
   875     def test_delimiters(self):
       
   876         sniffer = csv.Sniffer()
       
   877         dialect = sniffer.sniff(self.sample3)
       
   878         # given that all three lines in sample3 are equal,
       
   879         # I think that any character could have been 'guessed' as the
       
   880         # delimiter, depending on dictionary order
       
   881         self.assert_(dialect.delimiter in self.sample3)
       
   882         dialect = sniffer.sniff(self.sample3, delimiters="?,")
       
   883         self.assertEqual(dialect.delimiter, "?")
       
   884         dialect = sniffer.sniff(self.sample3, delimiters="/,")
       
   885         self.assertEqual(dialect.delimiter, "/")
       
   886         dialect = sniffer.sniff(self.sample4)
       
   887         self.assertEqual(dialect.delimiter, ";")
       
   888         dialect = sniffer.sniff(self.sample5)
       
   889         self.assertEqual(dialect.delimiter, "\t")
       
   890         dialect = sniffer.sniff(self.sample6)
       
   891         self.assertEqual(dialect.delimiter, "|")
       
   892         dialect = sniffer.sniff(self.sample7)
       
   893         self.assertEqual(dialect.delimiter, "|")
       
   894         self.assertEqual(dialect.quotechar, "'")
       
   895 
       
   896 if not hasattr(sys, "gettotalrefcount"):
       
   897     if test_support.verbose: print "*** skipping leakage tests ***"
       
   898 else:
       
   899     class NUL:
       
   900         def write(s, *args):
       
   901             pass
       
   902         writelines = write
       
   903 
       
   904     class TestLeaks(unittest.TestCase):
       
   905         def test_create_read(self):
       
   906             delta = 0
       
   907             lastrc = sys.gettotalrefcount()
       
   908             for i in xrange(20):
       
   909                 gc.collect()
       
   910                 self.assertEqual(gc.garbage, [])
       
   911                 rc = sys.gettotalrefcount()
       
   912                 csv.reader(["a,b,c\r\n"])
       
   913                 csv.reader(["a,b,c\r\n"])
       
   914                 csv.reader(["a,b,c\r\n"])
       
   915                 delta = rc-lastrc
       
   916                 lastrc = rc
       
   917             # if csv.reader() leaks, last delta should be 3 or more
       
   918             self.assertEqual(delta < 3, True)
       
   919 
       
   920         def test_create_write(self):
       
   921             delta = 0
       
   922             lastrc = sys.gettotalrefcount()
       
   923             s = NUL()
       
   924             for i in xrange(20):
       
   925                 gc.collect()
       
   926                 self.assertEqual(gc.garbage, [])
       
   927                 rc = sys.gettotalrefcount()
       
   928                 csv.writer(s)
       
   929                 csv.writer(s)
       
   930                 csv.writer(s)
       
   931                 delta = rc-lastrc
       
   932                 lastrc = rc
       
   933             # if csv.writer() leaks, last delta should be 3 or more
       
   934             self.assertEqual(delta < 3, True)
       
   935 
       
   936         def test_read(self):
       
   937             delta = 0
       
   938             rows = ["a,b,c\r\n"]*5
       
   939             lastrc = sys.gettotalrefcount()
       
   940             for i in xrange(20):
       
   941                 gc.collect()
       
   942                 self.assertEqual(gc.garbage, [])
       
   943                 rc = sys.gettotalrefcount()
       
   944                 rdr = csv.reader(rows)
       
   945                 for row in rdr:
       
   946                     pass
       
   947                 delta = rc-lastrc
       
   948                 lastrc = rc
       
   949             # if reader leaks during read, delta should be 5 or more
       
   950             self.assertEqual(delta < 5, True)
       
   951 
       
   952         def test_write(self):
       
   953             delta = 0
       
   954             rows = [[1,2,3]]*5
       
   955             s = NUL()
       
   956             lastrc = sys.gettotalrefcount()
       
   957             for i in xrange(20):
       
   958                 gc.collect()
       
   959                 self.assertEqual(gc.garbage, [])
       
   960                 rc = sys.gettotalrefcount()
       
   961                 writer = csv.writer(s)
       
   962                 for row in rows:
       
   963                     writer.writerow(row)
       
   964                 delta = rc-lastrc
       
   965                 lastrc = rc
       
   966             # if writer leaks during write, last delta should be 5 or more
       
   967             self.assertEqual(delta < 5, True)
       
   968 
       
   969 # commented out for now - csv module doesn't yet support Unicode
       
   970 ## class TestUnicode(unittest.TestCase):
       
   971 ##     def test_unicode_read(self):
       
   972 ##         import codecs
       
   973 ##         f = codecs.EncodedFile(StringIO("Martin von Löwis,"
       
   974 ##                                         "Marc André Lemburg,"
       
   975 ##                                         "Guido van Rossum,"
       
   976 ##                                         "François Pinard\r\n"),
       
   977 ##                                data_encoding='iso-8859-1')
       
   978 ##         reader = csv.reader(f)
       
   979 ##         self.assertEqual(list(reader), [[u"Martin von Löwis",
       
   980 ##                                          u"Marc André Lemburg",
       
   981 ##                                          u"Guido van Rossum",
       
   982 ##                                          u"François Pinard"]])
       
   983 
       
   984 def test_main():
       
   985     mod = sys.modules[__name__]
       
   986     test_support.run_unittest(
       
   987         *[getattr(mod, name) for name in dir(mod) if name.startswith('Test')]
       
   988     )
       
   989 
       
   990 if __name__ == '__main__':
       
   991     test_main()