diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..ddea26d --- /dev/null +++ b/.gitmodules @@ -0,0 +1,12 @@ +[submodule "geo"] + path = geo + url = https://git.mount-mockery.de/pylib/geo.git +[submodule "report"] + path = report + url = https://git.mount-mockery.de/pylib/report.git +[submodule "mqtt"] + path = mqtt + url = https://git.mount-mockery.de/pylib/mqtt.git +[submodule "task"] + path = task + url = https://git.mount-mockery.de/pylib/task.git diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..3850970 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Verwendet IntelliSense zum Ermitteln möglicher Attribute. + // Zeigen Sie auf vorhandene Attribute, um die zugehörigen Beschreibungen anzuzeigen. + // Weitere Informationen finden Sie unter https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Main File execution", + "type": "python", + "request": "launch", + "program": "${workspaceFolder}/smart_brain_test.py", + "console": "integratedTerminal", + "justMyCode": true + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..ca42408 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,12 @@ +{ + "python.defaultInterpreterPath": "./venv/bin/python", + "editor.formatOnSave": true, + "autopep8.args": [ + "--max-line-length=150" + ], + "editor.fontSize": 14, + "emmet.includeLanguages": { + "django-html": "html" + }, + "python.analysis.typeCheckingMode": "off" +} \ No newline at end of file diff --git a/base.py b/base.py new file mode 120000 index 0000000..e5bca4b --- /dev/null +++ b/base.py @@ -0,0 +1 @@ +../smart_brain/base.py \ No newline at end of file diff --git a/config.py b/config.py new file mode 120000 index 0000000..0e51527 --- /dev/null +++ b/config.py @@ -0,0 +1 @@ +../smart_brain/config.py \ No newline at end of file diff --git a/geo b/geo new file mode 160000 index 0000000..11166bb --- /dev/null +++ b/geo @@ -0,0 +1 @@ +Subproject commit 11166bb27ad2335f7812fcb88c788397f5106751 diff --git a/mqtt b/mqtt new file mode 160000 index 0000000..1adfb06 --- /dev/null +++ b/mqtt @@ -0,0 +1 @@ +Subproject commit 1adfb0626e7777c6d29be65d4ad4ce2d57541301 diff --git a/report b/report new file mode 160000 index 0000000..e2392c9 --- /dev/null +++ b/report @@ -0,0 +1 @@ +Subproject commit e2392c9f28d88ee54463681850acf95ae496c9a0 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4353f82 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +colored diff --git a/simulation/devices.py b/simulation/devices.py new file mode 100644 index 0000000..9765cef --- /dev/null +++ b/simulation/devices.py @@ -0,0 +1,810 @@ +from base import mqtt_base +import colored +import copy +import json +import task +import time + +COLOR_GUI_ACTIVE = colored.fg("light_blue") +COLOR_GUI_PASSIVE = COLOR_GUI_ACTIVE + colored.attr("dim") +COLOR_SHELLY = colored.fg("light_magenta") +COLOR_POWERPLUG = colored.fg("light_cyan") +COLOR_LIGHT_ACTIVE = colored.fg("yellow") +COLOR_LIGHT_PASSIVE = COLOR_LIGHT_ACTIVE + colored.attr("dim") +COLOR_MOTION_SENSOR = colored.fg("dark_orange_3b") +COLOR_HEATING_VALVE = colored.fg("red") +COLOR_REMOTE = colored.fg("green") + + +class base(mqtt_base): + AUTOSEND = True + COMMANDS = [] + BOOL_KEYS = [] + + def __init__(self, mqtt_client, topic, default_values=None): + super().__init__(mqtt_client, topic, default_values) + + self.names = {} + self.commands = self.COMMANDS[:] + # + self.mqtt_client.add_callback(self.topic, self.__rx__) + self.mqtt_client.add_callback(self.topic + '/#', self.__rx__) + + def add_channel_name(self, key, name): + self.names[key] = name + + def capabilities(self): + return self.commands + + def __payload_filter__(self, payload): + try: + return json.loads(payload) + except json.decoder.JSONDecodeError: + return payload.decode("utf-8") + + def __rx__(self, client, userdata, message): + print("%s: __rx__ not handled!" % self.__class__.__name__) + + def __tx__(self, keys_changed): + print("%s: __tx__ not handled!" % self.__class__.__name__) + + def __send__(self, client, key, data): + self.__tx__([key]) + + def __ext_to_int__(self, key, data): + if key in self.BOOL_KEYS: + if data == 'toggle': + return not self.get(key) + return data == "on" + return data + + def __int_to_ext__(self, key, value): + if key in self.BOOL_KEYS: + return "on" if value is True else "off" + return value + + def __devicename__(self): + return " - ".join(self.topic.split('/')[1:]) + + def __percent_bar__(self, percent_value): + rv = "" + for i in range(0, 10): + rv += u"\u25ac" if (percent_value - 5) > 10*i else u"\u25ad" + return rv + + def __command_int_value__(self, value): + try: + return int(value) + except TypeError: + print("You need to give a integer parameter not '%s'" % str(value)) + + def __command_float_value__(self, value): + try: + return float(value) + except TypeError: + print("You need to give a numeric parameter not '%s'" % str(value)) + + def print_formatted_light(self, color, state, description, led=False): + if led is True: + if state is True: + icon = colored.fg('green') + "\u2b24" + color + else: + icon = colored.fg('light_gray') + "\u2b24" + color + else: + icon = u'\u2b24' if state is True else u'\u25ef' + print(color + 10 * ' ' + icon + 9 * ' ' + self.__devicename__(), description + colored.attr("reset")) + + def print_formatted_videv(self, color, state, description): + icon = u'\u25a0' if state is True else u'\u25a1' + print(color + 10 * ' ' + icon + 9 * ' ' + self.__devicename__(), description + colored.attr("reset")) + + def print_formatted_percent(self, color, prefix, perc_value, value_str, description): + if len(prefix) > 1 or len(value_str) > 7: + raise ValueError("Length of prefix (%d) > 1 or length of value_str (%d) > 7" % (len(prefix), len(value_str))) + print(color + prefix + self.__percent_bar__(perc_value), value_str + (8 - len(value_str)) + * ' ' + self.__devicename__(), description + colored.attr("reset")) + + +class base_videv(base): + def set(self, key, data, block_callback=[]): + if key in self.keys(): + return super().set(key, data, block_callback) + else: + self.__send__(self, key, data) + + +class shelly(base): + KEY_OUTPUT_0 = "relay/0" + KEY_OUTPUT_1 = "relay/1" + KEY_INPUT_0 = "input/0" + KEY_INPUT_1 = "input/1" + KEY_LONGPUSH_0 = "longpush/0" + KEY_LONGPUSH_1 = "longpush/1" + # + BOOL_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, ] + # + INPUT_FUNC_OUT1_FOLLOW = "out1_follow" + INPUT_FUNC_OUT1_TRIGGER = "out1_trigger" + INPUT_FUNC_OUT2_FOLLOW = "out2_follow" + INPUT_FUNC_OUT2_TRIGGER = "out2_trigger" + # + COMMANDS = [ + "get_output_0", "toggle_output_0", + "get_output_1", "toggle_output_1", + "get_input_0", "toggle_input_0", + "get_input_1", "toggle_input_1", + "trigger_input_0_longpress", "trigger_input_1_longpress", + ] + + def __init__(self, mqtt_client, topic, input_0_func=None, input_1_func=None, output_0_auto_off=None): + super().__init__(mqtt_client, topic, default_values={self.KEY_OUTPUT_0: False, self.KEY_OUTPUT_1: False, + self.KEY_INPUT_0: False, self.KEY_INPUT_1: False}) + # + self.__input_0_func = input_0_func + self.__input_1_func = input_1_func + self.__output_0_auto_off__ = output_0_auto_off + # print ouput changes + self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True) + self.add_callback(self.KEY_OUTPUT_1, None, self.print_formatted, True) + # publish state changes + self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) + self.add_callback(self.KEY_OUTPUT_1, None, self.__send__, True) + # + if self.__output_0_auto_off__ is not None: + self.__delayed_off__ = task.delayed(float(self.__output_0_auto_off__), self.__auto_off__, self.KEY_OUTPUT_0) + self.add_callback(self.KEY_OUTPUT_0, True, self.__start_auto_off__) + # + self.add_callback(self.KEY_INPUT_0, self.__input_function__, None) + self.add_callback(self.KEY_INPUT_1, self.__input_function__, None) + # + self.__tx__((self.KEY_OUTPUT_0, self.KEY_OUTPUT_1)) + + def __rx__(self, client, userdata, message): + value = self.__payload_filter__(message.payload) + if message.topic.startswith(self.topic) and message.topic.endswith("/command"): + key = '/'.join(message.topic.split('/')[-3:-1]) + self.set(key, self.__ext_to_int__(key, value)) + + def __tx__(self, keys_changed): + for key in keys_changed: + self.mqtt_client.send(self.topic + '/' + key, self.__int_to_ext__(key, self[key])) + + def __input_function__(self, device, key, data): + if key == self.KEY_INPUT_0: + func = self.__input_0_func + elif key == self.KEY_INPUT_1: + func = self.__input_1_func + else: + func = None + if func == self.INPUT_FUNC_OUT1_FOLLOW: + self.set(self.KEY_OUTPUT_0, data) + elif func == self.INPUT_FUNC_OUT1_TRIGGER: + self.set__toggle_data__(self.KEY_OUTPUT_0) + elif func == self.INPUT_FUNC_OUT2_FOLLOW: + self.__set_data__(self.KEY_OUTPUT_1, data) + elif func == self.INPUT_FUNC_OUT2_TRIGGER: + self.__toggle_data__(self.KEY_OUTPUT_1) + + def __start_auto_off__(self, device, key, data): + # stop delayed task if needed + if self.__output_0_auto_off__ is not None: + if not self.__delayed_off__._stopped: + self.__delayed_off__.stop() + # start delayed task + if self.__output_0_auto_off__ is not None: + self.__delayed_off__.run() + + def __auto_off__(self, key): + if key == self.KEY_OUTPUT_0: + self.set(key, False) + + def command(self, command): + if command in self.COMMANDS: + if command == self.COMMANDS[0]: + self.print_formatted(self, self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0)) + elif command == self.COMMANDS[1]: + self.set(self.KEY_OUTPUT_0, not self[self.KEY_OUTPUT_0]) + elif command == self.COMMANDS[2]: + self.print_formatted(self, self.KEY_OUTPUT_1, self.get(self.KEY_OUTPUT_1)) + elif command == self.COMMANDS[3]: + self.set(self.KEY_OUTPUT_1, not self[self.KEY_OUTPUT_1]) + elif command == self.COMMANDS[4]: + self.print_formatted(self, self.KEY_INPUT_0, self.get(self.KEY_INPUT_0)) + elif command == self.COMMANDS[5]: + self.set(self.KEY_INPUT_0, not self[self.KEY_INPUT_0]) + elif command == self.COMMANDS[6]: + self.print_formatted(self, self.KEY_INPUT_1, self.get(self.KEY_INPUT_1)) + elif command == self.COMMANDS[7]: + self.set(self.KEY_INPUT_1, not self[self.KEY_INPUT_1]) + elif command == self.COMMANDS[8]: + self.set(self.KEY_INPUT_0, not self[self.KEY_INPUT_0]) + time.sleep(0.4) + self.set(self.KEY_LONGPUSH_0, True) + time.sleep(0.1) + self.set(self.KEY_INPUT_0, not self[self.KEY_INPUT_0]) + self.set(self.KEY_LONGPUSH_0, False) + elif command == self.COMMANDS[9]: + self.set(self.KEY_INPUT_1, not self[self.KEY_INPUT_1]) + time.sleep(0.4) + self.set(self.KEY_LONGPUSH_1, True) + time.sleep(0.1) + self.set(self.KEY_INPUT_1, not self[self.KEY_INPUT_1]) + self.set(self.KEY_LONGPUSH_1, False) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def print_formatted(self, device, key, value): + if value is not None: + info = (" - %ds" % self.__output_0_auto_off__) if self.__output_0_auto_off__ is not None and value else "" + channel = "(%s%s)" % (self.names.get(key, key), info) + self.print_formatted_light(COLOR_SHELLY, value, channel) + + +class my_powerplug(base): + KEY_OUTPUT_0 = "state" + # + COMMANDS = [ + "get_output", "toggle_output", + ] + + def __init__(self, mqtt_client, topic, channel): + super().__init__(mqtt_client, topic + '/' + "output/%d" % (channel + 1), default_values={self.KEY_OUTPUT_0: False}) + # + self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True) + self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) + # + self.__tx__((self.KEY_OUTPUT_0, )) + + def __rx__(self, client, userdata, message): + if message.topic == self.topic + '/set': + payload = self.__payload_filter__(message.payload) + if payload == "toggle": + payload = not self.get(self.KEY_OUTPUT_0) + self.set(self.KEY_OUTPUT_0, payload) + + def __tx__(self, keys_changed): + for key in keys_changed: + self.mqtt_client.send(self.topic, json.dumps(self.get(key))) + + def command(self, command): + if command in self.COMMANDS: + if command == self.COMMANDS[0]: + self.print_formatted(self, self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0)) + elif command == self.COMMANDS[1]: + self.set(self.KEY_OUTPUT_0, not self.get(self.KEY_OUTPUT_0)) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def print_formatted(self, device, key, value): + if value is not None: + self.print_formatted_light(COLOR_POWERPLUG, value, "(%s)" % self.names.get(key, "State")) + + +class silvercrest_powerplug(base): + KEY_OUTPUT_0 = "state" + # + BOOL_KEYS = [KEY_OUTPUT_0, ] + # + COMMANDS = [ + "get_output", "toggle_output", + ] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic, default_values={self.KEY_OUTPUT_0: False}) + # + self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True) + self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) + # + self.__tx__((self.KEY_OUTPUT_0, )) + + def __rx__(self, client, userdata, message): + if message.topic == self.topic + '/set': + state = json.loads(message.payload).get('state') + self.set(self.KEY_OUTPUT_0, self.__ext_to_int__(self.KEY_OUTPUT_0, state)) + + def __tx__(self, keys_changed): + for key in keys_changed: + self.mqtt_client.send(self.topic + '/' + key, self.__int_to_ext__(self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0))) + + def command(self, command): + if command in self.COMMANDS: + if command == self.COMMANDS[0]: + self.print_formatted(self, self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0)) + elif command == self.COMMANDS[1]: + self.set(self.KEY_OUTPUT_0, not self.get(self.KEY_OUTPUT_0)) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def print_formatted(self, device, key, value): + if value is not None: + self.print_formatted_light(COLOR_POWERPLUG, value, "(%s)" % self.names.get(key, key)) + + +class tradfri_light(base): + KEY_OUTPUT_0 = "state" + KEY_BRIGHTNESS = "brightness" + KEY_COLOR_TEMP = "color_temp" + # + BOOL_KEYS = [KEY_OUTPUT_0, ] + # + STATE_COMMANDS = ("get_state", "toggle_state", ) + BRIGHTNESS_COMMANDS = ("get_brightness", "set_brightness",) + COLOR_TEMP_COMMANDS = ("get_color_temp", "set_color_temp",) + + def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, send_on_power_on=True): + default_values = {} + if enable_state: + default_values[self.KEY_OUTPUT_0] = False + if enable_brightness: + default_values[self.KEY_BRIGHTNESS] = 50 + if enable_color_temp: + default_values[self.KEY_COLOR_TEMP] = 5 + super().__init__(mqtt_client, topic, default_values=default_values) + # + self.send_on_power_on = send_on_power_on + # + if enable_state: + self.commands.extend(self.STATE_COMMANDS) + if enable_brightness: + self.commands.extend(self.BRIGHTNESS_COMMANDS) + if enable_color_temp: + self.commands.extend(self.COLOR_TEMP_COMMANDS) + # + self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True) + self.add_callback(self.KEY_BRIGHTNESS, None, self.print_formatted, True) + self.add_callback(self.KEY_COLOR_TEMP, None, self.print_formatted, True) + self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) + self.add_callback(self.KEY_BRIGHTNESS, None, self.__send__, True) + self.add_callback(self.KEY_COLOR_TEMP, None, self.__send__, True) + + def __ext_to_int__(self, key, data): + if key == self.KEY_BRIGHTNESS: + return round((data - 1) / 2.53, 0) + elif key == self.KEY_COLOR_TEMP: + return round((data - 250) / 20.4, 0) + else: + return super().__ext_to_int__(key, data) + + def __int_to_ext__(self, key, data): + if key == self.KEY_BRIGHTNESS: + return 1 + round(2.53 * data, 0) + elif key == self.KEY_COLOR_TEMP: + return 250 + round(20.4 * data, 0) + else: + return super().__int_to_ext__(key, data) + + def __rx__(self, client, userdata, message): + data = json.loads(message.payload) + if self.get(self.KEY_OUTPUT_0) or data.get(self.KEY_OUTPUT_0) in ['on', 'toggle']: # prevent non power changes, if not powered on + if message.topic.startswith(self.topic) and message.topic.endswith('/set'): + for targetkey in data.keys(): + value = data[targetkey] + if targetkey in self.keys(): + self.set(targetkey, self.__ext_to_int__(targetkey, value)) + elif message.topic == self.topic + '/get': + self.__tx__(None) + + def __tx__(self, keys_changed): + tx_data = dict(self) + for key in tx_data: + tx_data[key] = self.__int_to_ext__(key, self[key]) + self.mqtt_client.send(self.topic, json.dumps(tx_data)) + + def command(self, command): + try: + command, value = command.split(' ') + except ValueError: + value = None + if command in self.capabilities(): + if command == self.STATE_COMMANDS[0]: + self.print_formatted(self, self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0)) + elif command == self.STATE_COMMANDS[1]: + self.set(self.KEY_OUTPUT_0, not self.get(self.KEY_OUTPUT_0)) + elif command == self.BRIGHTNESS_COMMANDS[0]: + self.print_formatted(self, self.KEY_BRIGHTNESS, self.get(self.KEY_BRIGHTNESS)) + elif command == self.BRIGHTNESS_COMMANDS[1]: + self.set(self.KEY_BRIGHTNESS, self.__command_int_value__(value)) + elif command == self.COLOR_TEMP_COMMANDS[0]: + self.print_formatted(self, self.KEY_COLOR_TEMP, self.get(self.KEY_COLOR_TEMP)) + elif command == self.COLOR_TEMP_COMMANDS[1]: + self.set(self.KEY_COLOR_TEMP, self.__command_int_value__(value)) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def power_off(self, device, key, value): + self.set(self.KEY_OUTPUT_0, False, block_callback=(self.__send__, )) + + def power_on(self, device, key, value): + block_callback = [] if self.send_on_power_on else (self.__send__, ) + self.set(self.KEY_OUTPUT_0, True, block_callback=block_callback) + + def print_formatted(self, device, key, value): + if value is not None: + color = COLOR_LIGHT_ACTIVE + if key == self.KEY_OUTPUT_0: + self.print_formatted_light(COLOR_LIGHT_ACTIVE, value, "") + self.print_formatted(device, self.KEY_BRIGHTNESS, self.get(self.KEY_BRIGHTNESS)) + self.print_formatted(device, self.KEY_COLOR_TEMP, self.get(self.KEY_COLOR_TEMP)) + elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: + perc_value = round(value, 0) if key == self.KEY_BRIGHTNESS else round(10 * value, 0) + self.print_formatted_percent( + COLOR_LIGHT_PASSIVE if not self.get(self.KEY_OUTPUT_0) else COLOR_LIGHT_ACTIVE, + 'B' if key == self.KEY_BRIGHTNESS else 'C', + perc_value, + "%3d%%" % perc_value, + "" + ) + + +class brennenstuhl_heating_valve(base): + TEMP_RANGE = [10, 30] + # + KEY_TEMPERATURE_SETPOINT = "current_heating_setpoint" + KEY_TEMPERATURE = "local_temperature" + # + COMMANDS = [ + "get_temperature_setpoint", "set_temperature_setpoint", "set_local_temperature", + ] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic, default_values={ + self.KEY_TEMPERATURE_SETPOINT: 20, + self.KEY_TEMPERATURE: 20.7, + }) + # + self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.print_formatted, True) + self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.__send__, True) + self.add_callback(self.KEY_TEMPERATURE, None, self.__send__, True) + # + self.__tx__((self.KEY_TEMPERATURE_SETPOINT, )) + + def __rx__(self, client, userdata, message): + if message.topic.startswith(self.topic) and message.topic.endswith("/set"): + payload = self.__payload_filter__(message.payload) + for key in payload: + self.set(key, self.__ext_to_int__(key, payload[key])) + + def __tx__(self, keys_changed): + tx_data = dict(self) + for key in tx_data: + tx_data[key] = self.__int_to_ext__(key, self[key]) + self.mqtt_client.send(self.topic, json.dumps(tx_data)) + + def command(self, command): + try: + command, value = command.split(' ') + except ValueError: + value = None + if command in self.COMMANDS: + if command == self.COMMANDS[0]: + self.print_formatted(self, self.KEY_TEMPERATURE_SETPOINT, self.get(self.KEY_TEMPERATURE_SETPOINT)) + elif command == self.COMMANDS[1]: + self.set(self.KEY_TEMPERATURE_SETPOINT, self.__command_float_value__(value)) + elif command == self.COMMANDS[2]: + self.set(self.KEY_TEMPERATURE, self.__command_float_value__(value)) + + def print_formatted(self, device, key, value): + if key == self.KEY_TEMPERATURE_SETPOINT: + perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) + perc = 100 if perc > 100 else perc + perc = 0 if perc < 0 else perc + self.print_formatted_percent(COLOR_HEATING_VALVE, '\u03d1', perc, "%4.1f°C" % value, "") + + +class videv_light(base_videv): + KEY_OUTPUT_0 = "state" + KEY_BRIGHTNESS = "brightness" + KEY_COLOR_TEMP = "color_temp" + KEY_TIMER = "timer" + # + STATE_COMMANDS = ("get_state", "toggle_state", ) + BRIGHTNESS_COMMANDS = ("get_brightness", "set_brightness", ) + COLOR_TEMP_COMMANDS = ("get_color_temp", "set_color_temp", ) + TIMER_COMMANDS = ("get_timer", ) + + def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, enable_timer=False): + default_values = {} + if enable_state: + default_values[self.KEY_OUTPUT_0] = False + if enable_brightness: + default_values[self.KEY_BRIGHTNESS] = 50 + if enable_color_temp: + default_values[self.KEY_COLOR_TEMP] = 5 + if enable_timer: + default_values[self.KEY_TIMER] = 0 + super().__init__(mqtt_client, topic, default_values=default_values) + # + self.enable_state = enable_state + self.enable_brightness = enable_brightness + self.enable_color_temp = enable_color_temp + self.enable_timer = enable_timer + # + if enable_state: + self.commands.extend(self.STATE_COMMANDS) + if enable_brightness: + self.commands.extend(self.BRIGHTNESS_COMMANDS) + if enable_color_temp: + self.commands.extend(self.COLOR_TEMP_COMMANDS) + if enable_timer: + self.commands.extend(self.TIMER_COMMANDS) + # + self.timer_maxvalue = None + # add commands to be available + self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True) + self.add_callback(self.KEY_BRIGHTNESS, None, self.print_formatted, True) + self.add_callback(self.KEY_COLOR_TEMP, None, self.print_formatted, True) + self.add_callback(self.KEY_TIMER, None, self.print_formatted, True) + self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) + self.add_callback(self.KEY_BRIGHTNESS, None, self.__send__, True) + self.add_callback(self.KEY_COLOR_TEMP, None, self.__send__, True) + self.add_callback(self.KEY_TIMER, None, self.__send__, True) + + def __rx__(self, client, userdata, message): + value = self.__payload_filter__(message.payload) + if message.topic.startswith(self.topic): + targetkey = message.topic.split('/')[-1] + if targetkey in self.keys(): + self.set(targetkey, value, block_callback=(self.__send__, )) + elif targetkey != "__info__": + print("Unknown key %s in %s::%s" % (targetkey, message.topic, self.__class__.__name__)) + + def __tx__(self, keys_changed): + for key in keys_changed: + topic = self.topic + '/' + key + self.mqtt_client.send(topic, json.dumps(self[key])) + + def command(self, command): + try: + command, value = command.split(' ') + except ValueError: + value = None + if command in self.capabilities(): + if command == self.STATE_COMMANDS[0]: + self.print_formatted(self, self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0)) + elif command == self.STATE_COMMANDS[1]: + self.set(self.KEY_OUTPUT_0, not self.get(self.KEY_OUTPUT_0)) + elif command == self.BRIGHTNESS_COMMANDS[0]: + self.print_formatted(self, self.KEY_BRIGHTNESS, self.get(self.KEY_BRIGHTNESS)) + elif command == self.BRIGHTNESS_COMMANDS[1]: + self.set(self.KEY_BRIGHTNESS, self.__command_int_value__(value)) + elif command == self.COLOR_TEMP_COMMANDS[0]: + self.print_formatted(self, self.KEY_COLOR_TEMP, self.get(self.KEY_COLOR_TEMP)) + elif command == self.COLOR_TEMP_COMMANDS[1]: + self.set(self.KEY_COLOR_TEMP, self.__command_int_value__(value)) + elif command == self.TIMER_COMMANDS[0]: + self.print_formatted(self, self.KEY_TIMER, self.get(self.KEY_TIMER)) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def print_formatted(self, device, key, value): + if value is not None: + if key == self.KEY_OUTPUT_0: + self.print_formatted_videv(COLOR_GUI_ACTIVE, value, "") + elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: + perc_value = round(value * 10 if key == self.KEY_COLOR_TEMP else value, 0) + self.print_formatted_percent( + COLOR_GUI_ACTIVE, + 'B' if key == self.KEY_BRIGHTNESS else 'C', + perc_value, + "%3d%%" % perc_value, + "" + ) + elif key == self.KEY_TIMER: + if value > 0: + if self.timer_maxvalue is None and value != 0: + self.timer_maxvalue = value + disp_value = value + try: + perc = disp_value / self.timer_maxvalue * 100 + except ZeroDivisionError: + perc = 0 + else: + disp_value = 0 + perc = 0 + self.timer_maxvalue = None + self.print_formatted_percent(COLOR_GUI_ACTIVE, 't', perc, '%3d%%' % perc, '(%.1f)' % disp_value) + + +class videv_heating(base_videv): + TEMP_RANGE = [10, 30] + # + KEY_USER_TEMPERATURE_SETPOINT = 'user_temperature_setpoint' + KEY_VALVE_TEMPERATURE_SETPOINT = 'valve_temperature_setpoint' + KEY_AWAY_MODE = 'away_mode' + KEY_SUMMER_MODE = 'summer_mode' + KEY_SET_DEFAULT_TEMPERATURE = 'set_default_temperature' + KEY_START_BOOST = 'start_boost' + KEY_BOOST_TIMER = 'boost_timer' + # + KEY_TEMPERATURE = 'temperature' + # + COMMANDS = ["get_temperature_setpoint", "set_temperature_setpoint", "toggle_away_mode", + "toggle_summer_mode", "trigger_default_temperature", "trigger_boost"] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic, default_values={ + self.KEY_USER_TEMPERATURE_SETPOINT: 20, + self.KEY_VALVE_TEMPERATURE_SETPOINT: 20, + self.KEY_TEMPERATURE: 20.7, + self.KEY_AWAY_MODE: False, + self.KEY_SUMMER_MODE: False, + self.KEY_BOOST_TIMER: 0 + }) + self.add_callback(self.KEY_USER_TEMPERATURE_SETPOINT, None, self.print_formatted, True) + self.add_callback(self.KEY_VALVE_TEMPERATURE_SETPOINT, None, self.print_formatted, True) + self.add_callback(self.KEY_TEMPERATURE, None, self.print_formatted, True) + self.add_callback(self.KEY_AWAY_MODE, None, self.print_formatted, True) + self.add_callback(self.KEY_SUMMER_MODE, None, self.print_formatted, True) + self.add_callback(self.KEY_BOOST_TIMER, None, self.print_formatted, True) + self.add_callback(self.KEY_USER_TEMPERATURE_SETPOINT, None, self.__send__, True) + self.add_callback(self.KEY_TEMPERATURE, None, self.__send__, True) + self.add_callback(self.KEY_AWAY_MODE, None, self.__send__, True) + self.add_callback(self.KEY_SUMMER_MODE, None, self.__send__, True) + # + self.timer_maxvalue = None + + def __rx__(self, client, userdata, message): + value = self.__payload_filter__(message.payload) + if message.topic.startswith(self.topic): + targetkey = message.topic.split('/')[-1] + if targetkey in self.keys(): + self.set(targetkey, value, block_callback=(self.__send__, )) + elif targetkey not in ["__info__", self.KEY_SET_DEFAULT_TEMPERATURE, self.KEY_START_BOOST]: + print("Unknown key %s in %s::%s" % (targetkey, message.topic, self.__class__.__name__)) + + def __tx__(self, keys_changed): + for key in keys_changed: + topic = self.topic + '/' + key + try: + self.mqtt_client.send(topic, json.dumps(self[key])) + except KeyError: + self.mqtt_client.send(topic, json.dumps(True)) + + def command(self, command): + try: + command, value = command.split(' ') + except ValueError: + value = None + if command in self.capabilities(): + if command == self.commands[0]: + self.print_formatted(self, self.KEY_USER_TEMPERATURE_SETPOINT, self.get(self.KEY_USER_TEMPERATURE_SETPOINT)) + elif command == self.commands[1]: + self.set(self.KEY_USER_TEMPERATURE_SETPOINT, self.__command_float_value__(value)) + elif command == self.commands[2]: + self.set(self.KEY_AWAY_MODE, not self.get(self.KEY_AWAY_MODE)) + elif command == self.commands[3]: + self.set(self.KEY_SUMMER_MODE, not self.get(self.KEY_SUMMER_MODE)) + elif command == self.commands[4]: + self.set(self, self.KEY_SET_DEFAULT_TEMPERATURE, True) + elif command == self.commands[5]: + self.set(self, self.KEY_START_BOOST, True) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def print_formatted(self, device, key, value): + desc_temp_dict = { + self.KEY_TEMPERATURE: "Current Temperature", + self.KEY_USER_TEMPERATURE_SETPOINT: "User Setpoint", + self.KEY_VALVE_TEMPERATURE_SETPOINT: "Valve Setpoint" + } + if value is not None: + if key in [self.KEY_TEMPERATURE, self.KEY_USER_TEMPERATURE_SETPOINT, self.KEY_VALVE_TEMPERATURE_SETPOINT]: + perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) + perc = 100 if perc > 100 else perc + perc = 0 if perc < 0 else perc + self.print_formatted_percent(COLOR_GUI_ACTIVE, '\u03d1', perc, "%4.1f°C" % value, "(%s)" % desc_temp_dict[key]) + elif key in [self.KEY_AWAY_MODE, self.KEY_SUMMER_MODE]: + self.print_formatted_videv(COLOR_GUI_ACTIVE, value, "(%s)" % "Away-Mode" if key == self.KEY_AWAY_MODE else "Summer-Mode") + elif key == self.KEY_BOOST_TIMER: + if value > 0: + if self.timer_maxvalue is None and value != 0: + self.timer_maxvalue = value + disp_value = value + try: + perc = disp_value / self.timer_maxvalue * 100 + except ZeroDivisionError: + perc = 0 + else: + disp_value = 0 + perc = 0 + self.timer_maxvalue = None + self.print_formatted_percent(COLOR_GUI_ACTIVE, 't', perc, '%3d%%' % perc, '(%.1f)' % disp_value) + + +# class silvercrest_motion_sensor(base): +# KEY_OCCUPANCY = "occupancy" +# COMMANDS = ['motion'] + +# def __init__(self, mqtt_client, topic): +# super().__init__(mqtt_client, topic) +# self.data[self.KEY_OCCUPANCY] = False +# self.add_callback(self.KEY_OCCUPANCY, self.print_formatted, None) + +# def __rx__(self, client, userdata, message): +# pass + +# def command(self, command): +# try: +# command, value = command.split(' ') +# except ValueError: +# value = None +# else: +# value = json.loads(value) +# if command == self.COMMANDS[0]: +# self.store_data(**{self.KEY_OCCUPANCY: True}) +# time.sleep(value or 10) +# self.store_data(**{self.KEY_OCCUPANCY: False}) + +# def print_formatted(self, device, key, value): +# if value is not None: +# print_light(COLOR_MOTION_SENSOR, value, self.topic, "") + + +# class tradfri_button(base): +# KEY_ACTION = "action" +# # +# ACTION_TOGGLE = "toggle" +# ACTION_BRIGHTNESS_UP = "brightness_up_click" +# ACTION_BRIGHTNESS_DOWN = "brightness_down_click" +# ACTION_RIGHT = "arrow_right_click" +# ACTION_LEFT = "arrow_left_click" +# ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold" +# ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold" +# ACTION_RIGHT_LONG = "arrow_right_hold" +# ACTION_LEFT_LONG = "arrow_left_hold" +# # +# COMMANDS = [ACTION_TOGGLE, ACTION_LEFT, ACTION_RIGHT, ACTION_BRIGHTNESS_UP, ACTION_BRIGHTNESS_DOWN, +# ACTION_LEFT_LONG, ACTION_RIGHT_LONG, ACTION_BRIGHTNESS_UP_LONG, ACTION_BRIGHTNESS_DOWN_LONG] + +# def __init__(self, mqtt_client, topic): +# super().__init__(mqtt_client, topic) + +# def __rx__(self, client, userdata, message): +# pass + +# def command(self, command): +# try: +# command, value = command.split(' ') +# except ValueError: +# value = None +# else: +# value = json.loads(value) +# if command in self.capabilities(): +# action = self.COMMANDS[self.COMMANDS.index(command)] +# if self.COMMANDS.index(command) <= 4: +# self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) +# elif self.COMMANDS.index(command) <= 8: +# self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) +# time.sleep(value or 0.5) +# action = '_'.join(action.split('_')[:-1] + ['release']) +# self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) + + +# class remote(base): +# def __rx__(self, client, userdata, message): +# if message.topic == self.topic + "/VOLUP": +# if self.__payload_filter__(message.payload): +# icon = u'\u1403' +# else: +# icon = u'\u25a1' +# elif message.topic == self.topic + "/VOLDOWN": +# if self.__payload_filter__(message.payload): +# icon = u'\u1401' +# else: +# icon = u'\u25a1' +# else: +# return +# devicename = ' - '.join(self.topic.split('/')[1:-1]) +# print(COLOR_REMOTE + 10 * ' ' + icon + 6 * ' ' + devicename + colored.attr("reset")) diff --git a/simulation/rooms.py b/simulation/rooms.py new file mode 100644 index 0000000..2470e6a --- /dev/null +++ b/simulation/rooms.py @@ -0,0 +1,268 @@ +import config +from simulation.devices import shelly, silvercrest_powerplug, tradfri_light, my_powerplug, brennenstuhl_heating_valve +from simulation.devices import videv_light, videv_heating +import inspect + + +class base(object): + def getmembers(self, prefix=''): + rv = [] + for name, obj in inspect.getmembers(self): + if prefix: + full_name = prefix + '.' + name + else: + full_name = name + if not name.startswith('_'): + try: + if obj.__module__.endswith('devices'): + rv.append(full_name) + else: + rv.extend(obj.getmembers(full_name)) + except AttributeError: + pass + return rv + + def getobjbyname(self, name): + obj = self + for subname in name.split('.'): + obj = getattr(obj, subname) + return obj + + def command(self, full_command): + try: + parameter = " " + full_command.split(' ')[1] + except IndexError: + parameter = "" + command = full_command.split(' ')[0].split('.')[-1] + parameter + device_name = '.'.join(full_command.split(' ')[0].split('.')[:-1]) + self.getobjbyname(device_name).command(command) + + +class gfw_floor(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + self.main_light_zigbee_1 = tradfri_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_ZIGBEE % 1, True, True, True, False) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, True, self.main_light_zigbee_1.power_on) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, False, self.main_light_zigbee_1.power_off) + self.main_light_zigbee_2 = tradfri_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_ZIGBEE % 2, True, True, True, False) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, True, self.main_light_zigbee_2.power_on) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, False, self.main_light_zigbee_2.power_off) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_VIDEV, True, True, True) + + +class gfw_marion(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_GFW_MARION_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + + self.heating_valve = brennenstuhl_heating_valve(mqtt_client, config.TOPIC_GFW_MARION_HEATING_VALVE_ZIGBEE) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_GFW_MARION_MAIN_LIGHT_VIDEV, True, False, False) + self.videv_heating = videv_heating(mqtt_client, config.TOPIC_GFW_MARION_HEATING_VALVE_VIDEV) + + +class gfw_dirk(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_GFW_DIRK_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_GFW_DIRK_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, True, self.main_light_zigbee.power_on) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, False, self.main_light_zigbee.power_off) + + self.amplifier = my_powerplug(mqtt_client, config.TOPIC_GFW_DIRK_POWERPLUG, 0) + self.amplifier.add_channel_name(my_powerplug.KEY_OUTPUT_0, "Amplifier") + self.desk_light = my_powerplug(mqtt_client, config.TOPIC_GFW_DIRK_POWERPLUG, 1) + self.desk_light.add_channel_name(my_powerplug.KEY_OUTPUT_0, "Desk Light") + self.cd_player = my_powerplug(mqtt_client, config.TOPIC_GFW_DIRK_POWERPLUG, 2) + self.cd_player.add_channel_name(my_powerplug.KEY_OUTPUT_0, "CD_Player") + self.pc_dock = my_powerplug(mqtt_client, config.TOPIC_GFW_DIRK_POWERPLUG, 3) + self.pc_dock.add_channel_name(my_powerplug.KEY_OUTPUT_0, "PC_Dock") + self.desk_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_GFW_DIRK_DESK_LIGHT_ZIGBEE, True, True, True) + self.desk_light.add_callback(my_powerplug.KEY_OUTPUT_0, True, self.desk_light_zigbee.power_on) + self.desk_light.add_callback(my_powerplug.KEY_OUTPUT_0, False, self.desk_light_zigbee.power_off) + + self.heating_valve = brennenstuhl_heating_valve(mqtt_client, config.TOPIC_GFW_DIRK_HEATING_VALVE_ZIGBEE) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_GFW_DIRK_MAIN_LIGHT_VIDEV, True, True, True) + self.videv_amplifier = videv_light(mqtt_client, config.TOPIC_GFW_DIRK_AMPLIFIER_VIDEV, True, False, False) + self.videv_desk_light = videv_light(mqtt_client, config.TOPIC_GFW_DIRK_DESK_LIGHT_VIDEV, True, True, True) + self.videv_cd_player = videv_light(mqtt_client, config.TOPIC_GFW_DIRK_CD_PLAYER_VIDEV, True, False, False) + self.videv_pc_dock = videv_light(mqtt_client, config.TOPIC_GFW_DIRK_PC_DOCK_VIDEV, True, False, False) + self.videv_heating = videv_heating(mqtt_client, config.TOPIC_GFW_DIRK_HEATING_VALVE_VIDEV) + + +class gfw(base): + def __init__(self, mqtt_client): + self.floor = gfw_floor(mqtt_client) + self.marion = gfw_marion(mqtt_client) + self.dirk = gfw_dirk(mqtt_client) + + +class ffw_julian(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, True, self.main_light_zigbee.power_on) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, False, self.main_light_zigbee.power_off) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_VIDEV, True, True, True) + + +class ffw_livingroom(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_FFW_LIVINGROOM_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFW_LIVINGROOM_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, True, self.main_light_zigbee.power_on) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, False, self.main_light_zigbee.power_off) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFW_LIVINGROOM_MAIN_LIGHT_VIDEV, True, True, True) + + +class ffw_sleep(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_FFW_SLEEP_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFW_SLEEP_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, True, self.main_light_zigbee.power_on) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, False, self.main_light_zigbee.power_off) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFW_SLEEP_MAIN_LIGHT_VIDEV, True, True, False) + + +class ffw_bath(base): + def __init__(self, mqtt_client): + self.heating_valve = brennenstuhl_heating_valve(mqtt_client, config.TOPIC_FFW_BATH_HEATING_VALVE_ZIGBEE) + # + self.videv_heating = videv_heating(mqtt_client, config.TOPIC_FFW_BATH_HEATING_VALVE_VIDEV) + + +class ffw(base): + def __init__(self, mqtt_client): + self.julian = ffw_julian(mqtt_client) + self.livingroom = ffw_livingroom(mqtt_client) + self.sleep = ffw_sleep(mqtt_client) + self.bath = ffw_bath(mqtt_client) + + +class ffe_floor(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_FFE_FLOOR_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFE_FLOOR_MAIN_LIGHT_VIDEV, True, False, False) + + +class ffe_kitchen(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_FFE_KITCHEN_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + + self.circulation_pump = shelly(mqtt_client, config.TOPIC_FFE_KITCHEN_CIRCULATION_PUMP_SHELLY, + input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER, output_0_auto_off=10*60) + self.circulation_pump.add_channel_name(shelly.KEY_OUTPUT_0, "Circulation Pump") + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFE_KITCHEN_MAIN_LIGHT_VIDEV, True, False, False) + self.videv_circulation_pump = videv_light(mqtt_client, config.TOPIC_FFE_KITCHEN_CIRCULATION_PUMP_VIDEV, True, False, False, True) + + +class ffe_diningroom(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_FFE_DININGROOM_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + + self.floor_lamp = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_DININGROOM_FLOOR_LAMP_POWERPLUG) + self.floor_lamp.add_channel_name(silvercrest_powerplug.KEY_OUTPUT_0, "Floor Lamp") + + if config.CHRISTMAS: + self.garland = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_DININGROOM_GARLAND_POWERPLUG) + self.garland.add_channel_name(silvercrest_powerplug, "Garland") + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFE_DININGROOM_MAIN_LIGHT_VIDEV, True, False, False) + self.videv_floor_lamp = videv_light(mqtt_client, config.TOPIC_FFE_DININGROOM_FLOOR_LAMP_VIDEV, True, False, False) + if config.CHRISTMAS: + self.videv_garland = videv_light(mqtt_client, config.TOPIC_FFE_DININGROOM_GARLAND_VIDEV, True, False, False) + + +class ffe_sleep(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, True, self.main_light_zigbee.power_on) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, False, self.main_light_zigbee.power_off) + + self.bed_light_di_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_DI_ZIGBEE, True, True, False) + self.bed_light_ma = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_MA_POWERPLUG) + + self.heating_valve = brennenstuhl_heating_valve(mqtt_client, config.TOPIC_FFE_SLEEP_HEATING_VALVE_ZIGBEE) + + # + self.videv_bed_light_ma = videv_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_MA_VIDEV, True, False, False) + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_VIDEV, True, True, True) + self.videv_bed_light_di = videv_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_DI_VIDEV, True, True, False) + self.videv_bed_light_ma = videv_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_MA_VIDEV, True, False, False) + + self.videv_heating = videv_heating(mqtt_client, config.TOPIC_FFE_SLEEP_HEATING_VALVE_VIDEV) + + +class ffe_livingroom(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, True, self.main_light_zigbee.power_on) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, False, self.main_light_zigbee.power_off) + + for i in range(1, 7): + setattr(self, "floor_lamp_zigbee_%d" % i, tradfri_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_FLOOR_LAMP_ZIGBEE % i, True, True, True)) + + if config.CHRISTMAS: + self.xmas_tree = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_LIVINGROOM_XMAS_TREE_POWERPLUG) + self.xmas_tree.add_channel_name(silvercrest_powerplug, "Xmas-Tree") + self.xmas_star = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_LIVINGROOM_XMAS_STAR_POWERPLUG) + self.xmas_star.add_channel_name(silvercrest_powerplug, "Xmas-Star") + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_VIDEV, True, True, True) + self.videv_floor_lamp = videv_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_FLOOR_LAMP_VIDEV, True, True, True) + if config.CHRISTMAS: + self.videv_xmas_tree = videv_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_XMAS_TREE_VIDEV) + + +class ffe(base): + def __init__(self, mqtt_client): + self.floor = ffe_floor(mqtt_client) + self.kitchen = ffe_kitchen(mqtt_client) + self.diningroom = ffe_diningroom(mqtt_client) + self.sleep = ffe_sleep(mqtt_client) + self.livingroom = ffe_livingroom(mqtt_client) + + +class stairway(base): + def __init__(self, mqtt_client): + self.main_light = shelly(mqtt_client, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) + self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_VIDEV, True, False, False, True) + + +class house(base): + def __init__(self, mqtt_client): + self.gfw = gfw(mqtt_client) + self.ffw = ffw(mqtt_client) + self.ffe = ffe(mqtt_client) + self.stairway = stairway(mqtt_client) diff --git a/smart_brain_test.py b/smart_brain_test.py new file mode 100644 index 0000000..ac41633 --- /dev/null +++ b/smart_brain_test.py @@ -0,0 +1,85 @@ +import config +import logging +import mqtt +import readline +import report +from simulation.rooms import house +from tests import test_smarthome +import time + +if __name__ == "__main__": + report.stdoutLoggingConfigure( + ((config.APP_NAME, logging.WARNING), ), report.SHORT_FMT) + # + mc = mqtt.mqtt_client(host=config.MQTT_SERVER, port=config.MQTT_PORT, username=config.MQTT_USER, + password=config.MQTT_PASSWORD, name=config.APP_NAME + '_simulation') + # + COMMANDS = ['quit', 'help'] + # + h = house(mc) + for name in h.getmembers(): + d = h.getobjbyname(name) + for c in d.capabilities(): + COMMANDS.append(name + '.' + c) + # + ts = test_smarthome(h) + for name in ts.getmembers(): + d = ts.getobjbyname(name) + for c in d.capabilities(): + COMMANDS.append('test.' + name + '.' + c) + + def reduced_list(text): + """ + Create reduced completation list + """ + reduced_list = {} + for cmd in COMMANDS: + next_dot = cmd[len(text):].find('.') + if next_dot >= 0: + reduced_list[cmd[:len(text) + next_dot + 1]] = None + else: + reduced_list[cmd] = None + return reduced_list.keys() + + def completer(text, state): + """ + Our custom completer function + """ + options = [x for x in reduced_list(text) if x.startswith(text)] + return options[state] + + def complete(text, state): + for cmd in COMMANDS: + if cmd.startswith(text): + if not state: + hit = "" + index = 0 + sub_list = cmd.split('.') + while len(text) >= len(hit): + hit += sub_list[index] + '.' + return hit # cmd + else: + state -= 1 + + readline.parse_and_bind("tab: complete") + readline.set_completer(completer) + time.sleep(0.3) + print("\nEnter command: ") + + while True: + userfeedback = input('') + command = userfeedback.split(' ')[0] + if userfeedback == 'quit': + break + elif userfeedback == 'help': + print("Help is not yet implemented!") + elif userfeedback.startswith("test"): + ts.command(userfeedback) + elif userfeedback == 'test.smoke': + ts.smoke() + elif command in COMMANDS[2:]: + h.command(userfeedback) + elif userfeedback != "": + print("Unknown command!") + else: + print() diff --git a/task b/task new file mode 160000 index 0000000..af35e83 --- /dev/null +++ b/task @@ -0,0 +1 @@ +Subproject commit af35e83d1f07fd4cb9070bdb77cf1f3bdda3a463 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..fcc36e9 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,396 @@ +import colored +import config +import inspect +import simulation.devices as devices +import time + + +DT_TOGGLE = 0.3 + + +TEST_FULL = 'full' +TEST_SMOKE = 'smoke' +# +COLOR_SUCCESS = colored.fg("light_green") +COLOR_FAIL = colored.fg("light_red") + + +class test_smarthome(object): + def __init__(self, rooms): + # add testcases for switching devices + for name in rooms.getmembers(): + obj = rooms.getobjbyname(name) + if obj.__class__.__name__ == "videv_light": + common_name = '.'.join(name.split('.')[:-1]) + '.' + name.split('.')[-1][6:] + try: + li_device = rooms.getobjbyname(common_name + '_zigbee') if obj.enable_brightness or obj.enable_color_temp else None + except AttributeError: + li_device = rooms.getobjbyname(common_name + '_zigbee_1') if obj.enable_brightness or obj.enable_color_temp else None + try: + sw_device = rooms.getobjbyname(common_name) if obj.enable_state else None + except AttributeError: + # must be a device without switching device + sw_device = li_device + setattr(self, common_name.replace('.', '_'), testcase_light(obj, sw_device, li_device)) + # add testcases for heating devices + for name in rooms.getmembers(): + obj = rooms.getobjbyname(name) + if obj.__class__.__name__ == "videv_heating": + common_name = '.'.join(name.split('.')[:-1]) + '.' + name.split('.')[-1][6:] + heat_device = rooms.getobjbyname(common_name + '_valve') + setattr(self, common_name.replace('.', '_'), testcase_heating(obj, heat_device)) + # synchronisation + self.gfw_dirk_cd_player_amplifier_sync = testcase_synchronisation( + rooms.gfw.dirk.cd_player, None, None, + rooms.gfw.dirk.amplifier) + self.gfw_floor_main_light_sync = testcase_synchronisation( + rooms.gfw.floor.main_light, rooms.gfw.floor.videv_main_light, rooms.gfw.floor.videv_main_light, + rooms.gfw.floor.main_light_zigbee_1, rooms.gfw.floor.main_light_zigbee_2 + ) + self.ffe_diningroom_main_light_floor_lamp_sync = testcase_synchronisation( + rooms.ffe.diningroom.main_light, None, None, + rooms.ffe.diningroom.floor_lamp) + self.ffe_livingroom_main_light_floor_lamp_sync = testcase_synchronisation( + rooms.ffe.livingroom.main_light, rooms.ffe.livingroom.videv_floor_lamp, rooms.ffe.livingroom.videv_floor_lamp, + *[getattr(rooms.ffe.livingroom, "floor_lamp_zigbee_%d" % i) for i in range(1, 7)] + ) + # add test collection + self.all = test_collection(self) + + def getmembers(self, prefix=''): + rv = [] + for name, obj in inspect.getmembers(self): + if prefix: + full_name = prefix + '.' + name + else: + full_name = name + if not name.startswith('_'): + try: + if obj.__class__.__bases__[0].__name__ == "testcase" or obj.__class__.__name__ == "test_collection": + rv.append(full_name) + else: + rv.extend(obj.getmembers(full_name)) + except AttributeError: + pass + return rv + + def getobjbyname(self, name): + if name.startswith("test."): + name = name[5:] + obj = self + for subname in name.split('.'): + obj = getattr(obj, subname) + return obj + + def command(self, full_command): + try: + parameter = " " + full_command.split(' ')[1] + except IndexError: + parameter = "" + command = full_command.split(' ')[0].split('.')[-1] + parameter + device_name = '.'.join(full_command.split(' ')[0].split('.')[:-1]) + self.getobjbyname(device_name).command(command) + + +class test_result_base(object): + def __init__(self): + self.__init_test_counters__() + + def __init_test_counters__(self): + self.test_counter = 0 + self.success_tests = 0 + self.failed_tests = 0 + + def statistic(self): + return (self.test_counter, self.success_tests, self.failed_tests) + + def print_statistic(self): + color = COLOR_SUCCESS if self.test_counter == self.success_tests else COLOR_FAIL + print(color + "*** SUCCESS: (%4d/%4d) FAIL: (%4d/%4d) ***\n" % (self.success_tests, + self.test_counter, self.failed_tests, self.test_counter) + colored.attr("reset")) + + +class test_collection(test_result_base): + def __init__(self, test_instance): + super().__init__() + self.test_instance = test_instance + + def capabilities(self): + return [TEST_FULL, TEST_SMOKE] + + def command(self, command): + self.__init_test_counters__() + for member in self.test_instance.getmembers(): + obj = self.test_instance.getobjbyname(member) + if id(obj) != id(self): + obj.test_all(command) + num, suc, fail = obj.statistic() + self.test_counter += num + self.success_tests += suc + self.failed_tests += fail + self.print_statistic() + + +class testcase(test_result_base): + def __init__(self): + super().__init__() + self.__test_list__ = [] + + def capabilities(self): + if len(self.__test_list__) > 0 and not 'test_all' in self.__test_list__: + self.__test_list__.append('test_all') + self.__test_list__.sort() + return self.__test_list__ + + def test_all(self, test=TEST_FULL): + test_counter = 0 + success_tests = 0 + failed_tests = 0 + for tc_name in self.capabilities(): + if tc_name != "test_all": + self.command(tc_name, test) + test_counter += self.test_counter + success_tests += self.success_tests + failed_tests += self.failed_tests + self.test_counter = test_counter + self.success_tests = success_tests + self.failed_tests = failed_tests + + def command(self, command, test=TEST_FULL): + self.__init_test_counters__() + tc = getattr(self, command) + self.__init_test_counters__() + tc(test) + self.print_statistic() + + def heading(self, desciption): + print(desciption) + + def sub_heading(self, desciption): + print(2 * " " + desciption) + + def result(self, desciption, success): + self.test_counter += 1 + if success: + self.success_tests += 1 + else: + self.failed_tests += 1 + print(4 * " " + ("SUCCESS - " if success else "FAIL - ") + desciption) + + +class testcase_light(testcase): + def __init__(self, videv, sw_device, li_device): + self.videv = videv + self.sw_device = sw_device + self.li_device = li_device + self.__test_list__ = [] + if self.videv.enable_state: + self.__test_list__.append('test_power_on_off') + if self.videv.enable_brightness: + self.__test_list__.append('test_brightness') + if self.videv.enable_color_temp: + self.__test_list__.append('test_color_temp') + + def test_power_on_off(self, test=TEST_FULL): + self.heading("Power On/ Off test (%s)" % self.videv.topic) + # + sw_state = self.sw_device.get(self.sw_device.KEY_OUTPUT_0) + # + for i in range(0, 2): + self.sub_heading("State change of switching device") + # + self.sw_device.set(self.sw_device.KEY_OUTPUT_0, not self.sw_device.get(self.sw_device.KEY_OUTPUT_0)) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after Switch on by switching device", sw_state != self.videv.get(self.videv.KEY_OUTPUT_0)) + self.result("Switching device state after Switch on by switching device", + sw_state != self.sw_device.get(self.sw_device.KEY_OUTPUT_0)) + + self.sub_heading("State change of virtual device") + # + self.videv.set(self.videv.KEY_OUTPUT_0, not self.videv.get(self.videv.KEY_OUTPUT_0)) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after Switch off by virtual device", sw_state == self.videv.get(self.videv.KEY_OUTPUT_0)) + self.result("Switching device state after Switch on by switching device", + sw_state == self.sw_device.get(self.sw_device.KEY_OUTPUT_0)) + + def test_brightness(self, test=TEST_FULL): + self.heading("Brightness test (%s)" % self.videv.topic) + # + br_state = self.li_device.get(self.li_device.KEY_BRIGHTNESS) + delta = -15 if br_state > 50 else 15 + + self.sw_device.set(self.sw_device.KEY_OUTPUT_0, True) + time.sleep(DT_TOGGLE) + + for i in range(0, 2): + self.sub_heading("Brightness change by light device") + # + self.li_device.set(self.li_device.KEY_BRIGHTNESS, br_state + delta) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after setting brightness by light device", + br_state + delta == self.videv.get(self.videv.KEY_BRIGHTNESS)) + self.result("Light device state after setting brightness by light device", br_state + + delta == self.li_device.get(self.li_device.KEY_BRIGHTNESS)) + + self.sub_heading("Brightness change by virtual device") + # + self.videv.set(self.videv.KEY_BRIGHTNESS, br_state) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after setting brightness by light device", br_state == self.videv.get(self.videv.KEY_BRIGHTNESS)) + self.result("Light device state after setting brightness by light device", + br_state == self.li_device.get(self.li_device.KEY_BRIGHTNESS)) + + self.sw_device.set(self.sw_device.KEY_OUTPUT_0, False) + time.sleep(DT_TOGGLE) + + def test_color_temp(self, test=TEST_FULL): + self.heading("Color temperature test (%s)" % self.videv.topic) + # + ct_state = self.li_device.get(self.li_device.KEY_COLOR_TEMP) + delta = -3 if ct_state > 5 else 3 + + self.sw_device.set(self.sw_device.KEY_OUTPUT_0, True) + time.sleep(DT_TOGGLE) + + for i in range(0, 2): + self.sub_heading("Color temperature change by light device") + # + self.li_device.set(self.li_device.KEY_COLOR_TEMP, ct_state + delta) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after setting color temperature by light device", + ct_state + delta == self.videv.get(self.videv.KEY_COLOR_TEMP)) + self.result("Light device state after setting color temperature by light device", ct_state + + delta == self.li_device.get(self.li_device.KEY_COLOR_TEMP)) + + self.sub_heading("Color temperature change by virtual device") + # + self.videv.set(self.videv.KEY_COLOR_TEMP, ct_state) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after setting color temperature by light device", + ct_state == self.videv.get(self.videv.KEY_COLOR_TEMP)) + self.result("Light device state after setting color temperature by light device", + ct_state == self.li_device.get(self.li_device.KEY_COLOR_TEMP)) + + self.sw_device.set(self.sw_device.KEY_OUTPUT_0, False) + time.sleep(DT_TOGGLE) + + +class testcase_synchronisation(testcase): + def __init__(self, sw_master, br_master, ct_master, *follower): + super().__init__() + self.sw_master = sw_master + self.br_master = br_master + self.ct_master = ct_master + self.follower = follower + self.__test_list__ = [] + if sw_master is not None: + self.__test_list__.append('test_power_on_off_sync') + if br_master is not None: + self.__test_list__.append('test_brightness_sync') + + def test_power_on_off_sync(self, test=TEST_FULL): + self.heading("Power On/ Off sychronisation test") + # + # set sw_master to slave state as precondition + f_state = self.follower[0].get(self.follower[0].KEY_OUTPUT_0) + self.sw_master.set(self.sw_master.KEY_OUTPUT_0, f_state) + time.sleep(DT_TOGGLE) + for i in range(0, 2): + self.sub_heading("State change of sw_master device") + # + self.sw_master.set(self.sw_master.KEY_OUTPUT_0, not f_state) + time.sleep(DT_TOGGLE) + f_state = not f_state + for fo in self.follower: + self.result("Follower device state after switching (%s)" % fo.topic, + f_state == fo.get(fo.KEY_OUTPUT_0)) + + def test_brightness_sync(self, test=TEST_FULL): + self.heading("Power On/ Off sychronisation test") + # + # set precondition + sw_state = self.sw_master.get(self.sw_master.KEY_OUTPUT_0) + self.sw_master.set(self.sw_master.KEY_OUTPUT_0, True) + time.sleep(DT_TOGGLE) + + target = self.follower[0].get(self.follower[0].KEY_BRIGHTNESS) + delta = 15 if target < 50 else -15 + for i in range(0, 2): + target = target + delta + self.sub_heading("State change of br_master device") + # + self.br_master.set(self.br_master.KEY_BRIGHTNESS, target) + time.sleep(DT_TOGGLE) + for fo in self.follower: + self.result("Follower device brightness after change (%s)" % fo.topic, + target == fo.get(fo.KEY_BRIGHTNESS)) + delta = -delta + + # reset changes of precondition + self.sw_master.set(self.sw_master.KEY_OUTPUT_0, sw_state) + time.sleep(DT_TOGGLE) + + +class testcase_heating(testcase): + def __init__(self, videv, valve): + self.__videv__ = videv + self.__valve__ = valve + self.__default_temperature__ = config.DEFAULT_TEMPERATURE.get(self.__valve__.topic) + self.__test_list__ = ["test_user_temperature_setpoint", "test_default_temperature", ] + + def test_user_temperature_setpoint(self, test=TEST_FULL): + self.heading("User temperature setpoint test (%s)" % self.__videv__.topic) + # + mtemp = round(self.__valve__.TEMP_RANGE[0] + (self.__valve__.TEMP_RANGE[1] - self.__valve__.TEMP_RANGE[0]) / 2, 1) + setp = self.__valve__.get(self.__valve__.KEY_TEMPERATURE_SETPOINT) + delta = 5 if setp < mtemp else -5 + + for i in range(0, 2): + self.sub_heading("Setpoint change by valve device") + # + self.__valve__.set(self.__valve__.KEY_TEMPERATURE_SETPOINT, setp + delta) + time.sleep(DT_TOGGLE) + self.result("Virtual device valve temperature after setting temperature by valve device", + setp + delta == self.__videv__.get(self.__videv__.KEY_VALVE_TEMPERATURE_SETPOINT)) + self.result("Virtual device user temperature after setting temperature by valve device", + setp + delta == self.__videv__.get(self.__videv__.KEY_USER_TEMPERATURE_SETPOINT)) + + self.sub_heading("Setpoint change by videv device") + # + self.__videv__.set(self.__videv__.KEY_USER_TEMPERATURE_SETPOINT, setp) + time.sleep(DT_TOGGLE) + self.result("Valve device temperature setpoint after setting temperature by videv device", + setp == self.__valve__.get(self.__valve__.KEY_TEMPERATURE_SETPOINT)) + self.result("Virtual device valve temperature after setting temperature by videv device", + setp == self.__videv__.get(self.__videv__.KEY_VALVE_TEMPERATURE_SETPOINT)) + + def test_default_temperature(self, test=TEST_FULL): + self.heading("Default temperature setpoint test (%s)" % self.__videv__.topic) + # + dtemp = config.DEFAULT_TEMPERATURE.get(self.__valve__.topic) + mtemp = round(self.__valve__.TEMP_RANGE[0] + (self.__valve__.TEMP_RANGE[1] - self.__valve__.TEMP_RANGE[0]) / 2, 1) + ptemp = dtemp + (5 if dtemp < mtemp else -5) + + self.sub_heading("Setting setpoint to a value unequal default as precondition") + self.__valve__.set(self.__valve__.KEY_TEMPERATURE_SETPOINT, ptemp) + time.sleep(DT_TOGGLE) + self.result("Valve device temperature setpoint after setting precondition temperature", + ptemp == self.__valve__.get(self.__valve__.KEY_TEMPERATURE_SETPOINT)) + + self.sub_heading("Triggering default temperature by videv device") + self.__videv__.set(self.__videv__.KEY_SET_DEFAULT_TEMPERATURE, None) + time.sleep(DT_TOGGLE) + self.result("Valve device temperature setpoint after setting default temperature", + dtemp == self.__valve__.get(self.__valve__.KEY_TEMPERATURE_SETPOINT)) + + def test_summer_mode(self, test=TEST_FULL): + pass + + def test_away_mode(self, test=TEST_FULL): + pass + + def test_boost_mode(self, test=TEST_FULL): + pass + if test == TEST_FULL: + # test boost timer + pass