import colored
import copy
import json
import task
import time

# Terminal colours used by the different simulated device families.
COLOR_GUI_ACTIVE = colored.fg("light_blue")
COLOR_GUI_PASSIVE = COLOR_GUI_ACTIVE + colored.attr("dim")
COLOR_SHELLY = colored.fg("light_magenta")
COLOR_POWERPLUG = colored.fg("light_cyan")
COLOR_LIGHT_ACTIVE = colored.fg("yellow")
COLOR_LIGHT_PASSIVE = COLOR_LIGHT_ACTIVE + colored.attr("dim")
COLOR_MOTION_SENSOR = colored.fg("dark_orange_3b")
COLOR_HEATING_VALVE = colored.fg("red")
COLOR_REMOTE = colored.fg("green")


def payload_filter(payload):
    """Decode a raw message payload.

    Returns the parsed JSON value when *payload* is valid JSON, otherwise the
    payload decoded as UTF-8 text (so *payload* must offer ``decode`` in that
    case, e.g. ``bytes``).
    """
    try:
        return json.loads(payload)
    except json.decoder.JSONDecodeError:
        return payload.decode("utf-8")


def command_int_value(value):
    """Convert a command parameter to ``int``.

    Prints a hint and returns ``None`` when *value* is missing (``None``) or
    is not a valid integer literal.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        # int(None) raises TypeError, int("abc") raises ValueError.
        print("You need to give a integer parameter not '%s'" % str(value))
        return None


def command_float_value(value):
    """Convert a command parameter to ``float``.

    Prints a hint and returns ``None`` when *value* is missing (``None``) or
    is not a valid float literal.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        # float(None) raises TypeError, float("abc") raises ValueError.
        print("You need to give a float parameter not '%s'" % str(value))
        return None


def devicename(topic):
    """Return the display name of *topic*: all levels after the first, joined by ' - '."""
    return " - ".join(topic.split('/')[1:])


def percent_bar(value):
    """Return a ten segment bar; segment ``i`` is filled when ``value - 5 > 10 * i``."""
    return "".join(
        u"\u25ac" if (value - 5) > 10 * i else u"\u25ad"
        for i in range(10)
    )


def print_light(color, state, topic, description, led=False):
    """Print one line showing an on/off indicator for the device behind *topic*.

    With ``led=True`` the indicator is a green (on) or light gray (off) dot,
    otherwise a filled (on) or hollow (off) circle in *color*.
    """
    if led is True:
        led_color = colored.fg('green') if state is True else colored.fg('light_gray')
        # Switch back to the line colour right after the coloured dot.
        icon = led_color + "\u2b24" + color
    else:
        icon = u'\u2b24' if state is True else u'\u25ef'
    print(color + 10 * ' ' + icon + 9 * ' ' + devicename(topic), description + colored.attr("reset"))


def print_switch(color, state, topic, description):
    """Print one line showing a switch symbol (filled = on) for the device behind *topic*."""
    icon = u'\u25a0' if state is True else u'\u25a1'
    print(color + 10 * ' ' + icon + 9 * ' ' + devicename(topic), description + colored.attr("reset"))


def print_percent(color, prefix, perc_value, value_str, topic, description):
    """Print one line showing a percent bar for the device behind *topic*.

    Raises:
        ValueError: if *prefix* is longer than one character or *value_str*
            is longer than seven characters (the column layout would break).
    """
    if len(prefix) > 1 or len(value_str) > 7:
        raise ValueError("Length of prefix (%d) > 1 or length of value_str (%d) > 7" % (len(prefix), len(value_str)))
    # Pad the value column to a fixed width of eight characters.
    padding = (8 - len(value_str)) * ' '
    print(color + prefix + percent_bar(perc_value), value_str + padding + devicename(topic), description + colored.attr("reset"))
class base(object):
    """Common behaviour of all simulated devices.

    A device keeps its state in ``self.data``, notifies registered callbacks
    when a value changes and (if ``AUTOSEND`` is set) publishes the change via
    ``__tx__``. The *mqtt_client* passed in must offer
    ``add_callback(topic, callback)`` and ``send(topic, payload)``.
    """
    AUTOSEND = True
    COMMANDS = []

    def __init__(self, mqtt_client, topic):
        self.mqtt_client = mqtt_client
        self.topic = topic
        #
        self.data = {}
        self.callbacks = {}
        self.names = {}
        # Copy, so that per-instance extensions do not leak into the class attribute.
        self.commands = self.COMMANDS[:]
        #
        self.mqtt_client.add_callback(self.topic, self.__rx__)
        self.mqtt_client.add_callback(self.topic + '/#', self.__rx__)

    def add_callback(self, key, callback, value):
        """Register *callback* for changes of *key*.

        The callback is called as ``callback(device, key, new_value)``. With
        ``value=None`` it fires on every change, otherwise only when the new
        value equals *value*.
        """
        self.callbacks.setdefault(key, []).append((callback, value))

    def add_channel_name(self, key, name):
        """Set a display name for the data channel *key*."""
        self.names[key] = name

    def capabilities(self):
        """Return the list of commands this device instance supports."""
        return self.commands

    def store_data(self, *args, **kwargs):
        """Store the given key/value pairs and react on changes.

        ``None`` values and unchanged values are ignored. For each changed key
        the matching callbacks are executed; afterwards the changed keys are
        transmitted once if ``AUTOSEND`` is enabled. Positional arguments are
        accepted but ignored.
        """
        keys_changed = []
        for key, new_value in kwargs.items():
            if new_value is None or new_value == self.data.get(key):
                continue
            keys_changed.append(key)
            self.data[key] = new_value
            for callback, value in self.callbacks.get(key, []):
                if callback is not None and (value is None or value == new_value):
                    callback(self, key, new_value)
        if self.AUTOSEND and keys_changed:
            self.__tx__(keys_changed)

    def get_data(self, key, default=None):
        """Return the stored value of *key*; the strings 'on'/'off' (any case) become booleans."""
        rv = self.data.get(key, default)
        if isinstance(rv, str):
            lowered = rv.lower()
            if lowered == 'on':
                return True
            if lowered == 'off':
                return False
        return rv

    def __tx__(self, keys_changed):
        """Publish the complete data dictionary as JSON on the device topic."""
        self.mqtt_client.send(self.topic, json.dumps(self.data))

    def __rx__(self, client, userdata, message):
        """Default receive handler; subclasses are expected to override it."""
        print("%s: __rx__ not handled!" % self.__class__.__name__)


class shelly(base):
    """Simulated switch with two relays, two inputs and long-push channels."""
    KEY_OUTPUT_0 = "relay/0"
    KEY_OUTPUT_1 = "relay/1"
    KEY_INPUT_0 = "input/0"
    KEY_INPUT_1 = "input/1"
    KEY_LONGPUSH_0 = "longpush/0"
    KEY_LONGPUSH_1 = "longpush/1"
    #
    INPUT_FUNC_OUT1_FOLLOW = "out1_follow"
    INPUT_FUNC_OUT1_TRIGGER = "out1_trigger"
    INPUT_FUNC_OUT2_FOLLOW = "out2_follow"
    INPUT_FUNC_OUT2_TRIGGER = "out2_trigger"
    #
    COMMANDS = [
        "get_relay_0", "toggle_relay_0",
        "get_relay_1", "toggle_relay_1",
        "get_input_0", "toggle_input_0",
        "get_input_1", "toggle_input_1",
        "trigger_long_input_0", "trigger_long_input_1",
    ]

    def __init__(self, mqtt_client, topic, input_0_func=None, input_1_func=None, output_0_auto_off=None):
        """Create the device.

        *input_0_func* / *input_1_func* select one of the ``INPUT_FUNC_*``
        behaviours (or ``None``); *output_0_auto_off* is the delay in seconds
        after which relay 0 is switched off again (``None`` disables this).
        """
        super().__init__(mqtt_client, topic)
        #
        self.store_data(**{self.KEY_OUTPUT_0: False, self.KEY_OUTPUT_1: False, self.KEY_INPUT_0: False, self.KEY_INPUT_1: False})
        self.__input_0_func = input_0_func
        self.__input_1_func = input_1_func
        self.__output_0_auto_off__ = output_0_auto_off
        if self.__output_0_auto_off__ is not None:
            self.__delayed_off__ = task.delayed(float(self.__output_0_auto_off__), self.__auto_off__, self.KEY_OUTPUT_0)
        #
        self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None)
        self.add_callback(self.KEY_OUTPUT_0, self.__start_auto_off__, True)
        # Stop the timer when the relay goes off. Registering this for True (as
        # before) cancelled the timer right after __start_auto_off__ started it.
        self.add_callback(self.KEY_OUTPUT_0, self.__stop_auto_off__, False)
        self.add_callback(self.KEY_OUTPUT_1, self.print_formatted, None)
        #
        self.add_callback(self.KEY_INPUT_0, self.__input_function__, None)
        self.add_callback(self.KEY_INPUT_1, self.__input_function__, None)

    def __rx__(self, client, userdata, message):
        """Handle ``<topic>/<group>/<index>/command`` messages ('toggle' or a state)."""
        value = payload_filter(message.payload)
        if message.topic.startswith(self.topic) and message.topic.endswith("/command"):
            # The data key consists of the two topic levels in front of 'command'.
            key = '/'.join(message.topic.split('/')[-3:-1])
            if value == 'toggle':
                self.__toggle_data__(key)
            else:
                self.__set_data__(key, value)

    def __tx__(self, keys_changed):
        """Publish 'on'/'off' on ``<topic>/<key>`` for every changed key."""
        for key in keys_changed:
            self.mqtt_client.send(self.topic + '/' + key, "on" if self.data.get(key) else "off")

    def __input_function__(self, device, key, data):
        """Apply the configured input behaviour after an input changed."""
        if key == self.KEY_INPUT_0:
            func = self.__input_0_func
        elif key == self.KEY_INPUT_1:
            func = self.__input_1_func
        else:
            func = None
        if func == self.INPUT_FUNC_OUT1_FOLLOW:
            self.__set_data__(self.KEY_OUTPUT_0, data)
        elif func == self.INPUT_FUNC_OUT1_TRIGGER:
            self.__toggle_data__(self.KEY_OUTPUT_0)
        elif func == self.INPUT_FUNC_OUT2_FOLLOW:
            self.__set_data__(self.KEY_OUTPUT_1, data)
        elif func == self.INPUT_FUNC_OUT2_TRIGGER:
            self.__toggle_data__(self.KEY_OUTPUT_1)

    def __start_auto_off__(self, device, key, data):
        """(Re)start the auto-off timer, if one is configured."""
        self.__stop_auto_off__(device, key, data)
        if self.__output_0_auto_off__ is not None:
            self.__delayed_off__.run()

    def __stop_auto_off__(self, device, key, data):
        """Stop the auto-off timer, if one is configured and not already stopped."""
        if self.__output_0_auto_off__ is not None:
            if not self.__delayed_off__._stopped:
                self.__delayed_off__.stop()

    def __auto_off__(self, key):
        """Timer callback: switch relay 0 off."""
        if key == self.KEY_OUTPUT_0:
            self.__set_data__(key, 'off')

    def __set_data__(self, key, value):
        """Store *value* for *key* as boolean.

        ``True`` and the string ``"on"`` switch on, everything else switches
        off. Booleans must be accepted, because the input-follow function and
        the long-push commands pass ``True``/``False`` instead of 'on'/'off'.
        """
        self.store_data(**{key: value is True or value == "on"})

    def __toggle_data__(self, key):
        """Invert the stored state of *key*; unknown keys are ignored."""
        if key in self.data:
            self.__set_data__(key, "on" if not self.data.get(key) else "off")

    def command(self, command):
        """Execute one of ``COMMANDS`` (user interface)."""
        if command in self.COMMANDS:
            if command == self.COMMANDS[0]:
                self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0))
            elif command == self.COMMANDS[1]:
                self.__toggle_data__(self.KEY_OUTPUT_0)
            elif command == self.COMMANDS[2]:
                self.print_formatted(self, self.KEY_OUTPUT_1, self.data.get(self.KEY_OUTPUT_1))
            elif command == self.COMMANDS[3]:
                self.__toggle_data__(self.KEY_OUTPUT_1)
            elif command == self.COMMANDS[4]:
                self.print_formatted(self, self.KEY_INPUT_0, self.data.get(self.KEY_INPUT_0))
            elif command == self.COMMANDS[5]:
                self.__toggle_data__(self.KEY_INPUT_0)
            elif command == self.COMMANDS[6]:
                self.print_formatted(self, self.KEY_INPUT_1, self.data.get(self.KEY_INPUT_1))
            elif command == self.COMMANDS[7]:
                self.__toggle_data__(self.KEY_INPUT_1)
            elif command == self.COMMANDS[8]:
                # Long push: input change, then a short long-push pulse.
                self.__toggle_data__(self.KEY_INPUT_0)
                time.sleep(0.4)
                self.__set_data__(self.KEY_LONGPUSH_0, True)
                time.sleep(0.1)
                self.__set_data__(self.KEY_LONGPUSH_0, False)
            elif command == self.COMMANDS[9]:
                self.__toggle_data__(self.KEY_INPUT_1)
                time.sleep(0.4)
                self.__set_data__(self.KEY_LONGPUSH_1, True)
                time.sleep(0.1)
                self.__set_data__(self.KEY_LONGPUSH_1, False)
            else:
                print("%s: not yet implemented!" % command)
        else:
            print("Unknown command!")

    def print_formatted(self, device, key, value):
        """Print the state of channel *key*; appends the auto-off time while switched on."""
        if value is not None:
            info = (" - %ds" % self.__output_0_auto_off__) if self.__output_0_auto_off__ is not None and value else ""
            channel = "(%s%s)" % (self.names.get(key, key), info)
            print_light(COLOR_SHELLY, value, self.topic, channel)


class my_powerplug(base):
    """One output channel of a simulated multi-channel powerplug."""
    KEY_OUTPUT_0 = "state"
    COMMANDS = [
        "get_output", "toggle_output",
    ]

    def __init__(self, mqtt_client, topic, channel):
        """Create the device for the zero-based *channel* (topic ``<topic>/output/<channel + 1>``)."""
        super().__init__(mqtt_client, topic + '/' + "output/%d" % (channel + 1))
        #
        self.data[self.KEY_OUTPUT_0] = False
        self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None)

    def __rx__(self, client, userdata, message):
        """Handle ``<topic>/set``: 'toggle' inverts the state, any other payload is stored."""
        if message.topic == self.topic + '/set':
            payload = payload_filter(message.payload)
            if payload == "toggle":
                payload = not self.data.get(self.KEY_OUTPUT_0)
            self.store_data(**{self.KEY_OUTPUT_0: payload})

    def __tx__(self, keys_changed):
        """Publish the JSON encoded value of every changed key on the device topic."""
        for key in keys_changed:
            self.mqtt_client.send(self.topic, json.dumps(self.data.get(key)))

    def command(self, command):
        """Execute one of ``COMMANDS`` (user interface)."""
        if command in self.COMMANDS:
            if command == self.COMMANDS[0]:
                self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0))
            elif command == self.COMMANDS[1]:
                self.store_data(**{self.KEY_OUTPUT_0: not self.data.get(self.KEY_OUTPUT_0)})
            else:
                print("%s: not yet implemented!" % command)
        else:
            print("Unknown command!")

    def print_formatted(self, device, key, value):
        """Print the output state."""
        if value is not None:
            print_light(COLOR_POWERPLUG, value, self.topic, "(%s)" % self.names.get(key, "State"))


class silvercrest_powerplug(base):
    """Simulated single-channel powerplug controlled by JSON ``{"state": ...}`` messages."""
    KEY_OUTPUT_0 = "state"
    #
    COMMANDS = [
        "get_state", "set_state", "unset_state",
    ]

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic)
        self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None)
        #
        self.store_data(**{self.KEY_OUTPUT_0: False})

    def __rx__(self, client, userdata, message):
        """Handle ``<topic>/set`` with a JSON object whose 'state' is on, off or toggle."""
        if message.topic == self.topic + '/set':
            STATES = ["on", "off", "toggle"]
            #
            state = json.loads(message.payload).get('state').lower()
            if state in STATES:
                if state == STATES[0]:
                    self.store_data(**{self.KEY_OUTPUT_0: True})
                elif state == STATES[1]:
                    self.store_data(**{self.KEY_OUTPUT_0: False})
                else:
                    # The key was missing here before, which made '**' fail on a set.
                    self.store_data(**{self.KEY_OUTPUT_0: not self.data.get(self.KEY_OUTPUT_0)})

    def __tx__(self, keys_changed):
        """Publish 'on'/'off' on ``<topic>/<key>`` for every changed key."""
        for key in keys_changed:
            self.mqtt_client.send(self.topic + '/' + key, "on" if self.data.get(key) else "off")

    def command(self, command):
        """Execute one of ``COMMANDS`` (user interface)."""
        if command in self.COMMANDS:
            if command == self.COMMANDS[0]:
                self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0))
            elif command == self.COMMANDS[1]:
                self.store_data(**{self.KEY_OUTPUT_0: True})
            elif command == self.COMMANDS[2]:
                self.store_data(**{self.KEY_OUTPUT_0: False})
            else:
                print("%s: not yet implemented!" % command)
        else:
            print("Unknown command!")

    def print_formatted(self, device, key, value):
        """Print the output state."""
        if value is not None:
            print_light(COLOR_POWERPLUG, value, self.topic, "(%s)" % self.names.get(key, key))


class tradfri_light(base):
    """Simulated light with optional brightness and colour temperature.

    Brightness and colour temperature are stored scaled down (see the
    conversions in ``__rx__`` and ``__tx__``) and converted on the wire.
    """
    KEY_OUTPUT_0 = "state"
    KEY_BRIGHTNESS = "brightness"
    KEY_COLOR_TEMP = "color_temp"
    KEY_BRIGHTNESS_MOVE = "brightness_move"
    #
    STATE_COMMANDS = ("get_state", "toggle_state", )
    BRIGHTNESS_COMMANDS = ("get_brightness", "set_brightness",)
    COLOR_TEMP_COMMANDS = ("get_color_temp", "set_color_temp",)

    def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, send_on_power_on=True):
        """Create the device; the ``enable_*`` flags select the supported channels."""
        super().__init__(mqtt_client, topic)
        self.send_on_power_on = send_on_power_on
        self.add_callback(self.KEY_OUTPUT_0, self.print_formatted, None)
        self.add_callback(self.KEY_BRIGHTNESS, self.print_formatted, None)
        self.add_callback(self.KEY_COLOR_TEMP, self.print_formatted, None)
        #
        self.commands = []
        if enable_state:
            self.commands.extend(self.STATE_COMMANDS)
        if enable_brightness:
            self.commands.extend(self.BRIGHTNESS_COMMANDS)
        if enable_color_temp:
            self.commands.extend(self.COLOR_TEMP_COMMANDS)
        self.__init_data__(enable_state, enable_brightness, enable_color_temp)

    def __init_data__(self, enable_state, enable_brightness, enable_color_temp):
        """Store the initial values of all enabled channels.

        The command list is set up in ``__init__`` only; extending it here as
        well listed every command twice.
        """
        data = {}
        if enable_state:
            data[self.KEY_OUTPUT_0] = False
        if enable_brightness:
            data[self.KEY_BRIGHTNESS] = 50
            # (rate, start time) of the currently running brightness move
            self.brightnes_move = (0, time.time())
        if enable_color_temp:
            data[self.KEY_COLOR_TEMP] = 5
        self.store_data(**data)

    def __rx__(self, client, userdata, message):
        """Handle ``<topic>/set`` and ``<topic>/get`` messages (JSON object payload).

        Messages are only processed while the light is on or when the message
        itself switches it on / toggles it.
        """
        data = json.loads(message.payload)
        if self.data.get(self.KEY_OUTPUT_0) or data.get(self.KEY_OUTPUT_0) in ['on', 'toggle']:
            if message.topic.startswith(self.topic) and message.topic.endswith('/set'):
                for targetkey in data:
                    value = data[targetkey]
                    if targetkey in self.data:
                        if targetkey == self.KEY_OUTPUT_0:
                            if value == "toggle":
                                value = not self.data.get(self.KEY_OUTPUT_0)
                            else:
                                value = value == "on"
                        elif targetkey == self.KEY_BRIGHTNESS:
                            value = round((value - 1) / 2.53, 0)
                        elif targetkey == self.KEY_COLOR_TEMP:
                            value = round((value - 250) / 20.4, 0)
                        self.store_data(**{targetkey: value})
                    elif targetkey == self.KEY_BRIGHTNESS_MOVE and self.KEY_BRIGHTNESS in self.data:
                        # Apply the move which was running since the last brightness_move message.
                        # Without the brightness channel there is nothing to move (and no move state).
                        rate, started = self.brightnes_move
                        new_value = self.data.get(self.KEY_BRIGHTNESS) + (time.time() - started) * rate
                        # NOTE(review): the upper limit 256 looks like a raw device value, while the
                        # stored brightness is scaled down in this class - confirm the intended limit.
                        new_value = min(max(new_value, 0), 256)
                        self.store_data(**{self.KEY_BRIGHTNESS: int(new_value)})
                        self.brightnes_move = (value, time.time())
                    else:
                        print("%s: UNKNOWN KEY %s" % (message.topic, targetkey))
            elif message.topic == self.topic + '/get':
                self.__tx__(None)

    def __tx__(self, keys_changed):
        """Publish the complete state as JSON, converted to the wire representation."""
        tx_data = copy.copy(self.data)
        if self.KEY_OUTPUT_0 in tx_data:
            tx_data[self.KEY_OUTPUT_0] = "on" if tx_data[self.KEY_OUTPUT_0] else "off"
        if self.KEY_BRIGHTNESS in tx_data:
            tx_data[self.KEY_BRIGHTNESS] = 1 + round(2.53 * tx_data[self.KEY_BRIGHTNESS], 0)
        if self.KEY_COLOR_TEMP in tx_data:
            tx_data[self.KEY_COLOR_TEMP] = 250 + round(20.4 * tx_data[self.KEY_COLOR_TEMP], 0)
        self.mqtt_client.send(self.topic, json.dumps(tx_data))

    def command(self, command):
        """Execute a command of ``capabilities()``; setters take the value after a blank."""
        try:
            command, value = command.split(' ')
        except ValueError:
            value = None
        if command in self.capabilities():
            if command == self.STATE_COMMANDS[0]:
                self.print_formatted(self, self.KEY_OUTPUT_0, self.data.get(self.KEY_OUTPUT_0))
            elif command == self.STATE_COMMANDS[1]:
                self.store_data(**{self.KEY_OUTPUT_0: not self.data.get(self.KEY_OUTPUT_0)})
            elif command == self.BRIGHTNESS_COMMANDS[0]:
                self.print_formatted(self, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS))
            elif command == self.BRIGHTNESS_COMMANDS[1]:
                self.store_data(**{self.KEY_BRIGHTNESS: command_int_value(value)})
            elif command == self.COLOR_TEMP_COMMANDS[0]:
                self.print_formatted(self, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP))
            elif command == self.COLOR_TEMP_COMMANDS[1]:
                self.store_data(**{self.KEY_COLOR_TEMP: command_int_value(value)})
            else:
                print("%s: not yet implemented!" % command)
        else:
            print("Unknown command!")

    def power_off(self, device, key, value):
        """Callback: set the state to off without transmitting it and print it."""
        self.data[self.KEY_OUTPUT_0] = False
        self.print_formatted(self, self.KEY_OUTPUT_0, False)

    def power_on(self, device, key, value):
        """Callback: set the state to on; it is transmitted only if ``send_on_power_on`` is set."""
        if self.send_on_power_on:
            self.store_data(**{self.KEY_OUTPUT_0: True})
        else:
            self.data[self.KEY_OUTPUT_0] = True
            self.print_formatted(self, self.KEY_OUTPUT_0, True)

    def print_formatted(self, device, key, value):
        """Print channel *key*; printing the state also prints brightness and colour temperature."""
        if value is not None:
            if key == self.KEY_OUTPUT_0:
                print_light(COLOR_LIGHT_ACTIVE, value, self.topic, "")
                self.print_formatted(device, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS))
                self.print_formatted(device, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP))
            elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]:
                perc_value = round(value, 0) if key == self.KEY_BRIGHTNESS else round(10 * value, 0)
                print_percent(
                    COLOR_LIGHT_PASSIVE if not self.data.get(self.KEY_OUTPUT_0) else COLOR_LIGHT_ACTIVE,
                    'B' if key == self.KEY_BRIGHTNESS else 'C',
                    perc_value,
                    "%3d%%" % perc_value,
                    self.topic,
                    ""
                )


class brennenstuhl_heating_valve(base):
    """Simulated heating valve with a temperature setpoint and a local temperature."""
    TEMP_RANGE = [10, 30]
    #
    KEY_TEMPERATURE_SETPOINT = "current_heating_setpoint"
    KEY_TEMPERATURE = "local_temperature"
    #
    COMMANDS = [
        "get_temperature_setpoint", "set_temperature_setpoint", "set_local_temperature",
    ]

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic)
        self.store_data(**{
            self.KEY_TEMPERATURE_SETPOINT: 20,
            self.KEY_TEMPERATURE: 20.7,
        })
        self.add_callback(self.KEY_TEMPERATURE_SETPOINT, self.print_formatted, None)

    def __rx__(self, client, userdata, message):
        """Handle ``<topic>.../set``: store all key/value pairs of the payload."""
        if message.topic.startswith(self.topic) and message.topic.endswith("/set"):
            payload = payload_filter(message.payload)
            self.store_data(**payload)

    def command(self, command):
        """Execute one of ``COMMANDS``; setters take the value after a blank."""
        try:
            command, value = command.split(' ')
        except ValueError:
            value = None
        if command in self.COMMANDS:
            if command == self.COMMANDS[0]:
                self.print_formatted(self, self.KEY_TEMPERATURE_SETPOINT, self.data.get(self.KEY_TEMPERATURE_SETPOINT))
            elif command == self.COMMANDS[1]:
                self.store_data(**{self.KEY_TEMPERATURE_SETPOINT: command_float_value(value)})
            elif command == self.COMMANDS[2]:
                self.store_data(**{self.KEY_TEMPERATURE: command_float_value(value)})

    def print_formatted(self, device, key, value):
        """Print the setpoint as a bar relative to ``TEMP_RANGE`` (other keys are not printed)."""
        if value is not None and key == self.KEY_TEMPERATURE_SETPOINT:
            low, high = self.TEMP_RANGE
            # Clamp to 0..100 %, setpoints may be outside of TEMP_RANGE.
            perc = min(max(100 * (value - low) / (high - low), 0), 100)
            print_percent(COLOR_HEATING_VALVE, '\u03d1', perc, "%4.1f°C" % value, self.topic, "")


class videv_light(base):
    """Simulated view of a virtual light device.

    Received values are only stored and printed (``AUTOSEND`` is off);
    commands publish the requested value on ``<topic>/<key>``.
    """
    AUTOSEND = False
    #
    KEY_STATE = "state"
    KEY_BRIGHTNESS = "brightness"
    KEY_COLOR_TEMP = "color_temp"
    KEY_TIMER = "timer"
    #
    STATE_COMMANDS = ("get_state", "toggle_state", )
    BRIGHTNESS_COMMANDS = ("get_brightness", "set_brightness", )
    COLOR_TEMP_COMMANDS = ("get_color_temp", "set_color_temp", )
    TIMER_COMMANDS = ("get_timer", )

    def __init__(self, mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, enable_timer=False):
        """Create the device; the ``enable_*`` flags select the supported channels."""
        super().__init__(mqtt_client, topic)
        self.enable_state = enable_state
        self.enable_brightness = enable_brightness
        self.enable_color_temp = enable_color_temp
        self.enable_timer = enable_timer
        #
        # Start value of the running timer (reference for the timer bar).
        self.maxvalue = None
        # add commands to be available
        if enable_state:
            # init default value
            self.data[self.KEY_STATE] = False
            # add print callback
            self.add_callback(self.KEY_STATE, self.print_formatted, None)
            # add commands to be available
            self.commands.extend(self.STATE_COMMANDS)
        if enable_brightness:
            # init default value
            self.data[self.KEY_BRIGHTNESS] = 50
            # add print callback
            self.add_callback(self.KEY_BRIGHTNESS, self.print_formatted, None)
            # add commands to be available
            self.commands.extend(self.BRIGHTNESS_COMMANDS)
        if enable_color_temp:
            # init default value
            self.data[self.KEY_COLOR_TEMP] = 5
            # add print callback
            self.add_callback(self.KEY_COLOR_TEMP, self.print_formatted, None)
            # add commands to be available
            self.commands.extend(self.COLOR_TEMP_COMMANDS)
        if enable_timer:
            # init default value
            self.data[self.KEY_TIMER] = 0
            # add print callback
            self.add_callback(self.KEY_TIMER, self.print_formatted, None)
            # add commands to be available
            self.commands.extend(self.TIMER_COMMANDS)

    def __rx__(self, client, userdata, message):
        """Store the value received on ``<topic>/<key>`` for known keys."""
        value = payload_filter(message.payload)
        if message.topic.startswith(self.topic):
            targetkey = message.topic.split('/')[-1]
            if targetkey in self.data:
                self.store_data(**{targetkey: value})
            elif targetkey != "__info__":
                print("Unknown key %s in %s::%s" % (targetkey, message.topic, self.__class__.__name__))
        # NOTE(review): this branch can never be reached, '<topic>/get' already matches
        # the startswith() check above - confirm whether a get request should be served.
        elif message.topic == self.topic + '/get':
            self.__tx__(None)

    def send(self, key, data):
        """Publish *data* JSON encoded on ``<topic>/<key>``; ``None`` is not sent."""
        if data is not None:
            topic = self.topic + '/' + key
            self.mqtt_client.send(topic, json.dumps(data))

    def command(self, command):
        """Execute a command of ``capabilities()``; setters take the value after a blank."""
        try:
            command, value = command.split(' ')
        except ValueError:
            value = None
        if command in self.capabilities():
            if command == self.STATE_COMMANDS[0]:
                self.print_formatted(self, self.KEY_STATE, self.data.get(self.KEY_STATE))
            elif command == self.STATE_COMMANDS[1]:
                self.send(self.KEY_STATE, not self.data.get(self.KEY_STATE))
            elif command == self.BRIGHTNESS_COMMANDS[0]:
                self.print_formatted(self, self.KEY_BRIGHTNESS, self.data.get(self.KEY_BRIGHTNESS))
            elif command == self.BRIGHTNESS_COMMANDS[1]:
                self.send(self.KEY_BRIGHTNESS, command_int_value(value))
            elif command == self.COLOR_TEMP_COMMANDS[0]:
                self.print_formatted(self, self.KEY_COLOR_TEMP, self.data.get(self.KEY_COLOR_TEMP))
            elif command == self.COLOR_TEMP_COMMANDS[1]:
                self.send(self.KEY_COLOR_TEMP, command_int_value(value))
            elif command == self.TIMER_COMMANDS[0]:
                self.print_formatted(self, self.KEY_TIMER, self.data.get(self.KEY_TIMER))
            else:
                print("%s: not yet implemented!" % command)
        else:
            print("Unknown command!")

    def print_formatted(self, device, key, value):
        """Print channel *key*; the timer is shown relative to its first value after a restart."""
        if value is not None:
            if key == self.KEY_STATE:
                print_switch(COLOR_GUI_ACTIVE, value, self.topic, "")
            elif key in [self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]:
                perc_value = round(value * 10 if key == self.KEY_COLOR_TEMP else value, 0)
                print_percent(
                    COLOR_GUI_ACTIVE,
                    'B' if key == self.KEY_BRIGHTNESS else 'C',
                    perc_value,
                    "%3d%%" % perc_value,
                    self.topic,
                    ""
                )
            elif key == self.KEY_TIMER:
                if value > 0:
                    # The first positive value of a timer run is taken as 100 %.
                    if self.maxvalue is None:
                        self.maxvalue = value
                    disp_value = value
                    try:
                        perc = disp_value / self.maxvalue * 100
                    except ZeroDivisionError:
                        perc = 0
                else:
                    disp_value = 0
                    perc = 0
                    self.maxvalue = None
                print_percent(COLOR_GUI_ACTIVE, 't', perc, '%3d%%' % perc, self.topic, '(%.1f)' % disp_value)


# The device classes below are commented out in the original source.

# class silvercrest_motion_sensor(base):
#     KEY_OCCUPANCY = "occupancy"
#     COMMANDS = ['motion']

#     def __init__(self, mqtt_client, topic):
#         super().__init__(mqtt_client, topic)
#         self.data[self.KEY_OCCUPANCY] = False
#         self.add_callback(self.KEY_OCCUPANCY, self.print_formatted, None)

#     def __rx__(self, client, userdata, message):
#         pass

#     def command(self, command):
#         try:
#             command, value = command.split(' ')
#         except ValueError:
#             value = None
#         else:
#             value = json.loads(value)
#         if command == self.COMMANDS[0]:
#             self.store_data(**{self.KEY_OCCUPANCY: True})
#             time.sleep(value or 10)
#             self.store_data(**{self.KEY_OCCUPANCY: False})

#     def print_formatted(self, device, key, value):
#         if value is not None:
#             print_light(COLOR_MOTION_SENSOR, value, self.topic, "")


# class tradfri_button(base):
#     KEY_ACTION = "action"
#     #
#     ACTION_TOGGLE = "toggle"
#     ACTION_BRIGHTNESS_UP = "brightness_up_click"
#     ACTION_BRIGHTNESS_DOWN = "brightness_down_click"
#     ACTION_RIGHT = "arrow_right_click"
#     ACTION_LEFT = "arrow_left_click"
#     ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold"
#     ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold"
#     ACTION_RIGHT_LONG = "arrow_right_hold"
#     ACTION_LEFT_LONG = "arrow_left_hold"
#     #
#     COMMANDS = [ACTION_TOGGLE, ACTION_LEFT, ACTION_RIGHT, ACTION_BRIGHTNESS_UP, ACTION_BRIGHTNESS_DOWN,
#                 ACTION_LEFT_LONG, ACTION_RIGHT_LONG, ACTION_BRIGHTNESS_UP_LONG, ACTION_BRIGHTNESS_DOWN_LONG]

#     def __init__(self, mqtt_client, topic):
#         super().__init__(mqtt_client, topic)

#     def __rx__(self, client, userdata, message):
#         pass

#     def command(self, command):
#         try:
#             command, value = command.split(' ')
#         except ValueError:
#             value = None
#         else:
#             value = json.loads(value)
#         if command in self.capabilities():
#             action = self.COMMANDS[self.COMMANDS.index(command)]
#             if self.COMMANDS.index(command) <= 4:
#                 self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action}))
#             elif self.COMMANDS.index(command) <= 8:
#                 self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action}))
#                 time.sleep(value or 0.5)
#                 action = '_'.join(action.split('_')[:-1] + ['release'])
#                 self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action}))


# class remote(base):
#     def __rx__(self, client, userdata, message):
#         if message.topic == self.topic + "/VOLUP":
#             if payload_filter(message.payload):
#                 icon = u'\u1403'
#             else:
#                 icon = u'\u25a1'
#         elif message.topic == self.topic + "/VOLDOWN":
#             if payload_filter(message.payload):
#                 icon = u'\u1401'
#             else:
#                 icon = u'\u25a1'
#         else:
#             return
#         devicename = ' - '.join(self.topic.split('/')[1:-1])
#         print(COLOR_REMOTE + 10 * ' ' + icon + 6 * ' ' + devicename + colored.attr("reset"))


# class gui_heating_valve(base):
#     AUTOSEND = False
#     #
#     TEMP_RANGE = [10, 30]
#     #
#     KEY_TIMER = "timer"
#     KEY_TEMPERATURE = "temperature"
#     KEY_SETPOINT_TEMP = "setpoint_temp"
#     KEY_SETPOINT_TO_DEFAULT = "setpoint_to_default"
#     KEY_BOOST = 'boost'
#     KEY_AWAY = "away"
#     KEY_SUMMER = "summer"
#     KEY_ENABLE = "enable"
#     #
#     COMMANDS = [
#         "get_temperature",
#         "get_temperature_setpoint", "set_temperature_setpoint",
#         "trigger_boost", "trigger_setpoint_to_default",
#         "toggle_away", "toggle_summer",
#     ]

#     def __init__(self, mqtt_client, topic):
#         super().__init__(mqtt_client, topic)
#         self.add_callback(self.KEY_SETPOINT_TEMP, self.print_formatted, None)
#         self.add_callback(self.KEY_TIMER, self.print_formatted, None)
#         self.add_callback(self.KEY_AWAY, self.print_formatted, None)
#         self.add_callback(self.KEY_SUMMER, self.print_formatted, None)
#         #
#         self.store_data(**{
#             self.KEY_TEMPERATURE: 20.7,
#             self.KEY_SETPOINT_TEMP: 20,
#             self.KEY_TIMER: 0,
#             self.KEY_AWAY: False,
#             self.KEY_SUMMER: False,
#             self.KEY_ENABLE: True
#         })

#     def __rx__(self, client, userdata, message):
#         value = payload_filter(message.payload)
#         if message.topic.startswith(self.topic) and message.topic.endswith('/set'):
#             targetkey = message.topic.split('/')[-2]
#             if targetkey in self.data.keys():
#                 self.store_data(**{targetkey: value})
#             else:
#                 print("Unknown key %s in %s::%s" % (targetkey, message.topic, self.__class__.__name__))
#         elif message.topic == self.topic + '/get':
#             self.__tx__(None)

#     def send(self, key, data):
#         if data is not None:
#             topic = self.topic + '/' + key
#             self.mqtt_client.send(topic, json.dumps(data))

#     def command(self, command):
#         try:
#             command, value = command.split(' ')
#         except ValueError:
#             value = None
#         if command in self.COMMANDS:
#             if command == self.COMMANDS[0]:
#                 self.print_formatted(self, self.KEY_TEMPERATURE, self.data.get(self.KEY_TEMPERATURE))
#             elif command == self.COMMANDS[1]:
#                 self.print_formatted(self, self.KEY_SETPOINT_TEMP, self.data.get(self.KEY_SETPOINT_TEMP))
#             elif command == self.COMMANDS[2]:
#                 self.send(self.KEY_SETPOINT_TEMP, command_float_value(value))
#             elif command == self.COMMANDS[3]:
#                 self.send(self.KEY_BOOST, True)
#             elif command == self.COMMANDS[4]:
#                 self.send(self.KEY_SETPOINT_TO_DEFAULT, True)
#             elif command == self.COMMANDS[5]:
#                 self.send(self.KEY_AWAY, not self.data.get(self.KEY_AWAY))
#             elif command == self.COMMANDS[6]:
#                 self.send(self.KEY_SUMMER, not self.data.get(self.KEY_SUMMER))

#     def print_formatted(self, device, key, value):
#         devicename = ' - '.join(self.topic.split('/')[1:])
#         if key == self.KEY_TIMER:
#             value /= 60
#             try:
#                 perc = 100 * value / 60
#             except TypeError:
#                 value = 0
#                 perc = 0
#             print_percent(COLOR_GUI_ACTIVE, 't', perc, "%4.1fmin" % value, self.topic, "(Timer)")
#         elif key == self.KEY_TEMPERATURE:
#             perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0])
#             perc = 100 if perc > 100 else perc
#             perc = 0 if perc < 0 else perc
#             print_percent(COLOR_GUI_ACTIVE, '\u03d1', perc, "%4.1f°C" % value, self.topic, "(Temperature)")
#         elif key == self.KEY_SETPOINT_TEMP:
#             perc = 100 * (value - self.TEMP_RANGE[0]) / (self.TEMP_RANGE[1] - self.TEMP_RANGE[0])
#             perc = 100 if perc > 100 else perc
#             perc = 0 if perc < 0 else perc
#             print_percent(COLOR_GUI_ACTIVE if self.data.get(self.KEY_ENABLE) else COLOR_GUI_PASSIVE,
#                           '\u03d1', perc, "%4.1f°C" % value, self.topic, "(Setpoint)")
#         elif key == self.KEY_AWAY:
#             print_switch(COLOR_GUI_ACTIVE, value, self.topic, "(Away Mode)")
#         elif key == self.KEY_SUMMER:
#             print_switch(COLOR_GUI_ACTIVE, value, self.topic, "(Summer Mode)")