From 769dffe20d1fb4b44cc35b5c2b16faa35c4ecf25 Mon Sep 17 00:00:00 2001 From: Dirk Alders Date: Sun, 15 Oct 2023 08:10:32 +0200 Subject: [PATCH] initial testenvironment added --- .vscode/settings.json | 22 +- __test__/devices/module.py | 35 +++ __test__/devices/test_my_powerplug.py | 45 ++++ __test__/devices/test_shelly.py | 250 ++++++++++++++++++ .../devices/test_silvercrest_motion_sensor.py | 55 ++++ .../devices/test_silvercrest_powerplug.py | 49 ++++ .../function/modules/test_heating_function.py | 40 +++ __test__/requirements.txt | 2 + conftest.py | 0 9 files changed, 489 insertions(+), 9 deletions(-) create mode 100644 __test__/devices/module.py create mode 100644 __test__/devices/test_my_powerplug.py create mode 100644 __test__/devices/test_shelly.py create mode 100644 __test__/devices/test_silvercrest_motion_sensor.py create mode 100644 __test__/devices/test_silvercrest_powerplug.py create mode 100644 __test__/function/modules/test_heating_function.py create mode 100644 __test__/requirements.txt create mode 100644 conftest.py diff --git a/.vscode/settings.json b/.vscode/settings.json index aece2c1..211360c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,11 +1,15 @@ { - "python.defaultInterpreterPath": "./venv/bin/python", - "editor.formatOnSave": true, - "autopep8.args": [ - "--max-line-length=150" - ], - "editor.fontSize": 14, - "emmet.includeLanguages": { - "django-html": "html" - } + "python.defaultInterpreterPath": "./venv/bin/python", + "autopep8.args": ["--max-line-length=150"], + "python.formatting.provider": "none", + "[python]": { + "editor.defaultFormatter": "ms-python.python", + "editor.formatOnSave": true + }, + "editor.formatOnSave": true, + "editor.fontSize": 14, + "emmet.includeLanguages": { "django-html": "html" }, + "python.testing.pytestArgs": ["-v", "--cov", "--cov-report=xml", "__test__"], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true } diff --git a/__test__/devices/module.py b/__test__/devices/module.py new file mode 100644 index 0000000..6263c26 --- /dev/null +++ b/__test__/devices/module.py @@ -0,0 +1,35 @@ +import json +from mqtt import mqtt_client +import time + +TEST_CLIENT_ID = "__test_device_tester__" + + +mqtt_test_client = mqtt_client(TEST_CLIENT_ID, "localhost") + + +def init_state(all_state_keys, device): + for state_topic in all_state_keys: + assert device.get(state_topic, 0) == None + + +def state_change_by_mqtt(all_state_keys, num_states, mqtt_test_client, base_topic, device, mqtt_data, state_data, warning_condition, mqtt_signal_time): + tm_warning = None + + for i in range(num_states): + for state_topic in all_state_keys: + if device.TX_TYPE == device.TX_VALUE: + data = json.dumps(mqtt_data(state_topic)[i]) + mqtt_test_client.send(base_topic + '/' + state_topic, data) + elif device.TX_TYPE == device.TX_DICT: + mqtt_test_client.send(base_topic, json.dumps({state_topic: mqtt_data(state_topic)[i]})) + else: + raise TypeError("Unknown TX_TYPE for device.") + if callable(warning_condition): + if warning_condition(state_topic, mqtt_data(state_topic)[i]): + tm_warning = int(time.time()) + time.sleep(mqtt_signal_time) + for state_topic in all_state_keys: + assert device.get(state_topic) == state_data(state_topic)[i] + + return tm_warning diff --git a/__test__/devices/test_my_powerplug.py b/__test__/devices/test_my_powerplug.py new file mode 100644 index 0000000..e08dd94 --- /dev/null +++ b/__test__/devices/test_my_powerplug.py @@ -0,0 +1,45 @@ +from module import mqtt_test_client, init_state, state_change_by_mqtt +from devices import my_powerplug as test_device +from devices import warning +from mqtt import mqtt_client +import pytest +import time + +DUT_CLIENT_ID = "__%s__" % __name__ +TOPIC = "__test__/%s" % __name__ +# +MQTT_SIGNAL_TIME = 0.2 + + +ALL_STATE_KEYS = ["output/1", "output/2", "output/3", "output/4", ] +BOOL_KEYS = ALL_STATE_KEYS + + +@pytest.fixture +def this_device(): + mc = mqtt_client(DUT_CLIENT_ID, 'localhost') + return test_device(mc, TOPIC) + + +def test_initial_states(this_device: test_device): + # test all initial values + init_state(ALL_STATE_KEYS, this_device) + + +def test_state_change_by_mqtt(this_device: test_device): + def state_data(key): + return (True, False) + + def mqtt_data(key): + return state_data(key) + + # test state changes + tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device, + mqtt_data, state_data, None, MQTT_SIGNAL_TIME) + + +def test_specific_get_functions(this_device: test_device): + assert this_device.output_0 == this_device.get(this_device.KEY_OUTPUT_0) + assert this_device.output_1 == this_device.get(this_device.KEY_OUTPUT_1) + assert this_device.output_2 == this_device.get(this_device.KEY_OUTPUT_2) + assert this_device.output_3 == this_device.get(this_device.KEY_OUTPUT_3) diff --git a/__test__/devices/test_shelly.py b/__test__/devices/test_shelly.py new file mode 100644 index 0000000..e345ed3 --- /dev/null +++ b/__test__/devices/test_shelly.py @@ -0,0 +1,250 @@ +from module import mqtt_test_client, init_state, state_change_by_mqtt +from devices import shelly as test_device +from devices import warning +from mqtt import mqtt_client +import pytest +import time + +DUT_CLIENT_ID = "__%s__" % __name__ +TOPIC = "__test__/%s" % __name__ +# +MQTT_SIGNAL_TIME = 0.2 + + +ALL_STATE_KEYS = ["relay/0", "relay/1", "input/0", "input/1", "longpush/0", "longpush/1", "temperature", "overtemperature"] +BOOL_KEYS = ["relay/0", "relay/1", "input/0", "input/1", "longpush/0", "longpush/1", "overtemperature"] + + +@pytest.fixture +def this_device(): + mc = mqtt_client(DUT_CLIENT_ID, 'localhost') + return test_device(mc, TOPIC) + + +def test_initial_states(this_device: test_device): + # test all initial values + init_state(ALL_STATE_KEYS, this_device) + + +def test_state_change_by_mqtt(this_device: test_device): + def state_data(key): + if key in BOOL_KEYS: + return (True, False) + elif key == "temperature": + return (85.3, 20.1) + else: + raise IndexError("No return value defined for key %s" % key) + + def mqtt_data(key): + if key in ["relay/0", "relay/1"]: + return ('on', 'off') + elif key in ["input/0", "input/1", "longpush/0", "longpush/1", "overtemperature"]: + return (1, 0) + else: + return state_data(key) + + def warning_condition(state_topic, value): + return state_topic == "overtemperature" and value == 1 + + # test state changes + tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device, + mqtt_data, state_data, warning_condition, MQTT_SIGNAL_TIME) + + # test warning + w: warning = this_device.get(this_device.KEY_WARNING) + assert w.get(w.KEY_ID) == TOPIC + assert w.get(w.KEY_TYPE) == w.TYPE_OVERTEMPERATURE + wt = time.mktime(w.get(w.KEY_TM)) + wt_min = tm_warning + wt_max = tm_warning + 2 + assert wt >= wt_min and wt <= wt_max + + +def test_specific_get_functions(this_device: test_device): + assert this_device.output_0 == this_device.get(this_device.KEY_OUTPUT_0) + assert this_device.output_1 == this_device.get(this_device.KEY_OUTPUT_1) + assert this_device.input_0 == this_device.get(this_device.KEY_INPUT_0) + assert this_device.input_1 == this_device.get(this_device.KEY_INPUT_1) + assert this_device.longpush_0 == this_device.get(this_device.KEY_LONGPUSH_0) + assert this_device.longpush_1 == this_device.get(this_device.KEY_LONGPUSH_1) + assert this_device.temperature == this_device.get(this_device.KEY_TEMPERATURE) + + +def test_send_command(this_device: test_device): + this_device.set_output_0(True) + this_device.set_output_0(False) + + +''' +class shelly(base): + """ Communication (MQTT) + + shelly + +- relay + | +- 0 ["on" / "off"] <- status + | | +- command ["on"/ "off"] <- command + | | +- energy [numeric] <- status + | +- 1 ["on" / "off"] <- status + | +- command ["on"/ "off"] <- command + | +- energy [numeric] <- status + +- input + | +- 0 [0 / 1] <- status + | +- 1 [0 / 1] <- status + +- input_event + | +- 0 <- status + | +- 1 <- status + +- logpush + | +- 0 [0 / 1] <- status + | +- 1 [0 / 1] <- status + +- temperature [numeric] °C <- status + +- temperature_f [numeric] F <- status + +- overtemperature [0 / 1] <- status + +- id <- status + +- model <- status + +- mac <- status + +- ip <- status + +- new_fw <- status + +- fw_ver <- status + """ + KEY_OUTPUT_0 = "relay/0" + KEY_OUTPUT_1 = "relay/1" + KEY_INPUT_0 = "input/0" + KEY_INPUT_1 = "input/1" + KEY_LONGPUSH_0 = "longpush/0" + KEY_LONGPUSH_1 = "longpush/1" + KEY_TEMPERATURE = "temperature" + KEY_OVERTEMPERATURE = "overtemperature" + KEY_ID = "id" + KEY_MODEL = "model" + KEY_MAC = "mac" + KEY_IP = "ip" + KEY_NEW_FIRMWARE = "new_fw" + KEY_FIRMWARE_VERSION = "fw_ver" + # + TX_TOPIC = "command" + TX_TYPE = base.TX_VALUE + TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1] + # + RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE, + KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION] + RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1'] + RX_IGNORE_KEYS = ['temperature_f'] + RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + # + self.output_key_delayed = None + self.delayed_flash_task = task.delayed(0.3, self.flash_task) + self.delayed_off_task = task.delayed(0.3, self.off_task) + # + self.add_callback(self.KEY_OVERTEMPERATURE, True, self.__warning__, True) + # + self.all_off_requested = False + + def flash_task(self, *args): + if self.flash_active: + self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed)) + self.output_key_delayed = None + if self.all_off_requested: + self.delayed_off_task.run() + + def off_task(self, *args): + self.all_off() + + @property + def flash_active(self): + return self.output_key_delayed is not None + + # + # WARNING CALL + # + def __warning__(self, client, key, data): + w = warning(self.topic, warning.TYPE_OVERTEMPERATURE, "Temperature to high (%.1f°C)", self.get(self.KEY_TEMPERATURE) or math.nan) + self.logger.warning(w) + self.set(self.KEY_WARNING, w) + + # + # RX + # + @property + def output_0(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_0) + + @property + def output_1(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_1) + + @property + def input_0(self): + """rv: [True, False]""" + return self.get(self.KEY_INPUT_0) + + @property + def input_1(self): + """rv: [True, False]""" + return self.get(self.KEY_INPUT_1) + + @property + def longpush_0(self): + """rv: [True, False]""" + return self.get(self.KEY_LONGPUSH_0) + + @property + def longpush_1(self): + """rv: [True, False]""" + return self.get(self.KEY_LONGPUSH_1) + + @property + def temperature(self): + """rv: numeric value""" + return self.get(self.KEY_TEMPERATURE) + + # + # TX + # + def set_output_0(self, state): + """state: [True, False]""" + self.send_command(self.KEY_OUTPUT_0, state) + + def set_output_0_mcb(self, device, key, data): + self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data)) + self.set_output_0(data) + + def toggle_output_0_mcb(self, device, key, data): + self.logger.info("Toggeling output 0") + self.set_output_0(not self.output_0) + + def set_output_1(self, state): + """state: [True, False]""" + self.send_command(self.KEY_OUTPUT_1, state) + + def set_output_1_mcb(self, device, key, data): + self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data)) + self.set_output_1(data) + + def toggle_output_1_mcb(self, device, key, data): + self.logger.info("Toggeling output 1") + self.set_output_1(not self.output_1) + + def flash_0_mcb(self, device, key, data): + self.output_key_delayed = self.KEY_OUTPUT_0 + self.toggle_output_0_mcb(device, key, data) + self.delayed_flash_task.run() + + def flash_1_mcb(self, device, key, data): + self.output_key_delayed = self.KEY_OUTPUT_1 + self.toggle_output_1_mcb(device, key, data) + self.delayed_flash_task.run() + + def all_off(self): + if self.flash_active: + self.all_off_requested = True + else: + if self.output_0: + self.set_output_0(False) + if self.output_1: + self.set_output_1(False) +''' diff --git a/__test__/devices/test_silvercrest_motion_sensor.py b/__test__/devices/test_silvercrest_motion_sensor.py new file mode 100644 index 0000000..4251d37 --- /dev/null +++ b/__test__/devices/test_silvercrest_motion_sensor.py @@ -0,0 +1,55 @@ +from module import mqtt_test_client, init_state, state_change_by_mqtt +from devices import silvercrest_motion_sensor as test_device +from devices import warning +from mqtt import mqtt_client +import pytest +import time + +DUT_CLIENT_ID = "__%s__" % __name__ +TOPIC = "__test__/%s" % __name__ +# +MQTT_SIGNAL_TIME = 0.2 + + +ALL_STATE_KEYS = ["battery", "battery_low", "linkquality", "occupancy", "tamper", "voltage"] +BOOL_KEYS = ["battery_low", "occupancy", "tamper"] + + +@pytest.fixture +def this_device(): + mc = mqtt_client(DUT_CLIENT_ID, 'localhost') + return test_device(mc, TOPIC) + + +def test_initial_states(this_device: test_device): + # test all initial values + init_state(ALL_STATE_KEYS, this_device) + + +def test_state_change_by_mqtt(this_device: test_device): + def state_data(key): + if key in BOOL_KEYS: + return (True, False) + elif key == "battery": + return (2, 87) + elif key == "linkquality": + return (1, 217) + elif key == "voltage": + return (1.17, 2.53) + else: + raise IndexError("No return value defined for key %s" % key) + + def mqtt_data(key): + return state_data(key) + + def warning_condition(state_topic, value): + return state_topic == "battery_low" and value is True + + # test state changes + tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device, + mqtt_data, state_data, None, MQTT_SIGNAL_TIME) + + +def test_specific_get_functions(this_device: test_device): + assert this_device.linkquality == this_device.get(this_device.KEY_LINKQUALITY) + assert this_device.battery == this_device.get(this_device.KEY_BATTERY) diff --git a/__test__/devices/test_silvercrest_powerplug.py b/__test__/devices/test_silvercrest_powerplug.py new file mode 100644 index 0000000..5925ea9 --- /dev/null +++ b/__test__/devices/test_silvercrest_powerplug.py @@ -0,0 +1,49 @@ +from module import mqtt_test_client, init_state, state_change_by_mqtt +from devices import silvercrest_powerplug as test_device +from devices import warning +from mqtt import mqtt_client +import pytest +import time + +DUT_CLIENT_ID = "__%s__" % __name__ +TOPIC = "__test__/%s" % __name__ +# +MQTT_SIGNAL_TIME = 0.2 + + +ALL_STATE_KEYS = ["state"] +BOOL_KEYS = ["state"] + + +@pytest.fixture +def this_device(): + mc = mqtt_client(DUT_CLIENT_ID, 'localhost') + return test_device(mc, TOPIC) + + +def test_initial_states(this_device: test_device): + # test all initial values + init_state(ALL_STATE_KEYS, this_device) + + +def test_state_change_by_mqtt(this_device: test_device): + def state_data(key): + if key in BOOL_KEYS: + return (True, False) + else: + raise IndexError("No return value defined for key %s" % key) + + def mqtt_data(key): + if key in BOOL_KEYS: + return ('ON', 'OFF') + else: + return state_data(key) + + # test state changes + tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device, + mqtt_data, state_data, None, MQTT_SIGNAL_TIME) + + +def test_specific_get_functions(this_device: test_device): + assert this_device.linkquality == this_device.get(this_device.KEY_LINKQUALITY) + assert this_device.output_0 == this_device.get(this_device.KEY_OUTPUT_0) diff --git a/__test__/function/modules/test_heating_function.py b/__test__/function/modules/test_heating_function.py new file mode 100644 index 0000000..f34aeee --- /dev/null +++ b/__test__/function/modules/test_heating_function.py @@ -0,0 +1,40 @@ +from function.modules import heating_function as test_class + +""" +config.DEFAULT_TEMPERATURE[heating_valve.topic], +db_data = get_radiator_data(heating_valve.topic) +**{ + test_class.KEY_USER_TEMPERATURE_SETPOINT: db_data[2], + test_class.KEY_TEMPERATURE_SETPOINT: db_data[3], + test_class.KEY_AWAY_MODE: db_data[0], + test_class.KEY_SUMMER_MODE: db_data[1], + }) +""" + + +def test_initial_states(): + + class heating_valve(object): + KEY_HEATING_SETPOINT = 'hsp' + KEY_TEMPERATURE = 'temp' + + def set_heating_setpoint(self, value): + pass + + def add_callback(self, key, value, callback): + pass + # + # + # + tc = test_class( + heating_valve(), + 21, **{ + test_class.KEY_USER_TEMPERATURE_SETPOINT: 22, + test_class.KEY_TEMPERATURE_SETPOINT: 17, + test_class.KEY_AWAY_MODE: True, + test_class.KEY_SUMMER_MODE: False, + }) + assert tc.get(test_class.KEY_USER_TEMPERATURE_SETPOINT) == 22 + assert tc.get(test_class.KEY_TEMPERATURE_SETPOINT) == 17 + assert tc.get(test_class.KEY_AWAY_MODE) == True + assert tc.get(test_class.KEY_SUMMER_MODE) == False diff --git a/__test__/requirements.txt b/__test__/requirements.txt new file mode 100644 index 0000000..9955dec --- /dev/null +++ b/__test__/requirements.txt @@ -0,0 +1,2 @@ +pytest +pytest-cov diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..e69de29