from base import mqtt_base import colored import copy import json import task import time COLOR_GUI_ACTIVE = colored.fg("light_blue") COLOR_GUI_PASSIVE = COLOR_GUI_ACTIVE + colored.attr("dim") COLOR_SHELLY = colored.fg("light_magenta") COLOR_POWERPLUG = colored.fg("light_cyan") COLOR_LIGHT_ACTIVE = colored.fg("yellow") COLOR_LIGHT_PASSIVE = COLOR_LIGHT_ACTIVE + colored.attr("dim") COLOR_MOTION_SENSOR = colored.fg("dark_orange_3b") COLOR_HEATING_VALVE = colored.fg("red") COLOR_REMOTE = colored.fg("green") class base(mqtt_base): AUTOSEND = True COMMANDS = [] BOOL_KEYS = [] def __init__(self, mqtt_client, topic, default_values=None): super().__init__(mqtt_client, topic, default_values) self.names = {} self.commands = self.COMMANDS[:] # self.mqtt_client.add_callback(self.topic, self.__rx__) self.mqtt_client.add_callback(self.topic + '/#', self.__rx__) def add_channel_name(self, key, name): self.names[key] = name def capabilities(self): return self.commands def __payload_filter__(self, payload): try: return json.loads(payload) except json.decoder.JSONDecodeError: return payload.decode("utf-8") def __rx__(self, client, userdata, message): print("%s: __rx__ not handled!" % self.__class__.__name__) def __tx__(self, keys_changed): print("%s: __tx__ not handled!" % self.__class__.__name__) def __send__(self, client, key, data): self.__tx__([key]) def __ext_to_int__(self, key, data): if key in self.BOOL_KEYS: if data == 'toggle': return not self.get(key) return data == "on" return data def __int_to_ext__(self, key, value): if key in self.BOOL_KEYS: return "on" if value is True else "off" return value def __devicename__(self): return " - ".join(self.topic.split('/')[1:]) def __percent_bar__(self, percent_value): rv = "" for i in range(0, 10): rv += u"\u25ac" if (percent_value - 5) > 10*i else u"\u25ad" return rv def __command_int_value__(self, value): try: return int(value) except TypeError: print("You need to give a integer parameter not '%s'" % str(value)) def __command_float_value__(self, value): try: return float(value) except TypeError: print("You need to give a numeric parameter not '%s'" % str(value)) def print_formatted_light(self, color, state, description, led=False): if led is True: if state is True: icon = colored.fg('green') + "\u2b24" + color else: icon = colored.fg('light_gray') + "\u2b24" + color else: icon = u'\u2b24' if state is True else u'\u25ef' print(color + 10 * ' ' + icon + 9 * ' ' + self.__devicename__(), description + colored.attr("reset")) def print_formatted_videv(self, color, state, description): icon = u'\u25a0' if state is True else u'\u25a1' print(color + 10 * ' ' + icon + 9 * ' ' + self.__devicename__(), description + colored.attr("reset")) def print_formatted_percent(self, color, prefix, perc_value, value_str, description): if len(prefix) > 1 or len(value_str) > 7: raise ValueError("Length of prefix (%d) > 1 or length of value_str (%d) > 7" % (len(prefix), len(value_str))) print(color + prefix + self.__percent_bar__(perc_value), value_str + (8 - len(value_str)) * ' ' + self.__devicename__(), description + colored.attr("reset")) class base_videv(base): def set(self, key, data, block_callback=[]): if key in self.keys(): return super().set(key, data, block_callback) else: self.__send__(self, key, data) class shelly(base): KEY_OUTPUT_0 = "relay/0" KEY_OUTPUT_1 = "relay/1" KEY_INPUT_0 = "input/0" KEY_INPUT_1 = "input/1" KEY_LONGPUSH_0 = "longpush/0" KEY_LONGPUSH_1 = "longpush/1" # BOOL_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, ] # INPUT_FUNC_OUT1_FOLLOW = "out1_follow" INPUT_FUNC_OUT1_TRIGGER = "out1_trigger" INPUT_FUNC_OUT2_FOLLOW = "out2_follow" INPUT_FUNC_OUT2_TRIGGER = "out2_trigger" # COMMANDS = [ "get_output_0", "toggle_output_0", "get_output_1", "toggle_output_1", "get_input_0", "toggle_input_0", "get_input_1", "toggle_input_1", "trigger_input_0_longpress", "trigger_input_1_longpress", ] def __init__(self, mqtt_client, topic, input_0_func=None, input_1_func=None, output_0_auto_off=None): super().__init__(mqtt_client, topic, default_values={self.KEY_OUTPUT_0: False, self.KEY_OUTPUT_1: False, self.KEY_INPUT_0: False, self.KEY_INPUT_1: False}) # self.__input_0_func = input_0_func self.__input_1_func = input_1_func self.__output_0_auto_off__ = output_0_auto_off # print ouput changes self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True) self.add_callback(self.KEY_OUTPUT_1, None, self.print_formatted, True) # publish state changes self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) self.add_callback(self.KEY_OUTPUT_1, None, self.__send__, True) # if self.__output_0_auto_off__ is not None: self.__delayed_off__ = task.delayed(float(self.__output_0_auto_off__), self.__auto_off__, self.KEY_OUTPUT_0) self.add_callback(self.KEY_OUTPUT_0, True, self.__start_auto_off__) # self.add_callback(self.KEY_INPUT_0, self.__input_function__, None) self.add_callback(self.KEY_INPUT_1, self.__input_function__, None) # self.__tx__((self.KEY_OUTPUT_0, self.KEY_OUTPUT_1)) def __rx__(self, client, userdata, message): value = self.__payload_filter__(message.payload) if message.topic.startswith(self.topic) and message.topic.endswith("/command"): key = '/'.join(message.topic.split('/')[-3:-1]) self.set(key, self.__ext_to_int__(key, value)) def __tx__(self, keys_changed): for key in keys_changed: self.mqtt_client.send(self.topic + '/' + key, self.__int_to_ext__(key, self[key])) def __input_function__(self, device, key, data): if key == self.KEY_INPUT_0: func = self.__input_0_func elif key == self.KEY_INPUT_1: func = self.__input_1_func else: func = None if func == self.INPUT_FUNC_OUT1_FOLLOW: self.set(self.KEY_OUTPUT_0, data) elif func == self.INPUT_FUNC_OUT1_TRIGGER: self.set__toggle_data__(self.KEY_OUTPUT_0) elif func == self.INPUT_FUNC_OUT2_FOLLOW: self.__set_data__(self.KEY_OUTPUT_1, data) elif func == self.INPUT_FUNC_OUT2_TRIGGER: self.__toggle_data__(self.KEY_OUTPUT_1) def __start_auto_off__(self, device, key, data): # stop delayed task if needed if self.__output_0_auto_off__ is not None: if not self.__delayed_off__._stopped: self.__delayed_off__.stop() # start delayed task if self.__output_0_auto_off__ is not None: self.__delayed_off__.run() def __auto_off__(self, key): if key == self.KEY_OUTPUT_0: self.set(key, False) def command(self, command): if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0)) elif command == self.COMMANDS[1]: self.set(self.KEY_OUTPUT_0, not self[self.KEY_OUTPUT_0]) elif command == self.COMMANDS[2]: self.print_formatted(self, self.KEY_OUTPUT_1, self.get(self.KEY_OUTPUT_1)) elif command == self.COMMANDS[3]: self.set(self.KEY_OUTPUT_1, not self[self.KEY_OUTPUT_1]) elif command == self.COMMANDS[4]: self.print_formatted(self, self.KEY_INPUT_0, self.get(self.KEY_INPUT_0)) elif command == self.COMMANDS[5]: self.set(self.KEY_INPUT_0, not self[self.KEY_INPUT_0]) elif command == self.COMMANDS[6]: self.print_formatted(self, self.KEY_INPUT_1, self.get(self.KEY_INPUT_1)) elif command == self.COMMANDS[7]: self.set(self.KEY_INPUT_1, not self[self.KEY_INPUT_1]) elif command == self.COMMANDS[8]: self.set(self.KEY_INPUT_0, not self[self.KEY_INPUT_0]) time.sleep(0.4) self.set(self.KEY_LONGPUSH_0, True) time.sleep(0.1) self.set(self.KEY_INPUT_0, not self[self.KEY_INPUT_0]) self.set(self.KEY_LONGPUSH_0, False) elif command == self.COMMANDS[9]: self.set(self.KEY_INPUT_1, not self[self.KEY_INPUT_1]) time.sleep(0.4) self.set(self.KEY_LONGPUSH_1, True) time.sleep(0.1) self.set(self.KEY_INPUT_1, not self[self.KEY_INPUT_1]) self.set(self.KEY_LONGPUSH_1, False) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: info = (" - %ds" % self.__output_0_auto_off__) if self.__output_0_auto_off__ is not None and value else "" channel = "(%s%s)" % (self.names.get(key, key), info) self.print_formatted_light(COLOR_SHELLY, value, channel) class my_powerplug(base): KEY_OUTPUT_0 = "state" # COMMANDS = [ "get_output", "toggle_output", ] def __init__(self, mqtt_client, topic, channel): super().__init__(mqtt_client, topic + '/' + "output/%d" % (channel + 1), default_values={self.KEY_OUTPUT_0: False}) # self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True) self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) # self.__tx__((self.KEY_OUTPUT_0, )) def __rx__(self, client, userdata, message): if message.topic == self.topic + '/set': payload = self.__payload_filter__(message.payload) if payload == "toggle": payload = not self.get(self.KEY_OUTPUT_0) self.set(self.KEY_OUTPUT_0, payload) def __tx__(self, keys_changed): for key in keys_changed: self.mqtt_client.send(self.topic, json.dumps(self.get(key))) def command(self, command): if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0)) elif command == self.COMMANDS[1]: self.set(self.KEY_OUTPUT_0, not self.get(self.KEY_OUTPUT_0)) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: self.print_formatted_light(COLOR_POWERPLUG, value, "(%s)" % self.names.get(key, "State")) class silvercrest_powerplug(base): KEY_OUTPUT_0 = "state" # BOOL_KEYS = [KEY_OUTPUT_0, ] # COMMANDS = [ "get_output", "toggle_output", ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic, default_values={self.KEY_OUTPUT_0: False}) # self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True) self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) # self.__tx__((self.KEY_OUTPUT_0, )) def __rx__(self, client, userdata, message): if message.topic == self.topic + '/set': state = json.loads(message.payload).get('state') self.set(self.KEY_OUTPUT_0, self.__ext_to_int__(self.KEY_OUTPUT_0, state)) def __tx__(self, keys_changed): for key in keys_changed: self.mqtt_client.send(self.topic + '/' + key, self.__int_to_ext__(self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0))) def command(self, command): if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0)) elif command == self.COMMANDS[1]: self.set(self.KEY_OUTPUT_0, not self.get(self.KEY_OUTPUT_0)) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: self.print_formatted_light(COLOR_POWERPLUG, value, "(%s)" % self.names.get(key, key)) class tradfri_light(base): KEY_OUTPUT_0 = "state" KEY_BRIGHTNESS = "brightness" KEY_COLOR_TEMP = "color_temp" # BOOL_KEYS = [KEY_OUTPUT_0, ] # STATE_COMMANDS = ("get_state", "toggle_state", ) BRIGHTNESS_COMMANDS = ("get_brightness", "set_brightness",) COLOR_TEMP_COMMANDS = ("get_color_temp", "set_color_temp",) def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, send_on_power_on=True): default_values = {} if enable_state: default_values[self.KEY_OUTPUT_0] = False if enable_brightness: default_values[self.KEY_BRIGHTNESS] = 50 if enable_color_temp: default_values[self.KEY_COLOR_TEMP] = 5 super().__init__(mqtt_client, topic, default_values=default_values) # self.send_on_power_on = send_on_power_on # if enable_state: self.commands.extend(self.STATE_COMMANDS) if enable_brightness: self.commands.extend(self.BRIGHTNESS_COMMANDS) if enable_color_temp: self.commands.extend(self.COLOR_TEMP_COMMANDS) # self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True) self.add_callback(self.KEY_BRIGHTNESS, None, self.print_formatted, True) self.add_callback(self.KEY_COLOR_TEMP, None, self.print_formatted, True) self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) self.add_callback(self.KEY_BRIGHTNESS, None, self.__send__, True) self.add_callback(self.KEY_COLOR_TEMP, None, self.__send__, True) def __ext_to_int__(self, key, data): if key == self.KEY_BRIGHTNESS: return round((data - 1) / 2.53, 0) elif key == self.KEY_COLOR_TEMP: return round((data - 250) / 20.4, 0) else: return super().__ext_to_int__(key, data) def __int_to_ext__(self, key, data): if key == self.KEY_BRIGHTNESS: return 1 + round(2.53 * data, 0) elif key == self.KEY_COLOR_TEMP: return 250 + round(20.4 * data, 0) else: return super().__int_to_ext__(key, data) def __rx__(self, client, userdata, message): data = json.loads(message.payload) if self.get(self.KEY_OUTPUT_0) or data.get(self.KEY_OUTPUT_0) in ['on', 'toggle']: # prevent non power changes, if not powered on if message.topic.startswith(self.topic) and message.topic.endswith('/set'): for targetkey in data.keys(): value = data[targetkey] if targetkey in self.keys(): self.set(targetkey, self.__ext_to_int__(targetkey, value)) elif message.topic == self.topic + '/get': self.__tx__(None) def __tx__(self, keys_changed): tx_data = dict(self) for key in tx_data: tx_data[key] = self.__int_to_ext__(key, self[key]) self.mqtt_client.send(self.topic, json.dumps(tx_data)) def command(self, command): try: command, value = command.split(' ') except ValueError: value = None if command in self.capabilities(): if command == self.STATE_COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0)) elif command == self.STATE_COMMANDS[1]: self.set(self.KEY_OUTPUT_0, not self.get(self.KEY_OUTPUT_0)) elif command == self.BRIGHTNESS_COMMANDS[0]: self.print_formatted(self, self.KEY_BRIGHTNESS, self.get(self.KEY_BRIGHTNESS)) elif command == self.BRIGHTNESS_COMMANDS[1]: self.set(self.KEY_BRIGHTNESS, self.__command_int_value__(value)) elif command == self.COLOR_TEMP_COMMANDS[0]: self.print_formatted(self, self.KEY_COLOR_TEMP, self.get(self.KEY_COLOR_TEMP)) elif command == self.COLOR_TEMP_COMMANDS[1]: self.set(self.KEY_COLOR_TEMP, self.__command_int_value__(value)) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def power_off(self, device, key, value): self.set(self.KEY_OUTPUT_0, False, block_callback=(self.__send__, )) def power_on(self, device, key, value): block_callback = [] if self.send_on_power_on else (self.__send__, ) self.set(self.KEY_OUTPUT_0, True, block_callback=block_callback) def print_formatted(self, device, key, value): if value is not None: color = COLOR_LIGHT_ACTIVE if key == self.KEY_OUTPUT_0: self.print_formatted_light(COLOR_LIGHT_ACTIVE, value, "") self.print_formatted(device, self.KEY_BRIGHTNESS, self.get(self.KEY_BRIGHTNESS)) self.print_formatted(device, self.KEY_COLOR_TEMP, self.get(self.KEY_COLOR_TEMP)) elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: perc_value = round(value, 0) if key == self.KEY_BRIGHTNESS else round(10 * value, 0) self.print_formatted_percent( COLOR_LIGHT_PASSIVE if not self.get(self.KEY_OUTPUT_0) else COLOR_LIGHT_ACTIVE, 'B' if key == self.KEY_BRIGHTNESS else 'C', perc_value, "%3d%%" % perc_value, "" ) class brennenstuhl_heating_valve(base): TEMP_RANGE = [10, 30] # KEY_TEMPERATURE_SETPOINT = "current_heating_setpoint" KEY_TEMPERATURE = "local_temperature" # COMMANDS = [ "get_temperature_setpoint", "set_temperature_setpoint", "set_local_temperature", ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic, default_values={ self.KEY_TEMPERATURE_SETPOINT: 20, self.KEY_TEMPERATURE: 20.7, }) # self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.print_formatted, True) self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.__send__, True) self.add_callback(self.KEY_TEMPERATURE, None, self.__send__, True) # self.__tx__((self.KEY_TEMPERATURE_SETPOINT, )) def __rx__(self, client, userdata, message): if message.topic.startswith(self.topic) and message.topic.endswith("/set"): payload = self.__payload_filter__(message.payload) for key in payload: self.set(key, self.__ext_to_int__(key, payload[key])) def __tx__(self, keys_changed): tx_data = dict(self) for key in tx_data: tx_data[key] = self.__int_to_ext__(key, self[key]) self.mqtt_client.send(self.topic, json.dumps(tx_data)) def command(self, command): try: command, value = command.split(' ') except ValueError: value = None if command in self.COMMANDS: if command == self.COMMANDS[0]: self.print_formatted(self, self.KEY_TEMPERATURE_SETPOINT, self.get(self.KEY_TEMPERATURE_SETPOINT)) elif command == self.COMMANDS[1]: self.set(self.KEY_TEMPERATURE_SETPOINT, self.__command_float_value__(value)) elif command == self.COMMANDS[2]: self.set(self.KEY_TEMPERATURE, self.__command_float_value__(value)) def print_formatted(self, device, key, value): if key == self.KEY_TEMPERATURE_SETPOINT: perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) perc = 100 if perc > 100 else perc perc = 0 if perc < 0 else perc self.print_formatted_percent(COLOR_HEATING_VALVE, '\u03d1', perc, "%4.1f°C" % value, "") class videv_light(base_videv): KEY_OUTPUT_0 = "state" KEY_BRIGHTNESS = "brightness" KEY_COLOR_TEMP = "color_temp" KEY_TIMER = "timer" # STATE_COMMANDS = ("get_state", "toggle_state", ) BRIGHTNESS_COMMANDS = ("get_brightness", "set_brightness", ) COLOR_TEMP_COMMANDS = ("get_color_temp", "set_color_temp", ) TIMER_COMMANDS = ("get_timer", ) def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, enable_timer=False): default_values = {} if enable_state: default_values[self.KEY_OUTPUT_0] = False if enable_brightness: default_values[self.KEY_BRIGHTNESS] = 50 if enable_color_temp: default_values[self.KEY_COLOR_TEMP] = 5 if enable_timer: default_values[self.KEY_TIMER] = 0 super().__init__(mqtt_client, topic, default_values=default_values) # self.enable_state = enable_state self.enable_brightness = enable_brightness self.enable_color_temp = enable_color_temp self.enable_timer = enable_timer # if enable_state: self.commands.extend(self.STATE_COMMANDS) if enable_brightness: self.commands.extend(self.BRIGHTNESS_COMMANDS) if enable_color_temp: self.commands.extend(self.COLOR_TEMP_COMMANDS) if enable_timer: self.commands.extend(self.TIMER_COMMANDS) # self.timer_maxvalue = None # add commands to be available self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True) self.add_callback(self.KEY_BRIGHTNESS, None, self.print_formatted, True) self.add_callback(self.KEY_COLOR_TEMP, None, self.print_formatted, True) self.add_callback(self.KEY_TIMER, None, self.print_formatted, True) self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) self.add_callback(self.KEY_BRIGHTNESS, None, self.__send__, True) self.add_callback(self.KEY_COLOR_TEMP, None, self.__send__, True) self.add_callback(self.KEY_TIMER, None, self.__send__, True) def __rx__(self, client, userdata, message): value = self.__payload_filter__(message.payload) if message.topic.startswith(self.topic): targetkey = message.topic.split('/')[-1] if targetkey in self.keys(): self.set(targetkey, value, block_callback=(self.__send__, )) elif targetkey != "__info__": print("Unknown key %s in %s::%s" % (targetkey, message.topic, self.__class__.__name__)) def __tx__(self, keys_changed): for key in keys_changed: topic = self.topic + '/' + key self.mqtt_client.send(topic, json.dumps(self[key])) def command(self, command): try: command, value = command.split(' ') except ValueError: value = None if command in self.capabilities(): if command == self.STATE_COMMANDS[0]: self.print_formatted(self, self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0)) elif command == self.STATE_COMMANDS[1]: self.set(self.KEY_OUTPUT_0, not self.get(self.KEY_OUTPUT_0)) elif command == self.BRIGHTNESS_COMMANDS[0]: self.print_formatted(self, self.KEY_BRIGHTNESS, self.get(self.KEY_BRIGHTNESS)) elif command == self.BRIGHTNESS_COMMANDS[1]: self.set(self.KEY_BRIGHTNESS, self.__command_int_value__(value)) elif command == self.COLOR_TEMP_COMMANDS[0]: self.print_formatted(self, self.KEY_COLOR_TEMP, self.get(self.KEY_COLOR_TEMP)) elif command == self.COLOR_TEMP_COMMANDS[1]: self.set(self.KEY_COLOR_TEMP, self.__command_int_value__(value)) elif command == self.TIMER_COMMANDS[0]: self.print_formatted(self, self.KEY_TIMER, self.get(self.KEY_TIMER)) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): if value is not None: if key == self.KEY_OUTPUT_0: self.print_formatted_videv(COLOR_GUI_ACTIVE, value, "") elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: perc_value = round(value * 10 if key == self.KEY_COLOR_TEMP else value, 0) self.print_formatted_percent( COLOR_GUI_ACTIVE, 'B' if key == self.KEY_BRIGHTNESS else 'C', perc_value, "%3d%%" % perc_value, "" ) elif key == self.KEY_TIMER: if value > 0: if self.timer_maxvalue is None and value != 0: self.timer_maxvalue = value disp_value = value try: perc = disp_value / self.timer_maxvalue * 100 except ZeroDivisionError: perc = 0 else: disp_value = 0 perc = 0 self.timer_maxvalue = None self.print_formatted_percent(COLOR_GUI_ACTIVE, 't', perc, '%3d%%' % perc, '(%.1f)' % disp_value) class videv_heating(base_videv): TEMP_RANGE = [10, 30] # KEY_USER_TEMPERATURE_SETPOINT = 'user_temperature_setpoint' KEY_VALVE_TEMPERATURE_SETPOINT = 'valve_temperature_setpoint' KEY_AWAY_MODE = 'away_mode' KEY_SUMMER_MODE = 'summer_mode' KEY_SET_DEFAULT_TEMPERATURE = 'set_default_temperature' KEY_START_BOOST = 'start_boost' KEY_BOOST_TIMER = 'boost_timer' # KEY_TEMPERATURE = 'temperature' # COMMANDS = ["get_temperature_setpoint", "set_temperature_setpoint", "toggle_away_mode", "toggle_summer_mode", "trigger_default_temperature", "trigger_boost"] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic, default_values={ self.KEY_USER_TEMPERATURE_SETPOINT: 20, self.KEY_VALVE_TEMPERATURE_SETPOINT: 20, self.KEY_TEMPERATURE: 20.7, self.KEY_AWAY_MODE: False, self.KEY_SUMMER_MODE: False, self.KEY_BOOST_TIMER: 0 }) self.add_callback(self.KEY_USER_TEMPERATURE_SETPOINT, None, self.print_formatted, True) self.add_callback(self.KEY_VALVE_TEMPERATURE_SETPOINT, None, self.print_formatted, True) self.add_callback(self.KEY_TEMPERATURE, None, self.print_formatted, True) self.add_callback(self.KEY_AWAY_MODE, None, self.print_formatted, True) self.add_callback(self.KEY_SUMMER_MODE, None, self.print_formatted, True) self.add_callback(self.KEY_BOOST_TIMER, None, self.print_formatted, True) self.add_callback(self.KEY_USER_TEMPERATURE_SETPOINT, None, self.__send__, True) self.add_callback(self.KEY_TEMPERATURE, None, self.__send__, True) self.add_callback(self.KEY_AWAY_MODE, None, self.__send__, True) self.add_callback(self.KEY_SUMMER_MODE, None, self.__send__, True) # self.timer_maxvalue = None def __rx__(self, client, userdata, message): value = self.__payload_filter__(message.payload) if message.topic.startswith(self.topic): targetkey = message.topic.split('/')[-1] if targetkey in self.keys(): self.set(targetkey, value, block_callback=(self.__send__, )) elif targetkey not in ["__info__", self.KEY_SET_DEFAULT_TEMPERATURE, self.KEY_START_BOOST]: print("Unknown key %s in %s::%s" % (targetkey, message.topic, self.__class__.__name__)) def __tx__(self, keys_changed): for key in keys_changed: topic = self.topic + '/' + key try: self.mqtt_client.send(topic, json.dumps(self[key])) except KeyError: self.mqtt_client.send(topic, json.dumps(True)) def command(self, command): try: command, value = command.split(' ') except ValueError: value = None if command in self.capabilities(): if command == self.commands[0]: self.print_formatted(self, self.KEY_USER_TEMPERATURE_SETPOINT, self.get(self.KEY_USER_TEMPERATURE_SETPOINT)) elif command == self.commands[1]: self.set(self.KEY_USER_TEMPERATURE_SETPOINT, self.__command_float_value__(value)) elif command == self.commands[2]: self.set(self.KEY_AWAY_MODE, not self.get(self.KEY_AWAY_MODE)) elif command == self.commands[3]: self.set(self.KEY_SUMMER_MODE, not self.get(self.KEY_SUMMER_MODE)) elif command == self.commands[4]: self.set(self, self.KEY_SET_DEFAULT_TEMPERATURE, True) elif command == self.commands[5]: self.set(self, self.KEY_START_BOOST, True) else: print("%s: not yet implemented!" % command) else: print("Unknown command!") def print_formatted(self, device, key, value): desc_temp_dict = { self.KEY_TEMPERATURE: "Current Temperature", self.KEY_USER_TEMPERATURE_SETPOINT: "User Setpoint", self.KEY_VALVE_TEMPERATURE_SETPOINT: "Valve Setpoint" } if value is not None: if key in [self.KEY_TEMPERATURE, self.KEY_USER_TEMPERATURE_SETPOINT, self.KEY_VALVE_TEMPERATURE_SETPOINT]: perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0]) perc = 100 if perc > 100 else perc perc = 0 if perc < 0 else perc self.print_formatted_percent(COLOR_GUI_ACTIVE, '\u03d1', perc, "%4.1f°C" % value, "(%s)" % desc_temp_dict[key]) elif key in [self.KEY_AWAY_MODE, self.KEY_SUMMER_MODE]: self.print_formatted_videv(COLOR_GUI_ACTIVE, value, "(%s)" % "Away-Mode" if key == self.KEY_AWAY_MODE else "Summer-Mode") elif key == self.KEY_BOOST_TIMER: if value > 0: if self.timer_maxvalue is None and value != 0: self.timer_maxvalue = value disp_value = value try: perc = disp_value / self.timer_maxvalue * 100 except ZeroDivisionError: perc = 0 else: disp_value = 0 perc = 0 self.timer_maxvalue = None self.print_formatted_percent(COLOR_GUI_ACTIVE, 't', perc, '%3d%%' % perc, '(%.1f)' % disp_value) # class silvercrest_motion_sensor(base): # KEY_OCCUPANCY = "occupancy" # COMMANDS = ['motion'] # def __init__(self, mqtt_client, topic): # super().__init__(mqtt_client, topic) # self.data[self.KEY_OCCUPANCY] = False # self.add_callback(self.KEY_OCCUPANCY, self.print_formatted, None) # def __rx__(self, client, userdata, message): # pass # def command(self, command): # try: # command, value = command.split(' ') # except ValueError: # value = None # else: # value = json.loads(value) # if command == self.COMMANDS[0]: # self.store_data(**{self.KEY_OCCUPANCY: True}) # time.sleep(value or 10) # self.store_data(**{self.KEY_OCCUPANCY: False}) # def print_formatted(self, device, key, value): # if value is not None: # print_light(COLOR_MOTION_SENSOR, value, self.topic, "") # class tradfri_button(base): # KEY_ACTION = "action" # # # ACTION_TOGGLE = "toggle" # ACTION_BRIGHTNESS_UP = "brightness_up_click" # ACTION_BRIGHTNESS_DOWN = "brightness_down_click" # ACTION_RIGHT = "arrow_right_click" # ACTION_LEFT = "arrow_left_click" # ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold" # ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold" # ACTION_RIGHT_LONG = "arrow_right_hold" # ACTION_LEFT_LONG = "arrow_left_hold" # # # COMMANDS = [ACTION_TOGGLE, ACTION_LEFT, ACTION_RIGHT, ACTION_BRIGHTNESS_UP, ACTION_BRIGHTNESS_DOWN, # ACTION_LEFT_LONG, ACTION_RIGHT_LONG, ACTION_BRIGHTNESS_UP_LONG, ACTION_BRIGHTNESS_DOWN_LONG] # def __init__(self, mqtt_client, topic): # super().__init__(mqtt_client, topic) # def __rx__(self, client, userdata, message): # pass # def command(self, command): # try: # command, value = command.split(' ') # except ValueError: # value = None # else: # value = json.loads(value) # if command in self.capabilities(): # action = self.COMMANDS[self.COMMANDS.index(command)] # if self.COMMANDS.index(command) <= 4: # self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) # elif self.COMMANDS.index(command) <= 8: # self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) # time.sleep(value or 0.5) # action = '_'.join(action.split('_')[:-1] + ['release']) # self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) # class remote(base): # def __rx__(self, client, userdata, message): # if message.topic == self.topic + "/VOLUP": # if self.__payload_filter__(message.payload): # icon = u'\u1403' # else: # icon = u'\u25a1' # elif message.topic == self.topic + "/VOLDOWN": # if self.__payload_filter__(message.payload): # icon = u'\u1401' # else: # icon = u'\u25a1' # else: # return # devicename = ' - '.join(self.topic.split('/')[1:-1]) # print(COLOR_REMOTE + 10 * ' ' + icon + 6 * ' ' + devicename + colored.attr("reset"))