symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_logging.py
changeset 1 2fb8b9db1c86
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_logging.py	Fri Jul 31 15:01:17 2009 +0100
@@ -0,0 +1,899 @@
+#!/usr/bin/env python
+#
+# Copyright 2001-2004 by Vinay Sajip. All Rights Reserved.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose and without fee is hereby granted,
+# provided that the above copyright notice appear in all copies and that
+# both that copyright notice and this permission notice appear in
+# supporting documentation, and that the name of Vinay Sajip
+# not be used in advertising or publicity pertaining to distribution
+# of the software without specific, written prior permission.
+# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
+# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
+# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
+# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""Test harness for the logging module. Run all tests.
+
+Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
+"""
+
+import logging
+import logging.handlers
+import logging.config
+
+import copy
+import cPickle
+import cStringIO
+import gc
+import os
+import re
+import select
+import socket
+from SocketServer import ThreadingTCPServer, StreamRequestHandler
+import string
+import struct
+import sys
+import tempfile
+from test.test_support import captured_stdout, run_with_locale, run_unittest
+import textwrap
+import threading
+import time
+import types
+import unittest
+import weakref
+
+
+class BaseTest(unittest.TestCase):
+
+    """Base class for logging tests."""
+
+    log_format = "%(name)s -> %(levelname)s: %(message)s"
+    expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
+    message_num = 0
+
+    def setUp(self):
+        """Setup the default logging stream to an internal StringIO instance,
+        so that we can examine log output as we want."""
+        logger_dict = logging.getLogger().manager.loggerDict
+        logging._acquireLock()
+        try:
+            self.saved_handlers = logging._handlers.copy()
+            self.saved_handler_list = logging._handlerList[:]
+            self.saved_loggers = logger_dict.copy()
+            self.saved_level_names = logging._levelNames.copy()
+        finally:
+            logging._releaseLock()
+
+        self.root_logger = logging.getLogger("")
+        self.original_logging_level = self.root_logger.getEffectiveLevel()
+
+        self.stream = cStringIO.StringIO()
+        self.root_logger.setLevel(logging.DEBUG)
+        self.root_hdlr = logging.StreamHandler(self.stream)
+        self.root_formatter = logging.Formatter(self.log_format)
+        self.root_hdlr.setFormatter(self.root_formatter)
+        self.root_logger.addHandler(self.root_hdlr)
+
+    def tearDown(self):
+        """Remove our logging stream, and restore the original logging
+        level."""
+        self.stream.close()
+        self.root_logger.removeHandler(self.root_hdlr)
+        self.root_logger.setLevel(self.original_logging_level)
+        logging._acquireLock()
+        try:
+            logging._levelNames.clear()
+            logging._levelNames.update(self.saved_level_names)
+            logging._handlers.clear()
+            logging._handlers.update(self.saved_handlers)
+            logging._handlerList[:] = self.saved_handler_list
+            loggerDict = logging.getLogger().manager.loggerDict
+            loggerDict.clear()
+            loggerDict.update(self.saved_loggers)
+        finally:
+            logging._releaseLock()
+
+    def assert_log_lines(self, expected_values, stream=None):
+        """Match the collected log lines against the regular expression
+        self.expected_log_pat, and compare the extracted group values to
+        the expected_values list of tuples."""
+        stream = stream or self.stream
+        pat = re.compile(self.expected_log_pat)
+        try:
+            stream.reset()
+            actual_lines = stream.readlines()
+        except AttributeError:
+            # StringIO.StringIO lacks a reset() method.
+            actual_lines = stream.getvalue().splitlines()
+        self.assertEquals(len(actual_lines), len(expected_values))
+        for actual, expected in zip(actual_lines, expected_values):
+            match = pat.search(actual)
+            if not match:
+                self.fail("Log line does not match expected pattern:\n" +
+                            actual)
+            self.assertEquals(tuple(match.groups()), expected)
+        s = stream.read()
+        if s:
+            self.fail("Remaining output at end of log stream:\n" + s)
+
+    def next_message(self):
+        """Generate a message consisting solely of an auto-incrementing
+        integer."""
+        self.message_num += 1
+        return "%d" % self.message_num
+
+
+class BuiltinLevelsTest(BaseTest):
+    """Test builtin levels and their inheritance."""
+
+    def test_flat(self):
+        #Logging levels in a flat logger namespace.
+        m = self.next_message
+
+        ERR = logging.getLogger("ERR")
+        ERR.setLevel(logging.ERROR)
+        INF = logging.getLogger("INF")
+        INF.setLevel(logging.INFO)
+        DEB = logging.getLogger("DEB")
+        DEB.setLevel(logging.DEBUG)
+
+        # These should log.
+        ERR.log(logging.CRITICAL, m())
+        ERR.error(m())
+
+        INF.log(logging.CRITICAL, m())
+        INF.error(m())
+        INF.warn(m())
+        INF.info(m())
+
+        DEB.log(logging.CRITICAL, m())
+        DEB.error(m())
+        DEB.warn (m())
+        DEB.info (m())
+        DEB.debug(m())
+
+        # These should not log.
+        ERR.warn(m())
+        ERR.info(m())
+        ERR.debug(m())
+
+        INF.debug(m())
+
+        self.assert_log_lines([
+            ('ERR', 'CRITICAL', '1'),
+            ('ERR', 'ERROR', '2'),
+            ('INF', 'CRITICAL', '3'),
+            ('INF', 'ERROR', '4'),
+            ('INF', 'WARNING', '5'),
+            ('INF', 'INFO', '6'),
+            ('DEB', 'CRITICAL', '7'),
+            ('DEB', 'ERROR', '8'),
+            ('DEB', 'WARNING', '9'),
+            ('DEB', 'INFO', '10'),
+            ('DEB', 'DEBUG', '11'),
+        ])
+
+    def test_nested_explicit(self):
+        # Logging levels in a nested namespace, all explicitly set.
+        m = self.next_message
+
+        INF = logging.getLogger("INF")
+        INF.setLevel(logging.INFO)
+        INF_ERR  = logging.getLogger("INF.ERR")
+        INF_ERR.setLevel(logging.ERROR)
+
+        # These should log.
+        INF_ERR.log(logging.CRITICAL, m())
+        INF_ERR.error(m())
+
+        # These should not log.
+        INF_ERR.warn(m())
+        INF_ERR.info(m())
+        INF_ERR.debug(m())
+
+        self.assert_log_lines([
+            ('INF.ERR', 'CRITICAL', '1'),
+            ('INF.ERR', 'ERROR', '2'),
+        ])
+
+    def test_nested_inherited(self):
+        #Logging levels in a nested namespace, inherited from parent loggers.
+        m = self.next_message
+
+        INF = logging.getLogger("INF")
+        INF.setLevel(logging.INFO)
+        INF_ERR  = logging.getLogger("INF.ERR")
+        INF_ERR.setLevel(logging.ERROR)
+        INF_UNDEF = logging.getLogger("INF.UNDEF")
+        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
+        UNDEF = logging.getLogger("UNDEF")
+
+        # These should log.
+        INF_UNDEF.log(logging.CRITICAL, m())
+        INF_UNDEF.error(m())
+        INF_UNDEF.warn(m())
+        INF_UNDEF.info(m())
+        INF_ERR_UNDEF.log(logging.CRITICAL, m())
+        INF_ERR_UNDEF.error(m())
+
+        # These should not log.
+        INF_UNDEF.debug(m())
+        INF_ERR_UNDEF.warn(m())
+        INF_ERR_UNDEF.info(m())
+        INF_ERR_UNDEF.debug(m())
+
+        self.assert_log_lines([
+            ('INF.UNDEF', 'CRITICAL', '1'),
+            ('INF.UNDEF', 'ERROR', '2'),
+            ('INF.UNDEF', 'WARNING', '3'),
+            ('INF.UNDEF', 'INFO', '4'),
+            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
+            ('INF.ERR.UNDEF', 'ERROR', '6'),
+        ])
+
+    def test_nested_with_virtual_parent(self):
+        # Logging levels when some parent does not exist yet.
+        m = self.next_message
+
+        INF = logging.getLogger("INF")
+        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
+        CHILD = logging.getLogger("INF.BADPARENT")
+        INF.setLevel(logging.INFO)
+
+        # These should log.
+        GRANDCHILD.log(logging.FATAL, m())
+        GRANDCHILD.info(m())
+        CHILD.log(logging.FATAL, m())
+        CHILD.info(m())
+
+        # These should not log.
+        GRANDCHILD.debug(m())
+        CHILD.debug(m())
+
+        self.assert_log_lines([
+            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
+            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
+            ('INF.BADPARENT', 'CRITICAL', '3'),
+            ('INF.BADPARENT', 'INFO', '4'),
+        ])
+
+
+class BasicFilterTest(BaseTest):
+
+    """Test the bundled Filter class."""
+
+    def test_filter(self):
+        # Only messages satisfying the specified criteria pass through the
+        #  filter.
+        filter_ = logging.Filter("spam.eggs")
+        handler = self.root_logger.handlers[0]
+        try:
+            handler.addFilter(filter_)
+            spam = logging.getLogger("spam")
+            spam_eggs = logging.getLogger("spam.eggs")
+            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
+            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
+
+            spam.info(self.next_message())
+            spam_eggs.info(self.next_message())  # Good.
+            spam_eggs_fish.info(self.next_message())  # Good.
+            spam_bakedbeans.info(self.next_message())
+
+            self.assert_log_lines([
+                ('spam.eggs', 'INFO', '2'),
+                ('spam.eggs.fish', 'INFO', '3'),
+            ])
+        finally:
+            handler.removeFilter(filter_)
+
+
+#
+#   First, we define our levels. There can be as many as you want - the only
+#     limitations are that they should be integers, the lowest should be > 0 and
+#   larger values mean less information being logged. If you need specific
+#   level values which do not fit into these limitations, you can use a
+#   mapping dictionary to convert between your application levels and the
+#   logging system.
+#
+SILENT      = 120
+TACITURN    = 119
+TERSE       = 118
+EFFUSIVE    = 117
+SOCIABLE    = 116
+VERBOSE     = 115
+TALKATIVE   = 114
+GARRULOUS   = 113
+CHATTERBOX  = 112
+BORING      = 111
+
+LEVEL_RANGE = range(BORING, SILENT + 1)
+
+#
+#   Next, we define names for our levels. You don't need to do this - in which
+#   case the system will use "Level n" to denote the text for the level.
+#
+my_logging_levels = {
+    SILENT      : 'Silent',
+    TACITURN    : 'Taciturn',
+    TERSE       : 'Terse',
+    EFFUSIVE    : 'Effusive',
+    SOCIABLE    : 'Sociable',
+    VERBOSE     : 'Verbose',
+    TALKATIVE   : 'Talkative',
+    GARRULOUS   : 'Garrulous',
+    CHATTERBOX  : 'Chatterbox',
+    BORING      : 'Boring',
+}
+
+class GarrulousFilter(logging.Filter):
+
+    """A filter which blocks garrulous messages."""
+
+    def filter(self, record):
+        return record.levelno != GARRULOUS
+
+class VerySpecificFilter(logging.Filter):
+
+    """A filter which blocks sociable and taciturn messages."""
+
+    def filter(self, record):
+        return record.levelno not in [SOCIABLE, TACITURN]
+
+
+class CustomLevelsAndFiltersTest(BaseTest):
+
+    """Test various filtering possibilities with custom logging levels."""
+
+    # Skip the logger name group.
+    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+
+    def setUp(self):
+        BaseTest.setUp(self)
+        for k, v in my_logging_levels.items():
+            logging.addLevelName(k, v)
+
+    def log_at_all_levels(self, logger):
+        for lvl in LEVEL_RANGE:
+            logger.log(lvl, self.next_message())
+
+    def test_logger_filter(self):
+        # Filter at logger level.
+        self.root_logger.setLevel(VERBOSE)
+        # Levels >= 'Verbose' are good.
+        self.log_at_all_levels(self.root_logger)
+        self.assert_log_lines([
+            ('Verbose', '5'),
+            ('Sociable', '6'),
+            ('Effusive', '7'),
+            ('Terse', '8'),
+            ('Taciturn', '9'),
+            ('Silent', '10'),
+        ])
+
+    def test_handler_filter(self):
+        # Filter at handler level.
+        self.root_logger.handlers[0].setLevel(SOCIABLE)
+        try:
+            # Levels >= 'Sociable' are good.
+            self.log_at_all_levels(self.root_logger)
+            self.assert_log_lines([
+                ('Sociable', '6'),
+                ('Effusive', '7'),
+                ('Terse', '8'),
+                ('Taciturn', '9'),
+                ('Silent', '10'),
+            ])
+        finally:
+            self.root_logger.handlers[0].setLevel(logging.NOTSET)
+
+    def test_specific_filters(self):
+        # Set a specific filter object on the handler, and then add another
+        #  filter object on the logger itself.
+        handler = self.root_logger.handlers[0]
+        specific_filter = None
+        garr = GarrulousFilter()
+        handler.addFilter(garr)
+        try:
+            self.log_at_all_levels(self.root_logger)
+            first_lines = [
+                # Notice how 'Garrulous' is missing
+                ('Boring', '1'),
+                ('Chatterbox', '2'),
+                ('Talkative', '4'),
+                ('Verbose', '5'),
+                ('Sociable', '6'),
+                ('Effusive', '7'),
+                ('Terse', '8'),
+                ('Taciturn', '9'),
+                ('Silent', '10'),
+            ]
+            self.assert_log_lines(first_lines)
+
+            specific_filter = VerySpecificFilter()
+            self.root_logger.addFilter(specific_filter)
+            self.log_at_all_levels(self.root_logger)
+            self.assert_log_lines(first_lines + [
+                # Not only 'Garrulous' is still missing, but also 'Sociable'
+                # and 'Taciturn'
+                ('Boring', '11'),
+                ('Chatterbox', '12'),
+                ('Talkative', '14'),
+                ('Verbose', '15'),
+                ('Effusive', '17'),
+                ('Terse', '18'),
+                ('Silent', '20'),
+        ])
+        finally:
+            if specific_filter:
+                self.root_logger.removeFilter(specific_filter)
+            handler.removeFilter(garr)
+
+
+class MemoryHandlerTest(BaseTest):
+
+    """Tests for the MemoryHandler."""
+
+    # Do not bother with a logger name group.
+    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+
+    def setUp(self):
+        BaseTest.setUp(self)
+        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
+                                                        self.root_hdlr)
+        self.mem_logger = logging.getLogger('mem')
+        self.mem_logger.propagate = 0
+        self.mem_logger.addHandler(self.mem_hdlr)
+
+    def tearDown(self):
+        self.mem_hdlr.close()
+        BaseTest.tearDown(self)
+
+    def test_flush(self):
+        # The memory handler flushes to its target handler based on specific
+        #  criteria (message count and message level).
+        self.mem_logger.debug(self.next_message())
+        self.assert_log_lines([])
+        self.mem_logger.info(self.next_message())
+        self.assert_log_lines([])
+        # This will flush because the level is >= logging.WARNING
+        self.mem_logger.warn(self.next_message())
+        lines = [
+            ('DEBUG', '1'),
+            ('INFO', '2'),
+            ('WARNING', '3'),
+        ]
+        self.assert_log_lines(lines)
+        for n in (4, 14):
+            for i in range(9):
+                self.mem_logger.debug(self.next_message())
+            self.assert_log_lines(lines)
+            # This will flush because it's the 10th message since the last
+            #  flush.
+            self.mem_logger.debug(self.next_message())
+            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
+            self.assert_log_lines(lines)
+
+        self.mem_logger.debug(self.next_message())
+        self.assert_log_lines(lines)
+
+
+class ExceptionFormatter(logging.Formatter):
+    """A special exception formatter."""
+    def formatException(self, ei):
+        return "Got a [%s]" % ei[0].__name__
+
+
+class ConfigFileTest(BaseTest):
+
+    """Reading logging config from a .ini-style config file."""
+
+    expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
+
+    # config0 is a standard configuration.
+    config0 = """
+    [loggers]
+    keys=root
+
+    [handlers]
+    keys=hand1
+
+    [formatters]
+    keys=form1
+
+    [logger_root]
+    level=WARNING
+    handlers=hand1
+
+    [handler_hand1]
+    class=StreamHandler
+    level=NOTSET
+    formatter=form1
+    args=(sys.stdout,)
+
+    [formatter_form1]
+    format=%(levelname)s ++ %(message)s
+    datefmt=
+    """
+
+    # config1 adds a little to the standard configuration.
+    config1 = """
+    [loggers]
+    keys=root,parser
+
+    [handlers]
+    keys=hand1
+
+    [formatters]
+    keys=form1
+
+    [logger_root]
+    level=WARNING
+    handlers=
+
+    [logger_parser]
+    level=DEBUG
+    handlers=hand1
+    propagate=1
+    qualname=compiler.parser
+
+    [handler_hand1]
+    class=StreamHandler
+    level=NOTSET
+    formatter=form1
+    args=(sys.stdout,)
+
+    [formatter_form1]
+    format=%(levelname)s ++ %(message)s
+    datefmt=
+    """
+
+    # config2 has a subtle configuration error that should be reported
+    config2 = config1.replace("sys.stdout", "sys.stbout")
+
+    # config3 has a less subtle configuration error
+    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
+
+    # config4 specifies a custom formatter class to be loaded
+    config4 = """
+    [loggers]
+    keys=root
+
+    [handlers]
+    keys=hand1
+
+    [formatters]
+    keys=form1
+
+    [logger_root]
+    level=NOTSET
+    handlers=hand1
+
+    [handler_hand1]
+    class=StreamHandler
+    level=NOTSET
+    formatter=form1
+    args=(sys.stdout,)
+
+    [formatter_form1]
+    class=""" + __name__ + """.ExceptionFormatter
+    format=%(levelname)s:%(name)s:%(message)s
+    datefmt=
+    """
+
+    # config5 specifies a custom handler class to be loaded
+    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
+
+    # config6 uses ', ' delimiters in the handlers and formatters sections
+    config6 = """
+    [loggers]
+    keys=root,parser
+
+    [handlers]
+    keys=hand1, hand2
+
+    [formatters]
+    keys=form1, form2
+
+    [logger_root]
+    level=WARNING
+    handlers=
+
+    [logger_parser]
+    level=DEBUG
+    handlers=hand1
+    propagate=1
+    qualname=compiler.parser
+
+    [handler_hand1]
+    class=StreamHandler
+    level=NOTSET
+    formatter=form1
+    args=(sys.stdout,)
+
+    [handler_hand2]
+    class=StreamHandler
+    level=NOTSET
+    formatter=form1
+    args=(sys.stderr,)
+
+    [formatter_form1]
+    format=%(levelname)s ++ %(message)s
+    datefmt=
+
+    [formatter_form2]
+    format=%(message)s
+    datefmt=
+    """
+
+    def apply_config(self, conf):
+        try:
+            fn = tempfile.mktemp(".ini")
+            f = open(fn, "w")
+            f.write(textwrap.dedent(conf))
+            f.close()
+            logging.config.fileConfig(fn)
+        finally:
+            os.remove(fn)
+
+    def test_config0_ok(self):
+        # A simple config file which overrides the default settings.
+        with captured_stdout() as output:
+            self.apply_config(self.config0)
+            logger = logging.getLogger()
+            # Won't output anything
+            logger.info(self.next_message())
+            # Outputs a message
+            logger.error(self.next_message())
+            self.assert_log_lines([
+                ('ERROR', '2'),
+            ], stream=output)
+            # Original logger output is empty.
+            self.assert_log_lines([])
+
+    def test_config1_ok(self, config=config1):
+        # A config file defining a sub-parser as well.
+        with captured_stdout() as output:
+            self.apply_config(config)
+            logger = logging.getLogger("compiler.parser")
+            # Both will output a message
+            logger.info(self.next_message())
+            logger.error(self.next_message())
+            self.assert_log_lines([
+                ('INFO', '1'),
+                ('ERROR', '2'),
+            ], stream=output)
+            # Original logger output is empty.
+            self.assert_log_lines([])
+
+    def test_config2_failure(self):
+        # A simple config file which overrides the default settings.
+        self.assertRaises(StandardError, self.apply_config, self.config2)
+
+    def test_config3_failure(self):
+        # A simple config file which overrides the default settings.
+        self.assertRaises(StandardError, self.apply_config, self.config3)
+
+    def test_config4_ok(self):
+        # A config file specifying a custom formatter class.
+        with captured_stdout() as output:
+            self.apply_config(self.config4)
+            logger = logging.getLogger()
+            try:
+                raise RuntimeError()
+            except RuntimeError:
+                logging.exception("just testing")
+            sys.stdout.seek(0)
+            self.assertEquals(output.getvalue(),
+                "ERROR:root:just testing\nGot a [RuntimeError]\n")
+            # Original logger output is empty
+            self.assert_log_lines([])
+
+    def test_config5_ok(self):
+        self.test_config1_ok(config=self.config5)
+
+    def test_config6_ok(self):
+        self.test_config1_ok(config=self.config6)
+
+class LogRecordStreamHandler(StreamRequestHandler):
+
+    """Handler for a streaming logging request. It saves the log message in the
+    TCP server's 'log_output' attribute."""
+
+    TCP_LOG_END = "!!!END!!!"
+
+    def handle(self):
+        """Handle multiple requests - each expected to be of 4-byte length,
+        followed by the LogRecord in pickle format. Logs the record
+        according to whatever policy is configured locally."""
+        while True:
+            chunk = self.connection.recv(4)
+            if len(chunk) < 4:
+                break
+            slen = struct.unpack(">L", chunk)[0]
+            chunk = self.connection.recv(slen)
+            while len(chunk) < slen:
+                chunk = chunk + self.connection.recv(slen - len(chunk))
+            obj = self.unpickle(chunk)
+            record = logging.makeLogRecord(obj)
+            self.handle_log_record(record)
+
+    def unpickle(self, data):
+        return cPickle.loads(data)
+
+    def handle_log_record(self, record):
+        # If the end-of-messages sentinel is seen, tell the server to
+        #  terminate.
+        if self.TCP_LOG_END in record.msg:
+            self.server.abort = 1
+            return
+        self.server.log_output += record.msg + "\n"
+
+
+class LogRecordSocketReceiver(ThreadingTCPServer):
+
+    """A simple-minded TCP socket-based logging receiver suitable for test
+    purposes."""
+
+    allow_reuse_address = 1
+    log_output = ""
+
+    def __init__(self, host='localhost',
+                             port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
+                     handler=LogRecordStreamHandler):
+        ThreadingTCPServer.__init__(self, (host, port), handler)
+        self.abort = False
+        self.timeout = 0.1
+        self.finished = threading.Event()
+
+    def serve_until_stopped(self):
+        while not self.abort:
+            rd, wr, ex = select.select([self.socket.fileno()], [], [],
+                                       self.timeout)
+            if rd:
+                self.handle_request()
+        # Notify the main thread that we're about to exit
+        self.finished.set()
+        # close the listen socket
+        self.server_close()
+
+
+class SocketHandlerTest(BaseTest):
+
+    """Test for SocketHandler objects."""
+
+    def setUp(self):
+        """Set up a TCP server to receive log messages, and a SocketHandler
+        pointing to that server's address and port."""
+        BaseTest.setUp(self)
+        self.tcpserver = LogRecordSocketReceiver(port=0)
+        self.port = self.tcpserver.socket.getsockname()[1]
+        self.threads = [
+                threading.Thread(target=self.tcpserver.serve_until_stopped)]
+        for thread in self.threads:
+            thread.start()
+
+        self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
+        self.sock_hdlr.setFormatter(self.root_formatter)
+        self.root_logger.removeHandler(self.root_logger.handlers[0])
+        self.root_logger.addHandler(self.sock_hdlr)
+
+    def tearDown(self):
+        """Shutdown the TCP server."""
+        try:
+            self.tcpserver.abort = True
+            del self.tcpserver
+            self.root_logger.removeHandler(self.sock_hdlr)
+            self.sock_hdlr.close()
+            for thread in self.threads:
+                thread.join(2.0)
+        finally:
+            BaseTest.tearDown(self)
+
+    def get_output(self):
+        """Get the log output as received by the TCP server."""
+        # Signal the TCP receiver and wait for it to terminate.
+        self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
+        self.tcpserver.finished.wait(2.0)
+        return self.tcpserver.log_output
+
+    def test_output(self):
+        # The log message sent to the SocketHandler is properly received.
+        logger = logging.getLogger("tcp")
+        logger.error("spam")
+        logger.debug("eggs")
+        self.assertEquals(self.get_output(), "spam\neggs\n")
+
+
+class MemoryTest(BaseTest):
+
+    """Test memory persistence of logger objects."""
+
+    def setUp(self):
+        """Create a dict to remember potentially destroyed objects."""
+        BaseTest.setUp(self)
+        self._survivors = {}
+
+    def _watch_for_survival(self, *args):
+        """Watch the given objects for survival, by creating weakrefs to
+        them."""
+        for obj in args:
+            key = id(obj), repr(obj)
+            self._survivors[key] = weakref.ref(obj)
+
+    def _assert_survival(self):
+        """Assert that all objects watched for survival have survived."""
+        # Trigger cycle breaking.
+        gc.collect()
+        dead = []
+        for (id_, repr_), ref in self._survivors.items():
+            if ref() is None:
+                dead.append(repr_)
+        if dead:
+            self.fail("%d objects should have survived "
+                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
+
+    def test_persistent_loggers(self):
+        # Logger objects are persistent and retain their configuration, even
+        #  if visible references are destroyed.
+        self.root_logger.setLevel(logging.INFO)
+        foo = logging.getLogger("foo")
+        self._watch_for_survival(foo)
+        foo.setLevel(logging.DEBUG)
+        self.root_logger.debug(self.next_message())
+        foo.debug(self.next_message())
+        self.assert_log_lines([
+            ('foo', 'DEBUG', '2'),
+        ])
+        del foo
+        # foo has survived.
+        self._assert_survival()
+        # foo has retained its settings.
+        bar = logging.getLogger("foo")
+        bar.debug(self.next_message())
+        self.assert_log_lines([
+            ('foo', 'DEBUG', '2'),
+            ('foo', 'DEBUG', '3'),
+        ])
+
+class EncodingTest(BaseTest):
+    def test_encoding_plain_file(self):
+        # In Python 2.x, a plain file object is treated as having no encoding.
+        log = logging.getLogger("test")
+        fn = tempfile.mktemp(".log")
+        # the non-ascii data we write to the log.
+        data = "foo\x80"
+        try:
+            handler = logging.FileHandler(fn)
+            log.addHandler(handler)
+            try:
+                # write non-ascii data to the log.
+                log.warning(data)
+            finally:
+                log.removeHandler(handler)
+                handler.close()
+            # check we wrote exactly those bytes, ignoring trailing \n etc
+            f = open(fn)
+            try:
+                self.failUnlessEqual(f.read().rstrip(), data)
+            finally:
+                f.close()
+        finally:
+            if os.path.isfile(fn):
+                os.remove(fn)
+
+# Set the locale to the platform-dependent default.  I have no idea
+# why the test does this, but in any case we save the current locale
+# first and restore it at the end.
+@run_with_locale('LC_ALL', '')
+def test_main():
+    run_unittest(BuiltinLevelsTest, BasicFilterTest,
+                    CustomLevelsAndFiltersTest, MemoryHandlerTest,
+                    ConfigFileTest, SocketHandlerTest, MemoryTest,
+                    EncodingTest)
+
+if __name__ == "__main__":
+    test_main()