Device definition and initialisation module integrated

This commit is contained in:
Dirk Alders 2023-10-22 20:13:18 +02:00
parent 0459c0d2d2
commit 0bb194dfd0
10 changed files with 236 additions and 130 deletions

3
.gitmodules vendored
View File

@ -7,3 +7,6 @@
[submodule "report"]
path = report
url = https://git.mount-mockery.de/pylib/report.git
[submodule "devdi"]
path = devdi
url = https://git.mount-mockery.de/smarthome/smart_devdi.git

View File

@ -1 +0,0 @@
../smart_brain/config.py

10
config.py Normal file
View File

@ -0,0 +1,10 @@
DEBUG = True
APP_NAME = "smart_homeemulation"
MQTT_SERVER = "localhost"
MQTT_PORT = 1883
MQTT_USER = None
MQTT_PASSWORD = None
CHRISTMAS = True

1
devdi Submodule

@ -0,0 +1 @@
Subproject commit a5a55a158050fdb978f4e9c1f7d43c8e6aa83a1c

View File

@ -1,23 +1,16 @@
import devices.base
import devices.livarno
import devices.shelly
import devices.tradfri
from devices.shelly import shelly_sw1
from devices.tradfri import sw as tradfri_sw
from devices.tradfri import sw_br as tradfri_sw_br
from devices.tradfri import sw_br_ct as tradfri_sw_br_ct
tradfri_button = None
class null(devices.base.base):
"""A dummy device for not yet existing devicetypes
from devices.livarno import sw_br_ct as livarno_sw_br_ct
silvercrest_powerplug = None
silvercrest_motion_sensor = None
Args:
mqtt_client (mqtt.mqtt_client): A MQTT Client instance
topic (str): the base topic for this device
"""
def __init__(self, mqtt_client, topic, **kwargs):
super().__init__(mqtt_client, topic, **kwargs)
self.mqtt_client.add_callback(self.topic, self.__rx__)
self.mqtt_client.add_callback(self.topic + '/#', self.__rx__)
from devices.brennenstuhl import vlv as brennenstuhl_heatingvalve
def __rx__(self, client, userdata, message):
self.logger.warning("Got messaqge for device with missing implementation: topic=%s, payload=%s", message.topic, repr(message.payload))
def set_state(self, value):
self.logger.warning("Got set_state call for device with missing implementation.")
my_powerplug = None
audio_status = None
remote = None

View File

@ -10,16 +10,16 @@ class base(dict):
"""
PROPERTIES = []
def __init__(self, mqtt_client, topic, **kwargs):
def __init__(self, mqtt_client, topic):
super().__init__()
self.mqtt_client = mqtt_client
self.topic = topic
for key in kwargs:
setattr(self, key, kwargs[key])
#
self.logger = logging.getLogger('devices')
for entry in self.topic.split('/'):
self.logger = self.logger.getChild(entry)
#
self.__power_on_inst__ = []
def __set__(self, key, data):
if key in self.PROPERTIES:
@ -27,3 +27,14 @@ class base(dict):
self[key] = data
else:
self.logger.warning("Ignoring unsupported property %s", key)
def power_on(self):
for i in self.__power_on_inst__:
i.power_on_action()
def register_power_on_instance(self, inst):
if inst not in self.__power_on_inst__:
self.__power_on_inst__.append(inst)
def power_on_action(self):
pass

97
devices/brennenstuhl.py Normal file
View File

@ -0,0 +1,97 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
""" Communication (MQTT)
brennenstuhl_heatingvalve {
| "away_mode": ["ON", "OFF"]
| "battery": [0...100] %
| "child_lock": ["LOCK", "UNLOCK"]
| "current_heating_setpoint": [5...30] °C
| "linkquality": [0...255] lqi
| "local_temperature": [numeric] °C
| "preset": ["manual", ...]
| "system_mode": ["heat", ...]
| "valve_detection": ["ON", "OFF"]
| "window_detection": ["ON", "OFF"]
| }
+- set {
"away_mode": ["ON", "OFF", "TOGGLE"]
"child_lock": ["LOCK", "UNLOCK"]
"current_heating_setpoint": [5...30] °C
"preset": ["manual", ...]
"system_mode": ["heat", ...]
"valve_detection": ["ON", "OFF", "TOGGLE"]
"window_detection": ["ON", "OFF", "TOGGLE"]
}
"""
from devices.base import base
import json
import time
""" ANSWER of a device:
{
"away_mode":"OFF",
"battery":5,
"child_lock":"UNLOCK",
"current_heating_setpoint":21,
"linkquality":196,
"local_temperature":21.2,
"preset":"manual",
"system_mode":"heat",
"valve_detection":"ON",
"window_detection":"ON"
}
"""
class vlv(base):
# """A tradfri device with switching functionality
# Args:
# mqtt_client (mqtt.mqtt_client): A MQTT Client instance
# topic (str): the base topic for this device
# """
PROPERTIES = [
"away_mode",
"battery",
"child_lock",
"current_heating_setpoint",
"linkquality",
"local_temperature",
"preset",
"system_mode",
"valve_detection",
"window_detection"
]
def __init__(self, mqtt_client, topic, **kwargs):
super().__init__(mqtt_client, topic, **kwargs)
self["away_mode"] = "OFF"
self["battery"] = 87
self["child_lock"] = "UNLOCK"
self["current_heating_setpoint"] = 21
self["linkquality"] = 196
self["local_temperature"] = 21.2
self["preset"] = "manual"
self["system_mode"] = "heat"
self["valve_detection"] = "ON"
self["window_detection"] = "ON"
#
self.mqtt_client.add_callback(self.topic + '/set', self.__rx_set__)
def set_state(self, value):
self.__set__("state", "on" if value else "off")
self.send_device_status()
def __rx_set__(self, client, userdata, message):
data = json.loads(message.payload)
self.logger.info("Received set data: %s", repr(data))
for key in data:
self.__set__(key, data[key])
#time.sleep(1.5)
self.send_device_status()
def send_device_status(self):
data = json.dumps(self)
self.logger.info("Sending status: %s", repr(data))
self.mqtt_client.send(self.topic, data)

View File

@ -4,3 +4,6 @@ import devices.tradfri
class sw_br_ct(devices.tradfri.sw_br_ct):
def set_state(self, value):
self.__set__("state", "on" if value else "off")
def power_on_action(self):
pass

View File

@ -34,7 +34,7 @@ from devices.base import base
import json
class sw_plain(base):
class shelly_sw1(base):
"""A shelly device with switching functionality
Args:
@ -45,10 +45,8 @@ class sw_plain(base):
PROPERTIES = [
"relay/0",
]
def __init__(self, mqtt_client, topic, **kwargs):
super().__init__(mqtt_client, topic, **kwargs)
if getattr(self, 'cd_r0', None) is None:
self.cd_r0 = []
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
self["state"] = "off"
#
self.mqtt_client.add_callback(self.topic + '/relay/0/command', self.__rx_set__)
@ -59,9 +57,8 @@ class sw_plain(base):
self.logger.info("Received set data for %s: %s", key, repr(data))
self.__set__(key, data)
self.send_device_status(key)
if key == "relay/0":
for d in self.cd_r0:
d.set_state(data.lower() == "on")
if key == "relay/0" and data.lower() == "on":
self.power_on()
def send_device_status(self, key):
data = self[key]

View File

@ -49,15 +49,12 @@ class sw(base):
Args:
mqtt_client (mqtt.mqtt_client): A MQTT Client instance
topic (str): the base topic for this device
kwargs (**dict): cd_st=list of devices connected to state
"""
PROPERTIES = [
"state",
]
def __init__(self, mqtt_client, topic, **kwargs):
super().__init__(mqtt_client, topic, **kwargs)
if getattr(self, 'cd_st', None) is None:
self.cd_st = []
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
self["state"] = "off"
#
self.mqtt_client.add_callback(self.topic + '/set', self.__rx_set__)
@ -73,13 +70,15 @@ class sw(base):
for key in data:
self.__set__(key, data[key])
self.send_device_status()
if "state" in data:
for d in self.cd_st:
d.set_state(data["state"].lower() == "on")
if "state" in data and data.get("state", 'OFF').lower() == "on":
self.power_on()
def __rx_get__(self, client, userdata, message):
self.send_device_status()
def power_on_action(self):
self.send_device_status()
def send_device_status(self):
data = json.dumps(self)
self.logger.info("Sending status: %s", repr(data))
@ -96,8 +95,8 @@ class sw_br(sw):
PROPERTIES = sw.PROPERTIES + [
"brightness",
]
def __init__(self, mqtt_client, topic, **kwargs):
super().__init__(mqtt_client, topic, **kwargs)
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
self["brightness"] = 64
@ -111,6 +110,6 @@ class sw_br_ct(sw_br):
PROPERTIES = sw_br.PROPERTIES + [
"color_temp",
]
def __init__(self, mqtt_client, topic, **kwargs):
super().__init__(mqtt_client, topic, **kwargs)
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
self["color_temp"] = 413

View File

@ -1,100 +1,93 @@
import config
import devices
import devdi
import devdi.props as props
#import function
#import json
import logging
import mqtt
import os
import report
#import subprocess
import time
logger = logging.getLogger(config.APP_NAME)
class device_creator(dict):
def __init__(self, mqtt_client):
self.mqtt_client = mqtt_client
#
# ground floor west
# floor
l1 = self.add_device(devices.livarno.sw_br_ct, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_ZIGBEE % 1)
l2 = self.add_device(devices.livarno.sw_br_ct, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_ZIGBEE % 2)
self.add_device(devices.shelly.sw_plain, config.TOPIC_GFW_FLOOR_MAIN_LIGHT_SHELLY, cd_r0=[l1, l2])
# marion
self.add_device(devices.shelly.sw_plain, config.TOPIC_GFW_MARION_MAIN_LIGHT_SHELLY)
self.add_device(devices.null, config.TOPIC_GFW_MARION_HEATING_VALVE_ZIGBEE)
# dirk
l = self.add_device(devices.tradfri.sw_br_ct, config.TOPIC_GFW_DIRK_MAIN_LIGHT_ZIGBEE)
self.add_device(devices.shelly.sw_plain, config.TOPIC_GFW_DIRK_MAIN_LIGHT_SHELLY, cd_r0=[l])
self.add_device(devices.null, config.TOPIC_GFW_DIRK_INPUT_DEVICE)
self.add_device(devices.null, config.TOPIC_GFW_DIRK_POWERPLUG)
self.add_device(devices.tradfri.sw_br_ct, config.TOPIC_GFW_DIRK_DESK_LIGHT_ZIGBEE)
self.add_device(devices.null, config.TOPIC_GFW_DIRK_HEATING_VALVE_ZIGBEE)
# first floor west
# julian
l = self.add_device(devices.tradfri.sw_br_ct, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_ZIGBEE)
self.add_device(devices.shelly.sw_plain, config.TOPIC_FFW_JULIAN_MAIN_LIGHT_SHELLY, cd_r0=[l])
# bath
self.add_device(devices.null, config.TOPIC_FFW_BATH_HEATING_VALVE_ZIGBEE)
# livingroom
l = self.add_device(devices.tradfri.sw_br_ct, config.TOPIC_FFW_LIVINGROOM_MAIN_LIGHT_ZIGBEE)
self.add_device(devices.shelly.sw_plain, config.TOPIC_FFW_LIVINGROOM_MAIN_LIGHT_SHELLY, cd_r0=[l])
# sleep
l = self.add_device(devices.tradfri.sw_br, config.TOPIC_FFW_SLEEP_MAIN_LIGHT_ZIGBEE)
self.add_device(devices.shelly.sw_plain, config.TOPIC_FFW_SLEEP_MAIN_LIGHT_SHELLY, cd_r0=[l])
# first floor east
# floor
self.add_device(devices.shelly.sw_plain, config.TOPIC_FFE_FLOOR_MAIN_LIGHT_SHELLY)
# kitchen
self.add_device(devices.shelly.sw_plain, config.TOPIC_FFE_KITCHEN_MAIN_LIGHT_SHELLY)
self.add_device(devices.null, config.TOPIC_FFE_KITCHEN_CIRCULATION_PUMP_SHELLY)
# diningroom
self.add_device(devices.shelly.sw_plain, config.TOPIC_FFE_DININGROOM_MAIN_LIGHT_SHELLY)
self.add_device(devices.null, config.TOPIC_FFE_DININGROOM_FLOOR_LAMP_POWERPLUG)
self.add_device(devices.null, config.TOPIC_FFE_DININGROOM_GARLAND_POWERPLUG)
# sleep
l = self.add_device(devices.tradfri.sw_br_ct, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_ZIGBEE)
self.add_device(devices.shelly.sw_plain, config.TOPIC_FFE_SLEEP_MAIN_LIGHT_SHELLY, cd_r0=[l])
self.add_device(devices.null, config.TOPIC_FFE_SLEEP_INPUT_DEVICE)
self.add_device(devices.tradfri.sw_br, config.TOPIC_FFE_SLEEP_BED_LIGHT_DI_ZIGBEE)
self.add_device(devices.null, config.TOPIC_FFE_SLEEP_BED_LIGHT_MA_POWERPLUG)
self.add_device(devices.null, config.TOPIC_FFE_SLEEP_HEATING_VALVE_ZIGBEE)
# livingroom
l = self.add_device(devices.tradfri.sw_br_ct, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_ZIGBEE)
self.add_device(devices.shelly.sw_plain, config.TOPIC_FFE_LIVINGROOM_MAIN_LIGHT_SHELLY, cd_r0=[l])
for i in range(1,7):
self.add_device(devices.tradfri.sw_br_ct, config.TOPIC_FFE_LIVINGROOM_FLOOR_LAMP_ZIGBEE % i)
self.add_device(devices.null, config.TOPIC_FFE_LIVINGROOM_XMAS_TREE_POWERPLUG)
self.add_device(devices.null, config.TOPIC_FFE_LIVINGROOM_XMAS_STAR_POWERPLUG)
# first floor east
# floor
self.add_device(devices.shelly.sw_plain, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_SHELLY)
self.add_device(devices.null, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_MOTION_SENSOR_FF)
self.add_device(devices.null, config.TOPIC_STW_STAIRWAY_MAIN_LIGHT_MOTION_SENSOR_GF)
def add_device(self, deviceclass, topic, **kwargs):
self[topic] = deviceclass(self.mqtt_client, topic, **kwargs)
return self[topic]
if __name__ == "__main__":
report.stdoutLoggingConfigure((
(config.APP_NAME, logging.DEBUG),
('devices', logging.DEBUG),
), report.SHORT_FMT)
mc = mqtt.mqtt_client(host=config.MQTT_SERVER, port=config.MQTT_PORT, username=config.MQTT_USER,
password=config.MQTT_PASSWORD, name='home_emulation')
#
# Logging
#
if config.DEBUG:
report.appLoggingConfigure(None, 'stdout', ((config.APP_NAME, logging.DEBUG), ),
target_level=logging.WARNING, fmt=report.SHORT_FMT, host='localhost', port=19996)
else:
report.stdoutLoggingConfigure(((config.APP_NAME, logging.WARNING), ), report.SHORT_FMT)
device_dict = device_creator(mc)
#
# MQTT Client
#
mc = mqtt.mqtt_client(host=config.MQTT_SERVER, port=config.MQTT_PORT, username=config.MQTT_USER,
password=config.MQTT_PASSWORD, name=config.APP_NAME)
#
# Smarthome Devices
#
ddi = devdi.devices(mc)
#
# Smart Home Functionality
#
#######
# GFW #
#######
loc = props.LOC_GFW
# DIRK
roo = props.ROO_DIR
sml = ddi.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = ddi.get(props.STG_ZGW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml)
# FLOOR
roo = props.ROO_FLO
sml = ddi.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = ddi.get(props.STG_ZGW, loc, roo, props.FUN_MAL, 1)
sml.register_power_on_instance(tml)
tml = ddi.get(props.STG_ZGW, loc, roo, props.FUN_MAL, 2)
sml.register_power_on_instance(tml)
#######
# FFW #
#######
loc = props.LOC_FFW
# JULIAN
roo = props.ROO_JUL
sml = ddi.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = ddi.get(props.STG_ZFW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml)
# LIVINGROOM
roo = props.ROO_LIV
sml = ddi.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = ddi.get(props.STG_ZFW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml)
# SLEEP
roo = props.ROO_SLP
sml = ddi.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = ddi.get(props.STG_ZFW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml)
#######
# FFE #
#######
loc = props.LOC_FFE
# LIVINGROOM
roo = props.ROO_LIV
sml = ddi.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = ddi.get(props.STG_ZFE, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml)
# SLEEP
roo = props.ROO_SLP
sml = ddi.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = ddi.get(props.STG_ZFE, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml)
while (True):
time.sleep(1)