Execute a command on receiving an MQTT message

exec_command.py 3.2KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
import json
import logging
import logging.handlers
import os
import subprocess
import time

import config
import mqtt
import report
  9. try:
  10. from config import APP_NAME as ROOT_LOGGER_NAME
  11. except ImportError:
  12. ROOT_LOGGER_NAME = 'root'
  13. logger = logging.getLogger(ROOT_LOGGER_NAME).getChild('main')
  14. my_handler = logging.handlers.RotatingFileHandler(os.path.join(config.__BASEPATH__, 'door_bell.log'), mode='a', maxBytes=5*1024*1024, backupCount=3, encoding=None, delay=0)
  15. my_handler.setLevel(logging.INFO)
  16. my_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
  17. bell_log = logging.getLogger('root')
  18. bell_log.setLevel(logging.INFO)
  19. bell_log.addHandler(my_handler)
  20. class exec_command(mqtt.mqtt_client):
  21. def __init__(self):
  22. self.__enabled__ = True
  23. mqtt.mqtt_client.__init__(self, config.APP_NAME, config.MQTT_SERVER, 1883, config.MQTT_USER, config.MQTT_PASS)
  24. self.add_callback(config.ENABLED_TOPIC + '/set', self.set_enabled)
  25. for topic in config.EXEC_LIST:
  26. self.add_callback(topic, self.mqtt_rx)
  27. # Start a pseudo process
  28. self.process = subprocess.Popen(["sleep", "0"])
  29. self.publish_states()
  30. def set_enabled(self, client, userdata, message):
  31. try:
  32. payload = json.loads(message.payload)
  33. except:
  34. logger.exception("Error decoding json mqtt message")
  35. else:
  36. if self.__enabled__ != payload:
  37. self.__enabled__ = payload
  38. self.publish_states()
  39. def exec_command(self, cmd, message):
  40. self.process = subprocess.Popen(cmd.split(" "))
  41. if 'bell' in message.topic:
  42. bell_log.info('Door Bell activated (%s)', message.topic)
  43. def mqtt_rx(self, client, userdate, message):
  44. payload = None
  45. key = config.EXEC_LIST[message.topic].get('key')
  46. data = config.EXEC_LIST[message.topic].get('data')
  47. if key is None:
  48. try:
  49. payload = message.payload.decode('utf-8')
  50. except:
  51. logger.exception("Error decoding mqtt message")
  52. else:
  53. try:
  54. payload = json.loads(message.payload)
  55. except:
  56. logger.exception("Error decoding json mqtt message")
  57. else:
  58. try:
  59. payload = payload.get(key)
  60. except AttributeError:
  61. logger.exception("payload seems to be no dictionary")
  62. if data is None or payload == data:
  63. if self.__enabled__:
  64. if self.process.poll() is None:
  65. self.process.kill()
  66. logger.debug("Starting execution in background...")
  67. self.exec_command(config.EXEC_LIST[message.topic]['command'], message)
  68. else:
  69. logger.info("Execution is disabled")
  70. def publish_states(self):
  71. self.send(config.ENABLED_TOPIC, json.dumps(self.__enabled__))
  72. if __name__ == '__main__':
  73. report.appLoggingConfigure(config.__BASEPATH__, config.LOGTARGET, ((config.APP_NAME, config.LOGLVL), ), fmt=config.formatter, host=config.LOGHOST, port=config.LOGPORT)
  74. #
  75. ec = exec_command()
  76. #
  77. while True:
  78. time.sleep(30)
  79. ec.publish_states()
  80. try:
  81. ec.join()
  82. finally:
  83. ec.stop()