From e702960e4de28d40bef76a9433b366f81079b480 Mon Sep 17 00:00:00 2001 From: Dirk Alders Date: Mon, 19 Dec 2022 10:35:20 +0100 Subject: [PATCH] Initial Version with very limited functions --- .gitignore | 59 +++-- .gitmodules | 6 + .vscode/settings.json | 11 + __install__.py | 41 +++ devices/__init__.py | 461 +++++++++++++++++++++++++++++++++ function/__init__.py | 19 ++ function/first_floor_dining.py | 24 ++ mqtt | 1 + report | 1 + smart_brain.py | 94 +++++++ smart_brain.sh | 4 + 11 files changed, 700 insertions(+), 21 deletions(-) create mode 100644 .gitmodules create mode 100644 .vscode/settings.json create mode 100644 __install__.py create mode 100644 devices/__init__.py create mode 100644 function/__init__.py create mode 100644 function/first_floor_dining.py create mode 160000 mqtt create mode 160000 report create mode 100644 smart_brain.py create mode 100755 smart_brain.sh diff --git a/.gitignore b/.gitignore index 7f68ee7..2ff1af4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,41 @@ +config.py + +# ---> Linux +*~ + +# temporary files which can be created if a process still has a handle open of a deleted file +.fuse_hidden* + +# KDE directory preferences +.directory + +# Linux trash folder which might appear on any partition or disk +.Trash-* + +# .nfs files are created when an open file is removed but is still being accessed +.nfs* + +# ---> Backup +*.bak +*.gho +*.ori +*.orig +*.tmp + +# ---> VirtualEnv +# Virtualenv +# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/ +.Python +[Bb]in +[Ii]nclude +[Ll]ib +[Ll]ib64 +[Ll]ocal +[Ss]cripts +pyvenv.cfg +.venv +pip-selfcheck.json + # ---> Python # Byte-compiled / optimized / DLL files __pycache__/ @@ -114,24 +152,3 @@ dmypy.json # Pyre type checker .pyre/ -# ---> Backup -*.bak -*.gho -*.ori -*.orig -*.tmp - -# ---> VirtualEnv -# Virtualenv -# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/ -.Python -[Bb]in -[Ii]nclude -[Ll]ib -[Ll]ib64 -[Ll]ocal -[Ss]cripts -pyvenv.cfg -.venv -pip-selfcheck.json - diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..4fd1c93 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,6 @@ +[submodule "mqtt"] + path = mqtt + url = https://git.mount-mockery.de/pylib/mqtt.git +[submodule "report"] + path = report + url = https://git.mount-mockery.de/pylib/report.git diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..576bc33 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,11 @@ +{ + "python.defaultInterpreterPath": "./venv/bin/python", + "editor.formatOnSave": true, + "autopep8.args": [ + "--max-line-length=120" + ], + "editor.fontSize": 14, + "emmet.includeLanguages": { + "django-html": "html" + } +} \ No newline at end of file diff --git a/__install__.py b/__install__.py new file mode 100644 index 0000000..69626dd --- /dev/null +++ b/__install__.py @@ -0,0 +1,41 @@ +#!/bin/python +# +import os +import sys + +SERVICE_FILE = """ +[Unit] +Description=Smarthome Ambient Information Service +After=network-online.target +Wants=network-online.target +[Service] +User=%(UID)d +Group=%(GID)d +ExecStart=%(MY_PATH)s/smart_brain.sh +Type=simple +[Install] +WantedBy=default.target +""" + + +def help(): + print("Usage: prog ") + +if __name__ == "__main__": + if len(sys.argv) == 4: + try: + uid = int(sys.argv[1]) + gid = int(sys.argv[2]) + except ValueError: + help() + else: + if os.path.isdir(sys.argv[3]): + with open(os.path.join(sys.argv[3], 'smart_brain.service'), "w") as fh: + fh.write(SERVICE_FILE % { + "MY_PATH": os.path.dirname(os.path.abspath(__file__)), + "UID": uid, + "GID": gid}) + else: + help() + else: + help() diff --git a/devices/__init__.py b/devices/__init__.py new file mode 100644 index 0000000..26e104a --- /dev/null +++ b/devices/__init__.py @@ -0,0 +1,461 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +""" +devices (DEVICES) +=========== + +**Author:** + +* Dirk Alders + +**Description:** + + This Module supports smarthome devices + +**Submodules:** + +* :mod:`shelly` +* :mod:`silvercrest_powerplug` + +**Unittest:** + + See also the :download:`unittest ` documentation. + +**Module Documentation:** + +""" + +__DEPENDENCIES__ = [] + +import json +import logging + +try: + from config import APP_NAME as ROOT_LOGGER_NAME +except ImportError: + ROOT_LOGGER_NAME = 'root' +logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__) + +BATTERY_WARN_LEVEL = 5 + + +def is_json(data): + try: + json.loads(data) + except json.decoder.JSONDecodeError: + return False + else: + return True + + +class base(dict): + TX_TOPIC = None + TX_VALUE = 0 + TX_DICT = 1 + TX_TYPE = -1 + TX_FILTER_DATA_KEYS = [] + # + RX_LOG_INFO_ALWAYS_KEYS = [] + RX_KEYS = [] + RX_IGNORE_TOPICS = [] + RX_IGNORE_KEYS = [] + RX_FILTER_DATA_KEYS = [] + + def __init__(self, mqtt_client, topic): + # data storage + self.mqtt_client = mqtt_client + self.topic = topic + # initialisations + dict.__init__(self) + mqtt_client.add_callback( + topic=self.topic, callback=self.receive_callback) + mqtt_client.add_callback( + topic=self.topic+"/#", callback=self.receive_callback) + # + self.callback_list = [] + self.warning_callback = None + + def receive_callback(self, client, userdata, message): + self.unpack(message) + + def unpack_filter(self, key): + if key in self.RX_FILTER_DATA_KEYS: + if self.get(key) == 1 or self.get(key) == 'on' or self.get(key) == 'ON': + self[key] = True + elif self.get(key) == 0 or self.get(key) == 'off' or self.get(key) == 'OFF': + self[key] = False + + def unpack_single_value(self, key, data): + prev_value = self.get(key) + if key in self.RX_KEYS: + self[key] = data + # Filter, if needed + self.unpack_filter(key) + logger.log(logging.INFO if key in self.RX_LOG_INFO_ALWAYS_KEYS or prev_value != self.get(key) else logging.DEBUG, + "Received data for (%s) %s - %s", self.topic, key, str(self.get(key))) + self.callback_caller(key, self[key]) + elif key not in self.RX_IGNORE_KEYS: + logger.warning('Got a message with unparsed content "%s"', key) + else: + logger.debug("Ignoring key %s", key) + + def unpack(self, message): + content_key = message.topic[len(self.topic) + 1:] + if content_key not in self.RX_IGNORE_TOPICS: + logger.debug("Unpacking content_key \"%s\" from message.", content_key) + if is_json(message.payload): + data = json.loads(message.payload) + if type(data) is dict: + for key in data: + self.unpack_single_value(key, data[key]) + else: + self.unpack_single_value(content_key, data) + # String + else: + self.unpack_single_value( + content_key, message.payload.decode('utf-8')) + self.warning_caller() + else: + logger.debug("Ignoring topic %s", content_key) + + def pack_filter(self, key, data): + if key in self.TX_FILTER_DATA_KEYS: + if data is True: + return "on" + elif data is False: + return "off" + else: + return data + return data + + def pack(self, key, data): + data = self.pack_filter(key, data) + if self.TX_TOPIC is not None: + if self.TX_TYPE < 0: + logger.error( + "Unknown tx type. Set TX_TYPE of class to a known value") + else: + if self.TX_TYPE == self.TX_DICT: + self.mqtt_client.send('/'.join([self.topic, self.TX_TOPIC]), json.dumps({key: data})) + else: + if type(data) not in [str, bytes]: + data = json.dumps(data) + self.mqtt_client.send('/'.join([self.topic, key, self.TX_TOPIC]), data) + else: + logger.error( + "Unknown tx toptic. Set TX_TOPIC of class to a known value") + + def add_callback(self, key, data, callback): + """ + key: key or None for all keys + data: data or None for all data + """ + cb_tup = (key, data, callback) + if cb_tup not in self.callback_list: + self.callback_list.append(cb_tup) + + def add_warning_callback(self, callback): + self.warning_callback = callback + + def warning_call_condition(self): + return False + + def callback_caller(self, key, data): + for cb_key, cb_data, callback in self.callback_list: + if (cb_key == key or cb_key is None) and (cb_data == data or cb_data is None): + callback(self, key, data) + + def warning_caller(self): + if self.warning_call_condition(): + warn_txt = self.warning_text() + logger.warning(warn_txt) + if self.warning_callback is not None: + self.warning_callback(self, warn_txt) + + def warning_text(self, data): + return "default warning text - replace parent warning_text function" + + +class shelly(base): + KEY_OUTPUT_0 = "relay/0" + KEY_OUTPUT_1 = "relay/1" + KEY_INPUT_0 = "input/0" + KEY_INPUT_1 = "input/1" + KEY_TEMPERATURE = "temperature" + KEY_OVERTEMPERATURE = "overtemperature" + # + TX_TOPIC = 'command' + TX_TYPE = base.TX_VALUE + TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1] + # + RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, + KEY_INPUT_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE] + RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + TX_TOPIC, KEY_OUTPUT_1 + '/' + TX_TOPIC, + KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy"] + RX_IGNORE_KEYS = ['temperature_f'] + RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, + KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + # + # WARNING CALL + # + def warning_call_condition(self): + return self.get(self.KEY_OVERTEMPERATURE) + + def warning_text(self): + if self.overtemperature: + if self.temperature is not None: + return "Overtemperature detected for %s. Temperature was %.1f°C." % (self.topic, self.temperature) + else: + return "Overtemperature detected for %s." % self.topic + + # + # RX + # + @property + def output_0(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_0) + + @property + def output_1(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_1) + + @property + def input_0(self): + """rv: [True, False]""" + return self.get(self.KEY_INPUT_0) + + @property + def input_1(self): + """rv: [True, False]""" + return self.get(self.KEY_INPUT_1) + + @property + def temperature(self): + """rv: numeric value""" + return self.get(self.KEY_TEMPERATURE) + + # + # TX + # + def set_output_0(self, state): + """state: [True, False, 'toggle']""" + self.pack(self.KEY_OUTPUT_0, state) + + def set_output_1(self, state): + """state: [True, False, 'toggle']""" + self.pack(self.KEY_OUTPUT_1, state) + + +class silvercrest_powerplug(base): + KEY_LINKQUALITY = "linkquality" + KEY_OUTPUT_0 = "state" + # + TX_TOPIC = 'set' + TX_TYPE = base.TX_DICT + TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0] + # + RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0] + RX_IGNORE_TOPICS = [TX_TOPIC] + RX_IGNORE_KEYS = [] + RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + # + # RX + # + @property + def output_0(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_0) + + @property + def linkquality(self): + """rv: numeric value""" + return self.get(self.KEY_LINKQUALITY) + + # + # TX + # + def set_output_0(self, state): + """state: [True, False, 'toggle']""" + self.pack(self.KEY_OUTPUT_0, state) + + +class my_powerplug(base): + KEY_OUTPUT_0 = "output/1" + KEY_OUTPUT_1 = "output/2" + KEY_OUTPUT_2 = "output/3" + KEY_OUTPUT_3 = "output/4" + KEY_OUTPUT_ALL = "output/all" + # + TX_TOPIC = 'set' + TX_TYPE = base.TX_VALUE + # + RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3] + RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + "/" + TX_TOPIC, KEY_OUTPUT_1 + "/" + TX_TOPIC, + KEY_OUTPUT_2 + "/" + TX_TOPIC, KEY_OUTPUT_3 + "/" + TX_TOPIC] + RX_IGNORE_KEYS = [] + RX_FILTER_DATA_KEYS = [] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + # + # RX + # + @property + def output_0(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_0) + + @property + def output_1(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_1) + + @property + def output_2(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_2) + + @property + def output_3(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_3) + + # + # TX + # + def set_output_0(self, state): + """state: [True, False, 'toggle']""" + self.pack(self.KEY_OUTPUT_0, state) + + def set_output_1(self, state): + """state: [True, False, 'toggle']""" + self.pack(self.KEY_OUTPUT_1, state) + + def set_output_2(self, state): + """state: [True, False, 'toggle']""" + self.pack(self.KEY_OUTPUT_2, state) + + def set_output_3(self, state): + """state: [True, False, 'toggle']""" + self.pack(self.KEY_OUTPUT_3, state) + + def set_output_all(self, state): + """state: [True, False, 'toggle']""" + self.pack(self.KEY_OUTPUT_ALL, state) + + +class tradfri_light(base): + KEY_LINKQUALITY = "linkquality" + KEY_OUTPUT_0 = "state" + KEY_BRIGHTNESS = "brightness" + KEY_COLOR_TEMP = "color_temp" + # + TX_TOPIC = 'set' + TX_TYPE = base.TX_DICT + TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] + # + RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] + RX_IGNORE_TOPICS = [TX_TOPIC] + RX_IGNORE_KEYS = ['update', 'color_mode'] + RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def unpack_filter(self, key): + if key == self.KEY_BRIGHTNESS: + self[key] = self[key] * 100 / 256 + elif key == self.KEY_COLOR_TEMP: + self[key] = (self[key] - 250) * 100 / 204 + else: + super().unpack_filter(key) + + def pack_filter(self, key, data): + if key == self.KEY_BRIGHTNESS: + return data * 256 / 100 + elif key == self.KEY_COLOR_TEMP: + return data * 204 / 100 + 250 + else: + return super().pack_filter(key, data) + + # + # RX + # + @property + def output_0(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_0) + + @property + def linkquality(self): + """rv: numeric value""" + return self.get(self.KEY_LINKQUALITY) + + @property + def brightness(self): + """rv: numeric value [0%, ..., 100%""" + return self.get(self.KEY_BRIGHTNESS) + + @property + def color_temp(self): + """rv: numeric value [0%, ..., 100%""" + return self.get(self.KEY_COLOR_TEMP) + + # + # TX + # + def set_output_0(self, state): + """state: [True, False, 'toggle']""" + self.pack(self.KEY_OUTPUT_0, state) + + def set_brightness(self, brightness): + """brightness: [0, ..., 100]""" + self.pack(self.KEY_BRIGHTNESS, brightness) + + def set_color_temp(self, color_temp): + """color_temp: [0, ..., 100]""" + self.pack(self.KEY_COLOR_TEMP, color_temp) + + +class tradfri_button(base): + KEY_LINKQUALITY = "linkquality" + KEY_BATTERY = "battery" + KEY_ACTION = "action" + # + RX_LOG_INFO_ALWAYS_KEYS = [KEY_ACTION] + RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_ACTION] + RX_IGNORE_TOPICS = [] + RX_IGNORE_KEYS = ['update'] + RX_FILTER_DATA_KEYS = [] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + # + # RX + # + @property + def action(self): + """rv: action_txt""" + return self.get(self.KEY_ACTION) + + # + # WARNING CALL + # + def warning_call_condition(self): + return self.get(self.KEY_BATTERY) <= BATTERY_WARN_LEVEL + + def warning_text(self): + return "Low battery level detected for %s. Battery level was %.0f%%." % (self.topic, self.get(self.KEY_BATTERY)) diff --git a/function/__init__.py b/function/__init__.py new file mode 100644 index 0000000..17e5f43 --- /dev/null +++ b/function/__init__.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +import logging + +__all__ = ['all_functions', 'first_floor_dining'] + +from . import first_floor_dining + +try: + from config import APP_NAME as ROOT_LOGGER_NAME +except ImportError: + ROOT_LOGGER_NAME = 'root' +logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__) + + +class all_functions(object): + def __init__(self, device_collection): + first_floor_dining.room_function(device_collection) diff --git a/function/first_floor_dining.py b/function/first_floor_dining.py new file mode 100644 index 0000000..14d1a32 --- /dev/null +++ b/function/first_floor_dining.py @@ -0,0 +1,24 @@ +import devices +import logging + +try: + from config import APP_NAME as ROOT_LOGGER_NAME +except ImportError: + ROOT_LOGGER_NAME = 'root' +logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__) + + +class room_function(object): + def __init__(self, device_collection): + self.main_light_shelly = device_collection.shellies.dinigroom + self.floorlamp_powerplug = device_collection.powerplugs.dining_floorlamp + # + self.main_light_shelly.add_callback(devices.shelly.KEY_OUTPUT_0, None, self.main_light_shelly_callback) + # + self.main_light_shelly_last = None + + def main_light_shelly_callback(self, device, key, data): + if data != self.main_light_shelly_last: + logger.info("Test") + self.floorlamp_powerplug.set_output_0(data) + self.main_light_shelly_last diff --git a/mqtt b/mqtt new file mode 160000 index 0000000..1921bc6 --- /dev/null +++ b/mqtt @@ -0,0 +1 @@ +Subproject commit 1921bc619a9c4af682a7707d6fe58069478c59cd diff --git a/report b/report new file mode 160000 index 0000000..e2392c9 --- /dev/null +++ b/report @@ -0,0 +1 @@ +Subproject commit e2392c9f28d88ee54463681850acf95ae496c9a0 diff --git a/smart_brain.py b/smart_brain.py new file mode 100644 index 0000000..29de7c3 --- /dev/null +++ b/smart_brain.py @@ -0,0 +1,94 @@ +import config +import devices +import function +import inspect +import logging +import mqtt +import os +import report +import time + +logger = logging.getLogger(config.APP_NAME) + + +class shellies(object): + def __init__(self, mc): + self.dinigroom = devices.shelly(mc, topic="shellies/diningroom") # http://shelly1l-84CCA8ADD055 + self.sleep_madi = devices.shelly(mc, topic="shellies/sleep_madi") # http://shelly1l-E8DB84A254C7 + # self._ = devices.shelly(mc, topic="") # http:// + # self._ = devices.shelly(mc, topic="") # http:// + # self._ = devices.shelly(mc, topic="") # http:// + # self._ = devices.shelly(mc, topic="") # http:// + # self._ = devices.shelly(mc, topic="") # http:// + # self._ = devices.shelly(mc, topic="") # http:// + # self._ = devices.shelly(mc, topic="") # http:// + + +class powerplugs(object): + def __init__(self, mc): + self.dining_floorlamp = devices.silvercrest_powerplug(mc, "zigbee_og_e/powerplug/dining_floorlamp") + self.aux = devices.silvercrest_powerplug(mc, topic="zigbee_og_e/powerplug/aux") + self.dirk = devices.my_powerplug(mc, "powerplug/dirk") + + +class lights(object): + def __init__(self, mc): + self.sleep_madi = devices.tradfri_light(mc, topic="zigbee_og_e/light/sleep_madi") + self.sleep_bed_di = devices.tradfri_light(mc, topic="zigbee_og_e/light/sleep_bed_di") + # self._ = devices.tradfri_light(mc, topic="") + # self._ = devices.tradfri_light(mc, topic="") + # self._ = devices.tradfri_light(mc, topic="") + # self._ = devices.tradfri_light(mc, topic="") + # self._ = devices.tradfri_light(mc, topic="") + # self._ = devices.tradfri_light(mc, topic="") + # self._ = devices.tradfri_light(mc, topic="") + # self._ = devices.tradfri_light(mc, topic="") + # self._ = devices.tradfri_light(mc, topic="") + + +class input_devices(object): + def __init__(self, mc): + self.og_east = devices.tradfri_button(mc, topic="zigbee_og_e/input_device/og_east") + + +class all_devices(object): + def __init__(self, mc): + self.shellies = shellies(mc) + self.powerplugs = powerplugs(mc) + self.lights = lights(mc) + self.input_devices = input_devices(mc) + + def devicelist(self): + rv = [] + for name, obj in inspect.getmembers(self): + if not name.startswith('_') and name != inspect.stack()[0][3]: + for devicename, deviceobj in inspect.getmembers(obj): + if not devicename.startswith('_'): + rv.append(deviceobj) + return rv + + +if __name__ == "__main__": + if config.DEBUG: + report.stdoutLoggingConfigure(([config.APP_NAME, config.DEBUGLEVEL], ), report.LONG_FMT) + else: + report.stdoutLoggingConfigure(((config.APP_NAME, logging.INFO), + (config.APP_NAME+'.devices', logging.WARNING)), report.SHORT_FMT) + # + mc = mqtt.mqtt_client(host=config.MQTT_SERVER, port=config.MQTT_PORT, + username=config.MQTT_USER, password=config.MQTT_PASSWORD, name=config.APP_NAME) + + ad = all_devices(mc) + func = function.all_functions(ad) + + # def wcb(device, txt): + # logger.warning("%s: %s", device.topic, txt) + # for device in ad.devicelist(): + # device.add_warning_callback(wcb) + + # def cb(device, key, data): + # print("Callback: %s::%s" % (key, str(data))) + # ad.shellies.dinigroom.add_callback(devices.shelly.KEY_OUTPUT_0, None, cb) + + while (True): + time.sleep(1) diff --git a/smart_brain.sh b/smart_brain.sh new file mode 100755 index 0000000..acc4eac --- /dev/null +++ b/smart_brain.sh @@ -0,0 +1,4 @@ +#!/bin/sh +# +BASEPATH=`dirname $0` +$BASEPATH/venv/bin/python $BASEPATH/smart_brain.py \ No newline at end of file