report/__init__.py

363 lines
12 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
report (Report Module)
======================
**Author:**
* Dirk Alders <sudo-dirk@mount-mockery.de>
**Description:**
The Module is designed to help with python logging and to support some handlers for logging to memory.
**Submodules:**
* :class:`report.collectingHandler`
* :class:`report.collectingRingHandler`
* :class:`report.collectingTestcaseHandler`
* :func:`report.appLoggingConfigure`
* :func:`report.stdoutLoggingConfigure`
* :class:`report.testSession`
**Unittest:**
See also the :download:`unittest <../../report/_testresults_/unittest.pdf>` documentation.
"""
# Dependency list of this module; it is empty here.
# NOTE(review): presumably evaluated by external packaging/documentation tooling - confirm.
__DEPENDENCIES__ = []
import collections
import logging
from logging.config import dictConfig
import os
import sys
# Name and instance of the logger used by this module itself.
logger_name = 'REPORT'
logger = logging.getLogger(logger_name)
# The description embeds the module name in LaTeX markup ({\tt ...}), therefore
# underscores in the name are prefixed with a backslash. The replacement is written
# as '\\_' (an explicit backslash) instead of the invalid escape sequence '\_'.
__DESCRIPTION__ = """The Module {\\tt %s} is designed to help with python logging and to support some handlers for logging to memory.
For more Information read the sphinx documentation.""" % __name__.replace('_', '\\_')
"""The Module Description"""
__INTERPRETER__ = (2, 3, )
"""The Tested Interpreter-Versions"""
SHORT_FMT = "%(asctime)s: %(name)s - %(levelname)s - %(message)s"
""" A short formatter including the most important information"""
LONG_FMT = """~~~~(%(levelname)-10s)~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
File "%(pathname)s", line %(lineno)d, in %(funcName)s
%(asctime)s: %(name)s- %(message)s
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"""
""" A long formatter which results in links to the source code inside Eclipse"""
# Format string referencing one log record attribute per line (name, level, source
# location, timing, thread, process and message). collectingHandler installs a
# formatter built from this string in its constructor.
MAX_FMT = """
%(name)s
%(levelno)s
%(levelname)s
%(pathname)s
%(filename)s
%(module)s
%(lineno)d
%(funcName)s
%(created)f
%(asctime)s
%(msecs)d
%(relativeCreated)d
%(thread)d
%(threadName)s
%(process)d
%(message)s"""
# The long format is used as the default format.
DEFAULT_FMT = LONG_FMT
"""The default formatstring"""
class collectingHandler(logging.Handler):
    """
    Logging handler which collects the attribute dictionaries of all handled log records in memory.

    The buffer :attr:`MY_LOGS` is a class attribute. All instances therefore append to and
    read from the same list until :meth:`make_independent` has been called for an instance.

    .. note:: As :meth:`__len__` is defined, an instance without collected logs evaluates
              to ``False`` in a boolean context.
    """
    MY_LOGS = []

    def __init__(self):
        logging.Handler.__init__(self)
        self.setFormatter(logging.Formatter(MAX_FMT))
        self.setLevel(logging.DEBUG)

    def emit(self, record):
        """
        Store the attribute dictionary of `record` in the buffer.

        :param record: The log record to be collected.
        """
        # The formatted string is discarded; format() is only called because the formatter
        # stores the rendered message (and asctime, if the format uses it) on the record.
        self.format(record)
        # The record's own attribute dictionary is stored, not a copy of it.
        self.MY_LOGS.append(record.__dict__)

    def make_independent(self):
        """Give this instance its own, empty buffer instead of the buffer shared by the class."""
        self.MY_LOGS = []

    def get_logs(self):
        """
        Remove all collected logs from the buffer and return them.

        :returns: The collected log dictionaries, oldest first.
        :rtype: list
        """
        # Copy first and delete exactly the copied entries afterwards. The buffer object
        # itself stays in place (it may be shared) and no repeated pop(0) is needed.
        drained = self.MY_LOGS[:]
        del self.MY_LOGS[:len(drained)]
        return drained

    def get_str(self, logs=None, fmt=SHORT_FMT):
        """
        Format logs as a string with one line per log.

        :param logs: The log dictionaries to format or None to use the logs currently buffered.
                     An empty list results in an empty string.
        :param str fmt: The %-style format string applied to each log dictionary.
        :returns: The formatted logs joined by newlines.
        :rtype: str
        """
        # Only None selects the buffer. A plain truth test would also replace an
        # explicitly given empty list by the buffer content.
        if logs is None:
            logs = self.MY_LOGS
        return '\n'.join([fmt % log for log in logs])

    def __len__(self):
        return len(self.MY_LOGS)

    def __str__(self):
        return self.get_str(self.MY_LOGS)
class collectingRingHandler(collectingHandler):
    """
    Collecting handler with a bounded buffer.

    The buffer is a :class:`collections.deque` with a maximum length (10 by default) stored
    as class attribute, so it is shared by all instances until :meth:`make_independent`
    has been called for an instance.

    :param max_logs: The maximum number of logs to keep or None to leave the current limit untouched.
    """
    MY_LOGS = collections.deque([], 10)

    def __init__(self, max_logs=None):
        collectingHandler.__init__(self)
        if max_logs is None:
            return
        ring = self.MY_LOGS
        if ring.maxlen != max_logs:
            # Re-initialise the existing deque object instead of creating a new one, so that
            # the changed limit applies to the very same buffer object. The current content
            # is handed over as initial content.
            kept = list(ring)
            ring.__init__(kept, max_logs)

    def make_independent(self):
        """Give this instance its own, empty buffer with the same maximum length."""
        capacity = self.MY_LOGS.maxlen
        self.MY_LOGS = collections.deque([], capacity)

    def get_logs(self):
        """
        Return the buffered logs without removing them from the buffer.

        :returns: The buffered log dictionaries as a new list.
        :rtype: list
        """
        return [entry for entry in self.MY_LOGS]
TCEL_SINGLE = 0
"""Testcase level (single), this is the lowest level, named 'Single Test'"""
TCEL_SMOKE = 10
"""Testcase level (smoke), this is just a rough test for the main functionality"""
TCEL_SHORT = 50
"""Testcase level (short), this is a short test for an extended functionality"""
TCEL_FULL = 90
"""Testcase level (full), this is a complete test for the full functionality"""
# NOTE: The spelling 'Minumum' is part of the emitted name and is kept unchanged here,
#       because the names are runtime data which other code may compare against.
TCEL_NAMES = {
    TCEL_SINGLE: 'Single Test',
    TCEL_SMOKE: 'Smoke Test (Minumum subset)',
    TCEL_SHORT: 'Short Test (Subset)',
    TCEL_FULL: 'Full Test (all defined tests)'
}
"""Dictionary for resolving the test case execution levels (TCEL) to a (human readable) name"""
TCEL_REVERSE_NAMED = {
    'short': TCEL_SHORT,
    'smoke': TCEL_SMOKE,
    'single': TCEL_SINGLE,
    'full': TCEL_FULL,
}
"""Dictionary for resolving named test case execution levels (TCEL) to the test case level number"""
class collectingTestcaseHandler(collectingHandler):
    """
    Collecting handler for testcase logs.

    It has its own class level buffer. Each collected log additionally gets the logs
    buffered by :class:`collectingHandler` up to that moment, stored under the key
    ``'moduleLogger'``.
    """
    MY_LOGS = []

    def emit(self, record):
        """
        Store the attribute dictionary of `record` together with the pending module logs.

        :param record: The log record to be collected.
        """
        self.format(record)
        self.MY_LOGS.append(record.__dict__)
        newest = self.MY_LOGS[-1]
        # A fresh collectingHandler reads the class level buffer of collectingHandler;
        # get_logs() empties that buffer and hands its content over to this entry.
        module_logs = collectingHandler().get_logs()
        newest['moduleLogger'] = module_logs
def appLoggingConfigure(basepath, target, log_name_lvl=(), fmt=SHORT_FMT, ring_logs=None):
    """
    Configure the python logging for an application by calling :func:`logging.config.dictConfig`.

    :param basepath: The folder for the logfiles or None. If it is not None, an additional rotating
                     logfile ``messages.warn`` (level WARNING, long format) is created in that folder.
                     It must not be None for the target ``'logfile'``, because the logfile path is
                     built from it.
    :param str target: ``'stdout'`` for logging to stdout, ``'logfile'`` for logging to the rotating
                       logfile ``messages.log`` in `basepath`. With any other value the main handler
                       is a :class:`logging.NullHandler`.
    :param log_name_lvl: An iterable of ``(logger name, level)`` pairs. Each named logger gets all
                         configured handlers and does not propagate to its parents.
    :param str fmt: The format string of the main handler.
    :param ring_logs: None or the maximum number of logs for an additional
                      :class:`collectingRingHandler`.
    """
    # Handler names attached to every configured logger (order: main, logwarn, ring).
    target_handlers = ['main', ]
    if basepath is not None:
        target_handlers.append('logwarn')
    # define handler
    #
    if target == 'stdout':
        main_handler = {
            'level': 'DEBUG',
            'formatter': 'format',
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stdout',
        }
    elif target == 'logfile':
        main_handler = {
            'level': 'DEBUG',
            'formatter': 'format',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': os.path.join(basepath, 'messages.log'),
            'mode': 'a',
            'maxBytes': 10485760,
            'backupCount': 7
        }
    else:
        # The fallback has to be registered as 'main' and has to use a formatter which is
        # defined below. Otherwise the loggers would reference a missing handler and
        # dictConfig would reject the configuration.
        main_handler = {
            'level': 'DEBUG',
            'formatter': 'format',
            'class': 'logging.NullHandler',
        }
    handler = dict(main=main_handler)
    if ring_logs is not None:
        target_handlers.append('ring')
        handler['ring'] = {
            'class': 'report.collectingRingHandler',
            'max_logs': ring_logs,
        }
    if basepath is not None:
        handler['logwarn'] = {
            'level': 'WARNING',
            'formatter': 'long',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': os.path.join(basepath, 'messages.warn'),
            'mode': 'a',
            'maxBytes': 10485760,
            'backupCount': 2
        }
    # define loggers
    #
    loggers = {}
    for name, lvl in log_name_lvl:
        loggers[name] = {
            'handlers': target_handlers,
            'level': lvl,
            'propagate': False
        }
    # configure logging
    #
    dictConfig(dict(
        version=1,
        formatters={
            'long': {
                'format': LONG_FMT
            },
            'format': {
                'format': fmt,
            },
        },
        handlers=handler,
        loggers=loggers,
    ))
def stdoutLoggingConfigure(log_name_lvl=(), fmt=SHORT_FMT):
    """
    Configure the python logging for logging to stdout only.

    This calls :func:`appLoggingConfigure` without a basepath and with the target ``'stdout'``.

    :param log_name_lvl: An iterable of ``(logger name, level)`` pairs.
    :param str fmt: The format string for the stdout handler.
    """
    appLoggingConfigure(None, 'stdout', log_name_lvl=log_name_lvl, fmt=fmt)
class testSession(dict):
    """
    Dictionary collecting the results of the testcases executed by :meth:`testCase`.

    Creating an instance configures the logging by :func:`logging.config.dictConfig`: The logger
    ``'__tLogger__'`` collects the testcase logs (test steps). The logger ``'__mLogger__'`` and
    all loggers given by `module_names` collect the module logs.

    A closed testcase is counted as successful if its highest test step level is at most
    ``logging.INFO``, as failed if it is at least ``logging.ERROR`` and as possibly failed otherwise.

    :param module_names: An iterable of logger names, which shall be collected as module logs.
    :param kwargs: Base data of the session. These values override :attr:`DEFAULT_BASE_DATA`.
    """
    KEY_NAME = 'name'
    KEY_FAILED_TESTS = 'number_of_failed_tests'
    KEY_POSSIBLY_FAILED_TESTS = 'number_of_possibly_failed_tests'
    KEY_SUCCESS_TESTS = 'number_of_successfull_tests'
    KEY_ALL_TESTS = 'number_of_tests'
    KEY_EXEC_LVL = 'testcase_execution_level'
    KEY_EXEC_NAMES = 'testcase_names'
    KEY_LVL_NAMES = 'level_names'
    KEY_TESTCASELIST = 'testcases'
    KEY_UID_LIST = 'uid_list_sorted'
    #
    DEFAULT_BASE_DATA = {
        KEY_NAME: 'Default Testsession name',
        KEY_FAILED_TESTS: 0,
        KEY_POSSIBLY_FAILED_TESTS: 0,
        KEY_SUCCESS_TESTS: 0,
        KEY_ALL_TESTS: 0,
        KEY_EXEC_LVL: TCEL_FULL,
        KEY_EXEC_NAMES: TCEL_NAMES,
    }

    def __init__(self, module_names=(), **kwargs):
        dict.__init__(self, time_consumption=0.)
        # Log dictionary of the currently executed testcase or None.
        self.__testcase__ = None
        self.__set_base_data__(**kwargs)
        self.__configure_logging__(module_names)

    def __set_base_data__(self, **kwargs):
        """Store the default base data overridden by `kwargs` and reset the testcase containers."""
        # Defaults first, given values afterwards: this is the same result as looking up each
        # key in kwargs with a fallback to the defaults, but in a reproducible key order.
        self.update(self.DEFAULT_BASE_DATA)
        self.update(kwargs)
        self[self.KEY_TESTCASELIST] = {}
        self[self.KEY_UID_LIST] = []

    def __configure_logging__(self, module_names):
        """Configure the handlers and loggers for collecting testcase and module logs."""
        #
        # Configure logging for testSession
        #
        logging_config = dict(
            version=1,
            formatters={
                'short': {
                    'format': SHORT_FMT,
                },
                'long': {
                    'format': LONG_FMT,
                },
            },
            handlers={
                'console': {
                    'level': 'DEBUG',
                    'class': 'logging.NullHandler',
                    'formatter': 'short',
                },
                'module_logs': {
                    'level': 'DEBUG',
                    'class': 'report.collectingHandler',
                    'formatter': 'short',
                },
                'testcase_logs': {
                    'level': 'DEBUG',
                    'class': 'report.collectingTestcaseHandler',
                    'formatter': 'short',
                },
            },
            loggers=self.__module_loggers__(module_names),
        )
        dictConfig(logging_config)

    def __module_loggers__(self, module_names):
        """
        Create the logger section of the logging configuration.

        :param module_names: An iterable of logger names, which shall be collected as module logs.
        :returns: The logger configurations by logger name.
        :rtype: dict
        """
        rv = {}
        rv['__tLogger__'] = dict(handlers=['console', 'testcase_logs'], level='DEBUG', propagate=False)
        # list() allows any iterable (e.g. a tuple) to be given as module_names.
        for name in list(module_names) + ['__mLogger__']:
            rv[name] = dict(handlers=['console', 'module_logs'], level='DEBUG', propagate=False)
        return rv

    def testCase(self, name, testcase_execution_level, test_method, *args, **kwargs):
        """
        Execute a testcase and add its result to the session.

        The testcase is skipped if `testcase_execution_level` is greater than the execution level
        of the session. If `test_method` raises an exception, the exception is passed to the caller
        and the testcase is not closed.

        :param str name: The name of the testcase. It has to be unique within the session.
        :param int testcase_execution_level: The execution level of the testcase.
        :param test_method: The callable executing the test. It is called with the logger
                            ``'__tLogger__'`` as first argument followed by `args` and `kwargs`.
        :raises AttributeError: If the testcase log buffer is in an unexpected state or a
                                testcase with that name already exists.
        """
        if testcase_execution_level <= self[self.KEY_EXEC_LVL]:
            tLogger = logging.getLogger('__tLogger__')
            tHandler = collectingTestcaseHandler()
            if len(tHandler.MY_LOGS) > 0:
                raise AttributeError("Testcaselogger shall be empty after closing testcase!")
            # The testcase name is logged to get a log dictionary (time, source, ...),
            # which is used as the base of the testcase data.
            tLogger._log(logging.DEBUG, name, None)
            if len(tHandler.MY_LOGS) != 1:
                raise AttributeError("Testcaselogger shall have only one entry for the main testcase (temporary)!")
            self.__testcase__ = tHandler.get_logs()[0]
            test_method(tLogger, *args, **kwargs)
            self.__close_active_testcase__()

    def __close_active_testcase__(self):
        """
        Add the active testcase including its test steps to the session and update the counters.

        :raises AttributeError: If a testcase with the same name already exists.
        """
        testcase = self.__testcase__
        if testcase is not None:
            name = testcase.get('message')
            #
            # Add testcase
            #
            # All logs collected by the testcase handler since the start are the test steps.
            teststeps = collectingTestcaseHandler().get_logs()
            testcase['testcaseLogger'] = teststeps
            if name in self[self.KEY_TESTCASELIST]:
                raise AttributeError("Testcase named %s already exists" % name)
            self[self.KEY_TESTCASELIST][name] = testcase
            self[self.KEY_UID_LIST].append(name)
            #
            # Adapt testcase data
            #
            testcase['levelno'] = 0
            testcase['time_consumption'] = 0.
            for teststep in teststeps:
                # store maximum level to testcase
                if teststep.get('levelno') > testcase['levelno']:
                    testcase['levelno'] = teststep.get('levelno')
                    testcase['levelname'] = teststep.get('levelname')
                # store time_consumption for teststep
                # NOTE(review): the reference is the newest module log of the step - confirm
                #               that the oldest one is not intended.
                try:
                    teststep['time_consumption'] = teststep['created'] - teststep['moduleLogger'][-1]['created']
                except IndexError:
                    teststep['time_consumption'] = 0.
            #
            # Increment testcase counters
            #
            self[self.KEY_ALL_TESTS] += 1
            if testcase['levelno'] <= logging.INFO:
                self[self.KEY_SUCCESS_TESTS] += 1
            elif testcase['levelno'] >= logging.ERROR:
                self[self.KEY_FAILED_TESTS] += 1
            else:
                self[self.KEY_POSSIBLY_FAILED_TESTS] += 1
            # Set testcase time and time_consumption
            # The last test step marks the end of the testcase. Without any test step the
            # start log itself is used, which results in a time consumption of 0.
            last_entry = teststeps[-1] if teststeps else testcase
            testcase['time_start'] = testcase['asctime']
            testcase['time_finished'] = last_entry['asctime']
            testcase['time_consumption'] = last_entry['created'] - testcase['created']
            # Set testcase time consumption
            self['time_consumption'] += testcase['time_consumption']
        self.__testcase__ = None