#!/usr/bin/env python # -*- coding: utf-8 -*- # """ devices (DEVICES) =========== **Author:** * Dirk Alders **Description:** This Module supports smarthome devices **Submodules:** * :mod:`shelly` * :mod:`silvercrest_powerplug` **Unittest:** See also the :download:`unittest ` documentation. **Module Documentation:** """ __DEPENDENCIES__ = [] import json import logging try: from config import APP_NAME as ROOT_LOGGER_NAME except ImportError: ROOT_LOGGER_NAME = 'root' logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__) BATTERY_WARN_LEVEL = 5 def DEVICE_TYPE_LIST(): return [shelly, silvercrest_powerplug, my_powerplug, tradfri_light, tradfri_button] def is_json(data): try: json.loads(data) except json.decoder.JSONDecodeError: return False else: return True class base(dict): TX_TOPIC = None TX_VALUE = 0 TX_DICT = 1 TX_TYPE = -1 TX_FILTER_DATA_KEYS = [] # RX_LOG_INFO_ALWAYS_KEYS = [] RX_KEYS = [] RX_IGNORE_TOPICS = [] RX_IGNORE_KEYS = [] RX_FILTER_DATA_KEYS = [] def __init__(self, mqtt_client, topic): # data storage self.mqtt_client = mqtt_client self.topic = topic # initialisations dict.__init__(self) mqtt_client.add_callback( topic=self.topic, callback=self.receive_callback) mqtt_client.add_callback( topic=self.topic+"/#", callback=self.receive_callback) # self.callback_list = [] self.warning_callback = None def receive_callback(self, client, userdata, message): self.unpack(message) def unpack_filter(self, key): if key in self.RX_FILTER_DATA_KEYS: if self.get(key) == 1 or self.get(key) == 'on' or self.get(key) == 'ON': self[key] = True elif self.get(key) == 0 or self.get(key) == 'off' or self.get(key) == 'OFF': self[key] = False def unpack_single_value(self, key, data): prev_value = self.get(key) if key in self.RX_KEYS: self[key] = data # Filter, if needed self.unpack_filter(key) logger.log(logging.INFO if key in self.RX_LOG_INFO_ALWAYS_KEYS or prev_value != self.get(key) else logging.DEBUG, "Received data for (%s) %s - %s", self.topic, key, str(self.get(key))) self.callback_caller(key, self[key]) elif key not in self.RX_IGNORE_KEYS: logger.warning('Got a message from \"%s\"with unparsed content "%s"', self.topic, key) else: logger.debug("Ignoring key %s", key) def unpack(self, message): content_key = message.topic[len(self.topic) + 1:] if content_key not in self.RX_IGNORE_TOPICS: logger.debug("Unpacking content_key \"%s\" from message.", content_key) if is_json(message.payload): data = json.loads(message.payload) if type(data) is dict: for key in data: self.unpack_single_value(key, data[key]) else: self.unpack_single_value(content_key, data) # String else: self.unpack_single_value( content_key, message.payload.decode('utf-8')) self.warning_caller() else: logger.debug("Ignoring topic %s", content_key) def pack_filter(self, key, data): if key in self.TX_FILTER_DATA_KEYS: if data is True: return "on" elif data is False: return "off" else: return data return data def pack(self, key, data): data = self.pack_filter(key, data) if self.TX_TOPIC is not None: if self.TX_TYPE < 0: logger.error( "Unknown tx type. Set TX_TYPE of class to a known value") else: if self.TX_TYPE == self.TX_DICT: self.mqtt_client.send('/'.join([self.topic, self.TX_TOPIC]), json.dumps({key: data})) else: if type(data) not in [str, bytes]: data = json.dumps(data) self.mqtt_client.send('/'.join([self.topic, key, self.TX_TOPIC]), data) else: logger.error( "Unknown tx toptic. Set TX_TOPIC of class to a known value") def add_callback(self, key, data, callback): """ key: key or None for all keys data: data or None for all data """ cb_tup = (key, data, callback) if cb_tup not in self.callback_list: self.callback_list.append(cb_tup) def add_warning_callback(self, callback): self.warning_callback = callback def warning_call_condition(self): return False def callback_caller(self, key, data): for cb_key, cb_data, callback in self.callback_list: if (cb_key == key or cb_key is None) and (cb_data == data or cb_data is None) and callback is not None: callback(self, key, data) def warning_caller(self): if self.warning_call_condition(): warn_txt = self.warning_text() logger.warning(warn_txt) if self.warning_callback is not None: self.warning_callback(self, warn_txt) def warning_text(self, data): return "default warning text - replace parent warning_text function" class shelly(base): KEY_OUTPUT_0 = "relay/0" KEY_OUTPUT_1 = "relay/1" KEY_INPUT_0 = "input/0" KEY_INPUT_1 = "input/1" KEY_TEMPERATURE = "temperature" KEY_OVERTEMPERATURE = "overtemperature" # TX_TOPIC = 'command' TX_TYPE = base.TX_VALUE TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1] # RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE] RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + TX_TOPIC, KEY_OUTPUT_1 + '/' + TX_TOPIC, KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy"] RX_IGNORE_KEYS = ['temperature_f'] RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # # WARNING CALL # def warning_call_condition(self): return self.get(self.KEY_OVERTEMPERATURE) def warning_text(self): if self.overtemperature: if self.temperature is not None: return "Overtemperature detected for %s. Temperature was %.1f°C." % (self.topic, self.temperature) else: return "Overtemperature detected for %s." % self.topic # # RX # @property def output_0(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_0) @property def output_1(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_1) @property def input_0(self): """rv: [True, False]""" return self.get(self.KEY_INPUT_0) @property def input_1(self): """rv: [True, False]""" return self.get(self.KEY_INPUT_1) @property def temperature(self): """rv: numeric value""" return self.get(self.KEY_TEMPERATURE) # # TX # def set_output_0(self, state): """state: [True, False, 'toggle']""" self.pack(self.KEY_OUTPUT_0, state) def set_output_1(self, state): """state: [True, False, 'toggle']""" self.pack(self.KEY_OUTPUT_1, state) class silvercrest_powerplug(base): KEY_LINKQUALITY = "linkquality" KEY_OUTPUT_0 = "state" # TX_TOPIC = 'set' TX_TYPE = base.TX_DICT TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0] # RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0] RX_IGNORE_TOPICS = [TX_TOPIC] RX_IGNORE_KEYS = [] RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # # RX # @property def output_0(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_0) @property def linkquality(self): """rv: numeric value""" return self.get(self.KEY_LINKQUALITY) # # TX # def set_output_0(self, state): """state: [True, False, 'toggle']""" self.pack(self.KEY_OUTPUT_0, state) class my_powerplug(base): KEY_OUTPUT_0 = "output/1" KEY_OUTPUT_1 = "output/2" KEY_OUTPUT_2 = "output/3" KEY_OUTPUT_3 = "output/4" KEY_OUTPUT_ALL = "output/all" # TX_TOPIC = 'set' TX_TYPE = base.TX_VALUE # RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3] RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + "/" + TX_TOPIC, KEY_OUTPUT_1 + "/" + TX_TOPIC, KEY_OUTPUT_2 + "/" + TX_TOPIC, KEY_OUTPUT_3 + "/" + TX_TOPIC] RX_IGNORE_KEYS = [] RX_FILTER_DATA_KEYS = [] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # # RX # @property def output_0(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_0) @property def output_1(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_1) @property def output_2(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_2) @property def output_3(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_3) # # TX # def set_output_0(self, state): """state: [True, False, 'toggle']""" self.pack(self.KEY_OUTPUT_0, state) def set_output_1(self, state): """state: [True, False, 'toggle']""" self.pack(self.KEY_OUTPUT_1, state) def set_output_2(self, state): """state: [True, False, 'toggle']""" self.pack(self.KEY_OUTPUT_2, state) def set_output_3(self, state): """state: [True, False, 'toggle']""" self.pack(self.KEY_OUTPUT_3, state) def set_output_all(self, state): """state: [True, False, 'toggle']""" self.pack(self.KEY_OUTPUT_ALL, state) class tradfri_light(base): KEY_LINKQUALITY = "linkquality" KEY_OUTPUT_0 = "state" KEY_BRIGHTNESS = "brightness" KEY_COLOR_TEMP = "color_temp" # TX_TOPIC = 'set' TX_TYPE = base.TX_DICT TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] # RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] RX_IGNORE_TOPICS = [TX_TOPIC] RX_IGNORE_KEYS = ['update', 'color_mode'] RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) def unpack_filter(self, key): if key == self.KEY_BRIGHTNESS: self[key] = self[key] * 100 / 256 elif key == self.KEY_COLOR_TEMP: self[key] = (self[key] - 250) * 100 / 204 else: super().unpack_filter(key) def pack_filter(self, key, data): if key == self.KEY_BRIGHTNESS: return data * 256 / 100 elif key == self.KEY_COLOR_TEMP: return data * 204 / 100 + 250 else: return super().pack_filter(key, data) # # RX # @property def output_0(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_0) @property def linkquality(self): """rv: numeric value""" return self.get(self.KEY_LINKQUALITY) @property def brightness(self): """rv: numeric value [0%, ..., 100%""" return self.get(self.KEY_BRIGHTNESS) @property def color_temp(self): """rv: numeric value [0%, ..., 100%""" return self.get(self.KEY_COLOR_TEMP) # # TX # def set_output_0(self, state): """state: [True, False, 'toggle']""" self.pack(self.KEY_OUTPUT_0, state) def set_brightness(self, brightness): """brightness: [0, ..., 100]""" self.pack(self.KEY_BRIGHTNESS, brightness) def set_color_temp(self, color_temp): """color_temp: [0, ..., 100]""" self.pack(self.KEY_COLOR_TEMP, color_temp) class tradfri_button(base): KEY_LINKQUALITY = "linkquality" KEY_BATTERY = "battery" KEY_ACTION = "action" # RX_LOG_INFO_ALWAYS_KEYS = [KEY_ACTION] RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_ACTION] RX_IGNORE_TOPICS = [] RX_IGNORE_KEYS = ['update'] RX_FILTER_DATA_KEYS = [] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # # RX # @property def action(self): """rv: action_txt""" return self.get(self.KEY_ACTION) # # WARNING CALL # def warning_call_condition(self): return self.get(self.KEY_BATTERY) <= BATTERY_WARN_LEVEL def warning_text(self): return "Low battery level detected for %s. Battery level was %.0f%%." % (self.topic, self.get(self.KEY_BATTERY)) class nodered_gui(base): KEY_FEEDBACK = "feedback" KEY_STATE = "state" # TX_TOPIC = 'set' TX_TYPE = base.TX_VALUE TX_FILTER_DATA_KEYS = [] # RX_LOG_INFO_ALWAYS_KEYS = [] RX_KEYS = [KEY_STATE] RX_IGNORE_TOPICS = [KEY_FEEDBACK + '/' + TX_TOPIC] RX_FILTER_DATA_KEYS = [] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # # RX # @property def state(self): """rv: [True, False]""" return self.get(self.KEY_STATE) # # TX # def set_feedback(self, data): self.pack(self.KEY_FEEDBACK, data)