import colored import json import sys import time # TODO: implement my powerplug COLOR_GUI_ACTIVE = colored.fg("light_blue") COLOR_GUI_PASSIVE = COLOR_GUI_ACTIVE + colored.attr("dim") COLOR_SHELLY_1 = colored.fg("light_magenta") COLOR_SHELLY_2 = colored.fg("light_cyan") COLOR_LIGHT_ACTIVE = colored.fg("yellow") COLOR_LIGHT_PASSIVE = COLOR_LIGHT_ACTIVE + colored.attr("dim") COLOR_MOTION_SENSOR = colored.fg("red") def payload_filter(payload): try: return json.loads(payload) except json.decoder.JSONDecodeError: return payload.decode("utf-8") def percent_print(value): for i in range(0, 10): if (value - 5) > 10*i: sys.stdout.write(u"\u25ac") else: sys.stdout.write(u"\u25ad") def command_int_value(value): try: return int(value) except TypeError: print("You need to give a integer parameter not '%s'" % str(value)) class base(object): AUTOSEND = True COMMANDS = [] def __init__(self, mqtt_client, topic): self.mqtt_client = mqtt_client self.topic = topic # self.data = {} self.callbacks = {} self.commands = self.COMMANDS[:] # self.mqtt_client.add_callback(self.topic, self.__rx__) self.mqtt_client.add_callback(self.topic + '/#', self.__rx__) def add_callback(self, key, callback, value): if self.callbacks.get(key) is None: self.callbacks[key] = [] self.callbacks[key].append((callback, value)) def capabilities(self): return self.commands def store_data(self, *args, **kwargs): keys_changed = [] for key in kwargs: if kwargs[key] is not None and kwargs[key] != self.data.get(key): keys_changed.append(key) self.data[key] = kwargs[key] for callback, value in self.callbacks.get(key, [(None, None)]): if callback is not None and (value is None or value == kwargs[key]): callback(self, key, kwargs[key]) if self.AUTOSEND and len(keys_changed) > 0: self.__tx__(keys_changed) def __tx__(self, keys_changed): self.mqtt_client.send(self.topic, json.dumps(self.data)) def __rx__(self, client, userdata, message): print("%s: __rx__ not handled!" % self.__class__.__name__) class shelly(base): KEY_OUTPUT_0 = "relay/0" KEY_OUTPUT_1 = "relay/1" # COMMANDS = [ "get_relay_0", "set_relay_0", "unset_relay_0", "get_relay_1", "set_relay_1", "unset_relay_1", ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # self.store_data(**{self.KEY_OUTPUT_0: "off", self.KEY_OUTPUT_1: "off"}) self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None) def __rx__(self, client, userdata, message): value = payload_filter(message.payload) if message.topic == self.topic + "/relay/0/command": if value in ['on', 'off']: self.store_data(**{self.KEY_OUTPUT_0: value}) elif value == 'toggle': self.store_data(**{self.KEY_OUTPUT_0: 'on' if self.data.get(self.KEY_OUTPUT_0) == 'off' else 'off'}) elif message.topic == self.topic + "/relay/1/command": if value in ['on', 'off']: self.store_data(**{self.KEY_OUTPUT_1: value}) elif value == 'toggle': self.store_data(**{self.KEY_OUTPUT_1: 'on' if self.data.get(self.KEY_OUTPUT_1) == 'off' else 'off'}) def __tx__(self, keys_changed): for key in keys_changed: self.mqtt_client.send(self.topic + '/' + key, self.data.get(key)) def command(self, command): if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) elif command == self.COMMANDS[1]: self.store_data(**{self.KEY_OUTPUT_0: "on"}) elif command == self.COMMANDS[2]: self.store_data(**{self.KEY_OUTPUT_0: "off"}) elif command == self.COMMANDS[3]: self.print_formatted(self, self.KEY_OUTPUT_1, self.data.get(self.KEY_OUTPUT_1)) elif command == self.COMMANDS[4]: self.store_data(**{self.KEY_OUTPUT_1: "on"}) elif command == self.COMMANDS[5]: self.store_data(**{self.KEY_OUTPUT_1: "off"}) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: if key == shelly.KEY_OUTPUT_0: color = COLOR_SHELLY_1 else: color = COLOR_SHELLY_2 if value == "on": print(color + 10 * ' ' + u'\u2b24' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) else: print(color + 10 * ' ' + u'\u25ef' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) class my_powerplug(base): COMMANDS = [ "get_state", "set_state", "unset_state", ] def __init__(self, mqtt_client, topic, names=[]): super().__init__(mqtt_client, topic) # self.names = [] # for i in range(0, 4): self.data[str(i)] = False try: self.names.append(names[i]) except IndexError: self.names.append("Channel %d" % (i + 1)) self.add_callback(str(i), self.print_formatted, None) def __rx__(self, client, userdata, message): if message.topic.startswith(self.topic + '/output/') and message.topic.endswith('/set'): channel = int(message.topic.split('/')[-2]) - 1 payload = payload_filter(message.payload) if payload == "toggle": payload = not self.data.get(str(channel)) self.store_data(**{str(channel): payload}) def __tx__(self, keys_changed): for key in keys_changed: self.mqtt_client.send(self.topic + "/output/" + str(int(key) + 1), json.dumps(self.data.get(key))) def command(self, command): if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) elif command == self.COMMANDS[1]: self.store_data(**{self.KEY_OUTPUT_0: 'on'}) elif command == self.COMMANDS[2]: self.store_data(**{self.KEY_OUTPUT_0: 'off'}) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: if value: print(COLOR_SHELLY_1 + 10 * ' ' + u'\u2b24' + 6 * ' ' + " - ".join(self.topic.split('/') [1:]) + " (%s)" % self.names[int(key)] + colored.attr("reset")) else: print(COLOR_SHELLY_1 + 10 * ' ' + u'\u25ef' + 6 * ' ' + " - ".join(self.topic.split('/') [1:]) + " (%s)" % self.names[int(key)] + colored.attr("reset")) class silvercrest_powerplug(base): KEY_OUTPUT_0 = "state" # COMMANDS = [ "get_state", "set_state", "unset_state", ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None) # self.store_data(**{self.KEY_OUTPUT_0: "off"}) def __rx__(self, client, userdata, message): if message.topic == self.topic + '/set': STATES = ["on", "off", "toggle"] # state = json.loads(message.payload).get('state').lower() if state in STATES: if state == STATES[0]: self.store_data(**{self.KEY_OUTPUT_0: 'on'}) elif state == STATES[1]: self.store_data(**{self.KEY_OUTPUT_0: 'off'}) else: self.store_data(**{self.KEY_OUTPUT_0: "off" if self.data.get(self.KEY_OUTPUT_0) == "on" else "on"}) def command(self, command): if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0)) elif command == self.COMMANDS[1]: self.store_data(**{self.KEY_OUTPUT_0: 'on'}) elif command == self.COMMANDS[2]: self.store_data(**{self.KEY_OUTPUT_0: 'off'}) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: if value == "on": print(COLOR_SHELLY_1 + 10 * ' ' + u'\u2b24' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) else: print(COLOR_SHELLY_1 + 10 * ' ' + u'\u25ef' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) class silvercrest_motion_sensor(base): KEY_OCCUPANCY = "occupancy" COMMANDS = ['motion'] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) self.data[self.KEY_OCCUPANCY] = False self.add_callback(self.KEY_OCCUPANCY, self.print_formatted, None) def __rx__(self, client, userdata, message): pass def command(self, command): try: command, value = command.split(' ') except ValueError: value = None else: value = json.loads(value) if command == self.COMMANDS[0]: self.store_data(**{self.KEY_OCCUPANCY: True}) time.sleep(value or 10) self.store_data(**{self.KEY_OCCUPANCY: False}) def print_formatted(self, device, key, value): if value is not None: if value: print(COLOR_MOTION_SENSOR + 10 * ' ' + u'\u2b24' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) else: print(COLOR_MOTION_SENSOR + 10 * ' ' + u'\u25ef' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) class tradfri_light(base): KEY_STATE = "state" KEY_BRIGHTNESS = "brightness" KEY_COLOR_TEMP = "color_temp" KEY_BRIGHTNESS_MOVE = "brightness_move" # STATE_COMMANDS = ("get_state", "change_state", ) BRIGHTNESS_COMMANDS = ("get_brightness", "set_brightness",) COLOR_TEMP_COMMANDS = ("get_color_temp", "set_color_temp",) def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, send_on_power_on=True): super().__init__(mqtt_client, topic) self.send_on_power_on = send_on_power_on self.add_callback(self.KEY_STATE, self.print_formatted, None) self.add_callback(self.KEY_BRIGHTNESS, self.print_formatted, None) self.add_callback(self.KEY_COLOR_TEMP, self.print_formatted, None) # self.commands = [] if enable_state: self.commands.extend(self.STATE_COMMANDS) if enable_brightness: self.commands.extend(self.BRIGHTNESS_COMMANDS) if enable_color_temp: self.commands.extend(self.COLOR_TEMP_COMMANDS) self.__init_data__(enable_state, enable_brightness, enable_color_temp) def __init_data__(self, enable_state, enable_brightness, enable_color_temp): data = {} if enable_state: data[self.KEY_STATE] = 'off' self.commands.extend(self.STATE_COMMANDS) if enable_brightness: data[self.KEY_BRIGHTNESS] = 128 self.brightnes_move = (0, time.time()) self.commands.extend(self.BRIGHTNESS_COMMANDS) if enable_color_temp: data[self.KEY_COLOR_TEMP] = 352 self.commands.extend(self.COLOR_TEMP_COMMANDS) self.store_data(**data) def __rx__(self, client, userdata, message): data = json.loads(message.payload) if self.data.get(self.KEY_STATE) == 'on' or data.get(self.KEY_STATE) in ['on', 'toggle']: if message.topic.startswith(self.topic) and message.topic.endswith('/set'): for targetkey in data: value = data[targetkey] if targetkey in self.data.keys(): if targetkey == self.KEY_STATE and value == "toggle": value = "on" if self.data.get(self.KEY_STATE) == "off" else "off" self.store_data(**{targetkey: value}) else: if targetkey == self.KEY_BRIGHTNESS_MOVE: new_value = self.data.get(self.KEY_BRIGHTNESS) + (time.time() - self.brightnes_move[1]) * self.brightnes_move[0] if new_value < 0: new_value = 0 if new_value > 255: new_value = 255 self.store_data(**{self.KEY_BRIGHTNESS: int(new_value)}) self.brightnes_move = (value, time.time()) else: print("%s: UNKNOWN KEY %s" % (message.topic, targetkey)) elif message.topic == self.topic + '/get': self.__tx__(None) def command(self, command): try: command, value = command.split(' ') except ValueError: value = None if command in self.capabilities(): if command == self.STATE_COMMANDS[0]: self.print_formatted(self, self.KEY_STATE, self.data.get(self.KEY_STATE)) elif command == self.STATE_COMMANDS[1]: self.store_data(**{self.KEY_STATE: 'off' if self.data.get(self.KEY_STATE) == 'on' else 'on'}) elif command == self.BRIGHTNESS_COMMANDS[0]: self.print_formatted(self, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) elif command == self.BRIGHTNESS_COMMANDS[1]: self.store_data(**{self.KEY_BRIGHTNESS: command_int_value(value)}) elif command == self.COLOR_TEMP_COMMANDS[0]: self.print_formatted(self, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) elif command == self.COLOR_TEMP_COMMANDS[1]: self.store_data(**{self.KEY_COLOR_TEMP: command_int_value(value)}) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def power_off(self, device, key, value): self.data[self.KEY_STATE] = 'off' self.print_formatted(self, self.KEY_STATE, 'off') def power_on(self, device, key, value): if self.send_on_power_on: self.store_data(**{self.KEY_STATE: 'on'}) else: self.data[self.KEY_STATE] = 'on' self.print_formatted(self, self.KEY_STATE, 'on') def print_formatted(self, device, key, value): if value is not None: color = COLOR_LIGHT_ACTIVE if key == self.KEY_STATE: if value == "on": print(color + 10 * ' ' + u'\u2b24' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) else: print(color + 10 * ' ' + u'\u25ef' + 6 * ' ' + " - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) self.print_formatted(device, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) self.print_formatted(device, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: if self.data.get(self.KEY_STATE) != "on": color = COLOR_LIGHT_PASSIVE if key == self.KEY_BRIGHTNESS: value = round(value * 100 / 256, 0) else: value = round((value - 250) * 100 / 204, 0) sys.stdout.write(color) sys.stdout.write('B' if key == gui_light.KEY_BRIGHTNESS else 'C') percent_print(value) sys.stdout.write("%3d%% " % value) print(" - ".join(self.topic.split('/')[1:]) + colored.attr("reset")) class gui_light(tradfri_light): AUTOSEND = False # KEY_ENABLE = "enable" def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False): super().__init__(mqtt_client, topic, enable_state, enable_brightness, enable_color_temp) self.add_callback(self.KEY_ENABLE, self.print_formatted, None) def __init_data__(self, enable_state, enable_brightness, enable_color_temp): data = {} data[self.KEY_ENABLE] = False if enable_state: data[self.KEY_STATE] = False if enable_brightness: data[self.KEY_BRIGHTNESS] = 50 if enable_color_temp: data[self.KEY_COLOR_TEMP] = 5 self.store_data(**data) def __rx__(self, client, userdata, message): value = payload_filter(message.payload) if message.topic.startswith(self.topic) and message.topic.endswith('/set'): targetkey = message.topic.split('/')[-2] if targetkey in self.data.keys(): self.store_data(**{targetkey: value}) else: print("Unknown key %s in %s" % (targetkey, self.__class__.__name__)) elif message.topic == self.topic + '/get': self.__tx__(None) def send(self, key, data): if data is not None: topic = self.topic + '/' + key self.mqtt_client.send(topic, json.dumps(data)) def command(self, command): try: command, value = command.split(' ') except ValueError: value = None if command in self.capabilities(): if command == self.STATE_COMMANDS[0]: self.print_formatted(self, self.KEY_STATE, self.data.get(self.KEY_STATE)) elif command == self.STATE_COMMANDS[1]: self.send(self.KEY_STATE, not self.data.get(self.KEY_STATE)) elif command == self.BRIGHTNESS_COMMANDS[0]: self.print_formatted(self, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) elif command == self.BRIGHTNESS_COMMANDS[1]: self.send(self.KEY_BRIGHTNESS, command_int_value(value)) elif command == self.COLOR_TEMP_COMMANDS[0]: self.print_formatted(self, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) elif command == self.COLOR_TEMP_COMMANDS[1]: self.send(self.KEY_COLOR_TEMP, command_int_value(value)) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: color = COLOR_GUI_ACTIVE if key == self.KEY_STATE: if value == True: print(color + 10 * ' ' + u'\u25a0' + 6 * ' ' + " - ".join(self.topic.split('/')[1:-1]) + colored.attr("reset")) else: print(color + 10 * ' ' + u'\u25a1' + 6*' ' + " - ".join(self.topic.split('/')[1:-1]) + colored.attr("reset")) elif key == self.KEY_ENABLE: self.print_formatted(device, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS)) self.print_formatted(device, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP)) elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: if not self.data.get(self.KEY_ENABLE, False): color = COLOR_GUI_PASSIVE value = round(value * 10 if key == self.KEY_COLOR_TEMP else value, 0) sys.stdout.write(color) sys.stdout.write('B' if key == self.KEY_BRIGHTNESS else 'C') percent_print(value) sys.stdout.write("%3d%% " % value) print(" - ".join(self.topic.split('/')[1:-1]) + colored.attr("reset")) class tradfri_button(base): KEY_ACTION = "action" # ACTION_TOGGLE = "toggle" ACTION_BRIGHTNESS_UP = "brightness_up_click" ACTION_BRIGHTNESS_DOWN = "brightness_down_click" ACTION_RIGHT = "arrow_right_click" ACTION_LEFT = "arrow_left_click" ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold" ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold" ACTION_RIGHT_LONG = "arrow_right_hold" ACTION_LEFT_LONG = "arrow_left_hold" # COMMANDS = [ACTION_TOGGLE, ACTION_LEFT, ACTION_RIGHT, ACTION_BRIGHTNESS_UP, ACTION_BRIGHTNESS_DOWN, ACTION_LEFT_LONG, ACTION_RIGHT_LONG, ACTION_BRIGHTNESS_UP_LONG, ACTION_BRIGHTNESS_DOWN_LONG] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) def __rx__(self, client, userdata, message): pass def command(self, command): try: command, value = command.split(' ') except ValueError: value = None else: value = json.loads(value) if command in self.capabilities(): action = self.COMMANDS[self.COMMANDS.index(command)] if self.COMMANDS.index(command) <= 4: self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) elif self.COMMANDS.index(command) <= 8: self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) time.sleep(value or 0.5) action = '_'.join(action.split('_')[:-1] + ['release']) self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) class gui_led_array(base): AUTOSEND = False # KEY_LED_0 = "led0" KEY_LED_1 = "led1" KEY_LED_2 = "led2" KEY_LED_3 = "led3" KEY_LED_4 = "led4" KEY_LED_5 = "led5" KEY_LED_6 = "led6" KEY_LED_7 = "led7" KEY_LED_8 = "led8" KEY_LED_9 = "led9" def __init__(self, mqtt_client, topic, names=[]): super().__init__(mqtt_client, topic) self.names = {} for i in range(0, 10): key = getattr(self, "KEY_LED_%d" % i) self.data[key] = False try: self.names[key] = names[i] except IndexError: self.names[key] = key self.add_callback(key, self.print_formatted, None) def __rx__(self, client, userdata, message): value = payload_filter(message.payload) if message.topic.startswith(self.topic) and message.topic.endswith('/set'): targetkey = message.topic.split('/')[-2] if targetkey in self.data.keys(): self.store_data(**{targetkey: value}) else: print("Unknown key %s in %s" % (targetkey, self.__class__.__name__)) def print_formatted(self, device, key, value): if value: color = colored.fg('green') else: color = "" print(color + 10 * ' ' + u"\u2b24" + 6 * ' ' + COLOR_GUI_ACTIVE + ' - '.join(self.topic.split('/')[:-1]) + ' (%s)' % self.names[key] + colored.attr("reset"))