diff --git a/__simulation__/devices.py b/__simulation__/devices.py index 11df954..d0a90b7 100644 --- a/__simulation__/devices.py +++ b/__simulation__/devices.py @@ -1,4 +1,5 @@ import colored +import copy import json import task import time @@ -107,6 +108,15 @@ class base(object): if self.AUTOSEND and len(keys_changed) > 0: self.__tx__(keys_changed) + def get_data(self, key, default=None): + rv = self.data.get(key, default) + try: + rv = True if rv.lower() == 'on' else rv + rv = False if rv.lower() == 'off' else rv + except AttributeError: + pass + return rv + def __tx__(self, keys_changed): self.mqtt_client.send(self.topic, json.dumps(self.data)) @@ -138,7 +148,7 @@ class shelly(base): def __init__(self, mqtt_client, topic, input_0_func=None, input_1_func=None, output_0_auto_off=None): super().__init__(mqtt_client, topic) # - self.store_data(**{self.KEY_OUTPUT_0: "off", self.KEY_OUTPUT_1: "off", self.KEY_INPUT_0: "off", self.KEY_INPUT_1: "off"}) + self.store_data(**{self.KEY_OUTPUT_0: False, self.KEY_OUTPUT_1: False, self.KEY_INPUT_0: False, self.KEY_INPUT_1: False}) self.__input_0_func = input_0_func self.__input_1_func = input_1_func self.__output_0_auto_off__ = output_0_auto_off @@ -146,8 +156,8 @@ class shelly(base): self.__delayed_off__ = task.delayed(float(self.__output_0_auto_off__), self.__auto_off__, self.KEY_OUTPUT_0) # self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None) - self.add_callback(self.KEY_OUTPUT_0, self.__start_auto_off__, "on") - self.add_callback(self.KEY_OUTPUT_0, self.__stop_auto_off__, "off") + self.add_callback(self.KEY_OUTPUT_0, self.__start_auto_off__, True) + self.add_callback(self.KEY_OUTPUT_0, self.__stop_auto_off__, True) self.add_callback(self.KEY_OUTPUT_1, self.print_formatted, None) # self.add_callback(self.KEY_INPUT_0, self.__input_function__, None) @@ -164,7 +174,7 @@ class shelly(base): def __tx__(self, keys_changed): for key in keys_changed: - self.mqtt_client.send(self.topic + '/' + key, self.data.get(key)) + self.mqtt_client.send(self.topic + '/' + key, "on" if self.data.get(key) else "off") def __input_function__(self, device, key, data): if key == self.KEY_INPUT_0: @@ -197,14 +207,11 @@ class shelly(base): self.__set_data__(key, 'off') def __set_data__(self, key, value): - if value in ["on", "off"]: - self.store_data(**{key: value}) - else: - print("Wrong value (%s)!" % repr(value)) + self.store_data(**{key: value == "on"}) def __toggle_data__(self, key): if key in self.data: - self.__set_data__(key, 'on' if self.data.get(key) == 'off' else 'off') + self.__set_data__(key, "on" if not self.data.get(key) else "off") def command(self, command): if command in self.COMMANDS: @@ -227,15 +234,15 @@ class shelly(base): elif command == self.COMMANDS[8]: self.__toggle_data__(self.KEY_INPUT_0) time.sleep(0.4) - self.__set_data__(self.KEY_LONGPUSH_0, 'on') + self.__set_data__(self.KEY_LONGPUSH_0, True) time.sleep(0.1) - self.__set_data__(self.KEY_LONGPUSH_0, 'off') + self.__set_data__(self.KEY_LONGPUSH_0, False) elif command == self.COMMANDS[9]: self.__toggle_data__(self.KEY_INPUT_1) time.sleep(0.4) - self.__set_data__(self.KEY_LONGPUSH_1, 'on') + self.__set_data__(self.KEY_LONGPUSH_1, True) time.sleep(0.1) - self.__set_data__(self.KEY_LONGPUSH_1, 'off') + self.__set_data__(self.KEY_LONGPUSH_1, False) else: print("%s: not yet implemented!" % command) else: @@ -243,45 +250,33 @@ class shelly(base): def print_formatted(self, device, key, value): if value is not None: - info = (" - %ds" % self.__output_0_auto_off__) if self.__output_0_auto_off__ is not None and value == "on" else "" + info = (" - %ds" % self.__output_0_auto_off__) if self.__output_0_auto_off__ is not None and value else "" channel = "(%s%s)" % (self.names.get(key, key), info) - print_light(COLOR_SHELLY, value == "on", self.topic, channel) + print_light(COLOR_SHELLY, value, self.topic, channel) class my_powerplug(base): - KEY_OUTPUT_0 = "0" - KEY_OUTPUT_1 = "1" - KEY_OUTPUT_2 = "2" - KEY_OUTPUT_3 = "3" + KEY_OUTPUT_0 = "state" COMMANDS = [ - "get_output_0", "toggle_output_0", - "get_output_1", "toggle_output_1", - "get_output_2", "toggle_output_2", - "get_output_3", "toggle_output_3", + "get_output", "toggle_output", ] - def __init__(self, mqtt_client, topic): - super().__init__(mqtt_client, topic) + def __init__(self, mqtt_client, topic, channel): + super().__init__(mqtt_client, topic + '/' + "output/%d" % (channel + 1)) # - for i in range(0, 4): - self.data[str(i)] = False - self.add_callback(str(i), self.print_formatted, None) + self.data[self.KEY_OUTPUT_0] = False + self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None) def __rx__(self, client, userdata, message): - if message.topic.startswith(self.topic + '/output/') and message.topic.endswith('/set'): - try: - channels = [int(message.topic.split('/')[-2]) - 1] - except ValueError: - channels = range(0, 4) - for channel in channels: - payload = payload_filter(message.payload) - if payload == "toggle": - payload = not self.data.get(str(channel)) - self.store_data(**{str(channel): payload}) + if message.topic == self.topic + '/set': + payload = payload_filter(message.payload) + if payload == "toggle": + payload = not self.data.get(self.KEY_OUTPUT_0) + self.store_data(**{self.KEY_OUTPUT_0: payload}) def __tx__(self, keys_changed): for key in keys_changed: - self.mqtt_client.send(self.topic + "/output/" + str(int(key) + 1), json.dumps(self.data.get(key))) + self.mqtt_client.send(self.topic, json.dumps(self.data.get(key))) def command(self, command): if command in self.COMMANDS: @@ -289,18 +284,6 @@ class my_powerplug(base): self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) elif command == self.COMMANDS[1]: self.store_data(**{self.KEY_OUTPUT_0: not self.data.get(self.KEY_OUTPUT_0)}) - elif command == self.COMMANDS[2]: - self.print_formatted(self, self.KEY_OUTPUT_1, self.data.get(self.KEY_OUTPUT_1)) - elif command == self.COMMANDS[3]: - self.store_data(**{self.KEY_OUTPUT_1: not self.data.get(self.KEY_OUTPUT_1)}) - elif command == self.COMMANDS[4]: - self.print_formatted(self, self.KEY_OUTPUT_2, self.data.get(self.KEY_OUTPUT_2)) - elif command == self.COMMANDS[5]: - self.store_data(**{self.KEY_OUTPUT_2: not self.data.get(self.KEY_OUTPUT_2)}) - elif command == self.COMMANDS[6]: - self.print_formatted(self, self.KEY_OUTPUT_3, self.data.get(self.KEY_OUTPUT_3)) - elif command == self.COMMANDS[7]: - self.store_data(**{self.KEY_OUTPUT_3: not self.data.get(self.KEY_OUTPUT_3)}) else: print("%s: not yet implemented!" % command) else: @@ -308,7 +291,7 @@ class my_powerplug(base): def print_formatted(self, device, key, value): if value is not None: - print_light(COLOR_POWERPLUG, value, self.topic, "(%s)" % self.names.get(key, "Channel %d" % (int(key) + 1))) + print_light(COLOR_POWERPLUG, value, self.topic, "(%s)" % self.names.get(key, "State")) class silvercrest_powerplug(base): @@ -322,7 +305,7 @@ class silvercrest_powerplug(base): super().__init__(mqtt_client, topic) self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None) # - self.store_data(**{self.KEY_OUTPUT_0: "off"}) + self.store_data(**{self.KEY_OUTPUT_0: False}) def __rx__(self, client, userdata, message): if message.topic == self.topic + '/set': @@ -331,20 +314,24 @@ class silvercrest_powerplug(base): state = json.loads(message.payload).get('state').lower() if state in STATES: if state == STATES[0]: - self.store_data(**{self.KEY_OUTPUT_0: 'on'}) + self.store_data(**{self.KEY_OUTPUT_0: True}) elif state == STATES[1]: - self.store_data(**{self.KEY_OUTPUT_0: 'off'}) + self.store_data(**{self.KEY_OUTPUT_0: False}) else: - self.store_data(**{self.KEY_OUTPUT_0: "off" if self.data.get(self.KEY_OUTPUT_0) == "on" else "on"}) + self.store_data(**{not self.data.get(self.KEY_OUTPUT_0)}) + + def __tx__(self, keys_changed): + for key in keys_changed: + self.mqtt_client.send(self.topic + '/' + key, "on" if self.data.get(key) else "off") def command(self, command): if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) elif command == self.COMMANDS[1]: - self.store_data(**{self.KEY_OUTPUT_0: 'on'}) + self.store_data(**{self.KEY_OUTPUT_0: True}) elif command == self.COMMANDS[2]: - self.store_data(**{self.KEY_OUTPUT_0: 'off'}) + self.store_data(**{self.KEY_OUTPUT_0: False}) else: print("%s: not yet implemented!" % command) else: @@ -352,40 +339,11 @@ class silvercrest_powerplug(base): def print_formatted(self, device, key, value): if value is not None: - print_light(COLOR_POWERPLUG, value == "on", self.topic, "(%s)" % self.names.get(key, key)) - - -class silvercrest_motion_sensor(base): - KEY_OCCUPANCY = "occupancy" - COMMANDS = ['motion'] - - def __init__(self, mqtt_client, topic): - super().__init__(mqtt_client, topic) - self.data[self.KEY_OCCUPANCY] = False - self.add_callback(self.KEY_OCCUPANCY, self.print_formatted, None) - - def __rx__(self, client, userdata, message): - pass - - def command(self, command): - try: - command, value = command.split(' ') - except ValueError: - value = None - else: - value = json.loads(value) - if command == self.COMMANDS[0]: - self.store_data(**{self.KEY_OCCUPANCY: True}) - time.sleep(value or 10) - self.store_data(**{self.KEY_OCCUPANCY: False}) - - def print_formatted(self, device, key, value): - if value is not None: - print_light(COLOR_MOTION_SENSOR, value, self.topic, "") + print_light(COLOR_POWERPLUG, value, self.topic, "(%s)" % self.names.get(key, key)) class tradfri_light(base): - KEY_STATE = "state" + KEY_OUTPUT_0 = "state" KEY_BRIGHTNESS = "brightness" KEY_COLOR_TEMP = "color_temp" KEY_BRIGHTNESS_MOVE = "brightness_move" @@ -397,7 +355,7 @@ class tradfri_light(base): def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, send_on_power_on=True): super().__init__(mqtt_client, topic) self.send_on_power_on = send_on_power_on - self.add_callback(self.KEY_STATE, self.print_formatted, None) + self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None) self.add_callback(self.KEY_BRIGHTNESS, self.print_formatted, None) self.add_callback(self.KEY_COLOR_TEMP, self.print_formatted, None) # @@ -413,34 +371,41 @@ class tradfri_light(base): def __init_data__(self, enable_state, enable_brightness, enable_color_temp): data = {} if enable_state: - data[self.KEY_STATE] = 'off' + data[self.KEY_OUTPUT_0] = False self.commands.extend(self.STATE_COMMANDS) if enable_brightness: - data[self.KEY_BRIGHTNESS] = 128 + data[self.KEY_BRIGHTNESS] = 50 self.brightnes_move = (0, time.time()) self.commands.extend(self.BRIGHTNESS_COMMANDS) if enable_color_temp: - data[self.KEY_COLOR_TEMP] = 352 + data[self.KEY_COLOR_TEMP] = 5 self.commands.extend(self.COLOR_TEMP_COMMANDS) self.store_data(**data) def __rx__(self, client, userdata, message): data = json.loads(message.payload) - if self.data.get(self.KEY_STATE) == 'on' or data.get(self.KEY_STATE) in ['on', 'toggle']: + if self.data.get(self.KEY_OUTPUT_0) or data.get(self.KEY_OUTPUT_0) in ['on', 'toggle']: if message.topic.startswith(self.topic) and message.topic.endswith('/set'): for targetkey in data: value = data[targetkey] if targetkey in self.data.keys(): - if targetkey == self.KEY_STATE and value == "toggle": - value = "on" if self.data.get(self.KEY_STATE) == "off" else "off" + if targetkey == self.KEY_OUTPUT_0: + if value == "toggle": + value = not self.data.get(self.KEY_OUTPUT_0) + else: + value = value == "on" + elif targetkey == self.KEY_BRIGHTNESS: + value = round((value - 1) / 2.53, 0) + elif targetkey == self.KEY_COLOR_TEMP: + value = round((value - 250) / 20.4, 0) self.store_data(**{targetkey: value}) else: if targetkey == self.KEY_BRIGHTNESS_MOVE: new_value = self.data.get(self.KEY_BRIGHTNESS) + (time.time() - self.brightnes_move[1]) * self.brightnes_move[0] if new_value < 0: new_value = 0 - if new_value > 255: - new_value = 255 + if new_value > 256: + new_value = 256 self.store_data(**{self.KEY_BRIGHTNESS: int(new_value)}) self.brightnes_move = (value, time.time()) else: @@ -448,6 +413,16 @@ class tradfri_light(base): elif message.topic == self.topic + '/get': self.__tx__(None) + def __tx__(self, keys_changed): + tx_data = copy.copy(self.data) + if self.KEY_OUTPUT_0 in tx_data: + tx_data[self.KEY_OUTPUT_0] = "on" if tx_data[self.KEY_OUTPUT_0] else "off" + if self.KEY_BRIGHTNESS in tx_data: + tx_data[self.KEY_BRIGHTNESS] = 1 + round(2.53 * tx_data[self.KEY_BRIGHTNESS], 0) + if self.KEY_COLOR_TEMP in tx_data: + tx_data[self.KEY_COLOR_TEMP] = 250 + round(20.4 * tx_data[self.KEY_COLOR_TEMP], 0) + self.mqtt_client.send(self.topic, json.dumps(tx_data)) + def command(self, command): try: command, value = command.split(' ') @@ -455,9 +430,9 @@ class tradfri_light(base): value = None if command in self.capabilities(): if command == self.STATE_COMMANDS[0]: - self.print_formatted(self, self.KEY_STATE, self.data.get(self.KEY_STATE)) + self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) elif command == self.STATE_COMMANDS[1]: - self.store_data(**{self.KEY_STATE: 'off' if self.data.get(self.KEY_STATE) == 'on' else 'on'}) + self.store_data(**{self.KEY_OUTPUT_0: not self.data.get(self.KEY_OUTPUT_0)}) elif command == self.BRIGHTNESS_COMMANDS[0]: self.print_formatted(self, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) elif command == self.BRIGHTNESS_COMMANDS[1]: @@ -472,28 +447,28 @@ class tradfri_light(base): print("Unknown command!") def power_off(self, device, key, value): - self.data[self.KEY_STATE] = 'off' - self.print_formatted(self, self.KEY_STATE, 'off') + self.data[self.KEY_OUTPUT_0] = False + self.print_formatted(self, self.KEY_OUTPUT_0, False) def power_on(self, device, key, value): if self.send_on_power_on: - self.store_data(**{self.KEY_STATE: 'on'}) + self.store_data(**{self.KEY_OUTPUT_0: True}) else: - self.data[self.KEY_STATE] = 'on' - self.print_formatted(self, self.KEY_STATE, 'on') + self.data[self.KEY_OUTPUT_0] = True + self.print_formatted(self, self.KEY_OUTPUT_0, True) def print_formatted(self, device, key, value): if value is not None: color = COLOR_LIGHT_ACTIVE - if key == self.KEY_STATE: - print_light(COLOR_LIGHT_ACTIVE, value == "on", self.topic, "") + if key == self.KEY_OUTPUT_0: + print_light(COLOR_LIGHT_ACTIVE, value, self.topic, "") self.print_formatted(device, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) self.print_formatted(device, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: - perc_value = round(value * 100 / 256, 0) if key == self.KEY_BRIGHTNESS else round((value - 250) * 100 / 204, 0) + perc_value = round(value, 0) if key == self.KEY_BRIGHTNESS else round(10 * value, 0) print_percent( - COLOR_LIGHT_PASSIVE if self.data.get(self.KEY_STATE) != "on" else COLOR_LIGHT_ACTIVE, - 'B' if key == gui_light.KEY_BRIGHTNESS else 'C', + COLOR_LIGHT_PASSIVE if not self.data.get(self.KEY_OUTPUT_0) else COLOR_LIGHT_ACTIVE, + 'B' if key == self.KEY_BRIGHTNESS else 'C', perc_value, "%3d%%" % perc_value, self.topic, @@ -501,44 +476,109 @@ class tradfri_light(base): ) -class gui_light(tradfri_light): +class brennenstuhl_heating_valve(base): + TEMP_RANGE = [10, 30] + # + KEY_TEMPERATURE_SETPOINT = "current_heating_setpoint" + KEY_TEMPERATURE = "local_temperature" + # + COMMANDS = [ + "get_temperature_setpoint", "set_temperature_setpoint", "set_local_temperature", + ] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + self.store_data(**{ + self.KEY_TEMPERATURE_SETPOINT: 20, + self.KEY_TEMPERATURE: 20.7, + }) + self.add_callback(self.KEY_TEMPERATURE_SETPOINT, self.print_formatted, None) + + def __rx__(self, client, userdata, message): + if message.topic.startswith(self.topic) and message.topic.endswith("/set"): + payload = payload_filter(message.payload) + self.store_data(**payload) + + def command(self, command): + try: + command, value = command.split(' ') + except ValueError: + value = None + if command in self.COMMANDS: + if command == self.COMMANDS[0]: + self.print_formatted(self, self.KEY_TEMPERATURE_SETPOINT, self.data.get(self.KEY_TEMPERATURE_SETPOINT)) + elif command == self.COMMANDS[1]: + self.store_data(**{self.KEY_TEMPERATURE_SETPOINT: command_float_value(value)}) + elif command == self.COMMANDS[2]: + self.store_data(**{self.KEY_TEMPERATURE: command_float_value(value)}) + + def print_formatted(self, device, key, value): + devicename = ' - '.join(self.topic.split('/')[1:]) + if key == self.KEY_TEMPERATURE_SETPOINT: + perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) + perc = 100 if perc > 100 else perc + perc = 0 if perc < 0 else perc + print_percent(COLOR_HEATING_VALVE, '\u03d1', perc, "%4.1f°C" % value, self.topic, "") + + +class videv_light(base): AUTOSEND = False # - KEY_ENABLE = "enable" + KEY_STATE = "state" + KEY_BRIGHTNESS = "brightness" + KEY_COLOR_TEMP = "color_temp" KEY_TIMER = "timer" - KEY_LED_X = "led%d" + # + STATE_COMMANDS = ("get_state", "toggle_state", ) + BRIGHTNESS_COMMANDS = ("get_brightness", "set_brightness", ) + COLOR_TEMP_COMMANDS = ("get_color_temp", "set_color_temp", ) + TIMER_COMMANDS = ("get_timer", ) - def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False): - super().__init__(mqtt_client, topic, enable_state, enable_brightness, enable_color_temp) - self.add_callback(self.KEY_ENABLE, self.print_formatted, None) - self.add_callback(self.KEY_TIMER, self.print_formatted, None) - for i in range(0, 10): - self.add_callback(self.KEY_LED_X % i, self.print_formatted, None) - self.led_names = {} + def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, enable_timer=False): + super().__init__(mqtt_client, topic) + self.enable_state = enable_state + self.enable_brightness = enable_brightness + self.enable_color_temp = enable_color_temp + self.enable_timer = enable_timer # self.maxvalue = None - - def __init_data__(self, enable_state, enable_brightness, enable_color_temp): - data = {} - data[self.KEY_ENABLE] = False + # add commands to be available if enable_state: - data[self.KEY_STATE] = False + # init default value + self.data[self.KEY_STATE] = False + # add print callback + self.add_callback(self.KEY_STATE, self.print_formatted, None) + # add commands to be available + self.commands.extend(self.STATE_COMMANDS) if enable_brightness: - data[self.KEY_BRIGHTNESS] = 50 + # init default value + self.data[self.KEY_BRIGHTNESS] = 50 + # add print callback + self.add_callback(self.KEY_BRIGHTNESS, self.print_formatted, None) + # add commands to be available + self.commands.extend(self.BRIGHTNESS_COMMANDS) if enable_color_temp: - data[self.KEY_COLOR_TEMP] = 5 - data[self.KEY_TIMER] = '-' - for i in range(0, 10): - data[self.KEY_LED_X % i] = False - self.store_data(**data) + # init default value + self.data[self.KEY_COLOR_TEMP] = 5 + # add print callback + self.add_callback(self.KEY_COLOR_TEMP, self.print_formatted, None) + # add commands to be available + self.commands.extend(self.COLOR_TEMP_COMMANDS) + if enable_timer: + # init default value + self.data[self.KEY_TIMER] = 0 + # add print callback + self.add_callback(self.KEY_TIMER, self.print_formatted, None) + # add commands to be available + self.commands.extend(self.TIMER_COMMANDS) def __rx__(self, client, userdata, message): value = payload_filter(message.payload) - if message.topic.startswith(self.topic) and message.topic.endswith('/set'): - targetkey = message.topic.split('/')[-2] + if message.topic.startswith(self.topic): + targetkey = message.topic.split('/')[-1] if targetkey in self.data.keys(): self.store_data(**{targetkey: value}) - else: + elif targetkey != "__info__": print("Unknown key %s in %s::%s" % (targetkey, message.topic, self.__class__.__name__)) elif message.topic == self.topic + '/get': self.__tx__(None) @@ -566,26 +606,22 @@ class gui_light(tradfri_light): self.print_formatted(self, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) elif command == self.COLOR_TEMP_COMMANDS[1]: self.send(self.KEY_COLOR_TEMP, command_int_value(value)) + elif command == self.TIMER_COMMANDS[0]: + self.print_formatted(self, self.KEY_TIMER, self.data.get(self.KEY_TIMER)) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") - def add_led_name(self, key, name): - self.led_names[key] = name - def print_formatted(self, device, key, value): if value is not None: device = " - ".join(self.topic.split('/')[1:]) if key == self.KEY_STATE: print_switch(COLOR_GUI_ACTIVE, value, self.topic, "") - elif key == self.KEY_ENABLE: - self.print_formatted(device, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) - self.print_formatted(device, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: perc_value = round(value * 10 if key == self.KEY_COLOR_TEMP else value, 0) print_percent( - COLOR_GUI_PASSIVE if not self.data.get(self.KEY_ENABLE, False) else COLOR_GUI_ACTIVE, + COLOR_GUI_ACTIVE, 'B' if key == self.KEY_BRIGHTNESS else 'C', perc_value, "%3d%%" % perc_value, @@ -606,241 +642,191 @@ class gui_light(tradfri_light): perc = 0 self.maxvalue = None print_percent(COLOR_GUI_ACTIVE, 't', perc, '%3d%%' % perc, self.topic, '(%.1f)' % disp_value) - elif key.startswith(self.KEY_LED_X[:-2]): - print_light(COLOR_GUI_ACTIVE, value, self.topic, '(%s)' % self.led_names.get(key, key), True) -class tradfri_button(base): - KEY_ACTION = "action" - # - ACTION_TOGGLE = "toggle" - ACTION_BRIGHTNESS_UP = "brightness_up_click" - ACTION_BRIGHTNESS_DOWN = "brightness_down_click" - ACTION_RIGHT = "arrow_right_click" - ACTION_LEFT = "arrow_left_click" - ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold" - ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold" - ACTION_RIGHT_LONG = "arrow_right_hold" - ACTION_LEFT_LONG = "arrow_left_hold" - # - COMMANDS = [ACTION_TOGGLE, ACTION_LEFT, ACTION_RIGHT, ACTION_BRIGHTNESS_UP, ACTION_BRIGHTNESS_DOWN, - ACTION_LEFT_LONG, ACTION_RIGHT_LONG, ACTION_BRIGHTNESS_UP_LONG, ACTION_BRIGHTNESS_DOWN_LONG] +# class silvercrest_motion_sensor(base): +# KEY_OCCUPANCY = "occupancy" +# COMMANDS = ['motion'] - def __init__(self, mqtt_client, topic): - super().__init__(mqtt_client, topic) +# def __init__(self, mqtt_client, topic): +# super().__init__(mqtt_client, topic) +# self.data[self.KEY_OCCUPANCY] = False +# self.add_callback(self.KEY_OCCUPANCY, self.print_formatted, None) - def __rx__(self, client, userdata, message): - pass +# def __rx__(self, client, userdata, message): +# pass - def command(self, command): - try: - command, value = command.split(' ') - except ValueError: - value = None - else: - value = json.loads(value) - if command in self.capabilities(): - action = self.COMMANDS[self.COMMANDS.index(command)] - if self.COMMANDS.index(command) <= 4: - self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) - elif self.COMMANDS.index(command) <= 8: - self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) - time.sleep(value or 0.5) - action = '_'.join(action.split('_')[:-1] + ['release']) - self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) +# def command(self, command): +# try: +# command, value = command.split(' ') +# except ValueError: +# value = None +# else: +# value = json.loads(value) +# if command == self.COMMANDS[0]: +# self.store_data(**{self.KEY_OCCUPANCY: True}) +# time.sleep(value or 10) +# self.store_data(**{self.KEY_OCCUPANCY: False}) + +# def print_formatted(self, device, key, value): +# if value is not None: +# print_light(COLOR_MOTION_SENSOR, value, self.topic, "") -class gui_led_array(base): - AUTOSEND = False - # - KEY_LED_0 = "led0" - KEY_LED_1 = "led1" - KEY_LED_2 = "led2" - KEY_LED_3 = "led3" - KEY_LED_4 = "led4" - KEY_LED_5 = "led5" - KEY_LED_6 = "led6" - KEY_LED_7 = "led7" - KEY_LED_8 = "led8" - KEY_LED_9 = "led9" +# class tradfri_button(base): +# KEY_ACTION = "action" +# # +# ACTION_TOGGLE = "toggle" +# ACTION_BRIGHTNESS_UP = "brightness_up_click" +# ACTION_BRIGHTNESS_DOWN = "brightness_down_click" +# ACTION_RIGHT = "arrow_right_click" +# ACTION_LEFT = "arrow_left_click" +# ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold" +# ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold" +# ACTION_RIGHT_LONG = "arrow_right_hold" +# ACTION_LEFT_LONG = "arrow_left_hold" +# # +# COMMANDS = [ACTION_TOGGLE, ACTION_LEFT, ACTION_RIGHT, ACTION_BRIGHTNESS_UP, ACTION_BRIGHTNESS_DOWN, +# ACTION_LEFT_LONG, ACTION_RIGHT_LONG, ACTION_BRIGHTNESS_UP_LONG, ACTION_BRIGHTNESS_DOWN_LONG] - def __init__(self, mqtt_client, topic, ): - super().__init__(mqtt_client, topic) - for i in range(0, 10): - key = getattr(self, "KEY_LED_%d" % i) - self.data[key] = False - self.add_callback(key, self.print_formatted, None) +# def __init__(self, mqtt_client, topic): +# super().__init__(mqtt_client, topic) - def __rx__(self, client, userdata, message): - value = payload_filter(message.payload) - if message.topic.startswith(self.topic) and message.topic.endswith('/set'): - targetkey = message.topic.split('/')[-2] - if targetkey in self.data.keys(): - self.store_data(**{targetkey: value}) - else: - print("Unknown key %s in %s" % (targetkey, self.__class__.__name__)) +# def __rx__(self, client, userdata, message): +# pass - def print_formatted(self, device, key, value): - print_light(COLOR_GUI_ACTIVE, value, self.topic, '(%s)' % self.names.get(key, key), True) +# def command(self, command): +# try: +# command, value = command.split(' ') +# except ValueError: +# value = None +# else: +# value = json.loads(value) +# if command in self.capabilities(): +# action = self.COMMANDS[self.COMMANDS.index(command)] +# if self.COMMANDS.index(command) <= 4: +# self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) +# elif self.COMMANDS.index(command) <= 8: +# self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) +# time.sleep(value or 0.5) +# action = '_'.join(action.split('_')[:-1] + ['release']) +# self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) -class remote(base): - def __rx__(self, client, userdata, message): - if message.topic == self.topic + "/VOLUP": - if payload_filter(message.payload): - icon = u'\u1403' - else: - icon = u'\u25a1' - elif message.topic == self.topic + "/VOLDOWN": - if payload_filter(message.payload): - icon = u'\u1401' - else: - icon = u'\u25a1' - else: - return - devicename = ' - '.join(self.topic.split('/')[1:-1]) - print(COLOR_REMOTE + 10 * ' ' + icon + 6 * ' ' + devicename + colored.attr("reset")) +# class remote(base): +# def __rx__(self, client, userdata, message): +# if message.topic == self.topic + "/VOLUP": +# if payload_filter(message.payload): +# icon = u'\u1403' +# else: +# icon = u'\u25a1' +# elif message.topic == self.topic + "/VOLDOWN": +# if payload_filter(message.payload): +# icon = u'\u1401' +# else: +# icon = u'\u25a1' +# else: +# return +# devicename = ' - '.join(self.topic.split('/')[1:-1]) +# print(COLOR_REMOTE + 10 * ' ' + icon + 6 * ' ' + devicename + colored.attr("reset")) -class brennenstuhl_heating_valve(base): - TEMP_RANGE = [10, 30] - # - KEY_TEMPERATURE_SETPOINT = "current_heating_setpoint" - KEY_TEMPERATURE = "local_temperature" - # - COMMANDS = [ - "get_temperature_setpoint", "set_temperature_setpoint", - ] +# class gui_heating_valve(base): +# AUTOSEND = False +# # +# TEMP_RANGE = [10, 30] +# # +# KEY_TIMER = "timer" +# KEY_TEMPERATURE = "temperature" +# KEY_SETPOINT_TEMP = "setpoint_temp" +# KEY_SETPOINT_TO_DEFAULT = "setpoint_to_default" +# KEY_BOOST = 'boost' +# KEY_AWAY = "away" +# KEY_SUMMER = "summer" +# KEY_ENABLE = "enable" +# # +# COMMANDS = [ +# "get_temperature", +# "get_temperature_setpoint", "set_temperature_setpoint", +# "trigger_boost", "trigger_setpoint_to_default", +# "toggle_away", "toggle_summer", +# ] - def __init__(self, mqtt_client, topic): - super().__init__(mqtt_client, topic) - self.store_data(**{ - self.KEY_TEMPERATURE_SETPOINT: 20, - self.KEY_TEMPERATURE: 20.7, - }) - self.add_callback(self.KEY_TEMPERATURE_SETPOINT, self.print_formatted, None) +# def __init__(self, mqtt_client, topic): +# super().__init__(mqtt_client, topic) +# self.add_callback(self.KEY_SETPOINT_TEMP, self.print_formatted, None) +# self.add_callback(self.KEY_TIMER, self.print_formatted, None) +# self.add_callback(self.KEY_AWAY, self.print_formatted, None) +# self.add_callback(self.KEY_SUMMER, self.print_formatted, None) +# # +# self.store_data(**{ +# self.KEY_TEMPERATURE: 20.7, +# self.KEY_SETPOINT_TEMP: 20, +# self.KEY_TIMER: 0, +# self.KEY_AWAY: False, +# self.KEY_SUMMER: False, +# self.KEY_ENABLE: True +# }) - def __rx__(self, client, userdata, message): - if message.topic.startswith(self.topic) and message.topic.endswith("/set"): - payload = payload_filter(message.payload) - self.store_data(**payload) +# def __rx__(self, client, userdata, message): +# value = payload_filter(message.payload) +# if message.topic.startswith(self.topic) and message.topic.endswith('/set'): +# targetkey = message.topic.split('/')[-2] +# if targetkey in self.data.keys(): +# self.store_data(**{targetkey: value}) +# else: +# print("Unknown key %s in %s::%s" % (targetkey, message.topic, self.__class__.__name__)) +# elif message.topic == self.topic + '/get': +# self.__tx__(None) - def command(self, command): - try: - command, value = command.split(' ') - except ValueError: - value = None - if command in self.COMMANDS: - if command == self.COMMANDS[0]: - self.print_formatted(self, self.KEY_TEMPERATURE_SETPOINT, self.data.get(self.KEY_TEMPERATURE_SETPOINT)) - elif command == self.COMMANDS[1]: - self.store_data(**{self.KEY_TEMPERATURE_SETPOINT: command_float_value(value)}) +# def send(self, key, data): +# if data is not None: +# topic = self.topic + '/' + key +# self.mqtt_client.send(topic, json.dumps(data)) - def print_formatted(self, device, key, value): - devicename = ' - '.join(self.topic.split('/')[1:]) - if key == self.KEY_TEMPERATURE_SETPOINT: - perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) - perc = 100 if perc > 100 else perc - perc = 0 if perc < 0 else perc - print_percent(COLOR_HEATING_VALVE, '\u03d1', perc, "%4.1f°C" % value, self.topic, "") +# def command(self, command): +# try: +# command, value = command.split(' ') +# except ValueError: +# value = None +# if command in self.COMMANDS: +# if command == self.COMMANDS[0]: +# self.print_formatted(self, self.KEY_TEMPERATURE, self.data.get(self.KEY_TEMPERATURE)) +# elif command == self.COMMANDS[1]: +# self.print_formatted(self, self.KEY_SETPOINT_TEMP, self.data.get(self.KEY_SETPOINT_TEMP)) +# elif command == self.COMMANDS[2]: +# self.send(self.KEY_SETPOINT_TEMP, command_float_value(value)) +# elif command == self.COMMANDS[3]: +# self.send(self.KEY_BOOST, True) +# elif command == self.COMMANDS[4]: +# self.send(self.KEY_SETPOINT_TO_DEFAULT, True) +# elif command == self.COMMANDS[5]: +# self.send(self.KEY_AWAY, not self.data.get(self.KEY_AWAY)) +# elif command == self.COMMANDS[6]: +# self.send(self.KEY_SUMMER, not self.data.get(self.KEY_SUMMER)) - -class gui_heating_valve(base): - AUTOSEND = False - # - TEMP_RANGE = [10, 30] - # - KEY_TIMER = "timer" - KEY_TEMPERATURE = "temperature" - KEY_SETPOINT_TEMP = "setpoint_temp" - KEY_SETPOINT_TO_DEFAULT = "setpoint_to_default" - KEY_BOOST = 'boost' - KEY_AWAY = "away" - KEY_SUMMER = "summer" - KEY_ENABLE = "enable" - # - COMMANDS = [ - "get_temperature", - "get_temperature_setpoint", "set_temperature_setpoint", - "trigger_boost", "trigger_setpoint_to_default", - "toggle_away", "toggle_summer", - ] - - def __init__(self, mqtt_client, topic): - super().__init__(mqtt_client, topic) - self.add_callback(self.KEY_SETPOINT_TEMP, self.print_formatted, None) - self.add_callback(self.KEY_TIMER, self.print_formatted, None) - self.add_callback(self.KEY_AWAY, self.print_formatted, None) - self.add_callback(self.KEY_SUMMER, self.print_formatted, None) - # - self.store_data(**{ - self.KEY_TEMPERATURE: 20.7, - self.KEY_SETPOINT_TEMP: 20, - self.KEY_TIMER: 0, - self.KEY_AWAY: False, - self.KEY_SUMMER: False, - self.KEY_ENABLE: True - }) - - def __rx__(self, client, userdata, message): - value = payload_filter(message.payload) - if message.topic.startswith(self.topic) and message.topic.endswith('/set'): - targetkey = message.topic.split('/')[-2] - if targetkey in self.data.keys(): - self.store_data(**{targetkey: value}) - else: - print("Unknown key %s in %s::%s" % (targetkey, message.topic, self.__class__.__name__)) - elif message.topic == self.topic + '/get': - self.__tx__(None) - - def send(self, key, data): - if data is not None: - topic = self.topic + '/' + key - self.mqtt_client.send(topic, json.dumps(data)) - - def command(self, command): - try: - command, value = command.split(' ') - except ValueError: - value = None - if command in self.COMMANDS: - if command == self.COMMANDS[0]: - self.print_formatted(self, self.KEY_TEMPERATURE, self.data.get(self.KEY_TEMPERATURE)) - elif command == self.COMMANDS[1]: - self.print_formatted(self, self.KEY_SETPOINT_TEMP, self.data.get(self.KEY_SETPOINT_TEMP)) - elif command == self.COMMANDS[2]: - self.send(self.KEY_SETPOINT_TEMP, command_float_value(value)) - elif command == self.COMMANDS[3]: - self.send(self.KEY_BOOST, True) - elif command == self.COMMANDS[4]: - self.send(self.KEY_SETPOINT_TO_DEFAULT, True) - elif command == self.COMMANDS[5]: - self.send(self.KEY_AWAY, not self.data.get(self.KEY_AWAY)) - elif command == self.COMMANDS[6]: - self.send(self.KEY_SUMMER, not self.data.get(self.KEY_SUMMER)) - - def print_formatted(self, device, key, value): - devicename = ' - '.join(self.topic.split('/')[1:]) - if key == self.KEY_TIMER: - value /= 60 - try: - perc = 100 * value / 60 - except TypeError: - value = 0 - perc = 0 - print_percent(COLOR_GUI_ACTIVE, 't', perc, "%4.1fmin" % value, self.topic, "(Timer)") - elif key == self.KEY_TEMPERATURE: - perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) - perc = 100 if perc > 100 else perc - perc = 0 if perc < 0 else perc - print_percent(COLOR_GUI_ACTIVE, '\u03d1', perc, "%4.1f°C" % value, self.topic, "(Temperature)") - elif key == self.KEY_SETPOINT_TEMP: - perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) - perc = 100 if perc > 100 else perc - perc = 0 if perc < 0 else perc - print_percent(COLOR_GUI_ACTIVE if self.data.get(self.KEY_ENABLE) else COLOR_GUI_PASSIVE, - '\u03d1', perc, "%4.1f°C" % value, self.topic, "(Setpoint)") - elif key == self.KEY_AWAY: - print_switch(COLOR_GUI_ACTIVE, value, self.topic, "(Away Mode)") - elif key == self.KEY_SUMMER: - print_switch(COLOR_GUI_ACTIVE, value, self.topic, "(Summer Mode)") +# def print_formatted(self, device, key, value): +# devicename = ' - '.join(self.topic.split('/')[1:]) +# if key == self.KEY_TIMER: +# value /= 60 +# try: +# perc = 100 * value / 60 +# except TypeError: +# value = 0 +# perc = 0 +# print_percent(COLOR_GUI_ACTIVE, 't', perc, "%4.1fmin" % value, self.topic, "(Timer)") +# elif key == self.KEY_TEMPERATURE: +# perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) +# perc = 100 if perc > 100 else perc +# perc = 0 if perc < 0 else perc +# print_percent(COLOR_GUI_ACTIVE, '\u03d1', perc, "%4.1f°C" % value, self.topic, "(Temperature)") +# elif key == self.KEY_SETPOINT_TEMP: +# perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) +# perc = 100 if perc > 100 else perc +# perc = 0 if perc < 0 else perc +# print_percent(COLOR_GUI_ACTIVE if self.data.get(self.KEY_ENABLE) else COLOR_GUI_PASSIVE, +# '\u03d1', perc, "%4.1f°C" % value, self.topic, "(Setpoint)") +# elif key == self.KEY_AWAY: +# print_switch(COLOR_GUI_ACTIVE, value, self.topic, "(Away Mode)") +# elif key == self.KEY_SUMMER: +# print_switch(COLOR_GUI_ACTIVE, value, self.topic, "(Summer Mode)") diff --git a/__simulation__/rooms.py b/__simulation__/rooms.py index a0bc66a..1663fc9 100644 --- a/__simulation__/rooms.py +++ b/__simulation__/rooms.py @@ -1,6 +1,6 @@ import config -from __simulation__.devices import shelly, silvercrest_powerplug, tradfri_light, tradfri_button, silvercrest_motion_sensor, my_powerplug, remote, brennenstuhl_heating_valve -from __simulation__.devices import gui_light, gui_led_array, gui_heating_valve +from __simulation__.devices import shelly, silvercrest_powerplug, tradfri_light, my_powerplug, brennenstuhl_heating_valve +from __simulation__.devices import videv_light import inspect @@ -40,58 +40,58 @@ class base(object): class gfw_floor(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_GUI, True, True, True) self.main_light = shelly(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") - self.main_light_zigbee_1 = tradfri_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_ZIGBEE % 1, True, True, True, False) - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee_1.power_on, "on") - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee_1.power_off, "off") + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_ZIGBEE % 1, True, True, True, False) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, False) self.main_light_zigbee_2 = tradfri_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_ZIGBEE % 2, True, True, True, False) - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee_2.power_on, "on") - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee_1.power_off, "off") + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee_2.power_on, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee_2.power_off, False) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_VIDEV, True, True, True) class gfw_marion(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_GFW_MARION_MAIN_LIGHT_GUI, True, False, False) self.main_light = shelly(mqtt_client, config.TOPIC_GFW_MARION_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + self.heating_valve = brennenstuhl_heating_valve(mqtt_client, config.TOPIC_GFW_MARION_HEATING_VALVE_ZIGBEE) - self.gui_heating_valve = gui_heating_valve(mqtt_client, config.TOPIC_GFW_MARION_HEATING_VALVE_GUI) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_GFW_MARION_MAIN_LIGHT_VIDEV, True, False, False) class gfw_dirk(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_MAIN_LIGHT_GUI, True, True, True) self.main_light = shelly(mqtt_client, config.TOPIC_GFW_DIRK_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_GFW_DIRK_MAIN_LIGHT_ZIGBEE, True, True, True) - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, "on") - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, "off") - # - self.powerplug = my_powerplug(mqtt_client, config.TOPIC_GFW_DIRK_POWERPLUG) - self.powerplug.add_channel_name(my_powerplug.KEY_OUTPUT_0, "Amplifier") - self.powerplug.add_channel_name(my_powerplug.KEY_OUTPUT_1, "Desk_Light") - self.powerplug.add_channel_name(my_powerplug.KEY_OUTPUT_2, "CD_Player") - self.powerplug.add_channel_name(my_powerplug.KEY_OUTPUT_3, "PC_Dock") - self.gui_amplifier = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_AMPLIFIER_GUI, True, False, False) - self.gui_desk_light = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_DESK_LIGHT_GUI, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, False) + + self.amplifier = my_powerplug(mqtt_client, config.TOPIC_GFW_DIRK_POWERPLUG, 0) + self.amplifier.add_channel_name(my_powerplug.KEY_OUTPUT_0, "Amplifier") + self.desk_light = my_powerplug(mqtt_client, config.TOPIC_GFW_DIRK_POWERPLUG, 1) + self.desk_light.add_channel_name(my_powerplug.KEY_OUTPUT_0, "Desk Light") + self.cd_player = my_powerplug(mqtt_client, config.TOPIC_GFW_DIRK_POWERPLUG, 2) + self.cd_player.add_channel_name(my_powerplug.KEY_OUTPUT_0, "CD_Player") + self.pc_dock = my_powerplug(mqtt_client, config.TOPIC_GFW_DIRK_POWERPLUG, 3) + self.pc_dock.add_channel_name(my_powerplug.KEY_OUTPUT_0, "PC_Dock") self.desk_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_GFW_DIRK_DESK_LIGHT_ZIGBEE, True, True, True) - self.powerplug.add_callback(my_powerplug.KEY_OUTPUT_1, self.desk_light_zigbee.power_on, True) - self.powerplug.add_callback(my_powerplug.KEY_OUTPUT_1, self.desk_light_zigbee.power_off, False) - self.gui_cd_player = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_CD_PLAYER_GUI, True, False, False) - self.gui_pc_dock = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_PC_DOCK_GUI, True, False, False) - # - self.remote = remote(mqtt_client, config.TOPIC_GFW_DIRK_AMPLIFIER_REMOTE) - # - self.input_device = tradfri_button(mqtt_client, config.TOPIC_GFW_DIRK_INPUT_DEVICE) - self.led_array = gui_led_array(mqtt_client, config.TOPIC_GFW_DIRK_DEVICE_CHOOSER_LED) - self.led_array.add_channel_name(gui_led_array.KEY_LED_0, "Main Light") - self.led_array.add_channel_name(gui_led_array.KEY_LED_1, "Desk Light") - self.led_array.add_channel_name(gui_led_array.KEY_LED_2, "Amplifier") - # + self.desk_light.add_callback(my_powerplug.KEY_OUTPUT_0, self.desk_light_zigbee.power_on, True) + self.desk_light.add_callback(my_powerplug.KEY_OUTPUT_0, self.desk_light_zigbee.power_off, False) + self.heating_valve = brennenstuhl_heating_valve(mqtt_client, config.TOPIC_GFW_DIRK_HEATING_VALVE_ZIGBEE) - self.gui_heating_valve = gui_heating_valve(mqtt_client, config.TOPIC_GFW_DIRK_HEATING_VALVE_GUI) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_GFW_DIRK_MAIN_LIGHT_VIDEV, True, True, True) + self.videv_amplifier = videv_light(mqtt_client, config.TOPIC_GFW_DIRK_AMPLIFIER_VIDEV, True, False, False) + self.videv_desk_light = videv_light(mqtt_client, config.TOPIC_GFW_DIRK_DESK_LIGHT_VIDEV, True, True, True) + self.videv_cd_player = videv_light(mqtt_client, config.TOPIC_GFW_DIRK_CD_PLAYER_VIDEV, True, False, False) + self.videv_pc_dock = videv_light(mqtt_client, config.TOPIC_GFW_DIRK_PC_DOCK_VIDEV, True, False, False) class gfw(base): @@ -103,32 +103,43 @@ class gfw(base): class ffw_julian(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_GUI, True, True, True) self.main_light = shelly(mqtt_client, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_ZIGBEE, True, True, True) - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, "on") - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, "off") + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, False) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_VIDEV, True, True, True) class ffw_livingroom(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_FFW_LIVINGROOM_MAIN_LIGHT_GUI, True, True, True) self.main_light = shelly(mqtt_client, config.TOPIC_FFW_LIVINGROOM_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFW_LIVINGROOM_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, False) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFW_LIVINGROOM_MAIN_LIGHT_VIDEV, True, True, True) class ffw_sleep(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_FFW_SLEEP_MAIN_LIGHT_GUI, True, True, False) self.main_light = shelly(mqtt_client, config.TOPIC_FFW_SLEEP_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFW_SLEEP_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, False) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFW_SLEEP_MAIN_LIGHT_VIDEV, True, True, False) class ffw_bath(base): def __init__(self, mqtt_client): self.heating_valve = brennenstuhl_heating_valve(mqtt_client, config.TOPIC_FFW_BATH_HEATING_VALVE_ZIGBEE) - self.gui_heating_valve = gui_heating_valve(mqtt_client, config.TOPIC_FFW_BATH_HEATING_VALVE_GUI) class ffw(base): @@ -141,77 +152,90 @@ class ffw(base): class ffe_floor(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_FFE_FLOOR_MAIN_LIGHT_GUI, True, False, False) self.main_light = shelly(mqtt_client, config.TOPIC_FFE_FLOOR_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFE_FLOOR_MAIN_LIGHT_VIDEV, True, False, False) + class ffe_kitchen(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_FFE_KITCHEN_MAIN_LIGHT_GUI, True, False, False) self.main_light = shelly(mqtt_client, config.TOPIC_FFE_KITCHEN_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") - # - self.gui_circulation_pump = gui_light(mqtt_client, config.TOPIC_FFE_KITCHEN_CIRCULATION_PUMP_GUI, True, False, False) + self.circulation_pump = shelly(mqtt_client, config.TOPIC_FFE_KITCHEN_CIRCULATION_PUMP_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER, output_0_auto_off=10*60) self.circulation_pump.add_channel_name(shelly.KEY_OUTPUT_0, "Circulation Pump") + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFE_KITCHEN_MAIN_LIGHT_VIDEV, True, False, False) + self.videv_circulation_pump = videv_light(mqtt_client, config.TOPIC_FFE_KITCHEN_CIRCULATION_PUMP_VIDEV, True, False, False, True) + class ffe_diningroom(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_FFE_DININGROOM_MAIN_LIGHT_GUI, True, False, False) self.main_light = shelly(mqtt_client, config.TOPIC_FFE_DININGROOM_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") - self.gui_floor_lamp = gui_light(mqtt_client, config.TOPIC_FFE_DININGROOM_FLOOR_LAMP_GUI, True, False, False) + self.floor_lamp = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_DININGROOM_FLOOR_LAMP_POWERPLUG) self.floor_lamp.add_channel_name(silvercrest_powerplug.KEY_OUTPUT_0, "Floor Lamp") + if config.CHRISTMAS: self.garland = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_DININGROOM_GARLAND_POWERPLUG) self.garland.add_channel_name(silvercrest_powerplug, "Garland") + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFE_DININGROOM_MAIN_LIGHT_VIDEV, True, False, False) + self.videv_floor_lamp = videv_light(mqtt_client, config.TOPIC_FFE_DININGROOM_FLOOR_LAMP_VIDEV, True, False, False) + if config.CHRISTMAS: + self.videv_garland = videv_light(mqtt_client, config.TOPIC_FFE_DININGROOM_GARLAND_VIDEV, True, False, False) + class ffe_sleep(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_GUI, True, True, True) self.main_light = shelly(mqtt_client, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_ZIGBEE, True, True, True) - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, "on") - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, "off") - # - self.gui_bed_light_di = gui_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_DI_GUI, True, True, False) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, False) + self.bed_light_di_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_DI_ZIGBEE, True, True, False) - self.gui_bed_light_ma = gui_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_MA_GUI, True, False, False) - self.bed_light_ma_powerplug = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_MA_POWERPLUG) - # - self.input_device = tradfri_button(mqtt_client, config.TOPIC_FFE_SLEEP_INPUT_DEVICE) - self.led_array = gui_led_array(mqtt_client, config.TOPIC_FFE_SLEEP_DEVICE_CHOOSER_LED) - self.led_array.add_channel_name(gui_led_array.KEY_LED_0, "Main Light") - self.led_array.add_channel_name(gui_led_array.KEY_LED_1, "Bed Light Dirk") - # + self.bed_light_ma = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_MA_POWERPLUG) + self.heating_valve = brennenstuhl_heating_valve(mqtt_client, config.TOPIC_FFE_SLEEP_HEATING_VALVE_ZIGBEE) - self.gui_heating_valve = gui_heating_valve(mqtt_client, config.TOPIC_FFE_SLEEP_HEATING_VALVE_GUI) + + # + self.videv_bed_light_ma = videv_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_MA_VIDEV, True, False, False) + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_VIDEV, True, True, True) + self.videv_bed_light_di = videv_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_DI_VIDEV, True, True, False) + self.videv_bed_light_ma = videv_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_MA_VIDEV, True, False, False) class ffe_livingroom(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_GUI, True, True, True) self.main_light = shelly(mqtt_client, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_ZIGBEE, True, True, True) - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, "on") - self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, "off") - for i in range(1, 7): + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, False) + + self.floor_lamp_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_FLOOR_LAMP_ZIGBEE % 1, True, True, True) + for i in range(2, 7): setattr(self, "floor_lamp_zigbee_%d" % i, tradfri_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_FLOOR_LAMP_ZIGBEE % i, True, True, True)) - self.gui_floor_lamp = gui_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_FLOOR_LAMP_GUI, True, True, True) + if config.CHRISTMAS: self.xmas_tree = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_LIVINGROOM_XMAS_TREE_POWERPLUG) self.xmas_tree.add_channel_name(silvercrest_powerplug, "Xmas-Tree") - self.gui_xmas_tree = gui_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_XMAS_TREE_GUI) self.xmas_star = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_LIVINGROOM_XMAS_STAR_POWERPLUG) self.xmas_star.add_channel_name(silvercrest_powerplug, "Xmas-Star") + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_VIDEV, True, True, True) + self.videv_floor_lamp = videv_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_FLOOR_LAMP_VIDEV, True, True, True) + if config.CHRISTMAS: + self.videv_xmas_tree = videv_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_XMAS_TREE_VIDEV) + class ffe(base): def __init__(self, mqtt_client): @@ -224,13 +248,11 @@ class ffe(base): class stairway(base): def __init__(self, mqtt_client): - self.gui_main_light = gui_light(mqtt_client, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_GUI, True, False, False) - self.gui_main_light.add_led_name(self.gui_main_light.KEY_LED_X % 0, "Motion Ground Floor") - self.gui_main_light.add_led_name(self.gui_main_light.KEY_LED_X % 1, "Motion First Floor") self.main_light = shelly(mqtt_client, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_SHELLY, input_0_func=shelly.INPUT_FUNC_OUT1_TRIGGER) self.main_light.add_channel_name(shelly.KEY_OUTPUT_0, "Main Light") - self.motion_sensor_gf = silvercrest_motion_sensor(mqtt_client, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_MOTION_SENSOR_GF) - self.motion_sensor_ff = silvercrest_motion_sensor(mqtt_client, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_MOTION_SENSOR_FF) + + # + self.videv_main_light = videv_light(mqtt_client, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_VIDEV, True, False, False, True) class house(base): diff --git a/__simulation__/test.py b/__simulation__/test.py index 8fea8dc..1350fb8 100644 --- a/__simulation__/test.py +++ b/__simulation__/test.py @@ -1,232 +1,249 @@ import colored +import inspect +from __simulation__ import devices import time -DT_TOGGLE = 0.1 +DT_TOGGLE = 0.3 + + +TEST_FULL = 'full' +TEST_SMOKE = 'smoke' +# +COLOR_SUCCESS = colored.fg("light_green") +COLOR_FAIL = colored.fg("light_red") class test_smarthome(object): - def __init__(self, house): - self.house = house - - def __smoke__(self): - result = "" - result += "Smoke-Test\n" - result += " GUI Element exists for every shelly instance\n" - result += self.__gui_element_exists__("shelly") - result += " On-Off test for every shelly instance\n" - result += self.__on_off_test__("shelly") - result += " GUI Element exists for every silvercrest_powerplug instance\n" - result += self.__gui_element_exists__("silvercrest_powerplug") - result += " On-Off test for every silvercrest_powerplug instance\n" - result += self.__on_off_test__("silvercrest_powerplug") - result += " GUI Element exists for every my_powerplug instance and port\n" - result += self.__gui_element_exists_my_powerplug__() - result += " On-Off test for every my_powerplug instance\n" - result += self.__on_off_test_my_powerplug__() - result += " GUI Element exists for every tradfri_light instance\n" - result += self.__gui_element_exists__("tradfri_light") - result += " Enable and disable test for gui elements corresponding with tradfri_light\n" - result += self.__br_ct_enable_test__() - result += " Chnage brightness and color_temp by gui test\n" - result += self.__br_ct_change_test__() - return result - - def __full__(self): - result = "Full-Test" - return result - - def smoke(self): - print(self.__smoke__()) - - def full(self): - out = self.__smoke__() - out += self.__full__() - print(out) - - def print_error(self, text, lvl=2): - return lvl*" " + colored.fg("light_red") + '* ' + text + "\n" + colored.attr("reset") - - def print_success(self, text, lvl=2): - return lvl*" " + colored.fg("light_green") + '* ' + text + "\n" + colored.attr("reset") - - def __gui_element_exists__(self, obj_name): - result = "" - for member in self.house.getmembers(): - obj = self.house.getobjbyname(member) - if obj.__class__.__name__ == obj_name: - if obj_name == "tradfri_light": - basename = member[:member.index('_zigbee')] - else: - basename = member + def __init__(self, rooms): + # add testcases by room objects + for name in rooms.getmembers(): + obj = rooms.getobjbyname(name) + if obj.__class__.__name__ == "videv_light": + common_name = '.'.join(name.split('.')[:-1]) + '.' + name.split('.')[-1][6:] + li_device = rooms.getobjbyname(common_name + '_zigbee') if obj.enable_brightness or obj.enable_color_temp else None try: - gui = self.house.getobjbyname('.'.join(basename.split('.')[:-1]) + '.gui_' + basename.split('.')[-1]) + sw_device = rooms.getobjbyname(common_name) if obj.enable_state else None except AttributeError: - result += self.print_error("No GUI element available, while testing %s (%s)." % (member, obj_name)) - else: - result += self.print_success("GUI element available, while testing %s (%s)." % (member, obj_name)) - return result + # must be a device without switching device + sw_device = li_device + setattr(self, common_name.replace('.', '_'), testcase_light(obj, sw_device, li_device)) + # add test collection + self.all = test_collection(self) - def __gui_element_exists_my_powerplug__(self): - result = "" - for member in self.house.getmembers(): - obj = self.house.getobjbyname(member) - if obj.__class__.__name__ == "my_powerplug": - for channel in [obj.KEY_OUTPUT_0, obj.KEY_OUTPUT_1, obj.KEY_OUTPUT_2, obj.KEY_OUTPUT_3]: - try: - gui = self.house.getobjbyname('.'.join(member.split( - '.')[:-1]) + '.gui_' + obj.names.get(channel.lower(), "__dummy__").lower()) - except AttributeError: - result += self.print_error("No GUI element available, while testing %s (%s)." % (member, obj.names.get(channel))) - else: - result += self.print_success("GUI element available, while testing %s (%s)." % (member, obj.names.get(channel))) - return result - - def __on_off_test_my_powerplug__(self): - result = "" - for member in self.house.getmembers(): - obj = self.house.getobjbyname(member) - if obj.__class__.__name__ == "my_powerplug": - for channel in [obj.KEY_OUTPUT_0, obj.KEY_OUTPUT_1, obj.KEY_OUTPUT_2, obj.KEY_OUTPUT_3]: - try: - gui = self.house.getobjbyname('.'.join(member.split( - '.')[:-1]) + '.gui_' + obj.names.get(channel.lower(), "__dummy__").lower()) - except AttributeError: - raise AttributeError - pass # exists test already covers non existing gui-elements - else: - success = True - # Initial state equal between obj and gui - obj_state = obj.data.get(channel) - gui_state = gui.data.get(gui.KEY_STATE) - if obj_state != gui_state: - result += self.print_error("Initial state of %s (%s) is not equal to GUI state (%s), while testing %s (%s)" % - ("my_powerplug", obj_state, gui_state, member, obj.names.get(channel))) - success = False - # state obj change results in state change of obj and gui - for i in range(1, 3): - gui.command("toggle_state") - time.sleep(2 * DT_TOGGLE) - last_obj_state = obj_state - obj_state = obj.data.get(channel) - gui_state = gui.data.get(gui.KEY_STATE) - if last_obj_state == obj_state: - result += self.print_error("State after %d. toggle of gui state: State unchanged (%s), while testing %s (%s)" % - (i, obj_state, member, obj.names.get(channel))) - success = False - if obj_state != gui_state: - result += self.print_error("State after %d. toggle of gui state:: State of device (%s) is not equal to GUI state (%s), while testing %s (%s)" % - (i, obj_state, gui_state, member, obj.names.get(channel))) - success = False - # - if success: - result += self.print_success("On-Off test successfull, while testing %s (%s)." % (member, obj.names.get(channel))) - return result - - def __on_off_test__(self, obj_name): - result = "" - for member in self.house.getmembers(): - obj = self.house.getobjbyname(member) - if obj.__class__.__name__ == obj_name: + def getmembers(self, prefix=''): + rv = [] + for name, obj in inspect.getmembers(self): + if prefix: + full_name = prefix + '.' + name + else: + full_name = name + if not name.startswith('_'): try: - gui = self.house.getobjbyname('.'.join(member.split('.')[:-1]) + '.gui_' + member.split('.')[-1]) + if obj.__class__.__bases__[0].__name__ == "testcase" or obj.__class__.__name__ == "test_collection": + rv.append(full_name) + else: + rv.extend(obj.getmembers(full_name)) except AttributeError: - pass # exists test already covers non existing gui-elements - else: - success = True - # Initial state equal between obj and gui - obj_state = obj.data.get(obj.KEY_OUTPUT_0).lower() == "on" - gui_state = gui.data.get(gui.KEY_STATE) - if obj_state != gui_state: - result += self.print_error("Initial state of %s (%s) is not equal to GUI state (%s), while testing %s (%s)" % - (obj_name, obj_state, gui_state, member, obj_name)) - success = False - # state obj change results in state change of obj and gui - for i in range(1, 3): - gui.command("toggle_state") - time.sleep(2 * DT_TOGGLE) - last_obj_state = obj_state - obj_state = obj.data.get(obj.KEY_OUTPUT_0).lower() == "on" - gui_state = gui.data.get(gui.KEY_STATE) - if last_obj_state == obj_state: - result += self.print_error("State after %d. toggle of gui state: State unchanged (%s), while testing %s (%s)" % - (i, obj_state, member, obj_name)) - success = False - if obj_state != gui_state: - result += self.print_error("State after %d. toggle of gui state:: State of device (%s) is not equal to GUI state (%s), while testing %s (%s)" % - (i, obj_state, gui_state, member, obj_name)) - success = False - # - if success: - result += self.print_success("On-Off test successfull, while testing %s." % (member)) - return result + pass + return rv - def __br_ct_enable_test__(self): - result = "" - for member in self.house.getmembers(): - obj = self.house.getobjbyname(member) - if obj.__class__.__name__ == "tradfri_light": - basename = member[:member.index('_zigbee')] - gui = self.house.getobjbyname('.'.join(basename.split('.')[:-1]) + '.gui_' + basename.split('.')[-1]) - success = True - # - if gui.data.get(gui.KEY_ENABLE) != False: - result += self.print_error("Inital enable state is not False, while testing %s." % (member)) - success = False - # - gui.command("toggle_state") - time.sleep(2 * DT_TOGGLE) - if gui.data.get(gui.KEY_ENABLE) != True: - result += self.print_error("Enable state is not True after switching on, while testing %s." % (member)) - success = False - # - gui.command("toggle_state") - time.sleep(2 * DT_TOGGLE) - if gui.data.get(gui.KEY_ENABLE) != False: - result += self.print_error("Enable state is not False after switching off, while testing %s." % (member)) - success = False - # - if success: - result += self.print_success("Enable-Disable test successfull, while testing %s." % (member)) - return result + def getobjbyname(self, name): + if name.startswith("test."): + name = name[5:] + obj = self + for subname in name.split('.'): + obj = getattr(obj, subname) + return obj - def __br_ct_change_test__(self): - result = "" - for member in self.house.getmembers(): - obj = self.house.getobjbyname(member) - if obj.__class__.__name__ == "tradfri_light": - basename = member[:member.index('_zigbee')] - gui = self.house.getobjbyname('.'.join(basename.split('.')[:-1]) + '.gui_' + basename.split('.')[-1]) - success = True - # - if gui.data.get(gui.KEY_STATE) != True: - gui.command("toggle_state") - time.sleep(2 * DT_TOGGLE) - if gui.data.get(gui.KEY_STATE) != True: - result += self.print_error("Unable to switch on light, while testing %s." % (member)) - success = False - continue - # - if "set_brightness" in obj.capabilities(): - brightness = gui.data.get(obj.KEY_BRIGHTNESS) - targetvalue = brightness + (25 if brightness <= 50 else -25) - gui.command("set_brightness %d" % targetvalue) - time.sleep(2 * DT_TOGGLE) - if gui.data.get(obj.KEY_BRIGHTNESS) != targetvalue: - result += self.print_error("Brightness change by gui was not successfull, while testing %s." % (member)) - success = False - if "set_color_temp" in obj.capabilities(): - color_temp = gui.data.get(obj.KEY_COLOR_TEMP) - targetvalue = color_temp + (3 if color_temp <= 5 else -3) - gui.command("set_color_temp %d" % targetvalue) - time.sleep(2 * DT_TOGGLE) - if gui.data.get(obj.KEY_COLOR_TEMP) != targetvalue: - result += self.print_error("Color temperature change by gui was not successfull, while testing %s." % (member)) - success = False - # - gui.command("toggle_state") - time.sleep(2 * DT_TOGGLE) - # - if success: - result += self.print_success("Brightness-ColorTemp test successfull, while testing %s." % (member)) - return result + def command(self, full_command): + try: + parameter = " " + full_command.split(' ')[1] + except IndexError: + parameter = "" + command = full_command.split(' ')[0].split('.')[-1] + parameter + device_name = '.'.join(full_command.split(' ')[0].split('.')[:-1]) + self.getobjbyname(device_name).command(command) + + +class test_result_base(object): + def __init__(self): + self.__init_test_counters__() + + def __init_test_counters__(self): + self.test_counter = 0 + self.success_tests = 0 + self.failed_tests = 0 + + def statistic(self): + return (self.test_counter, self.success_tests, self.failed_tests) + + def print_statistic(self): + color = COLOR_SUCCESS if self.test_counter == self.success_tests else COLOR_FAIL + print(color + "*** SUCCESS: (%4d/%4d) FAIL: (%4d/%4d) ***\n" % (self.success_tests, + self.test_counter, self.failed_tests, self.test_counter) + colored.attr("reset")) + + +class test_collection(test_result_base): + def __init__(self, test_instance): + super().__init__() + self.test_instance = test_instance + + def capabilities(self): + return [TEST_FULL, TEST_SMOKE] + + def command(self, command): + self.__init_test_counters__() + for member in self.test_instance.getmembers(): + obj = self.test_instance.getobjbyname(member) + if id(obj) != id(self): + obj.test_all(command) + num, suc, fail = obj.statistic() + self.test_counter += num + self.success_tests += suc + self.failed_tests += fail + self.print_statistic() + + +class testcase(test_result_base): + def __init__(self): + super().__init__() + self.__test_list__ = [] + + def capabilities(self): + if len(self.__test_list__) > 0 and not 'test_all' in self.__test_list__: + self.__test_list__.append('test_all') + self.__test_list__.sort() + return self.__test_list__ + + def test_all(self, test=TEST_FULL): + test_counter = 0 + success_tests = 0 + failed_tests = 0 + for tc_name in self.capabilities(): + if tc_name != "test_all": + self.command(tc_name, test) + test_counter += self.test_counter + success_tests += self.success_tests + failed_tests += self.failed_tests + self.test_counter = test_counter + self.success_tests = success_tests + self.failed_tests = failed_tests + + def command(self, command, test=TEST_FULL): + self.__init_test_counters__() + tc = getattr(self, command) + self.__init_test_counters__() + rv = tc(test) + self.print_statistic() + + def heading(self, desciption): + print(desciption) + + def sub_heading(self, desciption): + print(2 * " " + desciption) + + def result(self, desciption, success): + self.test_counter += 1 + if success: + self.success_tests += 1 + else: + self.failed_tests += 1 + print(4 * " " + ("SUCCESS - " if success else "FAIL - ") + desciption) + + +class testcase_light(testcase): + def __init__(self, videv, sw_device, li_device): + self.videv = videv + self.sw_device = sw_device + self.li_device = li_device + self.__test_list__ = [] + if self.videv.enable_state: + self.__test_list__.append('test_power_on_off') + if self.videv.enable_brightness: + self.__test_list__.append('test_brightness') + if self.videv.enable_color_temp: + self.__test_list__.append('test_color_temp') + + def test_power_on_off(self, test=TEST_FULL): + self.heading("Power On/ Off test (%s)" % self.videv.topic) + # + sw_state = self.sw_device.get_data(self.sw_device.KEY_OUTPUT_0) + # + for i in range(0, 2): + self.sub_heading("State change of switching device") + # + self.sw_device.store_data(**{self.sw_device.KEY_OUTPUT_0: not self.sw_device.data.get(self.sw_device.KEY_OUTPUT_0)}) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after Switch on by switching device", sw_state != self.videv.get_data(self.videv.KEY_STATE)) + self.result("Switching device state after Switch on by switching device", + sw_state != self.sw_device.get_data(self.sw_device.KEY_OUTPUT_0)) + + self.sub_heading("State change of virtual device") + # + self.videv.send(self.videv.KEY_STATE, not self.videv.data.get(self.videv.KEY_STATE)) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after Switch off by virtual device", sw_state == self.videv.get_data(self.videv.KEY_STATE)) + self.result("Switching device state after Switch on by switching device", + sw_state == self.sw_device.get_data(self.sw_device.KEY_OUTPUT_0)) + + def test_brightness(self, test=TEST_FULL): + self.heading("Brightness test (%s)" % self.videv.topic) + # + br_state = self.li_device.get_data(self.li_device.KEY_BRIGHTNESS) + delta = -15 if br_state > 50 else 15 + + self.sw_device.store_data(**{self.sw_device.KEY_OUTPUT_0: True}) + time.sleep(DT_TOGGLE) + + for i in range(0, 2): + self.sub_heading("Brightness change by light device") + # + self.li_device.store_data(**{self.li_device.KEY_BRIGHTNESS: br_state + delta}) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after setting brightness by light device", + br_state + delta == self.videv.get_data(self.videv.KEY_BRIGHTNESS)) + self.result("Light device state after setting brightness by light device", br_state + + delta == self.li_device.get_data(self.li_device.KEY_BRIGHTNESS)) + + self.sub_heading("Brightness change by virtual device") + # + self.videv.send(self.videv.KEY_BRIGHTNESS, br_state) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after setting brightness by light device", br_state == self.videv.get_data(self.videv.KEY_BRIGHTNESS)) + self.result("Light device state after setting brightness by light device", + br_state == self.li_device.get_data(self.li_device.KEY_BRIGHTNESS)) + + self.sw_device.store_data(**{self.sw_device.KEY_OUTPUT_0: False}) + time.sleep(DT_TOGGLE) + + def test_color_temp(self, test=TEST_FULL): + self.heading("Color temperature test (%s)" % self.videv.topic) + # + ct_state = self.li_device.get_data(self.li_device.KEY_COLOR_TEMP) + delta = -3 if ct_state > 5 else 3 + + self.sw_device.store_data(**{self.sw_device.KEY_OUTPUT_0: True}) + time.sleep(DT_TOGGLE) + + for i in range(0, 2): + self.sub_heading("Color temperature change by light device") + # + self.li_device.store_data(**{self.li_device.KEY_COLOR_TEMP: ct_state + delta}) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after setting color temperature by light device", + ct_state + delta == self.videv.get_data(self.videv.KEY_COLOR_TEMP)) + self.result("Light device state after setting color temperature by light device", ct_state + + delta == self.li_device.get_data(self.li_device.KEY_COLOR_TEMP)) + + self.sub_heading("Color temperature change by virtual device") + # + self.videv.send(self.videv.KEY_COLOR_TEMP, ct_state) + time.sleep(DT_TOGGLE) + self.result("Virtual device state after setting color temperature by light device", + ct_state == self.videv.get_data(self.videv.KEY_COLOR_TEMP)) + self.result("Light device state after setting color temperature by light device", + ct_state == self.li_device.get_data(self.li_device.KEY_COLOR_TEMP)) + + self.sw_device.store_data(**{self.sw_device.KEY_OUTPUT_0: False}) + time.sleep(DT_TOGGLE) diff --git a/house_n_gui_sim.py b/house_n_gui_sim.py index addb228..696c6af 100644 --- a/house_n_gui_sim.py +++ b/house_n_gui_sim.py @@ -13,7 +13,7 @@ if __name__ == "__main__": mc = mqtt.mqtt_client(host=config.MQTT_SERVER, port=config.MQTT_PORT, username=config.MQTT_USER, password=config.MQTT_PASSWORD, name=config.APP_NAME + '_simulation') # - COMMANDS = ['quit', 'help', 'test.smoke', 'test.full'] + COMMANDS = ['quit', 'help'] # h = house(mc) for name in h.getmembers(): @@ -22,6 +22,10 @@ if __name__ == "__main__": COMMANDS.append(name + '.' + c) # ts = test_smarthome(h) + for name in ts.getmembers(): + d = ts.getobjbyname(name) + for c in d.capabilities(): + COMMANDS.append('test.' + name + '.' + c) def reduced_list(text): """ @@ -68,8 +72,8 @@ if __name__ == "__main__": break elif userfeedback == 'help': print("Help is not yet implemented!") - elif userfeedback == 'test.full': - ts.full() + elif userfeedback.startswith("test"): + ts.command(userfeedback) elif userfeedback == 'test.smoke': ts.smoke() elif command in COMMANDS[2:]: diff --git a/smart_brain.py b/smart_brain.py index 99bd463..781bb8f 100644 --- a/smart_brain.py +++ b/smart_brain.py @@ -7,11 +7,19 @@ import time logger = logging.getLogger(config.APP_NAME) -# TODO: Change Nodered topics to videv -# TODO: Extend virtual devices +# TODO: Extend virtual devices and implement all_off functionality in function.all_functions.init_off_functionality # * All Off # * ... -# TODO: Remove gui from rooms and devices +# TODO: Restructure nodered gui (own heating page - with circulation pump) +# TODO: Extend tests in simulation +# - Synch functions (ffe.livingroom.floorlamp [with main_light and 1-6], ffe.diningroom/floorlamp, ffe.dirk.amplifier (with spotify, mpd, cd_player), gfw.floor.main_light) +# - Remote actions after amplifier on +# - Switching button functions (gfw_dirk, ffe.sleep) +# - Heating functionality (base: set temp, set default, away_mode, summer_mode, start and stop boost) +# - Brightness button functions (gfw.dirk, ffe.sleep) +# - Motion stairways (incl. sensor feedback) +# - Heating functionality (extended: timer) +# - Timer (circulation and stairways) # TODO: Rework devices to base.mqtt (pack -> set, ...) # TODO: Implement handling of warnings (videv element to show in webapp?)