Execute a command on receiving an MQTT message
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

exec_command.py 2.1KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
import json
import logging
import shlex
import subprocess
import time

import config
import mqtt
import report
  8. try:
  9. from config import APP_NAME as ROOT_LOGGER_NAME
  10. except ImportError:
  11. ROOT_LOGGER_NAME = 'root'
  12. logger = logging.getLogger(ROOT_LOGGER_NAME).getChild('main')
  13. class exec_command(mqtt.mqtt_client):
  14. def __init__(self):
  15. self.__block_execution__ = False
  16. mqtt.mqtt_client.__init__(self, config.APP_NAME, config.MQTT_SERVER, 1883, config.MQTT_USER, config.MQTT_PASS)
  17. for topic in config.EXEC_LIST:
  18. self.add_callback(topic, self.mqtt_rx)
  19. # Start a pseudo process
  20. self.process = subprocess.Popen(["sleep", "0"])
  21. def exec_command(self, cmd):
  22. self.process = subprocess.Popen(cmd.split(" "))
  23. def mqtt_rx(self, client, userdate, message):
  24. payload = None
  25. key = config.EXEC_LIST[message.topic].get('key')
  26. data = config.EXEC_LIST[message.topic].get('data')
  27. if key is None:
  28. try:
  29. payload = message.payload.decode('utf-8')
  30. except:
  31. logger.exception("Error decoding mqtt message")
  32. else:
  33. try:
  34. payload = json.loads(message.payload)
  35. except:
  36. logger.exception("Error decoding json mqtt message")
  37. else:
  38. try:
  39. payload = payload.get(key)
  40. except AttributeError:
  41. logger.exception("payload seems to be no dictionary")
  42. if data is None or payload == data:
  43. if self.process.poll() is None:
  44. self.process.kill()
  45. logger.debug("Starting execution in background...")
  46. self.exec_command(config.EXEC_LIST[message.topic]['command'])
# Script entry point: configure logging, create the MQTT command executor and
# then sleep in 30 second intervals.
# NOTE(review): the indentation of this file was lost in the listing; the
# nesting below is a reconstruction -- confirm against the original file.
if __name__ == '__main__':
    report.appLoggingConfigure(config.__BASEPATH__, config.LOGTARGET, ((config.APP_NAME, config.LOGLVL), ), fmt=config.formatter, host=config.LOGHOST, port=config.LOGPORT)
    # Creating the instance connects and registers the callbacks (see __init__).
    ec = exec_command()
    # Keep the main thread busy; presumably messages are handled in the
    # background by the mqtt client -- TODO confirm.
    while True:
        time.sleep(30)
    # NOTE(review): as reconstructed, these statements follow an endless loop
    # and are never reached. If the try/finally belongs inside the loop
    # instead, join() and stop() would run every 30 seconds -- confirm intent.
    try:
        ec.join()
    finally:
        ec.stop()