123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- from base import mqtt_base
- from base import videv_base
- import json
-
-
def is_json(data):
    """Return True if *data* can be parsed by ``json.loads``, else False.

    :param data: Candidate JSON document (``str``, ``bytes`` or ``bytearray``).
    :returns: ``True`` when parsing succeeds, ``False`` otherwise.

    Besides malformed JSON (``json.JSONDecodeError``) this also reports
    ``False`` for bytes that cannot be decoded (``UnicodeDecodeError``) and
    for objects ``json.loads`` does not accept at all (``TypeError``), so the
    predicate itself never raises for bad input.
    """
    try:
        json.loads(data)
    except (ValueError, TypeError):
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses
        return False
    return True
-
-
class base(mqtt_base):
    """MQTT device base class.

    Incoming messages below ``self.topic`` are unpacked into key/value pairs
    and stored via :meth:`set`; :meth:`send_command` publishes values back to
    the device. Subclasses configure the behaviour through the class
    attributes below.
    """
    # Topic suffix used when publishing; received topics ending with it are ignored.
    # An empty string means "no suffix", None disables sending.
    TX_TOPIC = "set"
    # Possible values for TX_TYPE:
    #   TX_VALUE: one message per key on <topic>/<key>[/<TX_TOPIC>]
    #   TX_DICT:  one JSON object {key: value} on <topic>/<TX_TOPIC>
    TX_VALUE = 0
    TX_DICT = 1
    # Negative value = not configured; send_command logs an error in that case.
    TX_TYPE = -1
    # Keys whose boolean values are sent as "on" / "off".
    TX_FILTER_DATA_KEYS = []
    #
    # Keys accepted by set(); also used to build the default value dictionary.
    RX_KEYS = []
    # Topic remainders (the part after "<topic>/") that are not processed at all.
    RX_IGNORE_TOPICS = []
    # Keys that are dropped silently by set().
    RX_IGNORE_KEYS = []
    # Keys whose received values 1/'on'/'ON' and 0/'off'/'OFF' are converted to bool.
    RX_FILTER_DATA_KEYS = []
    #
    # Expected device configuration; re-sent when the device reports a differing value.
    CFG_DATA = {}

    def __init__(self, mqtt_client, topic):
        """Register the receive callback for *topic* and all of its sub-topics.

        :param mqtt_client: Client object providing ``add_callback`` and ``send``.
        :param topic: Base topic of the device.
        """
        super().__init__(mqtt_client, topic, default_values=dict.fromkeys(self.RX_KEYS))
        # data storage: message id for which the configuration was sent last
        self.__cfg_by_mid__ = None
        # initialisations
        mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback)
        mqtt_client.add_callback(topic=self.topic+"/#", callback=self.receive_callback)
        #
        self.add_callback(None, None, self.__state_logging__, on_change_only=True)

    def __cfg_callback__(self, key, value, mid):
        """Send ``CFG_DATA`` to the device if a received config value differs.

        :param key: Configuration key that was received.
        :param value: Received value for *key*.
        :param mid: Message id of the received message (``None`` never triggers).

        The message id is remembered, so several differing keys arriving in
        the same message cause only one transmission.
        """
        if self.CFG_DATA.get(key) != value and self.__cfg_by_mid__ != mid and mid is not None:
            self.__cfg_by_mid__ = mid
            self.logger.warning("Differing configuration identified: Sending default configuration to defice: %s", repr(self.CFG_DATA))
            if self.TX_TYPE == self.TX_DICT:
                self.mqtt_client.send(self.topic + '/' + self.TX_TOPIC, json.dumps(self.CFG_DATA))
            else:
                for cfg_key, cfg_value in self.CFG_DATA.items():
                    self.send_command(cfg_key, cfg_value)

    def set(self, key, data, mid=None, block_callback=None):
        """Store a received value.

        :param key: Key of the value.
        :param data: Value to store.
        :param mid: Message id of the originating message (used for the
            configuration check only).
        :param block_callback: Passed through to the parent ``set``;
            ``None`` is replaced by a fresh empty list.
        :returns: Result of the parent ``set`` for known keys, otherwise ``None``.
        """
        # A fresh list per call avoids sharing one mutable default between calls.
        if block_callback is None:
            block_callback = []
        if key in self.CFG_DATA:
            self.__cfg_callback__(key, data, mid)
        if key in self.RX_IGNORE_KEYS:
            return None  # ignore these keys
        if key in self.RX_KEYS:
            return super().set(key, data, block_callback)
        self.logger.warning("Unexpected key %s", key)
        return None

    def receive_callback(self, client, userdata, message):
        """Unpack a received message and store its content via :meth:`set`.

        :param client: Unused.
        :param userdata: Unused.
        :param message: Received message; ``topic``, ``payload`` and ``mid``
            are read.

        A JSON object is stored key by key, any other JSON value is stored
        under the topic remainder, and a non-JSON payload is stored as UTF-8
        decoded string under the topic remainder.
        """
        if message.topic == self.topic + '/' + videv_base.KEY_INFO:
            return
        content_key = message.topic[len(self.topic) + 1:]
        # Skip the commands sent to the device. A missing (None) or empty
        # TX_TOPIC means there is no such suffix to look for.
        is_tx_message = bool(self.TX_TOPIC) and message.topic.endswith(self.TX_TOPIC)
        if content_key in self.RX_IGNORE_TOPICS or is_tx_message:
            self.logger.debug("Ignoring topic %s", content_key)
            return
        self.logger.debug("Unpacking content_key \"%s\" from message.", content_key)
        try:
            # Parse once; JSONDecodeError and UnicodeDecodeError are ValueErrors.
            data = json.loads(message.payload)
        except ValueError:
            # String
            try:
                data = message.payload.decode('utf-8')
            except UnicodeDecodeError:
                self.logger.warning("Ignoring undecodable payload for topic %s: %s", message.topic, repr(message.payload))
                return
            self.set(content_key, self.__device_to_instance_filter__(content_key, data), message.mid)
            return
        if isinstance(data, dict):
            for key in data:
                self.set(key, self.__device_to_instance_filter__(key, data[key]), message.mid)
        else:
            self.set(content_key, self.__device_to_instance_filter__(content_key, data), message.mid)

    def __device_to_instance_filter__(self, key, data):
        """Convert on/off style values of ``RX_FILTER_DATA_KEYS`` keys to bool.

        :returns: ``True`` / ``False`` for recognised values of filtered
            keys, otherwise *data* unchanged.
        """
        if key in self.RX_FILTER_DATA_KEYS:
            if data in [1, 'on', 'ON']:
                return True
            elif data in [0, 'off', 'OFF']:
                return False
        return data

    def __instance_to_device_filter__(self, key, data):
        """Convert bool values of ``TX_FILTER_DATA_KEYS`` keys to "on" / "off".

        :returns: ``"on"`` / ``"off"`` for ``True`` / ``False`` of filtered
            keys, otherwise *data* unchanged.
        """
        if key in self.TX_FILTER_DATA_KEYS:
            if data is True:
                return "on"
            elif data is False:
                return "off"
        return data

    def send_command(self, key, data):
        """Publish *data* for *key* to the device.

        :param key: Key to send.
        :param data: Value to send (filtered by ``TX_FILTER_DATA_KEYS``).
        :raises TypeError: Propagated unchanged if sending in ``TX_DICT``
            mode fails with a ``TypeError`` (e.g. value not JSON serialisable).

        An error is logged and nothing is sent if ``TX_TOPIC`` is ``None``
        or ``TX_TYPE`` is negative.
        """
        data = self.__instance_to_device_filter__(key, data)
        if self.TX_TOPIC is None:
            self.logger.error("Unknown tx toptic. Set TX_TOPIC of class to a known value")
            return
        if self.TX_TYPE < 0:
            self.logger.error("Unknown tx type. Set TX_TYPE of class to a known value")
            return
        self.logger.debug("Sending data for %s - %s", key, str(data))
        if self.TX_TYPE == self.TX_DICT:
            try:
                self.mqtt_client.send('/'.join([self.topic, self.TX_TOPIC]), json.dumps({key: data}))
            except TypeError:
                # Log the context and re-raise the original exception, so
                # message and traceback are kept.
                self.logger.error("TypeError while sending command on topic %s: key=%r, data=%r", self.topic, key, data)
                raise
        else:
            # Exact type check kept on purpose: everything but plain str / bytes is JSON encoded.
            if type(data) not in (str, bytes):
                data = json.dumps(data)
            topic_parts = [self.topic, key]
            if len(self.TX_TOPIC) > 0:
                topic_parts.append(self.TX_TOPIC)
            self.mqtt_client.send('/'.join(topic_parts), data)
-
-
class base_rpc(mqtt_base):
    """MQTT device base class for devices using JSON RPC messages.

    Status and events are received on ``<topic>/events/rpc`` and
    ``<topic>/response/rpc``; requests are published on ``<topic>/rpc``.

    NOTE(review): ``self.RX_KEYS`` is read by this class but not defined
    here; it has to be provided by a subclass or by ``mqtt_base`` (not
    visible from this file).
    """
    # Source names put into the "src" field of a request.
    SRC_RESPONSE = "/response"
    SRC_NULL = "/null"
    #
    # Topic suffixes (appended to self.topic).
    EVENTS_TOPIC = "/events/rpc"
    TX_TOPIC = "/rpc"
    RESPONSE_TOPIC = SRC_RESPONSE + "/rpc"
    NULL_TOPIC = SRC_NULL + "/rpc"
    #
    # Request ids; responses are identified by RPC_ID_GET_STATUS.
    RPC_ID_GET_STATUS = 1
    RPC_ID_SET = 1734
    #

    def __init__(self, mqtt_client, topic):
        """Register the receive callback and request the device status.

        :param mqtt_client: Client object providing ``add_callback`` and ``send``.
        :param topic: Base topic of the device.
        """
        super().__init__(mqtt_client, topic, default_values=dict.fromkeys(self.RX_KEYS))
        # data storage
        self.__cfg_by_mid__ = None
        # initialisations
        mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback)
        mqtt_client.add_callback(topic=self.topic+"/#", callback=self.receive_callback)
        #
        self.add_callback(None, None, self.__state_logging__, on_change_only=False)
        #
        self.rpc_get_status()

    def receive_callback(self, client, userdata, message):
        """Dispatch a received message to :meth:`events` or :meth:`response`.

        :param client: Unused.
        :param userdata: Unused.
        :param message: Received message; ``topic`` and ``payload`` are read.

        Messages on the null, tx and online topics are dropped without
        parsing. A payload that is not valid JSON is logged and dropped.
        """
        is_events = message.topic == self.topic + self.EVENTS_TOPIC
        is_response = message.topic == self.topic + self.RESPONSE_TOPIC
        if not (is_events or is_response):
            ignored_topics = (
                self.topic + self.NULL_TOPIC,
                self.topic + self.TX_TOPIC,
                self.topic + "/online",
            )
            if message.topic in ignored_topics:
                return  # Ignore response
        try:
            data = json.loads(message.payload)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors
            self.logger.warning("Unable to decode payload of message: %s::%s", message.topic, repr(message.payload))
            return
        #
        if is_events:
            self.events(data)
        elif is_response:
            self.response(data)
        else:
            self.logger.warning("Unexpected message received: %s::%s", message.topic, json.dumps(data, sort_keys=True, indent=4))

    def events(self, data):
        """Process a message received on the events topic.

        :param data: Decoded message; ``data["params"]`` is evaluated.

        Entries of ``params["events"]`` are handled per component:
        ``btn_down`` / ``btn_up`` store ``True`` / ``False`` for the
        component key, any other event stores ``True`` for the key
        ``<component>:<event>`` if that key is known. Other entries of
        ``params`` with a known key store their ``"output"`` value if present.
        """
        params = data["params"]
        for rx_key in params:
            if rx_key == "events":
                for evt in params["events"]:
                    key = evt["component"]
                    event = evt["event"]
                    if key not in self.RX_KEYS:
                        continue
                    if event == "btn_down":
                        self.set(key, True)
                    elif event == "btn_up":
                        self.set(key, False)
                    else:
                        event_key = key + ":" + event
                        if event_key in self.RX_KEYS:
                            self.set(event_key, True)
                        else:
                            self.logger.warning("Unexpected event with data=%s", json.dumps(data, sort_keys=True, indent=4))
            elif rx_key in self.RX_KEYS:
                state = params[rx_key].get("output")
                if state is not None:
                    self.set(rx_key, state)

    def response(self, data):
        """Process a message received on the response topic.

        :param data: Decoded message. Only responses carrying the id
            ``RPC_ID_GET_STATUS`` are evaluated; everything else is logged
            as unexpected.
        """
        # Only a JSON object can carry an id.
        rpc_id = data.get("id") if isinstance(data, dict) else None
        if rpc_id == self.RPC_ID_GET_STATUS:
            #
            # Shelly.GetStatus
            #
            result = data.get("result") or {}  # tolerate a missing or null result
            for rx_key in result:
                if rx_key in self.RX_KEYS:
                    key_data = result[rx_key]
                    state = key_data.get("output", key_data.get("state"))
                    if state is not None:
                        self.set(rx_key, state)
        else:
            self.logger.warning("Unexpected response with data=%s", json.dumps(data, sort_keys=True, indent=4))

    def rpc_tx(self, **kwargs):
        """Publish the keyword arguments as JSON object on the tx topic.

        :raises AttributeError: If no ``id`` keyword argument is given.
        """
        if "id" not in kwargs:
            raise AttributeError("'id' is missing in keyword arguments")
        self.mqtt_client.send(self.topic + self.TX_TOPIC, json.dumps(kwargs))

    def rpc_get_status(self):
        """Request the device status; the answer is sent to the response topic."""
        self.rpc_tx(
            id=self.RPC_ID_GET_STATUS,
            src=self.topic + self.SRC_RESPONSE,
            method="Shelly.GetStatus"
        )

    def rpc_switch_set(self, key, state: bool):
        """Send a ``Switch.Set`` request.

        :param key: Key ending with the numeric switch id (e.g. ``"switch:0"``).
        :param state: Requested switch state.
        :raises ValueError: If *key* does not end with a digit.
        """
        # Take all trailing digits, so ids with more than one digit work as well.
        switch_id = key[len(key.rstrip("0123456789")):]
        self.rpc_tx(
            id=self.RPC_ID_SET,
            src=self.topic + self.SRC_NULL,
            method="Switch.Set",
            params={"id": int(switch_id), "on": state}
        )
-
-
class base_output(base):
    """Device whose :meth:`all_off` request can be disabled at runtime.

    ``__all_off__`` is not defined here; :meth:`all_off` calls it and logs a
    warning if that call raises ``AttributeError`` or ``TypeError``.
    """

    def __init__(self, mqtt_client, topic):
        """Initialise the device with all-off handling enabled."""
        super().__init__(mqtt_client, topic)
        # all_off() is executed until disable_all_off() switches it off
        self.__all_off_enabled__ = True

    def disable_all_off(self, state=True):
        """Disable (``state=True``) or re-enable (``state=False``) :meth:`all_off`."""
        self.__all_off_enabled__ = not state

    def all_off(self):
        """Call ``__all_off__`` unless all-off handling has been disabled."""
        if not self.__all_off_enabled__:
            return
        try:
            self.__all_off__()
        except (AttributeError, TypeError) as err:
            self.logger.warning("Method all_off was used, but __all_off__ method wasn't callable: %s", repr(err))
|