Initial zigbee heating valve check implemented

This commit is contained in:
Dirk Alders 2023-12-12 12:44:34 +01:00
parent 13ac794416
commit 9b3e64060b
23 changed files with 290 additions and 0 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
z_server/config.py
# ---> Linux
*~

21
.gitmodules vendored
View File

@ -1,3 +1,24 @@
[submodule "nagios"]
path = nagios
url = https://git.mount-mockery.de/pylib/nagios.git
[submodule "z_server/mqtt"]
path = z_server/mqtt
url = https://git.mount-mockery.de/pylib/mqtt.git
[submodule "z_server/report"]
path = z_server/report
url = https://git.mount-mockery.de/pylib/report.git
[submodule "z_server/socket_protocol"]
path = socket_protocol
url = https://git.mount-mockery.de/pylib/socket_protocol.git
[submodule "z_server/stringtools"]
path = stringtools
url = https://git.mount-mockery.de/pylib/stringtools.git
[submodule "z_server/task"]
path = task
url = https://git.mount-mockery.de/pylib/task.git
[submodule "z_server/tcp_socket"]
path = tcp_socket
url = https://git.mount-mockery.de/pylib/tcp_socket.git
[submodule "z_server/smart_devdi"]
path = z_server/devdi
url = https://git.mount-mockery.de/smarthome/smart_devdi.git

16
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,16 @@
{
// Verwendet IntelliSense zum Ermitteln möglicher Attribute.
// Zeigen Sie auf vorhandene Attribute, um die zugehörigen Beschreibungen anzuzeigen.
// Weitere Informationen finden Sie unter https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Main File execution",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/z_server/z_server.py",
"console": "integratedTerminal",
"justMyCode": true
}
]
}

View File

@ -1,4 +1,5 @@
{
"python.defaultInterpreterPath": "./z_server/venv/bin/python",
"autopep8.args": ["--max-line-length=150"],
"[python]": {
"python.formatting.provider": "none",

36
check_z_heat_vlv Executable file
View File

@ -0,0 +1,36 @@
#!/bin/python3
#
import argparse
import nagios
import time
from z_server import config
from z_server import tcp_socket
from z_server.z_protocol import server as client_prot
from z_server.z_protocol import DID_FOLLOWS_HEATING_SETPOINT
from z_server import socket_protocol
import sys
if __name__ == '__main__':
parser = argparse.ArgumentParser(
prog='ProgramName',
description='What the program does',
epilog='Text at the bottom of help')
parser.add_argument('-s', '--stg', required=True)
parser.add_argument('-l', '--loc', required=True)
parser.add_argument('-r', '--roo', required=True)
args = parser.parse_args()
#
c = tcp_socket.tcp_client_stp('127.0.0.1', config.SOCK_PROT_PORT)
sp = client_prot(c, channel_name='example_client')
#
data = {
"stg": args.stg,
"loc": args.loc,
"roo": args.roo,
"fun": "FUN_HEA" # <-- Const, because script is for heat_vlv only
}
sp.send(socket_protocol.SID_READ_REQUEST, DID_FOLLOWS_HEATING_SETPOINT, data)
#
sp_data = sp.receive(socket_protocol.SID_READ_RESPONSE, DID_FOLLOWS_HEATING_SETPOINT).get_data()
nagios.Nagios().exit(**sp_data)

1
socket_protocol Submodule

@ -0,0 +1 @@
Subproject commit 7f2b90c99030ac39f5bf7bfd92d1dd3d6c3657fa

1
stringtools Submodule

@ -0,0 +1 @@
Subproject commit e1f76d96312e540544b2328d0937b0aa41126aa9

1
task Submodule

@ -0,0 +1 @@
Subproject commit af35e83d1f07fd4cb9070bdb77cf1f3bdda3a463

1
tcp_socket Submodule

@ -0,0 +1 @@
Subproject commit 72275a3270b0d34a696a2809cc066f84d72268a5

0
z_server/__init__.py Normal file
View File

View File

@ -0,0 +1,13 @@
import logging
DEBUG = False # False: logging to stdout with given LOGLEVEL - True: logging all to localhost:19996 and warnings or higher to stdout
LOGLEVEL = logging.INFO
APP_NAME = "z_monitor"
MQTT_SERVER = "<hostname>"
MQTT_PORT = 1883
MQTT_USER = "<username>"
MQTT_PASSWORD = "<password>"
SOCK_PROT_PORT = 8380

1
z_server/devdi Submodule

@ -0,0 +1 @@
Subproject commit 7e4f92aaf1cd1dd2626e2b3337277eddfc37578e

View File

@ -0,0 +1,114 @@
import json
import logging
import mqtt
import nagios
import time
try:
from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
class base(object):
FOLLOW_REQUEST_WARNING = 5 # Seconds, till warning comes up, if device does not follow the command
FOLLOW_REQUEST_ERROR = 60 # Seconds, till error comes up, if device does not follow the command
def __init__(self, mqtt_client: mqtt.mqtt_client, topic):
self.topic = topic
#
mqtt_client.add_callback(topic, self.__rx__)
mqtt_client.add_callback(topic + '/#', self.__rx__)
#
self.__target_storage__ = {}
self.__state_storage__ = {}
def __rx__(self, client, userdata, message):
pass
def target(self, key, value):
tm_t, value_t = self.__target_storage__.get(key, (0, None))
if value != value_t:
self.__target_storage__[key] = time.time(), value
logger.debug("Target value for device identified: %s: %s", key, repr(value))
def state(self, key, value):
self.__state_storage__[key] = value
logger.debug("Device state identified: %s: %s", key, repr(value))
def status(self, key):
try:
tm_t, value_t = self.__target_storage__[key]
value_s = self.__state_storage__.get(key)
except KeyError:
return {"status": nagios.Nagios.UNKNOWN, "msg": "Device exists, but no setpoint change or unknown monitoring"}
else:
tm = time.time()
dt = tm - tm_t
if value_t != value_s and dt > self.FOLLOW_REQUEST_ERROR:
return {"status": nagios.Nagios.ERROR, "msg": "Requested setpoint unequal valve setpoint since %.1fs" % dt}
elif value_t != value_s and dt > self.FOLLOW_REQUEST_WARNING:
return {"status": nagios.Nagios.WARNING, "msg": "Requested setpoint unequal valve setpoint since %.1fs" % dt}
return {"status": nagios.Nagios.OK, "msg": "Requested setpoint equal valve setpoint"}
class group(object):
def __init__(self, *args, **kwargs):
pass
class shelly_sw1(base):
pass
class tradfri_sw(base):
pass
class tradfri_sw_br(base):
pass
class tradfri_sw_br_ct(base):
pass
class tradfri_button(base):
pass
class livarno_sw_br_ct(base):
pass
class brennenstuhl_heatingvalve(base):
def __init__(self, mqtt_client: mqtt.mqtt_client, topic):
base.__init__(self, mqtt_client, topic)
def __rx__(self, client, userdata, message):
payload = json.loads(message.payload)
if message.topic == self.topic + '/set':
self.target("current_heating_setpoint", payload["current_heating_setpoint"])
if message.topic == self.topic:
self.state("current_heating_setpoint", payload["current_heating_setpoint"])
class silvercrest_powerplug(base):
pass
class silvercrest_motion_sensor(base):
pass
class my_powerplug(base):
pass
class audio_status(base):
pass
class remote(base):
pass

1
z_server/mqtt Submodule

@ -0,0 +1 @@
Subproject commit 1adfb0626e7777c6d29be65d4ad4ce2d57541301

1
z_server/nagios Symbolic link
View File

@ -0,0 +1 @@
../nagios

1
z_server/report Submodule

@ -0,0 +1 @@
Subproject commit b53dd30eae1d679b7eec4999bec50aed55bc105b

1
z_server/socket_protocol Symbolic link
View File

@ -0,0 +1 @@
../socket_protocol

1
z_server/stringtools Symbolic link
View File

@ -0,0 +1 @@
../stringtools

1
z_server/task Symbolic link
View File

@ -0,0 +1 @@
../task

1
z_server/tcp_socket Symbolic link
View File

@ -0,0 +1 @@
../tcp_socket

31
z_server/z_protocol.py Normal file
View File

@ -0,0 +1,31 @@
import nagios
import socket_protocol
# from devdi.topic import topic_by_props
DID_FOLLOWS_HEATING_SETPOINT = 'current_heating_setpoint'
class server(socket_protocol.pure_json_protocol):
def __init__(self, *args, **kwargs):
if 'devices' in kwargs:
self.__devices__ = kwargs.pop('devices')
socket_protocol.pure_json_protocol.__init__(self, *args, **kwargs)
#
# self.add_data((socket_protocol.SID_READ_REQUEST, socket_protocol.SID_READ_RESPONSE),
# DID_FOLLOWS_HEATING_SETPOINT, 'current_heating_setpoint')
#
if not self.__comm_inst__.IS_CLIENT:
self.register_callback(socket_protocol.SID_READ_REQUEST, DID_FOLLOWS_HEATING_SETPOINT, self.follow_heating_setpoint)
def follow_heating_setpoint(self, msg):
if msg.get_status() == socket_protocol.STATUS_OKAY:
try:
dev = self.__devices__.get_str(**msg.get_data())
except:
return socket_protocol.STATUS_CALLBACK_ERROR, {"status": nagios.Nagios.UNKNOWN, "msg": "Exception while getting device."}
if dev is None:
return socket_protocol.STATUS_SERVICE_OR_DATA_UNKNOWN, {"status": nagios.Nagios.UNKNOWN, "msg": "Device does not exist."}
else:
return socket_protocol.STATUS_OKAY, dev.status(msg.get_data_id())
else:
return socket_protocol.STATUS_OPERATION_NOT_PERMITTED, {"status": nagios.Nagios.UNKNOWN, "msg": "Socket protocol error."}

40
z_server/z_server.py Normal file
View File

@ -0,0 +1,40 @@
import config
import devdi.devices as devices
import logging
import mqtt
import report
import z_protocol
import socket_protocol
import tcp_socket
import time
logger = logging.getLogger(config.APP_NAME)
if __name__ == "__main__":
#
# Logging
#
if config.DEBUG:
report.appLoggingConfigure(None, 'stdout', ((config.APP_NAME, logging.DEBUG), ),
target_level=logging.WARNING, fmt=report.SHORT_FMT, host='localhost', port=19996)
else:
report.stdoutLoggingConfigure(((config.APP_NAME, config.LOGLEVEL), ), report.SHORT_FMT)
#
# MQTT Client
#
mc = mqtt.mqtt_client(host=config.MQTT_SERVER, port=config.MQTT_PORT, username=config.MQTT_USER,
password=config.MQTT_PASSWORD, name=config.APP_NAME)
#
# Smarthome physical Devices
#
pd = devices.physical_devices(mc)
#
# Socket Protocol
#
s = tcp_socket.tcp_server_stp('127.0.0.1', config.SOCK_PROT_PORT)
sp = z_protocol.server(s, channel_name='example_server', devices=pd)
while (True):
time.sleep(5)

4
z_server/z_server.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/sh
#
BASEPATH=`dirname $0`
$BASEPATH/venv/bin/python $BASEPATH/z_server.py