Compare commits

..

No commits in common. "master" and "new_devdi_concept" have entirely different histories.

34 changed files with 40010 additions and 38830 deletions

3
.gitmodules vendored
View File

@ -13,6 +13,3 @@
[submodule "devdi"]
path = devdi
url = https://git.mount-mockery.de/smarthome/smart_devdi.git
[submodule "smart_devices"]
path = smart_devices
url = https://git.mount-mockery.de/smarthome/smart_devices.git

View File

@ -8,5 +8,5 @@ COV3_CMD=venv/bin/coverage
coverage:
$(COV3_CMD) erase
$(COV3_CMD) run -a --branch --source=devdi,smart_devices,devices,function smart_brain.py
$(COV3_CMD) run -a --branch --source=devdi,devices,function smart_brain.py
$(COV3_CMD) xml -o ../smart_brain_test/testresults/coverage.xml

View File

@ -0,0 +1,35 @@
import json
from mqtt import mqtt_client
import time
TEST_CLIENT_ID = "__test_device_tester__"
mqtt_test_client = mqtt_client(TEST_CLIENT_ID, "localhost")
def init_state(all_state_keys, device):
for state_topic in all_state_keys:
assert device.get(state_topic, 0) == None
def state_change_by_mqtt(all_state_keys, num_states, mqtt_test_client, base_topic, device, mqtt_data, state_data, warning_condition, mqtt_signal_time):
tm_warning = None
for i in range(num_states):
for state_topic in all_state_keys:
if device.TX_TYPE == device.TX_VALUE:
data = json.dumps(mqtt_data(state_topic)[i])
mqtt_test_client.send(base_topic + '/' + state_topic, data)
elif device.TX_TYPE == device.TX_DICT:
mqtt_test_client.send(base_topic, json.dumps({state_topic: mqtt_data(state_topic)[i]}))
else:
raise TypeError("Unknown TX_TYPE for device.")
if callable(warning_condition):
if warning_condition(state_topic, mqtt_data(state_topic)[i]):
tm_warning = int(time.time())
time.sleep(mqtt_signal_time)
for state_topic in all_state_keys:
assert device.get(state_topic) == state_data(state_topic)[i]
return tm_warning

View File

@ -0,0 +1,45 @@
from module import mqtt_test_client, init_state, state_change_by_mqtt
from devices import my_powerplug as test_device
from devices.base import warning
from mqtt import mqtt_client
import pytest
import time
DUT_CLIENT_ID = "__%s__" % __name__
TOPIC = "__test__/%s" % __name__
#
MQTT_SIGNAL_TIME = 0.2
ALL_STATE_KEYS = ["output/1", "output/2", "output/3", "output/4", ]
BOOL_KEYS = ALL_STATE_KEYS
@pytest.fixture
def this_device():
mc = mqtt_client(DUT_CLIENT_ID, 'localhost')
return test_device(mc, TOPIC)
def test_initial_states(this_device: test_device):
# test all initial values
init_state(ALL_STATE_KEYS, this_device)
def test_state_change_by_mqtt(this_device: test_device):
def state_data(key):
return (True, False)
def mqtt_data(key):
return state_data(key)
# test state changes
tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device,
mqtt_data, state_data, None, MQTT_SIGNAL_TIME)
def test_specific_get_functions(this_device: test_device):
assert this_device.output_0 == this_device.get(this_device.KEY_OUTPUT_0)
assert this_device.output_1 == this_device.get(this_device.KEY_OUTPUT_1)
assert this_device.output_2 == this_device.get(this_device.KEY_OUTPUT_2)
assert this_device.output_3 == this_device.get(this_device.KEY_OUTPUT_3)

View File

@ -0,0 +1,250 @@
from module import mqtt_test_client, init_state, state_change_by_mqtt
from devices import shelly_sw1 as test_device
from devices.base import warning
from mqtt import mqtt_client
import pytest
import time
DUT_CLIENT_ID = "__%s__" % __name__
TOPIC = "__test__/%s" % __name__
#
MQTT_SIGNAL_TIME = 0.2
ALL_STATE_KEYS = ["relay/0", "relay/1", "input/0", "input/1", "longpush/0", "longpush/1", "temperature", "overtemperature"]
BOOL_KEYS = ["relay/0", "relay/1", "input/0", "input/1", "longpush/0", "longpush/1", "overtemperature"]
@pytest.fixture
def this_device():
mc = mqtt_client(DUT_CLIENT_ID, 'localhost')
return test_device(mc, TOPIC)
def test_initial_states(this_device: test_device):
# test all initial values
init_state(ALL_STATE_KEYS, this_device)
def test_state_change_by_mqtt(this_device: test_device):
def state_data(key):
if key in BOOL_KEYS:
return (True, False)
elif key == "temperature":
return (85.3, 20.1)
else:
raise IndexError("No return value defined for key %s" % key)
def mqtt_data(key):
if key in ["relay/0", "relay/1"]:
return ('on', 'off')
elif key in ["input/0", "input/1", "longpush/0", "longpush/1", "overtemperature"]:
return (1, 0)
else:
return state_data(key)
def warning_condition(state_topic, value):
return state_topic == "overtemperature" and value == 1
# test state changes
tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device,
mqtt_data, state_data, warning_condition, MQTT_SIGNAL_TIME)
# test warning
w: warning = this_device.get(this_device.KEY_WARNING)
assert w.get(w.KEY_ID) == TOPIC
assert w.get(w.KEY_TYPE) == w.TYPE_OVERTEMPERATURE
wt = time.mktime(w.get(w.KEY_TM))
wt_min = tm_warning
wt_max = tm_warning + 2
assert wt >= wt_min and wt <= wt_max
def test_specific_get_functions(this_device: test_device):
assert this_device.output_0 == this_device.get(this_device.KEY_OUTPUT_0)
assert this_device.output_1 == this_device.get(this_device.KEY_OUTPUT_1)
assert this_device.input_0 == this_device.get(this_device.KEY_INPUT_0)
assert this_device.input_1 == this_device.get(this_device.KEY_INPUT_1)
assert this_device.longpush_0 == this_device.get(this_device.KEY_LONGPUSH_0)
assert this_device.longpush_1 == this_device.get(this_device.KEY_LONGPUSH_1)
assert this_device.temperature == this_device.get(this_device.KEY_TEMPERATURE)
def test_send_command(this_device: test_device):
this_device.set_output_0(True)
this_device.set_output_0(False)
'''
class shelly(base):
""" Communication (MQTT)
shelly
+- relay
| +- 0 ["on" / "off"] <- status
| | +- command ["on"/ "off"] <- command
| | +- energy [numeric] <- status
| +- 1 ["on" / "off"] <- status
| +- command ["on"/ "off"] <- command
| +- energy [numeric] <- status
+- input
| +- 0 [0 / 1] <- status
| +- 1 [0 / 1] <- status
+- input_event
| +- 0 <- status
| +- 1 <- status
+- logpush
| +- 0 [0 / 1] <- status
| +- 1 [0 / 1] <- status
+- temperature [numeric] °C <- status
+- temperature_f [numeric] F <- status
+- overtemperature [0 / 1] <- status
+- id <- status
+- model <- status
+- mac <- status
+- ip <- status
+- new_fw <- status
+- fw_ver <- status
"""
KEY_OUTPUT_0 = "relay/0"
KEY_OUTPUT_1 = "relay/1"
KEY_INPUT_0 = "input/0"
KEY_INPUT_1 = "input/1"
KEY_LONGPUSH_0 = "longpush/0"
KEY_LONGPUSH_1 = "longpush/1"
KEY_TEMPERATURE = "temperature"
KEY_OVERTEMPERATURE = "overtemperature"
KEY_ID = "id"
KEY_MODEL = "model"
KEY_MAC = "mac"
KEY_IP = "ip"
KEY_NEW_FIRMWARE = "new_fw"
KEY_FIRMWARE_VERSION = "fw_ver"
#
TX_TOPIC = "command"
TX_TYPE = base.TX_VALUE
TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
#
RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE,
KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION]
RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1']
RX_IGNORE_KEYS = ['temperature_f']
RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE]
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
#
self.output_key_delayed = None
self.delayed_flash_task = task.delayed(0.3, self.flash_task)
self.delayed_off_task = task.delayed(0.3, self.off_task)
#
self.add_callback(self.KEY_OVERTEMPERATURE, True, self.__warning__, True)
#
self.all_off_requested = False
def flash_task(self, *args):
if self.flash_active:
self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed))
self.output_key_delayed = None
if self.all_off_requested:
self.delayed_off_task.run()
def off_task(self, *args):
self.all_off()
@property
def flash_active(self):
return self.output_key_delayed is not None
#
# WARNING CALL
#
def __warning__(self, client, key, data):
w = warning(self.topic, warning.TYPE_OVERTEMPERATURE, "Temperature to high (%.1f°C)", self.get(self.KEY_TEMPERATURE) or math.nan)
self.logger.warning(w)
self.set(self.KEY_WARNING, w)
#
# RX
#
@property
def output_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_0)
@property
def output_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_1)
@property
def input_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_INPUT_0)
@property
def input_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_INPUT_1)
@property
def longpush_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_LONGPUSH_0)
@property
def longpush_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_LONGPUSH_1)
@property
def temperature(self):
"""rv: numeric value"""
return self.get(self.KEY_TEMPERATURE)
#
# TX
#
def set_output_0(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_0, state)
def set_output_0_mcb(self, device, key, data):
self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
self.set_output_0(data)
def toggle_output_0_mcb(self, device, key, data):
self.logger.info("Toggeling output 0")
self.set_output_0(not self.output_0)
def set_output_1(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_1, state)
def set_output_1_mcb(self, device, key, data):
self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data))
self.set_output_1(data)
def toggle_output_1_mcb(self, device, key, data):
self.logger.info("Toggeling output 1")
self.set_output_1(not self.output_1)
def flash_0_mcb(self, device, key, data):
self.output_key_delayed = self.KEY_OUTPUT_0
self.toggle_output_0_mcb(device, key, data)
self.delayed_flash_task.run()
def flash_1_mcb(self, device, key, data):
self.output_key_delayed = self.KEY_OUTPUT_1
self.toggle_output_1_mcb(device, key, data)
self.delayed_flash_task.run()
def all_off(self):
if self.flash_active:
self.all_off_requested = True
else:
if self.output_0:
self.set_output_0(False)
if self.output_1:
self.set_output_1(False)
'''

View File

@ -0,0 +1,55 @@
from module import mqtt_test_client, init_state, state_change_by_mqtt
from devices import silvercrest_motion_sensor as test_device
from devices.base import warning
from mqtt import mqtt_client
import pytest
import time
DUT_CLIENT_ID = "__%s__" % __name__
TOPIC = "__test__/%s" % __name__
#
MQTT_SIGNAL_TIME = 0.2
ALL_STATE_KEYS = ["battery", "battery_low", "linkquality", "occupancy", "tamper", "voltage"]
BOOL_KEYS = ["battery_low", "occupancy", "tamper"]
@pytest.fixture
def this_device():
mc = mqtt_client(DUT_CLIENT_ID, 'localhost')
return test_device(mc, TOPIC)
def test_initial_states(this_device: test_device):
# test all initial values
init_state(ALL_STATE_KEYS, this_device)
def test_state_change_by_mqtt(this_device: test_device):
def state_data(key):
if key in BOOL_KEYS:
return (True, False)
elif key == "battery":
return (2, 87)
elif key == "linkquality":
return (1, 217)
elif key == "voltage":
return (1.17, 2.53)
else:
raise IndexError("No return value defined for key %s" % key)
def mqtt_data(key):
return state_data(key)
def warning_condition(state_topic, value):
return state_topic == "battery_low" and value is True
# test state changes
tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device,
mqtt_data, state_data, None, MQTT_SIGNAL_TIME)
def test_specific_get_functions(this_device: test_device):
assert this_device.linkquality == this_device.get(this_device.KEY_LINKQUALITY)
assert this_device.battery == this_device.get(this_device.KEY_BATTERY)

View File

@ -0,0 +1,49 @@
from module import mqtt_test_client, init_state, state_change_by_mqtt
from devices import silvercrest_powerplug as test_device
from devices.base import warning
from mqtt import mqtt_client
import pytest
import time
DUT_CLIENT_ID = "__%s__" % __name__
TOPIC = "__test__/%s" % __name__
#
MQTT_SIGNAL_TIME = 0.2
ALL_STATE_KEYS = ["state"]
BOOL_KEYS = ["state"]
@pytest.fixture
def this_device():
mc = mqtt_client(DUT_CLIENT_ID, 'localhost')
return test_device(mc, TOPIC)
def test_initial_states(this_device: test_device):
# test all initial values
init_state(ALL_STATE_KEYS, this_device)
def test_state_change_by_mqtt(this_device: test_device):
def state_data(key):
if key in BOOL_KEYS:
return (True, False)
else:
raise IndexError("No return value defined for key %s" % key)
def mqtt_data(key):
if key in BOOL_KEYS:
return ('ON', 'OFF')
else:
return state_data(key)
# test state changes
tm_warning = state_change_by_mqtt(ALL_STATE_KEYS, 2, mqtt_test_client, TOPIC, this_device,
mqtt_data, state_data, None, MQTT_SIGNAL_TIME)
def test_specific_get_functions(this_device: test_device):
assert this_device.linkquality == this_device.get(this_device.KEY_LINKQUALITY)
assert this_device.output_0 == this_device.get(this_device.KEY_OUTPUT_0)

View File

@ -0,0 +1,40 @@
from function.modules import heating_function as test_class
"""
config.DEFAULT_TEMPERATURE[heating_valve.topic],
db_data = get_radiator_data(heating_valve.topic)
**{
test_class.KEY_USER_TEMPERATURE_SETPOINT: db_data[2],
test_class.KEY_TEMPERATURE_SETPOINT: db_data[3],
test_class.KEY_AWAY_MODE: db_data[0],
test_class.KEY_SUMMER_MODE: db_data[1],
})
"""
def test_initial_states():
class heating_valve(object):
KEY_HEATING_SETPOINT = 'hsp'
KEY_TEMPERATURE = 'temp'
def set_heating_setpoint(self, value):
pass
def add_callback(self, key, value, callback):
pass
#
#
#
tc = test_class(
heating_valve(),
21, **{
test_class.KEY_USER_TEMPERATURE_SETPOINT: 22,
test_class.KEY_TEMPERATURE_SETPOINT: 17,
test_class.KEY_AWAY_MODE: True,
test_class.KEY_SUMMER_MODE: False,
})
assert tc.get(test_class.KEY_USER_TEMPERATURE_SETPOINT) == 22
assert tc.get(test_class.KEY_TEMPERATURE_SETPOINT) == 17
assert tc.get(test_class.KEY_AWAY_MODE) == True
assert tc.get(test_class.KEY_SUMMER_MODE) == False

View File

@ -0,0 +1,2 @@
pytest
pytest-cov

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

Binary file not shown.

146
base.py Normal file
View File

@ -0,0 +1,146 @@
import json
import logging
import task
try:
from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
ROOT_LOGGER_NAME = 'root'
class common_base(dict):
DEFAULT_VALUES = {}
DEVICENAME = "DEFAULT"
def __init__(self, default_values=None):
super().__init__(default_values or self.DEFAULT_VALUES)
#
self.__callback_list__ = []
self.ch_names = {}
self.logger = logging.getLogger(ROOT_LOGGER_NAME).getChild("devices")
def set_ch_name(self, key, name):
self.ch_names[key] = name
def get_name(self, key, default=None):
return self.DEVICENAME + "." + self.ch_names.get(key, key) + " (" + ".".join(self.topic.split('/')[1:]) + ")"
def add_callback(self, key, data, callback, on_change_only=False, init_now=False):
"""
key: key or None for all keys
data: data or None for all data
"""
cb_tup = (key, data, callback, on_change_only)
if cb_tup not in self.__callback_list__:
self.__callback_list__.append(cb_tup)
if init_now and self.get(key) is not None:
callback(self, key, self[key])
def set(self, key, data, block_callback=[]):
if key in self.keys():
value_changed = self[key] != data
self[key] = data
for cb_key, cb_data, callback, on_change_only in self.__callback_list__:
if cb_key is None or key == cb_key: # key fits to callback definition
if cb_data is None or cb_data == self[key]: # data fits to callback definition
if value_changed or not on_change_only: # change status fits to callback definition
if not callback in block_callback: # block given callbacks
callback(self, key, self[key])
else:
self.logger.warning("Unexpected key %s", key)
class mqtt_base(common_base):
def __init__(self, mqtt_client, topic, default_values=None):
super().__init__(default_values)
#
self.mqtt_client = mqtt_client
self.topic = topic
for entry in self.topic.split('/'):
self.logger = self.logger.getChild(entry)
class videv_base(mqtt_base):
KEY_INFO = '__info__'
#
SET_TOPIC = "set"
def __init__(self, mqtt_client, topic, default_values=None):
super().__init__(mqtt_client, topic, default_values=default_values)
self.__display_dict__ = {}
self.__control_dict__ = {}
self.__periodic__ = task.periodic(300, self.send_all)
self.__periodic__.run()
def send_all(self, rt):
try:
for key in self:
if self[key] is not None:
self.__tx__(key, self[key])
except RuntimeError:
self.logger.warning("Runtimeerror while sending cyclic videv information. This may happen on startup.")
def add_display(self, my_key, ext_device, ext_key, on_change_only=True):
"""
listen to data changes of ext_device and update videv information
"""
if my_key not in self.keys():
self[my_key] = None
if ext_device.__class__.__name__ == "group":
# store information to identify callback from ext_device
self.__display_dict__[(id(ext_device[0]), ext_key)] = my_key
# register a callback to listen for data from external device
ext_device[0].add_callback(ext_key, None, self.__rx_ext_device_data__, on_change_only, init_now=True)
else:
# store information to identify callback from ext_device
self.__display_dict__[(id(ext_device), ext_key)] = my_key
# register a callback to listen for data from external device
ext_device.add_callback(ext_key, None, self.__rx_ext_device_data__, on_change_only, init_now=True)
# send initial display data to videv interface
data = ext_device.get(ext_key)
if data is not None:
self.__tx__(my_key, data)
def __rx_ext_device_data__(self, ext_device, ext_key, data):
my_key = self.__display_dict__[(id(ext_device), ext_key)]
self.set(my_key, data)
self.__tx__(my_key, data)
def __tx__(self, key, data):
if type(data) not in (str, ):
data = json.dumps(data)
self.mqtt_client.send('/'.join([self.topic, key]), data)
def add_control(self, my_key, ext_device, ext_key, on_change_only=True):
"""
listen to videv information and pass data to ext_device
"""
if my_key not in self.keys():
self[my_key] = None
# store information to identify callback from videv
self.__control_dict__[my_key] = (ext_device, ext_key, on_change_only)
# add callback for videv changes
self.mqtt_client.add_callback('/'.join([self.topic, my_key, self.SET_TOPIC]), self.__rx_videv_data__)
def __rx_videv_data__(self, client, userdata, message):
my_key = message.topic.split('/')[-2]
try:
data = json.loads(message.payload)
except json.decoder.JSONDecodeError:
data = message.payload
ext_device, ext_key, on_change_only = self.__control_dict__[my_key]
if my_key in self.keys():
if data != self[my_key] or not on_change_only:
ext_device.send_command(ext_key, data)
else:
self.logger.info("Ignoring rx message with topic %s", message.topic)
def add_routing(self, my_key, ext_device, ext_key, on_change_only_disp=True, on_change_only_videv=True):
"""
listen to data changes of ext_device and update videv information
and
listen to videv information and pass data to ext_device
"""
# add display
self.add_display(my_key, ext_device, ext_key, on_change_only_disp)
self.add_control(my_key, ext_device, ext_key, on_change_only_videv)

View File

@ -1,5 +1,6 @@
import geo
import logging
from topics import *
DEBUG = False
#

0
conftest.py Normal file
View File

2
devdi

@ -1 +1 @@
Subproject commit 5a5679b0baa9ba978f75d8581cb0dc7c13158e34
Subproject commit f0b994ef9c3a0526c4b699c491ccb478bc3847d0

114
device_development.py Normal file
View File

@ -0,0 +1,114 @@
import config
import mqtt
import readline
import sys
import report
import logging
import devices
import json
if __name__ == "__main__":
report.stdoutLoggingConfigure([[config.APP_NAME, logging.INFO], ], fmt=report.SHORT_FMT)
mc = mqtt.mqtt_client(
host=config.MQTT_SERVER,
port=config.MQTT_PORT,
username=config.MQTT_USER,
password=config.MQTT_PASSWORD,
name=config.APP_NAME + '_devicetest'
)
#
devicedict = {}
for device in [
# devices.shelly_pro3(mc, "shellies/gfw/pro3"),
# devices.brennenstuhl_heatingvalve(mc, "zigbee_raspi/heatvlv"),
# devices.silvercrest_button(mc, "zigbee_raspi/button"),
devices.hue_sw_br_ct(mc, "zigbee_ffe/kitchen/main_light_1"),
]:
devicedict[device.topic.replace("/", "_")] = device
#
COMMANDS = ['quit', 'help', 'action']
for key in devicedict:
device = devicedict[key]
for cmd in device.__class__.__dict__:
obj = getattr(device, cmd)
if callable(obj) and not cmd.startswith("_"):
COMMANDS.append(key + "." + cmd)
#
def reduced_list(text):
"""
Create reduced completation list
"""
reduced_list = {}
for cmd in COMMANDS:
next_dot = cmd[len(text):].find('.')
if next_dot >= 0:
reduced_list[cmd[:len(text) + next_dot + 1]] = None
else:
reduced_list[cmd] = None
return reduced_list.keys()
def completer(text, state):
"""
Our custom completer function
"""
options = [x for x in reduced_list(text) if x.startswith(text)]
return options[state]
def complete(text, state):
for cmd in COMMANDS:
if cmd.startswith(text):
if not state:
hit = ""
index = 0
sub_list = cmd.split('.')
while len(text) >= len(hit):
hit += sub_list[index] + '.'
return hit # cmd
else:
state -= 1
if len(sys.argv) == 1:
readline.parse_and_bind("tab: complete")
readline.set_completer(completer)
print("\nEnter command: ")
while True:
userfeedback = input('')
command = userfeedback.split(' ')[0]
if userfeedback == 'quit':
break
elif userfeedback == 'help':
print("Help is not yet implemented!")
else:
try:
key, command = userfeedback.split(".", 1)
except ValueError:
print("Unknown device.")
else:
device = devicedict[key]
try:
command, params = command.split(" ", 1)
except ValueError:
params = None
try:
obj = getattr(device, command)
except AttributeError:
print("Unknown command.")
else:
if params is not None:
params = params.replace("True", "true")
params = params.replace("False", "false")
params = params.replace("None", "null")
params = params.replace(",", " ")
params = params.split(" ")
params = " ".join([p for p in params if p])
try:
params = json.loads("[" + params.replace(" ", ", ") + "]")
except json.decoder.JSONDecodeError:
print("You need to give python like parameters (e.g. 'test' for a string containing test).")
params = None
try:
obj(*params)
except TypeError:
print("Give the correct parameters to execute.")

View File

@ -1,34 +1,58 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
devices (DEVICES)
=================
**Author:**
* Dirk Alders <sudo-dirk@mount-mockery.de>
**Description:**
This Module supports smarthome devices
**Submodules:**
* :mod:`shelly`
* :mod:`silvercrest_powerplug`
**Unittest:**
See also the :download:`unittest <devices/_testresults_/unittest.pdf>` documentation.
**Module Documentation:**
"""
import logging
from smart_devices.shelly import shelly as shelly_sw1
from smart_devices.shelly import shelly_rpc as shelly_pro3
from smart_devices.hue import hue_light as hue_sw_br_ct
from smart_devices.tradfri import tradfri_light as tradfri_sw
from smart_devices.tradfri import tradfri_light as tradfri_sw_br
from smart_devices.tradfri import tradfri_light as tradfri_sw_br_ct
from smart_devices.tradfri import tradfri_button as tradfri_button
from smart_devices.tradfri import tradfri_light as livarno_sw_br_ct
from smart_devices.brennenstuhl import brennenstuhl_heatingvalve
from smart_devices.silvercrest import silvercrest_button
from smart_devices.silvercrest import silvercrest_powerplug
from smart_devices.silvercrest import silvercrest_motion_sensor
from smart_devices.mydevices import powerplug as my_powerplug
from smart_devices.mydevices import audio_status
from smart_devices.mydevices import remote
from devices.shelly import shelly as shelly_sw1
from devices.shelly import shelly_rpc as shelly_pro3
from devices.hue import hue_light as hue_sw_br_ct
from devices.tradfri import tradfri_light as tradfri_sw
from devices.tradfri import tradfri_light as tradfri_sw_br
from devices.tradfri import tradfri_light as tradfri_sw_br_ct
from devices.tradfri import tradfri_button as tradfri_button
from devices.tradfri import tradfri_light as livarno_sw_br_ct
from devices.brennenstuhl import brennenstuhl_heatingvalve
from devices.silvercrest import silvercrest_button
from devices.silvercrest import silvercrest_powerplug
from devices.silvercrest import silvercrest_motion_sensor
from devices.mydevices import powerplug as my_powerplug
from devices.mydevices import audio_status
from devices.mydevices import remote
from function.videv import videv_switching as videv_sw
from function.videv import videv_switch_brightness as videv_sw_br
from function.videv import videv_switch_brightness_color_temp as videv_sw_br_ct
from function.videv import videv_switching_timer as videv_sw_tm
from function.videv import videv_switching_motion as videv_sw_mo
from function.videv import videv_heating as videv_hea
from function.videv import videv_pure_switch
from function.videv import videv_multistate
from function.videv import videv_audio_player
from smart_devices.videv import videv_switching as videv_sw
from smart_devices.videv import videv_switch_brightness as videv_sw_br
from smart_devices.videv import videv_switch_brightness_color_temp as videv_sw_br_ct
from smart_devices.videv import videv_switching_timer as videv_sw_tm
from smart_devices.videv import videv_switching_motion as videv_sw_mo
from smart_devices.videv import videv_heating as videv_hea
from smart_devices.videv import videv_pure_switch
from smart_devices.videv import videv_multistate
from smart_devices.videv import videv_audio_player
from smart_devices.videv import videv_all_off
try:
from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:

235
devices/base.py Normal file
View File

@ -0,0 +1,235 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
from base import mqtt_base
from base import videv_base
import json
def is_json(data):
try:
json.loads(data)
except json.decoder.JSONDecodeError:
return False
else:
return True
class base(mqtt_base):
TX_TOPIC = "set"
TX_VALUE = 0
TX_DICT = 1
TX_TYPE = -1
TX_FILTER_DATA_KEYS = []
#
RX_KEYS = []
RX_IGNORE_TOPICS = []
RX_IGNORE_KEYS = []
RX_FILTER_DATA_KEYS = []
#
CFG_DATA = {}
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, default_values=dict.fromkeys(self.RX_KEYS))
# data storage
self.__cfg_by_mid__ = None
# initialisations
mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback)
mqtt_client.add_callback(topic=self.topic+"/#", callback=self.receive_callback)
#
self.add_callback(None, None, self.__state_logging__, on_change_only=True)
def __cfg_callback__(self, key, value, mid):
if self.CFG_DATA.get(key) != value and self.__cfg_by_mid__ != mid and mid is not None:
self.__cfg_by_mid__ = mid
self.logger.warning("Differing configuration identified: Sending default configuration to defice: %s", repr(self.CFG_DATA))
if self.TX_TYPE == self.TX_DICT:
self.mqtt_client.send(self.topic + '/' + self.TX_TOPIC, json.dumps(self.CFG_DATA))
else:
for key in self.CFG_DATA:
self.send_command(key, self.CFG_DATA.get(key))
def set(self, key, data, mid=None, block_callback=[]):
if key in self.CFG_DATA:
self.__cfg_callback__(key, data, mid)
if key in self.RX_IGNORE_KEYS:
pass # ignore these keys
elif key in self.RX_KEYS:
return super().set(key, data, block_callback)
else:
self.logger.warning("Unexpected key %s", key)
def receive_callback(self, client, userdata, message):
if message.topic != self.topic + '/' + videv_base.KEY_INFO:
content_key = message.topic[len(self.topic) + 1:]
if content_key not in self.RX_IGNORE_TOPICS and (not message.topic.endswith(self.TX_TOPIC) or len(self.TX_TOPIC) == 0):
self.logger.debug("Unpacking content_key \"%s\" from message.", content_key)
if is_json(message.payload):
data = json.loads(message.payload)
if type(data) is dict:
for key in data:
self.set(key, self.__device_to_instance_filter__(key, data[key]), message.mid)
else:
self.set(content_key, self.__device_to_instance_filter__(content_key, data), message.mid)
# String
else:
self.set(content_key, self.__device_to_instance_filter__(content_key, message.payload.decode('utf-8')), message.mid)
else:
self.logger.debug("Ignoring topic %s", content_key)
def __device_to_instance_filter__(self, key, data):
if key in self.RX_FILTER_DATA_KEYS:
if data in [1, 'on', 'ON']:
return True
elif data in [0, 'off', 'OFF']:
return False
return data
def __instance_to_device_filter__(self, key, data):
if key in self.TX_FILTER_DATA_KEYS:
if data is True:
return "on"
elif data is False:
return "off"
return data
def send_command(self, key, data):
data = self.__instance_to_device_filter__(key, data)
if self.TX_TOPIC is not None:
if self.TX_TYPE < 0:
self.logger.error("Unknown tx type. Set TX_TYPE of class to a known value")
else:
self.logger.debug("Sending data for %s - %s", key, str(data))
if self.TX_TYPE == self.TX_DICT:
try:
self.mqtt_client.send('/'.join([self.topic, self.TX_TOPIC]), json.dumps({key: data}))
except TypeError:
print(self.topic)
print(key.__dict__)
print(key)
print(data)
raise TypeError
else:
if type(data) not in [str, bytes]:
data = json.dumps(data)
self.mqtt_client.send('/'.join([self.topic, key, self.TX_TOPIC] if len(self.TX_TOPIC) > 0 else [self.topic, key]), data)
else:
self.logger.error("Unknown tx toptic. Set TX_TOPIC of class to a known value")
class base_rpc(mqtt_base):
SRC_RESPONSE = "/response"
SRC_NULL = "/null"
#
EVENTS_TOPIC = "/events/rpc"
TX_TOPIC = "/rpc"
RESPONSE_TOPIC = SRC_RESPONSE + "/rpc"
NULL_TOPIC = SRC_NULL + "/rpc"
#
RPC_ID_GET_STATUS = 1
RPC_ID_SET = 1734
#
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, default_values=dict.fromkeys(self.RX_KEYS))
# data storage
self.__cfg_by_mid__ = None
# initialisations
mqtt_client.add_callback(topic=self.topic, callback=self.receive_callback)
mqtt_client.add_callback(topic=self.topic+"/#", callback=self.receive_callback)
#
self.add_callback(None, None, self.__state_logging__, on_change_only=False)
#
self.rpc_get_status()
def receive_callback(self, client, userdata, message):
data = json.loads(message.payload)
#
if message.topic == self.topic + self.EVENTS_TOPIC:
self.events(data)
elif message.topic == self.topic + self.RESPONSE_TOPIC:
self.response(data)
elif message.topic == self.topic + self.NULL_TOPIC or message.topic == self.topic + self.TX_TOPIC or message.topic == self.topic + "/online":
pass # Ignore response
else:
self.logger.warning("Unexpected message received: %s::%s", message.topic, json.dumps(data, sort_keys=True, indent=4))
def events(self, data):
for rx_key in data["params"]:
if rx_key == "events":
for evt in data["params"]["events"]:
key = evt["component"]
event = evt["event"]
if key in self.RX_KEYS:
if event == "btn_down":
self.set(key, True)
elif event == "btn_up":
self.set(key, False)
else:
key = key + ":" + event
if key in self.RX_KEYS:
self.set(key, True)
else:
self.logger.warning("Unexpected event with data=%s", json.dumps(data, sort_keys=True, indent=4))
elif rx_key in self.RX_KEYS:
state = data["params"][rx_key].get("output")
if state is not None:
self.set(rx_key, state)
def response(self, data):
try:
rpc_id = data.get("id")
except AttributeError:
rpc_id = None
try:
rpc_method = data.get("method")
except AttributeError:
rpc_method = None
if rpc_id == self.RPC_ID_GET_STATUS:
#
# Shelly.GetStatus
#
for rx_key in data.get("result", []):
if rx_key in self.RX_KEYS:
key_data = data["result"][rx_key]
state = key_data.get("output", key_data.get("state"))
if state is not None:
self.set(rx_key, state)
else:
self.logger.warning("Unexpected response with data=%s", json.dumps(data, sort_keys=True, indent=4))
def rpc_tx(self, **kwargs):
if not "id" in kwargs:
raise AttributeError("'id' is missing in keyword arguments")
self.mqtt_client.send(self.topic + self.TX_TOPIC, json.dumps(kwargs))
def rpc_get_status(self):
self.rpc_tx(
id=self.RPC_ID_GET_STATUS,
src=self.topic + self.SRC_RESPONSE,
method="Shelly.GetStatus"
)
def rpc_switch_set(self, key, state: bool):
self.rpc_tx(
id=self.RPC_ID_SET,
src=self.topic + self.SRC_NULL,
method="Switch.Set",
params={"id": int(key[-1]), "on": state}
)
class base_output(base):
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
self.__all_off_enabled__ = True
def disable_all_off(self, state=True):
self.__all_off_enabled__ = not state
def all_off(self):
if self.__all_off_enabled__:
try:
self.__all_off__()
except (AttributeError, TypeError) as e:
self.logger.warning("Method all_off was used, but __all_off__ method wasn't callable: %s", repr(e))

118
devices/brennenstuhl.py Normal file
View File

@ -0,0 +1,118 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
from devices.base import base
import task
import time
class brennenstuhl_heatingvalve(base):
""" Communication (MQTT)
brennenstuhl_heatingvalve {
| "away_mode": ["ON", "OFF"]
| "battery": [0...100] %
| "child_lock": ["LOCK", "UNLOCK"]
| "current_heating_setpoint": [5...30] °C
| "linkquality": [0...255] lqi
| "local_temperature": [numeric] °C
| "preset": ["manual", ...]
| "system_mode": ["heat", ...]
| "valve_detection": ["ON", "OFF"]
| "window_detection": ["ON", "OFF"]
| }
+- set {
"away_mode": ["ON", "OFF", "TOGGLE"]
"child_lock": ["LOCK", "UNLOCK"]
"current_heating_setpoint": [5...30] °C
"preset": ["manual", ...]
"system_mode": ["heat", ...]
"valve_detection": ["ON", "OFF", "TOGGLE"]
"window_detection": ["ON", "OFF", "TOGGLE"]
}
"""
KEY_LINKQUALITY = "linkquality"
KEY_BATTERY = "battery"
KEY_HEATING_SETPOINT = "current_heating_setpoint"
KEY_TEMPERATURE = "local_temperature"
#
KEY_AWAY_MODE = "away_mode"
KEY_CHILD_LOCK = "child_lock"
KEY_PRESET = "preset"
KEY_SYSTEM_MODE = "system_mode"
KEY_VALVE_DETECTION = "valve_detection"
KEY_WINDOW_DETECTION = "window_detection"
#
RETRY_CYCLE_TIME = 2.5
MAX_TX_RETRIES = 20
RETRY_TIMEOUT = RETRY_CYCLE_TIME * MAX_TX_RETRIES
#
TX_TYPE = base.TX_DICT
#
RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_HEATING_SETPOINT, KEY_TEMPERATURE]
RX_IGNORE_KEYS = [KEY_AWAY_MODE, KEY_CHILD_LOCK, KEY_PRESET, KEY_SYSTEM_MODE, KEY_VALVE_DETECTION, KEY_WINDOW_DETECTION]
#
CFG_DATA = {
KEY_WINDOW_DETECTION: "ON",
KEY_VALVE_DETECTION: "ON",
KEY_SYSTEM_MODE: "heat",
KEY_PRESET: "manual"
}
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
self.add_callback(self.KEY_HEATING_SETPOINT, None, self.__valave_temp_rx__)
self.__tx_temperature__ = None
self.__rx_temperature__ = None
self.__tx_timestamp__ = 0
#
self.task = task.periodic(self.RETRY_CYCLE_TIME, self.__task__)
self.task.run()
def __state_logging__(self, inst, key, data):
if key in [self.KEY_HEATING_SETPOINT, self.KEY_CHILD_LOCK, self.KEY_WINDOW_DETECTION, self.KEY_VALVE_DETECTION]:
self.logger.info("State change of '%s' to '%s'", key, repr(data))
def send_command(self, key, data):
if key == self.KEY_HEATING_SETPOINT:
self.__tx_temperature__ = data
self.__tx_timestamp__ = time.time()
base.send_command(self, key, data)
def __valave_temp_rx__(self, inst, key, data):
if key == self.KEY_HEATING_SETPOINT:
self.__rx_temperature__ = data
def __task__(self, rt):
if self.__tx_temperature__ is not None and self.__tx_timestamp__ is not None: # Already send a setpoint
if self.__tx_temperature__ != self.__rx_temperature__: # Setpoint and valve feedback are unequal
if time.time() - self.__tx_timestamp__ < self.RETRY_TIMEOUT: # Timeout condition allows resend of setpoint
self.logger.warning("Setpoint not yet acknoledged by device. Sending setpoint again")
self.set_heating_setpoint(self.__tx_temperature__)
return
else:
self.__tx_timestamp__ = None # Disable resend logic, if setpoint and valve setpoint are equal
#
# RX
#
@property
def linkqulity(self):
return self.get(self.KEY_LINKQUALITY)
@property
def heating_setpoint(self):
return self.get(self.KEY_HEATING_SETPOINT)
@property
def temperature(self):
return self.get(self.KEY_TEMPERATURE)
#
# TX
#
def set_heating_setpoint(self, setpoint):
self.send_command(self.KEY_HEATING_SETPOINT, setpoint)
def set_heating_setpoint_mcb(self, device, key, data):
self.set_heating_setpoint(data)

118
devices/hue.py Normal file
View File

@ -0,0 +1,118 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
from devices.base import base, base_output
import logging
class hue_light(base_output):
""" Communication (MQTT)
hue_light {
| "state": ["ON" / "OFF" / "TOGGLE"]
| "linkquality": [0...255] lqi
| "brightness": [0...254]
| "color_mode": ["color_temp"]
| "color_temp": ["coolest", "cool", "neutral", "warm", "warmest", 250...454]
| }
+- get {
| "state": ""
| }
+- set {
"state": ["ON" / "OFF"]
"brightness": [0...256]
"color_temp": [250...454]
"transition": [0...] seconds
"brightness_move": [-X...0...X] X/s
"brightness_step": [-X...0...X]
"color_temp_move": [-X...0...X] X/s
"color_temp_step": [-X...0...X]
}
"""
KEY_LINKQUALITY = "linkquality"
KEY_OUTPUT_0 = "state"
KEY_BRIGHTNESS = "brightness"
KEY_COLOR_TEMP = "color_temp"
#
TX_TYPE = base.TX_DICT
TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]
STATE_KEYS = TX_FILTER_DATA_KEYS
#
RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]
RX_IGNORE_KEYS = ['update', 'color_mode']
RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]
def __state_logging__(self, inst, key, data):
if key in [self.KEY_OUTPUT_0, self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP]:
self.logger.info("State change of '%s' to '%s'", key, repr(data))
def __device_to_instance_filter__(self, key, data):
if key == self.KEY_BRIGHTNESS:
return int(round((data - 1) * 100 / 253, 0))
elif key == self.KEY_COLOR_TEMP:
return int(round((data - 250) * 10 / 204, 0))
return super().__device_to_instance_filter__(key, data)
def __instance_to_device_filter__(self, key, data):
if key == self.KEY_BRIGHTNESS:
return int(round(data * 253 / 100 + 1, 0))
elif key == self.KEY_COLOR_TEMP:
return int(round(data * 204 / 10 + 250, 0))
return super().__instance_to_device_filter__(key, data)
#
# RX
#
@property
def output_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_0, False)
@property
def linkquality(self):
"""rv: numeric value"""
return self.get(self.KEY_LINKQUALITY, 0)
@property
def brightness(self):
"""rv: numeric value [0%, ..., 100%]"""
return self.get(self.KEY_BRIGHTNESS, 0)
@property
def color_temp(self):
"""rv: numeric value [0, ..., 10]"""
return self.get(self.KEY_COLOR_TEMP, 0)
#
# TX
#
def request_data(self, device=None, key=None, data=None):
self.mqtt_client.send(self.topic + "/set", '{"hue_power_on_behavior": "recover"}')
def set_output_0(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_0, state)
def set_output_0_mcb(self, device, key, data):
self.set_output_0(data)
def toggle_output_0_mcb(self, device, key, data):
self.set_output_0(not self.output_0)
def set_brightness(self, brightness):
"""brightness: [0, ..., 100]"""
self.send_command(self.KEY_BRIGHTNESS, brightness)
def set_brightness_mcb(self, device, key, data):
self.set_brightness(data)
def set_color_temp(self, color_temp):
"""color_temp: [0, ..., 10]"""
self.send_command(self.KEY_COLOR_TEMP, color_temp)
def set_color_temp_mcb(self, device, key, data):
self.set_color_temp(data)
def __all_off__(self):
if self.output_0:
self.set_output_0(False)

262
devices/mydevices.py Normal file
View File

@ -0,0 +1,262 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
from devices.base import base, base_output
import logging
class powerplug(base_output):
""" Communication (MQTT)
my_powerplug
+- output
+- 1 [True, False] <- status
| +- set [True, False, "toggle"] <- command
+- 2 [True, False] <- status
| +- set [True, False, "toggle"] <- command
+- 3 [True, False] <- status
| +- set [True, False, "toggle"] <- command
+- 4 [True, False] <- status
| +- set [True, False, "toggle"] <- command
+- all
+- set [True, False, "toggle"] <- command
"""
KEY_OUTPUT_0 = "output/1"
KEY_OUTPUT_1 = "output/2"
KEY_OUTPUT_2 = "output/3"
KEY_OUTPUT_3 = "output/4"
KEY_OUTPUT_ALL = "output/all"
KEY_OUTPUT_LIST = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]
#
TX_TYPE = base.TX_VALUE
#
RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]
def __state_logging__(self, inst, key, data):
if key in self.KEY_OUTPUT_LIST:
self.logger.info("State change of '%s' to '%s'", key, repr(data))
#
# RX
#
@property
def output_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_0)
@property
def output_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_1)
@property
def output_2(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_2)
@property
def output_3(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_3)
#
# TX
#
def set_output(self, key, state):
if key in self.KEY_OUTPUT_LIST:
self.send_command(key, state)
else:
logging.error("Unknown key to set the output!")
def set_output_0(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_0, state)
def set_output_0_mcb(self, device, key, data):
self.set_output_0(data)
def toggle_output_0_mcb(self, device, key, data):
self.set_output_0(not self.output_0)
def set_output_1(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_1, state)
def set_output_1_mcb(self, device, key, data):
self.set_output_1(data)
def toggle_output_1_mcb(self, device, key, data):
self.set_output_1(not self.output_1)
def set_output_2(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_2, state)
def set_output_2_mcb(self, device, key, data):
self.set_output_2(data)
def toggle_output_2_mcb(self, device, key, data):
self.set_output_2(not self.output_2)
def set_output_3(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_3, state)
def set_output_3_mcb(self, device, key, data):
self.set_output_3(data)
def toggle_output_3_mcb(self, device, key, data):
self.set_output_3(not self.output_3)
def set_output_all(self, state):
"""state: [True, False, 'toggle']"""
self.send_command(self.KEY_OUTPUT_ALL, state)
def set_output_all_mcb(self, device, key, data):
self.set_output_all(data)
def __all_off__(self):
self.set_output_all(False)
class remote(base):
""" Communication (MQTT)
remote (RAS5) <- command
+- CD [dc]
+- LINE1 [dc]
+- LINE2 [dc]
+- LINE3 [dc]
+- MUTE [dc]
+- POWER [dc]
+- VOLDOWN [dc]
+- VOLUP [dc]
+- PHONO [dc]
+- DOCK [dc]
remote (EUR642100) <- command
+- OPEN_CLOSE [dc]
+- VOLDOWN [dc]
+- VOLUP [dc]
+- ONE [dc]
+- TWO [dc]
+- THREE [dc]
+- FOUR [dc]
+- FIVE [dc]
+- SIX [dc]
+- SEVEN [dc]
+- EIGHT [dc]
+- NINE [dc]
+- ZERO [dc]
+- TEN [dc]
+- TEN_PLUS [dc]
+- PROGRAM [dc]
+- CLEAR [dc]
+- RECALL [dc]
+- TIME_MODE [dc]
+- A_B_REPEAT [dc]
+- REPEAT [dc]
+- RANDOM [dc]
+- AUTO_CUE [dc]
+- TAPE_LENGTH [dc]
+- SIDE_A_B [dc]
+- TIME_FADE [dc]
+- PEAK_SEARCH [dc]
+- SEARCH_BACK [dc]
+- SEARCH_FOR [dc]
+- TRACK_NEXT [dc]
+- TRACK_PREV [dc]
+- STOP [dc]
+- PAUSE [dc]
+- PLAY [dc]
"""
KEY_CD = "CD"
KEY_LINE1 = "LINE1"
KEY_LINE2 = "LINE2"
KEY_LINE3 = "LINE3"
KEY_PHONO = "PHONO"
KEY_MUTE = "MUTE"
KEY_POWER = "POWER"
KEY_VOLDOWN = "VOLDOWN"
KEY_VOLUP = "VOLUP"
#
TX_TOPIC = ''
TX_TYPE = base.TX_VALUE
#
RX_IGNORE_TOPICS = [KEY_CD, KEY_LINE1, KEY_LINE2, KEY_LINE3, KEY_PHONO, KEY_MUTE, KEY_POWER, KEY_VOLUP, KEY_VOLDOWN]
def __state_logging__(self, inst, key, data):
pass # This is just a TX device using self.set_*
def set_cd(self, device=None, key=None, data=None):
self.logger.info("Changing amplifier source to CD")
self.send_command(self.KEY_CD, None)
def set_line1(self, device=None, key=None, data=None):
self.logger.info("Changing amplifier source to LINE1")
self.send_command(self.KEY_LINE1, None)
def set_line2(self, device=None, key=None, data=None):
self.logger.info("Changing amplifier source to LINE2")
self.send_command(self.KEY_LINE2, None)
def set_line3(self, device=None, key=None, data=None):
self.logger.info("Changing amplifier source to LINE3")
self.send_command(self.KEY_LINE3, None)
def set_phono(self, device=None, key=None, data=None):
self.logger.info("Changing amplifier source to PHONO")
self.send_command(self.KEY_PHONO, None)
def set_mute(self, device=None, key=None, data=None):
self.logger.info("Muting / Unmuting amplifier")
self.send_command(self.KEY_MUTE, None)
def set_power(self, device=None, key=None, data=None):
self.logger.info("Power on/off amplifier")
self.send_command(self.KEY_POWER, None)
def set_volume_up(self, data=False):
"""data: [True, False]"""
self.logger.info("Increasing amplifier volume")
self.send_command(self.KEY_VOLUP, data)
def set_volume_down(self, data=False):
"""data: [True, False]"""
self.logger.info("Decreasing amplifier volume")
self.send_command(self.KEY_VOLDOWN, data)
def default_inc(self, device=None, key=None, data=None):
self.set_volume_up(True)
def default_dec(self, device=None, key=None, data=None):
self.set_volume_down(True)
def default_stop(self, device=None, key=None, data=None):
self.set_volume_up(False)
class audio_status(base):
""" Communication (MQTT)
audio_status
+- state [True, False] <- status
+- title [text] <- status
"""
KEY_STATE = "state"
KEY_TITLE = "title"
#
TX_TYPE = base.TX_VALUE
#
RX_KEYS = [KEY_STATE, KEY_TITLE]
def __state_logging__(self, inst, key, data):
if key in [self.KEY_STATE, self.KEY_TITLE]:
self.logger.info("State change of '%s' to '%s'", key, repr(data))
def set_state(self, num, data):
"""data: [True, False]"""
self.send_command(self.KEY_STATE + "/" + str(num), data)
def set_state_mcb(self, device, key, data):
self.set_state(data)

238
devices/shelly.py Normal file
View File

@ -0,0 +1,238 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
from devices.base import base_output
from devices.base import base_rpc
import task
class shelly(base_output):
""" Communication (MQTT)
shelly
+- relay
| +- 0 ["on" / "off"] <- status
| | +- command ["on"/ "off"] <- command
| | +- energy [numeric] <- status
| +- 1 ["on" / "off"] <- status
| +- command ["on"/ "off"] <- command
| +- energy [numeric] <- status
+- input
| +- 0 [0 / 1] <- status
| +- 1 [0 / 1] <- status
+- input_event
| +- 0 <- status
| +- 1 <- status
+- logpush
| +- 0 [0 / 1] <- status
| +- 1 [0 / 1] <- status
+- temperature [numeric] °C <- status
+- temperature_f [numeric] F <- status
+- overtemperature [0 / 1] <- status
+- id <- status
+- model <- status
+- mac <- status
+- ip <- status
+- new_fw <- status
+- fw_ver <- status
"""
KEY_OUTPUT_0 = "relay/0"
KEY_OUTPUT_1 = "relay/1"
KEY_INPUT_0 = "input/0"
KEY_INPUT_1 = "input/1"
KEY_LONGPUSH_0 = "longpush/0"
KEY_LONGPUSH_1 = "longpush/1"
KEY_TEMPERATURE = "temperature"
KEY_OVERTEMPERATURE = "overtemperature"
KEY_ID = "id"
KEY_MODEL = "model"
KEY_MAC = "mac"
KEY_IP = "ip"
KEY_NEW_FIRMWARE = "new_fw"
KEY_FIRMWARE_VERSION = "fw_ver"
#
TX_TOPIC = "command"
TX_TYPE = base_output.TX_VALUE
TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
#
RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE,
KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION]
RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1']
RX_IGNORE_KEYS = ['temperature_f']
RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE]
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
#
self.output_key_delayed = None
self.delayed_flash_task = task.delayed(0.75, self.flash_task)
self.delayed_off_task = task.delayed(0.75, self.off_task)
#
self.all_off_requested = False
def __state_logging__(self, inst, key, data):
if key in [self.KEY_OUTPUT_0, self.KEY_OUTPUT_1]:
self.logger.info("State change of '%s' to '%s'", key, repr(data))
elif key in [self.KEY_INPUT_0, self.KEY_INPUT_1, self.KEY_LONGPUSH_0, self.KEY_LONGPUSH_1]:
self.logger.info("Input action '%s' with '%s'", key, repr(data))
def flash_task(self, *args):
if self.flash_active:
self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed))
self.output_key_delayed = None
if self.all_off_requested:
self.delayed_off_task.run()
def off_task(self, *args):
self.all_off()
@property
def flash_active(self):
return self.output_key_delayed is not None
#
# RX
#
@property
def output_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_0)
@property
def output_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_1)
@property
def input_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_INPUT_0)
@property
def input_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_INPUT_1)
@property
def longpush_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_LONGPUSH_0)
@property
def longpush_1(self):
"""rv: [True, False]"""
return self.get(self.KEY_LONGPUSH_1)
@property
def temperature(self):
"""rv: numeric value"""
return self.get(self.KEY_TEMPERATURE)
#
# TX
#
def set_output_0(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_0, state)
def set_output_0_mcb(self, device, key, data):
self.set_output_0(data)
def toggle_output_0_mcb(self, device, key, data):
self.set_output_0(not self.output_0)
def set_output_1(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_1, state)
def set_output_1_mcb(self, device, key, data):
self.set_output_1(data)
def toggle_output_1_mcb(self, device, key, data):
self.set_output_1(not self.output_1)
def flash_0_mcb(self, device, key, data):
self.output_key_delayed = self.KEY_OUTPUT_0
self.toggle_output_0_mcb(device, key, data)
self.delayed_flash_task.run()
def flash_1_mcb(self, device, key, data):
self.output_key_delayed = self.KEY_OUTPUT_1
self.toggle_output_1_mcb(device, key, data)
self.delayed_flash_task.run()
def __all_off__(self):
if self.flash_active:
self.all_off_requested = True
else:
if self.output_0:
self.set_output_0(False)
if self.output_1:
self.set_output_1(False)
class shelly_rpc(base_rpc):
KEY_OUTPUT_0 = "switch:0"
KEY_OUTPUT_1 = "switch:1"
KEY_OUTPUT_2 = "switch:2"
KEY_INPUT_0 = "input:0"
KEY_INPUT_1 = "input:1"
KEY_INPUT_2 = "input:2"
KEY_LONGPUSH_0 = "input:0:long_push"
KEY_LONGPUSH_1 = "input:1:long_push"
KEY_LONGPUSH_2 = "input:2:long_push"
KEY_SINGLEPUSH_0 = "input:0:single_push"
KEY_SINGLEPUSH_1 = "input:1:single_push"
KEY_SINGLEPUSH_2 = "input:2:single_push"
KEY_DOUBLEPUSH_0 = "input:0:double_push"
KEY_DOUBLEPUSH_1 = "input:1:double_push"
KEY_DOUBLEPUSH_2 = "input:2:double_push"
KEY_TRIPLEPUSH_0 = "input:0:triple_push"
KEY_TRIPLEPUSH_1 = "input:1:triple_push"
KEY_TRIPLEPUSH_2 = "input:2:triple_push"
RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_INPUT_0, KEY_INPUT_1, KEY_INPUT_2,
KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_LONGPUSH_2, KEY_SINGLEPUSH_0, KEY_SINGLEPUSH_1, KEY_SINGLEPUSH_2,
KEY_DOUBLEPUSH_0, KEY_DOUBLEPUSH_1, KEY_DOUBLEPUSH_2, KEY_TRIPLEPUSH_0, KEY_TRIPLEPUSH_1, KEY_TRIPLEPUSH_2]
def __state_logging__(self, inst, key, data):
if key in [self.KEY_OUTPUT_0, self.KEY_OUTPUT_1, self.KEY_OUTPUT_2]:
self.logger.info("State change of '%s' to '%s'", key, repr(data))
elif key in [self.KEY_INPUT_0, self.KEY_INPUT_1, self.KEY_INPUT_2]:
self.logger.info("Input action '%s' with '%s'", key, repr(data))
elif key in [self.KEY_LONGPUSH_0, self.KEY_LONGPUSH_1, self.KEY_LONGPUSH_2,
self.KEY_SINGLEPUSH_0, self.KEY_SINGLEPUSH_1, self.KEY_SINGLEPUSH_2,
self.KEY_DOUBLEPUSH_0, self.KEY_DOUBLEPUSH_1, self.KEY_DOUBLEPUSH_2,
self.KEY_TRIPLEPUSH_0, self.KEY_TRIPLEPUSH_1, self.KEY_TRIPLEPUSH_2]:
self.logger.info("Input action '%s'", key)
def set_output_0(self, state):
"""state: [True, False]"""
self.rpc_switch_set(self.KEY_OUTPUT_0, state)
def set_output_0_mcb(self, device, key, data):
self.set_output_0(data)
def toggle_output_0_mcb(self, device, key, data):
self.set_output_0(not self.get(self.KEY_OUTPUT_0))
def set_output_1(self, state):
"""state: [True, False]"""
self.rpc_switch_set(self.KEY_OUTPUT_1, state)
def set_output_1_mcb(self, device, key, data):
self.set_output_1(data)
def toggle_output_1_mcb(self, device, key, data):
print(self.get(self.KEY_OUTPUT_1))
self.set_output_1(not self.get(self.KEY_OUTPUT_1))
def set_output_2(self, state):
"""state: [True, False]"""
self.rpc_switch_set(self.KEY_OUTPUT_2, state)
def set_output_2_mcb(self, device, key, data):
self.set_output_2(data)
def toggle_output_2_mcb(self, device, key, data):
self.set_output_2(not self.get(self.KEY_OUTPUT_2))

148
devices/silvercrest.py Normal file
View File

@ -0,0 +1,148 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
from devices.base import base, base_output
import logging
class silvercrest_button(base):
""" Communication (MQTT)
tradfri_button {
"action": ["pressed"]
"battery": [0...100] %
"battery_low": [true | false]
"tamper": [true | false]
"linkquality": [0...255] lqi
"update": []
}
"""
ACTION_PRESSED = "pressed"
#
KEY_LINKQUALITY = "linkquality"
KEY_BATTERY = "battery"
KEY_BATTERY_LOW = "battery_low"
KEY_TAMPER = "tamper"
KEY_ACTION = "action"
#
RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_ACTION, KEY_BATTERY_LOW, KEY_TAMPER]
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
def __state_logging__(self, inst, key, data):
if key == self.KEY_ACTION:
self.logger.info("Input '%s' with '%s'", key, repr(data))
self[self.KEY_ACTION] = None
elif key in [self.KEY_BATTERY_LOW, self.KEY_TAMPER]:
self.logger.info("Input '%s' with '%s'", key, repr(data))
#
# RX
#
@property
def action(self):
"""rv: action_txt"""
return self.get(self.KEY_ACTION)
class silvercrest_powerplug(base_output):
""" Communication (MQTT)
silvercrest_powerplug {
| "state": ["ON" / "OFF"]
| "linkquality": [0...255] lqi
| }
+- get {
| "state": ""
| }
+- set {
"state": ["ON" / "OFF"]
}
"""
KEY_LINKQUALITY = "linkquality"
KEY_OUTPUT_0 = "state"
#
TX_TYPE = base.TX_DICT
TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0]
#
RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0]
RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0]
def __state_logging__(self, inst, key, data):
if key in [self.KEY_OUTPUT_0]:
self.logger.info("State change of '%s' to '%s'", key, repr(data))
#
# RX
#
@property
def output_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_0)
@property
def linkquality(self):
"""rv: numeric value"""
return self.get(self.KEY_LINKQUALITY)
#
# TX
#
def set_output_0(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_0, state)
def set_output_0_mcb(self, device, key, data):
self.set_output_0(data)
def toggle_output_0_mcb(self, device, key, data):
self.set_output_0(not self.output_0)
def __all_off__(self):
if self.output_0:
self.set_output_0(False)
class silvercrest_motion_sensor(base):
""" Communication (MQTT)
silvercrest_motion_sensor {
battery: [0...100] %
battery_low: [True, False]
linkquality: [0...255] lqi
occupancy: [True, False]
tamper: [True, False]
voltage: [0...] mV
}
"""
KEY_BATTERY = "battery"
KEY_BATTERY_LOW = "battery_low"
KEY_LINKQUALITY = "linkquality"
KEY_OCCUPANCY = "occupancy"
KEY_UNMOUNTED = "tamper"
KEY_VOLTAGE = "voltage"
#
TX_TYPE = base.TX_DICT
#
RX_KEYS = [KEY_BATTERY, KEY_BATTERY_LOW, KEY_LINKQUALITY, KEY_OCCUPANCY, KEY_UNMOUNTED, KEY_VOLTAGE]
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
def __state_logging__(self, inst, key, data):
if key in [self.KEY_OCCUPANCY, self.KEY_UNMOUNTED]:
self.logger.info("State change of '%s' to '%s'", key, repr(data))
#
# RX
#
@property
def linkquality(self):
"""rv: numeric value"""
return self.get(self.KEY_LINKQUALITY)
@property
def battery(self):
"""rv: numeric value"""
return self.get(self.KEY_BATTERY)

192
devices/tradfri.py Normal file
View File

@ -0,0 +1,192 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
from devices.base import base, base_output
import logging
class tradfri_light(base_output):
""" Communication (MQTT)
tradfri_light {
| "state": ["ON" / "OFF" / "TOGGLE"]
| "linkquality": [0...255] lqi
| "brightness": [0...254]
| "color_mode": ["color_temp"]
| "color_temp": ["coolest", "cool", "neutral", "warm", "warmest", 250...454]
| "color_temp_startup": ["coolest", "cool", "neutral", "warm", "warmest", "previous", 250...454]
| "update": []
| }
+- get {
| "state": ""
| }
+- set {
"state": ["ON" / "OFF"]
"brightness": [0...256]
"color_temp": [250...454]
"transition": [0...] seconds
"brightness_move": [-X...0...X] X/s
"brightness_step": [-X...0...X]
"color_temp_move": [-X...0...X] X/s
"color_temp_step": [-X...0...X]
}
"""
KEY_LINKQUALITY = "linkquality"
KEY_OUTPUT_0 = "state"
KEY_BRIGHTNESS = "brightness"
KEY_COLOR_TEMP = "color_temp"
KEY_BRIGHTNESS_FADE = "brightness_move"
#
TX_TYPE = base.TX_DICT
TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP, KEY_BRIGHTNESS_FADE]
#
RX_KEYS = [KEY_LINKQUALITY, KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]
RX_IGNORE_KEYS = ['update', 'color_mode', 'color_temp_startup']
RX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_BRIGHTNESS, KEY_COLOR_TEMP]
def __state_logging__(self, inst, key, data):
if key in [self.KEY_OUTPUT_0, self.KEY_BRIGHTNESS, self.KEY_COLOR_TEMP, self.KEY_BRIGHTNESS_FADE]:
self.logger.info("State change of '%s' to '%s'", key, repr(data))
def __device_to_instance_filter__(self, key, data):
if key == self.KEY_BRIGHTNESS:
return int(round((data - 1) * 100 / 253, 0))
elif key == self.KEY_COLOR_TEMP:
return int(round((data - 250) * 10 / 204, 0))
return super().__device_to_instance_filter__(key, data)
def __instance_to_device_filter__(self, key, data):
if key == self.KEY_BRIGHTNESS:
return int(round(data * 253 / 100 + 1, 0))
elif key == self.KEY_COLOR_TEMP:
return int(round(data * 204 / 10 + 250, 0))
return super().__instance_to_device_filter__(key, data)
#
# RX
#
@property
def output_0(self):
"""rv: [True, False]"""
return self.get(self.KEY_OUTPUT_0, False)
@property
def linkquality(self):
"""rv: numeric value"""
return self.get(self.KEY_LINKQUALITY, 0)
@property
def brightness(self):
"""rv: numeric value [0%, ..., 100%]"""
return self.get(self.KEY_BRIGHTNESS, 0)
@property
def color_temp(self):
"""rv: numeric value [0, ..., 10]"""
return self.get(self.KEY_COLOR_TEMP, 0)
#
# TX
#
def request_data(self, device=None, key=None, data=None):
self.mqtt_client.send(self.topic + "/get", '{"%s": ""}' % self.KEY_OUTPUT_0)
def set_output_0(self, state):
"""state: [True, False]"""
self.send_command(self.KEY_OUTPUT_0, state)
def set_output_0_mcb(self, device, key, data):
self.set_output_0(data)
def toggle_output_0_mcb(self, device, key, data):
self.set_output_0(not self.output_0)
def set_brightness(self, brightness):
"""brightness: [0, ..., 100]"""
self.send_command(self.KEY_BRIGHTNESS, brightness)
def set_brightness_mcb(self, device, key, data):
self.set_brightness(data)
def default_inc(self, speed=40):
self.send_command(self.KEY_BRIGHTNESS_FADE, speed)
def default_dec(self, speed=-40):
self.default_inc(speed)
def default_stop(self):
self.default_inc(0)
def set_color_temp(self, color_temp):
"""color_temp: [0, ..., 10]"""
self.send_command(self.KEY_COLOR_TEMP, color_temp)
def set_color_temp_mcb(self, device, key, data):
self.set_color_temp(data)
def __all_off__(self):
if self.output_0:
self.set_output_0(False)
class tradfri_button(base):
""" Communication (MQTT)
tradfri_button {
"action": [
"arrow_left_click",
"arrow_left_hold",
"arrow_left_release",
"arrow_right_click",
"arrow_right_hold",
"arrow_right_release",
"brightness_down_click",
"brightness_down_hold",
"brightness_down_release",
"brightness_up_click",
"brightness_up_hold",
"brightness_up_release",
"toggle"
]
"action_duration": [0...] s
"battery": [0...100] %
"linkquality": [0...255] lqi
"update": []
}
"""
ACTION_TOGGLE = "toggle"
ACTION_BRIGHTNESS_UP = "brightness_up_click"
ACTION_BRIGHTNESS_DOWN = "brightness_down_click"
ACTION_RIGHT = "arrow_right_click"
ACTION_LEFT = "arrow_left_click"
ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold"
ACTION_BRIGHTNESS_UP_RELEASE = "brightness_up_release"
ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold"
ACTION_BRIGHTNESS_DOWN_RELEASE = "brightness_down_release"
ACTION_RIGHT_LONG = "arrow_right_hold"
ACTION_RIGHT_RELEASE = "arrow_right_release"
ACTION_LEFT_LONG = "arrow_left_hold"
ACTION_LEFT_RELEASE = "arrow_left_release"
#
KEY_LINKQUALITY = "linkquality"
KEY_BATTERY = "battery"
KEY_ACTION = "action"
KEY_ACTION_DURATION = "action_duration"
#
RX_KEYS = [KEY_LINKQUALITY, KEY_BATTERY, KEY_ACTION]
RX_IGNORE_KEYS = ['update', KEY_ACTION_DURATION]
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
def __state_logging__(self, inst, key, data):
if key in [self.KEY_ACTION]:
self.logger.info("Input '%s' with '%s'", key, repr(data))
#
# RX
#
@property
def action(self):
"""rv: action_txt"""
return self.get(self.KEY_ACTION)

View File

@ -2,14 +2,15 @@
# -*- coding: utf-8 -*-
#
import config
from devdi import rooms as devdi_rooms
from devdi.topic import STOP_EXECUTION_TOPIC
import devices
from function.garden import garden
from function.stairway import stairway
from function.ground_floor_west import ground_floor_west
from function.first_floor_west import first_floor_west
from function.first_floor_east import first_floor_east
from function.rooms import room_collection
from function.videv import all_off, videv_pure_switch
import json
import logging
import mqtt
@ -21,10 +22,9 @@ except ImportError:
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
class all_functions(room_collection, devdi_rooms.collection):
class all_functions(room_collection):
def __init__(self, mqtt_client: mqtt.mqtt_client):
super().__init__(mqtt_client)
devdi_rooms.collection.__init__(self, mqtt_client)
#
self.run = True
if config.DEBUG:
@ -72,7 +72,7 @@ class all_functions(room_collection, devdi_rooms.collection):
def init_off_functionality(self):
# ALL OFF - Virtual device
self.videv_all_off.connect_room_collection(self)
self.videv_all_off = all_off(self.mqtt_client, config.TOPIC_ALL_OFF_VIDEV, self)
# ALL OFF - Long push stairway
self.stw.stairway.switch_main_light.add_callback(self.stw.stairway.switch_main_light.KEY_LONGPUSH_0,
@ -85,8 +85,7 @@ class all_functions(room_collection, devdi_rooms.collection):
self.ffe.floor.switch_main_light.add_callback(self.ffe.floor.switch_main_light.KEY_LONGPUSH_0, True, self.ffe.all_off)
# FFE ALL OFF - Long push input device
self.ffe.sleep.input_device.add_callback(self.ffe.sleep.input_device.KEY_ACTION,
self.ffe.sleep.input_device.ACTION_RIGHT_LONG, self.ffe.all_off)
self.ffe.sleep.input_device.add_callback(devices.tradfri_button.KEY_ACTION, devices.tradfri_button.ACTION_RIGHT_LONG, self.ffe.all_off)
# FFW ALL OFF - Long push ffw_floor
self.ffw.floor.switch_main_light.add_callback(self.ffw.floor.switch_main_light.KEY_LONGPUSH_0,
@ -94,6 +93,9 @@ class all_functions(room_collection, devdi_rooms.collection):
self.ffw.floor.switch_main_light.add_callback(self.ffw.floor.switch_main_light.KEY_LONGPUSH_0, True, self.ffw.all_off)
def init_sumer_winter_mode(self):
# ALL summer/winter mode
self.videv_summer_mode = videv_pure_switch(self.mqtt_client, config.TOPIC_ALL_SUMMER_WINTER_MODE)
self.videv_summer_mode.add_callback(self.videv_summer_mode.KEY_STATE, None, self.gfw.summer_mode)
self.videv_summer_mode.add_callback(self.videv_summer_mode.KEY_STATE, None, self.ffw.summer_mode)
self.videv_summer_mode.add_callback(self.videv_summer_mode.KEY_STATE, None, self.ffe.summer_mode)

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
from mqtt.smarthome import common_base
from base import common_base
import config
import geo
import task

View File

@ -11,7 +11,8 @@ Targets:
- Method .add_calback(key, data, callback, on_change_only=False) to register videv actualisation on changes
"""
from mqtt.smarthome import common_base
from base import common_base
import config
import devices
from function.helpers import day_state
import logging

View File

@ -13,8 +13,6 @@ logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
class room(object):
ADD_TO_VIDEV_ALL_OFF = None
def __init__(self, mqtt_client):
self.mqtt_client = mqtt_client
@ -22,7 +20,7 @@ class room(object):
logger.info("Switching all off \"%s\"", type(self).__name__)
for name, obj in inspect.getmembers(self):
try:
if obj.__module__.startswith('smart_devices'):
if obj.__module__.startswith('devices'):
obj.all_off()
except AttributeError:
pass # not a module or has no method all_off
@ -35,7 +33,7 @@ class room(object):
class room_collection(object):
ADD_TO_VIDEV_ALL_OFF = None
ALLOWED_CLASSES = ("room", "room_collection")
def __init__(self, mqtt_client):
self.mqtt_client = mqtt_client
@ -51,8 +49,6 @@ class room_collection(object):
sub.all_off()
except AttributeError:
pass # don't mind, if sub has no method all_off
except:
logger.error("Failed to switch off %s (%s)", repr(sub_name), type(sub).__name__)
def summer_mode(self, device=None, key=None, data=None):
logger.info("Changing to %s \"%s\"", "summer mode" if data else "winter_mode", type(self).__name__)
@ -60,7 +56,7 @@ class room_collection(object):
# attribute name is not private
if not sub_name.startswith("__"):
sub = getattr(self, sub_name)
if isinstance(sub, (room, room_collection)):
if sub.__class__.__bases__[0].__name__ in self.ALLOWED_CLASSES:
sub.summer_mode(data)
def all_devices(self, object_to_analyse=None, depth=0):

193
function/videv.py Normal file
View File

@ -0,0 +1,193 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
Virtual Device(s)
Targets:
* MQTT-Interface to control joined devices as one virtual device
* Primary signal routing
* No functionality should be implemented here
"""
from base import videv_base
from function.rooms import room, room_collection
class videv_pure_switch(videv_base):
KEY_STATE = 'state'
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
self[self.KEY_STATE] = False
#
self.mqtt_client.add_callback(self.topic + '/state/set', self.__state__)
def __state__(self, mqtt_client, userdata, message):
self.set(self.KEY_STATE, message.payload == b'true')
self.__tx__(self.KEY_STATE, message.payload == b'true')
class videv_switching(videv_base):
KEY_STATE = 'state'
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
def connect_sw_device(self, sw_device, sw_key):
self.add_routing(self.KEY_STATE, sw_device, sw_key)
class videv_switching_timer(videv_switching):
KEY_TIMER = 'timer'
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
def connect_tm_device(self, tm_device, tm_key):
self.add_display(self.KEY_TIMER, tm_device, tm_key)
class videv_switching_motion(videv_switching):
KEY_STATE = 'state'
#
KEY_TIMER = 'timer'
KEY_MOTION_SENSOR = 'motion_%d'
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
def connect_mo_function(self, mo_function):
self.add_display(self.KEY_TIMER, mo_function, mo_function.KEY_TIMER)
# motion sensor state
for index, motion_sensor in enumerate(mo_function.motion_sensors):
self.add_display(self.KEY_MOTION_SENSOR % index, motion_sensor, motion_sensor.KEY_OCCUPANCY)
class videv_switch_brightness(videv_switching):
KEY_BRIGHTNESS = 'brightness'
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
def connect_br_device(self, br_device, br_key):
self.add_routing(self.KEY_BRIGHTNESS, br_device, br_key)
class videv_switch_brightness_color_temp(videv_switch_brightness):
KEY_COLOR_TEMP = 'color_temp'
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
def connect_ct_device(self, ct_device, ct_key):
self.add_routing(self.KEY_COLOR_TEMP, ct_device, ct_key)
class videv_heating(videv_base):
KEY_USER_TEMPERATURE_SETPOINT = 'user_temperature_setpoint'
KEY_VALVE_TEMPERATURE_SETPOINT = 'valve_temperature_setpoint'
KEY_AWAY_MODE = 'away_mode'
KEY_SUMMER_MODE = 'summer_mode'
KEY_START_BOOST = 'start_boost'
KEY_SET_DEFAULT_TEMPERATURE = 'set_default_temperature'
KEY_BOOST_TIMER = 'boost_timer'
#
KEY_TEMPERATURE = 'temperature'
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
def connect_heating_function(self, heating_function):
#
self.add_routing(self.KEY_USER_TEMPERATURE_SETPOINT, heating_function, heating_function.KEY_USER_TEMPERATURE_SETPOINT)
self.add_routing(self.KEY_AWAY_MODE, heating_function, heating_function.KEY_AWAY_MODE)
self.add_routing(self.KEY_SUMMER_MODE, heating_function, heating_function.KEY_SUMMER_MODE)
#
self.add_control(self.KEY_START_BOOST, heating_function, heating_function.KEY_START_BOOST, False)
self.add_control(self.KEY_SET_DEFAULT_TEMPERATURE, heating_function, heating_function.KEY_SET_DEFAULT_TEMPERATURE, False)
#
self.add_display(self.KEY_VALVE_TEMPERATURE_SETPOINT, heating_function, heating_function.KEY_TEMPERATURE_SETPOINT)
self.add_display(self.KEY_BOOST_TIMER, heating_function, heating_function.KEY_BOOST_TIMER)
self.add_display(self.KEY_TEMPERATURE, heating_function, heating_function.KEY_TEMPERATURE_CURRENT, False)
class videv_multistate(videv_base):
KEY_STATE = 'state_%d'
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
def connect_br_function(self, device, key_for_device, num_states):
self.num_states = num_states
# send default values
for i in range(0, num_states):
self.__tx__(self.KEY_STATE % i, False)
#
device.add_callback(key_for_device, None, self.__index_rx__, True)
def __index_rx__(self, device, key, data):
for i in range(0, self.num_states):
self.__tx__(self.KEY_STATE % i, i == data)
class videv_audio_player(videv_base):
KEY_ACTIVE_PLAYER = 'player_%d'
KEY_TITLE = 'title'
NO_TITLE = '---'
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
self.__device_cnt__ = 0
def connect_audio_device(self, device):
self.add_display(self.KEY_ACTIVE_PLAYER % self.__device_cnt__, device, device.KEY_STATE)
device.add_callback(device.KEY_TITLE, None, self.__title_rx__, True)
self.__device_cnt__ += 1
def __title_rx__(self, device, key, data):
self.__tx__(self.KEY_TITLE, data or self.NO_TITLE)
class all_off(videv_base):
ALLOWED_CLASSES = (room, room_collection, )
def __init__(self, mqtt_client, topic, room_collection):
super().__init__(mqtt_client, topic)
self.__room_collection__ = room_collection
# init __inst_dict__
self.__inst_dict__ = {}
self.__add_instances__("all", self.__room_collection__)
# register mqtt callbacks for all my keys
for key in self.__inst_dict__:
mqtt_client.add_callback(topic + "/" + key, self.all_off)
def __check_inst_capabilities__(self, name, inst):
# fits to specified classes
if isinstance(inst, self.ALLOWED_CLASSES):
try:
# all_off method is callable
return callable(inst.all_off)
except AttributeError:
# all_off method does not exist
return False
return False
def __add_instances__(self, name, inst, level=0):
if self.__check_inst_capabilities__(name, inst):
# add given instance to my __inst_dict__
self.__inst_dict__[name] = inst
# iterate over all attribute names of instance
for sub_name in dir(inst):
# attribute name is not private
if not sub_name.startswith("__"):
sub = getattr(inst, sub_name)
# recurse with this object
if level == 0:
self.__add_instances__(sub_name, sub, level=level+1)
else:
self.__add_instances__(name + "/" + sub_name, sub, level=level+1)
def all_off(self, client, userdata, message):
key = message.topic[len(self.topic) + 1:]
self.__inst_dict__[key].all_off()

2
mqtt

@ -1 +1 @@
Subproject commit c2b32852127bc1a3aca078a2dad3993f46cb81c2
Subproject commit 95dda53a55b40591bbd5200ae5ea8c354610b913

View File

@ -11,7 +11,7 @@ logger = report.default_logging_config()
VERS_MAJOR = 1
VERS_MINOR = 4
VERS_MINOR = 3
VERS_PATCH = 2
INFO_TOPIC = "__info__"

@ -1 +0,0 @@
Subproject commit c75c0cfacd48e9d51817f128539f96487cef2f98

5
topics.py Normal file
View File

@ -0,0 +1,5 @@
#
# TOPICS
#
TOPIC_ALL_OFF_VIDEV = "videv/off"
TOPIC_ALL_SUMMER_WINTER_MODE = "videv/summer_mode"