From d2db60ac5b210542c3a9c9d48d1d6038da18cf21 Mon Sep 17 00:00:00 2001 From: Dirk Alders Date: Wed, 4 Jan 2023 01:21:03 +0100 Subject: [PATCH] First house simulation implemented --- __simulation__/__init__.py | 0 __simulation__/devices.py | 560 +++++++++++++++++++++++++++++++++++++ __simulation__/rooms.py | 189 +++++++++++++ house_n_gui_sim.py | 70 +++++ 4 files changed, 819 insertions(+) create mode 100644 __simulation__/__init__.py create mode 100644 __simulation__/devices.py create mode 100644 __simulation__/rooms.py create mode 100644 house_n_gui_sim.py diff --git a/__simulation__/__init__.py b/__simulation__/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/__simulation__/devices.py b/__simulation__/devices.py new file mode 100644 index 0000000..bb97d45 --- /dev/null +++ b/__simulation__/devices.py @@ -0,0 +1,560 @@ +import colored +import json +import sys +import time + +# TODO: implement my powerplug + +COLOR_GUI_ACTIVE = colored.fg("light_blue") +COLOR_GUI_PASSIVE = COLOR_GUI_ACTIVE + colored.attr("dim") +COLOR_SHELLY_1 = colored.fg("light_magenta") +COLOR_SHELLY_2 = colored.fg("light_cyan") +COLOR_LIGHT_ACTIVE = colored.fg("yellow") +COLOR_LIGHT_PASSIVE = COLOR_LIGHT_ACTIVE + colored.attr("dim") +COLOR_MOTION_SENSOR = colored.fg("red") + + +def payload_filter(payload): + try: + return json.loads(payload) + except json.decoder.JSONDecodeError: + return payload.decode("utf-8") + + +def percent_print(value): + for i in range(0, 10): + if (value - 5) > 10*i: + sys.stdout.write(u"\u25ac") + else: + sys.stdout.write(u"\u25ad") + + +def command_int_value(value): + try: + return int(value) + except TypeError: + print("You need to give a integer parameter not '%s'" % str(value)) + + +class base(object): + AUTOSEND = True + COMMANDS = [] + + def __init__(self, mqtt_client, topic): + self.mqtt_client = mqtt_client + self.topic = topic + # + self.data = {} + self.callbacks = {} + self.commands = self.COMMANDS[:] + # + self.mqtt_client.add_callback(self.topic, self.__rx__) + self.mqtt_client.add_callback(self.topic + '/#', self.__rx__) + + def add_callback(self, key, callback, value): + if self.callbacks.get(key) is None: + self.callbacks[key] = [] + self.callbacks[key].append((callback, value)) + + def capabilities(self): + return self.commands + + def store_data(self, *args, **kwargs): + keys_changed = [] + for key in kwargs: + if kwargs[key] is not None and kwargs[key] != self.data.get(key): + keys_changed.append(key) + self.data[key] = kwargs[key] + for callback, value in self.callbacks.get(key, [(None, None)]): + if callback is not None and (value is None or value == kwargs[key]): + callback(self, key, kwargs[key]) + if self.AUTOSEND and len(keys_changed) > 0: + self.__tx__(keys_changed) + + def __tx__(self, keys_changed): + self.mqtt_client.send(self.topic, json.dumps(self.data)) + + def __rx__(self, client, userdata, message): + print("%s: __rx__ not handled!" % self.__class__.__name__) + + +class shelly(base): + KEY_OUTPUT_0 = "relay/0" + KEY_OUTPUT_1 = "relay/1" + # + COMMANDS = [ + "get_relay_0", "set_relay_0", "unset_relay_0", + "get_relay_1", "set_relay_1", "unset_relay_1", + ] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + # + self.store_data(**{self.KEY_OUTPUT_0: "off", self.KEY_OUTPUT_1: "off"}) + self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None) + + def __rx__(self, client, userdata, message): + value = payload_filter(message.payload) + if message.topic == self.topic + "/relay/0/command": + if value in ['on', 'off']: + self.store_data(**{self.KEY_OUTPUT_0: value}) + elif value == 'toggle': + self.store_data(**{self.KEY_OUTPUT_0: 'on' if self.data.get(self.KEY_OUTPUT_0) == 'off' else 'off'}) + elif message.topic == self.topic + "/relay/1/command": + if value in ['on', 'off']: + self.store_data(**{self.KEY_OUTPUT_1: value}) + elif value == 'toggle': + self.store_data(**{self.KEY_OUTPUT_1: 'on' if self.data.get(self.KEY_OUTPUT_1) == 'off' else 'off'}) + + def __tx__(self, keys_changed): + for key in keys_changed: + self.mqtt_client.send(self.topic + '/' + key, self.data.get(key)) + + def command(self, command): + if command in self.COMMANDS: + if command == self.COMMANDS[0]: + self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) + elif command == self.COMMANDS[1]: + self.store_data(**{self.KEY_OUTPUT_0: "on"}) + elif command == self.COMMANDS[2]: + self.store_data(**{self.KEY_OUTPUT_0: "off"}) + elif command == self.COMMANDS[3]: + self.print_formatted(self, self.KEY_OUTPUT_1, self.data.get(self.KEY_OUTPUT_1)) + elif command == self.COMMANDS[4]: + self.store_data(**{self.KEY_OUTPUT_1: "on"}) + elif command == self.COMMANDS[5]: + self.store_data(**{self.KEY_OUTPUT_1: "off"}) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def print_formatted(self, device, key, value): + if value is not None: + if key == shelly.KEY_OUTPUT_0: + color = COLOR_SHELLY_1 + else: + color = COLOR_SHELLY_2 + if value == "on": + print(color + 10 * ' ' + u'\u2b24' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) + else: + print(color + 10 * ' ' + u'\u25ef' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) + + +class my_powerplug(base): + COMMANDS = [ + "get_state", "set_state", "unset_state", + ] + + def __init__(self, mqtt_client, topic, names=[]): + super().__init__(mqtt_client, topic) + # + self.names = [] + # + for i in range(0, 4): + self.data[str(i)] = False + try: + self.names.append(names[i]) + except IndexError: + self.names.append("Channel %d" % (i + 1)) + self.add_callback(str(i), self.print_formatted, None) + + def __rx__(self, client, userdata, message): + if message.topic.startswith(self.topic + '/output/') and message.topic.endswith('/set'): + channel = int(message.topic.split('/')[-2]) - 1 + payload = payload_filter(message.payload) + if payload == "toggle": + payload = not self.data.get(str(channel)) + self.store_data(**{str(channel): payload}) + + def __tx__(self, keys_changed): + for key in keys_changed: + self.mqtt_client.send(self.topic + "/output/" + str(int(key) + 1), json.dumps(self.data.get(key))) + + def command(self, command): + if command in self.COMMANDS: + if command == self.COMMANDS[0]: + self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) + elif command == self.COMMANDS[1]: + self.store_data(**{self.KEY_OUTPUT_0: 'on'}) + elif command == self.COMMANDS[2]: + self.store_data(**{self.KEY_OUTPUT_0: 'off'}) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def print_formatted(self, device, key, value): + if value is not None: + if value: + print(COLOR_SHELLY_1 + 10 * ' ' + u'\u2b24' + 6 * ' ' + " - ".join(self.topic.split('/') + [1:]) + " (%s)" % self.names[int(key)] + colored.attr("reset")) + else: + print(COLOR_SHELLY_1 + 10 * ' ' + u'\u25ef' + 6 * ' ' + " - ".join(self.topic.split('/') + [1:]) + " (%s)" % self.names[int(key)] + colored.attr("reset")) + + +class silvercrest_powerplug(base): + KEY_OUTPUT_0 = "state" + # + COMMANDS = [ + "get_state", "set_state", "unset_state", + ] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None) + # + self.store_data(**{self.KEY_OUTPUT_0: "off"}) + + def __rx__(self, client, userdata, message): + if message.topic == self.topic + '/set': + STATES = ["on", "off", "toggle"] + # + state = json.loads(message.payload).get('state').lower() + if state in STATES: + if state == STATES[0]: + self.store_data(**{self.KEY_OUTPUT_0: 'on'}) + elif state == STATES[1]: + self.store_data(**{self.KEY_OUTPUT_0: 'off'}) + else: + self.store_data(**{self.KEY_OUTPUT_0: "off" if self.data.get(self.KEY_OUTPUT_0) == "on" else "on"}) + + def command(self, command): + if command in self.COMMANDS: + if command == self.COMMANDS[0]: + self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) + elif command == self.COMMANDS[1]: + self.store_data(**{self.KEY_OUTPUT_0: 'on'}) + elif command == self.COMMANDS[2]: + self.store_data(**{self.KEY_OUTPUT_0: 'off'}) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def print_formatted(self, device, key, value): + if value is not None: + if value == "on": + print(COLOR_SHELLY_1 + 10 * ' ' + u'\u2b24' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) + else: + print(COLOR_SHELLY_1 + 10 * ' ' + u'\u25ef' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) + + +class silvercrest_motion_sensor(base): + KEY_OCCUPANCY = "occupancy" + COMMANDS = ['motion'] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + self.data[self.KEY_OCCUPANCY] = False + self.add_callback(self.KEY_OCCUPANCY, self.print_formatted, None) + + def __rx__(self, client, userdata, message): + pass + + def command(self, command): + try: + command, value = command.split(' ') + except ValueError: + value = None + else: + value = json.loads(value) + if command == self.COMMANDS[0]: + self.store_data(**{self.KEY_OCCUPANCY: True}) + time.sleep(value or 10) + self.store_data(**{self.KEY_OCCUPANCY: False}) + + def print_formatted(self, device, key, value): + if value is not None: + if value: + print(COLOR_MOTION_SENSOR + 10 * ' ' + u'\u2b24' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) + else: + print(COLOR_MOTION_SENSOR + 10 * ' ' + u'\u25ef' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) + + +class tradfri_light(base): + KEY_STATE = "state" + KEY_BRIGHTNESS = "brightness" + KEY_COLOR_TEMP = "color_temp" + KEY_BRIGHTNESS_MOVE = "brightness_move" + # + STATE_COMMANDS = ("get_state", "change_state", ) + BRIGHTNESS_COMMANDS = ("get_brightness", "set_brightness",) + COLOR_TEMP_COMMANDS = ("get_color_temp", "set_color_temp",) + + def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, send_on_power_on=True): + super().__init__(mqtt_client, topic) + self.send_on_power_on = send_on_power_on + self.add_callback(self.KEY_STATE, self.print_formatted, None) + self.add_callback(self.KEY_BRIGHTNESS, self.print_formatted, None) + self.add_callback(self.KEY_COLOR_TEMP, self.print_formatted, None) + # + self.commands = [] + if enable_state: + self.commands.extend(self.STATE_COMMANDS) + if enable_brightness: + self.commands.extend(self.BRIGHTNESS_COMMANDS) + if enable_color_temp: + self.commands.extend(self.COLOR_TEMP_COMMANDS) + self.__init_data__(enable_state, enable_brightness, enable_color_temp) + + def __init_data__(self, enable_state, enable_brightness, enable_color_temp): + data = {} + if enable_state: + data[self.KEY_STATE] = 'off' + self.commands.extend(self.STATE_COMMANDS) + if enable_brightness: + data[self.KEY_BRIGHTNESS] = 128 + self.brightnes_move = (0, time.time()) + self.commands.extend(self.BRIGHTNESS_COMMANDS) + if enable_color_temp: + data[self.KEY_COLOR_TEMP] = 352 + self.commands.extend(self.COLOR_TEMP_COMMANDS) + self.store_data(**data) + + def __rx__(self, client, userdata, message): + data = json.loads(message.payload) + if self.data.get(self.KEY_STATE) == 'on' or data.get(self.KEY_STATE) in ['on', 'toggle']: + if message.topic.startswith(self.topic) and message.topic.endswith('/set'): + for targetkey in data: + value = data[targetkey] + if targetkey in self.data.keys(): + if targetkey == self.KEY_STATE and value == "toggle": + value = "on" if self.data.get(self.KEY_STATE) == "off" else "off" + self.store_data(**{targetkey: value}) + else: + if targetkey == self.KEY_BRIGHTNESS_MOVE: + new_value = self.data.get(self.KEY_BRIGHTNESS) + (time.time() - self.brightnes_move[1]) * self.brightnes_move[0] + if new_value < 0: + new_value = 0 + if new_value > 255: + new_value = 255 + self.store_data(**{self.KEY_BRIGHTNESS: int(new_value)}) + self.brightnes_move = (value, time.time()) + else: + print("%s: UNKNOWN KEY %s" % (message.topic, targetkey)) + elif message.topic == self.topic + '/get': + self.__tx__(None) + + def command(self, command): + try: + command, value = command.split(' ') + except ValueError: + value = None + if command in self.capabilities(): + if command == self.STATE_COMMANDS[0]: + self.print_formatted(self, self.KEY_STATE, self.data.get(self.KEY_STATE)) + elif command == self.STATE_COMMANDS[1]: + self.store_data(**{self.KEY_STATE: 'off' if self.data.get(self.KEY_STATE) == 'on' else 'on'}) + elif command == self.BRIGHTNESS_COMMANDS[0]: + self.print_formatted(self, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) + elif command == self.BRIGHTNESS_COMMANDS[1]: + self.store_data(**{self.KEY_BRIGHTNESS: command_int_value(value)}) + elif command == self.COLOR_TEMP_COMMANDS[0]: + self.print_formatted(self, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) + elif command == self.COLOR_TEMP_COMMANDS[1]: + self.store_data(**{self.KEY_COLOR_TEMP: command_int_value(value)}) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def power_off(self, device, key, value): + self.data[self.KEY_STATE] = 'off' + self.print_formatted(self, self.KEY_STATE, 'off') + + def power_on(self, device, key, value): + if self.send_on_power_on: + self.store_data(**{self.KEY_STATE: 'on'}) + else: + self.data[self.KEY_STATE] = 'on' + self.print_formatted(self, self.KEY_STATE, 'on') + + def print_formatted(self, device, key, value): + if value is not None: + color = COLOR_LIGHT_ACTIVE + if key == self.KEY_STATE: + if value == "on": + print(color + 10 * ' ' + u'\u2b24' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) + else: + print(color + 10 * ' ' + u'\u25ef' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) + self.print_formatted(device, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) + self.print_formatted(device, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) + elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: + if self.data.get(self.KEY_STATE) != "on": + color = COLOR_LIGHT_PASSIVE + if key == self.KEY_BRIGHTNESS: + value = round(value * 100 / 256, 0) + else: + value = round((value - 250) * 100 / 204, 0) + sys.stdout.write(color) + sys.stdout.write('B' if key == gui_light.KEY_BRIGHTNESS else 'C') + percent_print(value) + sys.stdout.write("%3d%% " % value) + print(" - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) + + +class gui_light(tradfri_light): + AUTOSEND = False + # + KEY_ENABLE = "enable" + + def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False): + super().__init__(mqtt_client, topic, enable_state, enable_brightness, enable_color_temp) + self.add_callback(self.KEY_ENABLE, self.print_formatted, None) + + def __init_data__(self, enable_state, enable_brightness, enable_color_temp): + data = {} + data[self.KEY_ENABLE] = False + if enable_state: + data[self.KEY_STATE] = False + if enable_brightness: + data[self.KEY_BRIGHTNESS] = 50 + if enable_color_temp: + data[self.KEY_COLOR_TEMP] = 5 + self.store_data(**data) + + def __rx__(self, client, userdata, message): + value = payload_filter(message.payload) + if message.topic.startswith(self.topic) and message.topic.endswith('/set'): + targetkey = message.topic.split('/')[-2] + if targetkey in self.data.keys(): + self.store_data(**{targetkey: value}) + else: + print("Unknown key %s in %s" % (targetkey, self.__class__.__name__)) + elif message.topic == self.topic + '/get': + self.__tx__(None) + + def send(self, key, data): + if data is not None: + topic = self.topic + '/' + key + self.mqtt_client.send(topic, json.dumps(data)) + + def command(self, command): + try: + command, value = command.split(' ') + except ValueError: + value = None + if command in self.capabilities(): + if command == self.STATE_COMMANDS[0]: + self.print_formatted(self, self.KEY_STATE, self.data.get(self.KEY_STATE)) + elif command == self.STATE_COMMANDS[1]: + self.send(self.KEY_STATE, not self.data.get(self.KEY_STATE)) + elif command == self.BRIGHTNESS_COMMANDS[0]: + self.print_formatted(self, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) + elif command == self.BRIGHTNESS_COMMANDS[1]: + self.send(self.KEY_BRIGHTNESS, command_int_value(value)) + elif command == self.COLOR_TEMP_COMMANDS[0]: + self.print_formatted(self, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) + elif command == self.COLOR_TEMP_COMMANDS[1]: + self.send(self.KEY_COLOR_TEMP, command_int_value(value)) + else: + print("%s: not yet implemented!" % command) + else: + print("Unknown command!") + + def print_formatted(self, device, key, value): + if value is not None: + color = COLOR_GUI_ACTIVE + if key == self.KEY_STATE: + if value == True: + print(color + 10 * ' ' + u'\u25a0' + 6 * ' ' + " - ".join(self.topic.split('/')[1:-1]) + colored.attr("reset")) + else: + print(color + 10 * ' ' + u'\u25a1' + 6*' ' + " - ".join(self.topic.split('/')[1:-1]) + colored.attr("reset")) + elif key == self.KEY_ENABLE: + self.print_formatted(device, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) + self.print_formatted(device, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) + elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: + if not self.data.get(self.KEY_ENABLE, False): + color = COLOR_GUI_PASSIVE + value = round(value * 10 if key == self.KEY_COLOR_TEMP else value, 0) + sys.stdout.write(color) + sys.stdout.write('B' if key == self.KEY_BRIGHTNESS else 'C') + percent_print(value) + sys.stdout.write("%3d%% " % value) + print(" - ".join(self.topic.split('/')[1:-1]) + colored.attr("reset")) + + +class tradfri_button(base): + KEY_ACTION = "action" + # + ACTION_TOGGLE = "toggle" + ACTION_BRIGHTNESS_UP = "brightness_up_click" + ACTION_BRIGHTNESS_DOWN = "brightness_down_click" + ACTION_RIGHT = "arrow_right_click" + ACTION_LEFT = "arrow_left_click" + ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold" + ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold" + ACTION_RIGHT_LONG = "arrow_right_hold" + ACTION_LEFT_LONG = "arrow_left_hold" + # + COMMANDS = [ACTION_TOGGLE, ACTION_LEFT, ACTION_RIGHT, ACTION_BRIGHTNESS_UP, ACTION_BRIGHTNESS_DOWN, + ACTION_LEFT_LONG, ACTION_RIGHT_LONG, ACTION_BRIGHTNESS_UP_LONG, ACTION_BRIGHTNESS_DOWN_LONG] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def __rx__(self, client, userdata, message): + pass + + def command(self, command): + try: + command, value = command.split(' ') + except ValueError: + value = None + else: + value = json.loads(value) + if command in self.capabilities(): + action = self.COMMANDS[self.COMMANDS.index(command)] + if self.COMMANDS.index(command) <= 4: + self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) + elif self.COMMANDS.index(command) <= 8: + self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) + time.sleep(value or 0.5) + action = '_'.join(action.split('_')[:-1] + ['release']) + self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) + + +class gui_led_array(base): + AUTOSEND = False + # + KEY_LED_0 = "led0" + KEY_LED_1 = "led1" + KEY_LED_2 = "led2" + KEY_LED_3 = "led3" + KEY_LED_4 = "led4" + KEY_LED_5 = "led5" + KEY_LED_6 = "led6" + KEY_LED_7 = "led7" + KEY_LED_8 = "led8" + KEY_LED_9 = "led9" + + def __init__(self, mqtt_client, topic, names=[]): + super().__init__(mqtt_client, topic) + self.names = {} + for i in range(0, 10): + key = getattr(self, "KEY_LED_%d" % i) + self.data[key] = False + try: + self.names[key] = names[i] + except IndexError: + self.names[key] = key + self.add_callback(key, self.print_formatted, None) + + def __rx__(self, client, userdata, message): + value = payload_filter(message.payload) + if message.topic.startswith(self.topic) and message.topic.endswith('/set'): + targetkey = message.topic.split('/')[-2] + if targetkey in self.data.keys(): + self.store_data(**{targetkey: value}) + else: + print("Unknown key %s in %s" % (targetkey, self.__class__.__name__)) + + def print_formatted(self, device, key, value): + if value: + color = colored.fg('green') + else: + color = "" + print(color + 10 * ' ' + u"\u2b24" + 6 * ' ' + COLOR_GUI_ACTIVE + + ' - '.join(self.topic.split('/')[:-1]) + ' (%s)' % self.names[key] + colored.attr("reset")) diff --git a/__simulation__/rooms.py b/__simulation__/rooms.py new file mode 100644 index 0000000..b3647a0 --- /dev/null +++ b/__simulation__/rooms.py @@ -0,0 +1,189 @@ +import config +from __simulation__.devices import shelly, silvercrest_powerplug, gui_light, tradfri_light, tradfri_button, gui_led_array, silvercrest_motion_sensor, my_powerplug +import inspect + +# TODO: ffe_floor: Implement longpress shelly +# TODO: gfw_dirk: Implement 2nd input for floor light +# TODO: ffe_sleep: Implement heating valve +# TODO: gfw_dirk: Add at least brightness amplifier remote feedback to console +# TODO: ffe_diningroom: Implement garland gui_switch +# TODO: ffe_kitchen: Implement circulation pump + + +class base(object): + def getmembers(self, prefix=''): + rv = [] + for name, obj in inspect.getmembers(self): + if prefix: + full_name = prefix + '.' + name + else: + full_name = name + if not name.startswith('_'): + try: + if obj.__module__.endswith('devices'): + rv.append(full_name) + else: + rv.extend(obj.getmembers(full_name)) + except AttributeError: + pass + return rv + + def getobjbyname(self, name): + obj = self + for subname in name.split('.'): + obj = getattr(obj, subname) + return obj + + def command(self, full_command): + try: + parameter = " " + full_command.split(' ')[1] + except IndexError: + parameter = "" + command = full_command.split(' ')[0].split('.')[-1] + parameter + device_name = '.'.join(full_command.split(' ')[0].split('.')[:-1]) + self.getobjbyname(device_name).command(command) + + +class gfw_floor(base): + def __init__(self, mqtt_client): + self.gui_switch_main_light = gui_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_GUI_SWITCH, True, False, False) + self.main_light = shelly(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_SHELLY) + self.main_light_zigbee_1 = tradfri_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_1_ZIGBEE, True, True, True, False) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee_1.power_on, "on") + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee_1.power_off, "off") + self.main_light_zigbee_2 = tradfri_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_2_ZIGBEE, True, True, True, False) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee_2.power_on, "on") + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee_1.power_off, "off") + self.gui_br_ct_main_light = gui_light(mqtt_client, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_GUI_BR_CT, False, True, True) + + +class gfw_marion(base): + def __init__(self, mqtt_client): + self.gui_switch_main_light = gui_light(mqtt_client, config.TOPIC_GFW_MARION_MAIN_LIGHT_GUI_SWITCH, True, False, False) + self.main_light = shelly(mqtt_client, config.TOPIC_GFW_MARION_MAIN_LIGHT_SHELLY) + + +class gfw_dirk(base): + def __init__(self, mqtt_client): + self.gui_switch_main_light = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_MAIN_LIGHT_GUI_SWITCH, True, False, False) + self.main_light = shelly(mqtt_client, config.TOPIC_GFW_DIRK_MAIN_LIGHT_SHELLY) + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_GFW_DIRK_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, "on") + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, "off") + self.gui_br_ct_main_light = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_MAIN_LIGHT_GUI_BR_CT, False, True, True) + # + self.powerplug = my_powerplug(mqtt_client, config.TOPIC_GFW_DIRK_POWERPLUG, ["Amplifier", "Desk Light", "CD-Player", "PC-Dock"]) + self.gui_switch_amplifier = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_AMPLIFIER_GUI_SWITCH, True, False, False) + self.gui_switch_desk_light = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_DESK_LIGHT_GUI_SWITCH, True, False, False) + self.gui_br_ct_desk_light = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_DESK_LIGHT_GUI_BR_CT, False, True, True) + self.gui_switch_cd_player = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_CD_PLAYER_GUI_SWITCH, True, False, False) + self.gui_switch_pc_dock = gui_light(mqtt_client, config.TOPIC_GFW_DIRK_PC_DOCK_GUI_SWITCH, True, False, False) + # + self.input_device = tradfri_button(mqtt_client, config.TOPIC_GFW_DIRK_INPUT_DEVICE) + self.led_array = gui_led_array(mqtt_client, config.TOPIC_GFW_DIRK_DEVICE_CHOOSER_LED, ["Main Light", "Desk Light", "Amplifier"]) + + +class gfw(base): + def __init__(self, mqtt_client): + self.floor = gfw_floor(mqtt_client) + self.marion = gfw_marion(mqtt_client) + self.dirk = gfw_dirk(mqtt_client) + + +class ffw_julian(base): + def __init__(self, mqtt_client): + self.gui_switch_main_light = gui_light(mqtt_client, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_GUI_SWITCH, True, False, False) + self.main_light = shelly(mqtt_client, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_SHELLY) + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, "on") + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, "off") + self.gui_br_ct_main_light = gui_light(mqtt_client, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_GUI_BR_CT, False, True, True) + + +class ffw_livingroom(base): + def __init__(self, mqtt_client): + self.gui_switch_main_light = gui_light(mqtt_client, config.TOPIC_FFW_LIVINGROOM_MAIN_LIGHT_GUI_SWITCH, True, False, False) + self.main_light = shelly(mqtt_client, config.TOPIC_FFW_LIVINGROOM_MAIN_LIGHT_SHELLY) + + +class ffw(base): + def __init__(self, mqtt_client): + self.julian = ffw_julian(mqtt_client) + self.livingroom = ffw_livingroom(mqtt_client) + + +class ffe_floor(base): + def __init__(self, mqtt_client): + self.gui_switch_main_light = gui_light(mqtt_client, config.TOPIC_FFE_FLOOR_MAIN_LIGHT_GUI_SWITCH, True, False, False) + self.main_light = shelly(mqtt_client, config.TOPIC_FFE_FLOOR_MAIN_LIGHT_SHELLY) + + +class ffe_kitchen(base): + def __init__(self, mqtt_client): + self.gui_switch_main_light = gui_light(mqtt_client, config.TOPIC_FFE_KITCHEN_MAIN_LIGHT_GUI_SWITCH, True, False, False) + self.main_light = shelly(mqtt_client, config.TOPIC_FFE_KITCHEN_MAIN_LIGHT_SHELLY) + + +class ffe_diningroom(base): + def __init__(self, mqtt_client): + self.gui_switch_main_light = gui_light(mqtt_client, config.TOPIC_FFE_DININGROOM_MAIN_LIGHT_GUI_SWITCH, True, False, False) + self.main_light = shelly(mqtt_client, config.TOPIC_FFE_DININGROOM_MAIN_LIGHT_SHELLY) + self.gui_switch_floor_lamp = gui_light(mqtt_client, config.TOPIC_FFE_DININGROOM_FLOOR_LAMP_GUI_SWITCH, True, False, False) + self.floor_lamp = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_DININGROOM_FLOOR_LAMP_POWERPLUG) + self.garland = silvercrest_powerplug(mqtt_client, config.TOPIC_FFE_DININGROOM_GARLAND_POWERPLUG) + + +class ffe_sleep(base): + def __init__(self, mqtt_client): + self.gui_switch_main_light = gui_light(mqtt_client, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_GUI_SWITCH, True, False, False) + self.main_light = shelly(mqtt_client, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_SHELLY) + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, "on") + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, "off") + self.gui_br_ct_main_light = gui_light(mqtt_client, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_GUI_BR_CT, False, True, True) + # + self.gui_switch_bed_light_di = gui_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_DI_GUI_SWITCH, True, False, False) + self.bed_light_di_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_DI_ZIGBEE, True, True, False) + self.gui_br_ct_bed_light_di = gui_light(mqtt_client, config.TOPIC_FFE_SLEEP_BED_LIGHT_DI_GUI_BR_CT, False, True, False) + # + self.input_device = tradfri_button(mqtt_client, config.TOPIC_FFE_SLEEP_INPUT_DEVICE) + self.led_array = gui_led_array(mqtt_client, config.TOPIC_FFE_SLEEP_DEVICE_CHOOSER_LED, ["Main Light", "Bed Light Dirk"]) + + +class ffe_livingroom(base): + def __init__(self, mqtt_client): + self.gui_switch_main_light = gui_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_GUI_SWITCH, True, False, False) + self.main_light = shelly(mqtt_client, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_SHELLY) + self.main_light_zigbee = tradfri_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_ZIGBEE, True, True, True) + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_on, "on") + self.main_light.add_callback(shelly.KEY_OUTPUT_0, self.main_light_zigbee.power_off, "off") + self.gui_br_ct_main_light = gui_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_GUI_BR_CT, False, True, True) + for i in range(1, 7): + setattr(self, "floor_lamp_%d" % i, tradfri_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_FLOOR_LAMP_ZIGBEE % i, True, True, True)) + self.gui_switch_floor_lamp = gui_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_FLOOR_LAMP_GUI_SWITCH, True, False, False) + self.gui_br_ct_floor_lamp = gui_light(mqtt_client, config.TOPIC_FFE_LIVINGROOM_FLOOR_LAMP_GUI_BR_CT, False, True, True) + + +class ffe(base): + def __init__(self, mqtt_client): + self.floor = ffe_floor(mqtt_client) + self.kitchen = ffe_kitchen(mqtt_client) + self.diningroom = ffe_diningroom(mqtt_client) + self.sleep = ffe_sleep(mqtt_client) + self.livingroom = ffe_livingroom(mqtt_client) + + +class stairway(base): + def __init__(self, mqtt_client): + self.gui_switch_main_light = gui_light(mqtt_client, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_GUI_SWITCH, True, False, False) + self.main_light = shelly(mqtt_client, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_SHELLY) + self.motion_sensor_gf = silvercrest_motion_sensor(mqtt_client, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_MOTION_SENSOR_GF) + self.motion_sensor_ff = silvercrest_motion_sensor(mqtt_client, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_MOTION_SENSOR_FF) + + +class house(base): + def __init__(self, mqtt_client): + self.gfw = gfw(mqtt_client) + self.ffw = ffw(mqtt_client) + self.ffe = ffe(mqtt_client) + self.stairway = stairway(mqtt_client) diff --git a/house_n_gui_sim.py b/house_n_gui_sim.py new file mode 100644 index 0000000..b1b067d --- /dev/null +++ b/house_n_gui_sim.py @@ -0,0 +1,70 @@ +import config +import logging +import mqtt +import readline +import report +from __simulation__.rooms import house +import time + +if __name__ == "__main__": + report.stdoutLoggingConfigure(((config.APP_NAME, logging.WARNING), ), report.SHORT_FMT) + # + mc = mqtt.mqtt_client(host=config.MQTT_SERVER, port=config.MQTT_PORT, username=config.MQTT_USER, + password=config.MQTT_PASSWORD, name=config.APP_NAME + '_simulation') + # + COMMANDS = ['quit', 'help'] + # + h = house(mc) + for name in h.getmembers(): + d = h.getobjbyname(name) + for c in d.capabilities(): + COMMANDS.append(name + '.' + c) + # + + def reduced_list(text): + """ + Create reduced completation list + """ + reduced_list = {} + for cmd in COMMANDS: + next_dot = cmd[len(text):].find('.') + if next_dot >= 0: + reduced_list[cmd[:len(text) + next_dot + 1]] = None + else: + reduced_list[cmd] = None + return reduced_list.keys() + + def completer(text, state): + """ + Our custom completer function + """ + options = [x for x in reduced_list(text) if x.startswith(text)] + return options[state] + + def complete(text, state): + for cmd in COMMANDS: + if cmd.startswith(text): + if not state: + hit = "" + index = 0 + sub_list = cmd.split('.') + while len(text) >= len(hit): + hit += sub_list[index] + '.' + return hit # cmd + else: + state -= 1 + + readline.parse_and_bind("tab: complete") + readline.set_completer(completer) + print("\nEnter command: ") + while True: + userfeedback = input('') + command = userfeedback.split(' ')[0] + if userfeedback == 'quit': + break + elif userfeedback == 'help': + print("Help is not yet implemented!") + elif command in COMMANDS[2:]: + h.command(userfeedback) + else: + print("Unknown command!")