#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
devices (DEVICES)
=================

**Author:**

* Dirk Alders

**Description:**

    This Module supports smarthome devices

**Submodules:**

* :mod:`shelly`
* :mod:`silvercrest_powerplug`

**Unittest:**

    See also the :download:`unittest ` documentation.

**Module Documentation:**

"""

import json
import logging
import task

try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    ROOT_LOGGER_NAME = 'root'

# Battery level (compared with "<=") at or below which a low battery warning is raised.
BATTERY_WARN_LEVEL = 5


def is_json(data):
    """Return True if *data* can be parsed by :func:`json.loads`, otherwise False.

    ``json.JSONDecodeError`` and ``UnicodeDecodeError`` are both subclasses of
    ``ValueError``; ``TypeError`` is raised for objects which are neither str,
    bytes nor bytearray. All of them mean "this is not JSON".
    """
    try:
        json.loads(data)
    except (ValueError, TypeError):
        return False
    else:
        return True


class group(object):
    """Bundle several instances and forward unknown attribute access to all of them.

    Accessing an attribute which the group itself does not have returns a
    function that calls the attribute of that name on every member with the
    given arguments (return values of the members are discarded).

    :param args: The members. All of them need to define the same public callables.
    :raises ValueError: If no member is given or the public callables differ.
    """

    def __init__(self, *args):
        super().__init__()
        if len(args) == 0:
            raise ValueError("A group needs at least one member")
        self._members = args
        self._iter_counter = 0
        # The public callables of the first member are the reference
        self.methods = self._public_methods(args[0])
        for member in self._members:
            methods = self._public_methods(member)
            if self.methods != methods:
                raise ValueError("All given instances needs to have same attributes:", self.methods, methods)

    @staticmethod
    def _public_methods(member):
        """Return the names of all public callables defined directly in the class of *member*."""
        return [m for m in member.__class__.__dict__ if not m.startswith('_') and callable(getattr(member, m))]

    def __iter__(self):
        # A fresh iterator for every loop: leaving a loop early (break) or
        # nesting loops does not disturb the following iteration.
        return iter(self._members)

    def __next__(self):
        # Kept for callers using next(group) directly
        if self._iter_counter < len(self):
            self._iter_counter += 1
            return self._members[self._iter_counter - 1]
        self._iter_counter = 0
        raise StopIteration

    def __getitem__(self, i):
        return self._members[i]

    def __len__(self):
        return len(self._members)

    def __getattribute__(self, name):
        def group_execution(*args, **kwargs):
            # Call the requested attribute on every member
            for member in self[:]:
                m = getattr(member, name)
                m(*args, **kwargs)
        try:
            rv = super().__getattribute__(name)
        except AttributeError:
            # Unknown for the group itself -> delegate to the members
            return group_execution
        else:
            return rv


class base(dict):
    """Base class for a device connected via mqtt.

    The instance is a dict holding the last received value for each key
    listed in ``RX_KEYS``. Subclasses configure the behaviour with the class
    attributes below.

    :param mqtt_client: Object providing ``add_callback(topic=..., callback=...)`` and ``send(topic, payload)``.
    :param topic: The base topic of the device.
    """
    # Topic suffix for transmitted data ('' -> no suffix, None -> sending is refused)
    TX_TOPIC = "set"
    # TX_TYPE values: TX_VALUE sends the plain value to <topic>/<key>[/<TX_TOPIC>],
    # TX_DICT sends the JSON dict {key: value} to <topic>/<TX_TOPIC>
    TX_VALUE = 0
    TX_DICT = 1
    TX_TYPE = -1  # negative -> unknown, needs to be set by the child class to be able to send
    # Keys for which True / False are translated to "on" / "off" before sending
    TX_FILTER_DATA_KEYS = []
    #
    # Keys which are stored and reported to the callbacks
    RX_KEYS = []
    # Topics (relative to the base topic) which are ignored completely
    RX_IGNORE_TOPICS = []
    # Keys which are dropped without a warning
    RX_IGNORE_KEYS = []
    # Keys for which 1 / 'on' / 'ON' and 0 / 'off' / 'OFF' are translated to True and False
    RX_FILTER_DATA_KEYS = []

    def __init__(self, mqtt_client, topic):
        # data storage
        self.mqtt_client = mqtt_client
        self.topic = topic
        self.logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
        for entry in self.topic.split('/'):
            self.logger = self.logger.getChild(entry)
        # initialisations
        dict.__init__(self)
        self.callback_list = []
        self.warning_callback = None
        self.__previous__ = {}
        # Register the receive callbacks last: receive_callback relies on the
        # attributes initialised above.
        mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback)
        mqtt_client.add_callback(topic=self.topic + "/#", callback=self.receive_callback)

    def receive_callback(self, client, userdata, message):
        """Callback registered at the mqtt client; forwards *message* to :meth:`unpack`."""
        self.unpack(message)

    def unpack_filter(self, key):
        """Translate the stored on/off like value of *key* to a bool, if *key* is in ``RX_FILTER_DATA_KEYS``."""
        if key not in self.RX_FILTER_DATA_KEYS:
            return
        value = self.get(key)
        if value in (1, 'on', 'ON'):
            self[key] = True
        elif value in (0, 'off', 'OFF'):
            self[key] = False

    def unpack_single_value(self, key, data):
        """Store *data* for *key*, remember the previous value and execute the callbacks.

        Keys which are not in ``RX_KEYS`` are not stored. They cause a warning
        log entry unless they are listed in ``RX_IGNORE_KEYS``.
        """
        prev_value = self.get(key)
        if key in self.RX_KEYS:
            self[key] = data
            self.__previous__[key] = prev_value
            # Filter, if needed
            self.unpack_filter(key)
            self.logger.debug("Received data %s - %s", key, str(self.get(key)))
            self.callback_caller(key, self[key], self.get(key) != self.__previous__.get(key))
        elif key not in self.RX_IGNORE_KEYS:
            self.logger.warning('Got a message with unparsed content: "%s - %s"', key, str(data))
        else:
            self.logger.debug("Ignoring key %s", key)

    def unpack(self, message):
        """Extract the data of a received mqtt *message*.

        A JSON dict payload results in one value per dict key, any other
        payload is one value for the key given by the topic part behind the
        base topic. Messages on topics listed in ``RX_IGNORE_TOPICS`` and
        messages on topics ending with ``TX_TOPIC`` (our own commands) are
        ignored.
        """
        content_key = message.topic[len(self.topic) + 1:]
        # An empty (or undefined) TX_TOPIC never marks a message as a command
        is_tx_message = bool(self.TX_TOPIC) and message.topic.endswith(self.TX_TOPIC)
        if content_key in self.RX_IGNORE_TOPICS or is_tx_message:
            self.logger.debug("Ignoring topic %s", content_key)
            return
        self.logger.debug("Unpacking content_key \"%s\" from message.", content_key)
        try:
            # Parse only once (same exceptions as handled by is_json)
            data = json.loads(message.payload)
        except (ValueError, TypeError):
            # String
            self.unpack_single_value(content_key, message.payload.decode('utf-8'))
        else:
            if isinstance(data, dict):
                for key in data:
                    self.unpack_single_value(key, data[key])
            else:
                self.unpack_single_value(content_key, data)
        self.warning_caller()

    def pack_filter(self, key, data):
        """Return *data* prepared for sending: True / False become "on" / "off" for keys in ``TX_FILTER_DATA_KEYS``."""
        if key in self.TX_FILTER_DATA_KEYS:
            if data is True:
                return "on"
            if data is False:
                return "off"
        return data

    def set(self, key, data):
        """Send *data* for *key* (see :meth:`pack`)."""
        self.pack(key, data)

    def pack(self, key, data):
        """Send *data* for *key* to the device.

        Depending on ``TX_TYPE`` the data is sent as JSON dict ``{key: data}``
        to ``<topic>/<TX_TOPIC>`` (``TX_DICT``) or as single value to
        ``<topic>/<key>/<TX_TOPIC>`` (without the suffix for an empty
        ``TX_TOPIC``). Nothing is sent and an error is logged, if ``TX_TOPIC``
        is None or ``TX_TYPE`` is negative.
        """
        data = self.pack_filter(key, data)
        if self.TX_TOPIC is None:
            self.logger.error("Unknown tx toptic. Set TX_TOPIC of class to a known value")
            return
        if self.TX_TYPE < 0:
            self.logger.error("Unknown tx type. Set TX_TYPE of class to a known value")
            return
        self.logger.debug("Sending data for %s - %s", key, str(data))
        if self.TX_TYPE == self.TX_DICT:
            self.mqtt_client.send('/'.join([self.topic, self.TX_TOPIC]), json.dumps({key: data}))
        else:
            if not isinstance(data, (str, bytes)):
                data = json.dumps(data)
            topic_parts = [self.topic, key]
            if len(self.TX_TOPIC) > 0:
                topic_parts.append(self.TX_TOPIC)
            self.mqtt_client.send('/'.join(topic_parts), data)

    def add_callback(self, key, data, callback, on_change_only=False):
        """
        Register *callback*, which is executed as ``callback(device, key, data)``.

        key: key or None for all keys
        data: data or None for all data
        on_change_only: execute the callback only if the value differs from the previous one
        """
        cb_tup = (key, data, callback, on_change_only)
        if cb_tup not in self.callback_list:
            self.callback_list.append(cb_tup)

    def add_warning_callback(self, callback):
        """Set the *callback*, which is executed as ``callback(device, warning_text)`` on warnings."""
        self.warning_callback = callback

    def warning_call_condition(self):
        """Return a true value, if a warning shall be raised (to be replaced by the child class)."""
        return False

    def callback_caller(self, key, data, value_changed):
        """Execute all registered callbacks matching *key* and *data*."""
        for cb_key, cb_data, callback, on_change_only in self.callback_list:
            if (cb_key == key or cb_key is None) and (cb_data == data or cb_data is None) and callback is not None:
                if not on_change_only or value_changed:
                    callback(self, key, data)

    def warning_caller(self):
        """Log the warning text and execute the warning callback, if the warning condition is fulfilled."""
        if self.warning_call_condition():
            warn_txt = self.warning_text()
            self.logger.warning(warn_txt)
            if self.warning_callback is not None:
                self.warning_callback(self, warn_txt)

    def warning_text(self):
        """Return the text of the warning (to be replaced by the child class)."""
        return "default warning text - replace parent warning_text function"

    def previous_value(self, key):
        """Return the value which was stored for *key* before the last reception (None if unknown)."""
        return self.__previous__.get(key)


class shelly(base):
    """Shelly device with two outputs (relays), two inputs and a temperature supervision."""
    KEY_OUTPUT_0 = "relay/0"
    KEY_OUTPUT_1 = "relay/1"
    KEY_INPUT_0 = "input/0"
    KEY_INPUT_1 = "input/1"
    KEY_LONGPUSH_0 = "longpush/0"
    KEY_LONGPUSH_1 = "longpush/1"
    KEY_TEMPERATURE = "temperature"
    KEY_OVERTEMPERATURE = "overtemperature"
    KEY_ID = "id"
    KEY_MODEL = "model"
    KEY_MAC = "mac"
    KEY_IP = "ip"
    KEY_NEW_FIRMWARE = "new_fw"
    KEY_FIRMWARE_VERSION = "fw_ver"
    #
    TX_TOPIC = "command"
    TX_TYPE = base.TX_VALUE
    TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
    #
    RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE,
               KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION]
    RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1']
    RX_IGNORE_KEYS = ['temperature_f']
    RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE]

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic)
        # Key of the output which is currently flashing (None -> no flash active)
        self.output_key_delayed = None
        self.delayed_flash_task = task.delayed(0.3, self.flash_task)
        self.delayed_off_task = task.delayed(0.3, self.off_task)
        # all_off was requested while a flash was active
        self.all_off_requested = False

    def flash_task(self, *args):
        """Second half of a flash: invert the flashing output again and run a postponed all_off."""
        if self.flash_active:
            self.pack(self.output_key_delayed, not self.get(self.output_key_delayed))
            self.output_key_delayed = None
            if self.all_off_requested:
                self.delayed_off_task.run()

    def off_task(self, *args):
        """Task executing :meth:`all_off`."""
        self.all_off()

    @property
    def flash_active(self):
        """rv: [True, False]"""
        return self.output_key_delayed is not None

    #
    # WARNING CALL
    #
    def warning_call_condition(self):
        return self.get(self.KEY_OVERTEMPERATURE)

    def warning_text(self):
        """Return the overtemperature warning text (None, if there is no overtemperature)."""
        if self.overtemperature:
            if self.temperature is not None:
                return "Overtemperature detected for %s. Temperature was %.1f°C." % (self.topic, self.temperature)
            else:
                return "Overtemperature detected for %s." % self.topic

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0)

    @property
    def output_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_1)

    @property
    def input_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_INPUT_0)

    @property
    def input_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_INPUT_1)

    @property
    def longpush_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_LONGPUSH_0)

    @property
    def longpush_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_LONGPUSH_1)

    @property
    def temperature(self):
        """rv: numeric value"""
        return self.get(self.KEY_TEMPERATURE)

    @property
    def overtemperature(self):
        """rv: [True, False] (None, if nothing was received yet)"""
        return self.get(self.KEY_OVERTEMPERATURE)

    #
    # TX
    #
    def set_output_0(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_0, state)

    def set_output_0_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
        self.set_output_0(data)

    def toggle_output_0_mcb(self, device, key, data):
        self.logger.info("Toggeling output 0")
        self.set_output_0('toggle')

    def set_output_1(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_1, state)

    def set_output_1_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data))
        self.set_output_1(data)

    def toggle_output_1_mcb(self, device, key, data):
        self.logger.info("Toggeling output 1")
        self.set_output_1('toggle')

    def flash_0_mcb(self, device, key, data):
        """Toggle output 0 and start the task which inverts it again."""
        self.output_key_delayed = self.KEY_OUTPUT_0
        self.toggle_output_0_mcb(device, key, data)
        self.delayed_flash_task.run()

    def flash_1_mcb(self, device, key, data):
        """Toggle output 1 and start the task which inverts it again."""
        self.output_key_delayed = self.KEY_OUTPUT_1
        self.toggle_output_1_mcb(device, key, data)
        self.delayed_flash_task.run()

    def all_off(self):
        """Switch both outputs off. While a flash is active, this is postponed to the end of the flash."""
        if self.flash_active:
            self.all_off_requested = True
        else:
            # The request is served now. Without the reset every later flash
            # would switch off all outputs again.
            self.all_off_requested = False
            self.set_output_0(False)
            self.set_output_1(False)


class silvercrest_powerplug(base):
    """Silvercrest powerplug with one output."""
    KEY_LINKQUALITY = "linkquality"
    KEY_OUTPUT_0 = "state"
    #
    TX_TYPE = base.TX_DICT
    TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0]
    #
    RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0]
    RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0]

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0)

    @property
    def linkquality(self):
        """rv: numeric value"""
        return self.get(self.KEY_LINKQUALITY)

    #
    # TX
    #
    def set_output_0(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_0, state)

    def set_output_0_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
        self.set_output_0(data)

    def toggle_output_0_mcb(self, device, key, data):
        self.logger.info("Toggeling output 0")
        self.set_output_0('toggle')

    def all_off(self):
        self.set_output_0(False)


class silvercrest_motion_sensor(base):
    """Silvercrest motion sensor (receive only) with a low battery warning."""
    KEY_BATTERY = "battery"
    KEY_BATTERY_LOW = "battery_low"
    KEY_LINKQUALITY = "linkquality"
    KEY_OCCUPANCY = "occupancy"
    KEY_UNMOUNTED = "tamper"
    KEY_VOLTAGE = "voltage"
    #
    RX_KEYS = [KEY_BATTERY, KEY_BATTERY_LOW, KEY_LINKQUALITY, KEY_OCCUPANCY, KEY_UNMOUNTED, KEY_VOLTAGE]

    def warning_call_condition(self):
        return self.get(self.KEY_BATTERY_LOW)

    def warning_text(self, data=None):
        """Return the low battery warning text.

        *data* is unused. It is optional, because :meth:`base.warning_caller`
        calls this method without any argument.
        """
        battery = self.get(self.KEY_BATTERY)
        if battery is None:
            # battery_low may be received before any battery level is known
            return "Battery low: level unknown"
        return "Battery low: level=%d" % battery

    #
    # RX
    #
    @property
    def linkquality(self):
        """rv: numeric value"""
        return self.get(self.KEY_LINKQUALITY)


class my_powerplug(base):
    """Powerplug with four outputs and an additional key addressing all outputs."""
    KEY_OUTPUT_0 = "output/1"
    KEY_OUTPUT_1 = "output/2"
    KEY_OUTPUT_2 = "output/3"
    KEY_OUTPUT_3 = "output/4"
    KEY_OUTPUT_ALL = "output/all"
    KEY_OUTPUT_LIST = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]
    #
    TX_TYPE = base.TX_VALUE
    #
    RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0)

    @property
    def output_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_1)

    @property
    def output_2(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_2)

    @property
    def output_3(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_3)

    #
    # TX
    #
    def set_output(self, key, state):
        """Set the output given by *key* (one of ``KEY_OUTPUT_LIST``) to *state*."""
        if key in self.KEY_OUTPUT_LIST:
            self.pack(key, state)
        else:
            # Use the device logger like all other methods do
            self.logger.error("Unknown key to set the output!")

    def set_output_0(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_0, state)

    def set_output_0_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
        self.set_output_0(data)

    def toggle_output_0_mcb(self, device, key, data):
        self.logger.info("Toggeling output 0")
        self.set_output_0('toggle')

    def set_output_1(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_1, state)

    def set_output_1_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data))
        self.set_output_1(data)

    def toggle_output_1_mcb(self, device, key, data):
        self.logger.info("Toggeling output 1")
        self.set_output_1('toggle')

    def set_output_2(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_2, state)

    def set_output_2_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.output_2 else logging.DEBUG, "Changing output 2 to %s", str(data))
        self.set_output_2(data)

    def toggle_output_2_mcb(self, device, key, data):
        self.logger.info("Toggeling output 2")
        self.set_output_2('toggle')

    def set_output_3(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_3, state)

    def set_output_3_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.output_3 else logging.DEBUG, "Changing output 3 to %s", str(data))
        self.set_output_3(data)

    def toggle_output_3_mcb(self, device, key, data):
        self.logger.info("Toggeling output 3")
        self.set_output_3('toggle')

    def set_output_all(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_ALL, state)

    def set_output_all_mcb(self, device, key, data):
        self.logger.info("Changing all outputs to %s", str(data))
        self.set_output_all(data)

    def toggle_output_all_mcb(self, device, key, data):
        self.logger.info("Toggeling all outputs")
        # Address all outputs as logged (not only output 0)
        self.set_output_all('toggle')

    def all_off(self):
        self.set_output_all(False)


class tradfri_light(base):
    """Tradfri light. Brightness is scaled to 0..100 and color temperature to 0..10 on this side."""
    KEY_LINKQUALITY = "linkquality"
    KEY_OUTPUT_0 = "state"
    KEY_BRIGHTNESS = "brightness"
    KEY_COLOR_TEMP = "color_temp"
    KEY_BRIGHTNESS_FADE = "brightness_move"
    #
    TX_TYPE = base.TX_DICT
    TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP, KEY_BRIGHTNESS_FADE]
    #
    RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]
    RX_IGNORE_KEYS = ['update', 'color_mode', 'color_temp_startup']
    RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]

    def unpack_filter(self, key):
        """Scale received brightness (1..254 -> 0..100) and color temperature (250..454 -> 0..10)."""
        if key == self.KEY_BRIGHTNESS:
            self[key] = round((self[key] - 1) * 100 / 253, 0)
        elif key == self.KEY_COLOR_TEMP:
            self[key] = round((self[key] - 250) * 10 / 204, 0)
        else:
            super().unpack_filter(key)

    def pack_filter(self, key, data):
        """Inverse scaling of :meth:`unpack_filter` for the data to be sent."""
        if key == self.KEY_BRIGHTNESS:
            return round(data * 253 / 100 + 1, 0)
        elif key == self.KEY_COLOR_TEMP:
            return round(data * 204 / 10 + 250, 0)
        else:
            return super().pack_filter(key, data)

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0, False)

    @property
    def linkquality(self):
        """rv: numeric value"""
        return self.get(self.KEY_LINKQUALITY, 0)

    @property
    def brightness(self):
        """rv: numeric value [0%, ..., 100%]"""
        return self.get(self.KEY_BRIGHTNESS, 0)

    @property
    def color_temp(self):
        """rv: numeric value [0, ..., 10]"""
        return self.get(self.KEY_COLOR_TEMP, 0)

    #
    # TX
    #
    def request_data(self, device=None, key=None, data=None):
        """Send a get request for the output state (the parameters are unused)."""
        self.mqtt_client.send(self.topic + "/get", '{"%s": ""}' % self.KEY_OUTPUT_0)

    def set_output_0(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_0, state)

    def set_output_0_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
        self.set_output_0(data)

    def toggle_output_0_mcb(self, device, key, data):
        self.logger.info("Toggeling output 0")
        self.set_output_0('toggle')

    def set_brightness(self, brightness):
        """brightness: [0, ..., 100]"""
        self.pack(self.KEY_BRIGHTNESS, brightness)

    def set_brightness_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.brightness else logging.DEBUG, "Changing brightness to %s", str(data))
        self.set_brightness(data)

    def default_inc(self, speed=40):
        """Send *speed* for the brightness fade key."""
        self.pack(self.KEY_BRIGHTNESS_FADE, speed)

    def default_dec(self, speed=-40):
        self.default_inc(speed)

    def default_stop(self):
        self.default_inc(0)

    def set_color_temp(self, color_temp):
        """color_temp: [0, ..., 10]"""
        self.pack(self.KEY_COLOR_TEMP, color_temp)

    def set_color_temp_mcb(self, device, key, data):
        self.logger.log(logging.INFO if data != self.color_temp else logging.DEBUG, "Changing color temperature to %s", str(data))
        self.set_color_temp(data)

    def all_off(self):
        self.set_output_0(False)


class tradfri_button(base):
    """Tradfri button (receive only) reporting actions and raising a low battery warning."""
    ACTION_TOGGLE = "toggle"
    ACTION_BRIGHTNESS_UP = "brightness_up_click"
    ACTION_BRIGHTNESS_DOWN = "brightness_down_click"
    ACTION_RIGHT = "arrow_right_click"
    ACTION_LEFT = "arrow_left_click"
    ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold"
    ACTION_BRIGHTNESS_UP_RELEASE = "brightness_up_release"
    ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold"
    ACTION_BRIGHTNESS_DOWN_RELEASE = "brightness_down_release"
    ACTION_RIGHT_LONG = "arrow_right_hold"
    ACTION_RIGHT_RELEASE = "arrow_right_release"
    ACTION_LEFT_LONG = "arrow_left_hold"
    ACTION_LEFT_RELEASE = "arrow_left_release"
    #
    KEY_LINKQUALITY = "linkquality"
    KEY_BATTERY = "battery"
    KEY_ACTION = "action"
    KEY_ACTION_DURATION = "action_duration"
    #
    RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_ACTION]
    RX_IGNORE_KEYS = ['update', KEY_ACTION_DURATION]

    #
    # RX
    #
    @property
    def action(self):
        """rv: action_txt"""
        return self.get(self.KEY_ACTION)

    #
    # WARNING CALL
    #
    def warning_call_condition(self):
        return self.get(self.KEY_BATTERY) is not None and self.get(self.KEY_BATTERY) <= BATTERY_WARN_LEVEL

    def warning_text(self):
        return "Low battery level detected for %s. Battery level was %.0f%%." % (self.topic, self.get(self.KEY_BATTERY))


class brennenstuhl_heatingvalve(base):
    """Brennenstuhl heating valve. A configuration message is sent to the device on instantiation."""
    KEY_LINKQUALITY = "linkquality"
    KEY_BATTERY = "battery"
    KEY_HEATING_SETPOINT = "current_heating_setpoint"
    KEY_TEMPERATURE = "local_temperature"
    #
    KEY_AWAY_MODE = "away_mode"
    KEY_CHILD_LOCK = "child_lock"
    KEY_PRESET = "preset"
    KEY_SYSTEM_MODE = "system_mode"
    KEY_VALVE_DETECTION = "valve_detection"
    KEY_WINDOW_DETECTION = "window_detection"
    #
    TX_TYPE = base.TX_DICT
    #
    RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_HEATING_SETPOINT, KEY_TEMPERATURE]
    RX_IGNORE_KEYS = [KEY_AWAY_MODE, KEY_CHILD_LOCK, KEY_PRESET, KEY_SYSTEM_MODE, KEY_VALVE_DETECTION, KEY_WINDOW_DETECTION]

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic)
        # Send the device configuration
        self.mqtt_client.send(self.topic + '/' + self.TX_TOPIC, json.dumps({self.KEY_WINDOW_DETECTION: "ON",
                              self.KEY_CHILD_LOCK: "UNLOCK", self.KEY_VALVE_DETECTION: "ON", self.KEY_SYSTEM_MODE: "heat", self.KEY_PRESET: "manual"}))

    def warning_call_condition(self):
        return self.get(self.KEY_BATTERY, 100) <= BATTERY_WARN_LEVEL

    def warning_text(self):
        return "Low battery level detected for %s. Battery level was %.0f%%." % (self.topic, self.get(self.KEY_BATTERY))

    #
    # RX
    #
    @property
    def linkquality(self):
        """rv: numeric value"""
        return self.get(self.KEY_LINKQUALITY)

    @property
    def linkqulity(self):
        """Misspelled alias of :attr:`linkquality`, kept for existing callers."""
        return self.linkquality

    @property
    def heating_setpoint(self):
        return self.get(self.KEY_HEATING_SETPOINT)

    @property
    def temperature(self):
        return self.get(self.KEY_TEMPERATURE)

    #
    # TX
    #
    def set_heating_setpoint(self, setpoint):
        self.pack(self.KEY_HEATING_SETPOINT, setpoint)

    def set_heating_setpoint_mcb(self, device, key, data):
        self.logger.info("Changing heating setpoint to %s", str(data))
        self.set_heating_setpoint(data)


class remote(base):
    """Remote control (transmit only): every key is sent to ``<topic>/<key>`` without a topic suffix."""
    KEY_CD = "CD"
    KEY_LINE1 = "LINE1"
    KEY_LINE3 = "LINE3"
    KEY_MUTE = "MUTE"
    KEY_POWER = "POWER"
    KEY_VOLDOWN = "VOLDOWN"
    KEY_VOLUP = "VOLUP"
    #
    TX_TOPIC = ''
    TX_TYPE = base.TX_VALUE
    #
    # The sent keys are received again, because there is no topic suffix -> ignore them
    RX_IGNORE_TOPICS = [KEY_CD, KEY_LINE1, KEY_LINE3, KEY_MUTE, KEY_POWER, KEY_VOLUP, KEY_VOLDOWN]

    def set_cd(self, device=None, key=None, data=None):
        self.pack(self.KEY_CD, None)

    def set_line1(self, device=None, key=None, data=None):
        self.pack(self.KEY_LINE1, None)

    def set_line3(self, device=None, key=None, data=None):
        self.pack(self.KEY_LINE3, None)

    def set_mute(self, device=None, key=None, data=None):
        self.pack(self.KEY_MUTE, None)

    def set_power(self, device=None, key=None, data=None):
        self.pack(self.KEY_POWER, None)

    def set_volume_up(self, data=False):
        """data: [True, False]"""
        self.pack(self.KEY_VOLUP, data)

    def set_volume_down(self, data=False):
        """data: [True, False]"""
        self.pack(self.KEY_VOLDOWN, data)

    def default_inc(self, device=None, key=None, data=None):
        self.set_volume_up(True)

    def default_dec(self, device=None, key=None, data=None):
        self.set_volume_down(True)

    def default_stop(self, device=None, key=None, data=None):
        self.set_volume_up(False)


class status(base):
    """Status device sending states to ``<topic>/state/<num>/set``."""
    KEY_STATE = "state"
    #
    TX_TYPE = base.TX_VALUE
    #
    RX_KEYS = [KEY_STATE]

    def set_state(self, num, data):
        """data: [True, False]"""
        self.pack(self.KEY_STATE + "/" + str(num), data)

    def set_state_mcb(self, device, key, data):
        self.logger.info("Changing state to %s", str(data))
        # NOTE(review): set_state requires (num, data); this call passes only
        # one argument and raises TypeError. The intended state number is not
        # derivable from this module - to be fixed together with the callers.
        self.set_state(data)
class audio_status(status):
    """Status device which additionally receives the key "title"."""
    KEY_TITLE = "title"
    # Received keys: everything the parent class receives plus the title
    RX_KEYS = status.RX_KEYS + [KEY_TITLE]