Initial user interface implemented

This commit is contained in:
Dirk Alders 2023-10-31 18:25:19 +01:00
parent 0f429b2579
commit a6df2c99a4
8 changed files with 239 additions and 84 deletions

View File

@ -7,7 +7,7 @@ from devices.tradfri import sw as tradfri_sw
from devices.tradfri import sw_br as tradfri_sw_br from devices.tradfri import sw_br as tradfri_sw_br
from devices.tradfri import sw_br_ct as tradfri_sw_br_ct from devices.tradfri import sw_br_ct as tradfri_sw_br_ct
tradfri_button = None # TODO: required, when a interface for external device stimulation is available tradfri_button = None
silvercrest_motion_sensor = None silvercrest_motion_sensor = None
audio_status = None audio_status = None
remote = None remote = None
@ -17,7 +17,26 @@ class group(object):
def __init__(self, *args): def __init__(self, *args):
self.device_group = args self.device_group = args
self.topic = self.device_group[0].topic self.topic = self.device_group[0].topic
#
self.user_cmds = {}
for key in self.device_group[0].user_cmds:
self.user_cmds[key] = getattr(self, self.device_group[0].user_cmds[key].__name__)
def power_on_action(self, *args, **kwargs): def power_on_action(self, *args, **kwargs):
for gm in self.device_group: for gm in self.device_group:
gm.power_on_action(*args, **kwargs) gm.power_on_action(*args, **kwargs)
def __getattribute__(self, name):
def group_execution(*args, **kwargs):
for member in self.device_group[:]:
m = getattr(member, name)
m(*args, **kwargs)
try:
rv = super().__getattribute__(name)
except AttributeError:
if callable(getattr(self.device_group[0], name)):
return group_execution
else:
return getattr(self.device_group[0], name)
else:
return rv

View File

@ -46,13 +46,14 @@ import time
} }
""" """
class vlv(base):
# """A tradfri device with switching functionality
# Args: class vlv(base):
# mqtt_client (mqtt.mqtt_client): A MQTT Client instance """A tradfri device with switching functionality
# topic (str): the base topic for this device
# """ Args:
mqtt_client (mqtt.mqtt_client): A MQTT Client instance
topic (str): the base topic for this device
"""
PROPERTIES = [ PROPERTIES = [
"away_mode", "away_mode",
"battery", "battery",
@ -65,6 +66,7 @@ class vlv(base):
"valve_detection", "valve_detection",
"window_detection" "window_detection"
] ]
def __init__(self, mqtt_client, topic, **kwargs): def __init__(self, mqtt_client, topic, **kwargs):
super().__init__(mqtt_client, topic, **kwargs) super().__init__(mqtt_client, topic, **kwargs)
self["away_mode"] = "OFF" self["away_mode"] = "OFF"
@ -82,6 +84,11 @@ class vlv(base):
# #
self.tq = task.threaded_queue() self.tq = task.threaded_queue()
self.tq.run() self.tq.run()
#
cmd_base = self.topic.replace('/', '.') + '.'
self.user_cmds = {
cmd_base + 'setpoint': self.__ui_setpoint__,
}
def set_state(self, value): def set_state(self, value):
self.__set__("state", "on" if value else "off") self.__set__("state", "on" if value else "off")
@ -104,3 +111,12 @@ class vlv(base):
data[key] = self[key] data[key] = self[key]
self.logger.info("Sending status: %s", repr(data)) self.logger.info("Sending status: %s", repr(data))
self.mqtt_client.send(self.topic, json.dumps(data)) self.mqtt_client.send(self.topic, json.dumps(data))
def __ui_setpoint__(self, *args):
try:
value = float(args[0].replace(',', '.'))
except (ValueError, IndexError):
print("You need to give a numeric value as first argument.")
else:
self.__set__("current_heating_setpoint", value)
self.tq.enqueue(1, self.send_device_status, self)

View File

@ -17,14 +17,26 @@ class powerplug(base):
super().__init__(mqtt_client, topic) super().__init__(mqtt_client, topic)
# #
for i in range(0, 4): for i in range(0, 4):
self[self.PROPERTIES[i]] = False self[self.PROPERTIES[i]] = 'false'
self.mqtt_client.add_callback(self.topic + '/output/%d/set' % (i + 1), self.__rx_set__) self.mqtt_client.add_callback(self.topic + '/output/%d/set' % (i + 1), self.__rx_set__)
#
cmd_base = self.topic.replace('/', '.') + '.'
self.user_cmds = {
cmd_base + 'toggle1': self.__ui_toggle_output_0__,
cmd_base + 'toggle2': self.__ui_toggle_output_1__,
cmd_base + 'toggle3': self.__ui_toggle_output_2__,
cmd_base + 'toggle4': self.__ui_toggle_output_3__,
}
def __rx_set__(self, client, userdata, message): def __rx_set__(self, client, userdata, message):
data = message.payload.decode('utf-8') data = message.payload.decode('utf-8')
key = message.topic.split('/')[-3] + '/' + message.topic.split('/')[-2] key = message.topic.split('/')[-3] + '/' + message.topic.split('/')[-2]
self.logger.info("Received set data for %s: %s", key, repr(data)) self.logger.info("Received set data for %s: %s", key, repr(data))
self.__set__(key, data) self.__set__(key, data)
def __set__(self, key, data):
base.__set__(self, key, data)
#
self.send_device_status(key) self.send_device_status(key)
if key.startswith("output/"): if key.startswith("output/"):
if data == "true": if data == "true":
@ -34,3 +46,19 @@ class powerplug(base):
data = self[key] data = self[key]
self.logger.info("Sending status for %s: %s", key, repr(data)) self.logger.info("Sending status for %s: %s", key, repr(data))
self.mqtt_client.send(self.topic + '/' + key, data) self.mqtt_client.send(self.topic + '/' + key, data)
def __ui_toggle_output__(self, num):
key = self.PROPERTIES[num]
self.__set__(key, 'false' if self.get(key).lower() == 'true' else 'true')
def __ui_toggle_output_0__(self, *args):
self.__ui_toggle_output__(0)
def __ui_toggle_output_1__(self, *args):
self.__ui_toggle_output__(1)
def __ui_toggle_output_2__(self, *args):
self.__ui_toggle_output__(2)
def __ui_toggle_output_3__(self, *args):
self.__ui_toggle_output__(3)

View File

@ -43,25 +43,35 @@ class shelly_sw1(base):
topic (str): the base topic for this device topic (str): the base topic for this device
kwargs (**dict): cd_r0=list of devices connected to relay/0 kwargs (**dict): cd_r0=list of devices connected to relay/0
""" """
KEY_OUTPUT_0 = "relay/0"
#
PROPERTIES = [ PROPERTIES = [
"relay/0", KEY_OUTPUT_0,
] ]
def __init__(self, mqtt_client, topic): def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic) super().__init__(mqtt_client, topic)
self["state"] = "off" self[self.KEY_OUTPUT_0] = "off"
# #
self.__auto_off__ = None self.__auto_off__ = None
# #
self.mqtt_client.add_callback(self.topic + '/relay/0/command', self.__rx_set__) self.mqtt_client.add_callback(self.topic + '/' + self.KEY_OUTPUT_0 + '/command', self.__rx_set__)
#
cmd_base = self.topic.replace('/', '.') + '.'
self.user_cmds = {
cmd_base + 'toggle': self.__ui_toggle_output_0__,
}
def __rx_set__(self, client, userdata, message): def __rx_set__(self, client, userdata, message):
data = message.payload.decode('utf-8') data = message.payload.decode('utf-8')
key = message.topic.split('/')[-3] + '/' + message.topic.split('/')[-2] key = message.topic.split('/')[-3] + '/' + message.topic.split('/')[-2]
self.logger.info("Received set data for %s: %s", key, repr(data)) self.logger.info("Received set data for %s: %s", key, repr(data))
self.__set__(key, data) self.__set__(key, data)
def __set__(self, key, data):
base.__set__(self, key, data)
self.send_device_status(key) self.send_device_status(key)
if key == "relay/0": if key == self.KEY_OUTPUT_0:
if data.lower() == "on": if data.lower() == "on":
self.power_on(key) self.power_on(key)
if self.__auto_off__ is not None: if self.__auto_off__ is not None:
@ -79,6 +89,9 @@ class shelly_sw1(base):
self.__auto_off__ = task.delayed(sec, self.__auto_off_func__) self.__auto_off__ = task.delayed(sec, self.__auto_off_func__)
def __auto_off_func__(self): def __auto_off_func__(self):
if self.get(self.PROPERTIES[0]).lower() != 'off': if self.get(self.KEY_OUTPUT_0).lower() != 'off':
self.__set__(self.PROPERTIES[0], "off") self.__set__(self.KEY_OUTPUT_0, "off")
self.send_device_status(self.PROPERTIES[0]) self.send_device_status(self.KEY_OUTPUT_0)
def __ui_toggle_output_0__(self, *args):
self.__set__(self.KEY_OUTPUT_0, 'off' if self.get(self.KEY_OUTPUT_0).lower() == 'on' else 'on')

View File

@ -50,19 +50,25 @@ class sw(base):
mqtt_client (mqtt.mqtt_client): A MQTT Client instance mqtt_client (mqtt.mqtt_client): A MQTT Client instance
topic (str): the base topic for this device topic (str): the base topic for this device
""" """
KEY_OUTPUT_0 = "state"
PROPERTIES = [ PROPERTIES = [
"state", KEY_OUTPUT_0,
] ]
def __init__(self, mqtt_client, topic): def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic) super().__init__(mqtt_client, topic)
self["state"] = "off" self[self.KEY_OUTPUT_0] = "off"
# #
self.mqtt_client.add_callback(self.topic + '/set', self.__rx_set__) self.mqtt_client.add_callback(self.topic + '/set', self.__rx_set__)
self.mqtt_client.add_callback(self.topic + '/get', self.__rx_get__) self.mqtt_client.add_callback(self.topic + '/get', self.__rx_get__)
#
cmd_base = self.topic.replace('/', '.') + '.'
self.user_cmds = {
cmd_base + 'toggle': self.__ui_toggle_output_0__,
}
def set_state(self, value): def set_state(self, value):
self.__set__("state", "on" if value else "off") self.__set__(self.KEY_OUTPUT_0, "on" if value else "off")
self.send_device_status() self.send_device_status()
def __rx_set__(self, client, userdata, message): def __rx_set__(self, client, userdata, message):
@ -71,14 +77,14 @@ class sw(base):
for key in data: for key in data:
self.__set__(key, data[key]) self.__set__(key, data[key])
self.send_device_status() self.send_device_status()
if "state" in data and data.get("state", 'OFF').lower() == "on": if self.KEY_OUTPUT_0 in data and data.get(self.KEY_OUTPUT_0, 'OFF').lower() == "on":
self.power_on("state") self.power_on(self.KEY_OUTPUT_0)
def __rx_get__(self, client, userdata, message): def __rx_get__(self, client, userdata, message):
self.send_device_status() self.send_device_status()
def power_on_action(self): def power_on_action(self):
self["state"] = "on" self[self.KEY_OUTPUT_0] = "on"
self.send_device_status() self.send_device_status()
def send_device_status(self): def send_device_status(self):
@ -86,6 +92,10 @@ class sw(base):
self.logger.info("Sending status: %s", repr(data)) self.logger.info("Sending status: %s", repr(data))
self.mqtt_client.send(self.topic, data) self.mqtt_client.send(self.topic, data)
def __ui_toggle_output_0__(self, *args):
self.__set__(self.KEY_OUTPUT_0, 'off' if self.get(self.KEY_OUTPUT_0).lower() == 'on' else 'on')
self.send_device_status()
class sw_br(sw): class sw_br(sw):
"""A tradfri device with switching and brightness functionality """A tradfri device with switching and brightness functionality

62
home.py Normal file
View File

@ -0,0 +1,62 @@
import devdi.props as props
def functions(pd):
#######
# GFW #
#######
loc = props.LOC_GFW
# DIRK
roo = props.ROO_DIR
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZGW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
sml = pd.get(props.STG_MYA, loc, roo, props.FUN_MPP)
tml = pd.get(props.STG_ZGW, loc, roo, props.FUN_DEL)
sml.register_power_on_instance(tml, sml.PROPERTIES[1])
# FLOOR
roo = props.ROO_FLO
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZGW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
#######
# FFW #
#######
loc = props.LOC_FFW
# JULIAN
roo = props.ROO_JUL
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZFW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
# LIVINGROOM
roo = props.ROO_LIV
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZFW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
# SLEEP
roo = props.ROO_SLP
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZFW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
#######
# FFE #
#######
loc = props.LOC_FFE
# KITCHEN
roo = props.ROO_KIT
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_CIR)
sml.auto_off(600)
# LIVINGROOM
roo = props.ROO_LIV
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZFE, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
# SLEEP
roo = props.ROO_SLP
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZFE, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])

View File

@ -1,14 +1,15 @@
import config import config
import devdi import devdi
import devdi.props as props import home
import logging import logging
import mqtt import mqtt
import os import os
import report import report
import time import user_interface
# TODO: Add some more ui commands for devices
# TODO: Add some test for smart_brain
# TODO: Implementation of missing devices in devices/__init__.py # TODO: Implementation of missing devices in devices/__init__.py
# TODO: Implementation of interface for external device stimulation
logger = logging.getLogger(config.APP_NAME) logger = logging.getLogger(config.APP_NAME)
@ -37,64 +38,17 @@ if __name__ == "__main__":
# #
# Smart Home Functionality # Smart Home Functionality
# #
####### home.functions(pd)
# GFW #
#######
loc = props.LOC_GFW
# DIRK
roo = props.ROO_DIR
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZGW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
sml = pd.get(props.STG_MYA, loc, roo, props.FUN_MPP) #
tml = pd.get(props.STG_ZGW, loc, roo, props.FUN_DEL) # User Interface
sml.register_power_on_instance(tml, sml.PROPERTIES[1]) #
ui = user_interface.user_interface()
# FLOOR # Add the device commands
roo = props.ROO_FLO for d in pd.values():
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL) try:
tml = pd.get(props.STG_ZGW, loc, roo, props.FUN_MAL) for cmd in d.user_cmds:
sml.register_power_on_instance(tml, sml.PROPERTIES[0]) ui.add_command(cmd, d.user_cmds[cmd])
except AttributeError:
####### pass # device seems to have no user commands
# FFW # ui.run()
#######
loc = props.LOC_FFW
# JULIAN
roo = props.ROO_JUL
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZFW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
# LIVINGROOM
roo = props.ROO_LIV
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZFW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
# SLEEP
roo = props.ROO_SLP
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZFW, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
#######
# FFE #
#######
loc = props.LOC_FFE
# KITCHEN
roo = props.ROO_KIT
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_CIR)
sml.auto_off(600)
# LIVINGROOM
roo = props.ROO_LIV
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZFE, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
# SLEEP
roo = props.ROO_SLP
sml = pd.get(props.STG_SHE, loc, roo, props.FUN_MAL)
tml = pd.get(props.STG_ZFE, loc, roo, props.FUN_MAL)
sml.register_power_on_instance(tml, sml.PROPERTIES[0])
while (True):
time.sleep(1)

53
user_interface.py Normal file
View File

@ -0,0 +1,53 @@
import readline
import sys
import time
class user_interface(dict):
def __init__(self):
super().__init__(self)
self.add_command('quit', None)
def run(self):
readline.parse_and_bind("tab: complete")
readline.set_completer(self.completer)
print("\nEnter command: ")
while (True):
userfeedback = input('')
command = userfeedback.split(' ')[0]
if userfeedback == 'quit':
break
else:
self.call_it(userfeedback)
def call_it(self, userfeedback):
args = userfeedback.split(" ")
cmd = args.pop(0)
cb = self.get(cmd)
if callable(cb):
cb(*args)
else:
print("Unknown command \"%s\"" % name)
def add_command(self, name, callback):
self[name] = callback
def reduced_list(self, text):
"""
Create reduced completation list
"""
reduced_list = {}
for cmd in self.keys():
next_dot = cmd[len(text):].find('.')
if next_dot >= 0:
reduced_list[cmd[:len(text) + next_dot + 1]] = None
else:
reduced_list[cmd] = None
return reduced_list.keys()
def completer(self, text, state):
"""
Our custom completer function
"""
options = [x for x in self.reduced_list(text) if x.startswith(text)]
return options[state]