smart_brain/function/modules.py

318 lines
14 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
2023-01-20 08:03:06 +01:00
"""
Functional Modules
Targets:
* Device like structure to be compatible with videv
- KEY_* as part of the class for all parameters which needs to be accessed from videv
- Method *.set(key, data) to pass data from videv to Module
- Method .add_calback(key, data, callback, on_change_only=False) to register videv actualisation on changes
"""
from base import common_base
import devices
from function.db import get_radiator_data, set_radiator_data
from function.helpers import now, sunset_time, sunrise_time
import logging
import task
try:
from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
class brightness_choose_n_action(common_base):
KEY_ACTIVE_DEVICE = 'active_device'
#
DEFAULT_VALUES = {KEY_ACTIVE_DEVICE: None}
2023-01-20 08:03:06 +01:00
def __init__(self, button_tradfri):
super().__init__()
# brightness change
button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_DOWN_LONG, self.brightness_action)
button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_UP_LONG, self.brightness_action)
button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_DOWN_RELEASE, self.brightness_action)
button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_UP_RELEASE, self.brightness_action)
# device change
button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_UP, self.choose_next_device)
button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_DOWN, self.choose_prev_device)
#
2022-12-28 14:52:24 +01:00
self.brightness_device_list = []
self.callback_device_list = []
self.device_states = []
2022-12-28 14:52:24 +01:00
def add(self, brightness_device, callback_device, callback_key):
"""
2022-12-28 14:52:24 +01:00
brightness_device: A device for brightness function needs to have the following methods:
* .default_inc()
* .default_dec()
* .default_stop()
2022-12-28 14:52:24 +01:00
callback_device: A device for installing callback which are executed, when the device is switched on or off. It needs the following method:
* .add_callback(key, data or None, callback, on_changes_only)
"""
if len(self.brightness_device_list) >= len(devices.nodered_gui_leds.KEY_LED_LIST):
raise ValueError("Number of devices is limited by number of leds in devices.nodered_gui_leds.")
2022-12-28 14:52:24 +01:00
self.brightness_device_list.append(brightness_device)
self.callback_device_list.append((callback_device, callback_key))
self.device_states.append(False)
2022-12-28 14:52:24 +01:00
callback_device.add_callback(callback_key, True, self.device_state_action, True)
callback_device.add_callback(callback_key, False, self.device_state_action, True)
def device_state_action(self, device, key, data):
2022-12-28 14:52:24 +01:00
self.device_states[self.callback_device_list.index((device, key))] = data
if data is True:
self.set(self.KEY_ACTIVE_DEVICE, self.callback_device_list.index((device, key)))
else:
if self[self.KEY_ACTIVE_DEVICE] is not None:
if self.callback_device_list[self[self.KEY_ACTIVE_DEVICE]][0] == device:
self.choose_next_device()
def choose_prev_device(self, device=None, key=None, data=None):
if self[self.KEY_ACTIVE_DEVICE] is not None:
start_value = self[self.KEY_ACTIVE_DEVICE]
2022-12-28 14:52:24 +01:00
for i in range(0, len(self.brightness_device_list)):
target_state = (start_value - i - 1) % (len(self.brightness_device_list))
if self.device_states[target_state]:
self.set(self.KEY_ACTIVE_DEVICE, target_state)
return
self.set(self.KEY_ACTIVE_DEVICE, None)
def choose_next_device(self, device=None, key=None, data=None):
if self[self.KEY_ACTIVE_DEVICE] is not None:
start_value = self[self.KEY_ACTIVE_DEVICE]
2022-12-28 14:52:24 +01:00
for i in range(0, len(self.brightness_device_list)):
target_state = (start_value + i + 1) % (len(self.brightness_device_list))
if self.device_states[target_state]:
self.set(self.KEY_ACTIVE_DEVICE, target_state)
return
self.set(self.KEY_ACTIVE_DEVICE, None)
def brightness_action(self, device, key, data):
if self[self.KEY_ACTIVE_DEVICE] is not None:
target = self.brightness_device_list[self[self.KEY_ACTIVE_DEVICE]]
if data == devices.tradfri_button.ACTION_BRIGHTNESS_UP_LONG:
logger.info("Increasing \"%s\" - %s", type(self).__name__, target.topic)
target.default_inc()
elif data == devices.tradfri_button.ACTION_BRIGHTNESS_DOWN_LONG:
logger.info("Decreasing \"%s\" - %s", type(self).__name__, target.topic)
target.default_dec()
elif data in [devices.tradfri_button.ACTION_BRIGHTNESS_UP_RELEASE, devices.tradfri_button.ACTION_BRIGHTNESS_DOWN_RELEASE]:
target.default_stop()
class timer_on_activation(common_base):
KEY_TIMER = 'timer'
#
DEFAULT_VALUES = {
KEY_TIMER: 0
}
def __init__(self, sw_device, sw_key, timer_reload_value):
super().__init__()
#
self.timer_reload_value = timer_reload_value
#
sw_device.add_callback(sw_key, None, self.circ_pump_actions, True)
2023-01-09 16:42:18 +01:00
#
self.ct = task.periodic(6, self.cyclic_task)
self.ct.run()
2023-01-09 16:42:18 +01:00
def circ_pump_actions(self, device, key, data):
if data is True:
self.set(self.KEY_TIMER, self.timer_reload_value)
2023-01-09 16:42:18 +01:00
else:
self.set(self.KEY_TIMER, 0)
2023-01-09 16:42:18 +01:00
def cyclic_task(self, rt):
timer_value = self[self.KEY_TIMER] - self.ct.cycle_time
if timer_value <= 0:
self.set(self.KEY_TIMER, 0)
else:
self.set(self.KEY_TIMER, timer_value)
class heating_function(common_base):
KEY_USER_TEMPERATURE_SETPOINT = 'user_temperature_setpoint'
KEY_TEMPERATURE_SETPOINT = 'temperature_setpoint'
KEY_TEMPERATURE_CURRENT = 'temperature_current'
KEY_AWAY_MODE = 'away_mode'
KEY_SUMMER_MODE = 'summer_mode'
KEY_START_BOOST = 'start_boost'
KEY_SET_DEFAULT_TEMPERATURE = 'set_default_temperature'
KEY_BOOST_TIMER = 'boost_timer'
#
2023-01-10 23:17:47 +01:00
BOOST_TEMPERATURE = 30
AWAY_REDUCTION = 5
SUMMER_TEMPERATURE = 5
def __init__(self, heating_valve, default_temperature):
db_data = get_radiator_data(heating_valve.topic)
super().__init__({
self.KEY_USER_TEMPERATURE_SETPOINT: db_data[2] or default_temperature,
self.KEY_TEMPERATURE_SETPOINT: db_data[3] or default_temperature,
self.KEY_TEMPERATURE_CURRENT: None,
self.KEY_AWAY_MODE: db_data[0] or False,
self.KEY_SUMMER_MODE: db_data[1] or False,
self.KEY_START_BOOST: True,
self.KEY_SET_DEFAULT_TEMPERATURE: False,
self.KEY_BOOST_TIMER: 0
})
#
2023-01-09 16:42:18 +01:00
self.default_temperature = default_temperature
self.heating_valve = heating_valve
self.heating_valve.set_heating_setpoint(self[self.KEY_TEMPERATURE_SETPOINT])
#
self.heating_valve.add_callback(self.heating_valve.KEY_HEATING_SETPOINT, None, self.get_radiator_setpoint)
self.heating_valve.add_callback(self.heating_valve.KEY_TEMPERATURE, None, self.get_radiator_temperature)
2023-01-09 16:42:18 +01:00
#
self.add_callback(self.KEY_USER_TEMPERATURE_SETPOINT, None, self.user_temperature_setpoint, False)
self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.set_heating_setpoint, True)
self.add_callback(self.KEY_AWAY_MODE, None, self.away_mode, True)
self.add_callback(self.KEY_SUMMER_MODE, None, self.summer_mode, True)
self.add_callback(self.KEY_SET_DEFAULT_TEMPERATURE, None, self.setpoint_to_default)
self.add_callback(self.KEY_START_BOOST, True, self.boost, False)
self.add_callback(self.KEY_BOOST_TIMER, 0, self.timer_expired, True)
# cyclic task initialisation
self.ct = task.periodic(1, self.cyclic_task)
self.ct.run()
def timer_expired(self, device, data, key):
self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT])
self.heating_valve.logger.info('Timer expired. returning to regular temperature setpoint %.1f°C.',
self[self.KEY_TEMPERATURE_SETPOINT])
def cyclic_task(self, rt):
timer_value = self[self.KEY_BOOST_TIMER] - self.ct.cycle_time
if self[self.KEY_BOOST_TIMER] <= 0:
self.set(self.KEY_BOOST_TIMER, 0)
else:
self.set(self.KEY_BOOST_TIMER, timer_value)
def cancel_boost(self):
self.set(self.KEY_BOOST_TIMER, 0, block_callback=[self.timer_expired])
def set(self, key, data, block_callback=[]):
rv = super().set(key, data, block_callback)
set_radiator_data(self.heating_valve.topic, self[self.KEY_AWAY_MODE], self[self.KEY_SUMMER_MODE],
self[self.KEY_USER_TEMPERATURE_SETPOINT], self[self.KEY_TEMPERATURE_SETPOINT])
return rv
2023-01-11 08:17:37 +01:00
2023-01-10 23:17:47 +01:00
def away_mode(self, device, key, value):
if value is True:
self.cancel_boost()
self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode])
self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT] - self.AWAY_REDUCTION)
2023-01-10 23:17:47 +01:00
else:
self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT])
2023-01-10 23:17:47 +01:00
def summer_mode(self, device, key, value):
if value is True:
self.cancel_boost()
self.set(self.KEY_AWAY_MODE, False, [self.away_mode])
self.set(self.KEY_TEMPERATURE_SETPOINT, self.SUMMER_TEMPERATURE)
2023-01-10 23:17:47 +01:00
else:
self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT])
2023-01-09 16:42:18 +01:00
2023-01-10 23:17:47 +01:00
def boost(self, device, key, data):
if self[self.KEY_BOOST_TIMER] == 0:
2023-01-10 23:17:47 +01:00
self.heating_valve.logger.info('Starting boost mode with setpoint %.1f°C.', self.BOOST_TEMPERATURE)
self.set(self.KEY_BOOST_TIMER, 15*60)
self.set(self.KEY_TEMPERATURE_SETPOINT, self.BOOST_TEMPERATURE)
2023-01-09 16:42:18 +01:00
else:
self.set(self.KEY_BOOST_TIMER, min(self[self.KEY_BOOST_TIMER] + 15 * 60, 60 * 60))
self.set(self.KEY_AWAY_MODE, False, [self.away_mode])
self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode])
2023-01-09 16:42:18 +01:00
2023-01-10 23:17:47 +01:00
def setpoint_to_default(self, device, key, data):
self.cancel_boost()
self.set(self.KEY_AWAY_MODE, False, [self.away_mode])
self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode])
self.set(self.KEY_USER_TEMPERATURE_SETPOINT, self.default_temperature, [self.user_temperature_setpoint])
self.set(self.KEY_TEMPERATURE_SETPOINT, self.default_temperature)
2023-01-10 23:17:47 +01:00
def user_temperature_setpoint(self, device, key, data):
2023-01-11 08:17:37 +01:00
self.cancel_boost()
self.set(self.KEY_AWAY_MODE, False, [self.away_mode])
self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode])
self.set(self.KEY_TEMPERATURE_SETPOINT, data)
def set_heating_setpoint(self, device, key, data):
self.heating_valve.set_heating_setpoint(data)
2023-01-10 23:17:47 +01:00
def get_radiator_setpoint(self, device, key, data):
if self[self.KEY_BOOST_TIMER] == 0 and not self[self.KEY_AWAY_MODE] and not self[self.KEY_SUMMER_MODE]:
self.set(self.KEY_USER_TEMPERATURE_SETPOINT, data, block_callback=[self.set_heating_setpoint])
def get_radiator_temperature(self, device, key, data):
self.set(self.KEY_TEMPERATURE_CURRENT, data)
class motion_sensor_light(common_base):
KEY_TIMER = 'timer'
KEY_MOTION_SENSOR = 'motion_%d'
KEY_MOTION_SENSOR_0 = 'motion_%d' % 0
KEY_MOTION_SENSOR_1 = 'motion_%d' % 1
KEY_MOTION_SENSOR_2 = 'motion_%d' % 2
KEY_MOTION_SENSOR_3 = 'motion_%d' % 3
KEY_MOTION_SENSOR_4 = 'motion_%d' % 4
def __init__(self, sw_device, sw_method, *args, timer_value=30):
"""
sw_device is the device switching the light, args are 0-n motion sensors
"""
dv = dict.fromkeys([self.KEY_MOTION_SENSOR % i for i in range(0, len(args))])
for key in dv:
dv[key] = False
dv[self.KEY_TIMER] = 0
super().__init__(default_values=dv)
#
self.sw_device = sw_device
self.sw_method = sw_method
2023-01-28 14:32:52 +01:00
self.motion_sensors = args
self.timer_reload_value = timer_value
#
sw_device.add_callback(devices.shelly.KEY_OUTPUT_0, True, self.reload_timer, True)
sw_device.add_callback(devices.shelly.KEY_OUTPUT_0, False, self.reset_timer, True)
for motion_sensor in args:
motion_sensor.add_callback(motion_sensor.KEY_OCCUPANCY, None, self.set_motion_detected, True)
#
self.add_callback(self.KEY_TIMER, 0, self.timer_expired, True)
#
cyclic_task = task.periodic(1, self.cyclic_task)
cyclic_task.run()
def reload_timer(self, device, key, data):
self.set(self.KEY_TIMER, self.timer_reload_value)
def reset_timer(self, device=None, key=None, data=None):
self.set(self.KEY_TIMER, 0)
def set_motion_detected(self, device, key, data):
2023-01-28 14:32:52 +01:00
for sensor_index, arg_device in enumerate(self.motion_sensors):
if arg_device.topic == device.topic:
break
self.set(self.KEY_MOTION_SENSOR % sensor_index, data)
if now() < sunrise_time(60) or now() > sunset_time(-60):
if data is True:
logger.info("%s: Motion detected - Switching on main light %s", device.topic, self.sw_device.topic)
self.sw_method(True)
def motion_detected(self):
2023-01-28 14:32:52 +01:00
for i in range(0, len(self.motion_sensors)):
if self[self.KEY_MOTION_SENSOR % i]:
return True
return False
def timer_expired(self, device, key, data):
logger.info("No motion and time ran out - Switching off main light %s", self.sw_device.topic)
self.sw_method(False)
def cyclic_task(self, cyclic_task):
min_value = 10 if self.motion_detected() else 0
2023-01-25 11:53:38 +01:00
if self[self.KEY_TIMER] != 0:
self.set(self.KEY_TIMER, max(min_value, self[self.KEY_TIMER] - cyclic_task.cycle_time))