import colored import json import sys import task import time COLOR_GUI_ACTIVE = colored.fg("light_blue") COLOR_GUI_PASSIVE = COLOR_GUI_ACTIVE + colored.attr("dim") COLOR_SHELLY = colored.fg("light_magenta") COLOR_POWERPLUG = colored.fg("light_cyan") COLOR_LIGHT_ACTIVE = colored.fg("yellow") COLOR_LIGHT_PASSIVE = COLOR_LIGHT_ACTIVE + colored.attr("dim") COLOR_MOTION_SENSOR = colored.fg("dark_orange_3b") COLOR_RADIATOR_VALVE = colored.fg("red") COLOR_REMOTE = colored.fg("green") def payload_filter(payload): try: return json.loads(payload) except json.decoder.JSONDecodeError: return payload.decode("utf-8") def percent_bar(value): rv = "" for i in range(0, 10): rv += u"\u25ac" if (value - 5) > 10*i else u"\u25ad" return rv def green_led(): return colored.fg('green') + "\u2b24" + colored.attr("reset") def grey_led(): return colored.fg('light_gray') + "\u2b24" + colored.attr("reset") def command_int_value(value): try: return int(value) except TypeError: print("You need to give a integer parameter not '%s'" % str(value)) def command_float_value(value): try: return float(value) except TypeError: print("You need to give a integer parameter not '%s'" % str(value)) class base(object): AUTOSEND = True COMMANDS = [] def __init__(self, mqtt_client, topic): self.mqtt_client = mqtt_client self.topic = topic # self.data = {} self.callbacks = {} self.names = {} self.commands = self.COMMANDS[:] # self.mqtt_client.add_callback(self.topic, self.__rx__) self.mqtt_client.add_callback(self.topic + '/#', self.__rx__) def add_callback(self, key, callback, value): if self.callbacks.get(key) is None: self.callbacks[key] = [] self.callbacks[key].append((callback, value)) def add_channel_name(self, key, name): self.names[key] = name def capabilities(self): return self.commands def store_data(self, *args, **kwargs): keys_changed = [] for key in kwargs: if kwargs[key] is not None and kwargs[key] != self.data.get(key): keys_changed.append(key) self.data[key] = kwargs[key] for callback, value in self.callbacks.get(key, [(None, None)]): if callback is not None and (value is None or value == kwargs[key]): callback(self, key, kwargs[key]) if self.AUTOSEND and len(keys_changed) > 0: self.__tx__(keys_changed) def __tx__(self, keys_changed): self.mqtt_client.send(self.topic, json.dumps(self.data)) def __rx__(self, client, userdata, message): print("%s: __rx__ not handled!" % self.__class__.__name__) class shelly(base): KEY_OUTPUT_0 = "relay/0" KEY_OUTPUT_1 = "relay/1" KEY_INPUT_0 = "input/0" KEY_INPUT_1 = "input/1" KEY_LONGPUSH_0 = "longpush/0" KEY_LONGPUSH_1 = "longpush/1" # INPUT_FUNC_OUT1_FOLLOW = "out1_follow" INPUT_FUNC_OUT1_TRIGGER = "out1_trigger" INPUT_FUNC_OUT2_FOLLOW = "out2_follow" INPUT_FUNC_OUT2_TRIGGER = "out2_trigger" # COMMANDS = [ "get_relay_0", "toggle_relay_0", "get_relay_1", "toggle_relay_1", "get_input_0", "toggle_input_0", "get_input_1", "toggle_input_1", "trigger_long_input_0", "trigger_long_input_1", ] def __init__(self, mqtt_client, topic, input_0_func=None, input_1_func=None, output_0_auto_off=None): super().__init__(mqtt_client, topic) # self.store_data(**{self.KEY_OUTPUT_0: "off", self.KEY_OUTPUT_1: "off", self.KEY_INPUT_0: "off", self.KEY_INPUT_1: "off"}) self.__input_0_func = input_0_func self.__input_1_func = input_1_func self.__output_0_auto_off__ = output_0_auto_off if self.__output_0_auto_off__ is not None: self.__delayed_off__ = task.delayed(float(self.__output_0_auto_off__), self.__auto_off__, self.KEY_OUTPUT_0) # self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None) self.add_callback(self.KEY_OUTPUT_0, self.__start_auto_off__, "on") self.add_callback(self.KEY_OUTPUT_0, self.__stop_auto_off__, "off") self.add_callback(self.KEY_OUTPUT_1, self.print_formatted, None) # self.add_callback(self.KEY_INPUT_0, self.__input_function__, None) self.add_callback(self.KEY_INPUT_1, self.__input_function__, None) def __rx__(self, client, userdata, message): value = payload_filter(message.payload) if message.topic.startswith(self.topic) and message.topic.endswith("/command"): key = '/'.join(message.topic.split('/')[-3:-1]) if value == 'toggle': self.__toggle_data__(key) else: self.__set_data__(key, value) def __tx__(self, keys_changed): for key in keys_changed: self.mqtt_client.send(self.topic + '/' + key, self.data.get(key)) def __input_function__(self, device, key, data): if key == self.KEY_INPUT_0: func = self.__input_0_func elif key == self.KEY_INPUT_1: func = self.__input_1_func else: func = None if func == self.INPUT_FUNC_OUT1_FOLLOW: self.__set_data__(self.KEY_OUTPUT_0, data) elif func == self.INPUT_FUNC_OUT1_TRIGGER: self.__toggle_data__(self.KEY_OUTPUT_0) elif func == self.INPUT_FUNC_OUT2_FOLLOW: self.__set_data__(self.KEY_OUTPUT_1, data) elif func == self.INPUT_FUNC_OUT2_TRIGGER: self.__toggle_data__(self.KEY_OUTPUT_1) def __start_auto_off__(self, device, key, data): self.__stop_auto_off__(device, key, data) if self.__output_0_auto_off__ is not None: self.__delayed_off__.run() def __stop_auto_off__(self, device, key, data): if self.__output_0_auto_off__ is not None: if not self.__delayed_off__._stopped: self.__delayed_off__.stop() def __auto_off__(self, key): if key == self.KEY_OUTPUT_0: self.__set_data__(key, 'off') def __set_data__(self, key, value): if value in ["on", "off"]: self.store_data(**{key: value}) else: print("Wrong value (%s)!" % repr(value)) def __toggle_data__(self, key): if key in self.data: self.__set_data__(key, 'on' if self.data.get(key) == 'off' else 'off') def command(self, command): if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) elif command == self.COMMANDS[1]: self.__toggle_data__(self.KEY_OUTPUT_0) elif command == self.COMMANDS[2]: self.print_formatted(self, self.KEY_OUTPUT_1, self.data.get(self.KEY_OUTPUT_1)) elif command == self.COMMANDS[3]: self.__toggle_data__(self.KEY_OUTPUT_1) elif command == self.COMMANDS[4]: self.print_formatted(self, self.KEY_INPUT_0, self.data.get(self.KEY_INPUT_0)) elif command == self.COMMANDS[5]: self.__toggle_data__(self.KEY_INPUT_0) elif command == self.COMMANDS[6]: self.print_formatted(self, self.KEY_INPUT_1, self.data.get(self.KEY_INPUT_1)) elif command == self.COMMANDS[7]: self.__toggle_data__(self.KEY_INPUT_1) elif command == self.COMMANDS[8]: self.__toggle_data__(self.KEY_INPUT_0) time.sleep(0.4) self.__set_data__(self.KEY_LONGPUSH_0, 'on') time.sleep(0.1) self.__set_data__(self.KEY_LONGPUSH_0, 'off') elif command == self.COMMANDS[9]: self.__toggle_data__(self.KEY_INPUT_1) time.sleep(0.4) self.__set_data__(self.KEY_LONGPUSH_1, 'on') time.sleep(0.1) self.__set_data__(self.KEY_LONGPUSH_1, 'off') else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: icon = u'\u2b24' if value == "on" else u'\u25ef' info = (" - %ds" % self.__output_0_auto_off__) if self.__output_0_auto_off__ is not None and value == "on" else "" channel = "(%s%s)" % (self.names.get(key, key), info) devicename = " - ".join(self.topic.split('/')[1:]) print(COLOR_SHELLY + 10 * ' ' + icon + 9 * ' ' + devicename + ' ' + channel + colored.attr("reset")) class my_powerplug(base): KEY_OUTPUT_0 = "0" KEY_OUTPUT_1 = "1" KEY_OUTPUT_2 = "2" KEY_OUTPUT_3 = "3" COMMANDS = [ "get_output_0", "toggle_output_0", "get_output_1", "toggle_output_1", "get_output_2", "toggle_output_2", "get_output_3", "toggle_output_3", ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # for i in range(0, 4): self.data[str(i)] = False self.add_callback(str(i), self.print_formatted, None) def __rx__(self, client, userdata, message): if message.topic.startswith(self.topic + '/output/') and message.topic.endswith('/set'): try: channels = [int(message.topic.split('/')[-2]) - 1] except ValueError: channels = range(0, 4) for channel in channels: payload = payload_filter(message.payload) if payload == "toggle": payload = not self.data.get(str(channel)) self.store_data(**{str(channel): payload}) def __tx__(self, keys_changed): for key in keys_changed: self.mqtt_client.send(self.topic + "/output/" + str(int(key) + 1), json.dumps(self.data.get(key))) def command(self, command): if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) elif command == self.COMMANDS[1]: self.store_data(**{self.KEY_OUTPUT_0: not self.data.get(self.KEY_OUTPUT_0)}) elif command == self.COMMANDS[2]: self.print_formatted(self, self.KEY_OUTPUT_1, self.data.get(self.KEY_OUTPUT_1)) elif command == self.COMMANDS[3]: self.store_data(**{self.KEY_OUTPUT_1: not self.data.get(self.KEY_OUTPUT_1)}) elif command == self.COMMANDS[4]: self.print_formatted(self, self.KEY_OUTPUT_2, self.data.get(self.KEY_OUTPUT_2)) elif command == self.COMMANDS[5]: self.store_data(**{self.KEY_OUTPUT_2: not self.data.get(self.KEY_OUTPUT_2)}) elif command == self.COMMANDS[6]: self.print_formatted(self, self.KEY_OUTPUT_3, self.data.get(self.KEY_OUTPUT_3)) elif command == self.COMMANDS[7]: self.store_data(**{self.KEY_OUTPUT_3: not self.data.get(self.KEY_OUTPUT_3)}) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: icon = u'\u2b24' if value else u'\u25ef' channel = "(%s)" % self.names.get(key, "Channel %d" % (int(key) + 1)) devicename = " - ".join(self.topic.split('/')[1:]) print(COLOR_POWERPLUG + 10 * ' ' + icon + 9 * ' ' + devicename + ' ' + channel + colored.attr("reset")) class silvercrest_powerplug(base): KEY_OUTPUT_0 = "state" # COMMANDS = [ "get_state", "set_state", "unset_state", ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None) # self.store_data(**{self.KEY_OUTPUT_0: "off"}) def __rx__(self, client, userdata, message): if message.topic == self.topic + '/set': STATES = ["on", "off", "toggle"] # state = json.loads(message.payload).get('state').lower() if state in STATES: if state == STATES[0]: self.store_data(**{self.KEY_OUTPUT_0: 'on'}) elif state == STATES[1]: self.store_data(**{self.KEY_OUTPUT_0: 'off'}) else: self.store_data(**{self.KEY_OUTPUT_0: "off" if self.data.get(self.KEY_OUTPUT_0) == "on" else "on"}) def command(self, command): if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) elif command == self.COMMANDS[1]: self.store_data(**{self.KEY_OUTPUT_0: 'on'}) elif command == self.COMMANDS[2]: self.store_data(**{self.KEY_OUTPUT_0: 'off'}) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: icon = u'\u2b24' if value == "on" else u'\u25ef' channel = "(%s)" % self.names.get(key, key) devicename = " - ".join(self.topic.split('/')[1:]) print(COLOR_POWERPLUG + 10 * ' ' + icon + 9 * ' ' + devicename + ' ' + channel + colored.attr("reset")) class silvercrest_motion_sensor(base): KEY_OCCUPANCY = "occupancy" COMMANDS = ['motion'] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) self.data[self.KEY_OCCUPANCY] = False self.add_callback(self.KEY_OCCUPANCY, self.print_formatted, None) def __rx__(self, client, userdata, message): pass def command(self, command): try: command, value = command.split(' ') except ValueError: value = None else: value = json.loads(value) if command == self.COMMANDS[0]: self.store_data(**{self.KEY_OCCUPANCY: True}) time.sleep(value or 10) self.store_data(**{self.KEY_OCCUPANCY: False}) def print_formatted(self, device, key, value): if value is not None: if value: print(COLOR_MOTION_SENSOR + 10 * ' ' + u'\u2b24' + 9 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) else: print(COLOR_MOTION_SENSOR + 10 * ' ' + u'\u25ef' + 9 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) class tradfri_light(base): KEY_STATE = "state" KEY_BRIGHTNESS = "brightness" KEY_COLOR_TEMP = "color_temp" KEY_BRIGHTNESS_MOVE = "brightness_move" # STATE_COMMANDS = ("get_state", "toggle_state", ) BRIGHTNESS_COMMANDS = ("get_brightness", "set_brightness",) COLOR_TEMP_COMMANDS = ("get_color_temp", "set_color_temp",) def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, send_on_power_on=True): super().__init__(mqtt_client, topic) self.send_on_power_on = send_on_power_on self.add_callback(self.KEY_STATE, self.print_formatted, None) self.add_callback(self.KEY_BRIGHTNESS, self.print_formatted, None) self.add_callback(self.KEY_COLOR_TEMP, self.print_formatted, None) # self.commands = [] if enable_state: self.commands.extend(self.STATE_COMMANDS) if enable_brightness: self.commands.extend(self.BRIGHTNESS_COMMANDS) if enable_color_temp: self.commands.extend(self.COLOR_TEMP_COMMANDS) self.__init_data__(enable_state, enable_brightness, enable_color_temp) def __init_data__(self, enable_state, enable_brightness, enable_color_temp): data = {} if enable_state: data[self.KEY_STATE] = 'off' self.commands.extend(self.STATE_COMMANDS) if enable_brightness: data[self.KEY_BRIGHTNESS] = 128 self.brightnes_move = (0, time.time()) self.commands.extend(self.BRIGHTNESS_COMMANDS) if enable_color_temp: data[self.KEY_COLOR_TEMP] = 352 self.commands.extend(self.COLOR_TEMP_COMMANDS) self.store_data(**data) def __rx__(self, client, userdata, message): data = json.loads(message.payload) if self.data.get(self.KEY_STATE) == 'on' or data.get(self.KEY_STATE) in ['on', 'toggle']: if message.topic.startswith(self.topic) and message.topic.endswith('/set'): for targetkey in data: value = data[targetkey] if targetkey in self.data.keys(): if targetkey == self.KEY_STATE and value == "toggle": value = "on" if self.data.get(self.KEY_STATE) == "off" else "off" self.store_data(**{targetkey: value}) else: if targetkey == self.KEY_BRIGHTNESS_MOVE: new_value = self.data.get(self.KEY_BRIGHTNESS) + (time.time() - self.brightnes_move[1]) * self.brightnes_move[0] if new_value < 0: new_value = 0 if new_value > 255: new_value = 255 self.store_data(**{self.KEY_BRIGHTNESS: int(new_value)}) self.brightnes_move = (value, time.time()) else: print("%s: UNKNOWN KEY %s" % (message.topic, targetkey)) elif message.topic == self.topic + '/get': self.__tx__(None) def command(self, command): try: command, value = command.split(' ') except ValueError: value = None if command in self.capabilities(): if command == self.STATE_COMMANDS[0]: self.print_formatted(self, self.KEY_STATE, self.data.get(self.KEY_STATE)) elif command == self.STATE_COMMANDS[1]: self.store_data(**{self.KEY_STATE: 'off' if self.data.get(self.KEY_STATE) == 'on' else 'on'}) elif command == self.BRIGHTNESS_COMMANDS[0]: self.print_formatted(self, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) elif command == self.BRIGHTNESS_COMMANDS[1]: self.store_data(**{self.KEY_BRIGHTNESS: command_int_value(value)}) elif command == self.COLOR_TEMP_COMMANDS[0]: self.print_formatted(self, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) elif command == self.COLOR_TEMP_COMMANDS[1]: self.store_data(**{self.KEY_COLOR_TEMP: command_int_value(value)}) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def power_off(self, device, key, value): self.data[self.KEY_STATE] = 'off' self.print_formatted(self, self.KEY_STATE, 'off') def power_on(self, device, key, value): if self.send_on_power_on: self.store_data(**{self.KEY_STATE: 'on'}) else: self.data[self.KEY_STATE] = 'on' self.print_formatted(self, self.KEY_STATE, 'on') def print_formatted(self, device, key, value): if value is not None: color = COLOR_LIGHT_ACTIVE if key == self.KEY_STATE: if value == "on": print(color + 10 * ' ' + u'\u2b24' + 9 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) else: print(color + 10 * ' ' + u'\u25ef' + 9 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) self.print_formatted(device, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) self.print_formatted(device, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: if self.data.get(self.KEY_STATE) != "on": color = COLOR_LIGHT_PASSIVE if key == self.KEY_BRIGHTNESS: value = round(value * 100 / 256, 0) else: value = round((value - 250) * 100 / 204, 0) sys.stdout.write(color) sys.stdout.write('B' if key == gui_light.KEY_BRIGHTNESS else 'C') sys.stdout.write(percent_bar(value)) sys.stdout.write("%3d%%" % value + 5 * " ") print(" - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) class gui_light(tradfri_light): AUTOSEND = False # KEY_ENABLE = "enable" def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False): super().__init__(mqtt_client, topic, enable_state, enable_brightness, enable_color_temp) self.add_callback(self.KEY_ENABLE, self.print_formatted, None) def __init_data__(self, enable_state, enable_brightness, enable_color_temp): data = {} data[self.KEY_ENABLE] = False if enable_state: data[self.KEY_STATE] = False if enable_brightness: data[self.KEY_BRIGHTNESS] = 50 if enable_color_temp: data[self.KEY_COLOR_TEMP] = 5 self.store_data(**data) def __rx__(self, client, userdata, message): value = payload_filter(message.payload) if message.topic.startswith(self.topic) and message.topic.endswith('/set'): targetkey = message.topic.split('/')[-2] if targetkey in self.data.keys(): self.store_data(**{targetkey: value}) else: print("Unknown key %s in %s::%s" % (targetkey, message.topic, self.__class__.__name__)) elif message.topic == self.topic + '/get': self.__tx__(None) def send(self, key, data): if data is not None: topic = self.topic + '/' + key self.mqtt_client.send(topic, json.dumps(data)) def command(self, command): try: command, value = command.split(' ') except ValueError: value = None if command in self.capabilities(): if command == self.STATE_COMMANDS[0]: self.print_formatted(self, self.KEY_STATE, self.data.get(self.KEY_STATE)) elif command == self.STATE_COMMANDS[1]: self.send(self.KEY_STATE, not self.data.get(self.KEY_STATE)) elif command == self.BRIGHTNESS_COMMANDS[0]: self.print_formatted(self, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) elif command == self.BRIGHTNESS_COMMANDS[1]: self.send(self.KEY_BRIGHTNESS, command_int_value(value)) elif command == self.COLOR_TEMP_COMMANDS[0]: self.print_formatted(self, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) elif command == self.COLOR_TEMP_COMMANDS[1]: self.send(self.KEY_COLOR_TEMP, command_int_value(value)) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: color = COLOR_GUI_ACTIVE if key == self.KEY_STATE: if value == True: print(color + 10 * ' ' + u'\u25a0' + 9 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) else: print(color + 10 * ' ' + u'\u25a1' + 9*' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) elif key == self.KEY_ENABLE: self.print_formatted(device, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) self.print_formatted(device, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: if not self.data.get(self.KEY_ENABLE, False): color = COLOR_GUI_PASSIVE value = round(value * 10 if key == self.KEY_COLOR_TEMP else value, 0) sys.stdout.write(color) sys.stdout.write('B' if key == self.KEY_BRIGHTNESS else 'C') sys.stdout.write(percent_bar(value)) sys.stdout.write("%3d%%" % value + 5 * " ") print(" - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) class tradfri_button(base): KEY_ACTION = "action" # ACTION_TOGGLE = "toggle" ACTION_BRIGHTNESS_UP = "brightness_up_click" ACTION_BRIGHTNESS_DOWN = "brightness_down_click" ACTION_RIGHT = "arrow_right_click" ACTION_LEFT = "arrow_left_click" ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold" ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold" ACTION_RIGHT_LONG = "arrow_right_hold" ACTION_LEFT_LONG = "arrow_left_hold" # COMMANDS = [ACTION_TOGGLE, ACTION_LEFT, ACTION_RIGHT, ACTION_BRIGHTNESS_UP, ACTION_BRIGHTNESS_DOWN, ACTION_LEFT_LONG, ACTION_RIGHT_LONG, ACTION_BRIGHTNESS_UP_LONG, ACTION_BRIGHTNESS_DOWN_LONG] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) def __rx__(self, client, userdata, message): pass def command(self, command): try: command, value = command.split(' ') except ValueError: value = None else: value = json.loads(value) if command in self.capabilities(): action = self.COMMANDS[self.COMMANDS.index(command)] if self.COMMANDS.index(command) <= 4: self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) elif self.COMMANDS.index(command) <= 8: self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) time.sleep(value or 0.5) action = '_'.join(action.split('_')[:-1] + ['release']) self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) class gui_led_array(base): AUTOSEND = False # KEY_LED_0 = "led0" KEY_LED_1 = "led1" KEY_LED_2 = "led2" KEY_LED_3 = "led3" KEY_LED_4 = "led4" KEY_LED_5 = "led5" KEY_LED_6 = "led6" KEY_LED_7 = "led7" KEY_LED_8 = "led8" KEY_LED_9 = "led9" def __init__(self, mqtt_client, topic, ): super().__init__(mqtt_client, topic) for i in range(0, 10): key = getattr(self, "KEY_LED_%d" % i) self.data[key] = False self.add_callback(key, self.print_formatted, None) def __rx__(self, client, userdata, message): value = payload_filter(message.payload) if message.topic.startswith(self.topic) and message.topic.endswith('/set'): targetkey = message.topic.split('/')[-2] if targetkey in self.data.keys(): self.store_data(**{targetkey: value}) else: print("Unknown key %s in %s" % (targetkey, self.__class__.__name__)) def print_formatted(self, device, key, value): led = green_led() if value else grey_led() channel = '(%s)' % self.names.get(key, key) devicename = ' - '.join(self.topic.split('/')[1:]) print(10 * ' ' + led + 9 * ' ' + COLOR_GUI_ACTIVE + devicename + ' ' + channel + colored.attr("reset")) class remote(base): def __rx__(self, client, userdata, message): if message.topic == self.topic + "/VOLUP": if payload_filter(message.payload): icon = u'\u1403' else: icon = u'\u25a1' elif message.topic == self.topic + "/VOLDOWN": if payload_filter(message.payload): icon = u'\u1401' else: icon = u'\u25a1' else: return devicename = ' - '.join(self.topic.split('/')[1:-1]) print(COLOR_REMOTE + 10 * ' ' + icon + 6 * ' ' + devicename + colored.attr("reset")) class gui_timer(base): AUTOSEND = False def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) self.maxvalue = None self.last_printed = None def __rx__(self, client, userdata, message): payload = payload_filter(message.payload) if message.topic.startswith(self.topic) and message.topic.endswith('/feedback/set'): if isinstance(payload, (int, float, complex)) and not isinstance(payload, bool): if self.maxvalue is None: self.maxvalue = payload perc = payload / self.maxvalue * 100 if self.last_printed is None or abs(self.last_printed - perc) >= 4.8: sys.stdout.write(COLOR_GUI_ACTIVE + 't' + percent_bar(perc)) print('%3d%%' % perc + 2 * ' ' + ' - '.join(self.topic.split('/')[1:]) + ' (%.1f)' % payload + colored.attr('reset')) self.last_printed = perc else: self.maxvalue = None self.last_printed = None sys.stdout.write(COLOR_GUI_ACTIVE + 't' + percent_bar(0)) print('%3d%%' % 0 + 2 * ' ' + ' - '.join(self.topic.split('/')[1:]) + colored.attr('reset')) else: print("Unknown Message") class brennenstuhl_radiator_valve(base): TEMP_RANGE = [10, 30] # KEY_TEMPERATURE_SETPOINT = "current_heating_setpoint" KEY_TEMPERATURE = "local_temperature" # COMMANDS = [ "get_temperature_setpoint", "set_temperature_setpoint", ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) self.store_data(**{ self.KEY_TEMPERATURE_SETPOINT: 20, self.KEY_TEMPERATURE: 20.7, }) self.add_callback(self.KEY_TEMPERATURE_SETPOINT, self.print_formatted, None) def __rx__(self, client, userdata, message): if message.topic.startswith(self.topic) and message.topic.endswith("/set"): payload = payload_filter(message.payload) self.store_data(**payload) def command(self, command): try: command, value = command.split(' ') except ValueError: value = None if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_TEMPERATURE_SETPOINT, self.data.get(self.KEY_TEMPERATURE_SETPOINT)) elif command == self.COMMANDS[1]: self.store_data(**{self.KEY_TEMPERATURE_SETPOINT: command_float_value(value)}) def print_formatted(self, device, key, value): devicename = ' - '.join(self.topic.split('/')[1:]) if key == self.KEY_TEMPERATURE_SETPOINT: perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) perc = 100 if perc > 100 else perc perc = 0 if perc < 0 else perc sys.stdout.write(COLOR_RADIATOR_VALVE + '\u03d1' + percent_bar(perc)) sys.stdout.write("%4.1f°C" % value + 3 * " ") print(devicename + colored.attr("reset")) class gui_radiator_valve(base): AUTOSEND = False # TEMP_RANGE = [10, 30] # KEY_BOOST_LED = "boost_led" KEY_TEMPERATURE_SETPOINT = "temperature_setpoint" # COMMANDS = [ "get_temperature_setpoint", "set_temperature_setpoint", "get_boost_state", "trigger_boost", ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) self.add_callback(self.KEY_TEMPERATURE_SETPOINT, self.print_formatted, None) self.add_callback(self.KEY_BOOST_LED, self.print_formatted, None) def __rx__(self, client, userdata, message): if message.topic.startswith(self.topic) and message.topic.endswith("/set"): payload = payload_filter(message.payload) if type(payload) == bool: self.store_data(**{self.KEY_BOOST_LED: payload}) else: self.store_data(**{self.KEY_TEMPERATURE_SETPOINT: payload}) else: print(message.topic) def command(self, command): try: command, value = command.split(' ') except ValueError: value = None if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_TEMPERATURE_SETPOINT, self.data.get(self.KEY_TEMPERATURE_SETPOINT)) elif command == self.COMMANDS[1]: ################################### TEMPORARY!!! ################################### self.mqtt_client.send(self.topic + "/feedback/set", command_float_value(value)) ################################### TEMPORARY!!! ################################### elif command == self.COMMANDS[2]: self.print_formatted(self, self.KEY_BOOST_LED, self.data.get(self.KEY_BOOST_LED)) elif command == self.COMMANDS[3]: ################################### TEMPORARY!!! ################################### self.mqtt_client.send(self.topic + "/state", json.dumps(True)) ################################### TEMPORARY!!! ################################### def print_formatted(self, device, key, value): devicename = ' - '.join(self.topic.split('/')[1:]) if key == self.KEY_BOOST_LED: led = green_led() if value else grey_led() print(10 * ' ' + led + 9 * ' ' + COLOR_GUI_ACTIVE + devicename + " (Boost)" + colored.attr("reset")) elif key == self.KEY_TEMPERATURE_SETPOINT: perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) perc = 100 if perc > 100 else perc perc = 0 if perc < 0 else perc sys.stdout.write(COLOR_GUI_ACTIVE + '\u03d1' + percent_bar(perc)) sys.stdout.write("%4.1f°C" % value + 3 * " ") print(devicename + colored.attr("reset"))