From 6f57eebee5341b2c7bd50a05dc08aafb590aa3e7 Mon Sep 17 00:00:00 2001 From: Dirk Alders Date: Sat, 30 Aug 2025 15:56:37 +0200 Subject: [PATCH] Intial smart devices taken over to submodule --- __init__.py | 6 + base.py | 321 ++++++++++++++++++++++++++++++++++++++++++++++++ brennenstuhl.py | 118 ++++++++++++++++++ hue.py | 118 ++++++++++++++++++ mydevices.py | 262 +++++++++++++++++++++++++++++++++++++++ shelly.py | 238 +++++++++++++++++++++++++++++++++++ silvercrest.py | 148 ++++++++++++++++++++++ tradfri.py | 193 +++++++++++++++++++++++++++++ videv.py | 197 +++++++++++++++++++++++++++++ 9 files changed, 1601 insertions(+) create mode 100644 __init__.py create mode 100644 base.py create mode 100644 brennenstuhl.py create mode 100644 hue.py create mode 100644 mydevices.py create mode 100644 shelly.py create mode 100644 silvercrest.py create mode 100644 tradfri.py create mode 100644 videv.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..eb06e38 --- /dev/null +++ b/__init__.py @@ -0,0 +1,6 @@ +from . import brennenstuhl +from . import hue +from . import mydevices +from . import shelly +from . import silvercrest +from . import tradfri diff --git a/base.py b/base.py new file mode 100644 index 0000000..aef2ceb --- /dev/null +++ b/base.py @@ -0,0 +1,321 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +import json +from mqtt.smarthome import mqtt_base +import task + + +def is_json(data): + try: + json.loads(data) + except json.decoder.JSONDecodeError: + return False + else: + return True + + +class base(mqtt_base): + TX_TOPIC = "set" + TX_VALUE = 0 + TX_DICT = 1 + TX_TYPE = -1 + TX_FILTER_DATA_KEYS = [] + # + RX_KEYS = [] + RX_IGNORE_TOPICS = [] + RX_IGNORE_KEYS = [] + RX_FILTER_DATA_KEYS = [] + # + CFG_DATA = {} + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic, default_values=dict.fromkeys(self.RX_KEYS)) + # data storage + self.__cfg_by_mid__ = None + # initialisations + mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback) + mqtt_client.add_callback(topic=self.topic+"/#", callback=self.receive_callback) + # + self.add_callback(None, None, self.__state_logging__, on_change_only=True) + + def __cfg_callback__(self, key, value, mid): + if self.CFG_DATA.get(key) != value and self.__cfg_by_mid__ != mid and mid is not None: + self.__cfg_by_mid__ = mid + self.logger.warning("Differing configuration identified: Sending default configuration to defice: %s", repr(self.CFG_DATA)) + if self.TX_TYPE == self.TX_DICT: + self.mqtt_client.send(self.topic + '/' + self.TX_TOPIC, json.dumps(self.CFG_DATA)) + else: + for key in self.CFG_DATA: + self.send_command(key, self.CFG_DATA.get(key)) + + def set(self, key, data, mid=None, block_callback=[]): + if key in self.CFG_DATA: + self.__cfg_callback__(key, data, mid) + if key in self.RX_IGNORE_KEYS: + pass # ignore these keys + elif key in self.RX_KEYS: + return super().set(key, data, block_callback) + else: + self.logger.warning("Unexpected key %s", key) + + def receive_callback(self, client, userdata, message): + if message.topic != self.topic + '/' + videv_base.KEY_INFO: + content_key = message.topic[len(self.topic) + 1:] + if content_key not in self.RX_IGNORE_TOPICS and (not message.topic.endswith(self.TX_TOPIC) or len(self.TX_TOPIC) == 0): + self.logger.debug("Unpacking content_key \"%s\" from message.", content_key) + if is_json(message.payload): + data = json.loads(message.payload) + if type(data) is dict: + for key in data: + self.set(key, self.__device_to_instance_filter__(key, data[key]), message.mid) + else: + self.set(content_key, self.__device_to_instance_filter__(content_key, data), message.mid) + # String + else: + self.set(content_key, self.__device_to_instance_filter__(content_key, message.payload.decode('utf-8')), message.mid) + else: + self.logger.debug("Ignoring topic %s", content_key) + + def __device_to_instance_filter__(self, key, data): + if key in self.RX_FILTER_DATA_KEYS: + if data in [1, 'on', 'ON']: + return True + elif data in [0, 'off', 'OFF']: + return False + return data + + def __instance_to_device_filter__(self, key, data): + if key in self.TX_FILTER_DATA_KEYS: + if data is True: + return "on" + elif data is False: + return "off" + return data + + def send_command(self, key, data): + data = self.__instance_to_device_filter__(key, data) + if self.TX_TOPIC is not None: + if self.TX_TYPE < 0: + self.logger.error("Unknown tx type. Set TX_TYPE of class to a known value") + else: + self.logger.debug("Sending data for %s - %s", key, str(data)) + if self.TX_TYPE == self.TX_DICT: + try: + self.mqtt_client.send('/'.join([self.topic, self.TX_TOPIC]), json.dumps({key: data})) + except TypeError: + print(self.topic) + print(key.__dict__) + print(key) + print(data) + raise TypeError + else: + if type(data) not in [str, bytes]: + data = json.dumps(data) + self.mqtt_client.send('/'.join([self.topic, key, self.TX_TOPIC] if len(self.TX_TOPIC) > 0 else [self.topic, key]), data) + else: + self.logger.error("Unknown tx toptic. Set TX_TOPIC of class to a known value") + + +class base_rpc(mqtt_base): + SRC_RESPONSE = "/response" + SRC_NULL = "/null" + # + EVENTS_TOPIC = "/events/rpc" + TX_TOPIC = "/rpc" + RESPONSE_TOPIC = SRC_RESPONSE + "/rpc" + NULL_TOPIC = SRC_NULL + "/rpc" + # + RPC_ID_GET_STATUS = 1 + RPC_ID_SET = 1734 + # + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic, default_values=dict.fromkeys(self.RX_KEYS)) + # data storage + self.__cfg_by_mid__ = None + # initialisations + mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback) + mqtt_client.add_callback(topic=self.topic+"/#", callback=self.receive_callback) + # + self.add_callback(None, None, self.__state_logging__, on_change_only=False) + # + self.rpc_get_status() + + def receive_callback(self, client, userdata, message): + data = json.loads(message.payload) + # + if message.topic == self.topic + self.EVENTS_TOPIC: + self.events(data) + elif message.topic == self.topic + self.RESPONSE_TOPIC: + self.response(data) + elif message.topic == self.topic + self.NULL_TOPIC or message.topic == self.topic + self.TX_TOPIC or message.topic == self.topic + "/online": + pass # Ignore response + else: + self.logger.warning("Unexpected message received: %s::%s", message.topic, json.dumps(data, sort_keys=True, indent=4)) + + def events(self, data): + for rx_key in data["params"]: + if rx_key == "events": + for evt in data["params"]["events"]: + key = evt["component"] + event = evt["event"] + if key in self.RX_KEYS: + if event == "btn_down": + self.set(key, True) + elif event == "btn_up": + self.set(key, False) + else: + key = key + ":" + event + if key in self.RX_KEYS: + self.set(key, True) + else: + self.logger.warning("Unexpected event with data=%s", json.dumps(data, sort_keys=True, indent=4)) + elif rx_key in self.RX_KEYS: + state = data["params"][rx_key].get("output") + if state is not None: + self.set(rx_key, state) + + def response(self, data): + try: + rpc_id = data.get("id") + except AttributeError: + rpc_id = None + try: + rpc_method = data.get("method") + except AttributeError: + rpc_method = None + if rpc_id == self.RPC_ID_GET_STATUS: + # + # Shelly.GetStatus + # + for rx_key in data.get("result", []): + if rx_key in self.RX_KEYS: + key_data = data["result"][rx_key] + state = key_data.get("output", key_data.get("state")) + if state is not None: + self.set(rx_key, state) + else: + self.logger.warning("Unexpected response with data=%s", json.dumps(data, sort_keys=True, indent=4)) + + def rpc_tx(self, **kwargs): + if not "id" in kwargs: + raise AttributeError("'id' is missing in keyword arguments") + self.mqtt_client.send(self.topic + self.TX_TOPIC, json.dumps(kwargs)) + + def rpc_get_status(self): + self.rpc_tx( + id=self.RPC_ID_GET_STATUS, + src=self.topic + self.SRC_RESPONSE, + method="Shelly.GetStatus" + ) + + def rpc_switch_set(self, key, state: bool): + self.rpc_tx( + id=self.RPC_ID_SET, + src=self.topic + self.SRC_NULL, + method="Switch.Set", + params={"id": int(key[-1]), "on": state} + ) + + +class base_output(base): + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + self.__all_off_enabled__ = True + + def disable_all_off(self, state=True): + self.__all_off_enabled__ = not state + + def all_off(self): + if self.__all_off_enabled__: + try: + self.__all_off__() + except (AttributeError, TypeError) as e: + self.logger.warning("Method all_off was used, but __all_off__ method wasn't callable: %s", repr(e)) + + +class videv_base(mqtt_base): + KEY_INFO = '__info__' + # + SET_TOPIC = "set" + + def __init__(self, mqtt_client, topic, default_values=None): + super().__init__(mqtt_client, topic, default_values=default_values) + self.__display_dict__ = {} + self.__control_dict__ = {} + self.__periodic__ = task.periodic(300, self.send_all) + self.__periodic__.run() + + def send_all(self, rt): + try: + for key in self: + if self[key] is not None: + self.__tx__(key, self[key]) + except RuntimeError: + self.logger.warning("Runtimeerror while sending cyclic videv information. This may happen on startup.") + + def add_display(self, my_key, ext_device, ext_key, on_change_only=True): + """ + listen to data changes of ext_device and update videv information + """ + if my_key not in self.keys(): + self[my_key] = None + if ext_device.__class__.__name__ == "group": + # store information to identify callback from ext_device + self.__display_dict__[(id(ext_device[0]), ext_key)] = my_key + # register a callback to listen for data from external device + ext_device[0].add_callback(ext_key, None, self.__rx_ext_device_data__, on_change_only, init_now=True) + else: + # store information to identify callback from ext_device + self.__display_dict__[(id(ext_device), ext_key)] = my_key + # register a callback to listen for data from external device + ext_device.add_callback(ext_key, None, self.__rx_ext_device_data__, on_change_only, init_now=True) + # send initial display data to videv interface + data = ext_device.get(ext_key) + if data is not None: + self.__tx__(my_key, data) + + def __rx_ext_device_data__(self, ext_device, ext_key, data): + my_key = self.__display_dict__[(id(ext_device), ext_key)] + self.set(my_key, data) + self.__tx__(my_key, data) + + def __tx__(self, key, data): + if type(data) not in (str, ): + data = json.dumps(data) + self.mqtt_client.send('/'.join([self.topic, key]), data) + + def add_control(self, my_key, ext_device, ext_key, on_change_only=True): + """ + listen to videv information and pass data to ext_device + """ + if my_key not in self.keys(): + self[my_key] = None + # store information to identify callback from videv + self.__control_dict__[my_key] = (ext_device, ext_key, on_change_only) + # add callback for videv changes + self.mqtt_client.add_callback('/'.join([self.topic, my_key, self.SET_TOPIC]), self.__rx_videv_data__) + + def __rx_videv_data__(self, client, userdata, message): + my_key = message.topic.split('/')[-2] + try: + data = json.loads(message.payload) + except json.decoder.JSONDecodeError: + data = message.payload + ext_device, ext_key, on_change_only = self.__control_dict__[my_key] + if my_key in self.keys(): + if data != self[my_key] or not on_change_only: + ext_device.send_command(ext_key, data) + else: + self.logger.info("Ignoring rx message with topic %s", message.topic) + + def add_routing(self, my_key, ext_device, ext_key, on_change_only_disp=True, on_change_only_videv=True): + """ + listen to data changes of ext_device and update videv information + and + listen to videv information and pass data to ext_device + """ + # add display + self.add_display(my_key, ext_device, ext_key, on_change_only_disp) + self.add_control(my_key, ext_device, ext_key, on_change_only_videv) diff --git a/brennenstuhl.py b/brennenstuhl.py new file mode 100644 index 0000000..426aca8 --- /dev/null +++ b/brennenstuhl.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +from .base import base +import task +import time + + +class brennenstuhl_heatingvalve(base): + """ Communication (MQTT) + + brennenstuhl_heatingvalve { + | "away_mode": ["ON", "OFF"] + | "battery": [0...100] % + | "child_lock": ["LOCK", "UNLOCK"] + | "current_heating_setpoint": [5...30] °C + | "linkquality": [0...255] lqi + | "local_temperature": [numeric] °C + | "preset": ["manual", ...] + | "system_mode": ["heat", ...] + | "valve_detection": ["ON", "OFF"] + | "window_detection": ["ON", "OFF"] + | } + +- set { + "away_mode": ["ON", "OFF", "TOGGLE"] + "child_lock": ["LOCK", "UNLOCK"] + "current_heating_setpoint": [5...30] °C + "preset": ["manual", ...] + "system_mode": ["heat", ...] + "valve_detection": ["ON", "OFF", "TOGGLE"] + "window_detection": ["ON", "OFF", "TOGGLE"] + } + """ + KEY_LINKQUALITY = "linkquality" + KEY_BATTERY = "battery" + KEY_HEATING_SETPOINT = "current_heating_setpoint" + KEY_TEMPERATURE = "local_temperature" + # + KEY_AWAY_MODE = "away_mode" + KEY_CHILD_LOCK = "child_lock" + KEY_PRESET = "preset" + KEY_SYSTEM_MODE = "system_mode" + KEY_VALVE_DETECTION = "valve_detection" + KEY_WINDOW_DETECTION = "window_detection" + # + RETRY_CYCLE_TIME = 2.5 + MAX_TX_RETRIES = 20 + RETRY_TIMEOUT = RETRY_CYCLE_TIME * MAX_TX_RETRIES + # + TX_TYPE = base.TX_DICT + # + RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_HEATING_SETPOINT, KEY_TEMPERATURE] + RX_IGNORE_KEYS = [KEY_AWAY_MODE, KEY_CHILD_LOCK, KEY_PRESET, KEY_SYSTEM_MODE, KEY_VALVE_DETECTION, KEY_WINDOW_DETECTION] + # + CFG_DATA = { + KEY_WINDOW_DETECTION: "ON", + KEY_VALVE_DETECTION: "ON", + KEY_SYSTEM_MODE: "heat", + KEY_PRESET: "manual" + } + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + self.add_callback(self.KEY_HEATING_SETPOINT, None, self.__valave_temp_rx__) + self.__tx_temperature__ = None + self.__rx_temperature__ = None + self.__tx_timestamp__ = 0 + # + self.task = task.periodic(self.RETRY_CYCLE_TIME, self.__task__) + self.task.run() + + def __state_logging__(self, inst, key, data): + if key in [self.KEY_HEATING_SETPOINT, self.KEY_CHILD_LOCK, self.KEY_WINDOW_DETECTION, self.KEY_VALVE_DETECTION]: + self.logger.info("State change of '%s' to '%s'", key, repr(data)) + + def send_command(self, key, data): + if key == self.KEY_HEATING_SETPOINT: + self.__tx_temperature__ = data + self.__tx_timestamp__ = time.time() + base.send_command(self, key, data) + + def __valave_temp_rx__(self, inst, key, data): + if key == self.KEY_HEATING_SETPOINT: + self.__rx_temperature__ = data + + def __task__(self, rt): + if self.__tx_temperature__ is not None and self.__tx_timestamp__ is not None: # Already send a setpoint + if self.__tx_temperature__ != self.__rx_temperature__: # Setpoint and valve feedback are unequal + if time.time() - self.__tx_timestamp__ < self.RETRY_TIMEOUT: # Timeout condition allows resend of setpoint + self.logger.warning("Setpoint not yet acknoledged by device. Sending setpoint again") + self.set_heating_setpoint(self.__tx_temperature__) + return + else: + self.__tx_timestamp__ = None # Disable resend logic, if setpoint and valve setpoint are equal + + # + # RX + # + @property + def linkqulity(self): + return self.get(self.KEY_LINKQUALITY) + + @property + def heating_setpoint(self): + return self.get(self.KEY_HEATING_SETPOINT) + + @property + def temperature(self): + return self.get(self.KEY_TEMPERATURE) + + # + # TX + # + def set_heating_setpoint(self, setpoint): + self.send_command(self.KEY_HEATING_SETPOINT, setpoint) + + def set_heating_setpoint_mcb(self, device, key, data): + self.set_heating_setpoint(data) diff --git a/hue.py b/hue.py new file mode 100644 index 0000000..1ecfef8 --- /dev/null +++ b/hue.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +from .base import base, base_output +import logging + + +class hue_light(base_output): + """ Communication (MQTT) + + hue_light { + | "state": ["ON" / "OFF" / "TOGGLE"] + | "linkquality": [0...255] lqi + | "brightness": [0...254] + | "color_mode": ["color_temp"] + | "color_temp": ["coolest", "cool", "neutral", "warm", "warmest", 250...454] + | } + +- get { + | "state": "" + | } + +- set { + "state": ["ON" / "OFF"] + "brightness": [0...256] + "color_temp": [250...454] + "transition": [0...] seconds + "brightness_move": [-X...0...X] X/s + "brightness_step": [-X...0...X] + "color_temp_move": [-X...0...X] X/s + "color_temp_step": [-X...0...X] + } + """ + KEY_LINKQUALITY = "linkquality" + KEY_OUTPUT_0 = "state" + KEY_BRIGHTNESS = "brightness" + KEY_COLOR_TEMP = "color_temp" + # + TX_TYPE = base.TX_DICT + TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] + STATE_KEYS = TX_FILTER_DATA_KEYS + # + RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] + RX_IGNORE_KEYS = ['update', 'color_mode'] + RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] + + def __state_logging__(self, inst, key, data): + if key in [self.KEY_OUTPUT_0, self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]: + self.logger.info("State change of '%s' to '%s'", key, repr(data)) + + def __device_to_instance_filter__(self, key, data): + if key == self.KEY_BRIGHTNESS: + return int(round((data - 1) * 100 / 253, 0)) + elif key == self.KEY_COLOR_TEMP: + return int(round((data - 250) * 10 / 204, 0)) + return super().__device_to_instance_filter__(key, data) + + def __instance_to_device_filter__(self, key, data): + if key == self.KEY_BRIGHTNESS: + return int(round(data * 253 / 100 + 1, 0)) + elif key == self.KEY_COLOR_TEMP: + return int(round(data * 204 / 10 + 250, 0)) + return super().__instance_to_device_filter__(key, data) + + # + # RX + # + @property + def output_0(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_0, False) + + @property + def linkquality(self): + """rv: numeric value""" + return self.get(self.KEY_LINKQUALITY, 0) + + @property + def brightness(self): + """rv: numeric value [0%, ..., 100%]""" + return self.get(self.KEY_BRIGHTNESS, 0) + + @property + def color_temp(self): + """rv: numeric value [0, ..., 10]""" + return self.get(self.KEY_COLOR_TEMP, 0) + + # + # TX + # + def request_data(self, device=None, key=None, data=None): + self.mqtt_client.send(self.topic + "/set", '{"hue_power_on_behavior": "recover"}') + + def set_output_0(self, state): + """state: [True, False]""" + self.send_command(self.KEY_OUTPUT_0, state) + + def set_output_0_mcb(self, device, key, data): + self.set_output_0(data) + + def toggle_output_0_mcb(self, device, key, data): + self.set_output_0(not self.output_0) + + def set_brightness(self, brightness): + """brightness: [0, ..., 100]""" + self.send_command(self.KEY_BRIGHTNESS, brightness) + + def set_brightness_mcb(self, device, key, data): + self.set_brightness(data) + + def set_color_temp(self, color_temp): + """color_temp: [0, ..., 10]""" + self.send_command(self.KEY_COLOR_TEMP, color_temp) + + def set_color_temp_mcb(self, device, key, data): + self.set_color_temp(data) + + def __all_off__(self): + if self.output_0: + self.set_output_0(False) diff --git a/mydevices.py b/mydevices.py new file mode 100644 index 0000000..f264bdf --- /dev/null +++ b/mydevices.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +from .base import base, base_output +import logging + + +class powerplug(base_output): + """ Communication (MQTT) + + my_powerplug + +- output + +- 1 [True, False] <- status + | +- set [True, False, "toggle"] <- command + +- 2 [True, False] <- status + | +- set [True, False, "toggle"] <- command + +- 3 [True, False] <- status + | +- set [True, False, "toggle"] <- command + +- 4 [True, False] <- status + | +- set [True, False, "toggle"] <- command + +- all + +- set [True, False, "toggle"] <- command + """ + KEY_OUTPUT_0 = "output/1" + KEY_OUTPUT_1 = "output/2" + KEY_OUTPUT_2 = "output/3" + KEY_OUTPUT_3 = "output/4" + KEY_OUTPUT_ALL = "output/all" + KEY_OUTPUT_LIST = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3] + # + TX_TYPE = base.TX_VALUE + # + RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3] + + def __state_logging__(self, inst, key, data): + if key in self.KEY_OUTPUT_LIST: + self.logger.info("State change of '%s' to '%s'", key, repr(data)) + + # + # RX + # + @property + def output_0(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_0) + + @property + def output_1(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_1) + + @property + def output_2(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_2) + + @property + def output_3(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_3) + + # + # TX + # + def set_output(self, key, state): + if key in self.KEY_OUTPUT_LIST: + self.send_command(key, state) + else: + logging.error("Unknown key to set the output!") + + def set_output_0(self, state): + """state: [True, False]""" + self.send_command(self.KEY_OUTPUT_0, state) + + def set_output_0_mcb(self, device, key, data): + self.set_output_0(data) + + def toggle_output_0_mcb(self, device, key, data): + self.set_output_0(not self.output_0) + + def set_output_1(self, state): + """state: [True, False]""" + self.send_command(self.KEY_OUTPUT_1, state) + + def set_output_1_mcb(self, device, key, data): + self.set_output_1(data) + + def toggle_output_1_mcb(self, device, key, data): + self.set_output_1(not self.output_1) + + def set_output_2(self, state): + """state: [True, False]""" + self.send_command(self.KEY_OUTPUT_2, state) + + def set_output_2_mcb(self, device, key, data): + self.set_output_2(data) + + def toggle_output_2_mcb(self, device, key, data): + self.set_output_2(not self.output_2) + + def set_output_3(self, state): + """state: [True, False]""" + self.send_command(self.KEY_OUTPUT_3, state) + + def set_output_3_mcb(self, device, key, data): + self.set_output_3(data) + + def toggle_output_3_mcb(self, device, key, data): + self.set_output_3(not self.output_3) + + def set_output_all(self, state): + """state: [True, False, 'toggle']""" + self.send_command(self.KEY_OUTPUT_ALL, state) + + def set_output_all_mcb(self, device, key, data): + self.set_output_all(data) + + def __all_off__(self): + self.set_output_all(False) + + +class remote(base): + """ Communication (MQTT) + + remote (RAS5) <- command + +- CD [dc] + +- LINE1 [dc] + +- LINE2 [dc] + +- LINE3 [dc] + +- MUTE [dc] + +- POWER [dc] + +- VOLDOWN [dc] + +- VOLUP [dc] + +- PHONO [dc] + +- DOCK [dc] + + remote (EUR642100) <- command + +- OPEN_CLOSE [dc] + +- VOLDOWN [dc] + +- VOLUP [dc] + +- ONE [dc] + +- TWO [dc] + +- THREE [dc] + +- FOUR [dc] + +- FIVE [dc] + +- SIX [dc] + +- SEVEN [dc] + +- EIGHT [dc] + +- NINE [dc] + +- ZERO [dc] + +- TEN [dc] + +- TEN_PLUS [dc] + +- PROGRAM [dc] + +- CLEAR [dc] + +- RECALL [dc] + +- TIME_MODE [dc] + +- A_B_REPEAT [dc] + +- REPEAT [dc] + +- RANDOM [dc] + +- AUTO_CUE [dc] + +- TAPE_LENGTH [dc] + +- SIDE_A_B [dc] + +- TIME_FADE [dc] + +- PEAK_SEARCH [dc] + +- SEARCH_BACK [dc] + +- SEARCH_FOR [dc] + +- TRACK_NEXT [dc] + +- TRACK_PREV [dc] + +- STOP [dc] + +- PAUSE [dc] + +- PLAY [dc] + """ + KEY_CD = "CD" + KEY_LINE1 = "LINE1" + KEY_LINE2 = "LINE2" + KEY_LINE3 = "LINE3" + KEY_PHONO = "PHONO" + KEY_MUTE = "MUTE" + KEY_POWER = "POWER" + KEY_VOLDOWN = "VOLDOWN" + KEY_VOLUP = "VOLUP" + # + TX_TOPIC = '' + TX_TYPE = base.TX_VALUE + # + RX_IGNORE_TOPICS = [KEY_CD, KEY_LINE1, KEY_LINE2, KEY_LINE3, KEY_PHONO, KEY_MUTE, KEY_POWER, KEY_VOLUP, KEY_VOLDOWN] + + def __state_logging__(self, inst, key, data): + pass # This is just a TX device using self.set_* + + def set_cd(self, device=None, key=None, data=None): + self.logger.info("Changing amplifier source to CD") + self.send_command(self.KEY_CD, None) + + def set_line1(self, device=None, key=None, data=None): + self.logger.info("Changing amplifier source to LINE1") + self.send_command(self.KEY_LINE1, None) + + def set_line2(self, device=None, key=None, data=None): + self.logger.info("Changing amplifier source to LINE2") + self.send_command(self.KEY_LINE2, None) + + def set_line3(self, device=None, key=None, data=None): + self.logger.info("Changing amplifier source to LINE3") + self.send_command(self.KEY_LINE3, None) + + def set_phono(self, device=None, key=None, data=None): + self.logger.info("Changing amplifier source to PHONO") + self.send_command(self.KEY_PHONO, None) + + def set_mute(self, device=None, key=None, data=None): + self.logger.info("Muting / Unmuting amplifier") + self.send_command(self.KEY_MUTE, None) + + def set_power(self, device=None, key=None, data=None): + self.logger.info("Power on/off amplifier") + self.send_command(self.KEY_POWER, None) + + def set_volume_up(self, data=False): + """data: [True, False]""" + self.logger.info("Increasing amplifier volume") + self.send_command(self.KEY_VOLUP, data) + + def set_volume_down(self, data=False): + """data: [True, False]""" + self.logger.info("Decreasing amplifier volume") + self.send_command(self.KEY_VOLDOWN, data) + + def default_inc(self, device=None, key=None, data=None): + self.set_volume_up(True) + + def default_dec(self, device=None, key=None, data=None): + self.set_volume_down(True) + + def default_stop(self, device=None, key=None, data=None): + self.set_volume_up(False) + + +class audio_status(base): + """ Communication (MQTT) + + audio_status + +- state [True, False] <- status + +- title [text] <- status + """ + KEY_STATE = "state" + KEY_TITLE = "title" + # + TX_TYPE = base.TX_VALUE + # + RX_KEYS = [KEY_STATE, KEY_TITLE] + + def __state_logging__(self, inst, key, data): + if key in [self.KEY_STATE, self.KEY_TITLE]: + self.logger.info("State change of '%s' to '%s'", key, repr(data)) + + def set_state(self, num, data): + """data: [True, False]""" + self.send_command(self.KEY_STATE + "/" + str(num), data) + + def set_state_mcb(self, device, key, data): + self.set_state(data) diff --git a/shelly.py b/shelly.py new file mode 100644 index 0000000..112c269 --- /dev/null +++ b/shelly.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +from .base import base_output +from .base import base_rpc +import task + + +class shelly(base_output): + """ Communication (MQTT) + + shelly + +- relay + | +- 0 ["on" / "off"] <- status + | | +- command ["on"/ "off"] <- command + | | +- energy [numeric] <- status + | +- 1 ["on" / "off"] <- status + | +- command ["on"/ "off"] <- command + | +- energy [numeric] <- status + +- input + | +- 0 [0 / 1] <- status + | +- 1 [0 / 1] <- status + +- input_event + | +- 0 <- status + | +- 1 <- status + +- logpush + | +- 0 [0 / 1] <- status + | +- 1 [0 / 1] <- status + +- temperature [numeric] °C <- status + +- temperature_f [numeric] F <- status + +- overtemperature [0 / 1] <- status + +- id <- status + +- model <- status + +- mac <- status + +- ip <- status + +- new_fw <- status + +- fw_ver <- status + """ + KEY_OUTPUT_0 = "relay/0" + KEY_OUTPUT_1 = "relay/1" + KEY_INPUT_0 = "input/0" + KEY_INPUT_1 = "input/1" + KEY_LONGPUSH_0 = "longpush/0" + KEY_LONGPUSH_1 = "longpush/1" + KEY_TEMPERATURE = "temperature" + KEY_OVERTEMPERATURE = "overtemperature" + KEY_ID = "id" + KEY_MODEL = "model" + KEY_MAC = "mac" + KEY_IP = "ip" + KEY_NEW_FIRMWARE = "new_fw" + KEY_FIRMWARE_VERSION = "fw_ver" + # + TX_TOPIC = "command" + TX_TYPE = base_output.TX_VALUE + TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1] + # + RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE, + KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION] + RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1'] + RX_IGNORE_KEYS = ['temperature_f'] + RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + # + self.output_key_delayed = None + self.delayed_flash_task = task.delayed(0.75, self.flash_task) + self.delayed_off_task = task.delayed(0.75, self.off_task) + # + self.all_off_requested = False + + def __state_logging__(self, inst, key, data): + if key in [self.KEY_OUTPUT_0, self.KEY_OUTPUT_1]: + self.logger.info("State change of '%s' to '%s'", key, repr(data)) + elif key in [self.KEY_INPUT_0, self.KEY_INPUT_1, self.KEY_LONGPUSH_0, self.KEY_LONGPUSH_1]: + self.logger.info("Input action '%s' with '%s'", key, repr(data)) + + def flash_task(self, *args): + if self.flash_active: + self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed)) + self.output_key_delayed = None + if self.all_off_requested: + self.delayed_off_task.run() + + def off_task(self, *args): + self.all_off() + + @property + def flash_active(self): + return self.output_key_delayed is not None + + # + # RX + # + @property + def output_0(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_0) + + @property + def output_1(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_1) + + @property + def input_0(self): + """rv: [True, False]""" + return self.get(self.KEY_INPUT_0) + + @property + def input_1(self): + """rv: [True, False]""" + return self.get(self.KEY_INPUT_1) + + @property + def longpush_0(self): + """rv: [True, False]""" + return self.get(self.KEY_LONGPUSH_0) + + @property + def longpush_1(self): + """rv: [True, False]""" + return self.get(self.KEY_LONGPUSH_1) + + @property + def temperature(self): + """rv: numeric value""" + return self.get(self.KEY_TEMPERATURE) + + # + # TX + # + def set_output_0(self, state): + """state: [True, False]""" + self.send_command(self.KEY_OUTPUT_0, state) + + def set_output_0_mcb(self, device, key, data): + self.set_output_0(data) + + def toggle_output_0_mcb(self, device, key, data): + self.set_output_0(not self.output_0) + + def set_output_1(self, state): + """state: [True, False]""" + self.send_command(self.KEY_OUTPUT_1, state) + + def set_output_1_mcb(self, device, key, data): + self.set_output_1(data) + + def toggle_output_1_mcb(self, device, key, data): + self.set_output_1(not self.output_1) + + def flash_0_mcb(self, device, key, data): + self.output_key_delayed = self.KEY_OUTPUT_0 + self.toggle_output_0_mcb(device, key, data) + self.delayed_flash_task.run() + + def flash_1_mcb(self, device, key, data): + self.output_key_delayed = self.KEY_OUTPUT_1 + self.toggle_output_1_mcb(device, key, data) + self.delayed_flash_task.run() + + def __all_off__(self): + if self.flash_active: + self.all_off_requested = True + else: + if self.output_0: + self.set_output_0(False) + if self.output_1: + self.set_output_1(False) + + +class shelly_rpc(base_rpc): + KEY_OUTPUT_0 = "switch:0" + KEY_OUTPUT_1 = "switch:1" + KEY_OUTPUT_2 = "switch:2" + KEY_INPUT_0 = "input:0" + KEY_INPUT_1 = "input:1" + KEY_INPUT_2 = "input:2" + KEY_LONGPUSH_0 = "input:0:long_push" + KEY_LONGPUSH_1 = "input:1:long_push" + KEY_LONGPUSH_2 = "input:2:long_push" + KEY_SINGLEPUSH_0 = "input:0:single_push" + KEY_SINGLEPUSH_1 = "input:1:single_push" + KEY_SINGLEPUSH_2 = "input:2:single_push" + KEY_DOUBLEPUSH_0 = "input:0:double_push" + KEY_DOUBLEPUSH_1 = "input:1:double_push" + KEY_DOUBLEPUSH_2 = "input:2:double_push" + KEY_TRIPLEPUSH_0 = "input:0:triple_push" + KEY_TRIPLEPUSH_1 = "input:1:triple_push" + KEY_TRIPLEPUSH_2 = "input:2:triple_push" + + RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_INPUT_0, KEY_INPUT_1, KEY_INPUT_2, + KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_LONGPUSH_2, KEY_SINGLEPUSH_0, KEY_SINGLEPUSH_1, KEY_SINGLEPUSH_2, + KEY_DOUBLEPUSH_0, KEY_DOUBLEPUSH_1, KEY_DOUBLEPUSH_2, KEY_TRIPLEPUSH_0, KEY_TRIPLEPUSH_1, KEY_TRIPLEPUSH_2] + + def __state_logging__(self, inst, key, data): + if key in [self.KEY_OUTPUT_0, self.KEY_OUTPUT_1, self.KEY_OUTPUT_2]: + self.logger.info("State change of '%s' to '%s'", key, repr(data)) + elif key in [self.KEY_INPUT_0, self.KEY_INPUT_1, self.KEY_INPUT_2]: + self.logger.info("Input action '%s' with '%s'", key, repr(data)) + elif key in [self.KEY_LONGPUSH_0, self.KEY_LONGPUSH_1, self.KEY_LONGPUSH_2, + self.KEY_SINGLEPUSH_0, self.KEY_SINGLEPUSH_1, self.KEY_SINGLEPUSH_2, + self.KEY_DOUBLEPUSH_0, self.KEY_DOUBLEPUSH_1, self.KEY_DOUBLEPUSH_2, + self.KEY_TRIPLEPUSH_0, self.KEY_TRIPLEPUSH_1, self.KEY_TRIPLEPUSH_2]: + self.logger.info("Input action '%s'", key) + + def set_output_0(self, state): + """state: [True, False]""" + self.rpc_switch_set(self.KEY_OUTPUT_0, state) + + def set_output_0_mcb(self, device, key, data): + self.set_output_0(data) + + def toggle_output_0_mcb(self, device, key, data): + self.set_output_0(not self.get(self.KEY_OUTPUT_0)) + + def set_output_1(self, state): + """state: [True, False]""" + self.rpc_switch_set(self.KEY_OUTPUT_1, state) + + def set_output_1_mcb(self, device, key, data): + self.set_output_1(data) + + def toggle_output_1_mcb(self, device, key, data): + print(self.get(self.KEY_OUTPUT_1)) + self.set_output_1(not self.get(self.KEY_OUTPUT_1)) + + def set_output_2(self, state): + """state: [True, False]""" + self.rpc_switch_set(self.KEY_OUTPUT_2, state) + + def set_output_2_mcb(self, device, key, data): + self.set_output_2(data) + + def toggle_output_2_mcb(self, device, key, data): + self.set_output_2(not self.get(self.KEY_OUTPUT_2)) diff --git a/silvercrest.py b/silvercrest.py new file mode 100644 index 0000000..5af024b --- /dev/null +++ b/silvercrest.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +from .base import base, base_output +import logging + + +class silvercrest_button(base): + """ Communication (MQTT) + + tradfri_button { + "action": ["pressed"] + "battery": [0...100] % + "battery_low": [true | false] + "tamper": [true | false] + "linkquality": [0...255] lqi + "update": [] + } + """ + ACTION_PRESSED = "pressed" + # + KEY_LINKQUALITY = "linkquality" + KEY_BATTERY = "battery" + KEY_BATTERY_LOW = "battery_low" + KEY_TAMPER = "tamper" + KEY_ACTION = "action" + # + RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_ACTION, KEY_BATTERY_LOW, KEY_TAMPER] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def __state_logging__(self, inst, key, data): + if key == self.KEY_ACTION: + self.logger.info("Input '%s' with '%s'", key, repr(data)) + self[self.KEY_ACTION] = None + elif key in [self.KEY_BATTERY_LOW, self.KEY_TAMPER]: + self.logger.info("Input '%s' with '%s'", key, repr(data)) + + # + # RX + # + @property + def action(self): + """rv: action_txt""" + return self.get(self.KEY_ACTION) + + +class silvercrest_powerplug(base_output): + """ Communication (MQTT) + + silvercrest_powerplug { + | "state": ["ON" / "OFF"] + | "linkquality": [0...255] lqi + | } + +- get { + | "state": "" + | } + +- set { + "state": ["ON" / "OFF"] + } + """ + KEY_LINKQUALITY = "linkquality" + KEY_OUTPUT_0 = "state" + # + TX_TYPE = base.TX_DICT + TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0] + # + RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0] + RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0] + + def __state_logging__(self, inst, key, data): + if key in [self.KEY_OUTPUT_0]: + self.logger.info("State change of '%s' to '%s'", key, repr(data)) + + # + # RX + # + @property + def output_0(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_0) + + @property + def linkquality(self): + """rv: numeric value""" + return self.get(self.KEY_LINKQUALITY) + + # + # TX + # + def set_output_0(self, state): + """state: [True, False]""" + self.send_command(self.KEY_OUTPUT_0, state) + + def set_output_0_mcb(self, device, key, data): + self.set_output_0(data) + + def toggle_output_0_mcb(self, device, key, data): + self.set_output_0(not self.output_0) + + def __all_off__(self): + if self.output_0: + self.set_output_0(False) + + +class silvercrest_motion_sensor(base): + """ Communication (MQTT) + + silvercrest_motion_sensor { + battery: [0...100] % + battery_low: [True, False] + linkquality: [0...255] lqi + occupancy: [True, False] + tamper: [True, False] + voltage: [0...] mV + } + """ + KEY_BATTERY = "battery" + KEY_BATTERY_LOW = "battery_low" + KEY_LINKQUALITY = "linkquality" + KEY_OCCUPANCY = "occupancy" + KEY_UNMOUNTED = "tamper" + KEY_VOLTAGE = "voltage" + # + TX_TYPE = base.TX_DICT + # + RX_KEYS = [KEY_BATTERY, KEY_BATTERY_LOW, KEY_LINKQUALITY, KEY_OCCUPANCY, KEY_UNMOUNTED, KEY_VOLTAGE] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def __state_logging__(self, inst, key, data): + if key in [self.KEY_OCCUPANCY, self.KEY_UNMOUNTED]: + self.logger.info("State change of '%s' to '%s'", key, repr(data)) + + # + # RX + # + @property + def linkquality(self): + """rv: numeric value""" + return self.get(self.KEY_LINKQUALITY) + + @property + def battery(self): + """rv: numeric value""" + return self.get(self.KEY_BATTERY) diff --git a/tradfri.py b/tradfri.py new file mode 100644 index 0000000..9426769 --- /dev/null +++ b/tradfri.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +from .base import base, base_output +import logging + + +class tradfri_light(base_output): + """ Communication (MQTT) + + tradfri_light { + | "state": ["ON" / "OFF" / "TOGGLE"] + | "linkquality": [0...255] lqi + | "brightness": [0...254] + | "color_mode": ["color_temp"] + | "color_temp": ["coolest", "cool", "neutral", "warm", "warmest", 250...454] + | "color_temp_startup": ["coolest", "cool", "neutral", "warm", "warmest", "previous", 250...454] + | "update": [] + | } + +- get { + | "state": "" + | } + +- set { + "state": ["ON" / "OFF"] + "brightness": [0...256] + "color_temp": [250...454] + "transition": [0...] seconds + "brightness_move": [-X...0...X] X/s + "brightness_step": [-X...0...X] + "color_temp_move": [-X...0...X] X/s + "color_temp_step": [-X...0...X] + } + """ + KEY_LINKQUALITY = "linkquality" + KEY_OUTPUT_0 = "state" + KEY_BRIGHTNESS = "brightness" + KEY_COLOR_TEMP = "color_temp" + KEY_BRIGHTNESS_FADE = "brightness_move" + # + TX_TYPE = base.TX_DICT + TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP, KEY_BRIGHTNESS_FADE] + # + RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] + RX_IGNORE_KEYS = ['update', 'color_mode', 'color_temp_startup'] + RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP] + + def __state_logging__(self, inst, key, data): + if key in [self.KEY_OUTPUT_0, self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP, self.KEY_BRIGHTNESS_FADE]: + self.logger.info("State change of '%s' to '%s'", key, repr(data)) + + def __device_to_instance_filter__(self, key, data): + if key == self.KEY_BRIGHTNESS: + return int(round((data - 1) * 100 / 253, 0)) + elif key == self.KEY_COLOR_TEMP: + return int(round((data - 250) * 10 / 204, 0)) + return super().__device_to_instance_filter__(key, data) + + def __instance_to_device_filter__(self, key, data): + if key == self.KEY_BRIGHTNESS: + return int(round(data * 253 / 100 + 1, 0)) + elif key == self.KEY_COLOR_TEMP: + return int(round(data * 204 / 10 + 250, 0)) + return super().__instance_to_device_filter__(key, data) + + # + # RX + # + @property + def output_0(self): + """rv: [True, False]""" + return self.get(self.KEY_OUTPUT_0, False) + + @property + def linkquality(self): + """rv: numeric value""" + return self.get(self.KEY_LINKQUALITY, 0) + + @property + def brightness(self): + """rv: numeric value [0%, ..., 100%]""" + return self.get(self.KEY_BRIGHTNESS, 0) + + @property + def color_temp(self): + """rv: numeric value [0, ..., 10]""" + return self.get(self.KEY_COLOR_TEMP, 0) + + # + # TX + # + def request_data(self, device=None, key=None, data=None): + self.mqtt_client.send(self.topic + "/get", '{"%s": ""}' % self.KEY_OUTPUT_0) + + def set_output_0(self, state): + """state: [True, False]""" + self.send_command(self.KEY_OUTPUT_0, state) + + def set_output_0_mcb(self, device, key, data): + self.set_output_0(data) + + def toggle_output_0_mcb(self, device, key, data): + self.set_output_0(not self.output_0) + + def set_brightness(self, brightness): + """brightness: [0, ..., 100]""" + self.send_command(self.KEY_BRIGHTNESS, brightness) + + def set_brightness_mcb(self, device, key, data): + self.set_brightness(data) + + def default_inc(self, speed=40): + self.send_command(self.KEY_BRIGHTNESS_FADE, speed) + + def default_dec(self, speed=-40): + self.default_inc(speed) + + def default_stop(self): + self.default_inc(0) + + def set_color_temp(self, color_temp): + """color_temp: [0, ..., 10]""" + self.send_command(self.KEY_COLOR_TEMP, color_temp) + self.mqtt_client.send('/'.join([self.topic, self.TX_TOPIC]), '{"color_temp_startup": "previous"}') + + def set_color_temp_mcb(self, device, key, data): + self.set_color_temp(data) + + def __all_off__(self): + if self.output_0: + self.set_output_0(False) + + +class tradfri_button(base): + """ Communication (MQTT) + + tradfri_button { + "action": [ + "arrow_left_click", + "arrow_left_hold", + "arrow_left_release", + "arrow_right_click", + "arrow_right_hold", + "arrow_right_release", + "brightness_down_click", + "brightness_down_hold", + "brightness_down_release", + "brightness_up_click", + "brightness_up_hold", + "brightness_up_release", + "toggle" + ] + "action_duration": [0...] s + "battery": [0...100] % + "linkquality": [0...255] lqi + "update": [] + } + """ + ACTION_TOGGLE = "toggle" + ACTION_BRIGHTNESS_UP = "brightness_up_click" + ACTION_BRIGHTNESS_DOWN = "brightness_down_click" + ACTION_RIGHT = "arrow_right_click" + ACTION_LEFT = "arrow_left_click" + ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold" + ACTION_BRIGHTNESS_UP_RELEASE = "brightness_up_release" + ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold" + ACTION_BRIGHTNESS_DOWN_RELEASE = "brightness_down_release" + ACTION_RIGHT_LONG = "arrow_right_hold" + ACTION_RIGHT_RELEASE = "arrow_right_release" + ACTION_LEFT_LONG = "arrow_left_hold" + ACTION_LEFT_RELEASE = "arrow_left_release" + # + KEY_LINKQUALITY = "linkquality" + KEY_BATTERY = "battery" + KEY_ACTION = "action" + KEY_ACTION_DURATION = "action_duration" + # + RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_ACTION] + RX_IGNORE_KEYS = ['update', KEY_ACTION_DURATION] + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def __state_logging__(self, inst, key, data): + if key in [self.KEY_ACTION]: + self.logger.info("Input '%s' with '%s'", key, repr(data)) + + # + # RX + # + @property + def action(self): + """rv: action_txt""" + return self.get(self.KEY_ACTION) diff --git a/videv.py b/videv.py new file mode 100644 index 0000000..c9006af --- /dev/null +++ b/videv.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +""" +Virtual Device(s) + +Targets: + * MQTT-Interface to control joined devices as one virtual device + * Primary signal routing + * No functionality should be implemented here +""" + +from .base import videv_base +import logging + + +try: + from config import APP_NAME as ROOT_LOGGER_NAME +except ImportError: + ROOT_LOGGER_NAME = 'root' +logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__) + + +class videv_pure_switch(videv_base): + KEY_STATE = 'state' + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + self[self.KEY_STATE] = False + # + self.mqtt_client.add_callback(self.topic + '/state/set', self.__state__) + + def __state__(self, mqtt_client, userdata, message): + self.set(self.KEY_STATE, message.payload == b'true') + self.__tx__(self.KEY_STATE, message.payload == b'true') + + +class videv_switching(videv_base): + KEY_STATE = 'state' + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def connect_sw_device(self, sw_device, sw_key): + self.add_routing(self.KEY_STATE, sw_device, sw_key) + + +class videv_switching_timer(videv_switching): + KEY_TIMER = 'timer' + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def connect_tm_device(self, tm_device, tm_key): + self.add_display(self.KEY_TIMER, tm_device, tm_key) + + +class videv_switching_motion(videv_switching): + KEY_STATE = 'state' + # + KEY_TIMER = 'timer' + KEY_MOTION_SENSOR = 'motion_%d' + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def connect_mo_function(self, mo_function): + self.add_display(self.KEY_TIMER, mo_function, mo_function.KEY_TIMER) + # motion sensor state + for index, motion_sensor in enumerate(mo_function.motion_sensors): + self.add_display(self.KEY_MOTION_SENSOR % index, motion_sensor, motion_sensor.KEY_OCCUPANCY) + + +class videv_switch_brightness(videv_switching): + KEY_BRIGHTNESS = 'brightness' + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def connect_br_device(self, br_device, br_key): + self.add_routing(self.KEY_BRIGHTNESS, br_device, br_key) + + +class videv_switch_brightness_color_temp(videv_switch_brightness): + KEY_COLOR_TEMP = 'color_temp' + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def connect_ct_device(self, ct_device, ct_key): + self.add_routing(self.KEY_COLOR_TEMP, ct_device, ct_key) + + +class videv_heating(videv_base): + KEY_USER_TEMPERATURE_SETPOINT = 'user_temperature_setpoint' + KEY_VALVE_TEMPERATURE_SETPOINT = 'valve_temperature_setpoint' + KEY_AWAY_MODE = 'away_mode' + KEY_SUMMER_MODE = 'summer_mode' + KEY_START_BOOST = 'start_boost' + KEY_SET_DEFAULT_TEMPERATURE = 'set_default_temperature' + KEY_BOOST_TIMER = 'boost_timer' + # + KEY_TEMPERATURE = 'temperature' + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def connect_heating_function(self, heating_function): + # + self.add_routing(self.KEY_USER_TEMPERATURE_SETPOINT, heating_function, heating_function.KEY_USER_TEMPERATURE_SETPOINT) + self.add_routing(self.KEY_AWAY_MODE, heating_function, heating_function.KEY_AWAY_MODE) + self.add_routing(self.KEY_SUMMER_MODE, heating_function, heating_function.KEY_SUMMER_MODE) + # + self.add_control(self.KEY_START_BOOST, heating_function, heating_function.KEY_START_BOOST, False) + self.add_control(self.KEY_SET_DEFAULT_TEMPERATURE, heating_function, heating_function.KEY_SET_DEFAULT_TEMPERATURE, False) + # + self.add_display(self.KEY_VALVE_TEMPERATURE_SETPOINT, heating_function, heating_function.KEY_TEMPERATURE_SETPOINT) + self.add_display(self.KEY_BOOST_TIMER, heating_function, heating_function.KEY_BOOST_TIMER) + self.add_display(self.KEY_TEMPERATURE, heating_function, heating_function.KEY_TEMPERATURE_CURRENT, False) + + +class videv_multistate(videv_base): + KEY_STATE = 'state_%d' + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + + def connect_br_function(self, device, key_for_device, num_states): + self.num_states = num_states + # send default values + for i in range(0, num_states): + self.__tx__(self.KEY_STATE % i, False) + # + device.add_callback(key_for_device, None, self.__index_rx__, True) + + def __index_rx__(self, device, key, data): + for i in range(0, self.num_states): + self.__tx__(self.KEY_STATE % i, i == data) + + +class videv_audio_player(videv_base): + KEY_ACTIVE_PLAYER = 'player_%d' + KEY_TITLE = 'title' + NO_TITLE = '---' + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic) + self.__device_cnt__ = 0 + + def connect_audio_device(self, device): + self.add_display(self.KEY_ACTIVE_PLAYER % self.__device_cnt__, device, device.KEY_STATE) + device.add_callback(device.KEY_TITLE, None, self.__title_rx__, True) + self.__device_cnt__ += 1 + + def __title_rx__(self, device, key, data): + self.__tx__(self.KEY_TITLE, data or self.NO_TITLE) + + +class all_off(videv_base): + def __init__(self, mqtt_client, topic, room_collection): + super().__init__(mqtt_client, topic) + # init __inst_dict__ + self.__inst_dict__ = {} + self.__add_instances__("all", room_collection) + # register mqtt callbacks for all my keys + for key in self.__inst_dict__: + all_off_topic = "/".join([topic, key]) + logger.info("Addin all_off callback with topic %s", repr(all_off_topic)) + mqtt_client.add_callback(all_off_topic, self.all_off) + + def __check_inst_capabilities__(self, name, inst): + # fits to specified classes + try: + # all_off method is callable + return callable(inst.all_off) + except AttributeError: + # all_off method does not exist + return False + + def __add_instances__(self, name, inst, level=0): + if self.__check_inst_capabilities__(name, inst): + # add given instance to my __inst_dict__ + self.__inst_dict__[name] = inst + # iterate over all attribute names of instance + for sub_name in dir(inst): + # attribute name is not private + if not sub_name.startswith("__"): + sub = getattr(inst, sub_name) + # recurse with this object + if level == 0: + self.__add_instances__(sub_name, sub, level=level+1) + else: + self.__add_instances__(name + "/" + sub_name, sub, level=level+1) + + def all_off(self, client, userdata, message): + key = message.topic[len(self.topic) + 1:] + self.__inst_dict__[key].all_off()