#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
mqtt (MQTT)
===========

**Author:**

* Dirk Alders

**Description:**

    This Module supports functionality for mqtt connections

**Submodules:**

* :mod:`mqtt_client`

**Unittest:**

    See also the :download:`unittest ` documentation.

**Module Documentation:**

"""
__DEPENDENCIES__ = []

import logging
import paho.mqtt.client as paho
import socket
import time

try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)


def get_topic_logger(topic):
    """
    Return a child logger of the module logger for the given topic.

    Every ``/``-separated level of the topic becomes one logger level, so that
    the logging of a single topic branch can be configured separately.

    :param topic: The mqtt topic (e.g. ``"home/livingroom/light"``)
    :type topic: str
    :returns: The logger for the topic
    :rtype: logging.Logger
    """
    topic_logger = logger
    for entry in topic.split('/'):
        topic_logger = topic_logger.getChild(entry)
    return topic_logger


class mqtt_client(object):
    """
    Wrapper for a paho mqtt client, which dispatches received messages to
    callbacks registered per topic filter.

    The constructor tries to connect to the broker and starts the paho network
    loop in any case. A failing connection attempt is logged as a warning and
    does not raise an exception.

    :param name: Name of the client (accepted for the interface, but not used in this class)
    :param host: Hostname or ip address of the mqtt broker
    :type host: str
    :param port: TCP port of the mqtt broker
    :type port: int
    :param username: Username for the authentification (only used together with password)
    :type username: str or None
    :param password: Password for the authentification (only used together with username)
    :type password: str or None
    """

    def __init__(self, name, host, port=1883, username=None, password=None):
        # add_callback waits until __on_connect__ released this flag
        self.__block_add_callbacks__ = True
        logger.info("Initiating mqtt client instance")
        self.__callbacks__ = {}
        self.__client__ = paho.Client(paho.CallbackAPIVersion.VERSION2)  # create client object
        if username is not None and password is not None:
            logger.debug("Configuring authentification")
            self.__client__.username_pw_set(username, password)  # login with credentials
        self.__client__.on_message = self.__receive__  # attach function to callback
        self.__client__.on_connect = self.__on_connect__
        self.__client__.on_disconnect = self.__on_disconnect__
        try:
            self.__client__.connect(host, port, 60)  # establish connection
        except (socket.timeout, OSError):
            logger.warning("Can not connect to MQTT-Server")
        self.__client__.loop_start()  # start the loop

    def add_callback(self, topic, callback):
        """
        Register a callback for a topic filter and subscribe to it.

        This method blocks (polling every 0.1s) until a successful connect
        has been reported by :meth:`__on_connect__`. An already existing
        callback for the same topic filter is replaced.

        :param topic: The topic filter to subscribe to
        :type topic: str
        :param callback: Callable executed as ``callback(client, userdata, message)``
        """
        while self.__block_add_callbacks__:
            time.sleep(.1)
        logger.debug("Adding callback for topic %s", topic)
        self.__callbacks__[topic] = callback
        self.__client__.subscribe(topic)

    def send(self, topic, payload):
        """
        Publish a message.

        :param topic: The topic of the message
        :type topic: str
        :param payload: The payload, passed unchanged to the publish method of the paho client
        """
        get_topic_logger(topic).debug("Sending message with topic %s and payload %s", topic, str(payload))
        self.__client__.publish(topic, payload)

    def __on_connect__(self, client, userdata, flags, rc, properties):
        """
        Connect callback of the paho client.

        If ``rc`` equals 0, all registered topic filters are subscribed
        (again) and :meth:`add_callback` is released afterwards.
        """
        logger.debug("Connect with rc=%s", repr(rc))
        if rc == 0:
            self.__block_add_callbacks__ = True
            logger.debug("Registering topics...")
            # Iterate over a snapshot: an add_callback call, which already
            # passed its wait loop, may insert a topic from another thread.
            for topic in list(self.__callbacks__):
                self.__client__.subscribe(topic)
            self.__block_add_callbacks__ = False

    def __on_disconnect__(self, client, userdata, flags, rc, properties):
        """
        Disconnect callback of the paho client. It logs the disconnect only.
        """
        logger.warning("Disconnect with rc=%s", repr(rc))

    def __receive__(self, client, userdata, message):
        """
        Message callback of the paho client.

        The message is passed to every registered callback whose topic filter
        matches the topic of the message. The matching is done by
        ``topic_matches_sub`` of paho, so that the wildcards ``+`` and ``#``
        are handled like the broker handles them.
        """
        message.mid = self.__client__._mid_generate()  # add a message id to the received message
        get_topic_logger(message.topic).debug("Received message with topic %s and payload %s", message.topic, str(message.payload))
        # Work on a copy: callbacks may be added while messages are dispatched.
        for topic, callback in self.__callbacks__.copy().items():
            if paho.topic_matches_sub(topic, message.topic):
                callback(client, userdata, message)