Smarthome Functionen
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_shelly.py 8.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. from module import mqtt_test_client, init_state, state_change_by_mqtt
  2. from devices import shelly as test_device
  3. from devices import warning
  4. from mqtt import mqtt_client
  5. import pytest
  6. import time
  7. DUT_CLIENT_ID = "__%s__" % __name__
  8. TOPIC = "__test__/%s" % __name__
  9. #
  10. MQTT_SIGNAL_TIME = 0.2
  11. ALL_STATE_KEYS = ["relay/0", "relay/1", "input/0", "input/1", "longpush/0", "longpush/1", "temperature", "overtemperature"]
  12. BOOL_KEYS = ["relay/0", "relay/1", "input/0", "input/1", "longpush/0", "longpush/1", "overtemperature"]
  13. @pytest.fixture
  14. def this_device():
  15. mc = mqtt_client(DUT_CLIENT_ID, 'localhost')
  16. return test_device(mc, TOPIC)
  17. def test_initial_states(this_device: test_device):
  18. # test all initial values
  19. init_state(ALL_STATE_KEYS, this_device)
  20. def test_state_change_by_mqtt(this_device: test_device):
  21. def state_data(key):
  22. if key in BOOL_KEYS:
  23. return (True, False)
  24. elif key == "temperature":
  25. return (85.3, 20.1)
  26. else:
  27. raise IndexError("No return value defined for key %s" % key)
  28. def mqtt_data(key):
  29. if key in ["relay/0", "relay/1"]:
  30. return ('on', 'off')
  31. elif key in ["input/0", "input/1", "longpush/0", "longpush/1", "overtemperature"]:
  32. return (1, 0)
  33. else:
  34. return state_data(key)
  35. def warning_condition(state_topic, value):
  36. return state_topic == "overtemperature" and value == 1
  37. # test state changes
  38. tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device,
  39. mqtt_data, state_data, warning_condition, MQTT_SIGNAL_TIME)
  40. # test warning
  41. w: warning = this_device.get(this_device.KEY_WARNING)
  42. assert w.get(w.KEY_ID) == TOPIC
  43. assert w.get(w.KEY_TYPE) == w.TYPE_OVERTEMPERATURE
  44. wt = time.mktime(w.get(w.KEY_TM))
  45. wt_min = tm_warning
  46. wt_max = tm_warning + 2
  47. assert wt >= wt_min and wt <= wt_max
  48. def test_specific_get_functions(this_device: test_device):
  49. assert this_device.output_0 == this_device.get(this_device.KEY_OUTPUT_0)
  50. assert this_device.output_1 == this_device.get(this_device.KEY_OUTPUT_1)
  51. assert this_device.input_0 == this_device.get(this_device.KEY_INPUT_0)
  52. assert this_device.input_1 == this_device.get(this_device.KEY_INPUT_1)
  53. assert this_device.longpush_0 == this_device.get(this_device.KEY_LONGPUSH_0)
  54. assert this_device.longpush_1 == this_device.get(this_device.KEY_LONGPUSH_1)
  55. assert this_device.temperature == this_device.get(this_device.KEY_TEMPERATURE)
  56. def test_send_command(this_device: test_device):
  57. this_device.set_output_0(True)
  58. this_device.set_output_0(False)
  59. '''
  60. class shelly(base):
  61. """ Communication (MQTT)
  62. shelly
  63. +- relay
  64. | +- 0 ["on" / "off"] <- status
  65. | | +- command ["on"/ "off"] <- command
  66. | | +- energy [numeric] <- status
  67. | +- 1 ["on" / "off"] <- status
  68. | +- command ["on"/ "off"] <- command
  69. | +- energy [numeric] <- status
  70. +- input
  71. | +- 0 [0 / 1] <- status
  72. | +- 1 [0 / 1] <- status
  73. +- input_event
  74. | +- 0 <- status
  75. | +- 1 <- status
  76. +- logpush
  77. | +- 0 [0 / 1] <- status
  78. | +- 1 [0 / 1] <- status
  79. +- temperature [numeric] °C <- status
  80. +- temperature_f [numeric] F <- status
  81. +- overtemperature [0 / 1] <- status
  82. +- id <- status
  83. +- model <- status
  84. +- mac <- status
  85. +- ip <- status
  86. +- new_fw <- status
  87. +- fw_ver <- status
  88. """
  89. KEY_OUTPUT_0 = "relay/0"
  90. KEY_OUTPUT_1 = "relay/1"
  91. KEY_INPUT_0 = "input/0"
  92. KEY_INPUT_1 = "input/1"
  93. KEY_LONGPUSH_0 = "longpush/0"
  94. KEY_LONGPUSH_1 = "longpush/1"
  95. KEY_TEMPERATURE = "temperature"
  96. KEY_OVERTEMPERATURE = "overtemperature"
  97. KEY_ID = "id"
  98. KEY_MODEL = "model"
  99. KEY_MAC = "mac"
  100. KEY_IP = "ip"
  101. KEY_NEW_FIRMWARE = "new_fw"
  102. KEY_FIRMWARE_VERSION = "fw_ver"
  103. #
  104. TX_TOPIC = "command"
  105. TX_TYPE = base.TX_VALUE
  106. TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
  107. #
  108. RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE,
  109. KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION]
  110. RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1']
  111. RX_IGNORE_KEYS = ['temperature_f']
  112. RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE]
  113. def __init__(self, mqtt_client, topic):
  114. super().__init__(mqtt_client, topic)
  115. #
  116. self.output_key_delayed = None
  117. self.delayed_flash_task = task.delayed(0.3, self.flash_task)
  118. self.delayed_off_task = task.delayed(0.3, self.off_task)
  119. #
  120. self.add_callback(self.KEY_OVERTEMPERATURE, True, self.__warning__, True)
  121. #
  122. self.all_off_requested = False
  123. def flash_task(self, *args):
  124. if self.flash_active:
  125. self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed))
  126. self.output_key_delayed = None
  127. if self.all_off_requested:
  128. self.delayed_off_task.run()
  129. def off_task(self, *args):
  130. self.all_off()
  131. @property
  132. def flash_active(self):
  133. return self.output_key_delayed is not None
  134. #
  135. # WARNING CALL
  136. #
  137. def __warning__(self, client, key, data):
  138. w = warning(self.topic, warning.TYPE_OVERTEMPERATURE, "Temperature to high (%.1f°C)", self.get(self.KEY_TEMPERATURE) or math.nan)
  139. self.logger.warning(w)
  140. self.set(self.KEY_WARNING, w)
  141. #
  142. # RX
  143. #
  144. @property
  145. def output_0(self):
  146. """rv: [True, False]"""
  147. return self.get(self.KEY_OUTPUT_0)
  148. @property
  149. def output_1(self):
  150. """rv: [True, False]"""
  151. return self.get(self.KEY_OUTPUT_1)
  152. @property
  153. def input_0(self):
  154. """rv: [True, False]"""
  155. return self.get(self.KEY_INPUT_0)
  156. @property
  157. def input_1(self):
  158. """rv: [True, False]"""
  159. return self.get(self.KEY_INPUT_1)
  160. @property
  161. def longpush_0(self):
  162. """rv: [True, False]"""
  163. return self.get(self.KEY_LONGPUSH_0)
  164. @property
  165. def longpush_1(self):
  166. """rv: [True, False]"""
  167. return self.get(self.KEY_LONGPUSH_1)
  168. @property
  169. def temperature(self):
  170. """rv: numeric value"""
  171. return self.get(self.KEY_TEMPERATURE)
  172. #
  173. # TX
  174. #
  175. def set_output_0(self, state):
  176. """state: [True, False]"""
  177. self.send_command(self.KEY_OUTPUT_0, state)
  178. def set_output_0_mcb(self, device, key, data):
  179. self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
  180. self.set_output_0(data)
  181. def toggle_output_0_mcb(self, device, key, data):
  182. self.logger.info("Toggeling output 0")
  183. self.set_output_0(not self.output_0)
  184. def set_output_1(self, state):
  185. """state: [True, False]"""
  186. self.send_command(self.KEY_OUTPUT_1, state)
  187. def set_output_1_mcb(self, device, key, data):
  188. self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data))
  189. self.set_output_1(data)
  190. def toggle_output_1_mcb(self, device, key, data):
  191. self.logger.info("Toggeling output 1")
  192. self.set_output_1(not self.output_1)
  193. def flash_0_mcb(self, device, key, data):
  194. self.output_key_delayed = self.KEY_OUTPUT_0
  195. self.toggle_output_0_mcb(device, key, data)
  196. self.delayed_flash_task.run()
  197. def flash_1_mcb(self, device, key, data):
  198. self.output_key_delayed = self.KEY_OUTPUT_1
  199. self.toggle_output_1_mcb(device, key, data)
  200. self.delayed_flash_task.run()
  201. def all_off(self):
  202. if self.flash_active:
  203. self.all_off_requested = True
  204. else:
  205. if self.output_0:
  206. self.set_output_0(False)
  207. if self.output_1:
  208. self.set_output_1(False)
  209. '''