#!/usr/bin/env python # -*- coding: utf-8 -*- # """ devices (DEVICES) ================= **Author:** * Dirk Alders **Description:** This Module supports smarthome devices **Submodules:** * :mod:`shelly` * :mod:`silvercrest_powerplug` **Unittest:** See also the :download:`unittest ` documentation. **Module Documentation:** """ from base import mqtt_base from function.videv import base as videv_base import json import logging import task try: from config import APP_NAME as ROOT_LOGGER_NAME except ImportError: ROOT_LOGGER_NAME = 'root' BATTERY_WARN_LEVEL = 5 def is_json(data): try: json.loads(data) except json.decoder.JSONDecodeError: return False else: return True class group(object): def __init__(self, *args): super().__init__() self._members = args self._iter_counter = 0 # self.methods = [] for method in [m for m in args[0].__class__.__dict__.keys()]: if not method.startswith('_') and callable(getattr(args[0], method)): # add all public callable attributes to the list self.methods.append(method) # for member in self: methods = [m for m in member.__class__.__dict__.keys() if not m.startswith( '_') if not m.startswith('_') and callable(getattr(args[0], m))] if self.methods != methods: raise ValueError("All given instances needs to have same attributes:", self.methods, methods) def __iter__(self): return self def __next__(self): if self._iter_counter < len(self): self._iter_counter += 1 return self._members[self._iter_counter - 1] self._iter_counter = 0 raise StopIteration def __getitem__(self, i): return self._members[i] def __len__(self): return len(self._members) def __getattribute__(self, name): def group_execution(*args, **kwargs): for member in self[:]: m = getattr(member, name) m(*args, **kwargs) try: rv = super().__getattribute__(name) except AttributeError: return group_execution else: return rv class base(mqtt_base): TX_TOPIC = "set" TX_VALUE = 0 TX_DICT = 1 TX_TYPE = -1 TX_FILTER_DATA_KEYS = [] # RX_KEYS = [] RX_IGNORE_TOPICS = [] RX_IGNORE_KEYS = [] RX_FILTER_DATA_KEYS = [] # KEY_WARNING = '__WARNING__' def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic, default_values=dict.fromkeys(self.RX_KEYS)) # data storage # initialisations mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback) mqtt_client.add_callback(topic=self.topic+"/#", callback=self.receive_callback) def set(self, key, data, block_callback=[]): if key in self.RX_IGNORE_KEYS: pass # ignore these keys elif key in self.RX_KEYS: return super().set(key, data, block_callback) else: self.logger.warning("Unexpected key %s", key) def receive_callback(self, client, userdata, message): if message.topic != self.topic + '/' + videv_base.KEY_INFO: content_key = message.topic[len(self.topic) + 1:] if content_key not in self.RX_IGNORE_TOPICS and (not message.topic.endswith(self.TX_TOPIC) or len(self.TX_TOPIC) == 0): self.logger.debug("Unpacking content_key \"%s\" from message.", content_key) if is_json(message.payload): data = json.loads(message.payload) if type(data) is dict: for key in data: self.set(key, self.__device_to_instance_filter__(key, data[key])) else: self.set(content_key, self.__device_to_instance_filter__(content_key, data)) # String else: self.set(content_key, self.__device_to_instance_filter__(content_key, message.payload.decode('utf-8'))) else: self.logger.debug("Ignoring topic %s", content_key) def __device_to_instance_filter__(self, key, data): if key in self.RX_FILTER_DATA_KEYS: if data in [1, 'on', 'ON']: return True elif data in [0, 'off', 'OFF']: return False return data def __instance_to_device_filter__(self, key, data): if key in self.TX_FILTER_DATA_KEYS: if data is True: return "on" elif data is False: return "off" return data def send_command(self, key, data): data = self.__instance_to_device_filter__(key, data) if self.TX_TOPIC is not None: if self.TX_TYPE < 0: self.logger.error("Unknown tx type. Set TX_TYPE of class to a known value") else: self.logger.debug("Sending data for %s - %s", key, str(data)) if self.TX_TYPE == self.TX_DICT: self.mqtt_client.send('/'.join([self.topic, self.TX_TOPIC]), json.dumps({key: data})) else: if type(data) not in [str, bytes]: data = json.dumps(data) self.mqtt_client.send('/'.join([self.topic, key, self.TX_TOPIC] if len(self.TX_TOPIC) > 0 else [self.topic, key]), data) else: self.logger.error("Unknown tx toptic. Set TX_TOPIC of class to a known value") class shelly(base): KEY_OUTPUT_0 = "relay/0" KEY_OUTPUT_1 = "relay/1" KEY_INPUT_0 = "input/0" KEY_INPUT_1 = "input/1" KEY_LONGPUSH_0 = "longpush/0" KEY_LONGPUSH_1 = "longpush/1" KEY_TEMPERATURE = "temperature" KEY_OVERTEMPERATURE = "overtemperature" KEY_ID = "id" KEY_MODEL = "model" KEY_MAC = "mac" KEY_IP = "ip" KEY_NEW_FIRMWARE = "new_fw" KEY_FIRMWARE_VERSION = "fw_ver" # TX_TOPIC = "command" TX_TYPE = base.TX_VALUE TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1] # RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE, KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION] RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1'] RX_IGNORE_KEYS = ['temperature_f'] RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # self.output_key_delayed = None self.delayed_flash_task = task.delayed(0.3, self.flash_task) self.delayed_off_task = task.delayed(0.3, self.off_task) # self.add_callback(self.KEY_OVERTEMPERATURE, True, self.__warning__, True) # self.all_off_requested = False def flash_task(self, *args): if self.flash_active: self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed)) self.output_key_delayed = None if self.all_off_requested: self.delayed_off_task.run() def off_task(self, *args): self.all_off() @property def flash_active(self): return self.output_key_delayed is not None # # WARNING CALL # def __warning__(self, client, key, data): pass # TODO: implement warning feedback (key: KEY_OVERTEMPERATURE - info: KEY_TEMPERATURE) # # RX # @property def output_0(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_0) @property def output_1(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_1) @property def input_0(self): """rv: [True, False]""" return self.get(self.KEY_INPUT_0) @property def input_1(self): """rv: [True, False]""" return self.get(self.KEY_INPUT_1) @property def longpush_0(self): """rv: [True, False]""" return self.get(self.KEY_LONGPUSH_0) @property def longpush_1(self): """rv: [True, False]""" return self.get(self.KEY_LONGPUSH_1) @property def temperature(self): """rv: numeric value""" return self.get(self.KEY_TEMPERATURE) # # TX # def set_output_0(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_0, state) def set_output_0_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data)) self.set_output_0(data) def toggle_output_0_mcb(self, device, key, data): self.logger.info("Toggeling output 0") self.set_output_0(not self.output_0) def set_output_1(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_1, state) def set_output_1_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data)) self.set_output_1(data) def toggle_output_1_mcb(self, device, key, data): self.logger.info("Toggeling output 1") self.set_output_1(not self.output_1) def flash_0_mcb(self, device, key, data): self.output_key_delayed = self.KEY_OUTPUT_0 self.toggle_output_0_mcb(device, key, data) self.delayed_flash_task.run() def flash_1_mcb(self, device, key, data): self.output_key_delayed = self.KEY_OUTPUT_1 self.toggle_output_1_mcb(device, key, data) self.delayed_flash_task.run() def all_off(self): if self.flash_active: self.all_off_requested = True else: if self.output_0: self.set_output_0(False) if self.output_1: self.set_output_1(False) class silvercrest_powerplug(base): KEY_LINKQUALITY = "linkquality" KEY_OUTPUT_0 = "state" # TX_TYPE = base.TX_DICT TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0] # RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0] RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # # RX # @property def output_0(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_0) @property def linkquality(self): """rv: numeric value""" return self.get(self.KEY_LINKQUALITY) # # TX # def set_output_0(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_0, state) def set_output_0_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data)) self.set_output_0(data) def toggle_output_0_mcb(self, device, key, data): self.logger.info("Toggeling output 0") self.set_output_0(not self.output_0) def all_off(self): if self.output_0: self.set_output_0(False) class silvercrest_motion_sensor(base): KEY_BATTERY = "battery" KEY_BATTERY_LOW = "battery_low" KEY_LINKQUALITY = "linkquality" KEY_OCCUPANCY = "occupancy" KEY_UNMOUNTED = "tamper" KEY_VOLTAGE = "voltage" # RX_KEYS = [KEY_BATTERY, KEY_BATTERY_LOW, KEY_LINKQUALITY, KEY_OCCUPANCY, KEY_UNMOUNTED, KEY_VOLTAGE] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # self.add_callback(self.KEY_BATTERY_LOW, True, self.__warning__, True) # # WARNING CALL # def __warning__(self, client, key, data): pass # TODO: implement warning feedback (key: KEY_BATTERY_LOW - info: KEY_BATTERY) # # RX # @property def linkquality(self): """rv: numeric value""" return self.get(self.KEY_LINKQUALITY) class my_powerplug(base): KEY_OUTPUT_0 = "output/1" KEY_OUTPUT_1 = "output/2" KEY_OUTPUT_2 = "output/3" KEY_OUTPUT_3 = "output/4" KEY_OUTPUT_ALL = "output/all" KEY_OUTPUT_LIST = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3] # TX_TYPE = base.TX_VALUE # RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # # RX # @property def output_0(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_0) @property def output_1(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_1) @property def output_2(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_2) @property def output_3(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_3) # # TX # def set_output(self, key, state): if key in self.KEY_OUTPUT_LIST: self.send_command(key, state) else: logging.error("Unknown key to set the output!") def set_output_0(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_0, state) def set_output_0_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data)) self.set_output_0(data) def toggle_output_0_mcb(self, device, key, data): self.logger.info("Toggeling output 0") self.set_output_0(not self.output_0) def set_output_1(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_1, state) def set_output_1_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data)) self.set_output_1(data) def toggle_output_1_mcb(self, device, key, data): self.logger.info("Toggeling output 1") self.set_output_1(not self.output_1) def set_output_2(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_2, state) def set_output_2_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_2 else logging.DEBUG, "Changing output 2 to %s", str(data)) self.set_output_2(data) def toggle_output_2_mcb(self, device, key, data): self.logger.info("Toggeling output 2") self.set_output_2(not self.output_2) def set_output_3(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_3, state) def set_output_3_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_3 else logging.DEBUG, "Changing output 3 to %s", str(data)) self.set_output_3(data) def toggle_output_3_mcb(self, device, key, data): self.logger.info("Toggeling output 3") self.set_output_3(not self.output_3) def set_output_all(self, state): """state: [True, False, 'toggle']""" self.send_command(self.KEY_OUTPUT_ALL, state) def set_output_all_mcb(self, device, key, data): self.logger.info("Changing all outputs to %s", str(data)) self.set_output_all(data) def all_off(self): self.set_output_all(False) class tradfri_light(base): KEY_LINKQUALITY = "linkquality" KEY_OUTPUT_0 = "state" KEY_BRIGHTNESS = "brightness" KEY_COLOR_TEMP = "color_temp" KEY_BRIGHTNESS_FADE = "brightness_move" # TX_TYPE = base.TX_DICT TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP, KEY_BRIGHTNESS_FADE] # RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] RX_IGNORE_KEYS = ['update', 'color_mode', 'color_temp_startup'] RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) def __device_to_instance_filter__(self, key, data): if key == self.KEY_BRIGHTNESS: return int(round((data - 1) * 100 / 253, 0)) elif key == self.KEY_COLOR_TEMP: return int(round((data - 250) * 10 / 204, 0)) return super().__device_to_instance_filter__(key, data) def __instance_to_device_filter__(self, key, data): if key == self.KEY_BRIGHTNESS: return int(round(data * 253 / 100 + 1, 0)) elif key == self.KEY_COLOR_TEMP: return int(round(data * 204 / 10 + 250, 0)) return super().__instance_to_device_filter__(key, data) # # RX # @property def output_0(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_0, False) @property def linkquality(self): """rv: numeric value""" return self.get(self.KEY_LINKQUALITY, 0) @property def brightness(self): """rv: numeric value [0%, ..., 100%]""" return self.get(self.KEY_BRIGHTNESS, 0) @property def color_temp(self): """rv: numeric value [0, ..., 10]""" return self.get(self.KEY_COLOR_TEMP, 0) # # TX # def request_data(self, device=None, key=None, data=None): self.mqtt_client.send(self.topic + "/get", '{"%s": ""}' % self.KEY_OUTPUT_0) def set_output_0(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_0, state) def set_output_0_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data)) self.set_output_0(data) def toggle_output_0_mcb(self, device, key, data): self.logger.info("Toggeling output 0") self.set_output_0(not self.output_0) def set_brightness(self, brightness): """brightness: [0, ..., 100]""" self.send_command(self.KEY_BRIGHTNESS, brightness) def set_brightness_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.brightness else logging.DEBUG, "Changing brightness to %s", str(data)) self.set_brightness(data) def default_inc(self, speed=40): self.send_command(self.KEY_BRIGHTNESS_FADE, speed) def default_dec(self, speed=-40): self.default_inc(speed) def default_stop(self): self.default_inc(0) def set_color_temp(self, color_temp): """color_temp: [0, ..., 10]""" self.send_command(self.KEY_COLOR_TEMP, color_temp) def set_color_temp_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.color_temp else logging.DEBUG, "Changing color temperature to %s", str(data)) self.set_color_temp(data) def all_off(self): if self.output_0: self.set_output_0(False) class tradfri_button(base): ACTION_TOGGLE = "toggle" ACTION_BRIGHTNESS_UP = "brightness_up_click" ACTION_BRIGHTNESS_DOWN = "brightness_down_click" ACTION_RIGHT = "arrow_right_click" ACTION_LEFT = "arrow_left_click" ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold" ACTION_BRIGHTNESS_UP_RELEASE = "brightness_up_release" ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold" ACTION_BRIGHTNESS_DOWN_RELEASE = "brightness_down_release" ACTION_RIGHT_LONG = "arrow_right_hold" ACTION_RIGHT_RELEASE = "arrow_right_release" ACTION_LEFT_LONG = "arrow_left_hold" ACTION_LEFT_RELEASE = "arrow_left_release" # KEY_LINKQUALITY = "linkquality" KEY_BATTERY = "battery" KEY_ACTION = "action" KEY_ACTION_DURATION = "action_duration" # RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_ACTION] RX_IGNORE_KEYS = ['update', KEY_ACTION_DURATION] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # self.add_callback(self.KEY_BATTERY, None, self.__warning__, True) self.__battery_warning__ = False # # WARNING CALL # def __warning__(self, client, key, data): if data <= BATTERY_WARN_LEVEL: if not self.__battery_warning__: self.__battery_warning__ = True pass # TODO: implement warning feedback (key: KEY_BATTERY_LOW - info: KEY_BATTERY) else: self.__battery_warning__ = False # # RX # @property def action(self): """rv: action_txt""" return self.get(self.KEY_ACTION) class brennenstuhl_heatingvalve(base): KEY_LINKQUALITY = "linkquality" KEY_BATTERY = "battery" KEY_HEATING_SETPOINT = "current_heating_setpoint" KEY_TEMPERATURE = "local_temperature" # KEY_AWAY_MODE = "away_mode" KEY_CHILD_LOCK = "child_lock" KEY_PRESET = "preset" KEY_SYSTEM_MODE = "system_mode" KEY_VALVE_DETECTION = "valve_detection" KEY_WINDOW_DETECTION = "window_detection" # TX_TYPE = base.TX_DICT # RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_HEATING_SETPOINT, KEY_TEMPERATURE] RX_IGNORE_KEYS = [KEY_AWAY_MODE, KEY_CHILD_LOCK, KEY_PRESET, KEY_SYSTEM_MODE, KEY_VALVE_DETECTION, KEY_WINDOW_DETECTION] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) self.mqtt_client.send(self.topic + '/' + self.TX_TOPIC, json.dumps({self.KEY_WINDOW_DETECTION: "ON", self.KEY_CHILD_LOCK: "UNLOCK", self.KEY_VALVE_DETECTION: "ON", self.KEY_SYSTEM_MODE: "heat", self.KEY_PRESET: "manual"})) # self.add_callback(self.KEY_BATTERY, None, self.__warning__, True) self.__battery_warning__ = False # # WARNING CALL # def __warning__(self, client, key, data): if data <= BATTERY_WARN_LEVEL: if not self.__battery_warning__: self.__battery_warning__ = True pass # TODO: implement warning feedback (key: KEY_BATTERY_LOW - info: KEY_BATTERY) else: self.__battery_warning__ = False # # RX # @property def linkqulity(self): return self.get(self.KEY_LINKQUALITY) @property def heating_setpoint(self): return self.get(self.KEY_HEATING_SETPOINT) @property def temperature(self): return self.get(self.KEY_TEMPERATURE) # # TX # def set_heating_setpoint(self, setpoint): self.send_command(self.KEY_HEATING_SETPOINT, setpoint) def set_heating_setpoint_mcb(self, device, key, data): self.logger.info("Changing heating setpoint to %s", str(data)) self.set_heating_setpoint(data) class remote(base): KEY_CD = "CD" KEY_LINE1 = "LINE1" KEY_LINE3 = "LINE3" KEY_MUTE = "MUTE" KEY_POWER = "POWER" KEY_VOLDOWN = "VOLDOWN" KEY_VOLUP = "VOLUP" # TX_TOPIC = '' TX_TYPE = base.TX_VALUE # RX_IGNORE_TOPICS = [KEY_CD, KEY_LINE1, KEY_LINE3, KEY_MUTE, KEY_POWER, KEY_VOLUP, KEY_VOLDOWN] def set_cd(self, device=None, key=None, data=None): self.send_command(self.KEY_CD, None) def set_line1(self, device=None, key=None, data=None): self.send_command(self.KEY_LINE1, None) def set_line3(self, device=None, key=None, data=None): self.send_command(self.KEY_LINE3, None) def set_mute(self, device=None, key=None, data=None): self.send_command(self.KEY_MUTE, None) def set_power(self, device=None, key=None, data=None): self.send_command(self.KEY_POWER, None) def set_volume_up(self, data=False): """data: [True, False]""" self.send_command(self.KEY_VOLUP, data) def set_volume_down(self, data=False): """data: [True, False]""" self.send_command(self.KEY_VOLDOWN, data) def default_inc(self, device=None, key=None, data=None): self.set_volume_up(True) def default_dec(self, device=None, key=None, data=None): self.set_volume_down(True) def default_stop(self, device=None, key=None, data=None): self.set_volume_up(False) class status(base): KEY_STATE = "state" # TX_TYPE = base.TX_VALUE # RX_KEYS = [KEY_STATE] def set_state(self, num, data): """data: [True, False]""" self.send_command(self.KEY_STATE + "/" + str(num), data) def set_state_mcb(self, device, key, data): self.logger.info("Changing state to %s", str(data)) self.set_state(data) class audio_status(status): KEY_TITLE = "title" # RX_KEYS = [status.KEY_STATE, KEY_TITLE]