36 wiersze
1.3 KiB
Python
36 wiersze
1.3 KiB
Python
import json
|
|
from mqtt import mqtt_client
|
|
import time
|
|
|
|
TEST_CLIENT_ID = "__test_device_tester__"
|
|
|
|
|
|
mqtt_test_client = mqtt_client(TEST_CLIENT_ID, "localhost")
|
|
|
|
|
|
def init_state(all_state_keys, device):
|
|
for state_topic in all_state_keys:
|
|
assert device.get(state_topic, 0) == None
|
|
|
|
|
|
def state_change_by_mqtt(all_state_keys, num_states, mqtt_test_client, base_topic, device, mqtt_data, state_data, warning_condition, mqtt_signal_time):
|
|
tm_warning = None
|
|
|
|
for i in range(num_states):
|
|
for state_topic in all_state_keys:
|
|
if device.TX_TYPE == device.TX_VALUE:
|
|
data = json.dumps(mqtt_data(state_topic)[i])
|
|
mqtt_test_client.send(base_topic + '/' + state_topic, data)
|
|
elif device.TX_TYPE == device.TX_DICT:
|
|
mqtt_test_client.send(base_topic, json.dumps({state_topic: mqtt_data(state_topic)[i]}))
|
|
else:
|
|
raise TypeError("Unknown TX_TYPE for device.")
|
|
if callable(warning_condition):
|
|
if warning_condition(state_topic, mqtt_data(state_topic)[i]):
|
|
tm_warning = int(time.time())
|
|
time.sleep(mqtt_signal_time)
|
|
for state_topic in all_state_keys:
|
|
assert device.get(state_topic) == state_data(state_topic)[i]
|
|
|
|
return tm_warning
|