322 lines
13 KiB
Python
322 lines
13 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
import json
|
|
from mqtt.smarthome import mqtt_base
|
|
import task
|
|
|
|
|
|
def is_json(data):
    """Return True if *data* can be parsed as a JSON document, else False.

    :param data: Candidate document (``str``, ``bytes`` or ``bytearray``).
    :returns: ``True`` when ``json.loads`` accepts *data*, ``False`` otherwise.

    Besides malformed JSON text (``json.JSONDecodeError``) this also reports
    ``False`` for byte strings that cannot be decoded (``UnicodeDecodeError``)
    and for objects that are no text at all (``TypeError``), instead of
    letting those exceptions escape from a pure predicate.
    """
    try:
        json.loads(data)
    except (ValueError, TypeError):
        # ValueError covers JSONDecodeError and UnicodeDecodeError.
        return False
    return True
|
|
|
|
|
|
class base(mqtt_base):
    """Base class for a device which is read and controlled via MQTT topics.

    Incoming messages below ``<topic>`` are unpacked by :meth:`receive_callback`
    and stored via :meth:`set`; outgoing commands are published by
    :meth:`send_command`. Subclasses configure the behaviour with the class
    attributes below.
    """
    # Sub-topic used for sending. "" sends without a suffix; None makes
    # send_command refuse to send (an error is logged).
    TX_TOPIC = "set"
    # Possible values for TX_TYPE.
    # TX_DICT publishes json {key: data} to "<topic>/<TX_TOPIC>"; any other
    # non-negative TX_TYPE publishes the plain value to "<topic>/<key>[/<TX_TOPIC>]".
    TX_VALUE = 0
    TX_DICT = 1
    # Negative means "not configured": send_command only logs an error.
    TX_TYPE = -1
    # Keys whose boolean values are sent as "on" / "off".
    TX_FILTER_DATA_KEYS = []
    #
    # Keys accepted by set(); also used to create the default values (None).
    RX_KEYS = []
    # Content keys (topic remainder below <topic>/) which are not unpacked.
    RX_IGNORE_TOPICS = []
    # Keys which are dropped silently by set().
    RX_IGNORE_KEYS = []
    # Keys whose received values 1/'on'/'ON' and 0/'off'/'OFF' become True / False.
    RX_FILTER_DATA_KEYS = []
    #
    # Expected device configuration. If the device reports a different value
    # for one of these keys, the whole configuration is sent to the device.
    CFG_DATA = {}

    def __init__(self, mqtt_client, topic):
        """Register the receive callbacks for *topic* and all of its sub-topics.

        :param mqtt_client: Client object providing ``add_callback`` and ``send``.
        :param topic: Base topic of the device.
        """
        super().__init__(mqtt_client, topic, default_values=dict.fromkeys(self.RX_KEYS))
        # data storage
        # message id which already triggered a configuration transmission
        self.__cfg_by_mid__ = None
        # initialisations
        mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback)
        mqtt_client.add_callback(topic=self.topic+"/#", callback=self.receive_callback)
        #
        self.add_callback(None, None, self.__state_logging__, on_change_only=True)

    def __cfg_callback__(self, key, value, mid):
        """Send CFG_DATA to the device if *value* differs from the expected one.

        The transmission is executed only once per message id *mid*, and never
        for ``mid is None``.
        """
        if self.CFG_DATA.get(key) != value and self.__cfg_by_mid__ != mid and mid is not None:
            self.__cfg_by_mid__ = mid
            self.logger.warning("Differing configuration identified: Sending default configuration to defice: %s", repr(self.CFG_DATA))
            if self.TX_TYPE == self.TX_DICT:
                # the complete configuration is sent as one (unfiltered) json object
                self.mqtt_client.send(self.topic + '/' + self.TX_TOPIC, json.dumps(self.CFG_DATA))
            else:
                for cfg_key, cfg_value in self.CFG_DATA.items():
                    self.send_command(cfg_key, cfg_value)

    def set(self, key, data, mid=None, block_callback=[]):
        """Store received *data* for *key*, if *key* is a known rx key.

        :param key: Key of the received value.
        :param data: The (already filtered) value.
        :param mid: Message id of the mqtt message, used for the configuration check.
        :param block_callback: Forwarded unchanged to the parent ``set``
            (the default list is never modified here).
        :returns: The return value of the parent ``set`` for keys in RX_KEYS,
            otherwise ``None``.
        """
        if key in self.CFG_DATA:
            self.__cfg_callback__(key, data, mid)
        if key in self.RX_IGNORE_KEYS:
            pass    # ignore these keys
        elif key in self.RX_KEYS:
            return super().set(key, data, block_callback)
        else:
            self.logger.warning("Unexpected key %s", key)

    def receive_callback(self, client, userdata, message):
        """Unpack a received mqtt *message* and pass its content to :meth:`set`.

        A json object is stored key by key. Any other json value, or a payload
        which is no json at all (it is decoded as utf-8 text then), is stored
        with the topic remainder below ``<topic>/`` as key.
        """
        if message.topic == self.topic + '/' + videv_base.KEY_INFO:
            return
        content_key = message.topic[len(self.topic) + 1:]
        # Messages on the tx topic are commands, not device data. An empty or
        # missing TX_TOPIC can't be distinguished, so nothing is filtered then.
        is_tx_message = bool(self.TX_TOPIC) and message.topic.endswith(self.TX_TOPIC)
        if content_key in self.RX_IGNORE_TOPICS or is_tx_message:
            self.logger.debug("Ignoring topic %s", content_key)
            return
        self.logger.debug("Unpacking content_key \"%s\" from message.", content_key)
        try:
            data = json.loads(message.payload)
        except json.decoder.JSONDecodeError:
            # String
            data = message.payload.decode('utf-8')
        else:
            if isinstance(data, dict):
                for key, value in data.items():
                    self.set(key, self.__device_to_instance_filter__(key, value), message.mid)
                return
        self.set(content_key, self.__device_to_instance_filter__(content_key, data), message.mid)

    def __device_to_instance_filter__(self, key, data):
        """Convert on/off like device values to bool for keys in RX_FILTER_DATA_KEYS."""
        if key in self.RX_FILTER_DATA_KEYS:
            if data in [1, 'on', 'ON']:
                return True
            elif data in [0, 'off', 'OFF']:
                return False
        return data

    def __instance_to_device_filter__(self, key, data):
        """Convert bool values to "on" / "off" for keys in TX_FILTER_DATA_KEYS."""
        if key in self.TX_FILTER_DATA_KEYS:
            if data is True:
                return "on"
            elif data is False:
                return "off"
        return data

    def send_command(self, key, data):
        """Publish *data* for *key* to the device.

        :param key: Key to be set.
        :param data: Value to be sent (filtered by ``__instance_to_device_filter__``).
        :raises TypeError: If the message can't be created or sent for
            TX_TYPE == TX_DICT (e.g. *data* is not json serialisable).
        """
        data = self.__instance_to_device_filter__(key, data)
        if self.TX_TOPIC is None:
            self.logger.error("Unknown tx toptic. Set TX_TOPIC of class to a known value")
            return
        if self.TX_TYPE < 0:
            self.logger.error("Unknown tx type. Set TX_TYPE of class to a known value")
            return
        self.logger.debug("Sending data for %s - %s", key, str(data))
        if self.TX_TYPE == self.TX_DICT:
            try:
                self.mqtt_client.send('/'.join([self.topic, self.TX_TOPIC]), json.dumps({key: data}))
            except TypeError:
                # Report the context and re-raise the original exception,
                # so that its message and traceback are kept.
                self.logger.error("Unable to send command for topic %s: key=%s, data=%s", self.topic, repr(key), repr(data))
                raise
        else:
            if not isinstance(data, (str, bytes)):
                data = json.dumps(data)
            topic_parts = [self.topic, key, self.TX_TOPIC] if len(self.TX_TOPIC) > 0 else [self.topic, key]
            self.mqtt_client.send('/'.join(topic_parts), data)
|
|
|
|
|
|
class base_rpc(mqtt_base):
    """Base class for a device which is accessed by json rpc messages via MQTT.

    The class reads ``self.RX_KEYS`` but does not define it; it has to be
    provided by the subclass (or a parent class).
    """
    # Source suffixes sent with a request. The response is published below
    # the given source (see RESPONSE_TOPIC and NULL_TOPIC).
    SRC_RESPONSE = "/response"
    SRC_NULL = "/null"
    #
    EVENTS_TOPIC = "/events/rpc"
    TX_TOPIC = "/rpc"
    RESPONSE_TOPIC = SRC_RESPONSE + "/rpc"
    NULL_TOPIC = SRC_NULL + "/rpc"
    #
    RPC_ID_GET_STATUS = 1
    RPC_ID_SET = 1734
    #

    def __init__(self, mqtt_client, topic):
        """Register the receive callbacks and request the device status.

        :param mqtt_client: Client object providing ``add_callback`` and ``send``.
        :param topic: Base topic of the device.
        """
        super().__init__(mqtt_client, topic, default_values=dict.fromkeys(self.RX_KEYS))
        # data storage
        self.__cfg_by_mid__ = None
        # initialisations
        mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback)
        mqtt_client.add_callback(topic=self.topic+"/#", callback=self.receive_callback)
        #
        self.add_callback(None, None, self.__state_logging__, on_change_only=False)
        #
        self.rpc_get_status()

    def receive_callback(self, client, userdata, message):
        """Dispatch a received mqtt *message* to :meth:`events` or :meth:`response`.

        Messages with a payload which is no valid json are logged and dropped.
        """
        try:
            data = json.loads(message.payload)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            self.logger.warning("Ignoring message with non-json payload: %s::%s", message.topic, repr(message.payload))
            return
        #
        if message.topic == self.topic + self.EVENTS_TOPIC:
            self.events(data)
        elif message.topic == self.topic + self.RESPONSE_TOPIC:
            self.response(data)
        elif message.topic == self.topic + self.NULL_TOPIC or message.topic == self.topic + self.TX_TOPIC or message.topic == self.topic + "/online":
            pass        # Ignore response
        else:
            self.logger.warning("Unexpected message received: %s::%s", message.topic, json.dumps(data, sort_keys=True, indent=4))

    def events(self, data):
        """Process an event notification.

        :param data: Decoded json message; ``data["params"]`` is evaluated.

        * ``params["events"]``: "btn_down" / "btn_up" set the component key to
          True / False. Any other event sets the key "<component>:<event>" to
          True, if that key is listed in RX_KEYS.
        * ``params[<key in RX_KEYS>]``: the value of "output" is stored, if available.
        """
        params = data["params"]
        for rx_key in params:
            if rx_key == "events":
                for evt in params["events"]:
                    key = evt["component"]
                    event = evt["event"]
                    # NOTE(review): events of components which are not listed
                    # in RX_KEYS are dropped silently.
                    if key not in self.RX_KEYS:
                        continue
                    if event == "btn_down":
                        self.set(key, True)
                    elif event == "btn_up":
                        self.set(key, False)
                    else:
                        event_key = key + ":" + event
                        if event_key in self.RX_KEYS:
                            self.set(event_key, True)
                        else:
                            self.logger.warning("Unexpected event with data=%s", json.dumps(data, sort_keys=True, indent=4))
            elif rx_key in self.RX_KEYS:
                state = params[rx_key].get("output")
                if state is not None:
                    self.set(rx_key, state)

    def response(self, data):
        """Process the response of a request sent with ``src=<topic>/response``.

        Only the response of :meth:`rpc_get_status` is expected. For each key
        in RX_KEYS the value of "output" (or "state" as fallback) is stored.
        """
        # A json payload is not necessarily an object (e.g. a list or null).
        rpc_id = data.get("id") if isinstance(data, dict) else None
        if rpc_id != self.RPC_ID_GET_STATUS:
            self.logger.warning("Unexpected response with data=%s", json.dumps(data, sort_keys=True, indent=4))
            return
        #
        # Shelly.GetStatus
        #
        result = data.get("result") or {}       # tolerate a missing or null result
        for rx_key in result:
            if rx_key in self.RX_KEYS:
                key_data = result[rx_key]
                state = key_data.get("output", key_data.get("state"))
                if state is not None:
                    self.set(rx_key, state)

    def rpc_tx(self, **kwargs):
        """Send the keyword arguments as json rpc request to ``<topic>/rpc``.

        :raises AttributeError: If the keyword argument ``id`` is missing.
        """
        if "id" not in kwargs:
            raise AttributeError("'id' is missing in keyword arguments")
        self.mqtt_client.send(self.topic + self.TX_TOPIC, json.dumps(kwargs))

    def rpc_get_status(self):
        """Request the device status. The answer is processed by :meth:`response`."""
        self.rpc_tx(
            id=self.RPC_ID_GET_STATUS,
            src=self.topic + self.SRC_RESPONSE,
            method="Shelly.GetStatus"
        )

    def rpc_switch_set(self, key, state: bool):
        """Set the switch identified by *key* to *state*.

        :param key: Component key ending with the numeric switch id (e.g. "switch:0").
        :param state: ``True`` to switch on, ``False`` to switch off.
        :raises ValueError: If *key* does not end with a number.
        """
        # Use all trailing digits, so that ids with more than one digit work, too.
        switch_id = key[len(key.rstrip("0123456789")):]
        self.rpc_tx(
            id=self.RPC_ID_SET,
            src=self.topic + self.SRC_NULL,
            method="Switch.Set",
            params={"id": int(switch_id), "on": state}
        )
|
|
|
|
|
|
class base_output(base):
    """Device base class with an "all off" function which can be disabled."""

    def __init__(self, mqtt_client, topic):
        """Initialise the device; the "all off" function is enabled by default."""
        super().__init__(mqtt_client, topic)
        self.__all_off_enabled__ = True

    def disable_all_off(self, state=True):
        """Disable (``state=True``) or re-enable (``state=False``) :meth:`all_off`."""
        self.__all_off_enabled__ = not state

    def all_off(self):
        """Call ``__all_off__`` of the device, unless "all off" is disabled.

        An AttributeError or TypeError raised by that call (e.g. if the
        method is not available) is logged as warning and not propagated.
        """
        if not self.__all_off_enabled__:
            return
        try:
            self.__all_off__()
        except (AttributeError, TypeError) as err:
            self.logger.warning("Method all_off was used, but __all_off__ method wasn't callable: %s", repr(err))
|
|
|
|
|
|
class videv_base(mqtt_base):
    """Base class for a virtual device (videv).

    A videv publishes values of other devices below its own topic (display)
    and forwards values received on "<topic>/<key>/set" to other devices
    (control).
    """
    KEY_INFO = '__info__'
    #
    SET_TOPIC = "set"

    def __init__(self, mqtt_client, topic, default_values=None):
        """Initialise the routing tables and start the cyclic transmission."""
        super().__init__(mqtt_client, topic, default_values=default_values)
        # (id(ext_device), ext_key) -> my_key
        self.__display_dict__ = {}
        # my_key -> (ext_device, ext_key, on_change_only)
        self.__control_dict__ = {}
        # presumably a period of 300 seconds -- TODO confirm against task.periodic
        self.__periodic__ = task.periodic(300, self.send_all)
        self.__periodic__.run()

    def send_all(self, rt):
        """Publish all values which are not None (target of the periodic task).

        :param rt: Not used here; presumably given by the periodic task.
        """
        try:
            for my_key in self:
                if self[my_key] is not None:
                    self.__tx__(my_key, self[my_key])
        except RuntimeError:
            self.logger.warning("Runtimeerror while sending cyclic videv information. This may happen on startup.")

    def add_display(self, my_key, ext_device, ext_key, on_change_only=True):
        """
        listen to data changes of ext_device and update videv information
        """
        if my_key not in self.keys():
            self[my_key] = None
        # for a group, the first member is used as data source
        is_group = ext_device.__class__.__name__ == "group"
        source = ext_device[0] if is_group else ext_device
        # store information to identify callback from ext_device
        self.__display_dict__[(id(source), ext_key)] = my_key
        # register a callback to listen for data from external device
        source.add_callback(ext_key, None, self.__rx_ext_device_data__, on_change_only, init_now=True)
        # send initial display data to videv interface
        initial_data = ext_device.get(ext_key)
        if initial_data is not None:
            self.__tx__(my_key, initial_data)

    def __rx_ext_device_data__(self, ext_device, ext_key, data):
        """Store and publish *data* received from an external device."""
        my_key = self.__display_dict__[(id(ext_device), ext_key)]
        self.set(my_key, data)
        self.__tx__(my_key, data)

    def __tx__(self, key, data):
        """Publish *data* to "<topic>/<key>"; non-string data is sent as json."""
        payload = data if type(data) is str else json.dumps(data)
        self.mqtt_client.send('/'.join([self.topic, key]), payload)

    def add_control(self, my_key, ext_device, ext_key, on_change_only=True):
        """
        listen to videv information and pass data to ext_device
        """
        if my_key not in self.keys():
            self[my_key] = None
        # store information to identify callback from videv
        self.__control_dict__[my_key] = (ext_device, ext_key, on_change_only)
        # add callback for videv changes
        set_topic = '/'.join([self.topic, my_key, self.SET_TOPIC])
        self.mqtt_client.add_callback(set_topic, self.__rx_videv_data__)

    def __rx_videv_data__(self, client, userdata, message):
        """Forward data received on a set topic to the routed external device.

        The key is taken from the last but one topic level. The payload is
        passed on as decoded json value, or unchanged if it is no valid json.
        """
        my_key = message.topic.split('/')[-2]
        try:
            data = json.loads(message.payload)
        except json.decoder.JSONDecodeError:
            data = message.payload
        ext_device, ext_key, on_change_only = self.__control_dict__[my_key]
        if my_key not in self.keys():
            self.logger.info("Ignoring rx message with topic %s", message.topic)
            return
        if data != self[my_key] or not on_change_only:
            ext_device.send_command(ext_key, data)

    def add_routing(self, my_key, ext_device, ext_key, on_change_only_disp=True, on_change_only_videv=True):
        """
        listen to data changes of ext_device and update videv information
        and
        listen to videv information and pass data to ext_device
        """
        # add display
        self.add_display(my_key, ext_device, ext_key, on_change_only_disp)
        # add control
        self.add_control(my_key, ext_device, ext_key, on_change_only_videv)
|