2023-02-09 08:03:43 +01:00
|
|
|
import distro
|
|
|
|
import getpass
|
|
|
|
import inspect
|
|
|
|
import jinja2
|
|
|
|
import json
|
|
|
|
import os
|
|
|
|
import platform
|
|
|
|
import report
|
|
|
|
from tests import test_collection
|
|
|
|
from tests.heating import testcase_heating
|
|
|
|
from tests.light import testcase_light
|
|
|
|
from tests.synchronisation import testcase_synchronisation
|
|
|
|
from unittest import jsonlog
|
|
|
|
|
|
|
|
try:
|
|
|
|
from config import APP_NAME as ROOT_LOGGER_NAME
|
|
|
|
except ImportError:
|
|
|
|
ROOT_LOGGER_NAME = 'root'
|
|
|
|
|
|
|
|
|
|
|
|
class test_smarthome(object):
|
2023-02-09 16:01:04 +01:00
|
|
|
def __init__(self, rooms, mqtt_client):
|
|
|
|
self.mqtt_client = mqtt_client
|
2023-02-09 08:03:43 +01:00
|
|
|
self.__init__tcl__()
|
2023-02-09 16:01:04 +01:00
|
|
|
self.mqtt_client.add_callback('__info__', self.__test_system_info__)
|
|
|
|
self.mqtt_client.send('__info__', json.dumps(None))
|
2023-02-09 08:03:43 +01:00
|
|
|
# add testcases for switching devices
|
|
|
|
for name in rooms.getmembers():
|
|
|
|
obj = rooms.getobjbyname(name)
|
|
|
|
if obj.__class__.__name__ == "videv_light":
|
|
|
|
common_name = '.'.join(name.split('.')[:-1]) + '.' + name.split('.')[-1][6:]
|
|
|
|
try:
|
|
|
|
li_device = rooms.getobjbyname(common_name + '_zigbee') if obj.enable_brightness or obj.enable_color_temp else None
|
|
|
|
except AttributeError:
|
|
|
|
li_device = rooms.getobjbyname(common_name + '_zigbee_1') if obj.enable_brightness or obj.enable_color_temp else None
|
|
|
|
try:
|
|
|
|
sw_device = rooms.getobjbyname(common_name) if obj.enable_state else None
|
|
|
|
except AttributeError:
|
|
|
|
# must be a device without switching device
|
|
|
|
sw_device = li_device
|
|
|
|
setattr(self, common_name.replace('.', '_'), testcase_light(self.tcl, obj, sw_device, li_device))
|
|
|
|
# add testcases for heating devices
|
|
|
|
for name in rooms.getmembers():
|
|
|
|
obj = rooms.getobjbyname(name)
|
|
|
|
if obj.__class__.__name__ == "videv_heating":
|
|
|
|
common_name = '.'.join(name.split('.')[:-1]) + '.' + name.split('.')[-1][6:]
|
|
|
|
heat_device = rooms.getobjbyname(common_name + '_valve')
|
|
|
|
setattr(self, common_name.replace('.', '_'), testcase_heating(self.tcl, obj, heat_device))
|
|
|
|
# synchronisation
|
|
|
|
# gfw.dirk.amplifier with cd_player
|
|
|
|
self.gfw_dirk_cd_player_amplifier_sync = testcase_synchronisation(
|
|
|
|
self.tcl,
|
|
|
|
rooms.gfw.dirk.cd_player, None, None,
|
|
|
|
rooms.gfw.dirk.amplifier)
|
|
|
|
# gfw.floor.main_light_2 with gfw.floor.main_light_1
|
|
|
|
self.gfw_floor_main_light_sync = testcase_synchronisation(
|
|
|
|
self.tcl,
|
|
|
|
rooms.gfw.floor.main_light, rooms.gfw.floor.videv_main_light, rooms.gfw.floor.videv_main_light,
|
|
|
|
rooms.gfw.floor.main_light_zigbee_1, rooms.gfw.floor.main_light_zigbee_2
|
|
|
|
)
|
|
|
|
# ffe.diningroom.floorlamp with ffe.dinigroom.main_light
|
|
|
|
self.ffe_diningroom_main_light_floor_lamp_sync = testcase_synchronisation(
|
|
|
|
self.tcl,
|
|
|
|
rooms.ffe.diningroom.main_light, None, None,
|
|
|
|
rooms.ffe.diningroom.floor_lamp)
|
|
|
|
# ffe.livingroom.floorlamp_[1-6] with ffe.livingroom.main_light
|
|
|
|
self.ffe_livingroom_main_light_floor_lamp_sync = testcase_synchronisation(
|
|
|
|
self.tcl,
|
|
|
|
rooms.ffe.livingroom.main_light, rooms.ffe.livingroom.videv_floor_lamp, rooms.ffe.livingroom.videv_floor_lamp,
|
|
|
|
*[getattr(rooms.ffe.livingroom, "floor_lamp_zigbee_%d" % i) for i in range(1, 7)]
|
|
|
|
)
|
|
|
|
# add test collection
|
|
|
|
self.all = test_collection(self)
|
|
|
|
|
|
|
|
def __init__tcl__(self):
|
|
|
|
system_info = {}
|
|
|
|
system_info[jsonlog.SYSI_ARCHITECTURE] = platform.architecture()[0]
|
|
|
|
system_info[jsonlog.SYSI_MACHINE] = platform.machine()
|
|
|
|
system_info[jsonlog.SYSI_HOSTNAME] = platform.node()
|
|
|
|
system_info[jsonlog.SYSI_DISTRIBUTION] = distro.name() + " " + distro.version() + " (" + distro.codename() + ")"
|
|
|
|
system_info[jsonlog.SYSI_SYSTEM] = platform.system()
|
|
|
|
system_info[jsonlog.SYSI_KERNEL] = platform.release() + ' (%s)' % platform.version()
|
|
|
|
system_info[jsonlog.SYSI_USERNAME] = getpass.getuser()
|
2023-02-09 10:03:34 +01:00
|
|
|
system_info[jsonlog.SYSI_PATH] = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
2023-02-09 08:03:43 +01:00
|
|
|
|
|
|
|
self.tcl = report.testSession(['__unittest__', 'unittest', ROOT_LOGGER_NAME])
|
|
|
|
self.tcl[jsonlog.MAIN_KEY_SYSTEM_INFO] = system_info
|
2023-02-09 10:03:34 +01:00
|
|
|
self.tcl["testcase_names"] = report.TCEL_NAMES
|
2023-02-09 08:03:43 +01:00
|
|
|
|
2023-02-09 16:01:04 +01:00
|
|
|
def __test_system_info__(self, client, userdata, message):
|
|
|
|
data = json.loads(message.payload)
|
|
|
|
if data is not None:
|
|
|
|
self.tcl[jsonlog.MAIN_KEY_TESTOBJECT_INFO] = data
|
|
|
|
|
|
|
|
def close(self):
|
2023-02-09 08:03:43 +01:00
|
|
|
path = os.path.abspath(os.path.join(os.path.basename(__file__), '..'))
|
|
|
|
|
|
|
|
with open(os.path.join(path, "testresults", "testrun.json"), "w") as fh:
|
|
|
|
fh.write(json.dumps(self.tcl, indent=4))
|
|
|
|
|
|
|
|
template_path = os.path.join(path, 'templates')
|
|
|
|
template_filename = 'unittest.tex'
|
|
|
|
jenv = jinja2.Environment(loader=jinja2.FileSystemLoader(template_path))
|
|
|
|
template = jenv.get_template(template_filename)
|
|
|
|
with open(os.path.join(path, "testresults", "testrun.tex"), "w") as fh:
|
|
|
|
fh.write(template.render(data=self.tcl, details=False))
|
|
|
|
with open(os.path.join(path, "testresults", "testrun_full.tex"), "w") as fh:
|
|
|
|
fh.write(template.render(data=self.tcl, details=True))
|
|
|
|
|
|
|
|
def getmembers(self, prefix=''):
|
|
|
|
rv = []
|
|
|
|
for name, obj in inspect.getmembers(self):
|
|
|
|
if prefix:
|
|
|
|
full_name = prefix + '.' + name
|
|
|
|
else:
|
|
|
|
full_name = name
|
|
|
|
if not name.startswith('_'):
|
|
|
|
try:
|
|
|
|
if obj.__class__.__bases__[0].__name__ == "testcase" or obj.__class__.__name__ == "test_collection":
|
|
|
|
rv.append(full_name)
|
|
|
|
else:
|
|
|
|
rv.extend(obj.getmembers(full_name))
|
|
|
|
except AttributeError:
|
|
|
|
pass
|
|
|
|
return rv
|
|
|
|
|
|
|
|
def getobjbyname(self, name):
|
|
|
|
if name.startswith("test."):
|
|
|
|
name = name[5:]
|
|
|
|
obj = self
|
|
|
|
for subname in name.split('.'):
|
|
|
|
obj = getattr(obj, subname)
|
|
|
|
return obj
|
|
|
|
|
|
|
|
def command(self, full_command):
|
|
|
|
try:
|
|
|
|
parameter = " " + full_command.split(' ')[1]
|
|
|
|
except IndexError:
|
|
|
|
parameter = ""
|
|
|
|
command = full_command.split(' ')[0].split('.')[-1] + parameter
|
|
|
|
device_name = '.'.join(full_command.split(' ')[0].split('.')[:-1])
|
|
|
|
self.getobjbyname(device_name).command(command)
|