#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""Command implementations for preparing, running, finalising and publishing a unittest.

The entry point is :func:`unittest`, which dispatches on the command words
found in ``args``.  All intermediate results are stored in a JSON data
collection (``unittest.json``) inside the testresult folder of the unittest.
"""
import fstools
from unittest import jsonlog
from unittest import output
import report
import reqif
import json
import os
import sys
import platform
try:
    from platform import dist as dist
except ImportError:
    from distro import linux_distribution as dist
import getpass
import subprocess
# NOTE(review): ``imp`` was removed from the standard library in Python 3.12;
# the imp.load_source() calls below need a replacement before moving there.
import imp
import xml.dom.minidom
try:
    import jinja2
except ImportError:
    jinja2 = None
import shutil

ARG_CLEAN = 'clean'
ARG_RUN = 'run'
ARG_FINALISE = 'finalise'
ARG_TEX = 'tex'
ARG_STATUS = 'status'
ARG_COPY = 'copy'
ARG_RELEASE = 'release'

FN_DATA_COLLECTION = 'unittest.json'
FN_TEX_REPORT = 'unittest.tex'
FN_PDF_REPORT = 'unittest.pdf'
FN_COVERAGE = 'coverage.xml'

# Files copied into the library by unittest_publish().
REPORT_FILES = [FN_DATA_COLLECTION, FN_COVERAGE, FN_PDF_REPORT]


def testresults_filename(ut_folder, filename):
    """Return the path of `filename` inside the testresult folder of `ut_folder`."""
    return os.path.join(jsonlog.get_ut_testresult_folder(ut_folder), filename)


def remove_file(filename):
    """Remove `filename` if it exists; files ending with '.gitkeep' are never removed.

    Removal is best effort: an OSError raised while removing is ignored.
    """
    if os.path.exists(filename) and not filename.endswith('.gitkeep'):
        try:
            output.print_info('Removing %s' % filename)
            os.remove(filename)
        except OSError:
            # Deliberately ignored (best-effort clean up).
            pass


def _read_data_collection(ut_folder):
    """Load and return the JSON data collection of the unittest in `ut_folder`."""
    with open(testresults_filename(ut_folder, FN_DATA_COLLECTION), 'r') as fh:
        return json.loads(fh.read())


def _write_data_collection(ut_folder, data_collection):
    """Store `data_collection` as JSON (indented, sorted keys) for `ut_folder`."""
    with open(testresults_filename(ut_folder, FN_DATA_COLLECTION), 'w') as fh:
        fh.write(json.dumps(data_collection, indent=4, sort_keys=True))


class coverage_info(list):
    """List of per-package coverage dictionaries parsed from a coverage XML report.

    The report is expected to contain ``package`` elements holding ``class``
    elements holding ``line`` elements (presumably the Cobertura-style XML
    written by coverage.py -- confirm against the report generator).

    Each list entry describes one package and holds a list of file entries;
    each file entry holds a list of line-range fragments with a coverage state.
    """
    KEY_FRAGMENTS = 'fragments'
    KEY_START_LINE = 'start'
    KEY_END_LINE = 'end'
    KEY_COVERAGE_STATE = 'coverage_state'
    COVERED = 'covered'
    UNCOVERED = 'uncovered'
    CLEAN = 'clean'
    PARTIALLY_COVERED = 'partially-covered'

    def __init__(self, xml_filename, module_basepath):
        """Parse `xml_filename` and fill the list.

        :param xml_filename: Path of the coverage XML report.
        :param module_basepath: Leading path which is stripped (including the
                                following separator) from package and file names.
        """
        list.__init__(self)
        strip_len = len(module_basepath) + 1
        xmldoc = xml.dom.minidom.parse(xml_filename)
        for package_node in xmldoc.getElementsByTagName('package'):
            package_name = package_node.attributes['name'].value
            module = {}
            module[jsonlog.COVI_KEY_NAME] = package_name[strip_len:]
            module[jsonlog.COVI_KEY_FILEPATH] = package_name.replace('.', os.path.sep)
            module[jsonlog.COVI_KEY_LINE_COVERAGE] = float(package_node.attributes['line-rate'].value) * 100.
            module[jsonlog.COVI_KEY_BRANCH_COVERAGE] = self._optional_percentage(package_node, 'branch-rate')
            module[jsonlog.COVI_KEY_FILES] = []
            for class_node in package_node.getElementsByTagName('class'):
                filename = class_node.attributes['filename'].value
                f = {}
                f[jsonlog.COVI_KEY_NAME] = filename[strip_len:].replace(os.path.sep, '.')
                f[jsonlog.COVI_KEY_FILEPATH] = filename
                f[jsonlog.COVI_KEY_LINE_COVERAGE] = float(class_node.attributes['line-rate'].value) * 100.
                # The branch rate of a file is read from the file's own node
                # (the package node was used here before, see _optional_percentage).
                f[jsonlog.COVI_KEY_BRANCH_COVERAGE] = self._optional_percentage(class_node, 'branch-rate')
                f[self.KEY_FRAGMENTS] = self._file_fragments(class_node)
                module[jsonlog.COVI_KEY_FILES].append(f)
            self.append(module)

    @staticmethod
    def _optional_percentage(node, attribute_name):
        """Return the rate attribute of `node` as percentage or None if it is missing or not a number."""
        try:
            # minidom raises KeyError (not AttributeError) for a missing attribute.
            return float(node.attributes[attribute_name].value) * 100.
        except (KeyError, ValueError):
            return None

    def _line_state(self, line_node):
        """Return the coverage state of a single ``line`` element."""
        if not bool(int(line_node.attributes['hits'].value)):
            return self.UNCOVERED
        condition_coverage = line_node.attributes.get('condition-coverage')
        if condition_coverage is not None and not condition_coverage.value.startswith('100%'):
            return self.PARTIALLY_COVERED
        return self.COVERED

    def _fragment(self, start_line, end_line, coverage_state):
        """Return a fragment dictionary for the given line range and state."""
        return {
            self.KEY_START_LINE: start_line,
            self.KEY_END_LINE: end_line,
            self.KEY_COVERAGE_STATE: coverage_state,
        }

    def _file_fragments(self, class_node):
        """Return the list of fragments for the ``line`` elements of `class_node`.

        Consecutive reported lines with the same state are merged to one
        fragment.  Line ranges not reported at all are added as CLEAN
        fragments.  The last fragment is always CLEAN with an end line of
        None (open end).
        """
        fragments = []
        last_hit = None
        start_line = 1
        end_line = 1
        for line_node in class_node.getElementsByTagName('line'):
            line_no = int(line_node.attributes['number'].value)
            hit = self._line_state(line_node)
            if line_no == 1:
                last_hit = hit
            elif last_hit != hit or line_no > end_line + 1:
                # State change or gap: close the fragment collected so far.
                if last_hit is not None:
                    fragments.append(self._fragment(start_line, end_line, last_hit))
                if line_no > end_line + 1:
                    # Lines missing in the report are stored as CLEAN.
                    gap_start = end_line + 1 if last_hit is not None else start_line
                    fragments.append(self._fragment(gap_start, line_no - 1, self.CLEAN))
                start_line = line_no
                end_line = line_no
                last_hit = hit
            elif line_no == end_line + 1:
                end_line = line_no
        if last_hit is not None:
            fragments.append(self._fragment(start_line, end_line, last_hit))
        tail_start = end_line + 1 if last_hit is not None else start_line
        fragments.append(self._fragment(tail_start, None, self.CLEAN))
        return fragments

    def __str__(self):
        """Return a multi-line text listing packages, files and fragments."""
        parts = []
        for module in self:
            parts.append('%s (%.1f%% - %s)\n' % (
                module.get(jsonlog.COVI_KEY_NAME),
                module.get(jsonlog.COVI_KEY_LINE_COVERAGE),
                module.get(jsonlog.COVI_KEY_FILEPATH)
            ))
            for py_file in module.get(jsonlog.COVI_KEY_FILES):
                parts.append(' %s (%.1f%% - %s)\n' % (
                    py_file.get(jsonlog.COVI_KEY_NAME),
                    py_file.get(jsonlog.COVI_KEY_LINE_COVERAGE),
                    py_file.get(jsonlog.COVI_KEY_FILEPATH)
                ))
                for fragment in py_file.get(self.KEY_FRAGMENTS):
                    state = repr(fragment.get(self.KEY_COVERAGE_STATE))
                    if fragment.get(self.KEY_END_LINE) is not None:
                        parts.append(' %d - %d: %s\n' % (
                            fragment.get(self.KEY_START_LINE),
                            fragment.get(self.KEY_END_LINE),
                            state
                        ))
                    else:
                        # Open-ended fragment (end line is None).
                        parts.append(' %d - : %s\n' % (fragment.get(self.KEY_START_LINE), state))
        return ''.join(parts)


def unittest(options, args, unittest_folder):
    """Execute the first matching command found in `args` for `unittest_folder`.

    The commands are checked in the order given below; only one is executed.
    `options` is only passed on to the 'testrun' command.
    """
    if 'release_testcases' in args:
        unittest_release_testcases(unittest_folder)
    elif 'clean' in args:
        unittest_clean(unittest_folder)
    elif 'prepare' in args:
        unittest_prepare(unittest_folder)
    elif 'testrun' in args:
        unittest_testrun(unittest_folder, options)
    elif 'finalise' in args:
        unittest_finalise(unittest_folder)
    elif 'status' in args:
        unittest_status(unittest_folder)
    elif 'publish' in args:
        unittest_publish(unittest_folder)


def unittest_release_testcases(ut_folder):
    """Write the current testcase uid as release_unittest_version to the config file.

    Every line of the config file starting with 'release_unittest_version'
    is replaced; all other lines are written back unchanged.
    """
    unittest_uid = jsonlog.module_uid(jsonlog.get_ut_testcase_folder(ut_folder))
    output.print_header('Releasing unittest')
    config_file = jsonlog.get_ut_config(ut_folder)
    with open(config_file, 'r') as fh:
        conf_file = fh.read()
    output.print_action('Setting release_unittest_version = %s in %s' % (unittest_uid, config_file))
    with open(config_file, 'w') as fh:
        for line in conf_file.splitlines():
            if line.startswith('release_unittest_version'):
                fh.write("release_unittest_version = '%s'\n" % unittest_uid)
            else:
                fh.write(line + '\n')


def unittest_clean(ut_folder):
    """Remove all files (except '.gitkeep' files) from the testresult folder."""
    output.print_header('Cleaning up...')
    output.print_action('Testresults from last testrun')
    for fn in os.listdir(testresults_filename(ut_folder, '')):
        remove_file(testresults_filename(ut_folder, fn))


def unittest_prepare(ut_folder):
    """Create the testresult folder (if missing) and an initial data collection.

    The data collection holds system, unittest and testobject information,
    the requirement specification (empty if the reqif file is missing) and
    an empty list of testruns.
    """
    config = imp.load_source('', jsonlog.get_ut_config(ut_folder))
    #
    output.print_header("Initiating unittest for first testrun...")
    result_folder = testresults_filename(ut_folder, '')
    if not os.path.exists(result_folder):
        output.print_action('Creating outpout folder %s' % result_folder)
        fstools.mkdir(result_folder)
    #
    output.print_action('Creating unittest data-collection: %s' % testresults_filename(ut_folder, FN_DATA_COLLECTION))
    #
    system_info = {}
    system_info[jsonlog.SYSI_ARCHITECTURE] = platform.architecture()[0]
    system_info[jsonlog.SYSI_MACHINE] = platform.machine()
    system_info[jsonlog.SYSI_HOSTNAME] = platform.node()
    system_info[jsonlog.SYSI_DISTRIBUTION] = ' '.join(dist())
    system_info[jsonlog.SYSI_SYSTEM] = platform.system()
    system_info[jsonlog.SYSI_KERNEL] = platform.release() + ' (%s)' % platform.version()
    system_info[jsonlog.SYSI_USERNAME] = getpass.getuser()
    system_info[jsonlog.SYSI_PATH] = ut_folder
    #
    unittest_info = {}
    unittest_info[jsonlog.UTEI_VERSION] = jsonlog.module_uid(jsonlog.get_ut_testcase_folder(ut_folder))
    #
    testobject_info = {}
    testobject_info[jsonlog.TOBI_NAME] = config.lib.__name__
    testobject_info[jsonlog.TOBI_VERSION] = jsonlog.module_uid(config.lib.__path__[0])
    testobject_info[jsonlog.TOBI_DESCRIPTION] = config.lib.__DESCRIPTION__
    testobject_info[jsonlog.TOBI_SUPP_INTERP] = ', '.join(['python%d' % vers for vers in config.lib.__INTERPRETER__])
    # The testobject counts as released if the released unittest version of
    # the config matches the version of the current testcases.
    if config.release_unittest_version == unittest_info[jsonlog.UTEI_VERSION]:
        testobject_info[jsonlog.TOBI_STATE] = jsonlog.TOBI_STATE_RELESED
    else:
        testobject_info[jsonlog.TOBI_STATE] = jsonlog.TOBI_STATE_IN_DEVELOPMENT
    testobject_info[jsonlog.TOBI_DEPENDENCIES] = []
    for dependency in config.lib.__DEPENDENCIES__:
        dependency_path = os.path.join(jsonlog.get_ut_src_folder(ut_folder), dependency)
        testobject_info[jsonlog.TOBI_DEPENDENCIES].append((dependency, jsonlog.module_uid(dependency_path)))
    #
    spec_filename = os.path.join(ut_folder, 'requirements', 'specification.reqif')
    output.print_action("Adding Requirement Specification from %s" % spec_filename)
    try:
        spec = reqif.reqif_dict(spec_filename, 'Heading', 'Software Specification')
    except FileNotFoundError:
        output.print_info(output.STATUS_FAILED)
        spec = {}
    else:
        output.print_info(output.STATUS_SUCCESS)
    #
    data_collection = {
        jsonlog.MAIN_KEY_SYSTEM_INFO: system_info,
        jsonlog.MAIN_KEY_UNITTEST_INFO: unittest_info,
        jsonlog.MAIN_KEY_TESTOBJECT_INFO: testobject_info,
        jsonlog.MAIN_KEY_SPECIFICATION: spec,
        jsonlog.MAIN_KEY_TESTRUNS: [],
    }
    _write_data_collection(ut_folder, data_collection)


def unittest_testrun(ut_folder, options):
    """Execute the testcases with the running interpreter and store the testrun.

    The testrun is only executed if the major version of the running
    interpreter is listed in ``config.lib.__INTERPRETER__``.  The result is
    appended to the testruns of the data collection.

    :param options: Object providing the attribute ``execution_level``.
    """
    # Keep this load order: both modules are loaded with the same (empty) name.
    tests = imp.load_source('', os.path.join(jsonlog.get_ut_testcase_folder(ut_folder), '__init__.py'))
    config = imp.load_source('', jsonlog.get_ut_config(ut_folder))
    #
    interpreter_version = 'python ' + '.'.join(['%d' % n for n in sys.version_info[:3]]) + ' (%s)' % sys.version_info[3]
    #
    execution_level = report.TCEL_REVERSE_NAMED.get(options.execution_level, report.TCEL_FULL)
    #
    if sys.version_info.major not in config.lib.__INTERPRETER__:
        output.print_header("Library does not support %s." % interpreter_version)
        return
    output.print_header("Running \"%s\" Unittest with %s" % (options.execution_level, interpreter_version))
    data_collection = _read_data_collection(ut_folder)
    output.print_action('Executing Testcases')
    spec_items = data_collection[jsonlog.MAIN_KEY_SPECIFICATION].get(jsonlog.SPEC_ITEM_DICT, {})
    heading_dict = {}
    for key in spec_items:
        heading_dict[key] = spec_items[key]['Heading']
    test_session = report.testSession(
        ['__unittest__', 'root'],
        interpreter=interpreter_version,
        testcase_execution_level=execution_level,
        testrun_id='p%d' % sys.version_info[0],
        heading_dict=heading_dict
    )
    tests.testrun(test_session)
    #
    output.print_action('Adding Testrun data to %s' % testresults_filename(ut_folder, FN_DATA_COLLECTION))
    data_collection[jsonlog.MAIN_KEY_TESTRUNS].append(test_session)
    _write_data_collection(ut_folder, data_collection)


def unittest_finalise(ut_folder):
    """Add lost souls and coverage information to the data collection and create the LaTeX report.

    Lost requirement souls are specification items of the requirement type
    without a testcase in any testrun.  Lost testcase souls are testcases
    without a corresponding specification item.  The LaTeX report is only
    created if jinja2 is available.
    """
    config = imp.load_source('', jsonlog.get_ut_config(ut_folder))
    #
    output.print_header("Adding Requirement information")
    #
    data_collection = _read_data_collection(ut_folder)
    spec_items = data_collection[jsonlog.MAIN_KEY_SPECIFICATION].get(jsonlog.SPEC_ITEM_DICT, {})
    #
    lost_souls = {}
    data_collection[jsonlog.MAIN_KEY_LOST_SOULS] = lost_souls
    #
    output.print_action("Adding Lost Requirement Soul")
    lost_souls[jsonlog.LOST_ITEMLIST] = []
    for req_id in spec_items:
        item = spec_items[req_id]
        # NOTE(review): this uid presumably identifies the item type
        # "requirement" of the specification -- confirm against the reqif file.
        if item['system_type_uid'] == '_MR7eNHYYEem_kd-7nxt1sg':
            testcase_available = any(
                req_id in testrun[jsonlog.TRUN_TESTCASES]
                for testrun in data_collection[jsonlog.MAIN_KEY_TESTRUNS]
            )
            if not testcase_available:
                lost_souls[jsonlog.LOST_ITEMLIST].append(req_id)
                output.print_info(
                    '%s - "%s" has no corresponding testcase' % (item['system_uid'], item['Heading']),
                    output.termcolors.FAIL
                )
    #
    output.print_action("Adding Lost Testcase Soul")
    lost_souls[jsonlog.LOST_TESTCASELIST] = []
    for testrun in data_collection[jsonlog.MAIN_KEY_TESTRUNS]:
        for tc_id in testrun.get(jsonlog.TRUN_TESTCASES, {}):
            if tc_id not in spec_items and tc_id not in lost_souls[jsonlog.LOST_TESTCASELIST]:
                lost_souls[jsonlog.LOST_TESTCASELIST].append(tc_id)
                # A testcase without a specification item lacks a requirement.
                output.print_info('"%s" has no corresponding requirement' % tc_id, output.termcolors.FAIL)
    #
    output.print_header("Adding Coverage information")
    output.print_action('Adding Coverage Information to %s' % testresults_filename(ut_folder, FN_DATA_COLLECTION))
    data_collection[jsonlog.MAIN_KEY_COVERAGE_INFO] = coverage_info(
        testresults_filename(ut_folder, FN_COVERAGE),
        os.path.dirname(config.lib_path)
    )
    _write_data_collection(ut_folder, data_collection)
    #
    output.print_header("Creating LaTeX-Report of Unittest")
    # Reload the stored data, so that the template gets plain JSON types.
    data_collection = _read_data_collection(ut_folder)
    if jinja2 is None:
        output.print_action('You need to install jinja2 to create a LaTeX-Report!', output.termcolors.FAIL)
    else:
        fn = testresults_filename(ut_folder, FN_TEX_REPORT)
        output.print_action('Creating LaTeX-File %s' % fn)
        template_path = os.path.join(os.path.dirname(__file__), 'templates')
        template_filename = 'unittest.tex'
        jenv = jinja2.Environment(loader=jinja2.FileSystemLoader(template_path))
        template = jenv.get_template(template_filename)
        # Render before opening the target, so that a template error does not
        # leave an empty report file behind.
        tex_report = template.render(data=data_collection)
        with open(fn, 'w') as fh:
            fh.write(tex_report)


def unittest_publish(ut_folder):
    """Copy the files listed in REPORT_FILES to '_testresults_' inside the library folder.

    An existing target folder is cleaned up (see remove_file) before copying.
    """
    config = imp.load_source('', jsonlog.get_ut_config(ut_folder))
    #
    output.print_header('Copy unittest files to library')
    target_folder = os.path.join(config.lib_path, '_testresults_')
    output.print_action('Copying Unittest Files to %s' % target_folder)
    if not os.path.exists(target_folder):
        output.print_info('Creating folder %s' % target_folder)
        fstools.mkdir(target_folder)
    else:
        for fn in os.listdir(target_folder):
            remove_file(os.path.join(target_folder, fn))
    for fn in REPORT_FILES:
        src = testresults_filename(ut_folder, fn)
        dst = os.path.join(target_folder, fn)
        output.print_info('copying %s -> %s' % (src, dst))
        shutil.copyfile(src, dst)


def _git_submodule_foreach(ut_folder, git_command):
    """Run ``git <git_command>`` in every submodule of `ut_folder`.

    :returns: Tuple (stdout, stderr) of the process as bytes.
    """
    # LANGUAGE requests english messages; the status evaluation below depends
    # on the english phrases of git.  (The closing quote was misplaced before,
    # which made " git" part of the value.)
    process = subprocess.Popen(
        "LANGUAGE='en_US.UTF-8' git submodule foreach git %s" % git_command,
        cwd=ut_folder,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    return process.communicate()


def _submodule_state(status_text):
    """Return the tuple (state text, terminal color) for the ``git status`` output of one submodule."""
    if "working tree clean" not in status_text and "working directory clean" not in status_text:
        return ("LOCAL CHANGES", output.termcolors.WARNING)
    elif "Your branch is behind" in status_text:
        return ("OUTDATED (try git pull)", output.termcolors.WARNING)
    elif "HEAD detached at" in status_text:
        return ("OUTDATED (try git checkout master)", output.termcolors.WARNING)
    elif "Your branch is ahead of" in status_text:
        return ("CHANGED (try git push)", output.termcolors.WARNING)
    elif "nothing to commit" in status_text:
        return ("CLEAN", output.termcolors.OKGREEN)
    return ("UNKNOWN", output.termcolors.FAIL)


def unittest_status(ut_folder):
    """Print the status of the git repository, its submodules and the unittest.

    The git steps are reported as failed as soon as git writes anything to
    stderr.
    """
    #
    # GIT STATUS
    #
    output.print_header('Checking GIT repository status')
    # GIT FETCH
    output.print_action('Fetching repository from server...')
    stderroutput = _git_submodule_foreach(ut_folder, 'fetch')[1]
    if stderroutput == b'':
        output.print_info(output.STATUS_SUCCESS)
    else:
        output.print_info(output.STATUS_FAILED)
    # GIT_REPO
    output.print_action('Analysing repository status...')
    output.print_info(jsonlog.status_git(ut_folder))
    # SUBMODULES
    output.print_action('Analysing submodule status...')
    stdoutput, stderroutput = _git_submodule_foreach(ut_folder, 'status')
    if stderroutput == b'':
        module = None
        data = {}
        for raw_line in stdoutput.splitlines():
            line = raw_line.decode('utf-8', 'replace')
            if 'Entering' in line:
                # The submodule name is the text between the first two quotes.
                name = line[line.index("'") + 1:]
                name = name[:name.index("'")]
                if name != module:
                    data[name] = ''
                    module = name
            elif module is not None:
                # Output in front of the first submodule is ignored.
                data[module] += line + '\n'
        for key in data:
            data[key] = _submodule_state(data[key])
            output.print_info('Submodule %s... %s' % (key, data[key][1] + data[key][0]))
    else:
        output.print_info(output.STATUS_FAILED)
    #
    # TESTRUN STATUS
    #
    output.print_header('Checking status of unittest in the library')
    for txt, fcn in (
        ('Checking release state... ', jsonlog.get_lib_release_state),
        ('Checking testcase integrity... ', jsonlog.get_lib_testcase_integrity),
        ('Checking source integrity... ', jsonlog.get_lib_src_integrity)
    ):
        output.print_action(txt)
        output.print_info(fcn(ut_folder))
    output.print_action('Checking code coverage... ')
    output.print_coverage(*jsonlog.lib_coverage(ut_folder))
    #
    output.print_header('Checking status of unittest for this testrun')
    for txt, fcn in (
        ('Checking release state... ', jsonlog.get_ut_release_state),
        ('Checking testcase integrity... ', jsonlog.get_ut_testcase_integrity),
        ('Checking source integrity... ', jsonlog.get_ut_src_integrity)
    ):
        output.print_action(txt)
        output.print_info(fcn(ut_folder))
    output.print_action('Checking code coverage... ')
    output.print_coverage(*jsonlog.ut_coverage(ut_folder))