MQTT Interface
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

__init__.py 3.1KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. #
  4. """
  5. mqtt (MQTT)
  6. ===========
  7. **Author:**
  8. * Dirk Alders <sudo-dirk@mount-mockery.de>
  9. **Description:**
  10. This Module supports functionality for mqtt connections
  11. **Submodules:**
  12. * :mod:`mqtt_client`
  13. **Unittest:**
  14. See also the :download:`unittest <mqtt/_testresults_/unittest.pdf>` documentation.
  15. **Module Documentation:**
  16. """
  17. __DEPENDENCIES__ = []
  18. import logging
  19. import paho.mqtt.client as paho
  20. import socket
  21. import time
  22. try:
  23. from config import APP_NAME as ROOT_LOGGER_NAME
  24. except ImportError:
  25. ROOT_LOGGER_NAME = 'root'
  26. logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
  27. class mqtt_client(object):
  28. def __init__(self, name, host, port=1883, username=None, password=None):
  29. self.__block_add_callbacks__ = True
  30. logger.info("Initiating mqtt client instance")
  31. self.__callbacks__ = {}
  32. self.__client__ = paho.Client(name) # create client object
  33. if username is not None and password is not None:
  34. logger.debug("Configuring authentification")
  35. self.__client__.username_pw_set(username, password) # login with credentials
  36. self.__client__.on_message = self.__receive__ # attach function to callback
  37. self.__client__.on_connect = self.__on_connect__
  38. self.__client__.on_disconnect = self.__on_disconnect__
  39. try:
  40. self.__client__.connect(host, port) # establish connection
  41. except (socket.timeout, OSError) as e:
  42. logger.warning("Can not connect to MQTT-Server")
  43. self.__client__.loop_start() # start the loop
  44. def add_callback(self, topic, callback):
  45. while self.__block_add_callbacks__:
  46. time.sleep(.1)
  47. logger.debug("Adding callback for topic %s", topic)
  48. self.__callbacks__[topic] = callback
  49. self.__client__.subscribe(topic)
  50. def send(self, topic, payload):
  51. logger.debug("Sending message with topic %s and payload %s",
  52. topic, str(payload))
  53. self.__client__.publish(topic, payload)
  54. def __on_connect__(self, client, userdata, flags, rc):
  55. logger.debug("Connect with rc=%d", rc)
  56. if rc == 0:
  57. self.__block_add_callbacks__ = True
  58. logger.debug("Registering topics...")
  59. for topic in self.__callbacks__:
  60. self.__client__.subscribe(topic)
  61. self.__block_add_callbacks__ = False
  62. def __on_disconnect__(self, client, flags, rc):
  63. logger.warning("Disconnect with rc=%d", rc)
  64. def __receive__(self, client, userdata, message):
  65. logger.debug("Received message with topic %s and payload %s", message.topic, str(message.payload))
  66. for topic in self.__callbacks__:
  67. if topic.endswith('#'):
  68. if message.topic.startswith(topic[:-1]):
  69. self.__callbacks__[topic](client, userdata, message)
  70. else:
  71. if topic == message.topic:
  72. self.__callbacks__[topic](client, userdata, message)