From fd546ffcc3c4a285ba41a6f27788d44285f03a5c Mon Sep 17 00:00:00 2001 From: Dirk Alders Date: Wed, 25 Dec 2024 22:11:37 +0100 Subject: [PATCH] Shelly pro3 added --- devices/__init__.py | 1 + devices/base.py | 103 +++++++++++++++++++++++++++++++++++++++++++- devices/shelly.py | 69 ++++++++++++++++++++++++++++- 3 files changed, 171 insertions(+), 2 deletions(-) diff --git a/devices/__init__.py b/devices/__init__.py index 03d6553..d192291 100644 --- a/devices/__init__.py +++ b/devices/__init__.py @@ -32,6 +32,7 @@ except ImportError: ROOT_LOGGER_NAME = 'root' from devices.shelly import shelly as shelly_sw1 +from devices.shelly import shelly_rpc as shelly_pro3 from devices.tradfri import tradfri_light as tradfri_sw from devices.tradfri import tradfri_light as tradfri_sw_br from devices.tradfri import tradfri_light as tradfri_sw_br_ct diff --git a/devices/base.py b/devices/base.py index 90047e8..1e4746e 100644 --- a/devices/base.py +++ b/devices/base.py @@ -4,7 +4,6 @@ from base import mqtt_base from base import videv_base import json -import time def is_json(data): @@ -118,6 +117,108 @@ class base(mqtt_base): self.logger.error("Unknown tx toptic. Set TX_TOPIC of class to a known value") +class base_rpc(mqtt_base): + SRC_RESPONSE = "/response" + SRC_NULL = "/null" + # + EVENTS_TOPIC = "/events/rpc" + TX_TOPIC = "/rpc" + RESPONSE_TOPIC = SRC_RESPONSE + "/rpc" + NULL_TOPIC = SRC_NULL + "/rpc" + # + RPC_ID_GET_STATUS = 1 + RPC_ID_SET = 1734 + # + + def __init__(self, mqtt_client, topic): + super().__init__(mqtt_client, topic, default_values=dict.fromkeys(self.RX_KEYS)) + # data storage + self.__cfg_by_mid__ = None + # initialisations + mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback) + mqtt_client.add_callback(topic=self.topic+"/#", callback=self.receive_callback) + # + self.add_callback(None, None, self.__state_logging__, on_change_only=False) + # + self.rpc_get_status() + + def receive_callback(self, client, userdata, message): + data = json.loads(message.payload) + # + if message.topic == self.topic + self.EVENTS_TOPIC: + self.events(data) + elif message.topic == self.topic + self.RESPONSE_TOPIC: + self.response(data) + elif message.topic == self.topic + self.NULL_TOPIC or message.topic == self.topic + self.TX_TOPIC or message.topic == self.topic + "/online": + pass # Ignore response + else: + self.logger.warning("Unexpected message received: %s::%s", message.topic, json.dumps(data, sort_keys=True, indent=4)) + + def events(self, data): + for rx_key in data["params"]: + if rx_key == "events": + for evt in data["params"]["events"]: + key = evt["component"] + event = evt["event"] + if key in self.RX_KEYS: + if event == "btn_down": + self.set(key, True) + elif event == "btn_up": + self.set(key, False) + else: + key = key + ":" + event + if key in self.RX_KEYS: + self.set(key, True) + else: + self.logger.warning("Unexpected event with data=%s", json.dumps(data, sort_keys=True, indent=4)) + elif rx_key in self.RX_KEYS: + state = data["params"][rx_key].get("output") + if state is not None: + self.set(rx_key, state) + + def response(self, data): + try: + rpc_id = data.get("id") + except AttributeError: + rpc_id = None + try: + rpc_method = data.get("method") + except AttributeError: + rpc_method = None + if rpc_id == self.RPC_ID_GET_STATUS: + # + # Shelly.GetStatus + # + for rx_key in data.get("result", []): + if rx_key in self.RX_KEYS: + key_data = data["result"][rx_key] + state = key_data.get("output", key_data.get("state")) + if state is not None: + self.set(rx_key, state) + else: + self.logger.warning("Unexpected response with data=%s", json.dumps(data, sort_keys=True, indent=4)) + + def rpc_tx(self, **kwargs): + if not "id" in kwargs: + raise AttributeError("'id' is missing in keyword arguments") + self.mqtt_client.send(self.topic + self.TX_TOPIC, json.dumps(kwargs)) + + def rpc_get_status(self): + self.rpc_tx( + id=self.RPC_ID_GET_STATUS, + src=self.topic + self.SRC_RESPONSE, + method="Shelly.GetStatus" + ) + + def rpc_switch_set(self, key, state: bool): + self.rpc_tx( + id=self.RPC_ID_SET, + src=self.topic + self.SRC_NULL, + method="Switch.Set", + params={"id": int(key[-1]), "on": state} + ) + + class base_output(base): def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) diff --git a/devices/shelly.py b/devices/shelly.py index 12c2495..ff7fc94 100644 --- a/devices/shelly.py +++ b/devices/shelly.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- # from devices.base import base_output -import logging +from devices.base import base_rpc import task @@ -169,3 +169,70 @@ class shelly(base_output): self.set_output_0(False) if self.output_1: self.set_output_1(False) + + +class shelly_rpc(base_rpc): + KEY_OUTPUT_0 = "switch:0" + KEY_OUTPUT_1 = "switch:1" + KEY_OUTPUT_2 = "switch:2" + KEY_INPUT_0 = "input:0" + KEY_INPUT_1 = "input:1" + KEY_INPUT_2 = "input:2" + KEY_LONGPUSH_0 = "input:0:long_push" + KEY_LONGPUSH_1 = "input:1:long_push" + KEY_LONGPUSH_2 = "input:2:long_push" + KEY_SINGLEPUSH_0 = "input:0:single_push" + KEY_SINGLEPUSH_1 = "input:1:single_push" + KEY_SINGLEPUSH_2 = "input:2:single_push" + KEY_DOUBLEPUSH_0 = "input:0:double_push" + KEY_DOUBLEPUSH_1 = "input:1:double_push" + KEY_DOUBLEPUSH_2 = "input:2:double_push" + KEY_TRIPLEPUSH_0 = "input:0:triple_push" + KEY_TRIPLEPUSH_1 = "input:1:triple_push" + KEY_TRIPLEPUSH_2 = "input:2:triple_push" + + RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_INPUT_0, KEY_INPUT_1, KEY_INPUT_2, + KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_LONGPUSH_2, KEY_SINGLEPUSH_0, KEY_SINGLEPUSH_1, KEY_SINGLEPUSH_2, + KEY_DOUBLEPUSH_0, KEY_DOUBLEPUSH_1, KEY_DOUBLEPUSH_2, KEY_TRIPLEPUSH_0, KEY_TRIPLEPUSH_1, KEY_TRIPLEPUSH_2] + + def __state_logging__(self, inst, key, data): + if key in [self.KEY_OUTPUT_0, self.KEY_OUTPUT_1, self.KEY_OUTPUT_2]: + self.logger.info("State change of '%s' to '%s'", key, repr(data)) + elif key in [self.KEY_INPUT_0, self.KEY_INPUT_1, self.KEY_INPUT_2]: + self.logger.info("Input action '%s' with '%s'", key, repr(data)) + elif key in [self.KEY_LONGPUSH_0, self.KEY_LONGPUSH_1, self.KEY_LONGPUSH_2, + self.KEY_SINGLEPUSH_0, self.KEY_SINGLEPUSH_1, self.KEY_SINGLEPUSH_2, + self.KEY_DOUBLEPUSH_0, self.KEY_DOUBLEPUSH_1, self.KEY_DOUBLEPUSH_2, + self.KEY_TRIPLEPUSH_0, self.KEY_TRIPLEPUSH_1, self.KEY_TRIPLEPUSH_2]: + self.logger.info("Input action '%s'", key) + + def set_output_0(self, state): + """state: [True, False]""" + self.rpc_switch_set(self.KEY_OUTPUT_0, state) + + def set_output_0_mcb(self, device, key, data): + self.set_output_0(data) + + def toggle_output_0_mcb(self, device, key, data): + self.set_output_0(not self.get(self.KEY_OUTPUT_0)) + + def set_output_1(self, state): + """state: [True, False]""" + self.rpc_switch_set(self.KEY_OUTPUT_1, state) + + def set_output_1_mcb(self, device, key, data): + self.set_output_1(data) + + def toggle_output_1_mcb(self, device, key, data): + print(self.get(self.KEY_OUTPUT_1)) + self.set_output_1(not self.get(self.KEY_OUTPUT_1)) + + def set_output_2(self, state): + """state: [True, False]""" + self.rpc_switch_set(self.KEY_OUTPUT_2, state) + + def set_output_2_mcb(self, device, key, data): + self.set_output_2(data) + + def toggle_output_2_mcb(self, device, key, data): + self.set_output_2(not self.get(self.KEY_OUTPUT_2))