"""MQTT device models and virtual devices ("videv") with console rendering.

Every class derives from ``mqtt_base`` (defined in ``mqtt.smarthome``, not
visible here).  The code below relies on ``mqtt_base`` providing dict-like
storage (``keys()``, ``self[key]``, iteration) plus ``get``, ``set``,
``add_callback``, ``get_name``, ``topic`` and ``mqtt_client``.

NOTE(review): ``add_callback`` is used throughout as
``add_callback(key, data, callback[, flag])`` -- confirm against ``mqtt_base``.
"""
import json

import colored

from mqtt.smarthome import mqtt_base
import task

COLOR_GUI_ACTIVE = colored.fg("light_blue")
COLOR_GUI_PASSIVE = COLOR_GUI_ACTIVE + colored.attr("dim")
COLOR_SHELLY = colored.fg("light_magenta")
COLOR_POWERPLUG = colored.fg("light_cyan")
COLOR_LIGHT_ACTIVE = colored.fg("yellow")
COLOR_LIGHT_PASSIVE = COLOR_LIGHT_ACTIVE + colored.attr("dim")
COLOR_WARNINGS = colored.fg("dark_orange_3b")
COLOR_HEATING_VALVE = colored.fg("red")
COLOR_REMOTE = colored.fg("green")

# Global switch: the print_formatted_* helpers only write to stdout if True.
OUTPUT_ACTIVE = False


class base_common(mqtt_base):
    """Shared payload conversion and console formatting for all devices."""

    AUTOSEND = True
    # Keys whose external representation is "on"/"off" (overridden by subclasses).
    BOOL_KEYS = []

    def __init__(self, mqtt_client, topic, default_values=None):
        super().__init__(mqtt_client, topic, default_values)

    def __payload_filter__(self, payload):
        """Return *payload* parsed as JSON, or decoded as UTF-8 text if it is not JSON."""
        try:
            return json.loads(payload)
        except json.decoder.JSONDecodeError:
            return payload.decode("utf-8")

    def __rx__(self, client, userdata, message):
        """Fallback receive handler; subclasses are expected to override it."""
        print("%s: __rx__ not handled!" % self.__class__.__name__)

    def __tx__(self, keys_changed):
        """Fallback transmit handler; subclasses are expected to override it."""
        print("%s: __tx__ not handled!" % self.__class__.__name__)

    def __send__(self, client, key, data):
        """Value-change callback that forwards the changed key to ``__tx__``."""
        self.__tx__([key])

    def __ext_to_int__(self, key, data):
        """Convert a received value to its internal form.

        For keys in ``BOOL_KEYS``: ``'toggle'`` inverts the stored value, any
        other value is ``True`` only if it equals ``"on"``.  Other keys pass
        through unchanged.
        """
        if key in self.BOOL_KEYS:
            if data == 'toggle':
                return not self.get(key)
            return data == "on"
        return data

    def __int_to_ext__(self, key, value):
        """Convert an internal value to its external form ("on"/"off" for BOOL_KEYS)."""
        if key in self.BOOL_KEYS:
            return "on" if value is True else "off"
        return value

    def __devicename__(self):
        """Return the topic without its first path element, joined by ' - '."""
        return " - ".join(self.topic.split('/')[1:])

    def __percent_bar__(self, percent_value):
        """Return a ten segment bar; segment i is filled if percent_value - 5 > 10 * i."""
        return "".join(
            "\u25ac" if (percent_value - 5) > 10 * i else "\u25ad"
            for i in range(10)
        )

    def print_formatted_light(self, color, state, description, led=False):
        """Print a one line on/off indicator (only if OUTPUT_ACTIVE is set).

        With ``led=True`` the icon is always a filled circle, coloured green
        (state is True) or light gray; otherwise a filled or hollow circle.
        """
        if not OUTPUT_ACTIVE:
            return
        if led is True:
            led_color = colored.fg('green') if state is True else colored.fg('light_gray')
            icon = led_color + "\u2b24" + color
        else:
            icon = '\u2b24' if state is True else '\u25ef'
        print(color + 10 * ' ' + icon + 9 * ' ' + self.__devicename__(), description + colored.attr("reset"))

    def print_formatted_videv(self, color, state, description):
        """Print a one line filled/hollow square indicator (only if OUTPUT_ACTIVE is set)."""
        if not OUTPUT_ACTIVE:
            return
        icon = '\u25a0' if state is True else '\u25a1'
        print(color + 10 * ' ' + icon + 9 * ' ' + self.__devicename__(), description + colored.attr("reset"))

    def print_formatted_percent(self, color, prefix, perc_value, value_str, description):
        """Print a percent bar line (only if OUTPUT_ACTIVE is set).

        Raises:
            ValueError: if *prefix* is longer than 1 or *value_str* longer
                than 7 characters (checked only while output is active).
        """
        if not OUTPUT_ACTIVE:
            return
        if len(prefix) > 1 or len(value_str) > 7:
            raise ValueError("Length of prefix (%d) > 1 or length of value_str (%d) > 7" % (len(prefix), len(value_str)))
        # Pad the value to a fixed width of 8 so the device names line up.
        print(
            color + prefix + self.__percent_bar__(perc_value),
            value_str + (8 - len(value_str)) * ' ' + self.__devicename__(),
            description + colored.attr("reset")
        )


class base(base_common):
    """Device that receives everything published on its topic and below."""

    def __init__(self, mqtt_client, topic, default_values=None):
        super().__init__(mqtt_client, topic, default_values)
        self.mqtt_client.add_callback(self.topic, self.__rx__)
        self.mqtt_client.add_callback(self.topic + '/#', self.__rx__)


class base_videv(base_common):
    """Virtual device: listens on ``<topic>/<key>`` and publishes to ``<topic>/<key>/set``.

    Subclasses list the received keys in ``RX_KEYS`` and provide
    ``print_formatted(device, key, value)``.
    """

    SET_TOPIC = "set"
    RX_KEYS = []

    def __init__(self, mqtt_client, topic, default_values=None):
        super().__init__(mqtt_client, topic, default_values)
        # First (largest) timer value seen; used as 100% by __print_formatted_timer__.
        self.timer_maxvalue = None
        #
        for key in self.RX_KEYS:
            # add mqtt callbacks for RX data
            self.mqtt_client.add_callback('/'.join([self.topic, key]), self.__rx__)
            # add print_formatted for RX data
            self.add_callback(key, None, self.print_formatted, True)

    def set(self, key, data, block_callback=[]):
        """Publish *data* as JSON to ``<topic>/<key>/set``.

        The local value is not changed here; ``__rx__`` stores values via the
        parent ``set``.  ``block_callback`` is accepted but never read.
        """
        self.mqtt_client.send('/'.join([self.topic, key, self.SET_TOPIC]), json.dumps(data))

    def __rx__(self, client, userdata, message):
        """Store a received value for the key named by the last topic element."""
        value = self.__payload_filter__(message.payload)
        if message.topic.startswith(self.topic):
            targetkey = message.topic.split('/')[-1]
            if targetkey in self.keys():
                # Parent set(): store the value instead of publishing it again.
                super().set(targetkey, self.__ext_to_int__(targetkey, value))
            elif targetkey != "__info__":
                print("Unknown key %s in %s::%s" % (targetkey, message.topic, self.__class__.__name__))

    def __print_formatted_timer__(self, value):
        """Print a timer value as percentage of the first positive value seen.

        A value <= 0 prints 0% and clears the remembered maximum.
        """
        if value > 0:
            if self.timer_maxvalue is None:
                self.timer_maxvalue = value
            disp_value = value
            try:
                perc = disp_value / self.timer_maxvalue * 100
            except ZeroDivisionError:
                # Only reachable if timer_maxvalue was set to 0 from outside.
                perc = 0
        else:
            disp_value = 0
            perc = 0
            self.timer_maxvalue = None
        self.print_formatted_percent(COLOR_GUI_ACTIVE, 't', perc, '%3d%%' % perc, '(%.1f)' % disp_value)


class shelly(base):
    """Shelly relay model: two outputs, two inputs, long-push and temperature keys.

    Commands arrive on ``<topic>/<key>/command``; every value change is
    published on ``<topic>/<key>``.
    """

    DEVICENAME = "Shelly"
    #
    KEY_OUTPUT_0 = "relay/0"
    KEY_OUTPUT_1 = "relay/1"
    KEY_INPUT_0 = "input/0"
    KEY_INPUT_1 = "input/1"
    KEY_LONGPUSH_0 = "longpush/0"
    KEY_LONGPUSH_1 = "longpush/1"
    KEY_TEMPERATURE = "temperature"
    KEY_OVERTEMPERATURE = "overtemperature"
    #
    BOOL_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, ]
    #
    INPUT_FUNC_OUT1_FOLLOW = "out1_follow"
    INPUT_FUNC_OUT1_TRIGGER = "out1_trigger"
    INPUT_FUNC_OUT2_FOLLOW = "out2_follow"
    INPUT_FUNC_OUT2_TRIGGER = "out2_trigger"

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic, default_values={
            self.KEY_OUTPUT_0: False,
            self.KEY_OUTPUT_1: False,
            self.KEY_INPUT_0: False,
            self.KEY_INPUT_1: False,
            self.KEY_LONGPUSH_0: False,
            self.KEY_LONGPUSH_1: False,
            self.KEY_TEMPERATURE: 35.2,
            self.KEY_OVERTEMPERATURE: False,
        })
        #
        self.ch_names = {
            self.KEY_INPUT_0: "in0",
            self.KEY_INPUT_1: "in1",
            self.KEY_LONGPUSH_0: "long0",
            self.KEY_LONGPUSH_1: "long1",
            self.KEY_OUTPUT_0: "out0",
            self.KEY_OUTPUT_1: "out1",
        }
        #
        self.__input_0_func = None
        self.__input_1_func = None
        self.__output_0_auto_off__ = None
        # Delayed switch-off task; created by configure() when a delay is given.
        self.__delayed_off__ = None
        # print ouput changes
        self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True)
        self.add_callback(self.KEY_OUTPUT_1, None, self.print_formatted, True)
        # publish state changes
        self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True)
        self.add_callback(self.KEY_OUTPUT_1, None, self.__send__, True)
        self.add_callback(self.KEY_INPUT_0, None, self.__send__, True)
        self.add_callback(self.KEY_INPUT_1, None, self.__send__, True)
        self.add_callback(self.KEY_LONGPUSH_0, None, self.__send__, True)
        self.add_callback(self.KEY_LONGPUSH_1, None, self.__send__, True)
        self.add_callback(self.KEY_TEMPERATURE, None, self.__send__, True)
        self.add_callback(self.KEY_OVERTEMPERATURE, None, self.__send__, True)
        # Registered unconditionally: the auto-off delay is only known after
        # configure(); __start_auto_off__ does nothing while no task exists.
        self.add_callback(self.KEY_OUTPUT_0, True, self.__start_auto_off__)
        # NOTE(review): argument order (key, data, callback) follows every other
        # add_callback call in this file -- confirm against mqtt_base.
        self.add_callback(self.KEY_INPUT_0, None, self.__input_function__)
        self.add_callback(self.KEY_INPUT_1, None, self.__input_function__)
        #
        self.__tx__(self.keys())    # Send state of all values

    def configure(self, input_0_func=None, input_1_func=None, output_0_auto_off=None):
        """Set the input behaviour and the optional auto-off delay of output 0.

        Args:
            input_0_func: one of the ``INPUT_FUNC_*`` constants or None.
            input_1_func: one of the ``INPUT_FUNC_*`` constants or None.
            output_0_auto_off: delay passed as ``float`` to ``task.delayed``
                (printed with an ``s`` suffix), or None to disable auto-off.
        """
        self.__input_0_func = input_0_func
        self.__input_1_func = input_1_func
        # Cancel a task created by an earlier configure() call.
        self.__stop_auto_off__()
        self.__output_0_auto_off__ = output_0_auto_off
        if output_0_auto_off is None:
            self.__delayed_off__ = None
        else:
            self.__delayed_off__ = task.delayed(float(output_0_auto_off), self.__auto_off__, self.KEY_OUTPUT_0)

    def __int_to_ext__(self, key, value):
        """Return the published form: int for overtemperature, b'1'/b'0' for other bool keys."""
        if key == self.KEY_OVERTEMPERATURE:
            return int(value)
        elif key in self.BOOL_KEYS:
            return b'1' if value is True else b'0'
        return super().__int_to_ext__(key, value)

    def __rx__(self, client, userdata, message):
        """Handle ``<topic>/<a>/<b>/command``; the key is the two elements before 'command'."""
        value = self.__payload_filter__(message.payload)
        if message.topic.startswith(self.topic) and message.topic.endswith("/command"):
            key = '/'.join(message.topic.split('/')[-3:-1])
            self.set(key, self.__ext_to_int__(key, value))

    def __tx__(self, keys_changed):
        """Publish each key in *keys_changed* on ``<topic>/<key>``."""
        for key in keys_changed:
            self.mqtt_client.send(self.topic + '/' + key, self.__int_to_ext__(key, self[key]))

    def __input_function__(self, device, key, data):
        """Apply the configured input function (follow or toggle) to an output."""
        if key == self.KEY_INPUT_0:
            func = self.__input_0_func
        elif key == self.KEY_INPUT_1:
            func = self.__input_1_func
        else:
            func = None
        if func == self.INPUT_FUNC_OUT1_FOLLOW:
            self.set(self.KEY_OUTPUT_0, data)
        elif func == self.INPUT_FUNC_OUT1_TRIGGER:
            self.set(self.KEY_OUTPUT_0, not self.get(self.KEY_OUTPUT_0))
        elif func == self.INPUT_FUNC_OUT2_FOLLOW:
            self.set(self.KEY_OUTPUT_1, data)
        elif func == self.INPUT_FUNC_OUT2_TRIGGER:
            self.set(self.KEY_OUTPUT_1, not self.get(self.KEY_OUTPUT_1))

    def __stop_auto_off__(self):
        """Stop the delayed switch-off task if one exists and is not already stopped."""
        if self.__delayed_off__ is not None and not self.__delayed_off__._stopped:
            self.__delayed_off__.stop()

    def __start_auto_off__(self, device, key, data):
        """Restart the delayed switch-off task (no-op while auto-off is not configured)."""
        if self.__delayed_off__ is None:
            return
        # stop delayed task if needed
        self.__stop_auto_off__()
        # start delayed task
        self.__delayed_off__.run()

    def __auto_off__(self, key):
        """Task callback: switch output 0 off."""
        if key == self.KEY_OUTPUT_0:
            self.set(key, False)

    def print_formatted(self, device, key, value):
        """Print the state of a channel, with the auto-off delay appended while it is on."""
        if value is not None:
            info = (" - %ds" % self.__output_0_auto_off__) if self.__output_0_auto_off__ is not None and value else ""
            channel = "(%s%s)" % (self.get_name(key, key), info)
            self.print_formatted_light(COLOR_SHELLY, value, channel)

    def warning_state_on(self):
        """Set temperature 78.3 and raise the overtemperature flag."""
        self.set(self.KEY_TEMPERATURE, 78.3)
        self.set(self.KEY_OVERTEMPERATURE, True)

    def warning_state_off(self):
        """Set temperature 34.1 and clear the overtemperature flag."""
        self.set(self.KEY_TEMPERATURE, 34.1)
        self.set(self.KEY_OVERTEMPERATURE, False)


class my_powerplug(base):
    """Four channel power plug: receives ``<topic>/output/<n>/set``, publishes ``<topic>/output/<n>``."""

    DEVICENAME = "Powerplug4P"
    #
    KEY_OUTPUT_0 = "output/1"
    KEY_OUTPUT_1 = "output/2"
    KEY_OUTPUT_2 = "output/3"
    KEY_OUTPUT_3 = "output/4"
    #
    BOOL_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3, ]

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic, default_values={
            self.KEY_OUTPUT_0: False,
            self.KEY_OUTPUT_1: False,
            self.KEY_OUTPUT_2: False,
            self.KEY_OUTPUT_3: False,
        })
        # print ouput changes
        self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True)
        self.add_callback(self.KEY_OUTPUT_1, None, self.print_formatted, True)
        self.add_callback(self.KEY_OUTPUT_2, None, self.print_formatted, True)
        self.add_callback(self.KEY_OUTPUT_3, None, self.print_formatted, True)
        # publish state changes
        self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True)
        self.add_callback(self.KEY_OUTPUT_1, None, self.__send__, True)
        self.add_callback(self.KEY_OUTPUT_2, None, self.__send__, True)
        self.add_callback(self.KEY_OUTPUT_3, None, self.__send__, True)
        #
        self.__tx__((self.KEY_OUTPUT_0, ))
        self.__tx__((self.KEY_OUTPUT_1, ))
        self.__tx__((self.KEY_OUTPUT_2, ))
        self.__tx__((self.KEY_OUTPUT_3, ))

    def __rx__(self, client, userdata, message):
        """Store the parsed payload of a ``.../set`` message without further conversion."""
        value = self.__payload_filter__(message.payload)
        if message.topic.startswith(self.topic) and message.topic.endswith("/set"):
            key = '/'.join(message.topic.split('/')[-3:-1])
            self.set(key, value)

    def __tx__(self, keys_changed):
        """Publish each changed key as JSON on ``<topic>/<key>``."""
        for key in keys_changed:
            self.mqtt_client.send(self.topic + "/" + key, json.dumps(self.get(key)))

    def print_formatted(self, device, key, value):
        """Print the state of one output."""
        if value is not None:
            self.print_formatted_light(COLOR_POWERPLUG, value, "(%s)" % self.get_name(key, "State"))


class silvercrest_powerplug(base):
    """Single channel power plug: receives JSON on ``<topic>/set``, publishes JSON on ``<topic>``."""

    DEVICENAME = "Powerplug1P"
    #
    KEY_OUTPUT_0 = "state"
    #
    BOOL_KEYS = [KEY_OUTPUT_0, ]

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic, default_values={self.KEY_OUTPUT_0: False})
        #
        self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True)
        self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True)
        #
        self.__tx__((self.KEY_OUTPUT_0, ))

    def __rx__(self, client, userdata, message):
        """Apply the ``state`` entry of a JSON object received on ``<topic>/set``."""
        if message.topic == self.topic + '/set':
            state = json.loads(message.payload).get('state')
            self.set(self.KEY_OUTPUT_0, self.__ext_to_int__(self.KEY_OUTPUT_0, state))

    def __tx__(self, keys_changed):
        """Publish the complete state as one JSON object, once per changed key."""
        data = {}
        for key in self:
            data[key] = self.__int_to_ext__(key, self[key])
        payload = json.dumps(data)
        for _ in keys_changed:
            self.mqtt_client.send(self.topic, payload)

    def print_formatted(self, device, key, value):
        """Print the state of the output."""
        if value is not None:
            self.print_formatted_light(COLOR_POWERPLUG, value, "(%s)" % self.get_name(key, key))


class tradfri_light(base):
    """Light with optional brightness and colour temperature.

    Internally brightness is stored as ``(ext - 1) / 2.53`` and colour
    temperature as ``(ext - 250) / 20.4`` (see ``__ext_to_int__``).
    """

    DEVICENAME = "Light"
    #
    KEY_OUTPUT_0 = "state"
    KEY_BRIGHTNESS = "brightness"
    KEY_COLOR_TEMP = "color_temp"
    #
    BOOL_KEYS = [KEY_OUTPUT_0, ]

    def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, send_on_power_on=True):
        default_values = {}
        if enable_state:
            default_values[self.KEY_OUTPUT_0] = False
        if enable_brightness:
            default_values[self.KEY_BRIGHTNESS] = 50
        if enable_color_temp:
            default_values[self.KEY_COLOR_TEMP] = 5
        super().__init__(mqtt_client, topic, default_values=default_values)
        #
        self.send_on_power_on = send_on_power_on
        #
        self.add_callback(self.KEY_OUTPUT_0, None, self.print_formatted, True)
        self.add_callback(self.KEY_BRIGHTNESS, None, self.print_formatted, True)
        self.add_callback(self.KEY_COLOR_TEMP, None, self.print_formatted, True)
        self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True)
        self.add_callback(self.KEY_BRIGHTNESS, None, self.__send__, True)
        self.add_callback(self.KEY_COLOR_TEMP, None, self.__send__, True)

    def __ext_to_int__(self, key, data):
        """Scale external brightness / colour temperature to the internal value."""
        if key == self.KEY_BRIGHTNESS:
            return int(round((data - 1) / 2.53, 0))
        elif key == self.KEY_COLOR_TEMP:
            return int(round((data - 250) / 20.4, 0))
        else:
            return super().__ext_to_int__(key, data)

    def __int_to_ext__(self, key, data):
        """Scale internal brightness / colour temperature to the external value."""
        if key == self.KEY_BRIGHTNESS:
            return 1 + round(2.53 * data, 0)
        elif key == self.KEY_COLOR_TEMP:
            return 250 + round(20.4 * data, 0)
        else:
            return super().__int_to_ext__(key, data)

    def __rx__(self, client, userdata, message):
        """Handle ``.../set`` (apply known keys) and ``<topic>/get`` (publish state)."""
        data = json.loads(message.payload)
        # prevent non power changes, if not powered on
        if self.get(self.KEY_OUTPUT_0) or data.get(self.KEY_OUTPUT_0) in ['on', 'toggle']:
            if message.topic.startswith(self.topic) and message.topic.endswith('/set'):
                for targetkey in data.keys():
                    value = data[targetkey]
                    if targetkey in self.keys():
                        self.set(targetkey, self.__ext_to_int__(targetkey, value))
            elif message.topic == self.topic + '/get':
                self.__tx__(None)

    def __tx__(self, keys_changed):
        """Publish the complete state as one JSON object (*keys_changed* is not used)."""
        tx_data = dict(self)
        for key in tx_data:
            tx_data[key] = self.__int_to_ext__(key, self[key])
        self.mqtt_client.send(self.topic, json.dumps(tx_data))

    def power_off(self, device, key, value):
        """Callback: switch the light off with the ``__send__`` callback blocked."""
        self.set(self.KEY_OUTPUT_0, False, block_callback=(self.__send__, ))

    def power_on(self, device, key, value):
        """Callback: switch the light on; ``__send__`` is blocked unless send_on_power_on is set."""
        block_callback = [] if self.send_on_power_on else (self.__send__, )
        self.set(self.KEY_OUTPUT_0, True, block_callback=block_callback)

    def print_formatted(self, device, key, value):
        """Print state, brightness or colour temperature.

        A state change also prints the current brightness and colour
        temperature; these are skipped when ``get`` returns None for them.
        """
        if value is None:
            return
        if key == self.KEY_OUTPUT_0:
            self.print_formatted_light(COLOR_LIGHT_ACTIVE, value, "")
            self.print_formatted(device, self.KEY_BRIGHTNESS, self.get(self.KEY_BRIGHTNESS))
            self.print_formatted(device, self.KEY_COLOR_TEMP, self.get(self.KEY_COLOR_TEMP))
        elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]:
            perc_value = round(value, 0) if key == self.KEY_BRIGHTNESS else round(10 * value, 0)
            self.print_formatted_percent(
                COLOR_LIGHT_PASSIVE if not self.get(self.KEY_OUTPUT_0) else COLOR_LIGHT_ACTIVE,
                'B' if key == self.KEY_BRIGHTNESS else 'C',
                perc_value,
                "%3d%%" % perc_value,
                ""
            )


class brennenstuhl_heatingvalve(base):
    """Heating valve with setpoint, local temperature and battery level."""

    DEVICENAME = "HeatingValve"
    # Setpoint range mapped to 0..100% by print_formatted.
    TEMP_RANGE = [10, 30]
    #
    KEY_TEMPERATURE_SETPOINT = "current_heating_setpoint"
    KEY_TEMPERATURE = "local_temperature"
    KEY_BATTERY = "battery"

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic, default_values={
            self.KEY_TEMPERATURE_SETPOINT: 20,
            self.KEY_TEMPERATURE: 20.7,
            self.KEY_BATTERY: 97,
        })
        #
        self.ch_names = {
            self.KEY_TEMPERATURE: "temp",
            self.KEY_TEMPERATURE_SETPOINT: "temp_setp"
        }
        #
        self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.print_formatted, True)
        self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.__send__, True)
        self.add_callback(self.KEY_TEMPERATURE, None, self.__send__, True)
        self.add_callback(self.KEY_BATTERY, None, self.__send__, True)
        #
        self.__tx__((self.KEY_TEMPERATURE_SETPOINT, ))

    def __rx__(self, client, userdata, message):
        """Apply every entry of a JSON object received on ``.../set``."""
        if message.topic.startswith(self.topic) and message.topic.endswith("/set"):
            payload = self.__payload_filter__(message.payload)
            # __payload_filter__ returns a str for non-JSON input; iterating it
            # would call set() once per character, so ignore non-dict payloads.
            if not isinstance(payload, dict):
                return
            for key in payload:
                self.set(key, self.__ext_to_int__(key, payload[key]))

    def __tx__(self, keys_changed):
        """Publish the complete state as one JSON object (*keys_changed* is not used)."""
        tx_data = dict(self)
        for key in tx_data:
            tx_data[key] = self.__int_to_ext__(key, self[key])
        self.mqtt_client.send(self.topic, json.dumps(tx_data))

    def print_formatted(self, device, key, value):
        """Print the setpoint as a percent bar over TEMP_RANGE (clamped to 0..100)."""
        if value is not None and key == self.KEY_TEMPERATURE_SETPOINT:
            perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0])
            perc = 100 if perc > 100 else perc
            perc = 0 if perc < 0 else perc
            self.print_formatted_percent(COLOR_HEATING_VALVE, '\u03d1', perc, "%4.1f°C" % value, "")

    def warning_state_on(self):
        """Set the battery level to 7."""
        self.set(self.KEY_BATTERY, 7)

    def warning_state_off(self):
        """Set the battery level to 97."""
        self.set(self.KEY_BATTERY, 97)


class videv_light(base_videv):
    """Virtual light with optional state, brightness, colour temperature and timer."""

    DEVICENAME = "ViDevCommon"
    #
    KEY_OUTPUT_0 = "state"
    KEY_BRIGHTNESS = "brightness"
    KEY_COLOR_TEMP = "color_temp"
    KEY_TIMER = "timer"
    #
    RX_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP, KEY_TIMER]

    def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, enable_timer=False):
        default_values = {}
        if enable_state:
            default_values[self.KEY_OUTPUT_0] = False
        if enable_brightness:
            default_values[self.KEY_BRIGHTNESS] = 50
        if enable_color_temp:
            default_values[self.KEY_COLOR_TEMP] = 5
        if enable_timer:
            default_values[self.KEY_TIMER] = 0
        super().__init__(mqtt_client, topic, default_values=default_values)
        #
        self.enable_state = enable_state
        self.enable_brightness = enable_brightness
        self.enable_color_temp = enable_color_temp
        self.enable_timer = enable_timer

    def __ext_to_int__(self, key, data):
        """Cast brightness and colour temperature to int; defer everything else."""
        if key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]:
            return int(data)
        return super().__ext_to_int__(key, data)

    def print_formatted(self, device, key, value):
        """Print state, brightness, colour temperature or timer."""
        if value is None:
            return
        if key == self.KEY_OUTPUT_0:
            self.print_formatted_videv(COLOR_GUI_ACTIVE, value, "")
        elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]:
            perc_value = round(value * 10 if key == self.KEY_COLOR_TEMP else value, 0)
            self.print_formatted_percent(
                COLOR_GUI_ACTIVE,
                'B' if key == self.KEY_BRIGHTNESS else 'C',
                perc_value,
                "%3d%%" % perc_value,
                ""
            )
        elif key == self.KEY_TIMER:
            self.__print_formatted_timer__(value)


class videv_heating(base_videv):
    """Virtual heating control: setpoints, away / summer mode and boost timer."""

    DEVICENAME = "ViDevHeating"
    # Temperature range mapped to 0..100% by print_formatted.
    TEMP_RANGE = [10, 30]
    #
    KEY_USER_TEMPERATURE_SETPOINT = 'user_temperature_setpoint'
    KEY_VALVE_TEMPERATURE_SETPOINT = 'valve_temperature_setpoint'
    KEY_AWAY_MODE = 'away_mode'
    KEY_SUMMER_MODE = 'summer_mode'
    KEY_SET_DEFAULT_TEMPERATURE = 'set_default_temperature'
    KEY_START_BOOST = 'start_boost'
    KEY_BOOST_TIMER = 'boost_timer'
    #
    KEY_TEMPERATURE = 'temperature'
    #
    RX_KEYS = [KEY_USER_TEMPERATURE_SETPOINT, KEY_AWAY_MODE, KEY_SUMMER_MODE, KEY_VALVE_TEMPERATURE_SETPOINT, KEY_BOOST_TIMER]

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic, default_values={
            self.KEY_USER_TEMPERATURE_SETPOINT: 20,
            self.KEY_VALVE_TEMPERATURE_SETPOINT: 20,
            self.KEY_TEMPERATURE: 20.7,
            self.KEY_AWAY_MODE: False,
            self.KEY_SUMMER_MODE: False,
            self.KEY_BOOST_TIMER: 0
        })
        #
        self.ch_names = {
            self.KEY_TEMPERATURE: "temp",
            self.KEY_USER_TEMPERATURE_SETPOINT: "temp_setp"
        }

    def print_formatted(self, device, key, value):
        """Print a temperature, a mode flag or the boost timer."""
        desc_temp_dict = {
            self.KEY_TEMPERATURE: "Current Temperature",
            self.KEY_USER_TEMPERATURE_SETPOINT: "User Setpoint",
            self.KEY_VALVE_TEMPERATURE_SETPOINT: "Valve Setpoint"
        }
        if value is None:
            return
        if key in desc_temp_dict:
            perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0])
            perc = 100 if perc > 100 else perc
            perc = 0 if perc < 0 else perc
            self.print_formatted_percent(COLOR_GUI_ACTIVE, '\u03d1', perc, "%4.1f°C" % value, "(%s)" % desc_temp_dict[key])
        elif key in [self.KEY_AWAY_MODE, self.KEY_SUMMER_MODE]:
            # Parentheses are required: '%' binds tighter than the conditional
            # expression, which left "Summer-Mode" without its brackets.
            mode_name = "Away-Mode" if key == self.KEY_AWAY_MODE else "Summer-Mode"
            self.print_formatted_videv(COLOR_GUI_ACTIVE, value, "(%s)" % mode_name)
        elif key == self.KEY_BOOST_TIMER:
            self.__print_formatted_timer__(value)


# NOTE(review): the classes below were already commented out in the original
# file and are kept unchanged for reference.
#
# class silvercrest_motion_sensor(base):
#     KEY_OCCUPANCY = "occupancy"
#     KEY_BATTERY = "battery"
#     KEY_BATTERY_LOW = "battery_low"
#
#     def __init__(self, mqtt_client, topic):
#         super().__init__(mqtt_client, topic)
#         self.data[self.KEY_OCCUPANCY] = False
#         self.add_callback(self.KEY_OCCUPANCY, self.print_formatted, None)
#
#     def __rx__(self, client, userdata, message):
#         pass
#
#     def print_formatted(self, device, key, value):
#         if value is not None:
#             print_light(COLOR_MOTION_SENSOR, value, self.topic, "")
#
#     def warning_state_on(self):
#         self.set(self.KEY_BATTERY, 7)
#         self.set(self.KEY_BATTERY_LOW, True)
#
#     def warning_state_off(self):
#         self.set(self.KEY_BATTERY, 97)
#         self.set(self.KEY_BATTERY_LOW, False)
#
#
# class tradfri_button(base):
#     KEY_ACTION = "action"
#     KEY_BATTERY = "battery"
#     #
#     ACTION_TOGGLE = "toggle"
#     ACTION_BRIGHTNESS_UP = "brightness_up_click"
#     ACTION_BRIGHTNESS_DOWN = "brightness_down_click"
#     ACTION_RIGHT = "arrow_right_click"
#     ACTION_LEFT = "arrow_left_click"
#     ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold"
#     ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold"
#     ACTION_RIGHT_LONG = "arrow_right_hold"
#     ACTION_LEFT_LONG = "arrow_left_hold"
#
#     def __init__(self, mqtt_client, topic):
#         super().__init__(mqtt_client, topic)
#
#     def __rx__(self, client, userdata, message):
#         pass
#
#     def warning_state_on(self):
#         self.set(self.KEY_BATTERY, 7)
#
#     def warning_state_off(self):
#         self.set(self.KEY_BATTERY, 97)
#
#
# class remote(base):
#     def __rx__(self, client, userdata, message):
#         if message.topic == self.topic + "/VOLUP":
#             if self.__payload_filter__(message.payload):
#                 icon = u'\u1403'
#             else:
#                 icon = u'\u25a1'
#         elif message.topic == self.topic + "/VOLDOWN":
#             if self.__payload_filter__(message.payload):
#                 icon = u'\u1401'
#             else:
#                 icon = u'\u25a1'
#         else:
#             return
#         devicename = ' - '.join(self.topic.split('/')[1:-1])
#         print(COLOR_REMOTE + 10 * ' ' + icon + 6 * ' ' + devicename + colored.attr("reset"))