smart_brain/devices/__init__.py

497 lines
14 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
devices (DEVICES)
=================
**Author:**
* Dirk Alders <sudo-dirk@mount-mockery.de>
**Description:**
This Module supports smarthome devices
**Classes:**
* :class:`shelly`
* :class:`silvercrest_powerplug`
* :class:`my_powerplug`
* :class:`tradfri_light`
* :class:`tradfri_button`
* :class:`nodered_gui`
**Unittest:**
See also the :download:`unittest <devices/_testresults_/unittest.pdf>` documentation.
**Module Documentation:**
"""
# NOTE(review): presumably the list of package dependencies evaluated by
# project tooling - confirm; nothing in this file reads it.
__DEPENDENCIES__ = []

import json
import logging

try:
    # Prefer the application name from the project's config module as the
    # name of the parent logger.
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    # No config module importable: fall back to the logger name 'root'.
    ROOT_LOGGER_NAME = 'root'
# Module logger: a child (named after this module) of the logger chosen above.
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)

# Battery value at or below which tradfri_button reports a low battery warning.
BATTERY_WARN_LEVEL = 5
def DEVICE_TYPE_LIST():
    """
    rv: new list holding the device classes shelly, silvercrest_powerplug,
        my_powerplug, tradfri_light and tradfri_button (in this order)

    The classes are looked up when the function is called, which is why the
    function can be defined above the classes it returns.
    """
    device_types = []
    device_types.extend((shelly, silvercrest_powerplug, my_powerplug, tradfri_light, tradfri_button))
    return device_types
def is_json(data):
    """
    Check whether data can be parsed as a JSON document.

    data: candidate document (str, bytes or bytearray)
    rv: True if json.loads accepts data, False otherwise
    """
    try:
        json.loads(data)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
        # raised for bytes which are not valid text; TypeError covers input
        # that is neither str nor bytes (e.g. None). A predicate must answer
        # False for all of them instead of raising.
        return False
    return True
class base(dict):
    """
    Base class for devices connected via MQTT.

    The instance itself is a dict which stores the most recently received
    value for every key listed in RX_KEYS. Child classes configure sending and
    receiving by overriding the class attributes below.
    """
    TX_TOPIC = None             # topic suffix used for sending; None means sending is not configured
    TX_VALUE = 0                # TX_TYPE value: bare value is sent to <topic>/<key>/<TX_TOPIC>
    TX_DICT = 1                 # TX_TYPE value: JSON object {key: value} is sent to <topic>/<TX_TOPIC>
    TX_TYPE = -1                # negative means "not configured"
    TX_FILTER_DATA_KEYS = []    # keys whose True / False values are sent as "on" / "off"
    #
    RX_LOG_INFO_ALWAYS_KEYS = []    # keys logged with level INFO even if the value did not change
    RX_KEYS = []                    # keys which are stored and forwarded to the callbacks
    RX_IGNORE_TOPICS = []           # topic suffixes which are dropped before unpacking
    RX_IGNORE_KEYS = []             # keys which are dropped without a warning
    RX_FILTER_DATA_KEYS = []        # keys whose 1/'on'/'ON' resp. 0/'off'/'OFF' values are stored as True / False

    def __init__(self, mqtt_client, topic):
        """
        mqtt_client: object providing add_callback(topic=..., callback=...) and send(topic, payload)
        topic: base topic of the device
        """
        # data storage
        self.mqtt_client = mqtt_client
        self.topic = topic
        # initialisations
        dict.__init__(self)
        # The callback storage has to exist before the receive callback is
        # registered, because a message may be delivered right away.
        self.callback_list = []
        self.warning_callback = None
        #
        mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback)
        mqtt_client.add_callback(topic=self.topic + "/#", callback=self.receive_callback)

    def receive_callback(self, client, userdata, message):
        """Callback registered at the mqtt client; forwards message to unpack."""
        self.unpack(message)

    def unpack_filter(self, key):
        """Convert the stored value of key to True / False, if key is listed in RX_FILTER_DATA_KEYS."""
        if key not in self.RX_FILTER_DATA_KEYS:
            return
        value = self.get(key)
        if value == 1 or value == 'on' or value == 'ON':
            self[key] = True
        elif value == 0 or value == 'off' or value == 'OFF':
            self[key] = False

    def unpack_single_value(self, key, data):
        """
        Store data for key, log the reception and execute the matching callbacks.

        Keys not listed in RX_KEYS are not stored; they cause a warning log
        entry unless they are listed in RX_IGNORE_KEYS.
        """
        if key in self.RX_KEYS:
            prev_value = self.get(key)
            self[key] = data
            # Filter, if needed
            self.unpack_filter(key)
            value = self.get(key)
            # Unchanged values are only logged with level DEBUG, unless the key is marked as "always INFO".
            always_info = key in self.RX_LOG_INFO_ALWAYS_KEYS
            level = logging.INFO if always_info or prev_value != value else logging.DEBUG
            logger.log(level, "Received data for (%s) %s - %s", self.topic, key, str(value))
            self.callback_caller(key, self[key])
        elif key not in self.RX_IGNORE_KEYS:
            logger.warning('Got a message with unparsed content "%s"', key)
        else:
            logger.debug("Ignoring key %s", key)

    def unpack(self, message):
        """
        Unpack a received message and store its content.

        message: object with the attributes topic (str) and payload

        A JSON object is unpacked key by key. Every other payload is stored
        under the topic suffix following the device topic. Finally the warning
        condition is evaluated.
        """
        content_key = message.topic[len(self.topic) + 1:]
        if content_key in self.RX_IGNORE_TOPICS:
            logger.debug("Ignoring topic %s", content_key)
            return
        logger.debug("Unpacking content_key \"%s\" from message.", content_key)
        payload = message.payload
        try:
            # Parse once; ValueError covers JSONDecodeError and UnicodeDecodeError.
            data = json.loads(payload)
        except ValueError:
            # String
            if isinstance(payload, (bytes, bytearray)):
                try:
                    data = payload.decode('utf-8')
                except UnicodeDecodeError:
                    # An undecodable payload must not raise out of the receive callback.
                    logger.warning('Ignoring payload of "%s" (%s): neither JSON nor UTF-8 text',
                                   content_key, self.topic)
                    return
            else:
                data = payload
            self.unpack_single_value(content_key, data)
        else:
            if isinstance(data, dict):
                for key, value in data.items():
                    self.unpack_single_value(key, value)
            else:
                self.unpack_single_value(content_key, data)
        self.warning_caller()

    def pack_filter(self, key, data):
        """rv: "on" / "off" for True / False if key is listed in TX_FILTER_DATA_KEYS, otherwise data unchanged"""
        if key in self.TX_FILTER_DATA_KEYS:
            if data is True:
                return "on"
            if data is False:
                return "off"
        return data

    def pack(self, key, data):
        """
        Send data for key via the mqtt client.

        Depending on TX_TYPE either the JSON object {key: data} is sent to
        <topic>/<TX_TOPIC> (TX_DICT) or the value itself is sent to
        <topic>/<key>/<TX_TOPIC>. Nothing is sent and an error is logged, if
        TX_TOPIC or TX_TYPE is not configured.
        """
        data = self.pack_filter(key, data)
        if self.TX_TOPIC is None:
            logger.error(
                "Unknown tx toptic. Set TX_TOPIC of class to a known value")
            return
        if self.TX_TYPE < 0:
            logger.error(
                "Unknown tx type. Set TX_TYPE of class to a known value")
            return
        if self.TX_TYPE == self.TX_DICT:
            self.mqtt_client.send('/'.join([self.topic, self.TX_TOPIC]), json.dumps({key: data}))
        else:
            # str and bytes are sent as they are, everything else is JSON encoded
            if not isinstance(data, (str, bytes)):
                data = json.dumps(data)
            self.mqtt_client.send('/'.join([self.topic, key, self.TX_TOPIC]), data)

    def add_callback(self, key, data, callback):
        """
        key: key or None for all keys
        data: data or None for all data
        callback: callable executed as callback(device, key, data)

        An identical (key, data, callback) combination is registered only once.
        """
        cb_tup = (key, data, callback)
        if cb_tup not in self.callback_list:
            self.callback_list.append(cb_tup)

    def add_warning_callback(self, callback):
        """callback: callable executed as callback(device, warning_text); replaces a previously set callback"""
        self.warning_callback = callback

    def warning_call_condition(self):
        """rv: True if a warning shall be raised; always False here, child classes override this"""
        return False

    def callback_caller(self, key, data):
        """Execute all registered callbacks whose key and data filter match."""
        for cb_key, cb_data, callback in self.callback_list:
            if (cb_key == key or cb_key is None) and (cb_data == data or cb_data is None) and callback is not None:
                callback(self, key, data)

    def warning_caller(self):
        """Log the warning text and execute the warning callback, if the warning condition is met."""
        if self.warning_call_condition():
            warn_txt = self.warning_text()
            logger.warning(warn_txt)
            if self.warning_callback is not None:
                self.warning_callback(self, warn_txt)

    def warning_text(self, data=None):
        """
        data: unused; optional, because warning_caller calls this method without arguments
        rv: warning text; child classes override this method
        """
        return "default warning text - replace parent warning_text function"
class shelly(base):
    """
    Device with two outputs (relay/0, relay/1), two inputs (input/0, input/1)
    and a temperature supervision. Values are sent as bare values to
    <topic>/<key>/command.
    """
    KEY_OUTPUT_0 = "relay/0"
    KEY_OUTPUT_1 = "relay/1"
    KEY_INPUT_0 = "input/0"
    KEY_INPUT_1 = "input/1"
    KEY_TEMPERATURE = "temperature"
    KEY_OVERTEMPERATURE = "overtemperature"
    #
    TX_TOPIC = 'command'
    TX_TYPE = base.TX_VALUE
    TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
    #
    RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0,
               KEY_INPUT_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE]
    # The own command topics and the energy topics are not evaluated.
    RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + TX_TOPIC, KEY_OUTPUT_1 + '/' + TX_TOPIC,
                        KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy"]
    RX_IGNORE_KEYS = ['temperature_f']
    RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1,
                           KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE]

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic)

    #
    # WARNING CALL
    #
    def warning_call_condition(self):
        """rv: stored overtemperature value (truthy if a warning shall be raised, None if nothing was received)"""
        return self.get(self.KEY_OVERTEMPERATURE)

    def warning_text(self):
        """rv: warning text while overtemperature is set, otherwise None"""
        if not self.overtemperature:
            return None
        temperature = self.temperature
        # The temperature is only part of the text, if it is known and numeric;
        # "%.1f" would raise a TypeError for every other value.
        if isinstance(temperature, (int, float)):
            return "Overtemperature detected for %s. Temperature was %.1f°C." % (self.topic, temperature)
        return "Overtemperature detected for %s." % self.topic

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0)

    @property
    def output_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_1)

    @property
    def input_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_INPUT_0)

    @property
    def input_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_INPUT_1)

    @property
    def temperature(self):
        """rv: numeric value"""
        return self.get(self.KEY_TEMPERATURE)

    @property
    def overtemperature(self):
        """rv: [True, False]"""
        # Needed by warning_text, which reads self.overtemperature.
        return self.get(self.KEY_OVERTEMPERATURE)

    #
    # TX
    #
    def set_output_0(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_0, state)

    def set_output_1(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_1, state)
class silvercrest_powerplug(base):
    """
    Power plug with a single output. Values are sent as JSON object
    {key: value} to <topic>/set.
    """
    KEY_LINKQUALITY = "linkquality"
    KEY_OUTPUT_0 = "state"
    #
    TX_TOPIC = 'set'
    TX_TYPE = base.TX_DICT
    TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0]
    #
    RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0]
    # The own set topic is not evaluated.
    RX_IGNORE_TOPICS = [TX_TOPIC]
    RX_IGNORE_KEYS = []
    RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0]

    def __init__(self, mqtt_client, topic):
        super(silvercrest_powerplug, self).__init__(mqtt_client, topic)

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False] (None, if nothing has been received yet)"""
        output_key = self.KEY_OUTPUT_0
        return self.get(output_key)

    @property
    def linkquality(self):
        """rv: numeric value (None, if nothing has been received yet)"""
        linkquality_key = self.KEY_LINKQUALITY
        return self.get(linkquality_key)

    #
    # TX
    #
    def set_output_0(self, state):
        """state: [True, False, 'toggle'] - True / False are sent as "on" / "off" """
        output_key = self.KEY_OUTPUT_0
        self.pack(output_key, state)
class my_powerplug(base):
    """
    Power plug with four outputs (output/1 ... output/4). Values are sent as
    bare values to <topic>/<key>/set; output/all is a send-only key.
    """
    KEY_OUTPUT_0 = "output/1"
    KEY_OUTPUT_1 = "output/2"
    KEY_OUTPUT_2 = "output/3"
    KEY_OUTPUT_3 = "output/4"
    KEY_OUTPUT_ALL = "output/all"
    #
    TX_TOPIC = 'set'
    TX_TYPE = base.TX_VALUE
    #
    RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]
    # All own set topics are ignored. This includes output/all/set, which is
    # published by set_output_all and would otherwise be reported as unparsed
    # content when it is received via the "<topic>/#" subscription.
    RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + "/" + TX_TOPIC, KEY_OUTPUT_1 + "/" + TX_TOPIC,
                        KEY_OUTPUT_2 + "/" + TX_TOPIC, KEY_OUTPUT_3 + "/" + TX_TOPIC,
                        KEY_OUTPUT_ALL + "/" + TX_TOPIC]
    RX_IGNORE_KEYS = []
    RX_FILTER_DATA_KEYS = []

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic)

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0)

    @property
    def output_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_1)

    @property
    def output_2(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_2)

    @property
    def output_3(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_3)

    #
    # TX
    #
    def set_output_0(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_0, state)

    def set_output_1(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_1, state)

    def set_output_2(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_2, state)

    def set_output_3(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_3, state)

    def set_output_all(self, state):
        """state: [True, False, 'toggle']"""
        self.pack(self.KEY_OUTPUT_ALL, state)
class tradfri_light(base):
    """
    Light with switchable output, brightness and color temperature. Values
    are sent as JSON object {key: value} to <topic>/set. Brightness and color
    temperature are handled in percent on the python side.
    """
    KEY_LINKQUALITY = "linkquality"
    KEY_OUTPUT_0 = "state"
    KEY_BRIGHTNESS = "brightness"
    KEY_COLOR_TEMP = "color_temp"
    #
    TX_TOPIC = 'set'
    TX_TYPE = base.TX_DICT
    TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]
    #
    RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]
    # The own set topic is not evaluated.
    RX_IGNORE_TOPICS = [TX_TOPIC]
    RX_IGNORE_KEYS = ['update', 'color_mode']
    RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]

    def __init__(self, mqtt_client, topic):
        super(tradfri_light, self).__init__(mqtt_client, topic)

    def unpack_filter(self, key):
        """Scale the received brightness and color temperature to percent; other keys use the parent filter."""
        if key == self.KEY_BRIGHTNESS:
            raw_value = self[key]
            self[key] = raw_value * 100 / 256
            return
        if key == self.KEY_COLOR_TEMP:
            raw_value = self[key]
            self[key] = (raw_value - 250) * 100 / 204
            return
        super().unpack_filter(key)

    def pack_filter(self, key, data):
        """rv: device value for brightness and color temperature given in percent; parent filter result for other keys"""
        if key == self.KEY_BRIGHTNESS:
            return data * 256 / 100
        if key == self.KEY_COLOR_TEMP:
            return data * 204 / 100 + 250
        return super().pack_filter(key, data)

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False]"""
        output_key = self.KEY_OUTPUT_0
        return self.get(output_key)

    @property
    def linkquality(self):
        """rv: numeric value"""
        linkquality_key = self.KEY_LINKQUALITY
        return self.get(linkquality_key)

    @property
    def brightness(self):
        """rv: numeric value [0%, ..., 100%]"""
        brightness_key = self.KEY_BRIGHTNESS
        return self.get(brightness_key)

    @property
    def color_temp(self):
        """rv: numeric value [0%, ..., 100%]"""
        color_temp_key = self.KEY_COLOR_TEMP
        return self.get(color_temp_key)

    #
    # TX
    #
    def set_output_0(self, state):
        """state: [True, False, 'toggle']"""
        output_key = self.KEY_OUTPUT_0
        self.pack(output_key, state)

    def set_brightness(self, brightness):
        """brightness: [0, ..., 100]"""
        brightness_key = self.KEY_BRIGHTNESS
        self.pack(brightness_key, brightness)

    def set_color_temp(self, color_temp):
        """color_temp: [0, ..., 100]"""
        color_temp_key = self.KEY_COLOR_TEMP
        self.pack(color_temp_key, color_temp)
class tradfri_button(base):
    """
    Receive-only button device reporting an action, the battery level and the
    link quality. A warning is raised for a low battery level.
    """
    KEY_LINKQUALITY = "linkquality"
    KEY_BATTERY = "battery"
    KEY_ACTION = "action"
    #
    # Every action is logged with level INFO, even if it equals the previous one.
    RX_LOG_INFO_ALWAYS_KEYS = [KEY_ACTION]
    RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_ACTION]
    RX_IGNORE_TOPICS = []
    RX_IGNORE_KEYS = ['update']
    RX_FILTER_DATA_KEYS = []

    def __init__(self, mqtt_client, topic):
        super().__init__(mqtt_client, topic)

    #
    # RX
    #
    @property
    def action(self):
        """rv: action_txt"""
        return self.get(self.KEY_ACTION)

    #
    # WARNING CALL
    #
    def warning_call_condition(self):
        """rv: True if a numeric battery level at or below BATTERY_WARN_LEVEL is stored, otherwise False"""
        battery = self.get(self.KEY_BATTERY)
        # The condition is evaluated after every received message. As long as
        # no (numeric) battery level is stored, there is nothing to compare
        # and "None <= ..." would raise a TypeError.
        if not isinstance(battery, (int, float)):
            return False
        return battery <= BATTERY_WARN_LEVEL

    def warning_text(self):
        """rv: warning text including the stored battery level"""
        return "Low battery level detected for %s. Battery level was %.0f%%." % (self.topic, self.get(self.KEY_BATTERY))
class nodered_gui(base):
    """
    GUI element which reports a state and accepts a feedback value. The
    feedback is sent as bare value to <topic>/feedback/set.
    """
    KEY_FEEDBACK = "feedback"
    KEY_STATE = "state"
    #
    TX_TOPIC = 'set'
    TX_TYPE = base.TX_VALUE
    TX_FILTER_DATA_KEYS = []
    #
    RX_LOG_INFO_ALWAYS_KEYS = []
    RX_KEYS = [KEY_STATE]
    # The own feedback set topic is not evaluated.
    RX_IGNORE_TOPICS = [KEY_FEEDBACK + '/' + TX_TOPIC]
    RX_FILTER_DATA_KEYS = []

    def __init__(self, mqtt_client, topic):
        super(nodered_gui, self).__init__(mqtt_client, topic)

    #
    # RX
    #
    @property
    def state(self):
        """rv: last received state value (None, if nothing has been received yet)"""
        state_key = self.KEY_STATE
        return self.get(state_key)

    #
    # TX
    #
    def set_feedback(self, data):
        """data: feedback value to be sent"""
        feedback_key = self.KEY_FEEDBACK
        self.pack(feedback_key, data)