initial testenvironment added
This commit is contained in:
parent
03e5e29f7b
commit
769dffe20d
22
.vscode/settings.json
vendored
22
.vscode/settings.json
vendored
@ -1,11 +1,15 @@
|
||||
{
|
||||
"python.defaultInterpreterPath": "./venv/bin/python",
|
||||
"editor.formatOnSave": true,
|
||||
"autopep8.args": [
|
||||
"--max-line-length=150"
|
||||
],
|
||||
"editor.fontSize": 14,
|
||||
"emmet.includeLanguages": {
|
||||
"django-html": "html"
|
||||
}
|
||||
"python.defaultInterpreterPath": "./venv/bin/python",
|
||||
"autopep8.args": ["--max-line-length=150"],
|
||||
"python.formatting.provider": "none",
|
||||
"[python]": {
|
||||
"editor.defaultFormatter": "ms-python.python",
|
||||
"editor.formatOnSave": true
|
||||
},
|
||||
"editor.formatOnSave": true,
|
||||
"editor.fontSize": 14,
|
||||
"emmet.includeLanguages": { "django-html": "html" },
|
||||
"python.testing.pytestArgs": ["-v", "--cov", "--cov-report=xml", "__test__"],
|
||||
"python.testing.unittestEnabled": false,
|
||||
"python.testing.pytestEnabled": true
|
||||
}
|
||||
|
35
__test__/devices/module.py
Normal file
35
__test__/devices/module.py
Normal file
@ -0,0 +1,35 @@
|
||||
import json
|
||||
from mqtt import mqtt_client
|
||||
import time
|
||||
|
||||
TEST_CLIENT_ID = "__test_device_tester__"
|
||||
|
||||
|
||||
mqtt_test_client = mqtt_client(TEST_CLIENT_ID, "localhost")
|
||||
|
||||
|
||||
def init_state(all_state_keys, device):
|
||||
for state_topic in all_state_keys:
|
||||
assert device.get(state_topic, 0) == None
|
||||
|
||||
|
||||
def state_change_by_mqtt(all_state_keys, num_states, mqtt_test_client, base_topic, device, mqtt_data, state_data, warning_condition, mqtt_signal_time):
|
||||
tm_warning = None
|
||||
|
||||
for i in range(num_states):
|
||||
for state_topic in all_state_keys:
|
||||
if device.TX_TYPE == device.TX_VALUE:
|
||||
data = json.dumps(mqtt_data(state_topic)[i])
|
||||
mqtt_test_client.send(base_topic + '/' + state_topic, data)
|
||||
elif device.TX_TYPE == device.TX_DICT:
|
||||
mqtt_test_client.send(base_topic, json.dumps({state_topic: mqtt_data(state_topic)[i]}))
|
||||
else:
|
||||
raise TypeError("Unknown TX_TYPE for device.")
|
||||
if callable(warning_condition):
|
||||
if warning_condition(state_topic, mqtt_data(state_topic)[i]):
|
||||
tm_warning = int(time.time())
|
||||
time.sleep(mqtt_signal_time)
|
||||
for state_topic in all_state_keys:
|
||||
assert device.get(state_topic) == state_data(state_topic)[i]
|
||||
|
||||
return tm_warning
|
45
__test__/devices/test_my_powerplug.py
Normal file
45
__test__/devices/test_my_powerplug.py
Normal file
@ -0,0 +1,45 @@
|
||||
from module import mqtt_test_client, init_state, state_change_by_mqtt
|
||||
from devices import my_powerplug as test_device
|
||||
from devices import warning
|
||||
from mqtt import mqtt_client
|
||||
import pytest
|
||||
import time
|
||||
|
||||
DUT_CLIENT_ID = "__%s__" % __name__
|
||||
TOPIC = "__test__/%s" % __name__
|
||||
#
|
||||
MQTT_SIGNAL_TIME = 0.2
|
||||
|
||||
|
||||
ALL_STATE_KEYS = ["output/1", "output/2", "output/3", "output/4", ]
|
||||
BOOL_KEYS = ALL_STATE_KEYS
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def this_device():
|
||||
mc = mqtt_client(DUT_CLIENT_ID, 'localhost')
|
||||
return test_device(mc, TOPIC)
|
||||
|
||||
|
||||
def test_initial_states(this_device: test_device):
|
||||
# test all initial values
|
||||
init_state(ALL_STATE_KEYS, this_device)
|
||||
|
||||
|
||||
def test_state_change_by_mqtt(this_device: test_device):
|
||||
def state_data(key):
|
||||
return (True, False)
|
||||
|
||||
def mqtt_data(key):
|
||||
return state_data(key)
|
||||
|
||||
# test state changes
|
||||
tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device,
|
||||
mqtt_data, state_data, None, MQTT_SIGNAL_TIME)
|
||||
|
||||
|
||||
def test_specific_get_functions(this_device: test_device):
|
||||
assert this_device.output_0 == this_device.get(this_device.KEY_OUTPUT_0)
|
||||
assert this_device.output_1 == this_device.get(this_device.KEY_OUTPUT_1)
|
||||
assert this_device.output_2 == this_device.get(this_device.KEY_OUTPUT_2)
|
||||
assert this_device.output_3 == this_device.get(this_device.KEY_OUTPUT_3)
|
250
__test__/devices/test_shelly.py
Normal file
250
__test__/devices/test_shelly.py
Normal file
@ -0,0 +1,250 @@
|
||||
from module import mqtt_test_client, init_state, state_change_by_mqtt
|
||||
from devices import shelly as test_device
|
||||
from devices import warning
|
||||
from mqtt import mqtt_client
|
||||
import pytest
|
||||
import time
|
||||
|
||||
DUT_CLIENT_ID = "__%s__" % __name__
|
||||
TOPIC = "__test__/%s" % __name__
|
||||
#
|
||||
MQTT_SIGNAL_TIME = 0.2
|
||||
|
||||
|
||||
ALL_STATE_KEYS = ["relay/0", "relay/1", "input/0", "input/1", "longpush/0", "longpush/1", "temperature", "overtemperature"]
|
||||
BOOL_KEYS = ["relay/0", "relay/1", "input/0", "input/1", "longpush/0", "longpush/1", "overtemperature"]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def this_device():
|
||||
mc = mqtt_client(DUT_CLIENT_ID, 'localhost')
|
||||
return test_device(mc, TOPIC)
|
||||
|
||||
|
||||
def test_initial_states(this_device: test_device):
|
||||
# test all initial values
|
||||
init_state(ALL_STATE_KEYS, this_device)
|
||||
|
||||
|
||||
def test_state_change_by_mqtt(this_device: test_device):
|
||||
def state_data(key):
|
||||
if key in BOOL_KEYS:
|
||||
return (True, False)
|
||||
elif key == "temperature":
|
||||
return (85.3, 20.1)
|
||||
else:
|
||||
raise IndexError("No return value defined for key %s" % key)
|
||||
|
||||
def mqtt_data(key):
|
||||
if key in ["relay/0", "relay/1"]:
|
||||
return ('on', 'off')
|
||||
elif key in ["input/0", "input/1", "longpush/0", "longpush/1", "overtemperature"]:
|
||||
return (1, 0)
|
||||
else:
|
||||
return state_data(key)
|
||||
|
||||
def warning_condition(state_topic, value):
|
||||
return state_topic == "overtemperature" and value == 1
|
||||
|
||||
# test state changes
|
||||
tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device,
|
||||
mqtt_data, state_data, warning_condition, MQTT_SIGNAL_TIME)
|
||||
|
||||
# test warning
|
||||
w: warning = this_device.get(this_device.KEY_WARNING)
|
||||
assert w.get(w.KEY_ID) == TOPIC
|
||||
assert w.get(w.KEY_TYPE) == w.TYPE_OVERTEMPERATURE
|
||||
wt = time.mktime(w.get(w.KEY_TM))
|
||||
wt_min = tm_warning
|
||||
wt_max = tm_warning + 2
|
||||
assert wt >= wt_min and wt <= wt_max
|
||||
|
||||
|
||||
def test_specific_get_functions(this_device: test_device):
|
||||
assert this_device.output_0 == this_device.get(this_device.KEY_OUTPUT_0)
|
||||
assert this_device.output_1 == this_device.get(this_device.KEY_OUTPUT_1)
|
||||
assert this_device.input_0 == this_device.get(this_device.KEY_INPUT_0)
|
||||
assert this_device.input_1 == this_device.get(this_device.KEY_INPUT_1)
|
||||
assert this_device.longpush_0 == this_device.get(this_device.KEY_LONGPUSH_0)
|
||||
assert this_device.longpush_1 == this_device.get(this_device.KEY_LONGPUSH_1)
|
||||
assert this_device.temperature == this_device.get(this_device.KEY_TEMPERATURE)
|
||||
|
||||
|
||||
def test_send_command(this_device: test_device):
|
||||
this_device.set_output_0(True)
|
||||
this_device.set_output_0(False)
|
||||
|
||||
|
||||
'''
|
||||
class shelly(base):
|
||||
""" Communication (MQTT)
|
||||
|
||||
shelly
|
||||
+- relay
|
||||
| +- 0 ["on" / "off"] <- status
|
||||
| | +- command ["on"/ "off"] <- command
|
||||
| | +- energy [numeric] <- status
|
||||
| +- 1 ["on" / "off"] <- status
|
||||
| +- command ["on"/ "off"] <- command
|
||||
| +- energy [numeric] <- status
|
||||
+- input
|
||||
| +- 0 [0 / 1] <- status
|
||||
| +- 1 [0 / 1] <- status
|
||||
+- input_event
|
||||
| +- 0 <- status
|
||||
| +- 1 <- status
|
||||
+- logpush
|
||||
| +- 0 [0 / 1] <- status
|
||||
| +- 1 [0 / 1] <- status
|
||||
+- temperature [numeric] °C <- status
|
||||
+- temperature_f [numeric] F <- status
|
||||
+- overtemperature [0 / 1] <- status
|
||||
+- id <- status
|
||||
+- model <- status
|
||||
+- mac <- status
|
||||
+- ip <- status
|
||||
+- new_fw <- status
|
||||
+- fw_ver <- status
|
||||
"""
|
||||
KEY_OUTPUT_0 = "relay/0"
|
||||
KEY_OUTPUT_1 = "relay/1"
|
||||
KEY_INPUT_0 = "input/0"
|
||||
KEY_INPUT_1 = "input/1"
|
||||
KEY_LONGPUSH_0 = "longpush/0"
|
||||
KEY_LONGPUSH_1 = "longpush/1"
|
||||
KEY_TEMPERATURE = "temperature"
|
||||
KEY_OVERTEMPERATURE = "overtemperature"
|
||||
KEY_ID = "id"
|
||||
KEY_MODEL = "model"
|
||||
KEY_MAC = "mac"
|
||||
KEY_IP = "ip"
|
||||
KEY_NEW_FIRMWARE = "new_fw"
|
||||
KEY_FIRMWARE_VERSION = "fw_ver"
|
||||
#
|
||||
TX_TOPIC = "command"
|
||||
TX_TYPE = base.TX_VALUE
|
||||
TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
|
||||
#
|
||||
RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE,
|
||||
KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION]
|
||||
RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1']
|
||||
RX_IGNORE_KEYS = ['temperature_f']
|
||||
RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE]
|
||||
|
||||
def __init__(self, mqtt_client, topic):
|
||||
super().__init__(mqtt_client, topic)
|
||||
#
|
||||
self.output_key_delayed = None
|
||||
self.delayed_flash_task = task.delayed(0.3, self.flash_task)
|
||||
self.delayed_off_task = task.delayed(0.3, self.off_task)
|
||||
#
|
||||
self.add_callback(self.KEY_OVERTEMPERATURE, True, self.__warning__, True)
|
||||
#
|
||||
self.all_off_requested = False
|
||||
|
||||
def flash_task(self, *args):
|
||||
if self.flash_active:
|
||||
self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed))
|
||||
self.output_key_delayed = None
|
||||
if self.all_off_requested:
|
||||
self.delayed_off_task.run()
|
||||
|
||||
def off_task(self, *args):
|
||||
self.all_off()
|
||||
|
||||
@property
|
||||
def flash_active(self):
|
||||
return self.output_key_delayed is not None
|
||||
|
||||
#
|
||||
# WARNING CALL
|
||||
#
|
||||
def __warning__(self, client, key, data):
|
||||
w = warning(self.topic, warning.TYPE_OVERTEMPERATURE, "Temperature to high (%.1f°C)", self.get(self.KEY_TEMPERATURE) or math.nan)
|
||||
self.logger.warning(w)
|
||||
self.set(self.KEY_WARNING, w)
|
||||
|
||||
#
|
||||
# RX
|
||||
#
|
||||
@property
|
||||
def output_0(self):
|
||||
"""rv: [True, False]"""
|
||||
return self.get(self.KEY_OUTPUT_0)
|
||||
|
||||
@property
|
||||
def output_1(self):
|
||||
"""rv: [True, False]"""
|
||||
return self.get(self.KEY_OUTPUT_1)
|
||||
|
||||
@property
|
||||
def input_0(self):
|
||||
"""rv: [True, False]"""
|
||||
return self.get(self.KEY_INPUT_0)
|
||||
|
||||
@property
|
||||
def input_1(self):
|
||||
"""rv: [True, False]"""
|
||||
return self.get(self.KEY_INPUT_1)
|
||||
|
||||
@property
|
||||
def longpush_0(self):
|
||||
"""rv: [True, False]"""
|
||||
return self.get(self.KEY_LONGPUSH_0)
|
||||
|
||||
@property
|
||||
def longpush_1(self):
|
||||
"""rv: [True, False]"""
|
||||
return self.get(self.KEY_LONGPUSH_1)
|
||||
|
||||
@property
|
||||
def temperature(self):
|
||||
"""rv: numeric value"""
|
||||
return self.get(self.KEY_TEMPERATURE)
|
||||
|
||||
#
|
||||
# TX
|
||||
#
|
||||
def set_output_0(self, state):
|
||||
"""state: [True, False]"""
|
||||
self.send_command(self.KEY_OUTPUT_0, state)
|
||||
|
||||
def set_output_0_mcb(self, device, key, data):
|
||||
self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
|
||||
self.set_output_0(data)
|
||||
|
||||
def toggle_output_0_mcb(self, device, key, data):
|
||||
self.logger.info("Toggeling output 0")
|
||||
self.set_output_0(not self.output_0)
|
||||
|
||||
def set_output_1(self, state):
|
||||
"""state: [True, False]"""
|
||||
self.send_command(self.KEY_OUTPUT_1, state)
|
||||
|
||||
def set_output_1_mcb(self, device, key, data):
|
||||
self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data))
|
||||
self.set_output_1(data)
|
||||
|
||||
def toggle_output_1_mcb(self, device, key, data):
|
||||
self.logger.info("Toggeling output 1")
|
||||
self.set_output_1(not self.output_1)
|
||||
|
||||
def flash_0_mcb(self, device, key, data):
|
||||
self.output_key_delayed = self.KEY_OUTPUT_0
|
||||
self.toggle_output_0_mcb(device, key, data)
|
||||
self.delayed_flash_task.run()
|
||||
|
||||
def flash_1_mcb(self, device, key, data):
|
||||
self.output_key_delayed = self.KEY_OUTPUT_1
|
||||
self.toggle_output_1_mcb(device, key, data)
|
||||
self.delayed_flash_task.run()
|
||||
|
||||
def all_off(self):
|
||||
if self.flash_active:
|
||||
self.all_off_requested = True
|
||||
else:
|
||||
if self.output_0:
|
||||
self.set_output_0(False)
|
||||
if self.output_1:
|
||||
self.set_output_1(False)
|
||||
'''
|
55
__test__/devices/test_silvercrest_motion_sensor.py
Normal file
55
__test__/devices/test_silvercrest_motion_sensor.py
Normal file
@ -0,0 +1,55 @@
|
||||
from module import mqtt_test_client, init_state, state_change_by_mqtt
|
||||
from devices import silvercrest_motion_sensor as test_device
|
||||
from devices import warning
|
||||
from mqtt import mqtt_client
|
||||
import pytest
|
||||
import time
|
||||
|
||||
DUT_CLIENT_ID = "__%s__" % __name__
|
||||
TOPIC = "__test__/%s" % __name__
|
||||
#
|
||||
MQTT_SIGNAL_TIME = 0.2
|
||||
|
||||
|
||||
ALL_STATE_KEYS = ["battery", "battery_low", "linkquality", "occupancy", "tamper", "voltage"]
|
||||
BOOL_KEYS = ["battery_low", "occupancy", "tamper"]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def this_device():
|
||||
mc = mqtt_client(DUT_CLIENT_ID, 'localhost')
|
||||
return test_device(mc, TOPIC)
|
||||
|
||||
|
||||
def test_initial_states(this_device: test_device):
|
||||
# test all initial values
|
||||
init_state(ALL_STATE_KEYS, this_device)
|
||||
|
||||
|
||||
def test_state_change_by_mqtt(this_device: test_device):
|
||||
def state_data(key):
|
||||
if key in BOOL_KEYS:
|
||||
return (True, False)
|
||||
elif key == "battery":
|
||||
return (2, 87)
|
||||
elif key == "linkquality":
|
||||
return (1, 217)
|
||||
elif key == "voltage":
|
||||
return (1.17, 2.53)
|
||||
else:
|
||||
raise IndexError("No return value defined for key %s" % key)
|
||||
|
||||
def mqtt_data(key):
|
||||
return state_data(key)
|
||||
|
||||
def warning_condition(state_topic, value):
|
||||
return state_topic == "battery_low" and value is True
|
||||
|
||||
# test state changes
|
||||
tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device,
|
||||
mqtt_data, state_data, None, MQTT_SIGNAL_TIME)
|
||||
|
||||
|
||||
def test_specific_get_functions(this_device: test_device):
|
||||
assert this_device.linkquality == this_device.get(this_device.KEY_LINKQUALITY)
|
||||
assert this_device.battery == this_device.get(this_device.KEY_BATTERY)
|
49
__test__/devices/test_silvercrest_powerplug.py
Normal file
49
__test__/devices/test_silvercrest_powerplug.py
Normal file
@ -0,0 +1,49 @@
|
||||
from module import mqtt_test_client, init_state, state_change_by_mqtt
|
||||
from devices import silvercrest_powerplug as test_device
|
||||
from devices import warning
|
||||
from mqtt import mqtt_client
|
||||
import pytest
|
||||
import time
|
||||
|
||||
DUT_CLIENT_ID = "__%s__" % __name__
|
||||
TOPIC = "__test__/%s" % __name__
|
||||
#
|
||||
MQTT_SIGNAL_TIME = 0.2
|
||||
|
||||
|
||||
ALL_STATE_KEYS = ["state"]
|
||||
BOOL_KEYS = ["state"]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def this_device():
|
||||
mc = mqtt_client(DUT_CLIENT_ID, 'localhost')
|
||||
return test_device(mc, TOPIC)
|
||||
|
||||
|
||||
def test_initial_states(this_device: test_device):
|
||||
# test all initial values
|
||||
init_state(ALL_STATE_KEYS, this_device)
|
||||
|
||||
|
||||
def test_state_change_by_mqtt(this_device: test_device):
|
||||
def state_data(key):
|
||||
if key in BOOL_KEYS:
|
||||
return (True, False)
|
||||
else:
|
||||
raise IndexError("No return value defined for key %s" % key)
|
||||
|
||||
def mqtt_data(key):
|
||||
if key in BOOL_KEYS:
|
||||
return ('ON', 'OFF')
|
||||
else:
|
||||
return state_data(key)
|
||||
|
||||
# test state changes
|
||||
tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device,
|
||||
mqtt_data, state_data, None, MQTT_SIGNAL_TIME)
|
||||
|
||||
|
||||
def test_specific_get_functions(this_device: test_device):
|
||||
assert this_device.linkquality == this_device.get(this_device.KEY_LINKQUALITY)
|
||||
assert this_device.output_0 == this_device.get(this_device.KEY_OUTPUT_0)
|
40
__test__/function/modules/test_heating_function.py
Normal file
40
__test__/function/modules/test_heating_function.py
Normal file
@ -0,0 +1,40 @@
|
||||
from function.modules import heating_function as test_class
|
||||
|
||||
"""
|
||||
config.DEFAULT_TEMPERATURE[heating_valve.topic],
|
||||
db_data = get_radiator_data(heating_valve.topic)
|
||||
**{
|
||||
test_class.KEY_USER_TEMPERATURE_SETPOINT: db_data[2],
|
||||
test_class.KEY_TEMPERATURE_SETPOINT: db_data[3],
|
||||
test_class.KEY_AWAY_MODE: db_data[0],
|
||||
test_class.KEY_SUMMER_MODE: db_data[1],
|
||||
})
|
||||
"""
|
||||
|
||||
|
||||
def test_initial_states():
|
||||
|
||||
class heating_valve(object):
|
||||
KEY_HEATING_SETPOINT = 'hsp'
|
||||
KEY_TEMPERATURE = 'temp'
|
||||
|
||||
def set_heating_setpoint(self, value):
|
||||
pass
|
||||
|
||||
def add_callback(self, key, value, callback):
|
||||
pass
|
||||
#
|
||||
#
|
||||
#
|
||||
tc = test_class(
|
||||
heating_valve(),
|
||||
21, **{
|
||||
test_class.KEY_USER_TEMPERATURE_SETPOINT: 22,
|
||||
test_class.KEY_TEMPERATURE_SETPOINT: 17,
|
||||
test_class.KEY_AWAY_MODE: True,
|
||||
test_class.KEY_SUMMER_MODE: False,
|
||||
})
|
||||
assert tc.get(test_class.KEY_USER_TEMPERATURE_SETPOINT) == 22
|
||||
assert tc.get(test_class.KEY_TEMPERATURE_SETPOINT) == 17
|
||||
assert tc.get(test_class.KEY_AWAY_MODE) == True
|
||||
assert tc.get(test_class.KEY_SUMMER_MODE) == False
|
2
__test__/requirements.txt
Normal file
2
__test__/requirements.txt
Normal file
@ -0,0 +1,2 @@
|
||||
pytest
|
||||
pytest-cov
|
0
conftest.py
Normal file
0
conftest.py
Normal file
Loading…
x
Reference in New Issue
Block a user