Battery Levels adapted and battery driven devices added

This commit is contained in:
Dirk Alders 2023-12-19 07:12:56 +01:00
parent b7d12a5044
commit c21bbf4716
5 changed files with 148 additions and 29 deletions

1
check_z_motion_battery Symbolic link
View File

@ -0,0 +1 @@
check_z_motion_heartbeat

43
check_z_motion_heartbeat Executable file
View File

@ -0,0 +1,43 @@
#!/bin/python3
#
import argparse
import nagios
import time
from z_server import config
from z_server import tcp_socket
from z_server.z_protocol import server as client_prot
from z_server.z_protocol import DID_FOLLOWS_HEATING_SETPOINT, DID_BATTERY_LEVEL, DID_HEARTBEAT
from z_server import socket_protocol
import sys
if __name__ == '__main__':
parser = argparse.ArgumentParser(
prog='ProgramName',
description='What the program does',
epilog='Text at the bottom of help')
parser.add_argument('-s', '--stg', required=True)
parser.add_argument('-l', '--loc', required=True)
parser.add_argument('-r', '--roo', required=True)
args = parser.parse_args()
#
c = tcp_socket.tcp_client_stp('127.0.0.1', config.SOCK_PROT_PORT)
sp = client_prot(c, channel_name='example_client')
#
data = {
"stg": args.stg,
"loc": args.loc,
"roo": args.roo,
"fun": "FUN_MSE" # <-- Const, because script is for input_device
}
#
if sys.argv[0].endswith('check_z_motion_heartbeat'):
sp.send(socket_protocol.SID_READ_REQUEST, DID_HEARTBEAT, data)
sp_data = sp.receive(socket_protocol.SID_READ_RESPONSE, DID_HEARTBEAT).get_data()
elif sys.argv[0].endswith('check_z_motion_battery'):
sp.send(socket_protocol.SID_READ_REQUEST, DID_BATTERY_LEVEL, data)
sp_data = sp.receive(socket_protocol.SID_READ_RESPONSE, DID_BATTERY_LEVEL).get_data()
else:
sys.stderr.write('No action for command called "%s"\n' % sys.argv[0])
sys.exit(100)
nagios.Nagios().exit(**sp_data)

1
check_z_tr_inp_battery Symbolic link
View File

@ -0,0 +1 @@
check_z_tr_inp_heartbeat

43
check_z_tr_inp_heartbeat Executable file
View File

@ -0,0 +1,43 @@
#!/bin/python3
#
import argparse
import nagios
import time
from z_server import config
from z_server import tcp_socket
from z_server.z_protocol import server as client_prot
from z_server.z_protocol import DID_FOLLOWS_HEATING_SETPOINT, DID_BATTERY_LEVEL, DID_HEARTBEAT
from z_server import socket_protocol
import sys
if __name__ == '__main__':
parser = argparse.ArgumentParser(
prog='ProgramName',
description='What the program does',
epilog='Text at the bottom of help')
parser.add_argument('-s', '--stg', required=True)
parser.add_argument('-l', '--loc', required=True)
parser.add_argument('-r', '--roo', required=True)
args = parser.parse_args()
#
c = tcp_socket.tcp_client_stp('127.0.0.1', config.SOCK_PROT_PORT)
sp = client_prot(c, channel_name='example_client')
#
data = {
"stg": args.stg,
"loc": args.loc,
"roo": args.roo,
"fun": "FUN_INP" # <-- Const, because script is for input_device
}
#
if sys.argv[0].endswith('check_z_tr_inp_heartbeat'):
sp.send(socket_protocol.SID_READ_REQUEST, DID_HEARTBEAT, data)
sp_data = sp.receive(socket_protocol.SID_READ_RESPONSE, DID_HEARTBEAT).get_data()
elif sys.argv[0].endswith('check_z_tr_inp_battery'):
sp.send(socket_protocol.SID_READ_REQUEST, DID_BATTERY_LEVEL, data)
sp_data = sp.receive(socket_protocol.SID_READ_RESPONSE, DID_BATTERY_LEVEL).get_data()
else:
sys.stderr.write('No action for command called "%s"\n' % sys.argv[0])
sys.exit(100)
nagios.Nagios().exit(**sp_data)

View File

@ -16,8 +16,8 @@ class base(object):
FOLLOW_REQUEST_ERROR = 60 # Seconds, till error comes up, if device does not follow the command FOLLOW_REQUEST_ERROR = 60 # Seconds, till error comes up, if device does not follow the command
FOLLOW_KEYS = ["current_heating_setpoint", ] FOLLOW_KEYS = ["current_heating_setpoint", ]
# #
BATTERY_LVL_WARNING = 20 BATTERY_LVL_WARNING = 15
BATTERY_LVL_ERROR = 10 BATTERY_LVL_ERROR = 5
# #
LAST_MSG_WARNING = 6 * 24 * 60 * 60 LAST_MSG_WARNING = 6 * 24 * 60 * 60
LAST_MSG_ERROR = 24 * 24 * 60 * 60 LAST_MSG_ERROR = 24 * 24 * 60 * 60
@ -89,9 +89,9 @@ class base(object):
elif key == "battery": elif key == "battery":
if self.battery is None: if self.battery is None:
return {"status": nagios.Nagios.UNKNOWN, "msg": "Device exists, but no data received or unknown monitoring"} return {"status": nagios.Nagios.UNKNOWN, "msg": "Device exists, but no data received or unknown monitoring"}
elif self.battery < self.BATTERY_LVL_ERROR: elif self.battery <= self.BATTERY_LVL_ERROR:
return {"status": nagios.Nagios.ERROR, "msg": "Battery level critical low (%.1f%%)" % self.battery} return {"status": nagios.Nagios.ERROR, "msg": "Battery level critical low (%.1f%%)" % self.battery}
elif self.battery < self.BATTERY_LVL_WARNING: elif self.battery <= self.BATTERY_LVL_WARNING:
return {"status": nagios.Nagios.WARNING, "msg": "Battery level low (%.1f%%)" % self.battery} return {"status": nagios.Nagios.WARNING, "msg": "Battery level low (%.1f%%)" % self.battery}
else: else:
return {"status": nagios.Nagios.OK, "msg": "Battery okay (%.1f%%)" % self.battery} return {"status": nagios.Nagios.OK, "msg": "Battery okay (%.1f%%)" % self.battery}
@ -119,7 +119,22 @@ class tradfri_sw_br_ct(base):
class tradfri_button(base): class tradfri_button(base):
pass def __rx__(self, client, userdata, message):
try:
payload = json.loads(message.payload)
except json.decoder.JSONDecodeError:
logger.warning("JSON decode error %s", self.topic)
else:
#
# heartbeat
#
if message.topic == self.topic:
self.last_device_msg = time.time()
#
# battery level
#
if "battery" in payload and message.topic == self.topic:
self.battery = payload["battery"]
class livarno_sw_br_ct(base): class livarno_sw_br_ct(base):
@ -127,14 +142,15 @@ class livarno_sw_br_ct(base):
class brennenstuhl_heatingvalve(base): class brennenstuhl_heatingvalve(base):
BATTERY_LVL_WARNING = 10 BATTERY_LVL_WARNING = 4
BATTERY_LVL_ERROR = 5 BATTERY_LVL_ERROR = 3
def __init__(self, mqtt_client: mqtt.mqtt_client, topic):
base.__init__(self, mqtt_client, topic)
def __rx__(self, client, userdata, message): def __rx__(self, client, userdata, message):
try:
payload = json.loads(message.payload) payload = json.loads(message.payload)
except json.decoder.JSONDecodeError:
logger.warning("JSON decode error %s", self.topic)
else:
# #
# heartbeat # heartbeat
# #
@ -160,7 +176,22 @@ class silvercrest_powerplug(base):
class silvercrest_motion_sensor(base): class silvercrest_motion_sensor(base):
pass def __rx__(self, client, userdata, message):
try:
payload = json.loads(message.payload)
except json.decoder.JSONDecodeError:
logger.warning("JSON decode error %s", self.topic)
else:
#
# heartbeat
#
if message.topic == self.topic:
self.last_device_msg = time.time()
#
# battery level
#
if "battery" in payload and message.topic == self.topic:
self.battery = payload["battery"]
class my_powerplug(base): class my_powerplug(base):