#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
report (Report Module)
======================

**Author:**

* Dirk Alders

**Description:**

    The Module is designed to help with python logging and to support some handlers for logging to memory.

**Submodules:**

* :class:`report.collectingHandler`
* :class:`report.collectingRingHandler`
* :class:`report.collectingTestcaseHandler`
* :func:`report.appLoggingConfigure`
* :func:`report.stdoutLoggingConfigure`
* :class:`report.testSession`

**Unittest:**

    See also the :download:`unittest <../../report/_testresults_/unittest.pdf>` documentation.
"""
__DEPENDENCIES__ = []

import collections
import logging
from logging.config import dictConfig
import os
import sys

logger_name = 'REPORT'
logger = logging.getLogger(logger_name)

# NOTE: '\\_' (escaped backslash) yields the same two characters the former
# invalid escape '\_' produced, without triggering an invalid-escape warning.
__DESCRIPTION__ = """The Module {\\tt %s} is designed to help with python logging and to support some handlers for logging to memory.
For more Information read the sphinx documentation.""" % __name__.replace('_', '\\_')
"""The Module Description"""
__INTERPRETER__ = (2, 3, )
"""The Tested Interpreter-Versions"""

SHORT_FMT = "%(asctime)s: %(name)s - %(levelname)s - %(message)s"
""" A short formatter including the most important information"""
LONG_FMT = """~~~~(%(levelname)-10s)~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
File "%(pathname)s", line %(lineno)d, in %(funcName)s
%(asctime)s: %(name)s- %(message)s
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"""
""" A long formatter which results in links to the source code inside Eclipse"""
MAX_FMT = """
%(name)s
%(levelno)s
%(levelname)s
%(pathname)s
%(filename)s
%(module)s
%(lineno)d
%(funcName)s
%(created)f
%(asctime)s
%(msecs)d
%(relativeCreated)d
%(thread)d
%(threadName)s
%(process)d
%(message)s"""
"""A formatter referencing every record attribute listed above (used as the initial formatter of the collecting handlers)"""
DEFAULT_FMT = LONG_FMT
"""The default formatstring"""


class collectingHandler(logging.Handler):
    """
    Logging handler which stores the attribute dictionary of every emitted record in memory.

    .. note:: :attr:`MY_LOGS` is a class attribute, so all instances share one storage
              until :meth:`make_independent` is called on an instance.
    """
    MY_LOGS = []

    def __init__(self):
        logging.Handler.__init__(self)
        self.setFormatter(logging.Formatter(MAX_FMT))
        self.setLevel(logging.DEBUG)

    def emit(self, record):
        """
        Format the record and append its ``__dict__`` to the storage.

        :param record: The log record to store.
        """
        # Formatting is done for its side effect: it fills the derived record
        # attributes (e.g. ``message`` and ``asctime``) used by the formatter.
        self.format(record)
        self.MY_LOGS.append(record.__dict__)

    def make_independent(self):
        """Give this instance its own, empty storage instead of the class wide one."""
        self.MY_LOGS = []

    def get_logs(self):
        """
        Remove the stored log entries from the storage and return them.

        :returns: The stored record dictionaries (oldest first).
        :rtype: list
        """
        storage = self.MY_LOGS
        drained = list(storage)
        # Remove only the copied prefix, in place: the storage object may be
        # shared between instances and must not be rebound here.
        del storage[:len(drained)]
        return drained

    def get_str(self, logs=None, fmt=SHORT_FMT):
        """
        Render log entries to a string (one line per entry).

        :param logs: The record dictionaries to render. The stored logs are used, if `None`.
        :param str fmt: The %-style format string applied to every record dictionary.
        :returns: The rendered entries joined by newlines.
        :rtype: str
        """
        # Explicit None check: an explicitly given empty list results in ''.
        if logs is None:
            logs = self.MY_LOGS
        return '\n'.join([fmt % log for log in logs])

    def __len__(self):
        return len(self.MY_LOGS)

    def __str__(self):
        return self.get_str(self.MY_LOGS)


class collectingRingHandler(collectingHandler):
    """
    Collecting handler which keeps only the newest entries in a :class:`collections.deque`.

    :param int max_logs: The maximum number of stored entries. The current limit is kept, if `None`.
    """
    MY_LOGS = collections.deque([], 10)

    def __init__(self, max_logs=None):
        collectingHandler.__init__(self)
        if max_logs is not None and max_logs != self.MY_LOGS.maxlen:
            # Re-initialise the existing deque in place, so that every holder
            # of the storage sees the new limit; the current entries are kept.
            self.MY_LOGS.__init__(list(self.MY_LOGS), max_logs)

    def make_independent(self):
        """Give this instance its own, empty ring storage with the same size limit."""
        self.MY_LOGS = collections.deque([], self.MY_LOGS.maxlen)

    def get_logs(self):
        """
        Return a copy of the stored log entries. The storage is not emptied.

        :returns: The stored record dictionaries (oldest first).
        :rtype: list
        """
        return list(self.MY_LOGS)


TCEL_SINGLE = 0
"""Testcase level (single), the lowest execution level (named 'Single Test' in :data:`TCEL_NAMES`)"""
TCEL_SMOKE = 10
"""Testcase level (smoke), this is just a rough test for the main functionality"""
TCEL_SHORT = 50
"""Testcase level (short), this is a short test for an extended functionality"""
TCEL_FULL = 90
"""Testcase level (full), this is a complete test for the full functionality"""
TCEL_NAMES = {
    TCEL_SINGLE: 'Single Test',
    TCEL_SMOKE: 'Smoke Test (Minumum subset)',
    TCEL_SHORT: 'Short Test (Subset)',
    TCEL_FULL: 'Full Test (all defined tests)'
}
"""Dictionary for resolving the test case levels (TCL) to a (human readable) name"""

TCEL_REVERSE_NAMED = {
    'short': TCEL_SHORT,
    'smoke': TCEL_SMOKE,
    'single': TCEL_SINGLE,
    'full': TCEL_FULL,
}
"""Dictionary for resolving named test case levels (TCL) to test case level number"""


class collectingTestcaseHandler(collectingHandler):
    """
    Collecting handler for testcase logs.

    Every stored entry additionally gets the key ``'moduleLogger'`` holding the entries
    drained from :class:`collectingHandler` at the time of the emit.
    """
    MY_LOGS = []

    def emit(self, record):
        """
        Store the record and attach the module logs collected so far.

        :param record: The log record to store.
        """
        collectingHandler.emit(self, record)
        # record.__dict__ is the very dictionary which has just been stored.
        record.__dict__['moduleLogger'] = collectingHandler().get_logs()


def appLoggingConfigure(basepath, target, log_name_lvl=None, fmt=SHORT_FMT, ring_logs=None):
    """
    Configure python logging for an application via :func:`logging.config.dictConfig`.

    :param basepath: The folder for the logfiles. If not `None`, warnings and above are
                     additionally written to ``messages.warn`` in that folder.
                     It is required for the target ``'logfile'``.
    :param str target: ``'stdout'`` logs to the standard output, ``'logfile'`` logs to the
                       rotating file ``messages.log``. Any other value results in a
                       :class:`logging.NullHandler` as main handler.
    :param log_name_lvl: Iterable of ``(logger_name, level)`` pairs for the loggers to configure.
    :param str fmt: The format string for the main handler.
    :param ring_logs: If not `None`, a :class:`collectingRingHandler` with that size is added.
    """
    log_name_lvl = () if log_name_lvl is None else log_name_lvl
    target_handlers = ['main', ]
    if basepath is not None:
        target_handlers.append('logwarn')
    # define handler
    #
    if target == 'stdout':
        main_handler = {
            'level': 'DEBUG',
            'formatter': 'format',
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stdout',
        }
    elif target == 'logfile':
        main_handler = {
            'level': 'DEBUG',
            'formatter': 'format',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': os.path.join(basepath, 'messages.log'),
            'mode': 'a',
            'maxBytes': 10485760,
            'backupCount': 7
        }
    else:
        # The fallback has to be registered as 'main' with a defined formatter,
        # because the loggers below reference the handler name 'main'.
        main_handler = {
            'level': 'DEBUG',
            'formatter': 'format',
            'class': 'logging.NullHandler',
        }
    handler = dict(main=main_handler)
    if ring_logs is not None:
        target_handlers.append('ring')
        handler['ring'] = {
            # Factory form: independent of the import name of this module.
            '()': collectingRingHandler,
            'max_logs': ring_logs,
        }
    if basepath is not None:
        handler['logwarn'] = {
            'level': 'WARNING',
            'formatter': 'long',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': os.path.join(basepath, 'messages.warn'),
            'mode': 'a',
            'maxBytes': 10485760,
            'backupCount': 2
        }
    # define loggers
    #
    loggers = {}
    for name, lvl in log_name_lvl:
        loggers[name] = {
            'handlers': list(target_handlers),
            'level': lvl,
            'propagate': False
        }
    # configure logging
    #
    dictConfig(dict(
        version=1,
        formatters={
            'long': {
                'format': LONG_FMT
            },
            'format': {
                'format': fmt,
            },
        },
        handlers=handler,
        loggers=loggers,
    ))


def stdoutLoggingConfigure(log_name_lvl=None, fmt=SHORT_FMT):
    """
    Configure python logging to the standard output (see :func:`appLoggingConfigure`).

    :param log_name_lvl: Iterable of ``(logger_name, level)`` pairs for the loggers to configure.
    :param str fmt: The format string for the output.
    """
    appLoggingConfigure(None, 'stdout', log_name_lvl=log_name_lvl, fmt=fmt)


class testSession(dict):
    """
    Dictionary collecting the results and logs of executed testcases.

    :param module_names: The names of the loggers whose output shall be collected as module logs.
    :param kwargs: Values overriding or extending :attr:`DEFAULT_BASE_DATA`.

    .. note:: Creating an instance reconfigures python logging via :func:`logging.config.dictConfig`.
    """
    KEY_NAME = 'name'
    KEY_FAILED_TESTS = 'number_of_failed_tests'
    KEY_POSSIBLY_FAILED_TESTS = 'number_of_possibly_failed_tests'
    KEY_SUCCESS_TESTS = 'number_of_successfull_tests'
    KEY_ALL_TESTS = 'number_of_tests'
    KEY_EXEC_LVL = 'testcase_execution_level'
    KEY_EXEC_NAMES = 'testcase_names'
    KEY_LVL_NAMES = 'level_names'
    KEY_TESTCASELIST = 'testcases'
    KEY_UID_LIST = 'uid_list_sorted'
    #
    DEFAULT_BASE_DATA = {
        KEY_NAME: 'Default Testsession name',
        KEY_FAILED_TESTS: 0,
        KEY_POSSIBLY_FAILED_TESTS: 0,
        KEY_SUCCESS_TESTS: 0,
        KEY_ALL_TESTS: 0,
        KEY_EXEC_LVL: TCEL_FULL,
        KEY_EXEC_NAMES: TCEL_NAMES,
    }

    def __init__(self, module_names=None, **kwargs):
        dict.__init__(self, time_consumption=0.)
        self.__testcase__ = None
        self.__set_base_data__(**kwargs)
        self.__configure_logging__(module_names)

    def __set_base_data__(self, **kwargs):
        """Store the default base data (overridden by `kwargs`) and reset the testcase storage."""
        self.update(self.DEFAULT_BASE_DATA)
        self.update(kwargs)
        self[self.KEY_TESTCASELIST] = {}
        self[self.KEY_UID_LIST] = []

    def __configure_logging__(self, module_names):
        """Configure python logging so that testcase and module logs are collected in memory."""
        #
        # Configure logging for testSession
        #
        logging_config = dict(
            version=1,
            formatters={
                'short': {
                    'format': SHORT_FMT,
                },
                'long': {
                    'format': LONG_FMT,
                },
            },
            handlers={
                'console': {
                    'level': 'DEBUG',
                    'class': 'logging.NullHandler',
                    'formatter': 'short',
                },
                'module_logs': {
                    'level': 'DEBUG',
                    # Factory form: independent of the import name of this module.
                    '()': collectingHandler,
                    'formatter': 'short',
                },
                'testcase_logs': {
                    'level': 'DEBUG',
                    '()': collectingTestcaseHandler,
                    'formatter': 'short',
                },
            },
            loggers=self.__module_loggers__(module_names),
        )
        dictConfig(logging_config)

    def __module_loggers__(self, module_names):
        """
        Create the logger section for the logging configuration.

        :param module_names: The logger names to attach to the module log handler (may be `None`).
        :returns: The configuration for ``'__tLogger__'``, ``'__mLogger__'`` and all given names.
        :rtype: dict
        """
        rv = {}
        rv['__tLogger__'] = dict(handlers=['console', 'testcase_logs'], level='DEBUG', propagate=False)
        for name in list(module_names or []) + ['__mLogger__']:
            rv[name] = dict(handlers=['console', 'module_logs'], level='DEBUG', propagate=False)
        return rv

    def testCase(self, name, testcase_execution_level, test_method, *args, **kwargs):
        """
        Execute a testcase and store its logs and result in this session.

        The testcase is skipped, if `testcase_execution_level` is greater than the
        execution level of this session.

        :param str name: The (unique) name of the testcase.
        :param int testcase_execution_level: The level of the testcase (see the ``TCEL_*`` constants).
        :param test_method: Callable executed as ``test_method(tLogger, *args, **kwargs)``.
        :raises AttributeError: If the testcase log storage is in an unexpected state or
                                the testcase name already exists.
        """
        if testcase_execution_level <= self[self.KEY_EXEC_LVL]:
            tLogger = logging.getLogger('__tLogger__')
            tHandler = collectingTestcaseHandler()
            if len(tHandler.MY_LOGS) > 0:
                raise AttributeError("Testcaselogger shall be empty after closing testcase!")
            # The testcase name is logged as the opening record of the testcase.
            # NOTE(review): the private Logger._log is used here, presumably to
            # emit the record without the enabled-check of Logger.debug - confirm.
            tLogger._log(logging.DEBUG, name, None)
            if len(tHandler.MY_LOGS) != 1:
                raise AttributeError("Testcaselogger shall have only one entry for the main testcase (temporary)!")
            self.__testcase__ = tHandler.get_logs()[0]
            test_method(logging.getLogger('__tLogger__'), *args, **kwargs)
            self.__close_active_testcase__()

    def __close_active_testcase__(self):
        """
        Finish the active testcase: collect its logs and update levels, counters and times.

        :raises AttributeError: If a testcase with the same name has already been stored.
        """
        testcase = self.__testcase__
        if testcase is None:
            return
        name = testcase.get('message')
        #
        # Add testcase
        #
        testcase['testcaseLogger'] = collectingTestcaseHandler().get_logs()
        if name in self[self.KEY_TESTCASELIST]:
            raise AttributeError("Testcase named %s already exists" % name)
        self[self.KEY_TESTCASELIST][name] = testcase
        self[self.KEY_UID_LIST].append(name)
        #
        # Adapt testcase data
        #
        testcase['levelno'] = 0
        testcase['time_consumption'] = 0.
        # Fallback for a testcase without any logged teststep: the opening
        # record of the testcase is also treated as its last entry.
        last_entry = testcase
        for teststep in testcase['testcaseLogger']:
            # store maximum level to testcase
            if teststep.get('levelno') > testcase['levelno']:
                testcase['levelno'] = teststep.get('levelno')
                testcase['levelname'] = teststep.get('levelname')
            # store time_consumption for teststep
            try:
                teststep['time_consumption'] = teststep['created'] - teststep['moduleLogger'][-1]['created']
            except IndexError:
                # no module logs have been collected for this teststep
                teststep['time_consumption'] = 0.
            last_entry = teststep
        # Increment testcase counters
        #
        self[self.KEY_ALL_TESTS] += 1
        if testcase['levelno'] <= logging.INFO:
            self[self.KEY_SUCCESS_TESTS] += 1
        elif testcase['levelno'] >= logging.ERROR:
            self[self.KEY_FAILED_TESTS] += 1
        else:
            self[self.KEY_POSSIBLY_FAILED_TESTS] += 1
        # Set testcase time and time_consumption
        testcase['time_start'] = testcase['asctime']
        testcase['time_finished'] = last_entry['asctime']
        testcase['time_consumption'] = last_entry['created'] - testcase['created']
        # Set testcase time consumption
        self['time_consumption'] += testcase['time_consumption']
        self.__testcase__ = None