smart_brain/__test__/devices/test_shelly.py

251 řádky
8.6 KiB
Python

from module import mqtt_test_client, init_state, state_change_by_mqtt
from devices import shelly_sw1 as test_device
from devices.base import warning
from mqtt import mqtt_client
import pytest
import time
# MQTT client id used for the device under test; derived from the module name.
DUT_CLIENT_ID = f"__{__name__}__"
# Base topic of the device under test, located below the "__test__" prefix.
TOPIC = f"__test__/{__name__}"
#
# Time value handed to state_change_by_mqtt.
# NOTE(review): presumably the wait time in seconds for an MQTT message to
# be processed -- confirm against the helper implementation.
MQTT_SIGNAL_TIME = 0.2
# All state keys exercised by the tests of this module.
ALL_STATE_KEYS = [
    "relay/0", "relay/1",
    "input/0", "input/1",
    "longpush/0", "longpush/1",
    "temperature",
    "overtemperature",
]
# Keys carrying a boolean state: every key except the numeric temperature.
BOOL_KEYS = [state_key for state_key in ALL_STATE_KEYS if state_key != "temperature"]
@pytest.fixture
def this_device():
    """Provide the device under test.

    A new mqtt_client (client id DUT_CLIENT_ID, host 'localhost') is created
    for every test and handed to the device together with the base TOPIC.
    """
    return test_device(mqtt_client(DUT_CLIENT_ID, 'localhost'), TOPIC)
def test_initial_states(this_device: test_device):
    """Check the initial value of every state key of the device.

    The check itself is delegated to the shared init_state helper, which
    receives all keys listed in ALL_STATE_KEYS.
    """
    init_state(ALL_STATE_KEYS, this_device)
def test_state_change_by_mqtt(this_device: test_device):
    """Change every state key via MQTT and verify the overtemperature warning.

    The state changes are performed by the shared state_change_by_mqtt
    helper, which is parametrised with three local callbacks (value pair on
    device side, value pair on MQTT side, warning predicate). Afterwards the
    warning stored on the device is checked for id, type and timestamp.
    """
    relay_keys = ("relay/0", "relay/1")
    numeric_flag_keys = ("input/0", "input/1", "longpush/0", "longpush/1", "overtemperature")

    def state_data(key):
        """Return the pair of values expected as device state for key."""
        if key in BOOL_KEYS:
            return (True, False)
        if key == "temperature":
            return (85.3, 20.1)
        raise IndexError("No return value defined for key %s" % key)

    def mqtt_data(key):
        """Return the pair of values used on the MQTT side for key."""
        if key in relay_keys:
            # relays are transmitted as text
            return ('on', 'off')
        if key in numeric_flag_keys:
            # flags are transmitted as 1 / 0
            return (1, 0)
        # everything else is transmitted like the device state itself
        return state_data(key)

    def warning_condition(state_topic, value):
        """Tell whether the given topic / value combination is the warning case."""
        if state_topic != "overtemperature":
            return False
        return value == 1

    # run the state changes
    # NOTE(review): meaning of the literal 2 is defined by the helper -- confirm there.
    warning_time = state_change_by_mqtt(
        ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device,
        mqtt_data, state_data, warning_condition, MQTT_SIGNAL_TIME,
    )

    # check the warning stored on the device
    raised: warning = this_device.get(this_device.KEY_WARNING)
    assert raised.get(raised.KEY_ID) == TOPIC
    assert raised.get(raised.KEY_TYPE) == raised.TYPE_OVERTEMPERATURE
    # the warning timestamp has to be within 2 seconds after the time
    # returned by the helper
    raised_at = time.mktime(raised.get(raised.KEY_TM))
    assert warning_time <= raised_at <= warning_time + 2
def test_specific_get_functions(this_device: test_device):
    """Check that each convenience attribute equals the generic get() result.

    Every entry pairs the attribute name with the name of the KEY_* constant
    that is passed to this_device.get() for comparison.
    """
    attribute_and_key = (
        ("output_0", "KEY_OUTPUT_0"),
        ("output_1", "KEY_OUTPUT_1"),
        ("input_0", "KEY_INPUT_0"),
        ("input_1", "KEY_INPUT_1"),
        ("longpush_0", "KEY_LONGPUSH_0"),
        ("longpush_1", "KEY_LONGPUSH_1"),
        ("temperature", "KEY_TEMPERATURE"),
    )
    for attribute_name, key_constant in attribute_and_key:
        assert getattr(this_device, attribute_name) == this_device.get(getattr(this_device, key_constant))
def test_send_command(this_device: test_device):
    """Send an on and an off command for output 0.

    The test contains no assertions; it only executes set_output_0 with
    True followed by False.
    """
    for requested_state in (True, False):
        this_device.set_output_0(requested_state)
'''
class shelly(base):
""" Communication (MQTT)
shelly
+- relay
| +- 0 ["on" / "off"] <- status
| | +- command ["on"/ "off"] <- command
| | +- energy [numeric] <- status
| +- 1 ["on" / "off"] <- status
| +- command ["on"/ "off"] <- command
| +- energy [numeric] <- status
+- input
| +- 0 [0 / 1] <- status
| +- 1 [0 / 1] <- status
+- input_event
| +- 0 <- status
| +- 1 <- status
+- longpush
| +- 0 [0 / 1] <- status
| +- 1 [0 / 1] <- status
+- temperature [numeric] °C <- status
+- temperature_f [numeric] F <- status
+- overtemperature [0 / 1] <- status
+- id <- status
+- model <- status
+- mac <- status
+- ip <- status
+- new_fw <- status
+- fw_ver <- status
"""
KEY_OUTPUT_0 = "relay/0"
KEY_OUTPUT_1 = "relay/1"
KEY_INPUT_0 = "input/0"
KEY_INPUT_1 = "input/1"
KEY_LONGPUSH_0 = "longpush/0"
KEY_LONGPUSH_1 = "longpush/1"
KEY_TEMPERATURE = "temperature"
KEY_OVERTEMPERATURE = "overtemperature"
KEY_ID = "id"
KEY_MODEL = "model"
KEY_MAC = "mac"
KEY_IP = "ip"
KEY_NEW_FIRMWARE = "new_fw"
KEY_FIRMWARE_VERSION = "fw_ver"
#
TX_TOPIC = "command"
TX_TYPE = base.TX_VALUE
TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
#
RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE,
KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION]
RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1']
RX_IGNORE_KEYS = ['temperature_f']
RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE]
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
#
self.output_key_delayed = None
self.delayed_flash_task = task.delayed(0.3, self.flash_task)
self.delayed_off_task = task.delayed(0.3, self.off_task)
#
self.add_callback(self.KEY_OVERTEMPERATURE, True, self.__warning__, True)
#
self.all_off_requested = False
def flash_task(self, *args):
if self.flash_active:
self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed))
self.output_key_delayed = None
if self.all_off_requested:
self.delayed_off_task.run()
def off_task(self, *args):
self.all_off()
@property
def flash_active(self):
return self.output_key_delayed is not None
#
# WARNING CALL
#
def __warning__(self, client, key, data):
w = warning(self.topic, warning.TYPE_OVERTEMPERATURE, "Temperature to high (%.1f°C)", self.get(self.KEY_TEMPERATURE) or math.nan)
self.logger.warning(w)
self.set(self.KEY_WARNING, w)
#
# RX
#
@property
def output_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_0)
@property
def output_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_1)
@property
def input_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_INPUT_0)
@property
def input_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_INPUT_1)
@property
def longpush_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_LONGPUSH_0)
@property
def longpush_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_LONGPUSH_1)
@property
def temperature(self):
"""rv: numeric value"""
return self.get(self.KEY_TEMPERATURE)
#
# TX
#
def set_output_0(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_0, state)
def set_output_0_mcb(self, device, key, data):
self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
self.set_output_0(data)
def toggle_output_0_mcb(self, device, key, data):
self.logger.info("Toggeling output 0")
self.set_output_0(not self.output_0)
def set_output_1(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_1, state)
def set_output_1_mcb(self, device, key, data):
self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data))
self.set_output_1(data)
def toggle_output_1_mcb(self, device, key, data):
self.logger.info("Toggeling output 1")
self.set_output_1(not self.output_1)
def flash_0_mcb(self, device, key, data):
self.output_key_delayed = self.KEY_OUTPUT_0
self.toggle_output_0_mcb(device, key, data)
self.delayed_flash_task.run()
def flash_1_mcb(self, device, key, data):
self.output_key_delayed = self.KEY_OUTPUT_1
self.toggle_output_1_mcb(device, key, data)
self.delayed_flash_task.run()
def all_off(self):
if self.flash_active:
self.all_off_requested = True
else:
if self.output_0:
self.set_output_0(False)
if self.output_1:
self.set_output_1(False)
'''