"""Base classes for devices and virtual devices (videv) connected through an MQTT client."""
import json
import logging

import task

try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    # No application config importable: fall back to a generic logger name.
    ROOT_LOGGER_NAME = 'root'


class common_base(dict):
    """
    Dictionary of device values which notifies registered callbacks on `set`.

    Callbacks are invoked as ``callback(device, key, value)``.
    """
    DEFAULT_VALUES = {}

    def __init__(self, default_values=None):
        """
        default_values: initial key/value mapping; DEFAULT_VALUES of the class is used if omitted
        """
        # dict() copies the mapping, so the class level DEFAULT_VALUES is not shared with the instance.
        # NOTE: an explicitly given empty mapping is falsy and therefore selects DEFAULT_VALUES as well.
        super().__init__(default_values or self.DEFAULT_VALUES)
        # list of (key, data, callback, on_change_only) tuples
        self.__callback_list__ = []
        self.logger = logging.getLogger(ROOT_LOGGER_NAME).getChild("devices")

    def add_callback(self, key, data, callback, on_change_only=False, init_now=False):
        """
        Register a callback which is executed by `set`.

        key: key or None for all keys
        data: data or None for all data
        callback: callable executed as callback(self, key, value)
        on_change_only: execute the callback only, if the stored value changed
        init_now: execute the callback immediately, if a value (not None) is stored for key
        """
        cb_tup = (key, data, callback, on_change_only)
        # identical registrations are stored only once
        if cb_tup not in self.__callback_list__:
            self.__callback_list__.append(cb_tup)
        # the immediate call is not filtered by `data`; it only requires a stored value
        if init_now and self.get(key) is not None:
            callback(self, key, self[key])

    def set(self, key, data, block_callback=None):
        """
        Store data for an already existing key and execute the fitting callbacks.

        key: key to be updated; unknown keys are not stored, a warning is logged instead
        data: new value
        block_callback: container of callbacks which shall not be executed for this update
        """
        if key not in self:
            self.logger.warning("Unexpected key %s", key)
            return
        blocked = () if block_callback is None else block_callback
        value_changed = self[key] != data
        self[key] = data
        for cb_key, cb_data, callback, on_change_only in self.__callback_list__:
            # key fits to callback definition
            if not (cb_key is None or key == cb_key):
                continue
            # data fits to callback definition (self[key] is read again, a previous callback may have changed it)
            if not (cb_data is None or cb_data == self[key]):
                continue
            # change status fits to callback definition
            if on_change_only and not value_changed:
                continue
            # block given callbacks
            if callback in blocked:
                continue
            callback(self, key, self[key])


class mqtt_base(common_base):
    """
    common_base with an mqtt client and a topic; the logger gets one child level per topic segment.
    """

    def __init__(self, mqtt_client, topic, default_values=None):
        """
        mqtt_client: client object stored as self.mqtt_client
        topic: topic string with '/' as separator
        default_values: passed to common_base
        """
        super().__init__(default_values)
        self.mqtt_client = mqtt_client
        self.topic = topic
        for entry in self.topic.split('/'):
            self.logger = self.logger.getChild(entry)


class videv_base(mqtt_base):
    """
    Virtual device: publishes values of external devices (display) below its own topic
    and forwards received set messages to external devices (control).
    """
    KEY_INFO = '__info__'
    SET_TOPIC = "set"

    def __init__(self, mqtt_client, topic, default_values=None):
        super().__init__(mqtt_client, topic, default_values=default_values)
        # (id(ext_device), ext_key) -> my_key
        self.__display_dict__ = {}
        # my_key -> (ext_device, ext_key, on_change_only)
        self.__control_dict__ = {}
        # cyclic transmission of all values (300 is presumably an interval in seconds - confirm with task.periodic)
        self.__periodic__ = task.periodic(300, self.send_all)
        self.__periodic__.run()

    def send_all(self, rt):
        """
        Send all stored values which are not None.

        rt: argument given by the periodic task; not used here
        """
        try:
            for key in self:
                if self[key] is not None:
                    self.__tx__(key, self[key])
        except RuntimeError:
            self.logger.warning("Runtimeerror while sending cyclic videv information. This may happen on startup.")

    def add_display(self, my_key, ext_device, ext_key, on_change_only=True):
        """
        listen to data changes of ext_device and update videv information

        my_key: key of this videv; created with value None, if it does not exist
        ext_device: external device or an object of a class named "group" (then its first entry is observed)
        ext_key: key of the external device
        on_change_only: passed to add_callback of the external device
        """
        if my_key not in self:
            self[my_key] = None
        # for a group the first entry is the device to listen to
        if ext_device.__class__.__name__ == "group":
            source = ext_device[0]
        else:
            source = ext_device
        # store information to identify callback from ext_device
        self.__display_dict__[(id(source), ext_key)] = my_key
        # register a callback to listen for data from external device
        source.add_callback(ext_key, None, self.__rx_ext_device_data__, on_change_only, init_now=True)
        # send initial display data to videv interface
        # NOTE(review): this reads from ext_device itself (also for a group), as the original code did - confirm
        data = ext_device.get(ext_key)
        if data is not None:
            self.__tx__(my_key, data)

    def __rx_ext_device_data__(self, ext_device, ext_key, data):
        """
        Callback for data of an external device: store the value for the routed key and send it.
        """
        my_key = self.__display_dict__[(id(ext_device), ext_key)]
        self.set(my_key, data)
        self.__tx__(my_key, data)

    def __tx__(self, key, data):
        """
        Send data to "<topic>/<key>"; everything but str is sent as JSON.
        """
        if not isinstance(data, str):
            data = json.dumps(data)
        self.mqtt_client.send('/'.join([self.topic, key]), data)

    def add_control(self, my_key, ext_device, ext_key, on_change_only=True):
        """
        listen to videv information and pass data to ext_device

        my_key: key of this videv; created with value None, if it does not exist
        ext_device: device which gets the command (send_command)
        ext_key: key used for the command
        on_change_only: forward only data which differs from the stored value
        """
        if my_key not in self:
            self[my_key] = None
        # store information to identify callback from videv
        self.__control_dict__[my_key] = (ext_device, ext_key, on_change_only)
        # add callback for videv changes
        self.mqtt_client.add_callback('/'.join([self.topic, my_key, self.SET_TOPIC]), self.__rx_videv_data__)

    def __rx_videv_data__(self, client, userdata, message):
        """
        Callback for a message on "<topic>/<my_key>/set": forward the payload to the routed external device.
        """
        # the key is the topic segment in front of the trailing SET_TOPIC segment
        my_key = message.topic.split('/')[-2]
        control = self.__control_dict__.get(my_key)
        if control is None or my_key not in self:
            # no routing known for this key: ignore the message instead of raising a KeyError
            self.logger.info("Ignoring rx message with topic %s", message.topic)
            return
        try:
            data = json.loads(message.payload)
        except (json.JSONDecodeError, UnicodeDecodeError):
            # payload is no JSON (or not decodable): pass it on unchanged
            data = message.payload
        ext_device, ext_key, on_change_only = control
        if data != self[my_key] or not on_change_only:
            ext_device.send_command(ext_key, data)

    def add_routing(self, my_key, ext_device, ext_key, on_change_only_disp=True, on_change_only_videv=True):
        """
        listen to data changes of ext_device and update videv information
        and
        listen to videv information and pass data to ext_device
        """
        # add display
        self.add_display(my_key, ext_device, ext_key, on_change_only_disp)
        # add control
        self.add_control(my_key, ext_device, ext_key, on_change_only_videv)