"""Tests for the ``shelly_sw1`` device class.

Covers the initial state, state changes received via MQTT (including the
over-temperature warning), the convenience properties and command sending.
"""
import time

import pytest

from module import mqtt_test_client, init_state, state_change_by_mqtt
from devices import shelly_sw1 as test_device
from devices.base import warning
from mqtt import mqtt_client

DUT_CLIENT_ID = "__%s__" % __name__
TOPIC = "__test__/%s" % __name__
#
MQTT_SIGNAL_TIME = 0.2
# Largest accepted distance (seconds) between the reference time returned by
# state_change_by_mqtt() and the timestamp stored in the resulting warning.
WARNING_MAX_DELAY = 2

ALL_STATE_KEYS = ["relay/0", "relay/1", "input/0", "input/1", "longpush/0", "longpush/1",
                  "temperature", "overtemperature"]
BOOL_KEYS = ["relay/0", "relay/1", "input/0", "input/1", "longpush/0", "longpush/1",
             "overtemperature"]


@pytest.fixture
def this_device() -> test_device:
    """Return a device under test bound to ``TOPIC``.

    A new ``mqtt_client`` is created with ``DUT_CLIENT_ID`` and ``'localhost'``
    each time the fixture is requested.
    """
    mc = mqtt_client(DUT_CLIENT_ID, 'localhost')
    return test_device(mc, TOPIC)


def test_initial_states(this_device: test_device):
    """Check the initial value of every key in ``ALL_STATE_KEYS``."""
    init_state(ALL_STATE_KEYS, this_device)


def test_state_change_by_mqtt(this_device: test_device):
    """Drive every state key via MQTT and verify the over-temperature warning."""
    relay_keys = ("relay/0", "relay/1")

    def state_data(key):
        """Return the two state values used for ``key``.

        Raises IndexError for a key that is neither boolean nor "temperature".
        """
        if key in BOOL_KEYS:
            return (True, False)
        if key == "temperature":
            return (85.3, 20.1)
        raise IndexError("No return value defined for key %s" % key)

    def mqtt_data(key):
        """Return the two MQTT payloads used for ``key``."""
        if key in relay_keys:
            return ('on', 'off')
        if key in BOOL_KEYS:
            # every boolean key that is not a relay is sent as 1 / 0
            return (1, 0)
        # non-boolean keys use the same values on both sides
        return state_data(key)

    def warning_condition(state_topic, value):
        """Return True for the message that sets the over-temperature flag."""
        return state_topic == "overtemperature" and value == 1

    # test state changes
    tm_warning = state_change_by_mqtt(
        ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device,
        mqtt_data, state_data, warning_condition, MQTT_SIGNAL_TIME,
    )

    # test warning
    w: warning = this_device.get(this_device.KEY_WARNING)
    assert w.get(w.KEY_ID) == TOPIC
    assert w.get(w.KEY_TYPE) == w.TYPE_OVERTEMPERATURE
    # mktime() works on a struct_time, which only has whole-second resolution,
    # so wt is truncated to the second. The lower bound is truncated the same
    # way; an unrounded fractional tm_warning would reject a warning raised
    # within the same second.
    # NOTE(review): assumes tm_warning is an epoch timestamp in seconds —
    # confirm against state_change_by_mqtt.
    wt = time.mktime(w.get(w.KEY_TM))
    wt_min = int(tm_warning)
    wt_max = tm_warning + WARNING_MAX_DELAY
    assert wt_min <= wt <= wt_max


def test_specific_get_functions(this_device: test_device):
    """Every convenience property must return the same as ``get()`` of its key."""
    property_keys = (
        ("output_0", this_device.KEY_OUTPUT_0),
        ("output_1", this_device.KEY_OUTPUT_1),
        ("input_0", this_device.KEY_INPUT_0),
        ("input_1", this_device.KEY_INPUT_1),
        ("longpush_0", this_device.KEY_LONGPUSH_0),
        ("longpush_1", this_device.KEY_LONGPUSH_1),
        ("temperature", this_device.KEY_TEMPERATURE),
    )
    for name, key in property_keys:
        # the property name is the failure message, so a mismatch is easy to spot
        assert getattr(this_device, name) == this_device.get(key), name


def test_send_command(this_device: test_device):
    """Smoke test: switching output 0 on and off must not raise."""
    this_device.set_output_0(True)
    this_device.set_output_0(False)


# NOTE(review): the string below is a bare module-level literal and is never
# executed. It looks like a reference copy of the device class under test —
# confirm it is still in sync with the devices package.
'''
class shelly(base):
    """ Communication (MQTT)

        shelly
            +- relay
            |      +- 0 ["on" / "off"]              <- status
            |      |  +- command ["on"/ "off"]      <- command
            |      |  +- energy [numeric]           <- status
            |      +- 1 ["on" / "off"]              <- status
            |         +- command ["on"/ "off"]      <- command
            |         +- energy [numeric]           <- status
            +- input
            |      +- 0 [0 / 1]                     <- status
            |      +- 1 [0 / 1]                     <- status
            +- input_event
            |      +- 0                             <- status
            |      +- 1                             <- status
            +- logpush
            |      +- 0 [0 / 1]                     <- status
            |      +- 1 [0 / 1]                     <- status
            +- temperature [numeric] °C             <- status
            +- temperature_f [numeric] F            <- status
            +- overtemperature [0 / 1]              <- status
            +- id                                   <- status
            +- model                                <- status
            +- mac                                  <- status
            +- ip                                   <- status
            +- new_fw                               <- status
            +- fw_ver                               <- status
    """
    KEY_OUTPUT_0 = "relay/0"
    KEY_OUTPUT_1 = "relay/1"
    KEY_INPUT_0 = "input/0"
    KEY_INPUT_1 = "input/1"
    KEY_LONGPUSH_0 = "longpush/0"
    KEY_LONGPUSH_1 = "longpush/1"
    KEY_TEMPERATURE = "temperature"
    KEY_OVERTEMPERATURE = "overtemperature"
    KEY_ID = "id"
    KEY_MODEL = "model"
    KEY_MAC = "mac"
    KEY_IP = "ip"
    KEY_NEW_FIRMWARE = "new_fw"
    KEY_FIRMWARE_VERSION = "fw_ver"
    #
    TX_TOPIC = "command"
    TX_TYPE = base.TX_VALUE
    TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
    #
    RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE,
               KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION]
    RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1']
    RX_IGNORE_KEYS = ['temperature_f']
    RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE]

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic)
        #
        self.output_key_delayed = None
        self.delayed_flash_task = task.delayed(0.3, self.flash_task)
        self.delayed_off_task = task.delayed(0.3, self.off_task)
        #
        self.add_callback(self.KEY_OVERTEMPERATURE, True, self.__warning__, True)
        #
        self.all_off_requested = False

    def flash_task(self, *args):
        if self.flash_active:
            self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed))
            self.output_key_delayed = None
            if self.all_off_requested:
                self.delayed_off_task.run()

    def off_task(self, *args):
        self.all_off()

    @property
    def flash_active(self):
        return self.output_key_delayed is not None

    #
    # WARNING CALL
    #
    def __warning__(self, client, key, data):
        w = warning(self.topic, warning.TYPE_OVERTEMPERATURE, "Temperature to high (%.1f°C)", self.get(self.KEY_TEMPERATURE) or math.nan)
        self.logger.warning(w)
        self.set(self.KEY_WARNING, w)

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0)

    @property
    def output_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_1)

    @property
    def input_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_INPUT_0)

    @property
    def input_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_INPUT_1)

    @property
    def longpush_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_LONGPUSH_0)

    @property
    def longpush_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_LONGPUSH_1)

    @property
    def temperature(self):
        """rv: numeric value"""
        return self.get(self.KEY_TEMPERATURE)

    #
    # TX
    #
    def set_output_0(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_0, state)

    def set_output_0_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
        self.set_output_0(data)

    def toggle_output_0_mcb(self, device, key, data):
        self.logger.info("Toggeling output 0")
        self.set_output_0(not self.output_0)

    def set_output_1(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_1, state)

    def set_output_1_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data))
        self.set_output_1(data)

    def toggle_output_1_mcb(self, device, key, data):
        self.logger.info("Toggeling output 1")
        self.set_output_1(not self.output_1)

    def flash_0_mcb(self, device, key, data):
        self.output_key_delayed = self.KEY_OUTPUT_0
        self.toggle_output_0_mcb(device, key, data)
        self.delayed_flash_task.run()

    def flash_1_mcb(self, device, key, data):
        self.output_key_delayed = self.KEY_OUTPUT_1
        self.toggle_output_1_mcb(device, key, data)
        self.delayed_flash_task.run()

    def all_off(self):
        if self.flash_active:
            self.all_off_requested = True
        else:
            if self.output_0:
                self.set_output_0(False)
            if self.output_1:
                self.set_output_1(False)
'''