#!/usr/bin/env python # -*- coding: utf-8 -*- # import fstools import report import reqif import json import os import sys import platform try: from platform import dist as dist except ImportError: from distro import linux_distribution as dist import getpass import subprocess import imp import xml.dom.minidom try: import jinja2 except ImportError: jinja2 = None import shutil HEADER = '\033[95m' OKBLUE = '\033[94m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' ARG_CLEAN = 'clean' ARG_RUN = 'run' ARG_FINALISE = 'finalise' ARG_PDF = 'pdf' ARG_STATUS = 'status' ARG_COPY = 'copy' ARG_RELEASE = 'release' UNITTEST_KEY_SYSTEM_INFO = 'system_information' UNITTEST_KEY_UNITTEST_INFO = 'unittest_information' UNITTEST_KEY_TESTOBJECT_INFO = 'testobject_information' UNITTEST_KEY_TESTRUNS = 'testrun_list' UNITTEST_KEY_COVERAGE_INFO = 'coverage_information' UNITTEST_KEY_SPECIFICATION = 'specification' FILES = { 'data-collection': 'unittest.json', 'tex-report': 'unittest.tex', 'coverage-xml': 'coverage.xml' } REPORT_FILES = [FILES['data-collection'], FILES['coverage-xml'], 'unittest.pdf'] class coverage_info(list): KEY_NAME = 'name' KEY_FILEPATH = 'filepath' KEY_LINE_COVERAGE = 'line_coverage' KEY_BRANCH_COVERAGE = 'branch_coverage' KEY_FILES = 'files' KEY_FRAGMENTS = 'fragments' KEY_START_LINE = 'start' KEY_END_LINE = 'end' KEY_COVERAGE_STATE = 'coverage_state' COVERED = 'covered' UNCOVERED = 'uncovered' CLEAN = 'clean' PARTIALLY_COVERED = 'partially-covered' def __init__(self, xml_filename, module_basepath): list.__init__(self) xmldoc = xml.dom.minidom.parse(xml_filename) itemlist = xmldoc.getElementsByTagName('package') for p in itemlist: module = {} module[self.KEY_NAME] = p.attributes['name'].value[len(module_basepath) + 1:] module[self.KEY_FILEPATH] = p.attributes['name'].value.replace('.', os.path.sep) module[self.KEY_LINE_COVERAGE] = float(p.attributes['line-rate'].value) * 100. try: module[self.KEY_BRANCH_COVERAGE] = float(p.attributes['branch-rate'].value) * 100. except AttributeError: module[self.KEY_BRANCH_COVERAGE] = None module[self.KEY_FILES] = [] for c in p.getElementsByTagName('class'): f = {} f[self.KEY_NAME] = c.attributes['filename'].value[len(module_basepath) + 1:].replace(os.path.sep, '.') f[self.KEY_FILEPATH] = c.attributes['filename'].value f[self.KEY_LINE_COVERAGE] = float(c.attributes['line-rate'].value) * 100. try: f[self.KEY_BRANCH_COVERAGE] = float(p.attributes['branch-rate'].value) * 100. except: f[self.KEY_BRANCH_COVERAGE] = None f[self.KEY_FRAGMENTS] = [] last_hit = None start_line = 1 end_line = 1 for line in c.getElementsByTagName('line'): line_no = int(line.attributes['number'].value) hit = bool(int(line.attributes['hits'].value)) if hit: cc = line.attributes.get('condition-coverage') if cc is not None and not cc.value.startswith('100%'): hit = self.PARTIALLY_COVERED else: hit = self.COVERED else: hit = self.UNCOVERED if line_no == 1: last_hit = hit elif last_hit != hit or line_no > end_line + 1: if last_hit is not None: line = {} line[self.KEY_START_LINE] = start_line line[self.KEY_END_LINE] = end_line line[self.KEY_COVERAGE_STATE] = last_hit f[self.KEY_FRAGMENTS].append(line) if line_no > end_line + 1: line = {} if last_hit is not None: line[self.KEY_START_LINE] = end_line + 1 else: line[self.KEY_START_LINE] = start_line line[self.KEY_END_LINE] = line_no - 1 line[self.KEY_COVERAGE_STATE] = self.CLEAN f[self.KEY_FRAGMENTS].append(line) start_line = line_no end_line = line_no last_hit = hit elif line_no == end_line + 1: end_line = line_no if last_hit is not None: line = {} line[self.KEY_START_LINE] = start_line line[self.KEY_END_LINE] = end_line line[self.KEY_COVERAGE_STATE] = last_hit f[self.KEY_FRAGMENTS].append(line) line = {} if last_hit is not None: line[self.KEY_START_LINE] = end_line + 1 else: line[self.KEY_START_LINE] = start_line line[self.KEY_END_LINE] = None line[self.KEY_COVERAGE_STATE] = self.CLEAN f[self.KEY_FRAGMENTS].append(line) module[self.KEY_FILES].append(f) self.append(module) def __str__(self): rv = '' for module in self: rv += '%s (%.1f%% - %s)\n' % (module.get(self.KEY_NAME), module.get(self.KEY_LINE_COVERAGE), module.get(self.KEY_FILEPATH)) for py_file in module.get(self.KEY_FILES): rv += ' %s (%.1f%% - %s)\n' % (py_file.get(self.KEY_NAME), py_file.get(self.KEY_LINE_COVERAGE), py_file.get(self.KEY_FILEPATH)) for fragment in py_file.get(self.KEY_FRAGMENTS): if fragment.get(self.KEY_END_LINE) is not None: rv += ' %d - %d: %s\n' % (fragment.get(self.KEY_START_LINE), fragment.get(self.KEY_END_LINE), repr(fragment.get(self.KEY_COVERAGE_STATE))) else: rv += ' %d - : %s\n' % (fragment.get(self.KEY_START_LINE), repr(fragment.get(self.KEY_COVERAGE_STATE))) return rv def unittest_filename(base_folder, filename): return os.path.join(base_folder, 'testresults', filename) def print_header(txt, color=BOLD + WARNING): print(color + txt + ENDC) def print_action(txt, color=BOLD): print(color + ' * ' + txt + ENDC) def print_info(txt, color=ENDC): print(' ' + color + txt + ENDC) def remove_file(filename): if os.path.exists(filename) and not filename.endswith('.gitkeep'): try: print_info('Removing %s' % filename) os.remove(filename) except OSError: pass def module_uid(path): return fstools.uid_filelist(path, '*.py', rekursive=True) def unittest(options, args, unittest_folder): if ARG_CLEAN in args: unittest_init(unittest_folder) elif ARG_RUN in args: unittest_run(unittest_folder, options) elif ARG_FINALISE in args: unittest_finalise(unittest_folder) elif ARG_PDF in args: unittest_pdf(unittest_folder) elif ARG_STATUS in args: unittest_status(unittest_folder) elif ARG_COPY in args: unittest_copy(unittest_folder) elif ARG_RELEASE in args: unittest_release(unittest_folder) def unittest_init(unittest_folder): config = imp.load_source('', os.path.join(unittest_folder, 'src', 'config.py')) # print_header("Initiating unittest for first testrun...") if not os.path.exists(unittest_filename(unittest_folder, '')): print_action('Creating outpout folder %s' % unittest_filename(unittest_folder, '')) fstools.mkdir(unittest_filename(unittest_folder, '')) # print_action('Cleaning up data from last testrun') for fn in os.listdir(unittest_filename(unittest_folder, '')): remove_file(unittest_filename(unittest_folder, fn)) remove_file(unittest_filename(unittest_folder, FILES['coverage-xml'])) # print_action('Creating unittest data-collection: %s' % unittest_filename(unittest_folder, FILES['data-collection'])) # system_info = {} system_info['Architecture'] = platform.architecture()[0] system_info['Machine'] = platform.machine() system_info['Hostname'] = platform.node() system_info['Distribution'] = ' '.join(dist()) system_info['System'] = platform.system() system_info['Kernel'] = platform.release() + ' (%s)' % platform.version() system_info['Username'] = getpass.getuser() system_info['Path'] = unittest_folder # unittest_info = {} unittest_info['Version'] = module_uid(os.path.join(unittest_folder, 'src', 'tests')) # testobject_info = {} testobject_info['Name'] = config.lib.__name__ testobject_info['Version'] = module_uid(config.lib.__path__[0]) testobject_info['Description'] = config.lib.__DESCRIPTION__ testobject_info['Supported Interpreters'] = ', '.join(['python%d' % vers for vers in config.lib.__INTERPRETER__]) testobject_info['State'] = 'Released' if config.release_unittest_version == module_uid(os.path.join(unittest_folder, 'src', 'tests')) else 'In development' testobject_info['Dependencies'] = [] for dependency in config.lib.__DEPENDENCIES__: testobject_info['Dependencies'].append((dependency, module_uid(os.path.join(unittest_folder, 'src', dependency)))) # spec_filename = os.path.join(unittest_folder, '..', 'requirements', 'specification.reqif') print_action("Adding Requirement Specification from %s" % spec_filename) try: spec = reqif.reqif_dict(spec_filename, 'Heading', 'Software Specification') except FileNotFoundError: print_info('FAILED', FAIL) spec = {} else: print_info('SUCCESS', OKGREEN) # data_collection = { UNITTEST_KEY_SYSTEM_INFO: system_info, UNITTEST_KEY_UNITTEST_INFO: unittest_info, UNITTEST_KEY_TESTOBJECT_INFO: testobject_info, UNITTEST_KEY_SPECIFICATION: spec, UNITTEST_KEY_TESTRUNS: [], } with open(unittest_filename(unittest_folder, FILES['data-collection']), 'w') as fh: fh.write(json.dumps(data_collection, indent=4, sort_keys=True)) def unittest_run(unittest_folder, options): tests = imp.load_source('', os.path.join(unittest_folder, 'src', 'tests', '__init__.py')) config = imp.load_source('', os.path.join(unittest_folder, 'src', 'config.py')) # interpreter_version = 'python ' + '.'.join(['%d' % n for n in sys.version_info[:3]]) + ' (%s)' % sys.version_info[3] # execution_level = report.TCEL_REVERSE_NAMED.get(options.execution_level, report.TCEL_FULL) # if sys.version_info.major in config.lib.__INTERPRETER__: print_header("Running \"%s\" Unittest with %s" % (options.execution_level, interpreter_version)) print_action('Loading Testrun data from %s' % unittest_filename(unittest_folder, FILES['data-collection'])) with open(unittest_filename(unittest_folder, FILES['data-collection']), 'r') as fh: data_collection = json.loads(fh.read()) print_action('Executing Testcases') heading_dict = {} for key in data_collection[UNITTEST_KEY_SPECIFICATION].get('item_dict', {}): heading_dict[key] = data_collection[UNITTEST_KEY_SPECIFICATION]['item_dict'][key]['Heading'] test_session = report.testSession( ['__unittest__', config.lib.logger_name] + config.additional_loggers_to_catch, interpreter=interpreter_version, testcase_execution_level=execution_level, testrun_id='p%d' % sys.version_info[0], heading_dict=heading_dict ) tests.testrun(test_session) # print_action('Adding Testrun data to %s' % unittest_filename(unittest_folder, FILES['data-collection'])) data_collection[UNITTEST_KEY_TESTRUNS].append(test_session) with open(unittest_filename(unittest_folder, FILES['data-collection']), 'w') as fh: fh.write(json.dumps(data_collection, indent=4, sort_keys=True)) else: print_header("Library does not support %s." % interpreter_version) def unittest_finalise(unittest_folder): config = imp.load_source('', os.path.join(unittest_folder, 'src', 'config.py')) # print_action('Adding Testrun data to %s' % unittest_filename(unittest_folder, FILES['data-collection'])) with open(unittest_filename(unittest_folder, FILES['data-collection']), 'r') as fh: data_collection = json.loads(fh.read()) # print_header("Adding Requirement information") # data_collection['lost_souls'] = {} # print_action("Adding Lost Requirement Soul") data_collection['lost_souls']['item_list'] = [] for req_id in data_collection['specification'].get('item_dict', {}): item = data_collection['specification']['item_dict'][req_id] if item['system_type_uid'] == '_MR7eNHYYEem_kd-7nxt1sg': testcase_available = False for testrun in data_collection['testrun_list']: if req_id in testrun['testcases']: testcase_available = True break if not testcase_available: data_collection['lost_souls']['item_list'].append(req_id) print_info('%s - "%s" has no corresponding testcase' % (item['system_uid'], item['Heading']), FAIL) # print_action("Adding Lost Testcase Soul") data_collection['lost_souls']['testcase_list'] = [] for testrun in data_collection['testrun_list']: for tc_id in testrun.get('testcases', {}): if tc_id not in data_collection['specification'].get('item_dict', {}) and tc_id not in data_collection['lost_souls']['testcase_list']: data_collection['lost_souls']['testcase_list'].append(tc_id) print_info('"%s" has no corresponding testcase' % tc_id, FAIL) # print_header("Adding Coverage information") print_action('Adding Coverage Information to %s' % unittest_filename(unittest_folder, FILES['data-collection'])) data_collection[UNITTEST_KEY_COVERAGE_INFO] = coverage_info(unittest_filename(unittest_folder, 'coverage.xml'), os.path.dirname(config.lib_path)) with open(unittest_filename(unittest_folder, FILES['data-collection']), 'w') as fh: fh.write(json.dumps(data_collection, indent=4, sort_keys=True)) def unittest_pdf(unittest_folder): print_header("Creating PDF-Report of Unittest") print_action('Loading Testrun data from %s' % unittest_filename(unittest_folder, FILES['data-collection'])) with open(unittest_filename(unittest_folder, FILES['data-collection']), 'r') as fh: data_collection = json.loads(fh.read()) if jinja2 is None: print_action('You need to install jinja2 to create a PDF-Report!', FAIL) else: fn = unittest_filename(unittest_folder, FILES['tex-report']) print_action('Creating LaTeX-File %s' % fn) with open(fn, 'w') as fh: # template_path = os.path.join(os.path.dirname(__file__), 'templates') template_filename = 'unittest.tex' jenv = jinja2.Environment(loader=jinja2.FileSystemLoader(template_path)) template = jenv.get_template(template_filename) fh.write(template.render(data=data_collection)) print_action('Creating PDF %s' % unittest_filename(unittest_folder, 'unittest.pdf')) for i in range(3): sys.stdout.write(' Starting run %d/3 of pdflatex... ' % (i + 1)) sys.stdout.flush() exit_value = os.system("pdflatex -interaction nonstopmode --output-directory %(path)s %(path)s/unittest.tex 1> /dev/null" % {'path': unittest_filename(unittest_folder, '')}) if exit_value != 0: print(FAIL + 'FAILED' + ENDC) break else: print(OKGREEN + 'SUCCESS' + ENDC) def unittest_status(unittest_folder): config = imp.load_source('', os.path.join(unittest_folder, 'src', 'config.py')) # print_header('Checking status of all submodules') print_action('Updating all submodules (fetch)') process = subprocess.Popen("LANGUAGE='en_US.UTF-8 git' git submodule foreach git fetch", cwd=os.path.dirname(unittest_folder), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stderroutput = process.communicate()[1] if stderroutput == b'': print_info('SUCCESS', color=OKGREEN) else: print_info('FAILED', color=FAIL) print_action('Checking status...') process = subprocess.Popen("LANGUAGE='en_US.UTF-8 git' git submodule foreach git status", cwd=os.path.dirname(unittest_folder), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdoutput, stderroutput = process.communicate() if stderroutput == b'': module = None data = {} for line in stdoutput.splitlines(): line = str(line) if 'Entering' in line: m = line[line.index("'") + 1:] m = str(m[:m.index("'")]) if m != module: data[m] = '' module = m else: data[m] += line for key in data: if "working tree clean" not in data[key] and "working directory clean" not in data[key]: data[key] = ("local changes", WARNING) elif "Your branch is behind" in data[key]: data[key] = ("no up to date (try git pull)", FAIL) elif "HEAD detached at" in data[key]: data[key] = ("no up to date (try git checkout master)", FAIL) elif "Your branch is ahead of" in data[key]: data[key] = ("push required", WARNING) elif "nothing to commit" in data[key]: data[key] = ("clean", OKGREEN) else: data[key] = ("unknown", FAIL) print_info('Submodule %s... %s' % (key, data[key][1] + data[key][0])) else: print_info('FAILED', color=FAIL) # print_header('Checking status of unittest and testresults in the library') print_action('Loading Testrun data from %s' % unittest_filename(unittest_folder, FILES['data-collection'])) with open(unittest_filename(unittest_folder, FILES['data-collection']), 'r') as fh: data_collection = json.loads(fh.read()) print_action('Checking release state of this testrun... ') if data_collection['testobject_information']['State'] != 'Released': print_info("FAILED", FAIL) else: print_info("SUCCESS", OKGREEN) # print_action('Checking up to dateness of testrults in library...') try: with open(os.path.join(unittest_folder, '..', 'pylibs', config.lib.__name__, '_testresults_', FILES['data-collection']), 'r') as fh: lib_result = json.loads(fh.read()) except FileNotFoundError: print_info("FAILED: Testresults not in library", FAIL) else: if data_collection['testobject_information'] != lib_result['testobject_information'] or data_collection['unittest_information'] != lib_result['unittest_information']: print_info("FAILED", FAIL) else: print_info("SUCCESS", OKGREEN) def unittest_copy(unittest_folder): config = imp.load_source('', os.path.join(unittest_folder, 'src', 'config.py')) # print_header('Copy unittest files to library') target_folder = os.path.join(config.lib_path, '_testresults_') print_action('Copying Unittest Files to %s' % target_folder) if not os.path.exists(target_folder): print_info('Creating folder %s' % target_folder) fstools.mkdir(target_folder) else: for fn in os.listdir(target_folder): remove_file(os.path.join(target_folder, fn)) for fn in REPORT_FILES: src = unittest_filename(unittest_folder, fn) dst = os.path.join(target_folder, fn) print_info('copying %s -> %s' % (src, dst)) shutil.copyfile(src, dst) def unittest_release(unittest_folder): unittest_uid = module_uid(os.path.join(unittest_folder, 'src', 'tests')) config_file = os.path.join(unittest_folder, 'src', 'config.py') print_header('Releasing unittest') with open(config_file, 'r') as fh: conf_file = fh.read() print_action('Setting release_unittest_version = %s in %s' % (unittest_uid, config_file)) with open(config_file, 'w') as fh: for line in conf_file.splitlines(): if line.startswith('release_unittest_version'): fh.write("release_unittest_version = '%s'\n" % unittest_uid) else: fh.write(line + '\n')