#!/usr/bin/env python # -*- coding: utf-8 -*- # """ report (Report Module) ====================== **Author:** * Dirk Alders **Description:** The Module is designed to help with python logging and to support some handlers for logging to memory. **Submodules:** * :class:`report.collectingHandler` * :class:`report.collectingRingHandler` * :class:`report.collectingTestcaseHandler` * :func:`report.consoleLoggingConfigure` * :class:`report.testSession` **Unittest:** See also the :download:`unittest <../../report/_testresults_/unittest.pdf>` documentation. """ __DEPENDENCIES__ = [] import collections import json import logging from logging.config import dictConfig import os import sys try: from config import APP_NAME as ROOT_LOGGER_NAME except ImportError: ROOT_LOGGER_NAME = 'root' logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__) __DESCRIPTION__ = """The Module {\\tt %s} is designed to help with python logging and to support some handlers for logging to memory. For more Information read the sphinx documentation.""" % __name__.replace('_', '\_') """The Module Description""" __INTERPRETER__ = (3, ) """The Tested Interpreter-Versions""" SHORT_FMT = "%(asctime)s: %(name)s - %(levelname)s - %(message)s" """ A short formatter including the most important information""" LONG_FMT = SHORT_FMT + \ "\n File \"%(pathname)s\", line %(lineno)d, in %(funcName)s" """ A long formatter which results in links to the source code inside Eclipse""" MAX_FMT = """ %(name)s %(levelno)s %(levelname)s %(pathname)s %(filename)s %(module)s %(lineno)d %(funcName)s %(created)f %(asctime)s %(msecs)d %(relativeCreated)d %(thread)d %(threadName)s %(process)d %(message)s""" DEFAULT_FMT = LONG_FMT """The default formatstring""" class collectingHandler(logging.Handler): MY_LOGS = [] def __init__(self): logging.Handler.__init__(self) self.setFormatter(logging.Formatter(MAX_FMT)) self.setLevel(logging.DEBUG) def emit(self, record): self.format(record) self.MY_LOGS.append(record.__dict__) def make_independent(self): self.MY_LOGS = [] def get_logs(self): rv = [] while len(self.MY_LOGS) > 0: rv.append(self.MY_LOGS.pop(0)) return rv def get_str(self, logs=None, fmt=SHORT_FMT): logs = logs or self.MY_LOGS return '\n'.join([fmt % log for log in logs]) def __len__(self): return len(self.MY_LOGS) def __str__(self): return self.get_str(self.MY_LOGS) class collectingRingHandler(collectingHandler): MY_LOGS = collections.deque([], 10) def __init__(self, max_logs=None): collectingHandler.__init__(self) if max_logs is not None and max_logs != self.MY_LOGS.maxlen: self.MY_LOGS.__init__(list(self.MY_LOGS), max_logs) def make_independent(self): self.MY_LOGS = collections.deque([], self.MY_LOGS.maxlen) def get_logs(self): return list(self.MY_LOGS) TCEL_SINGLE = 0 """Testcase level (smoke), this is just a rough test for the main functionality""" TCEL_SMOKE = 10 """Testcase level (smoke), this is just a rough test for the main functionality""" TCEL_SHORT = 50 """Testcase level (short), this is a short test for an extended functionality""" TCEL_FULL = 90 """Testcase level (full), this is a complete test for the full functionality""" TCEL_NAMES = { TCEL_SINGLE: 'Single Test', TCEL_SMOKE: 'Smoke Test (Minumum subset)', TCEL_SHORT: 'Short Test (Subset)', TCEL_FULL: 'Full Test (all defined tests)' } """Dictionary for resolving the test case levels (TCL) to a (human readable) name""" TCEL_REVERSE_NAMED = { 'short': TCEL_SHORT, 'smoke': TCEL_SMOKE, 'single': TCEL_SINGLE, 'full': TCEL_FULL, } """Dictionary for resolving named test case levels (TCL) to test case level number""" class collectingTestcaseHandler(collectingHandler): MY_LOGS = [] def emit(self, record): self.format(record) self.MY_LOGS.append(record.__dict__) self.MY_LOGS[-1]['moduleLogger'] = collectingHandler().get_logs() class JsonFormatter(logging.Formatter): def format(self, record): obj = {} for key in ["name", "levelno", "levelname", "pathname", "filename", "module", "lineno", "funcName", "created", "msecs", "relativeCreated", "thread", "threadName", "process", "processName", "msg", "args", "exc_info", "exc_text"]: obj[key] = getattr(record, key) obj["msg"] = obj["msg"] % obj["args"] return json.dumps(obj) def appLoggingConfigure(basepath, target, log_name_lvl=[], target_level=logging.DEBUG, fmt=SHORT_FMT, ring_logs=None, host=None, port=None): target_handlers = ['main', ] # define handler # if target == 'stdout': handler = dict(main={ 'level': logging.getLevelName(target_level), 'formatter': 'format', 'class': 'logging.StreamHandler', 'stream': 'ext://sys.stdout', }) elif target == 'logfile': handler = dict(main={ 'level': logging.getLevelName(target_level), 'formatter': 'json', 'class': 'logging.handlers.RotatingFileHandler', 'filename': os.path.join(basepath, 'messages.log'), 'mode': 'a', 'maxBytes': 10485760, 'backupCount': 7 }) else: handler = dict(main={ 'level': 'DEBUG', 'formatter': 'json', 'class': 'logging.NullHandler', }) if host is not None and port is not None: target_handlers.append('socket') handler['socket'] = { 'level': 'DEBUG', 'class': 'logging.handlers.SocketHandler', 'host': host, 'port': port } if ring_logs is not None: target_handlers.append('ring') handler['ring'] = { 'class': 'report.collectingRingHandler', 'max_logs': ring_logs, } # define loggers # loggers = {} for name, lvl in log_name_lvl: loggers[name] = { 'handlers': target_handlers, 'level': lvl, 'propagate': False } # configure logging # dictConfig(dict( version=1, formatters={ 'json': { '()': JsonFormatter }, 'long': { 'format': LONG_FMT }, 'format': { 'format': fmt, }, }, handlers=handler, loggers=loggers, )) def stdoutLoggingConfigure(log_name_lvl=[], fmt=SHORT_FMT): appLoggingConfigure(None, 'stdout', log_name_lvl=log_name_lvl, fmt=fmt) class testSession(dict): KEY_NAME = 'name' KEY_FAILED_TESTS = 'number_of_failed_tests' KEY_POSSIBLY_FAILED_TESTS = 'number_of_possibly_failed_tests' KEY_SUCCESS_TESTS = 'number_of_successfull_tests' KEY_ALL_TESTS = 'number_of_tests' KEY_EXEC_LVL = 'testcase_execution_level' KEY_EXEC_NAMES = 'testcase_names' KEY_LVL_NAMES = 'level_names' KEY_TESTCASELIST = 'testcases' KEY_UID_LIST = 'uid_list_sorted' # DEFAULT_BASE_DATA = { KEY_NAME: 'Default Testsession name', KEY_FAILED_TESTS: 0, KEY_POSSIBLY_FAILED_TESTS: 0, KEY_FAILED_TESTS: 0, KEY_SUCCESS_TESTS: 0, KEY_ALL_TESTS: 0, KEY_EXEC_LVL: TCEL_FULL, KEY_EXEC_NAMES: TCEL_NAMES, } def __init__(self, module_names=[], **kwargs): dict.__init__(self, time_consumption=0.) self.__testcase__ = None self.__set_base_data__(**kwargs) self.__configure_logging__(module_names) def __set_base_data__(self, **kwargs): for key in set([key for key in self.DEFAULT_BASE_DATA.keys()] + [key for key in kwargs.keys()]): self[key] = kwargs.get(key, self.DEFAULT_BASE_DATA.get(key)) self[self.KEY_TESTCASELIST] = {} self[self.KEY_UID_LIST] = [] def __configure_logging__(self, module_names): # # Configure logging for testSession # logging_config = dict( version=1, formatters={ 'short': { 'format': SHORT_FMT, }, 'long': { 'format': LONG_FMT, }, }, handlers={ 'console': { 'level': 'DEBUG', 'class': 'logging.NullHandler', 'formatter': 'short', }, 'module_logs': { 'level': 'DEBUG', 'class': 'report.collectingHandler', 'formatter': 'short', }, 'testcase_logs': { 'level': 'DEBUG', 'class': 'report.collectingTestcaseHandler', 'formatter': 'short', }, }, loggers=self.__module_loggers__(module_names), ) dictConfig(logging_config) def __module_loggers__(self, module_names): rv = {} rv['__tLogger__'] = dict(handlers=['console', 'testcase_logs'], level='DEBUG', propagate=False) for name in module_names + ['__mLogger__']: rv[name] = dict(handlers=['console', 'module_logs'], level='DEBUG', propagate=False) return rv def testCase(self, name, testcase_execution_level, test_method, *args, **kwargs): if testcase_execution_level <= self[self.KEY_EXEC_LVL]: sys.stdout.write(' %s...' % name[:75]) tLogger = logging.getLogger('__tLogger__') tHandler = collectingTestcaseHandler() if len(tHandler.MY_LOGS) > 0: raise AttributeError("Testcaselogger shall be empty after closing testcase!") tLogger._log(logging.DEBUG, name, None) if len(tHandler.MY_LOGS) != 1: raise AttributeError("Testcaselogger shall have only one entry for the main testcase (temporary)!") self.__testcase__ = tHandler.get_logs()[0] test_method(logging.getLogger('__tLogger__'), *args, **kwargs) self.__close_active_testcase__() def __close_active_testcase__(self): if self.__testcase__ is not None: name = self.__testcase__.get('message') # # Add testcase # tch = collectingTestcaseHandler() self.__testcase__['testcaseLogger'] = tch.get_logs() if name in self[self.KEY_TESTCASELIST]: raise AttributeError("Testcase named %s already exists" % name) self[self.KEY_TESTCASELIST][name] = self.__testcase__ self[self.KEY_UID_LIST].append(name) # # Adapt testcase data # self[self.KEY_TESTCASELIST][name]['levelno'] = 0 self[self.KEY_TESTCASELIST][name]['time_consumption'] = 0. for teststep in self[self.KEY_TESTCASELIST][name]['testcaseLogger']: # store maximum level to testcase if teststep.get('levelno') > self[self.KEY_TESTCASELIST][name]['levelno']: self[self.KEY_TESTCASELIST][name]['levelno'] = teststep.get('levelno') self[self.KEY_TESTCASELIST][name]['levelname'] = teststep.get('levelname') # store time_consumption for teststep try: teststep['time_consumption'] = teststep['created'] - teststep['moduleLogger'][-1]['created'] except IndexError: teststep['time_consumption'] = 0. # Increment testcase time_comsumption # Increment testcase counters # self[self.KEY_ALL_TESTS] += 1 if self[self.KEY_TESTCASELIST][name]['levelno'] <= logging.INFO: self[self.KEY_SUCCESS_TESTS] += 1 sys.stdout.write('\033[92mSUCCESS\033[0m\n') elif self[self.KEY_TESTCASELIST][name]['levelno'] >= logging.ERROR: self[self.KEY_FAILED_TESTS] += 1 sys.stdout.write('\033[91mFAILED\033[0m\n') else: self[self.KEY_POSSIBLY_FAILED_TESTS] += 1 sys.stdout.write('\033[93mPOSSIBLY FAILED\033[0m\n') # Set testcase time and time_consumption self[self.KEY_TESTCASELIST][name]['time_start'] = self.__testcase__['asctime'] self[self.KEY_TESTCASELIST][name]['time_finished'] = teststep['asctime'] self[self.KEY_TESTCASELIST][name]['time_consumption'] = teststep['created'] - self.__testcase__['created'] # Set testcase time consumption self['time_consumption'] += self[self.KEY_TESTCASELIST][name]['time_consumption'] self.__testcase__ = None