# Copyright (C) 2012 Google, Inc. # Copyright (C) 2010 Chris Jerdonek (cjerdonek@webkit.org) # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import StringIO import logging from webkitpy.common.system import outputcapture from webkitpy.common.system.systemhost import SystemHost from webkitpy.layout_tests.views.metered_stream import MeteredStream from webkitpy.tool.grammar import pluralize _log = logging.getLogger(__name__) class Printer(object): def __init__(self, stream, options=None): self.stream = stream self.meter = None self.options = options self.num_tests = 0 self.num_started = 0 self.num_errors = 0 self.num_failures = 0 self.running_tests = [] self.completed_tests = [] if options: self.configure(options) def configure(self, options): self.options = options if options.timing: # --timing implies --verbose options.verbose = max(options.verbose, 1) log_level = logging.INFO if options.quiet: log_level = logging.WARNING elif options.verbose == 2: log_level = logging.DEBUG self.meter = MeteredStream(self.stream, (options.verbose == 2), number_of_columns=SystemHost().platform.terminal_width()) handler = logging.StreamHandler(self.stream) # We constrain the level on the handler rather than on the root # logger itself. This is probably better because the handler is # configured and known only to this module, whereas the root logger # is an object shared (and potentially modified) by many modules. # Modifying the handler, then, is less intrusive and less likely to # interfere with modifications made by other modules (e.g. in unit # tests). handler.name = __name__ handler.setLevel(log_level) formatter = logging.Formatter("%(message)s") handler.setFormatter(formatter) logger = logging.getLogger() logger.addHandler(handler) logger.setLevel(logging.NOTSET) # Filter out most webkitpy messages. # # Messages can be selectively re-enabled for this script by updating # this method accordingly. def filter_records(record): """Filter out autoinstall and non-third-party webkitpy messages.""" # FIXME: Figure out a way not to use strings here, for example by # using syntax like webkitpy.test.__name__. We want to be # sure not to import any non-Python 2.4 code, though, until # after the version-checking code has executed. if (record.name.startswith("webkitpy.common.system.autoinstall") or record.name.startswith("webkitpy.test")): return True if record.name.startswith("webkitpy"): return False return True testing_filter = logging.Filter() testing_filter.filter = filter_records # Display a message so developers are not mystified as to why # logging does not work in the unit tests. _log.info("Suppressing most webkitpy logging while running unit tests.") handler.addFilter(testing_filter) if self.options.pass_through: outputcapture.OutputCapture.stream_wrapper = _CaptureAndPassThroughStream def write_update(self, msg): self.meter.write_update(msg) def print_started_test(self, source, test_name): self.running_tests.append(test_name) if len(self.running_tests) > 1: suffix = ' (+%d)' % (len(self.running_tests) - 1) else: suffix = '' if self.options.verbose: write = self.meter.write_update else: write = self.meter.write_throttled_update write(self._test_line(self.running_tests[0], suffix)) def print_finished_test(self, source, test_name, test_time, failures, errors): write = self.meter.writeln if failures: lines = failures[0].splitlines() + [''] suffix = ' failed:' self.num_failures += 1 elif errors: lines = errors[0].splitlines() + [''] suffix = ' erred:' self.num_errors += 1 else: suffix = ' passed' lines = [] if self.options.verbose: write = self.meter.writeln else: write = self.meter.write_throttled_update if self.options.timing: suffix += ' %.4fs' % test_time self.num_started += 1 if test_name == self.running_tests[0]: self.completed_tests.insert(0, [test_name, suffix, lines]) else: self.completed_tests.append([test_name, suffix, lines]) self.running_tests.remove(test_name) for test_name, msg, lines in self.completed_tests: if lines: self.meter.writeln(self._test_line(test_name, msg)) for line in lines: self.meter.writeln(' ' + line) else: write(self._test_line(test_name, msg)) self.completed_tests = [] def _test_line(self, test_name, suffix): format_string = '[%d/%d] %s%s' status_line = format_string % (self.num_started, self.num_tests, test_name, suffix) if len(status_line) > self.meter.number_of_columns(): overflow_columns = len(status_line) - self.meter.number_of_columns() ellipsis = '...' if len(test_name) < overflow_columns + len(ellipsis) + 3: # We don't have enough space even if we elide, just show the test method name. test_name = test_name.split('.')[-1] else: new_length = len(test_name) - overflow_columns - len(ellipsis) prefix = int(new_length / 2) test_name = test_name[:prefix] + ellipsis + test_name[-(new_length - prefix):] return format_string % (self.num_started, self.num_tests, test_name, suffix) def print_result(self, run_time): write = self.meter.writeln write('Ran %s in %.3fs' % (pluralize(self.num_started, "test"), run_time)) if self.num_failures or self.num_errors: write('FAILED (failures=%d, errors=%d)\n' % (self.num_failures, self.num_errors)) else: write('\nOK\n') class _CaptureAndPassThroughStream(object): def __init__(self, stream): self._buffer = StringIO.StringIO() self._stream = stream def write(self, msg): self._stream.write(msg) # Note that we don't want to capture any output generated by the debugger # because that could cause the results of capture_output() to be invalid. if not self._message_is_from_pdb(): self._buffer.write(msg) def _message_is_from_pdb(self): # We will assume that if the pdb module is in the stack then the output # is being generated by the python debugger (or the user calling something # from inside the debugger). import inspect import pdb stack = inspect.stack() return any(frame[1] == pdb.__file__.replace('.pyc', '.py') for frame in stack) def flush(self): self._stream.flush() def getvalue(self): return self._buffer.getvalue()