User interface for warning and triggering device warning states

This commit is contained in:
Dirk Alders 2023-02-13 16:01:17 +01:00
parent 0d7b9c3903
commit 60b18fac64
4 changed files with 96 additions and 7 deletions

View File

@ -11,7 +11,7 @@ COLOR_SHELLY = colored.fg("light_magenta")
COLOR_POWERPLUG = colored.fg("light_cyan") COLOR_POWERPLUG = colored.fg("light_cyan")
COLOR_LIGHT_ACTIVE = colored.fg("yellow") COLOR_LIGHT_ACTIVE = colored.fg("yellow")
COLOR_LIGHT_PASSIVE = COLOR_LIGHT_ACTIVE + colored.attr("dim") COLOR_LIGHT_PASSIVE = COLOR_LIGHT_ACTIVE + colored.attr("dim")
COLOR_MOTION_SENSOR = colored.fg("dark_orange_3b") COLOR_WARNINGS = colored.fg("dark_orange_3b")
COLOR_HEATING_VALVE = colored.fg("red") COLOR_HEATING_VALVE = colored.fg("red")
COLOR_REMOTE = colored.fg("green") COLOR_REMOTE = colored.fg("green")
@ -125,8 +125,10 @@ class shelly(base):
KEY_INPUT_1 = "input/1" KEY_INPUT_1 = "input/1"
KEY_LONGPUSH_0 = "longpush/0" KEY_LONGPUSH_0 = "longpush/0"
KEY_LONGPUSH_1 = "longpush/1" KEY_LONGPUSH_1 = "longpush/1"
KEY_TEMPERATURE = "temperature"
KEY_OVERTEMPERATURE = "overtemperature"
# #
BOOL_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, ] BOOL_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, ]
# #
INPUT_FUNC_OUT1_FOLLOW = "out1_follow" INPUT_FUNC_OUT1_FOLLOW = "out1_follow"
INPUT_FUNC_OUT1_TRIGGER = "out1_trigger" INPUT_FUNC_OUT1_TRIGGER = "out1_trigger"
@ -139,11 +141,13 @@ class shelly(base):
"get_input_0", "toggle_input_0", "get_input_0", "toggle_input_0",
"get_input_1", "toggle_input_1", "get_input_1", "toggle_input_1",
"trigger_input_0_longpress", "trigger_input_1_longpress", "trigger_input_0_longpress", "trigger_input_1_longpress",
"toggle_overtemperature",
] ]
def __init__(self, mqtt_client, topic, input_0_func=None, input_1_func=None, output_0_auto_off=None): def __init__(self, mqtt_client, topic, input_0_func=None, input_1_func=None, output_0_auto_off=None):
super().__init__(mqtt_client, topic, default_values={self.KEY_OUTPUT_0: False, self.KEY_OUTPUT_1: False, super().__init__(mqtt_client, topic, default_values={self.KEY_OUTPUT_0: False, self.KEY_OUTPUT_1: False,
self.KEY_INPUT_0: False, self.KEY_INPUT_1: False}) self.KEY_INPUT_0: False, self.KEY_INPUT_1: False,
self.KEY_TEMPERATURE: 35.2, self.KEY_OVERTEMPERATURE: False})
# #
self.__input_0_func = input_0_func self.__input_0_func = input_0_func
self.__input_1_func = input_1_func self.__input_1_func = input_1_func
@ -154,6 +158,8 @@ class shelly(base):
# publish state changes # publish state changes
self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True) self.add_callback(self.KEY_OUTPUT_0, None, self.__send__, True)
self.add_callback(self.KEY_OUTPUT_1, None, self.__send__, True) self.add_callback(self.KEY_OUTPUT_1, None, self.__send__, True)
self.add_callback(self.KEY_TEMPERATURE, None, self.__send__, True)
self.add_callback(self.KEY_OVERTEMPERATURE, None, self.__send__, True)
# #
if self.__output_0_auto_off__ is not None: if self.__output_0_auto_off__ is not None:
self.__delayed_off__ = task.delayed(float(self.__output_0_auto_off__), self.__auto_off__, self.KEY_OUTPUT_0) self.__delayed_off__ = task.delayed(float(self.__output_0_auto_off__), self.__auto_off__, self.KEY_OUTPUT_0)
@ -164,6 +170,11 @@ class shelly(base):
# #
self.__tx__((self.KEY_OUTPUT_0, self.KEY_OUTPUT_1)) self.__tx__((self.KEY_OUTPUT_0, self.KEY_OUTPUT_1))
def __int_to_ext__(self, key, value):
if key == self.KEY_OVERTEMPERATURE:
return int(value)
return super().__int_to_ext__(key, value)
def __rx__(self, client, userdata, message): def __rx__(self, client, userdata, message):
value = self.__payload_filter__(message.payload) value = self.__payload_filter__(message.payload)
if message.topic.startswith(self.topic) and message.topic.endswith("/command"): if message.topic.startswith(self.topic) and message.topic.endswith("/command"):
@ -235,6 +246,11 @@ class shelly(base):
time.sleep(0.1) time.sleep(0.1)
self.set(self.KEY_INPUT_1, not self[self.KEY_INPUT_1]) self.set(self.KEY_INPUT_1, not self[self.KEY_INPUT_1])
self.set(self.KEY_LONGPUSH_1, False) self.set(self.KEY_LONGPUSH_1, False)
elif command == self.COMMANDS[10]:
if self.get(self.KEY_OVERTEMPERATURE):
self.warning_state_off()
else:
self.warning_state_on()
else: else:
print("%s: not yet implemented!" % command) print("%s: not yet implemented!" % command)
else: else:
@ -246,6 +262,14 @@ class shelly(base):
channel = "(%s%s)" % (self.names.get(key, key), info) channel = "(%s%s)" % (self.names.get(key, key), info)
self.print_formatted_light(COLOR_SHELLY, value, channel) self.print_formatted_light(COLOR_SHELLY, value, channel)
def warning_state_on(self):
self.set(self.KEY_TEMPERATURE, 78.3)
self.set(self.KEY_OVERTEMPERATURE, True)
def warning_state_off(self):
self.set(self.KEY_TEMPERATURE, 34.1)
self.set(self.KEY_OVERTEMPERATURE, False)
class my_powerplug(base): class my_powerplug(base):
KEY_OUTPUT_0 = "state" KEY_OUTPUT_0 = "state"
@ -312,8 +336,11 @@ class silvercrest_powerplug(base):
self.set(self.KEY_OUTPUT_0, self.__ext_to_int__(self.KEY_OUTPUT_0, state)) self.set(self.KEY_OUTPUT_0, self.__ext_to_int__(self.KEY_OUTPUT_0, state))
def __tx__(self, keys_changed): def __tx__(self, keys_changed):
data = {}
for key in self:
data[key] = self.__int_to_ext__(key, self[key])
for key in keys_changed: for key in keys_changed:
self.mqtt_client.send(self.topic + '/' + key, self.__int_to_ext__(self.KEY_OUTPUT_0, self.get(self.KEY_OUTPUT_0))) self.mqtt_client.send(self.topic, json.dumps(data))
def command(self, command): def command(self, command):
if command in self.COMMANDS: if command in self.COMMANDS:
@ -454,20 +481,23 @@ class brennenstuhl_heating_valve(base):
# #
KEY_TEMPERATURE_SETPOINT = "current_heating_setpoint" KEY_TEMPERATURE_SETPOINT = "current_heating_setpoint"
KEY_TEMPERATURE = "local_temperature" KEY_TEMPERATURE = "local_temperature"
KEY_BATTERY = "battery"
# #
COMMANDS = [ COMMANDS = [
"get_temperature_setpoint", "set_temperature_setpoint", "set_local_temperature", "get_temperature_setpoint", "set_temperature_setpoint", "set_local_temperature", "set_battery",
] ]
def __init__(self, mqtt_client, topic): def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, default_values={ super().__init__(mqtt_client, topic, default_values={
self.KEY_TEMPERATURE_SETPOINT: 20, self.KEY_TEMPERATURE_SETPOINT: 20,
self.KEY_TEMPERATURE: 20.7, self.KEY_TEMPERATURE: 20.7,
self.KEY_BATTERY: 97,
}) })
# #
self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.print_formatted, True) self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.print_formatted, True)
self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.__send__, True) self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.__send__, True)
self.add_callback(self.KEY_TEMPERATURE, None, self.__send__, True) self.add_callback(self.KEY_TEMPERATURE, None, self.__send__, True)
self.add_callback(self.KEY_BATTERY, None, self.__send__, True)
# #
self.__tx__((self.KEY_TEMPERATURE_SETPOINT, )) self.__tx__((self.KEY_TEMPERATURE_SETPOINT, ))
@ -495,6 +525,8 @@ class brennenstuhl_heating_valve(base):
self.set(self.KEY_TEMPERATURE_SETPOINT, self.__command_float_value__(value)) self.set(self.KEY_TEMPERATURE_SETPOINT, self.__command_float_value__(value))
elif command == self.COMMANDS[2]: elif command == self.COMMANDS[2]:
self.set(self.KEY_TEMPERATURE, self.__command_float_value__(value)) self.set(self.KEY_TEMPERATURE, self.__command_float_value__(value))
elif command == self.COMMANDS[3]:
self.set(self.KEY_BATTERY, self.__command_float_value__(value))
def print_formatted(self, device, key, value): def print_formatted(self, device, key, value):
if key == self.KEY_TEMPERATURE_SETPOINT: if key == self.KEY_TEMPERATURE_SETPOINT:
@ -503,6 +535,12 @@ class brennenstuhl_heating_valve(base):
perc = 0 if perc < 0 else perc perc = 0 if perc < 0 else perc
self.print_formatted_percent(COLOR_HEATING_VALVE, '\u03d1', perc, "%4.1f°C" % value, "") self.print_formatted_percent(COLOR_HEATING_VALVE, '\u03d1', perc, "%4.1f°C" % value, "")
def warning_state_on(self):
self.set(self.KEY_BATTERY, 7)
def warning_state_off(self):
self.set(self.KEY_BATTERY, 97)
class videv_light(base_videv): class videv_light(base_videv):
KEY_OUTPUT_0 = "state" KEY_OUTPUT_0 = "state"
@ -733,8 +771,44 @@ class videv_heating(base_videv):
self.print_formatted_percent(COLOR_GUI_ACTIVE, 't', perc, '%3d%%' % perc, '(%.1f)' % disp_value) self.print_formatted_percent(COLOR_GUI_ACTIVE, 't', perc, '%3d%%' % perc, '(%.1f)' % disp_value)
class videv_warnings(base):
KEY_WARNING = "html_short"
#
COMMANDS = [
"get_warnings",
]
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, dict.fromkeys([self.KEY_WARNING]))
#
self.add_callback(self.KEY_WARNING, None, self.print_formatted, True)
def __rx__(self, client, userdata, message):
if message.topic == self.topic + "/" + self.KEY_WARNING:
self.set(self.KEY_WARNING, message.payload.decode("utf-8"))
def command(self, command):
if command in self.COMMANDS:
if command == self.COMMANDS[0]:
self.print_formatted(self, self.KEY_WARNING, self.get(self.KEY_WARNING))
else:
print("%s: not yet implemented!" % command)
else:
print("Unknown command!")
def print_formatted(self, device, key, value):
if OUTPUT_ACTIVE:
value = value.replace("<br><br>", "\n")
value = value.replace("<br>", " - ")
print(COLOR_WARNINGS + "** WARNINGS: *************************************************")
for line in value.split("\n"):
print(" ", line)
print("**************************************************************" + colored.attr("reset"))
# class silvercrest_motion_sensor(base): # class silvercrest_motion_sensor(base):
# KEY_OCCUPANCY = "occupancy" # KEY_OCCUPANCY = "occupancy"
# KEY_BATTERY = "battery"
# KEY_BATTERY_LOW = "battery_low"
# COMMANDS = ['motion'] # COMMANDS = ['motion']
# def __init__(self, mqtt_client, topic): # def __init__(self, mqtt_client, topic):
@ -761,9 +835,17 @@ class videv_heating(base_videv):
# if value is not None: # if value is not None:
# print_light(COLOR_MOTION_SENSOR, value, self.topic, "") # print_light(COLOR_MOTION_SENSOR, value, self.topic, "")
# def warning_state_on(self):
# self.set(self.KEY_BATTERY, 7)
# self.set(self.KEY_BATTERY_LOW, True)
# def warning_state_off(self):
# self.set(self.KEY_BATTERY, 97)
# self.set(self.KEY_BATTERY_LOW, False)
# class tradfri_button(base): # class tradfri_button(base):
# KEY_ACTION = "action" # KEY_ACTION = "action"
# KEY_BATTERY = "battery"
# # # #
# ACTION_TOGGLE = "toggle" # ACTION_TOGGLE = "toggle"
# ACTION_BRIGHTNESS_UP = "brightness_up_click" # ACTION_BRIGHTNESS_UP = "brightness_up_click"
@ -800,6 +882,11 @@ class videv_heating(base_videv):
# time.sleep(value or 0.5) # time.sleep(value or 0.5)
# action = '_'.join(action.split('_')[:-1] + ['release']) # action = '_'.join(action.split('_')[:-1] + ['release'])
# self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action})) # self.mqtt_client.send(self.topic, json.dumps({self.KEY_ACTION: action}))
#
# def warning_state_on(self):
# self.set(self.KEY_BATTERY, 7)
# def warning_state_off(self):
# self.set(self.KEY_BATTERY, 97)
# class remote(base): # class remote(base):

View File

@ -1,6 +1,6 @@
import config import config
from simulation.devices import shelly, silvercrest_powerplug, tradfri_light, my_powerplug, brennenstuhl_heating_valve from simulation.devices import shelly, silvercrest_powerplug, tradfri_light, my_powerplug, brennenstuhl_heating_valve
from simulation.devices import videv_light, videv_heating from simulation.devices import videv_light, videv_heating, videv_warnings
import inspect import inspect
@ -266,3 +266,4 @@ class house(base):
self.ffw = ffw(mqtt_client) self.ffw = ffw(mqtt_client)
self.ffe = ffe(mqtt_client) self.ffe = ffe(mqtt_client)
self.stairway = stairway(mqtt_client) self.stairway = stairway(mqtt_client)
self.warnings = videv_warnings(mqtt_client, config.TOPIC_WARNINGS)

View File

@ -6,6 +6,7 @@ import sys
from tests.all import test_smarthome from tests.all import test_smarthome
# TODO: Extend tests in simulation # TODO: Extend tests in simulation
# - Test: Check of warning messages for battery and overtemperature
# - Switching button functions (gfw_dirk, ffe.sleep) # - Switching button functions (gfw_dirk, ffe.sleep)
# - Brightness button functions (gfw.dirk, ffe.sleep) # - Brightness button functions (gfw.dirk, ffe.sleep)
# - Synch functions of amplifier with spotify, mpd # - Synch functions of amplifier with spotify, mpd

View File

@ -83,7 +83,7 @@ class test_smarthome(object):
system_info[jsonlog.SYSI_USERNAME] = getpass.getuser() system_info[jsonlog.SYSI_USERNAME] = getpass.getuser()
system_info[jsonlog.SYSI_PATH] = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) system_info[jsonlog.SYSI_PATH] = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
self.tcl = report.testSession(['__unittest__', 'unittest', ROOT_LOGGER_NAME]) self.tcl = report.testSession(['__unittest__', ROOT_LOGGER_NAME])
self.tcl[jsonlog.MAIN_KEY_SYSTEM_INFO] = system_info self.tcl[jsonlog.MAIN_KEY_SYSTEM_INFO] = system_info
self.tcl["testcase_names"] = report.TCEL_NAMES self.tcl["testcase_names"] = report.TCEL_NAMES