Initial Version with very limited functions

This commit is contained in:
Dirk Alders 2022-12-19 10:35:20 +01:00
parent d27c9c9c35
commit e702960e4d
11 changed files with 700 additions and 21 deletions

59
.gitignore vendored
View File

@ -1,3 +1,41 @@
config.py
# ---> Linux
*~
# temporary files which can be created if a process still has a handle open of a deleted file
.fuse_hidden*
# KDE directory preferences
.directory
# Linux trash folder which might appear on any partition or disk
.Trash-*
# .nfs files are created when an open file is removed but is still being accessed
.nfs*
# ---> Backup
*.bak
*.gho
*.ori
*.orig
*.tmp
# ---> VirtualEnv
# Virtualenv
# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/
.Python
[Bb]in
[Ii]nclude
[Ll]ib
[Ll]ib64
[Ll]ocal
[Ss]cripts
pyvenv.cfg
.venv
pip-selfcheck.json
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
@ -114,24 +152,3 @@ dmypy.json
# Pyre type checker # Pyre type checker
.pyre/ .pyre/
# ---> Backup
*.bak
*.gho
*.ori
*.orig
*.tmp
# ---> VirtualEnv
# Virtualenv
# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/
.Python
[Bb]in
[Ii]nclude
[Ll]ib
[Ll]ib64
[Ll]ocal
[Ss]cripts
pyvenv.cfg
.venv
pip-selfcheck.json

6
.gitmodules vendored Normal file
View File

@ -0,0 +1,6 @@
[submodule "mqtt"]
path = mqtt
url = https://git.mount-mockery.de/pylib/mqtt.git
[submodule "report"]
path = report
url = https://git.mount-mockery.de/pylib/report.git

11
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,11 @@
{
"python.defaultInterpreterPath": "./venv/bin/python",
"editor.formatOnSave": true,
"autopep8.args": [
"--max-line-length=120"
],
"editor.fontSize": 14,
"emmet.includeLanguages": {
"django-html": "html"
}
}

41
__install__.py Normal file
View File

@ -0,0 +1,41 @@
#!/bin/python
#
import os
import sys
SERVICE_FILE = """
[Unit]
Description=Smarthome Ambient Information Service
After=network-online.target
Wants=network-online.target
[Service]
User=%(UID)d
Group=%(GID)d
ExecStart=%(MY_PATH)s/smart_brain.sh
Type=simple
[Install]
WantedBy=default.target
"""
def help():
print("Usage: prog <UID> <GID> <TARGET_PATH>")
if __name__ == "__main__":
if len(sys.argv) == 4:
try:
uid = int(sys.argv[1])
gid = int(sys.argv[2])
except ValueError:
help()
else:
if os.path.isdir(sys.argv[3]):
with open(os.path.join(sys.argv[3], 'smart_brain.service'), "w") as fh:
fh.write(SERVICE_FILE % {
"MY_PATH": os.path.dirname(os.path.abspath(__file__)),
"UID": uid,
"GID": gid})
else:
help()
else:
help()

461
devices/__init__.py Normal file
View File

@ -0,0 +1,461 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
devices (DEVICES)
===========
**Author:**
* Dirk Alders <sudo-dirk@mount-mockery.de>
**Description:**
This Module supports smarthome devices
**Submodules:**
* :mod:`shelly`
* :mod:`silvercrest_powerplug`
**Unittest:**
See also the :download:`unittest <devices/_testresults_/unittest.pdf>` documentation.
**Module Documentation:**
"""
__DEPENDENCIES__ = []
import json
import logging
try:
from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
BATTERY_WARN_LEVEL = 5
def is_json(data):
try:
json.loads(data)
except json.decoder.JSONDecodeError:
return False
else:
return True
class base(dict):
TX_TOPIC = None
TX_VALUE = 0
TX_DICT = 1
TX_TYPE = -1
TX_FILTER_DATA_KEYS = []
#
RX_LOG_INFO_ALWAYS_KEYS = []
RX_KEYS = []
RX_IGNORE_TOPICS = []
RX_IGNORE_KEYS = []
RX_FILTER_DATA_KEYS = []
def __init__(self, mqtt_client, topic):
# data storage
self.mqtt_client = mqtt_client
self.topic = topic
# initialisations
dict.__init__(self)
mqtt_client.add_callback(
topic=self.topic, callback=self.receive_callback)
mqtt_client.add_callback(
topic=self.topic+"/#", callback=self.receive_callback)
#
self.callback_list = []
self.warning_callback = None
def receive_callback(self, client, userdata, message):
self.unpack(message)
def unpack_filter(self, key):
if key in self.RX_FILTER_DATA_KEYS:
if self.get(key) == 1 or self.get(key) == 'on' or self.get(key) == 'ON':
self[key] = True
elif self.get(key) == 0 or self.get(key) == 'off' or self.get(key) == 'OFF':
self[key] = False
def unpack_single_value(self, key, data):
prev_value = self.get(key)
if key in self.RX_KEYS:
self[key] = data
# Filter, if needed
self.unpack_filter(key)
logger.log(logging.INFO if key in self.RX_LOG_INFO_ALWAYS_KEYS or prev_value != self.get(key) else logging.DEBUG,
"Received data for (%s) %s - %s", self.topic, key, str(self.get(key)))
self.callback_caller(key, self[key])
elif key not in self.RX_IGNORE_KEYS:
logger.warning('Got a message with unparsed content "%s"', key)
else:
logger.debug("Ignoring key %s", key)
def unpack(self, message):
content_key = message.topic[len(self.topic) + 1:]
if content_key not in self.RX_IGNORE_TOPICS:
logger.debug("Unpacking content_key \"%s\" from message.", content_key)
if is_json(message.payload):
data = json.loads(message.payload)
if type(data) is dict:
for key in data:
self.unpack_single_value(key, data[key])
else:
self.unpack_single_value(content_key, data)
# String
else:
self.unpack_single_value(
content_key, message.payload.decode('utf-8'))
self.warning_caller()
else:
logger.debug("Ignoring topic %s", content_key)
def pack_filter(self, key, data):
if key in self.TX_FILTER_DATA_KEYS:
if data is True:
return "on"
elif data is False:
return "off"
else:
return data
return data
def pack(self, key, data):
data = self.pack_filter(key, data)
if self.TX_TOPIC is not None:
if self.TX_TYPE < 0:
logger.error(
"Unknown tx type. Set TX_TYPE of class to a known value")
else:
if self.TX_TYPE == self.TX_DICT:
self.mqtt_client.send('/'.join([self.topic, self.TX_TOPIC]), json.dumps({key: data}))
else:
if type(data) not in [str, bytes]:
data = json.dumps(data)
self.mqtt_client.send('/'.join([self.topic, key, self.TX_TOPIC]), data)
else:
logger.error(
"Unknown tx toptic. Set TX_TOPIC of class to a known value")
def add_callback(self, key, data, callback):
"""
key: key or None for all keys
data: data or None for all data
"""
cb_tup = (key, data, callback)
if cb_tup not in self.callback_list:
self.callback_list.append(cb_tup)
def add_warning_callback(self, callback):
self.warning_callback = callback
def warning_call_condition(self):
return False
def callback_caller(self, key, data):
for cb_key, cb_data, callback in self.callback_list:
if (cb_key == key or cb_key is None) and (cb_data == data or cb_data is None):
callback(self, key, data)
def warning_caller(self):
if self.warning_call_condition():
warn_txt = self.warning_text()
logger.warning(warn_txt)
if self.warning_callback is not None:
self.warning_callback(self, warn_txt)
def warning_text(self, data):
return "default warning text - replace parent warning_text function"
class shelly(base):
KEY_OUTPUT_0 = "relay/0"
KEY_OUTPUT_1 = "relay/1"
KEY_INPUT_0 = "input/0"
KEY_INPUT_1 = "input/1"
KEY_TEMPERATURE = "temperature"
KEY_OVERTEMPERATURE = "overtemperature"
#
TX_TOPIC = 'command'
TX_TYPE = base.TX_VALUE
TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
#
RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0,
KEY_INPUT_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE]
RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + TX_TOPIC, KEY_OUTPUT_1 + '/' + TX_TOPIC,
KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy"]
RX_IGNORE_KEYS = ['temperature_f']
RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1,
KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE]
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
#
# WARNING CALL
#
def warning_call_condition(self):
return self.get(self.KEY_OVERTEMPERATURE)
def warning_text(self):
if self.overtemperature:
if self.temperature is not None:
return "Overtemperature detected for %s. Temperature was %.1f°C." % (self.topic, self.temperature)
else:
return "Overtemperature detected for %s." % self.topic
#
# RX
#
@property
def output_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_0)
@property
def output_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_1)
@property
def input_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_INPUT_0)
@property
def input_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_INPUT_1)
@property
def temperature(self):
"""rv: numeric value"""
return self.get(self.KEY_TEMPERATURE)
#
# TX
#
def set_output_0(self, state):
"""state: [True, False, 'toggle']"""
self.pack(self.KEY_OUTPUT_0, state)
def set_output_1(self, state):
"""state: [True, False, 'toggle']"""
self.pack(self.KEY_OUTPUT_1, state)
class silvercrest_powerplug(base):
KEY_LINKQUALITY = "linkquality"
KEY_OUTPUT_0 = "state"
#
TX_TOPIC = 'set'
TX_TYPE = base.TX_DICT
TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0]
#
RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0]
RX_IGNORE_TOPICS = [TX_TOPIC]
RX_IGNORE_KEYS = []
RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0]
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
#
# RX
#
@property
def output_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_0)
@property
def linkquality(self):
"""rv: numeric value"""
return self.get(self.KEY_LINKQUALITY)
#
# TX
#
def set_output_0(self, state):
"""state: [True, False, 'toggle']"""
self.pack(self.KEY_OUTPUT_0, state)
class my_powerplug(base):
KEY_OUTPUT_0 = "output/1"
KEY_OUTPUT_1 = "output/2"
KEY_OUTPUT_2 = "output/3"
KEY_OUTPUT_3 = "output/4"
KEY_OUTPUT_ALL = "output/all"
#
TX_TOPIC = 'set'
TX_TYPE = base.TX_VALUE
#
RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]
RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + "/" + TX_TOPIC, KEY_OUTPUT_1 + "/" + TX_TOPIC,
KEY_OUTPUT_2 + "/" + TX_TOPIC, KEY_OUTPUT_3 + "/" + TX_TOPIC]
RX_IGNORE_KEYS = []
RX_FILTER_DATA_KEYS = []
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
#
# RX
#
@property
def output_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_0)
@property
def output_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_1)
@property
def output_2(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_2)
@property
def output_3(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_3)
#
# TX
#
def set_output_0(self, state):
"""state: [True, False, 'toggle']"""
self.pack(self.KEY_OUTPUT_0, state)
def set_output_1(self, state):
"""state: [True, False, 'toggle']"""
self.pack(self.KEY_OUTPUT_1, state)
def set_output_2(self, state):
"""state: [True, False, 'toggle']"""
self.pack(self.KEY_OUTPUT_2, state)
def set_output_3(self, state):
"""state: [True, False, 'toggle']"""
self.pack(self.KEY_OUTPUT_3, state)
def set_output_all(self, state):
"""state: [True, False, 'toggle']"""
self.pack(self.KEY_OUTPUT_ALL, state)
class tradfri_light(base):
KEY_LINKQUALITY = "linkquality"
KEY_OUTPUT_0 = "state"
KEY_BRIGHTNESS = "brightness"
KEY_COLOR_TEMP = "color_temp"
#
TX_TOPIC = 'set'
TX_TYPE = base.TX_DICT
TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]
#
RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]
RX_IGNORE_TOPICS = [TX_TOPIC]
RX_IGNORE_KEYS = ['update', 'color_mode']
RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
def unpack_filter(self, key):
if key == self.KEY_BRIGHTNESS:
self[key] = self[key] * 100 / 256
elif key == self.KEY_COLOR_TEMP:
self[key] = (self[key] - 250) * 100 / 204
else:
super().unpack_filter(key)
def pack_filter(self, key, data):
if key == self.KEY_BRIGHTNESS:
return data * 256 / 100
elif key == self.KEY_COLOR_TEMP:
return data * 204 / 100 + 250
else:
return super().pack_filter(key, data)
#
# RX
#
@property
def output_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_0)
@property
def linkquality(self):
"""rv: numeric value"""
return self.get(self.KEY_LINKQUALITY)
@property
def brightness(self):
"""rv: numeric value [0%, ..., 100%"""
return self.get(self.KEY_BRIGHTNESS)
@property
def color_temp(self):
"""rv: numeric value [0%, ..., 100%"""
return self.get(self.KEY_COLOR_TEMP)
#
# TX
#
def set_output_0(self, state):
"""state: [True, False, 'toggle']"""
self.pack(self.KEY_OUTPUT_0, state)
def set_brightness(self, brightness):
"""brightness: [0, ..., 100]"""
self.pack(self.KEY_BRIGHTNESS, brightness)
def set_color_temp(self, color_temp):
"""color_temp: [0, ..., 100]"""
self.pack(self.KEY_COLOR_TEMP, color_temp)
class tradfri_button(base):
KEY_LINKQUALITY = "linkquality"
KEY_BATTERY = "battery"
KEY_ACTION = "action"
#
RX_LOG_INFO_ALWAYS_KEYS = [KEY_ACTION]
RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_ACTION]
RX_IGNORE_TOPICS = []
RX_IGNORE_KEYS = ['update']
RX_FILTER_DATA_KEYS = []
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
#
# RX
#
@property
def action(self):
"""rv: action_txt"""
return self.get(self.KEY_ACTION)
#
# WARNING CALL
#
def warning_call_condition(self):
return self.get(self.KEY_BATTERY) <= BATTERY_WARN_LEVEL
def warning_text(self):
return "Low battery level detected for %s. Battery level was %.0f%%." % (self.topic, self.get(self.KEY_BATTERY))

19
function/__init__.py Normal file
View File

@ -0,0 +1,19 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
import logging
__all__ = ['all_functions', 'first_floor_dining']
from . import first_floor_dining
try:
from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
class all_functions(object):
def __init__(self, device_collection):
first_floor_dining.room_function(device_collection)

View File

@ -0,0 +1,24 @@
import devices
import logging
try:
from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
class room_function(object):
def __init__(self, device_collection):
self.main_light_shelly = device_collection.shellies.dinigroom
self.floorlamp_powerplug = device_collection.powerplugs.dining_floorlamp
#
self.main_light_shelly.add_callback(devices.shelly.KEY_OUTPUT_0, None, self.main_light_shelly_callback)
#
self.main_light_shelly_last = None
def main_light_shelly_callback(self, device, key, data):
if data != self.main_light_shelly_last:
logger.info("Test")
self.floorlamp_powerplug.set_output_0(data)
self.main_light_shelly_last

1
mqtt Submodule

@ -0,0 +1 @@
Subproject commit 1921bc619a9c4af682a7707d6fe58069478c59cd

1
report Submodule

@ -0,0 +1 @@
Subproject commit e2392c9f28d88ee54463681850acf95ae496c9a0

94
smart_brain.py Normal file
View File

@ -0,0 +1,94 @@
import config
import devices
import function
import inspect
import logging
import mqtt
import os
import report
import time
logger = logging.getLogger(config.APP_NAME)
class shellies(object):
def __init__(self, mc):
self.dinigroom = devices.shelly(mc, topic="shellies/diningroom") # http://shelly1l-84CCA8ADD055
self.sleep_madi = devices.shelly(mc, topic="shellies/sleep_madi") # http://shelly1l-E8DB84A254C7
# self._ = devices.shelly(mc, topic="") # http://
# self._ = devices.shelly(mc, topic="") # http://
# self._ = devices.shelly(mc, topic="") # http://
# self._ = devices.shelly(mc, topic="") # http://
# self._ = devices.shelly(mc, topic="") # http://
# self._ = devices.shelly(mc, topic="") # http://
# self._ = devices.shelly(mc, topic="") # http://
class powerplugs(object):
def __init__(self, mc):
self.dining_floorlamp = devices.silvercrest_powerplug(mc, "zigbee_og_e/powerplug/dining_floorlamp")
self.aux = devices.silvercrest_powerplug(mc, topic="zigbee_og_e/powerplug/aux")
self.dirk = devices.my_powerplug(mc, "powerplug/dirk")
class lights(object):
def __init__(self, mc):
self.sleep_madi = devices.tradfri_light(mc, topic="zigbee_og_e/light/sleep_madi")
self.sleep_bed_di = devices.tradfri_light(mc, topic="zigbee_og_e/light/sleep_bed_di")
# self._ = devices.tradfri_light(mc, topic="")
# self._ = devices.tradfri_light(mc, topic="")
# self._ = devices.tradfri_light(mc, topic="")
# self._ = devices.tradfri_light(mc, topic="")
# self._ = devices.tradfri_light(mc, topic="")
# self._ = devices.tradfri_light(mc, topic="")
# self._ = devices.tradfri_light(mc, topic="")
# self._ = devices.tradfri_light(mc, topic="")
# self._ = devices.tradfri_light(mc, topic="")
class input_devices(object):
def __init__(self, mc):
self.og_east = devices.tradfri_button(mc, topic="zigbee_og_e/input_device/og_east")
class all_devices(object):
def __init__(self, mc):
self.shellies = shellies(mc)
self.powerplugs = powerplugs(mc)
self.lights = lights(mc)
self.input_devices = input_devices(mc)
def devicelist(self):
rv = []
for name, obj in inspect.getmembers(self):
if not name.startswith('_') and name != inspect.stack()[0][3]:
for devicename, deviceobj in inspect.getmembers(obj):
if not devicename.startswith('_'):
rv.append(deviceobj)
return rv
if __name__ == "__main__":
if config.DEBUG:
report.stdoutLoggingConfigure(([config.APP_NAME, config.DEBUGLEVEL], ), report.LONG_FMT)
else:
report.stdoutLoggingConfigure(((config.APP_NAME, logging.INFO),
(config.APP_NAME+'.devices', logging.WARNING)), report.SHORT_FMT)
#
mc = mqtt.mqtt_client(host=config.MQTT_SERVER, port=config.MQTT_PORT,
username=config.MQTT_USER, password=config.MQTT_PASSWORD, name=config.APP_NAME)
ad = all_devices(mc)
func = function.all_functions(ad)
# def wcb(device, txt):
# logger.warning("%s: %s", device.topic, txt)
# for device in ad.devicelist():
# device.add_warning_callback(wcb)
# def cb(device, key, data):
# print("Callback: %s::%s" % (key, str(data)))
# ad.shellies.dinigroom.add_callback(devices.shelly.KEY_OUTPUT_0, None, cb)
while (True):
time.sleep(1)

4
smart_brain.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/sh
#
BASEPATH=`dirname $0`
$BASEPATH/venv/bin/python $BASEPATH/smart_brain.py