symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_logging.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #!/usr/bin/env python
       
     2 #
       
     3 # Copyright 2001-2004 by Vinay Sajip. All Rights Reserved.
       
     4 #
       
     5 # Permission to use, copy, modify, and distribute this software and its
       
     6 # documentation for any purpose and without fee is hereby granted,
       
     7 # provided that the above copyright notice appear in all copies and that
       
     8 # both that copyright notice and this permission notice appear in
       
     9 # supporting documentation, and that the name of Vinay Sajip
       
    10 # not be used in advertising or publicity pertaining to distribution
       
    11 # of the software without specific, written prior permission.
       
    12 # VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
       
    13 # ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
       
    14 # VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
       
    15 # ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
       
    16 # IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
       
    17 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
       
    18 
       
    19 """Test harness for the logging module. Run all tests.
       
    20 
       
    21 Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
       
    22 """
       
    23 
       
    24 import logging
       
    25 import logging.handlers
       
    26 import logging.config
       
    27 
       
    28 import copy
       
    29 import cPickle
       
    30 import cStringIO
       
    31 import gc
       
    32 import os
       
    33 import re
       
    34 import select
       
    35 import socket
       
    36 from SocketServer import ThreadingTCPServer, StreamRequestHandler
       
    37 import string
       
    38 import struct
       
    39 import sys
       
    40 import tempfile
       
    41 from test.test_support import captured_stdout, run_with_locale, run_unittest
       
    42 import textwrap
       
    43 import threading
       
    44 import time
       
    45 import types
       
    46 import unittest
       
    47 import weakref
       
    48 
       
    49 
       
    50 class BaseTest(unittest.TestCase):
       
    51 
       
    52     """Base class for logging tests."""
       
    53 
       
    54     log_format = "%(name)s -> %(levelname)s: %(message)s"
       
    55     expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
       
    56     message_num = 0
       
    57 
       
    58     def setUp(self):
       
    59         """Setup the default logging stream to an internal StringIO instance,
       
    60         so that we can examine log output as we want."""
       
    61         logger_dict = logging.getLogger().manager.loggerDict
       
    62         logging._acquireLock()
       
    63         try:
       
    64             self.saved_handlers = logging._handlers.copy()
       
    65             self.saved_handler_list = logging._handlerList[:]
       
    66             self.saved_loggers = logger_dict.copy()
       
    67             self.saved_level_names = logging._levelNames.copy()
       
    68         finally:
       
    69             logging._releaseLock()
       
    70 
       
    71         self.root_logger = logging.getLogger("")
       
    72         self.original_logging_level = self.root_logger.getEffectiveLevel()
       
    73 
       
    74         self.stream = cStringIO.StringIO()
       
    75         self.root_logger.setLevel(logging.DEBUG)
       
    76         self.root_hdlr = logging.StreamHandler(self.stream)
       
    77         self.root_formatter = logging.Formatter(self.log_format)
       
    78         self.root_hdlr.setFormatter(self.root_formatter)
       
    79         self.root_logger.addHandler(self.root_hdlr)
       
    80 
       
    81     def tearDown(self):
       
    82         """Remove our logging stream, and restore the original logging
       
    83         level."""
       
    84         self.stream.close()
       
    85         self.root_logger.removeHandler(self.root_hdlr)
       
    86         self.root_logger.setLevel(self.original_logging_level)
       
    87         logging._acquireLock()
       
    88         try:
       
    89             logging._levelNames.clear()
       
    90             logging._levelNames.update(self.saved_level_names)
       
    91             logging._handlers.clear()
       
    92             logging._handlers.update(self.saved_handlers)
       
    93             logging._handlerList[:] = self.saved_handler_list
       
    94             loggerDict = logging.getLogger().manager.loggerDict
       
    95             loggerDict.clear()
       
    96             loggerDict.update(self.saved_loggers)
       
    97         finally:
       
    98             logging._releaseLock()
       
    99 
       
   100     def assert_log_lines(self, expected_values, stream=None):
       
   101         """Match the collected log lines against the regular expression
       
   102         self.expected_log_pat, and compare the extracted group values to
       
   103         the expected_values list of tuples."""
       
   104         stream = stream or self.stream
       
   105         pat = re.compile(self.expected_log_pat)
       
   106         try:
       
   107             stream.reset()
       
   108             actual_lines = stream.readlines()
       
   109         except AttributeError:
       
   110             # StringIO.StringIO lacks a reset() method.
       
   111             actual_lines = stream.getvalue().splitlines()
       
   112         self.assertEquals(len(actual_lines), len(expected_values))
       
   113         for actual, expected in zip(actual_lines, expected_values):
       
   114             match = pat.search(actual)
       
   115             if not match:
       
   116                 self.fail("Log line does not match expected pattern:\n" +
       
   117                             actual)
       
   118             self.assertEquals(tuple(match.groups()), expected)
       
   119         s = stream.read()
       
   120         if s:
       
   121             self.fail("Remaining output at end of log stream:\n" + s)
       
   122 
       
   123     def next_message(self):
       
   124         """Generate a message consisting solely of an auto-incrementing
       
   125         integer."""
       
   126         self.message_num += 1
       
   127         return "%d" % self.message_num
       
   128 
       
   129 
       
   130 class BuiltinLevelsTest(BaseTest):
       
   131     """Test builtin levels and their inheritance."""
       
   132 
       
   133     def test_flat(self):
       
   134         #Logging levels in a flat logger namespace.
       
   135         m = self.next_message
       
   136 
       
   137         ERR = logging.getLogger("ERR")
       
   138         ERR.setLevel(logging.ERROR)
       
   139         INF = logging.getLogger("INF")
       
   140         INF.setLevel(logging.INFO)
       
   141         DEB = logging.getLogger("DEB")
       
   142         DEB.setLevel(logging.DEBUG)
       
   143 
       
   144         # These should log.
       
   145         ERR.log(logging.CRITICAL, m())
       
   146         ERR.error(m())
       
   147 
       
   148         INF.log(logging.CRITICAL, m())
       
   149         INF.error(m())
       
   150         INF.warn(m())
       
   151         INF.info(m())
       
   152 
       
   153         DEB.log(logging.CRITICAL, m())
       
   154         DEB.error(m())
       
   155         DEB.warn (m())
       
   156         DEB.info (m())
       
   157         DEB.debug(m())
       
   158 
       
   159         # These should not log.
       
   160         ERR.warn(m())
       
   161         ERR.info(m())
       
   162         ERR.debug(m())
       
   163 
       
   164         INF.debug(m())
       
   165 
       
   166         self.assert_log_lines([
       
   167             ('ERR', 'CRITICAL', '1'),
       
   168             ('ERR', 'ERROR', '2'),
       
   169             ('INF', 'CRITICAL', '3'),
       
   170             ('INF', 'ERROR', '4'),
       
   171             ('INF', 'WARNING', '5'),
       
   172             ('INF', 'INFO', '6'),
       
   173             ('DEB', 'CRITICAL', '7'),
       
   174             ('DEB', 'ERROR', '8'),
       
   175             ('DEB', 'WARNING', '9'),
       
   176             ('DEB', 'INFO', '10'),
       
   177             ('DEB', 'DEBUG', '11'),
       
   178         ])
       
   179 
       
   180     def test_nested_explicit(self):
       
   181         # Logging levels in a nested namespace, all explicitly set.
       
   182         m = self.next_message
       
   183 
       
   184         INF = logging.getLogger("INF")
       
   185         INF.setLevel(logging.INFO)
       
   186         INF_ERR  = logging.getLogger("INF.ERR")
       
   187         INF_ERR.setLevel(logging.ERROR)
       
   188 
       
   189         # These should log.
       
   190         INF_ERR.log(logging.CRITICAL, m())
       
   191         INF_ERR.error(m())
       
   192 
       
   193         # These should not log.
       
   194         INF_ERR.warn(m())
       
   195         INF_ERR.info(m())
       
   196         INF_ERR.debug(m())
       
   197 
       
   198         self.assert_log_lines([
       
   199             ('INF.ERR', 'CRITICAL', '1'),
       
   200             ('INF.ERR', 'ERROR', '2'),
       
   201         ])
       
   202 
       
   203     def test_nested_inherited(self):
       
   204         #Logging levels in a nested namespace, inherited from parent loggers.
       
   205         m = self.next_message
       
   206 
       
   207         INF = logging.getLogger("INF")
       
   208         INF.setLevel(logging.INFO)
       
   209         INF_ERR  = logging.getLogger("INF.ERR")
       
   210         INF_ERR.setLevel(logging.ERROR)
       
   211         INF_UNDEF = logging.getLogger("INF.UNDEF")
       
   212         INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
       
   213         UNDEF = logging.getLogger("UNDEF")
       
   214 
       
   215         # These should log.
       
   216         INF_UNDEF.log(logging.CRITICAL, m())
       
   217         INF_UNDEF.error(m())
       
   218         INF_UNDEF.warn(m())
       
   219         INF_UNDEF.info(m())
       
   220         INF_ERR_UNDEF.log(logging.CRITICAL, m())
       
   221         INF_ERR_UNDEF.error(m())
       
   222 
       
   223         # These should not log.
       
   224         INF_UNDEF.debug(m())
       
   225         INF_ERR_UNDEF.warn(m())
       
   226         INF_ERR_UNDEF.info(m())
       
   227         INF_ERR_UNDEF.debug(m())
       
   228 
       
   229         self.assert_log_lines([
       
   230             ('INF.UNDEF', 'CRITICAL', '1'),
       
   231             ('INF.UNDEF', 'ERROR', '2'),
       
   232             ('INF.UNDEF', 'WARNING', '3'),
       
   233             ('INF.UNDEF', 'INFO', '4'),
       
   234             ('INF.ERR.UNDEF', 'CRITICAL', '5'),
       
   235             ('INF.ERR.UNDEF', 'ERROR', '6'),
       
   236         ])
       
   237 
       
   238     def test_nested_with_virtual_parent(self):
       
   239         # Logging levels when some parent does not exist yet.
       
   240         m = self.next_message
       
   241 
       
   242         INF = logging.getLogger("INF")
       
   243         GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
       
   244         CHILD = logging.getLogger("INF.BADPARENT")
       
   245         INF.setLevel(logging.INFO)
       
   246 
       
   247         # These should log.
       
   248         GRANDCHILD.log(logging.FATAL, m())
       
   249         GRANDCHILD.info(m())
       
   250         CHILD.log(logging.FATAL, m())
       
   251         CHILD.info(m())
       
   252 
       
   253         # These should not log.
       
   254         GRANDCHILD.debug(m())
       
   255         CHILD.debug(m())
       
   256 
       
   257         self.assert_log_lines([
       
   258             ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
       
   259             ('INF.BADPARENT.UNDEF', 'INFO', '2'),
       
   260             ('INF.BADPARENT', 'CRITICAL', '3'),
       
   261             ('INF.BADPARENT', 'INFO', '4'),
       
   262         ])
       
   263 
       
   264 
       
   265 class BasicFilterTest(BaseTest):
       
   266 
       
   267     """Test the bundled Filter class."""
       
   268 
       
   269     def test_filter(self):
       
   270         # Only messages satisfying the specified criteria pass through the
       
   271         #  filter.
       
   272         filter_ = logging.Filter("spam.eggs")
       
   273         handler = self.root_logger.handlers[0]
       
   274         try:
       
   275             handler.addFilter(filter_)
       
   276             spam = logging.getLogger("spam")
       
   277             spam_eggs = logging.getLogger("spam.eggs")
       
   278             spam_eggs_fish = logging.getLogger("spam.eggs.fish")
       
   279             spam_bakedbeans = logging.getLogger("spam.bakedbeans")
       
   280 
       
   281             spam.info(self.next_message())
       
   282             spam_eggs.info(self.next_message())  # Good.
       
   283             spam_eggs_fish.info(self.next_message())  # Good.
       
   284             spam_bakedbeans.info(self.next_message())
       
   285 
       
   286             self.assert_log_lines([
       
   287                 ('spam.eggs', 'INFO', '2'),
       
   288                 ('spam.eggs.fish', 'INFO', '3'),
       
   289             ])
       
   290         finally:
       
   291             handler.removeFilter(filter_)
       
   292 
       
   293 
       
   294 #
       
   295 #   First, we define our levels. There can be as many as you want - the only
       
   296 #     limitations are that they should be integers, the lowest should be > 0 and
       
   297 #   larger values mean less information being logged. If you need specific
       
   298 #   level values which do not fit into these limitations, you can use a
       
   299 #   mapping dictionary to convert between your application levels and the
       
   300 #   logging system.
       
   301 #
       
   302 SILENT      = 120
       
   303 TACITURN    = 119
       
   304 TERSE       = 118
       
   305 EFFUSIVE    = 117
       
   306 SOCIABLE    = 116
       
   307 VERBOSE     = 115
       
   308 TALKATIVE   = 114
       
   309 GARRULOUS   = 113
       
   310 CHATTERBOX  = 112
       
   311 BORING      = 111
       
   312 
       
   313 LEVEL_RANGE = range(BORING, SILENT + 1)
       
   314 
       
   315 #
       
   316 #   Next, we define names for our levels. You don't need to do this - in which
       
   317 #   case the system will use "Level n" to denote the text for the level.
       
   318 #
       
   319 my_logging_levels = {
       
   320     SILENT      : 'Silent',
       
   321     TACITURN    : 'Taciturn',
       
   322     TERSE       : 'Terse',
       
   323     EFFUSIVE    : 'Effusive',
       
   324     SOCIABLE    : 'Sociable',
       
   325     VERBOSE     : 'Verbose',
       
   326     TALKATIVE   : 'Talkative',
       
   327     GARRULOUS   : 'Garrulous',
       
   328     CHATTERBOX  : 'Chatterbox',
       
   329     BORING      : 'Boring',
       
   330 }
       
   331 
       
   332 class GarrulousFilter(logging.Filter):
       
   333 
       
   334     """A filter which blocks garrulous messages."""
       
   335 
       
   336     def filter(self, record):
       
   337         return record.levelno != GARRULOUS
       
   338 
       
   339 class VerySpecificFilter(logging.Filter):
       
   340 
       
   341     """A filter which blocks sociable and taciturn messages."""
       
   342 
       
   343     def filter(self, record):
       
   344         return record.levelno not in [SOCIABLE, TACITURN]
       
   345 
       
   346 
       
   347 class CustomLevelsAndFiltersTest(BaseTest):
       
   348 
       
   349     """Test various filtering possibilities with custom logging levels."""
       
   350 
       
   351     # Skip the logger name group.
       
   352     expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
       
   353 
       
   354     def setUp(self):
       
   355         BaseTest.setUp(self)
       
   356         for k, v in my_logging_levels.items():
       
   357             logging.addLevelName(k, v)
       
   358 
       
   359     def log_at_all_levels(self, logger):
       
   360         for lvl in LEVEL_RANGE:
       
   361             logger.log(lvl, self.next_message())
       
   362 
       
   363     def test_logger_filter(self):
       
   364         # Filter at logger level.
       
   365         self.root_logger.setLevel(VERBOSE)
       
   366         # Levels >= 'Verbose' are good.
       
   367         self.log_at_all_levels(self.root_logger)
       
   368         self.assert_log_lines([
       
   369             ('Verbose', '5'),
       
   370             ('Sociable', '6'),
       
   371             ('Effusive', '7'),
       
   372             ('Terse', '8'),
       
   373             ('Taciturn', '9'),
       
   374             ('Silent', '10'),
       
   375         ])
       
   376 
       
   377     def test_handler_filter(self):
       
   378         # Filter at handler level.
       
   379         self.root_logger.handlers[0].setLevel(SOCIABLE)
       
   380         try:
       
   381             # Levels >= 'Sociable' are good.
       
   382             self.log_at_all_levels(self.root_logger)
       
   383             self.assert_log_lines([
       
   384                 ('Sociable', '6'),
       
   385                 ('Effusive', '7'),
       
   386                 ('Terse', '8'),
       
   387                 ('Taciturn', '9'),
       
   388                 ('Silent', '10'),
       
   389             ])
       
   390         finally:
       
   391             self.root_logger.handlers[0].setLevel(logging.NOTSET)
       
   392 
       
   393     def test_specific_filters(self):
       
   394         # Set a specific filter object on the handler, and then add another
       
   395         #  filter object on the logger itself.
       
   396         handler = self.root_logger.handlers[0]
       
   397         specific_filter = None
       
   398         garr = GarrulousFilter()
       
   399         handler.addFilter(garr)
       
   400         try:
       
   401             self.log_at_all_levels(self.root_logger)
       
   402             first_lines = [
       
   403                 # Notice how 'Garrulous' is missing
       
   404                 ('Boring', '1'),
       
   405                 ('Chatterbox', '2'),
       
   406                 ('Talkative', '4'),
       
   407                 ('Verbose', '5'),
       
   408                 ('Sociable', '6'),
       
   409                 ('Effusive', '7'),
       
   410                 ('Terse', '8'),
       
   411                 ('Taciturn', '9'),
       
   412                 ('Silent', '10'),
       
   413             ]
       
   414             self.assert_log_lines(first_lines)
       
   415 
       
   416             specific_filter = VerySpecificFilter()
       
   417             self.root_logger.addFilter(specific_filter)
       
   418             self.log_at_all_levels(self.root_logger)
       
   419             self.assert_log_lines(first_lines + [
       
   420                 # Not only 'Garrulous' is still missing, but also 'Sociable'
       
   421                 # and 'Taciturn'
       
   422                 ('Boring', '11'),
       
   423                 ('Chatterbox', '12'),
       
   424                 ('Talkative', '14'),
       
   425                 ('Verbose', '15'),
       
   426                 ('Effusive', '17'),
       
   427                 ('Terse', '18'),
       
   428                 ('Silent', '20'),
       
   429         ])
       
   430         finally:
       
   431             if specific_filter:
       
   432                 self.root_logger.removeFilter(specific_filter)
       
   433             handler.removeFilter(garr)
       
   434 
       
   435 
       
   436 class MemoryHandlerTest(BaseTest):
       
   437 
       
   438     """Tests for the MemoryHandler."""
       
   439 
       
   440     # Do not bother with a logger name group.
       
   441     expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
       
   442 
       
   443     def setUp(self):
       
   444         BaseTest.setUp(self)
       
   445         self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
       
   446                                                         self.root_hdlr)
       
   447         self.mem_logger = logging.getLogger('mem')
       
   448         self.mem_logger.propagate = 0
       
   449         self.mem_logger.addHandler(self.mem_hdlr)
       
   450 
       
   451     def tearDown(self):
       
   452         self.mem_hdlr.close()
       
   453         BaseTest.tearDown(self)
       
   454 
       
   455     def test_flush(self):
       
   456         # The memory handler flushes to its target handler based on specific
       
   457         #  criteria (message count and message level).
       
   458         self.mem_logger.debug(self.next_message())
       
   459         self.assert_log_lines([])
       
   460         self.mem_logger.info(self.next_message())
       
   461         self.assert_log_lines([])
       
   462         # This will flush because the level is >= logging.WARNING
       
   463         self.mem_logger.warn(self.next_message())
       
   464         lines = [
       
   465             ('DEBUG', '1'),
       
   466             ('INFO', '2'),
       
   467             ('WARNING', '3'),
       
   468         ]
       
   469         self.assert_log_lines(lines)
       
   470         for n in (4, 14):
       
   471             for i in range(9):
       
   472                 self.mem_logger.debug(self.next_message())
       
   473             self.assert_log_lines(lines)
       
   474             # This will flush because it's the 10th message since the last
       
   475             #  flush.
       
   476             self.mem_logger.debug(self.next_message())
       
   477             lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
       
   478             self.assert_log_lines(lines)
       
   479 
       
   480         self.mem_logger.debug(self.next_message())
       
   481         self.assert_log_lines(lines)
       
   482 
       
   483 
       
   484 class ExceptionFormatter(logging.Formatter):
       
   485     """A special exception formatter."""
       
   486     def formatException(self, ei):
       
   487         return "Got a [%s]" % ei[0].__name__
       
   488 
       
   489 
       
   490 class ConfigFileTest(BaseTest):
       
   491 
       
   492     """Reading logging config from a .ini-style config file."""
       
   493 
       
   494     expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
       
   495 
       
   496     # config0 is a standard configuration.
       
   497     config0 = """
       
   498     [loggers]
       
   499     keys=root
       
   500 
       
   501     [handlers]
       
   502     keys=hand1
       
   503 
       
   504     [formatters]
       
   505     keys=form1
       
   506 
       
   507     [logger_root]
       
   508     level=WARNING
       
   509     handlers=hand1
       
   510 
       
   511     [handler_hand1]
       
   512     class=StreamHandler
       
   513     level=NOTSET
       
   514     formatter=form1
       
   515     args=(sys.stdout,)
       
   516 
       
   517     [formatter_form1]
       
   518     format=%(levelname)s ++ %(message)s
       
   519     datefmt=
       
   520     """
       
   521 
       
   522     # config1 adds a little to the standard configuration.
       
   523     config1 = """
       
   524     [loggers]
       
   525     keys=root,parser
       
   526 
       
   527     [handlers]
       
   528     keys=hand1
       
   529 
       
   530     [formatters]
       
   531     keys=form1
       
   532 
       
   533     [logger_root]
       
   534     level=WARNING
       
   535     handlers=
       
   536 
       
   537     [logger_parser]
       
   538     level=DEBUG
       
   539     handlers=hand1
       
   540     propagate=1
       
   541     qualname=compiler.parser
       
   542 
       
   543     [handler_hand1]
       
   544     class=StreamHandler
       
   545     level=NOTSET
       
   546     formatter=form1
       
   547     args=(sys.stdout,)
       
   548 
       
   549     [formatter_form1]
       
   550     format=%(levelname)s ++ %(message)s
       
   551     datefmt=
       
   552     """
       
   553 
       
   554     # config2 has a subtle configuration error that should be reported
       
   555     config2 = config1.replace("sys.stdout", "sys.stbout")
       
   556 
       
   557     # config3 has a less subtle configuration error
       
   558     config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
       
   559 
       
   560     # config4 specifies a custom formatter class to be loaded
       
   561     config4 = """
       
   562     [loggers]
       
   563     keys=root
       
   564 
       
   565     [handlers]
       
   566     keys=hand1
       
   567 
       
   568     [formatters]
       
   569     keys=form1
       
   570 
       
   571     [logger_root]
       
   572     level=NOTSET
       
   573     handlers=hand1
       
   574 
       
   575     [handler_hand1]
       
   576     class=StreamHandler
       
   577     level=NOTSET
       
   578     formatter=form1
       
   579     args=(sys.stdout,)
       
   580 
       
   581     [formatter_form1]
       
   582     class=""" + __name__ + """.ExceptionFormatter
       
   583     format=%(levelname)s:%(name)s:%(message)s
       
   584     datefmt=
       
   585     """
       
   586 
       
   587     # config5 specifies a custom handler class to be loaded
       
   588     config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
       
   589 
       
   590     # config6 uses ', ' delimiters in the handlers and formatters sections
       
   591     config6 = """
       
   592     [loggers]
       
   593     keys=root,parser
       
   594 
       
   595     [handlers]
       
   596     keys=hand1, hand2
       
   597 
       
   598     [formatters]
       
   599     keys=form1, form2
       
   600 
       
   601     [logger_root]
       
   602     level=WARNING
       
   603     handlers=
       
   604 
       
   605     [logger_parser]
       
   606     level=DEBUG
       
   607     handlers=hand1
       
   608     propagate=1
       
   609     qualname=compiler.parser
       
   610 
       
   611     [handler_hand1]
       
   612     class=StreamHandler
       
   613     level=NOTSET
       
   614     formatter=form1
       
   615     args=(sys.stdout,)
       
   616 
       
   617     [handler_hand2]
       
   618     class=StreamHandler
       
   619     level=NOTSET
       
   620     formatter=form1
       
   621     args=(sys.stderr,)
       
   622 
       
   623     [formatter_form1]
       
   624     format=%(levelname)s ++ %(message)s
       
   625     datefmt=
       
   626 
       
   627     [formatter_form2]
       
   628     format=%(message)s
       
   629     datefmt=
       
   630     """
       
   631 
       
   632     def apply_config(self, conf):
       
   633         try:
       
   634             fn = tempfile.mktemp(".ini")
       
   635             f = open(fn, "w")
       
   636             f.write(textwrap.dedent(conf))
       
   637             f.close()
       
   638             logging.config.fileConfig(fn)
       
   639         finally:
       
   640             os.remove(fn)
       
   641 
       
   642     def test_config0_ok(self):
       
   643         # A simple config file which overrides the default settings.
       
   644         with captured_stdout() as output:
       
   645             self.apply_config(self.config0)
       
   646             logger = logging.getLogger()
       
   647             # Won't output anything
       
   648             logger.info(self.next_message())
       
   649             # Outputs a message
       
   650             logger.error(self.next_message())
       
   651             self.assert_log_lines([
       
   652                 ('ERROR', '2'),
       
   653             ], stream=output)
       
   654             # Original logger output is empty.
       
   655             self.assert_log_lines([])
       
   656 
       
   657     def test_config1_ok(self, config=config1):
       
   658         # A config file defining a sub-parser as well.
       
   659         with captured_stdout() as output:
       
   660             self.apply_config(config)
       
   661             logger = logging.getLogger("compiler.parser")
       
   662             # Both will output a message
       
   663             logger.info(self.next_message())
       
   664             logger.error(self.next_message())
       
   665             self.assert_log_lines([
       
   666                 ('INFO', '1'),
       
   667                 ('ERROR', '2'),
       
   668             ], stream=output)
       
   669             # Original logger output is empty.
       
   670             self.assert_log_lines([])
       
   671 
       
   672     def test_config2_failure(self):
       
   673         # A simple config file which overrides the default settings.
       
   674         self.assertRaises(StandardError, self.apply_config, self.config2)
       
   675 
       
   676     def test_config3_failure(self):
       
   677         # A simple config file which overrides the default settings.
       
   678         self.assertRaises(StandardError, self.apply_config, self.config3)
       
   679 
       
   680     def test_config4_ok(self):
       
   681         # A config file specifying a custom formatter class.
       
   682         with captured_stdout() as output:
       
   683             self.apply_config(self.config4)
       
   684             logger = logging.getLogger()
       
   685             try:
       
   686                 raise RuntimeError()
       
   687             except RuntimeError:
       
   688                 logging.exception("just testing")
       
   689             sys.stdout.seek(0)
       
   690             self.assertEquals(output.getvalue(),
       
   691                 "ERROR:root:just testing\nGot a [RuntimeError]\n")
       
   692             # Original logger output is empty
       
   693             self.assert_log_lines([])
       
   694 
       
   695     def test_config5_ok(self):
       
   696         self.test_config1_ok(config=self.config5)
       
   697 
       
   698     def test_config6_ok(self):
       
   699         self.test_config1_ok(config=self.config6)
       
   700 
       
   701 class LogRecordStreamHandler(StreamRequestHandler):
       
   702 
       
   703     """Handler for a streaming logging request. It saves the log message in the
       
   704     TCP server's 'log_output' attribute."""
       
   705 
       
   706     TCP_LOG_END = "!!!END!!!"
       
   707 
       
   708     def handle(self):
       
   709         """Handle multiple requests - each expected to be of 4-byte length,
       
   710         followed by the LogRecord in pickle format. Logs the record
       
   711         according to whatever policy is configured locally."""
       
   712         while True:
       
   713             chunk = self.connection.recv(4)
       
   714             if len(chunk) < 4:
       
   715                 break
       
   716             slen = struct.unpack(">L", chunk)[0]
       
   717             chunk = self.connection.recv(slen)
       
   718             while len(chunk) < slen:
       
   719                 chunk = chunk + self.connection.recv(slen - len(chunk))
       
   720             obj = self.unpickle(chunk)
       
   721             record = logging.makeLogRecord(obj)
       
   722             self.handle_log_record(record)
       
   723 
       
   724     def unpickle(self, data):
       
   725         return cPickle.loads(data)
       
   726 
       
   727     def handle_log_record(self, record):
       
   728         # If the end-of-messages sentinel is seen, tell the server to
       
   729         #  terminate.
       
   730         if self.TCP_LOG_END in record.msg:
       
   731             self.server.abort = 1
       
   732             return
       
   733         self.server.log_output += record.msg + "\n"
       
   734 
       
   735 
       
   736 class LogRecordSocketReceiver(ThreadingTCPServer):
       
   737 
       
   738     """A simple-minded TCP socket-based logging receiver suitable for test
       
   739     purposes."""
       
   740 
       
   741     allow_reuse_address = 1
       
   742     log_output = ""
       
   743 
       
   744     def __init__(self, host='localhost',
       
   745                              port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
       
   746                      handler=LogRecordStreamHandler):
       
   747         ThreadingTCPServer.__init__(self, (host, port), handler)
       
   748         self.abort = False
       
   749         self.timeout = 0.1
       
   750         self.finished = threading.Event()
       
   751 
       
   752     def serve_until_stopped(self):
       
   753         while not self.abort:
       
   754             rd, wr, ex = select.select([self.socket.fileno()], [], [],
       
   755                                        self.timeout)
       
   756             if rd:
       
   757                 self.handle_request()
       
   758         # Notify the main thread that we're about to exit
       
   759         self.finished.set()
       
   760         # close the listen socket
       
   761         self.server_close()
       
   762 
       
   763 
       
   764 class SocketHandlerTest(BaseTest):
       
   765 
       
   766     """Test for SocketHandler objects."""
       
   767 
       
   768     def setUp(self):
       
   769         """Set up a TCP server to receive log messages, and a SocketHandler
       
   770         pointing to that server's address and port."""
       
   771         BaseTest.setUp(self)
       
   772         self.tcpserver = LogRecordSocketReceiver(port=0)
       
   773         self.port = self.tcpserver.socket.getsockname()[1]
       
   774         self.threads = [
       
   775                 threading.Thread(target=self.tcpserver.serve_until_stopped)]
       
   776         for thread in self.threads:
       
   777             thread.start()
       
   778 
       
   779         self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
       
   780         self.sock_hdlr.setFormatter(self.root_formatter)
       
   781         self.root_logger.removeHandler(self.root_logger.handlers[0])
       
   782         self.root_logger.addHandler(self.sock_hdlr)
       
   783 
       
   784     def tearDown(self):
       
   785         """Shutdown the TCP server."""
       
   786         try:
       
   787             self.tcpserver.abort = True
       
   788             del self.tcpserver
       
   789             self.root_logger.removeHandler(self.sock_hdlr)
       
   790             self.sock_hdlr.close()
       
   791             for thread in self.threads:
       
   792                 thread.join(2.0)
       
   793         finally:
       
   794             BaseTest.tearDown(self)
       
   795 
       
   796     def get_output(self):
       
   797         """Get the log output as received by the TCP server."""
       
   798         # Signal the TCP receiver and wait for it to terminate.
       
   799         self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
       
   800         self.tcpserver.finished.wait(2.0)
       
   801         return self.tcpserver.log_output
       
   802 
       
   803     def test_output(self):
       
   804         # The log message sent to the SocketHandler is properly received.
       
   805         logger = logging.getLogger("tcp")
       
   806         logger.error("spam")
       
   807         logger.debug("eggs")
       
   808         self.assertEquals(self.get_output(), "spam\neggs\n")
       
   809 
       
   810 
       
   811 class MemoryTest(BaseTest):
       
   812 
       
   813     """Test memory persistence of logger objects."""
       
   814 
       
   815     def setUp(self):
       
   816         """Create a dict to remember potentially destroyed objects."""
       
   817         BaseTest.setUp(self)
       
   818         self._survivors = {}
       
   819 
       
   820     def _watch_for_survival(self, *args):
       
   821         """Watch the given objects for survival, by creating weakrefs to
       
   822         them."""
       
   823         for obj in args:
       
   824             key = id(obj), repr(obj)
       
   825             self._survivors[key] = weakref.ref(obj)
       
   826 
       
   827     def _assert_survival(self):
       
   828         """Assert that all objects watched for survival have survived."""
       
   829         # Trigger cycle breaking.
       
   830         gc.collect()
       
   831         dead = []
       
   832         for (id_, repr_), ref in self._survivors.items():
       
   833             if ref() is None:
       
   834                 dead.append(repr_)
       
   835         if dead:
       
   836             self.fail("%d objects should have survived "
       
   837                 "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
       
   838 
       
   839     def test_persistent_loggers(self):
       
   840         # Logger objects are persistent and retain their configuration, even
       
   841         #  if visible references are destroyed.
       
   842         self.root_logger.setLevel(logging.INFO)
       
   843         foo = logging.getLogger("foo")
       
   844         self._watch_for_survival(foo)
       
   845         foo.setLevel(logging.DEBUG)
       
   846         self.root_logger.debug(self.next_message())
       
   847         foo.debug(self.next_message())
       
   848         self.assert_log_lines([
       
   849             ('foo', 'DEBUG', '2'),
       
   850         ])
       
   851         del foo
       
   852         # foo has survived.
       
   853         self._assert_survival()
       
   854         # foo has retained its settings.
       
   855         bar = logging.getLogger("foo")
       
   856         bar.debug(self.next_message())
       
   857         self.assert_log_lines([
       
   858             ('foo', 'DEBUG', '2'),
       
   859             ('foo', 'DEBUG', '3'),
       
   860         ])
       
   861 
       
   862 class EncodingTest(BaseTest):
       
   863     def test_encoding_plain_file(self):
       
   864         # In Python 2.x, a plain file object is treated as having no encoding.
       
   865         log = logging.getLogger("test")
       
   866         fn = tempfile.mktemp(".log")
       
   867         # the non-ascii data we write to the log.
       
   868         data = "foo\x80"
       
   869         try:
       
   870             handler = logging.FileHandler(fn)
       
   871             log.addHandler(handler)
       
   872             try:
       
   873                 # write non-ascii data to the log.
       
   874                 log.warning(data)
       
   875             finally:
       
   876                 log.removeHandler(handler)
       
   877                 handler.close()
       
   878             # check we wrote exactly those bytes, ignoring trailing \n etc
       
   879             f = open(fn)
       
   880             try:
       
   881                 self.failUnlessEqual(f.read().rstrip(), data)
       
   882             finally:
       
   883                 f.close()
       
   884         finally:
       
   885             if os.path.isfile(fn):
       
   886                 os.remove(fn)
       
   887 
       
   888 # Set the locale to the platform-dependent default.  I have no idea
       
   889 # why the test does this, but in any case we save the current locale
       
   890 # first and restore it at the end.
       
   891 @run_with_locale('LC_ALL', '')
       
   892 def test_main():
       
   893     run_unittest(BuiltinLevelsTest, BasicFilterTest,
       
   894                     CustomLevelsAndFiltersTest, MemoryHandlerTest,
       
   895                     ConfigFileTest, SocketHandlerTest, MemoryTest,
       
   896                     EncodingTest)
       
   897 
       
   898 if __name__ == "__main__":
       
   899     test_main()