symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_ftplib.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """Test script for ftplib module."""
       
     2 
       
     3 # Modified by Giampaolo Rodola' to test FTP class and IPv6 environment
       
     4 
       
     5 import ftplib
       
     6 import threading
       
     7 import asyncore
       
     8 import asynchat
       
     9 import socket
       
    10 import StringIO
       
    11 
       
    12 from unittest import TestCase
       
    13 from test import test_support
       
    14 from test.test_support import HOST
       
    15 
       
    16 
       
    17 # the dummy data returned by server over the data channel when
       
    18 # RETR, LIST and NLST commands are issued
       
    19 RETR_DATA = 'abcde12345\r\n' * 1000
       
    20 LIST_DATA = 'foo\r\nbar\r\n'
       
    21 NLST_DATA = 'foo\r\nbar\r\n'
       
    22 
       
    23 
       
    24 class DummyDTPHandler(asynchat.async_chat):
       
    25 
       
    26     def __init__(self, conn, baseclass):
       
    27         asynchat.async_chat.__init__(self, conn)
       
    28         self.baseclass = baseclass
       
    29         self.baseclass.last_received_data = ''
       
    30 
       
    31     def handle_read(self):
       
    32         self.baseclass.last_received_data += self.recv(1024)
       
    33 
       
    34     def handle_close(self):
       
    35         self.baseclass.push('226 transfer complete')
       
    36         self.close()
       
    37 
       
    38 
       
    39 class DummyFTPHandler(asynchat.async_chat):
       
    40 
       
    41     def __init__(self, conn):
       
    42         asynchat.async_chat.__init__(self, conn)
       
    43         self.set_terminator("\r\n")
       
    44         self.in_buffer = []
       
    45         self.dtp = None
       
    46         self.last_received_cmd = None
       
    47         self.last_received_data = ''
       
    48         self.next_response = ''
       
    49         self.push('220 welcome')
       
    50 
       
    51     def collect_incoming_data(self, data):
       
    52         self.in_buffer.append(data)
       
    53 
       
    54     def found_terminator(self):
       
    55         line = ''.join(self.in_buffer)
       
    56         self.in_buffer = []
       
    57         if self.next_response:
       
    58             self.push(self.next_response)
       
    59             self.next_response = ''
       
    60         cmd = line.split(' ')[0].lower()
       
    61         self.last_received_cmd = cmd
       
    62         space = line.find(' ')
       
    63         if space != -1:
       
    64             arg = line[space + 1:]
       
    65         else:
       
    66             arg = ""
       
    67         if hasattr(self, 'cmd_' + cmd):
       
    68             method = getattr(self, 'cmd_' + cmd)
       
    69             method(arg)
       
    70         else:
       
    71             self.push('550 command "%s" not understood.' %cmd)
       
    72 
       
    73     def handle_error(self):
       
    74         raise
       
    75 
       
    76     def push(self, data):
       
    77         asynchat.async_chat.push(self, data + '\r\n')
       
    78 
       
    79     def cmd_port(self, arg):
       
    80         addr = map(int, arg.split(','))
       
    81         ip = '%d.%d.%d.%d' %tuple(addr[:4])
       
    82         port = (addr[4] * 256) + addr[5]
       
    83         s = socket.create_connection((ip, port), timeout=2)
       
    84         self.dtp = DummyDTPHandler(s, baseclass=self)
       
    85         self.push('200 active data connection established')
       
    86 
       
    87     def cmd_pasv(self, arg):
       
    88         sock = socket.socket()
       
    89         sock.bind((self.socket.getsockname()[0], 0))
       
    90         sock.listen(5)
       
    91         sock.settimeout(2)
       
    92         ip, port = sock.getsockname()[:2]
       
    93         ip = ip.replace('.', ','); p1 = port / 256; p2 = port % 256
       
    94         self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
       
    95         conn, addr = sock.accept()
       
    96         self.dtp = DummyDTPHandler(conn, baseclass=self)
       
    97 
       
    98     def cmd_eprt(self, arg):
       
    99         af, ip, port = arg.split(arg[0])[1:-1]
       
   100         port = int(port)
       
   101         s = socket.create_connection((ip, port), timeout=2)
       
   102         self.dtp = DummyDTPHandler(s, baseclass=self)
       
   103         self.push('200 active data connection established')
       
   104 
       
   105     def cmd_epsv(self, arg):
       
   106         sock = socket.socket(socket.AF_INET6)
       
   107         sock.bind((self.socket.getsockname()[0], 0))
       
   108         sock.listen(5)
       
   109         sock.settimeout(2)
       
   110         port = sock.getsockname()[1]
       
   111         self.push('229 entering extended passive mode (|||%d|)' %port)
       
   112         conn, addr = sock.accept()
       
   113         self.dtp = DummyDTPHandler(conn, baseclass=self)
       
   114 
       
   115     def cmd_echo(self, arg):
       
   116         # sends back the received string (used by the test suite)
       
   117         self.push(arg)
       
   118 
       
   119     def cmd_user(self, arg):
       
   120         self.push('331 username ok')
       
   121 
       
   122     def cmd_pass(self, arg):
       
   123         self.push('230 password ok')
       
   124 
       
   125     def cmd_acct(self, arg):
       
   126         self.push('230 acct ok')
       
   127 
       
   128     def cmd_rnfr(self, arg):
       
   129         self.push('350 rnfr ok')
       
   130 
       
   131     def cmd_rnto(self, arg):
       
   132         self.push('250 rnto ok')
       
   133 
       
   134     def cmd_dele(self, arg):
       
   135         self.push('250 dele ok')
       
   136 
       
   137     def cmd_cwd(self, arg):
       
   138         self.push('250 cwd ok')
       
   139 
       
   140     def cmd_size(self, arg):
       
   141         self.push('250 1000')
       
   142 
       
   143     def cmd_mkd(self, arg):
       
   144         self.push('257 "%s"' %arg)
       
   145 
       
   146     def cmd_rmd(self, arg):
       
   147         self.push('250 rmd ok')
       
   148 
       
   149     def cmd_pwd(self, arg):
       
   150         self.push('257 "pwd ok"')
       
   151 
       
   152     def cmd_type(self, arg):
       
   153         self.push('200 type ok')
       
   154 
       
   155     def cmd_quit(self, arg):
       
   156         self.push('221 quit ok')
       
   157         self.close()
       
   158 
       
   159     def cmd_stor(self, arg):
       
   160         self.push('125 stor ok')
       
   161 
       
   162     def cmd_retr(self, arg):
       
   163         self.push('125 retr ok')
       
   164         self.dtp.push(RETR_DATA)
       
   165         self.dtp.close_when_done()
       
   166 
       
   167     def cmd_list(self, arg):
       
   168         self.push('125 list ok')
       
   169         self.dtp.push(LIST_DATA)
       
   170         self.dtp.close_when_done()
       
   171 
       
   172     def cmd_nlst(self, arg):
       
   173         self.push('125 nlst ok')
       
   174         self.dtp.push(NLST_DATA)
       
   175         self.dtp.close_when_done()
       
   176 
       
   177 
       
   178 class DummyFTPServer(asyncore.dispatcher, threading.Thread):
       
   179 
       
   180     handler = DummyFTPHandler
       
   181 
       
   182     def __init__(self, address, af=socket.AF_INET):
       
   183         threading.Thread.__init__(self)
       
   184         asyncore.dispatcher.__init__(self)
       
   185         self.create_socket(af, socket.SOCK_STREAM)
       
   186         self.bind(address)
       
   187         self.listen(5)
       
   188         self.active = False
       
   189         self.active_lock = threading.Lock()
       
   190         self.host, self.port = self.socket.getsockname()[:2]
       
   191 
       
   192     def start(self):
       
   193         assert not self.active
       
   194         self.__flag = threading.Event()
       
   195         threading.Thread.start(self)
       
   196         self.__flag.wait()
       
   197 
       
   198     def run(self):
       
   199         self.active = True
       
   200         self.__flag.set()
       
   201         while self.active and asyncore.socket_map:
       
   202             self.active_lock.acquire()
       
   203             asyncore.loop(timeout=0.1, count=1)
       
   204             self.active_lock.release()
       
   205         asyncore.close_all(ignore_all=True)
       
   206 
       
   207     def stop(self):
       
   208         assert self.active
       
   209         self.active = False
       
   210         self.join()
       
   211 
       
   212     def handle_accept(self):
       
   213         conn, addr = self.accept()
       
   214         self.handler = self.handler(conn)
       
   215         self.close()
       
   216 
       
   217     def handle_connect(self):
       
   218         self.close()
       
   219     handle_read = handle_connect
       
   220 
       
   221     def writable(self):
       
   222         return 0
       
   223 
       
   224     def handle_error(self):
       
   225         raise
       
   226 
       
   227 
       
   228 class TestFTPClass(TestCase):
       
   229 
       
   230     def setUp(self):
       
   231         self.server = DummyFTPServer((HOST, 0))
       
   232         self.server.start()
       
   233         self.client = ftplib.FTP(timeout=2)
       
   234         self.client.connect(self.server.host, self.server.port)
       
   235 
       
   236     def tearDown(self):
       
   237         self.client.close()
       
   238         self.server.stop()
       
   239 
       
   240     def test_getwelcome(self):
       
   241         self.assertEqual(self.client.getwelcome(), '220 welcome')
       
   242 
       
   243     def test_sanitize(self):
       
   244         self.assertEqual(self.client.sanitize('foo'), repr('foo'))
       
   245         self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
       
   246         self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))
       
   247 
       
   248     def test_exceptions(self):
       
   249         self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
       
   250         self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
       
   251         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
       
   252         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
       
   253         self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
       
   254 
       
   255     def test_all_errors(self):
       
   256         exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
       
   257                       ftplib.error_proto, ftplib.Error, IOError, EOFError)
       
   258         for x in exceptions:
       
   259             try:
       
   260                 raise x('exception not included in all_errors set')
       
   261             except ftplib.all_errors:
       
   262                 pass
       
   263 
       
   264     def test_set_pasv(self):
       
   265         # passive mode is supposed to be enabled by default
       
   266         self.assertTrue(self.client.passiveserver)
       
   267         self.client.set_pasv(True)
       
   268         self.assertTrue(self.client.passiveserver)
       
   269         self.client.set_pasv(False)
       
   270         self.assertFalse(self.client.passiveserver)
       
   271 
       
   272     def test_voidcmd(self):
       
   273         self.client.voidcmd('echo 200')
       
   274         self.client.voidcmd('echo 299')
       
   275         self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
       
   276         self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')
       
   277 
       
   278     def test_login(self):
       
   279         self.client.login()
       
   280 
       
   281     def test_acct(self):
       
   282         self.client.acct('passwd')
       
   283 
       
   284     def test_rename(self):
       
   285         self.client.rename('a', 'b')
       
   286         self.server.handler.next_response = '200'
       
   287         self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')
       
   288 
       
   289     def test_delete(self):
       
   290         self.client.delete('foo')
       
   291         self.server.handler.next_response = '199'
       
   292         self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
       
   293 
       
   294     def test_size(self):
       
   295         self.client.size('foo')
       
   296 
       
   297     def test_mkd(self):
       
   298         dir = self.client.mkd('/foo')
       
   299         self.assertEqual(dir, '/foo')
       
   300 
       
   301     def test_rmd(self):
       
   302         self.client.rmd('foo')
       
   303 
       
   304     def test_pwd(self):
       
   305         dir = self.client.pwd()
       
   306         self.assertEqual(dir, 'pwd ok')
       
   307 
       
   308     def test_quit(self):
       
   309         self.assertEqual(self.client.quit(), '221 quit ok')
       
   310         # Ensure the connection gets closed; sock attribute should be None
       
   311         self.assertEqual(self.client.sock, None)
       
   312 
       
   313     def test_retrbinary(self):
       
   314         received = []
       
   315         self.client.retrbinary('retr', received.append)
       
   316         self.assertEqual(''.join(received), RETR_DATA)
       
   317 
       
   318     def test_retrlines(self):
       
   319         received = []
       
   320         self.client.retrlines('retr', received.append)
       
   321         self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', ''))
       
   322 
       
   323     def test_storbinary(self):
       
   324         f = StringIO.StringIO(RETR_DATA)
       
   325         self.client.storbinary('stor', f)
       
   326         self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
       
   327         # test new callback arg
       
   328         flag = []
       
   329         f.seek(0)
       
   330         self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
       
   331         self.assertTrue(flag)
       
   332 
       
   333     def test_storlines(self):
       
   334         f = StringIO.StringIO(RETR_DATA.replace('\r\n', '\n'))
       
   335         self.client.storlines('stor', f)
       
   336         self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
       
   337         # test new callback arg
       
   338         flag = []
       
   339         f.seek(0)
       
   340         self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
       
   341         self.assertTrue(flag)
       
   342 
       
   343     def test_nlst(self):
       
   344         self.client.nlst()
       
   345         self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
       
   346 
       
   347     def test_dir(self):
       
   348         l = []
       
   349         self.client.dir(lambda x: l.append(x))
       
   350         self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
       
   351 
       
   352     def test_makeport(self):
       
   353         self.client.makeport()
       
   354         # IPv4 is in use, just make sure send_eprt has not been used
       
   355         self.assertEqual(self.server.handler.last_received_cmd, 'port')
       
   356 
       
   357     def test_makepasv(self):
       
   358         host, port = self.client.makepasv()
       
   359         conn = socket.create_connection((host, port), 2)
       
   360         conn.close()
       
   361         # IPv4 is in use, just make sure send_epsv has not been used
       
   362         self.assertEqual(self.server.handler.last_received_cmd, 'pasv')
       
   363 
       
   364 
       
   365 class TestIPv6Environment(TestCase):
       
   366 
       
   367     def setUp(self):
       
   368         self.server = DummyFTPServer((HOST, 0), af=socket.AF_INET6)
       
   369         self.server.start()
       
   370         self.client = ftplib.FTP()
       
   371         self.client.connect(self.server.host, self.server.port)
       
   372 
       
   373     def tearDown(self):
       
   374         self.client.close()
       
   375         self.server.stop()
       
   376 
       
   377     def test_af(self):
       
   378         self.assertEqual(self.client.af, socket.AF_INET6)
       
   379 
       
   380     def test_makeport(self):
       
   381         self.client.makeport()
       
   382         self.assertEqual(self.server.handler.last_received_cmd, 'eprt')
       
   383 
       
   384     def test_makepasv(self):
       
   385         host, port = self.client.makepasv()
       
   386         conn = socket.create_connection((host, port), 2)
       
   387         conn.close()
       
   388         self.assertEqual(self.server.handler.last_received_cmd, 'epsv')
       
   389 
       
   390     def test_transfer(self):
       
   391         def retr():
       
   392             received = []
       
   393             self.client.retrbinary('retr', received.append)
       
   394             self.assertEqual(''.join(received), RETR_DATA)
       
   395         self.client.set_pasv(True)
       
   396         retr()
       
   397         self.client.set_pasv(False)
       
   398         retr()
       
   399 
       
   400 
       
   401 class TestTimeouts(TestCase):
       
   402 
       
   403     def setUp(self):
       
   404         self.evt = threading.Event()
       
   405         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   406         self.sock.settimeout(3)
       
   407         self.port = test_support.bind_port(self.sock)
       
   408         threading.Thread(target=self.server, args=(self.evt,self.sock)).start()
       
   409         # Wait for the server to be ready.
       
   410         self.evt.wait()
       
   411         self.evt.clear()
       
   412         ftplib.FTP.port = self.port
       
   413 
       
   414     def tearDown(self):
       
   415         self.evt.wait()
       
   416 
       
   417     def server(self, evt, serv):
       
   418         # This method sets the evt 3 times:
       
   419         #  1) when the connection is ready to be accepted.
       
   420         #  2) when it is safe for the caller to close the connection
       
   421         #  3) when we have closed the socket
       
   422         serv.listen(5)
       
   423         # (1) Signal the caller that we are ready to accept the connection.
       
   424         evt.set()
       
   425         try:
       
   426             conn, addr = serv.accept()
       
   427         except socket.timeout:
       
   428             pass
       
   429         else:
       
   430             conn.send("1 Hola mundo\n")
       
   431             # (2) Signal the caller that it is safe to close the socket.
       
   432             evt.set()
       
   433             conn.close()
       
   434         finally:
       
   435             serv.close()
       
   436             # (3) Signal the caller that we are done.
       
   437             evt.set()
       
   438 
       
   439     def testTimeoutDefault(self):
       
   440         # default -- use global socket timeout
       
   441         self.assert_(socket.getdefaulttimeout() is None)
       
   442         socket.setdefaulttimeout(30)
       
   443         try:
       
   444             ftp = ftplib.FTP("localhost")
       
   445         finally:
       
   446             socket.setdefaulttimeout(None)
       
   447         self.assertEqual(ftp.sock.gettimeout(), 30)
       
   448         self.evt.wait()
       
   449         ftp.close()
       
   450 
       
   451     def testTimeoutNone(self):
       
   452         # no timeout -- do not use global socket timeout
       
   453         self.assert_(socket.getdefaulttimeout() is None)
       
   454         socket.setdefaulttimeout(30)
       
   455         try:
       
   456             ftp = ftplib.FTP("localhost", timeout=None)
       
   457         finally:
       
   458             socket.setdefaulttimeout(None)
       
   459         self.assertTrue(ftp.sock.gettimeout() is None)
       
   460         self.evt.wait()
       
   461         ftp.close()
       
   462 
       
   463     def testTimeoutValue(self):
       
   464         # a value
       
   465         ftp = ftplib.FTP(HOST, timeout=30)
       
   466         self.assertEqual(ftp.sock.gettimeout(), 30)
       
   467         self.evt.wait()
       
   468         ftp.close()
       
   469 
       
   470     def testTimeoutConnect(self):
       
   471         ftp = ftplib.FTP()
       
   472         ftp.connect(HOST, timeout=30)
       
   473         self.assertEqual(ftp.sock.gettimeout(), 30)
       
   474         self.evt.wait()
       
   475         ftp.close()
       
   476 
       
   477     def testTimeoutDifferentOrder(self):
       
   478         ftp = ftplib.FTP(timeout=30)
       
   479         ftp.connect(HOST)
       
   480         self.assertEqual(ftp.sock.gettimeout(), 30)
       
   481         self.evt.wait()
       
   482         ftp.close()
       
   483 
       
   484     def testTimeoutDirectAccess(self):
       
   485         ftp = ftplib.FTP()
       
   486         ftp.timeout = 30
       
   487         ftp.connect(HOST)
       
   488         self.assertEqual(ftp.sock.gettimeout(), 30)
       
   489         self.evt.wait()
       
   490         ftp.close()
       
   491 
       
   492 
       
   493 def test_main():
       
   494     tests = [TestFTPClass, TestTimeouts]
       
   495     if socket.has_ipv6:
       
   496         try:
       
   497             DummyFTPServer((HOST, 0), af=socket.AF_INET6)
       
   498         except socket.error:
       
   499             pass
       
   500         else:
       
   501             tests.append(TestIPv6Environment)
       
   502     thread_info = test_support.threading_setup()
       
   503     try:
       
   504         test_support.run_unittest(*tests)
       
   505     finally:
       
   506         test_support.threading_cleanup(*thread_info)
       
   507 
       
   508 
       
   509 if __name__ == '__main__':
       
   510     test_main()