MQTT Interface
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

__init__.py 3.4KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. #
  4. """
  5. mqtt (MQTT)
  6. ===========
  7. **Author:**
  8. * Dirk Alders <sudo-dirk@mount-mockery.de>
  9. **Description:**
  10. This Module supports functionality for mqtt connections
  11. **Submodules:**
  12. * :mod:`mqtt_client`
  13. **Unittest:**
  14. See also the :download:`unittest <mqtt/_testresults_/unittest.pdf>` documentation.
  15. **Module Documentation:**
  16. """
  17. __DEPENDENCIES__ = []
  18. import logging
  19. import paho.mqtt.client as paho
  20. import socket
  21. import time
  22. try:
  23. from config import APP_NAME as ROOT_LOGGER_NAME
  24. except ImportError:
  25. ROOT_LOGGER_NAME = 'root'
  26. logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
  def get_topic_logger(topic):
      """
      Return the logger that belongs to an MQTT topic.

      Starting at the module level ``logger``, one child logger is derived for
      every ``/``-separated level of *topic*.

      :param topic: Topic string whose levels are separated by ``/``.
      :returns: The child logger derived for the last level of the topic.
      """
      node = logger
      remainder = topic
      while True:
          # Peel off one level per pass; an empty separator marks the last level.
          level, separator, remainder = remainder.partition('/')
          node = node.getChild(level)
          if not separator:
              return node
  class mqtt_client(object):
      """
      Wrapper around a paho MQTT client that dispatches received messages to
      callbacks which are registered per topic filter.

      The constructor configures the paho client, tries to connect to the broker
      and starts the paho network loop in the background.

      :param name: Name of the client. Accepted for interface compatibility; it is not used by this class.
      :param host: Host name or address of the MQTT broker.
      :param port: TCP port of the MQTT broker.
      :param username: User name for the login. Credentials are only configured if *password* is given as well.
      :param password: Password for the login.
      """

      def __init__(self, name, host, port=1883, username=None, password=None):
          # Registrations stay locked until __on_connect__ releases them.
          self.__block_add_callbacks__ = True
          logger.info("Initiating mqtt client instance")
          # Maps a topic filter to the callback registered for it.
          self.__callbacks__ = {}
          self.__client__ = paho.Client(paho.CallbackAPIVersion.VERSION2)
          if username is not None and password is not None:
              logger.debug("Configuring authentification")
              self.__client__.username_pw_set(username, password)
          self.__client__.on_message = self.__receive__
          self.__client__.on_connect = self.__on_connect__
          self.__client__.on_disconnect = self.__on_disconnect__
          try:
              self.__client__.connect(host, port, 60)
          except (socket.timeout, OSError):
              # A failing initial connect is reported, but does not abort the
              # construction; the network loop is started anyway.
              logger.warning("Can not connect to MQTT-Server")
          self.__client__.loop_start()

      def add_callback(self, topic, callback):
          """
          Register a callback for a topic filter and subscribe to it.

          The call blocks (polling every 0.1 seconds) as long as registrations
          are locked, i.e. until :meth:`__on_connect__` has finished after a
          successful connect.

          :param topic: Topic filter to subscribe to. A previously registered callback for the same filter is replaced.
          :param callback: Callable which is executed as ``callback(client, userdata, message)`` for every matching message.
          """
          while self.__block_add_callbacks__:
              time.sleep(.1)
          logger.debug("Adding callback for topic %s", topic)
          self.__callbacks__[topic] = callback
          self.__client__.subscribe(topic)

      def send(self, topic, payload):
          """
          Publish a message.

          :param topic: Topic to publish to.
          :param payload: Payload which is handed over to the paho client unchanged.
          """
          get_topic_logger(topic).debug("Sending message with topic %s and payload %s", topic, payload)
          self.__client__.publish(topic, payload)

      def __on_connect__(self, client, userdata, flags, rc, properties):
          """
          Connect callback of the paho client (callback API version 2).

          On success (``rc == 0``) all registered topic filters are subscribed
          and registrations via :meth:`add_callback` are released afterwards.
          """
          logger.debug("Connect with rc=%s", repr(rc))
          if rc == 0:
              self.__block_add_callbacks__ = True
              logger.debug("Registering topics...")
              # Iterate over a snapshot (as __receive__ does), so that a
              # registration from another thread can not break the iteration.
              for topic in list(self.__callbacks__):
                  self.__client__.subscribe(topic)
              self.__block_add_callbacks__ = False

      def __on_disconnect__(self, client, userdata, flags, rc, properties):
          """
          Disconnect callback of the paho client (callback API version 2); it only logs the reason code.
          """
          logger.warning("Disconnect with rc=%s", repr(rc))

      def __receive__(self, client, userdata, message):
          """
          Message callback of the paho client.

          Executes every registered callback whose topic filter matches the topic
          of the received message. The matching is delegated to paho, so that the
          wildcards ``+`` and ``#`` are handled the same way as for subscriptions.
          """
          get_topic_logger(message.topic).debug("Received message with topic %s and payload %s", message.topic, message.payload)
          # Work on a snapshot, because callbacks may be registered concurrently.
          for topic, callback in list(self.__callbacks__.items()):
              if paho.topic_matches_sub(topic, message.topic):
                  callback(client, userdata, message)