From 60b18fac646af5b931771ac4b178970b8ce00864 Mon Sep 17 00:00:00 2001 From: Dirk Alders Date: Mon, 13 Feb 2023 16:01:17 +0100 Subject: [PATCH] User interface for warning and triggering device warning states --- simulation/devices.py | 97 ++++++++++++++++++++++++++++++++++++++++--- simulation/rooms.py | 3 +- smart_brain_test.py | 1 + tests/all.py | 2 +- 4 files changed, 96 insertions(+), 7 deletions(-) diff --git a/simulation/devices.py b/simulation/devices.py index b5f04c5..71cdcab 100644 --- a/simulation/devices.py +++ b/simulation/devices.py @@ -11,7 +11,7 @@ COLOR_SHELLY = colored.fg("light_magenta") COLOR_POWERPLUG = colored.fg("light_cyan") COLOR_LIGHT_ACTIVE = colored.fg("yellow") COLOR_LIGHT_PASSIVE = COLOR_LIGHT_ACTIVE + colored.attr("dim") -COLOR_MOTION_SENSOR = colored.fg("dark_orange_3b") +COLOR_WARNINGS = colored.fg("dark_orange_3b") COLOR_HEATING_VALVE = colored.fg("red") COLOR_REMOTE = colored.fg("green") @@ -125,8 +125,10 @@ class shelly(base): KEY_INPUT_1 = "input/1" KEY_LONGPUSH_0 = "longpush/0" KEY_LONGPUSH_1 = "longpush/1" + KEY_TEMPERATURE = "temperature" + KEY_OVERTEMPERATURE = "overtemperature" # - BOOL_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, ] + BOOL_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, ] # INPUT_FUNC_OUT1_FOLLOW = "out1_follow" INPUT_FUNC_OUT1_TRIGGER = "out1_trigger" @@ -139,11 +141,13 @@ class shelly(base): "get_input_0", "toggle_input_0", "get_input_1", "toggle_input_1", "trigger_input_0_longpress", "trigger_input_1_longpress", + "toggle_overtemperature", ] def __init__(self, mqtt_client, topic, input_0_func=None, input_1_func=None, output_0_auto_off=None): super().__init__(mqtt_client, topic, default_values={self.KEY_OUTPUT_0: False, self.KEY_OUTPUT_1: False, - self.KEY_INPUT_0: False, self.KEY_INPUT_1: False}) + self.KEY_INPUT_0: False, self.KEY_INPUT_1: False, + self.KEY_TEMPERATURE: 35.2, self.KEY_OVERTEMPERATURE: False}) # self.__input_0_func = input_0_func self.__input_1_func = input_1_func @@ -154,6 +158,8 @@ class shelly(base): # publish state changes self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) self.add_callback(self.KEY_OUTPUT_1, None, self.__send__, True) + self.add_callback(self.KEY_TEMPERATURE, None, self.__send__, True) + self.add_callback(self.KEY_OVERTEMPERATURE, None, self.__send__, True) # if self.__output_0_auto_off__ is not None: self.__delayed_off__ = task.delayed(float(self.__output_0_auto_off__), self.__auto_off__, self.KEY_OUTPUT_0) @@ -164,6 +170,11 @@ class shelly(base): # self.__tx__((self.KEY_OUTPUT_0, self.KEY_OUTPUT_1)) + def __int_to_ext__(self, key, value): + if key == self.KEY_OVERTEMPERATURE: + return int(value) + return super().__int_to_ext__(key, value) + def __rx__(self, client, userdata, message): value = self.__payload_filter__(message.payload) if message.topic.startswith(self.topic) and message.topic.endswith("/command"): @@ -235,6 +246,11 @@ class shelly(base): time.sleep(0.1) self.set(self.KEY_INPUT_1, not self[self.KEY_INPUT_1]) self.set(self.KEY_LONGPUSH_1, False) + elif command == self.COMMANDS[10]: + if self.get(self.KEY_OVERTEMPERATURE): + self.warning_state_off() + else: + self.warning_state_on() else: print("%s: not yet implemented!" % command) else: @@ -246,6 +262,14 @@ class shelly(base): channel = "(%s%s)" % (self.names.get(key, key), info) self.print_formatted_light(COLOR_SHELLY, value, channel) + def warning_state_on(self): + self.set(self.KEY_TEMPERATURE, 78.3) + self.set(self.KEY_OVERTEMPERATURE, True) + + def warning_state_off(self): + self.set(self.KEY_TEMPERATURE, 34.1) + self.set(self.KEY_OVERTEMPERATURE, False) + class my_powerplug(base): KEY_OUTPUT_0 = "state" @@ -312,8 +336,11 @@ class silvercrest_powerplug(base): self.set(self.KEY_OUTPUT_0, self.__ext_to_int__(self.KEY_OUTPUT_0, state)) def __tx__(self, keys_changed): + data = {} + for key in self: + data[key] = self.__int_to_ext__(key, self[key]) for key in keys_changed: - self.mqtt_client.send(self.topic + '/' + key, self.__int_to_ext__(self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0))) + self.mqtt_client.send(self.topic, json.dumps(data)) def command(self, command): if command in self.COMMANDS: @@ -454,20 +481,23 @@ class brennenstuhl_heating_valve(base): # KEY_TEMPERATURE_SETPOINT = "current_heating_setpoint" KEY_TEMPERATURE = "local_temperature" + KEY_BATTERY = "battery" # COMMANDS = [ - "get_temperature_setpoint", "set_temperature_setpoint", "set_local_temperature", + "get_temperature_setpoint", "set_temperature_setpoint", "set_local_temperature", "set_battery", ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic, default_values={ self.KEY_TEMPERATURE_SETPOINT: 20, self.KEY_TEMPERATURE: 20.7, + self.KEY_BATTERY: 97, }) # self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.print_formatted, True) self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.__send__, True) self.add_callback(self.KEY_TEMPERATURE, None, self.__send__, True) + self.add_callback(self.KEY_BATTERY, None, self.__send__, True) # self.__tx__((self.KEY_TEMPERATURE_SETPOINT, )) @@ -495,6 +525,8 @@ class brennenstuhl_heating_valve(base): self.set(self.KEY_TEMPERATURE_SETPOINT, self.__command_float_value__(value)) elif command == self.COMMANDS[2]: self.set(self.KEY_TEMPERATURE, self.__command_float_value__(value)) + elif command == self.COMMANDS[3]: + self.set(self.KEY_BATTERY, self.__command_float_value__(value)) def print_formatted(self, device, key, value): if key == self.KEY_TEMPERATURE_SETPOINT: @@ -503,6 +535,12 @@ class brennenstuhl_heating_valve(base): perc = 0 if perc < 0 else perc self.print_formatted_percent(COLOR_HEATING_VALVE, '\u03d1', perc, "%4.1f°C" % value, "") + def warning_state_on(self): + self.set(self.KEY_BATTERY, 7) + + def warning_state_off(self): + self.set(self.KEY_BATTERY, 97) + class videv_light(base_videv): KEY_OUTPUT_0 = "state" @@ -733,8 +771,44 @@ class videv_heating(base_videv): self.print_formatted_percent(COLOR_GUI_ACTIVE, 't', perc, '%3d%%' % perc, '(%.1f)' % disp_value) +class videv_warnings(base): + KEY_WARNING = "html_short" + # + COMMANDS = [ + "get_warnings", + ] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic, dict.fromkeys([self.KEY_WARNING])) + # + self.add_callback(self.KEY_WARNING, None, self.print_formatted, True) + + def __rx__(self, client, userdata, message): + if message.topic == self.topic + "/" + self.KEY_WARNING: + self.set(self.KEY_WARNING, message.payload.decode("utf-8")) + + def command(self, command): + if command in self.COMMANDS: + if command == self.COMMANDS[0]: + self.print_formatted(self, self.KEY_WARNING, self.get(self.KEY_WARNING)) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def print_formatted(self, device, key, value): + if OUTPUT_ACTIVE: + value = value.replace("

", "\n") + value = value.replace("
", " - ") + print(COLOR_WARNINGS + "** WARNINGS: *************************************************") + for line in value.split("\n"): + print(" ", line) + print("**************************************************************" + colored.attr("reset")) + # class silvercrest_motion_sensor(base): # KEY_OCCUPANCY = "occupancy" +# KEY_BATTERY = "battery" +# KEY_BATTERY_LOW = "battery_low" # COMMANDS = ['motion'] # def __init__(self, mqtt_client, topic): @@ -761,9 +835,17 @@ class videv_heating(base_videv): # if value is not None: # print_light(COLOR_MOTION_SENSOR, value, self.topic, "") +# def warning_state_on(self): +# self.set(self.KEY_BATTERY, 7) +# self.set(self.KEY_BATTERY_LOW, True) +# def warning_state_off(self): +# self.set(self.KEY_BATTERY, 97) +# self.set(self.KEY_BATTERY_LOW, False) + # class tradfri_button(base): # KEY_ACTION = "action" +# KEY_BATTERY = "battery" # # # ACTION_TOGGLE = "toggle" # ACTION_BRIGHTNESS_UP = "brightness_up_click" @@ -800,6 +882,11 @@ class videv_heating(base_videv): # time.sleep(value or 0.5) # action = '_'.join(action.split('_')[:-1] + ['release']) # self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) +# +# def warning_state_on(self): +# self.set(self.KEY_BATTERY, 7) +# def warning_state_off(self): +# self.set(self.KEY_BATTERY, 97) # class remote(base): diff --git a/simulation/rooms.py b/simulation/rooms.py index 2470e6a..b7eda45 100644 --- a/simulation/rooms.py +++ b/simulation/rooms.py @@ -1,6 +1,6 @@ import config from simulation.devices import shelly, silvercrest_powerplug, tradfri_light, my_powerplug, brennenstuhl_heating_valve -from simulation.devices import videv_light, videv_heating +from simulation.devices import videv_light, videv_heating, videv_warnings import inspect @@ -266,3 +266,4 @@ class house(base): self.ffw = ffw(mqtt_client) self.ffe = ffe(mqtt_client) self.stairway = stairway(mqtt_client) + self.warnings = videv_warnings(mqtt_client, config.TOPIC_WARNINGS) diff --git a/smart_brain_test.py b/smart_brain_test.py index 3676a94..22c424e 100644 --- a/smart_brain_test.py +++ b/smart_brain_test.py @@ -6,6 +6,7 @@ import sys from tests.all import test_smarthome # TODO: Extend tests in simulation +# - Test: Check of warning messages for battery and overtemperature # - Switching button functions (gfw_dirk, ffe.sleep) # - Brightness button functions (gfw.dirk, ffe.sleep) # - Synch functions of amplifier with spotify, mpd diff --git a/tests/all.py b/tests/all.py index ed39076..3be5b2b 100644 --- a/tests/all.py +++ b/tests/all.py @@ -83,7 +83,7 @@ class test_smarthome(object): system_info[jsonlog.SYSI_USERNAME] = getpass.getuser() system_info[jsonlog.SYSI_PATH] = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) - self.tcl = report.testSession(['__unittest__', 'unittest', ROOT_LOGGER_NAME]) + self.tcl = report.testSession(['__unittest__', ROOT_LOGGER_NAME]) self.tcl[jsonlog.MAIN_KEY_SYSTEM_INFO] = system_info self.tcl["testcase_names"] = report.TCEL_NAMES