#!/usr/bin/env python # -*- coding: utf-8 -*- # """ Functional Modules Targets: * Device like structure to be compatible with videv - KEY_* as part of the class for all parameters which needs to be accessed from videv - Method *.set(key, data) to pass data from videv to Module - Method .add_calback(key, data, callback, on_change_only=False) to register videv actualisation on changes """ from base import common_base import devices from function.db import get_radiator_data, set_radiator_data from function.helpers import now, sunset_time, sunrise_time import logging import task try: from config import APP_NAME as ROOT_LOGGER_NAME except ImportError: ROOT_LOGGER_NAME = 'root' logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__) class brightness_choose_n_action(common_base): KEY_ACTIVE_DEVICE = 'active_device' # DEFAULT_VALUES = {KEY_ACTIVE_DEVICE: None} def __init__(self, button_tradfri): super().__init__() # brightness change button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_DOWN_LONG, self.brightness_action) button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_UP_LONG, self.brightness_action) button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_DOWN_RELEASE, self.brightness_action) button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_UP_RELEASE, self.brightness_action) # device change button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_UP, self.choose_next_device) button_tradfri.add_callback(button_tradfri.KEY_ACTION, button_tradfri.ACTION_BRIGHTNESS_DOWN, self.choose_prev_device) # self.brightness_device_list = [] self.callback_device_list = [] self.device_states = [] def add(self, brightness_device, callback_device, callback_key): """ brightness_device: A device for brightness function needs to have the following methods: * .default_inc() * .default_dec() * .default_stop() callback_device: A device for installing callback which are executed, when the device is switched on or off. It needs the following method: * .add_callback(key, data or None, callback, on_changes_only) """ if len(self.brightness_device_list) >= len(devices.nodered_gui_leds.KEY_LED_LIST): raise ValueError("Number of devices is limited by number of leds in devices.nodered_gui_leds.") self.brightness_device_list.append(brightness_device) self.callback_device_list.append((callback_device, callback_key)) self.device_states.append(False) callback_device.add_callback(callback_key, True, self.device_state_action, True) callback_device.add_callback(callback_key, False, self.device_state_action, True) def device_state_action(self, device, key, data): self.device_states[self.callback_device_list.index((device, key))] = data if data is True: self.set(self.KEY_ACTIVE_DEVICE, self.callback_device_list.index((device, key))) else: if self[self.KEY_ACTIVE_DEVICE] is not None: if self.callback_device_list[self[self.KEY_ACTIVE_DEVICE]][0] == device: self.choose_next_device() def choose_prev_device(self, device=None, key=None, data=None): if self[self.KEY_ACTIVE_DEVICE] is not None: start_value = self[self.KEY_ACTIVE_DEVICE] for i in range(0, len(self.brightness_device_list)): target_state = (start_value - i - 1) % (len(self.brightness_device_list)) if self.device_states[target_state]: self.set(self.KEY_ACTIVE_DEVICE, target_state) return self.set(self.KEY_ACTIVE_DEVICE, None) def choose_next_device(self, device=None, key=None, data=None): if self[self.KEY_ACTIVE_DEVICE] is not None: start_value = self[self.KEY_ACTIVE_DEVICE] for i in range(0, len(self.brightness_device_list)): target_state = (start_value + i + 1) % (len(self.brightness_device_list)) if self.device_states[target_state]: self.set(self.KEY_ACTIVE_DEVICE, target_state) return self.set(self.KEY_ACTIVE_DEVICE, None) def brightness_action(self, device, key, data): if self[self.KEY_ACTIVE_DEVICE] is not None: target = self.brightness_device_list[self[self.KEY_ACTIVE_DEVICE]] if data == devices.tradfri_button.ACTION_BRIGHTNESS_UP_LONG: logger.info("Increasing \"%s\" - %s", type(self).__name__, target.topic) target.default_inc() elif data == devices.tradfri_button.ACTION_BRIGHTNESS_DOWN_LONG: logger.info("Decreasing \"%s\" - %s", type(self).__name__, target.topic) target.default_dec() elif data in [devices.tradfri_button.ACTION_BRIGHTNESS_UP_RELEASE, devices.tradfri_button.ACTION_BRIGHTNESS_DOWN_RELEASE]: target.default_stop() class timer_on_activation(common_base): KEY_TIMER = 'timer' # DEFAULT_VALUES = { KEY_TIMER: 0 } def __init__(self, sw_device, sw_key, timer_reload_value): super().__init__() # self.timer_reload_value = timer_reload_value # sw_device.add_callback(sw_key, None, self.circ_pump_actions, True) # self.ct = task.periodic(6, self.cyclic_task) self.ct.run() def circ_pump_actions(self, device, key, data): if data is True: self.set(self.KEY_TIMER, self.timer_reload_value) else: self.set(self.KEY_TIMER, 0) def cyclic_task(self, rt): timer_value = self[self.KEY_TIMER] - self.ct.cycle_time if timer_value <= 0: self.set(self.KEY_TIMER, 0) else: self.set(self.KEY_TIMER, timer_value) class heating_function(common_base): KEY_USER_TEMPERATURE_SETPOINT = 'user_temperature_setpoint' KEY_TEMPERATURE_SETPOINT = 'temperature_setpoint' KEY_TEMPERATURE_CURRENT = 'temperature_current' KEY_AWAY_MODE = 'away_mode' KEY_SUMMER_MODE = 'summer_mode' KEY_START_BOOST = 'start_boost' KEY_SET_DEFAULT_TEMPERATURE = 'set_default_temperature' KEY_BOOST_TIMER = 'boost_timer' # BOOST_TEMPERATURE = 30 AWAY_REDUCTION = 5 SUMMER_TEMPERATURE = 5 def __init__(self, heating_valve, default_temperature): db_data = get_radiator_data(heating_valve.topic) super().__init__({ self.KEY_USER_TEMPERATURE_SETPOINT: db_data[2] or default_temperature, self.KEY_TEMPERATURE_SETPOINT: db_data[3] or default_temperature, self.KEY_TEMPERATURE_CURRENT: None, self.KEY_AWAY_MODE: db_data[0] or False, self.KEY_SUMMER_MODE: db_data[1] or False, self.KEY_START_BOOST: True, self.KEY_SET_DEFAULT_TEMPERATURE: False, self.KEY_BOOST_TIMER: 0 }) # self.default_temperature = default_temperature self.heating_valve = heating_valve self.heating_valve.set_heating_setpoint(self[self.KEY_TEMPERATURE_SETPOINT]) # self.heating_valve.add_callback(self.heating_valve.KEY_HEATING_SETPOINT, None, self.get_radiator_setpoint) self.heating_valve.add_callback(self.heating_valve.KEY_TEMPERATURE, None, self.get_radiator_temperature) # self.add_callback(self.KEY_USER_TEMPERATURE_SETPOINT, None, self.user_temperature_setpoint, False) self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.set_heating_setpoint, True) self.add_callback(self.KEY_AWAY_MODE, None, self.away_mode, True) self.add_callback(self.KEY_SUMMER_MODE, None, self.summer_mode, True) self.add_callback(self.KEY_SET_DEFAULT_TEMPERATURE, None, self.setpoint_to_default) self.add_callback(self.KEY_START_BOOST, True, self.boost, False) self.add_callback(self.KEY_BOOST_TIMER, 0, self.timer_expired, True) # cyclic task initialisation self.ct = task.periodic(1, self.cyclic_task) self.ct.run() def timer_expired(self, device, data, key): self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT]) self.heating_valve.logger.info('Timer expired. returning to regular temperature setpoint %.1f°C.', self[self.KEY_TEMPERATURE_SETPOINT]) def cyclic_task(self, rt): timer_value = self[self.KEY_BOOST_TIMER] - self.ct.cycle_time if self[self.KEY_BOOST_TIMER] <= 0: self.set(self.KEY_BOOST_TIMER, 0) else: self.set(self.KEY_BOOST_TIMER, timer_value) def cancel_boost(self): self.set(self.KEY_BOOST_TIMER, 0, block_callback=[self.timer_expired]) def set(self, key, data, block_callback=[]): rv = super().set(key, data, block_callback) set_radiator_data(self.heating_valve.topic, self[self.KEY_AWAY_MODE], self[self.KEY_SUMMER_MODE], self[self.KEY_USER_TEMPERATURE_SETPOINT], self[self.KEY_TEMPERATURE_SETPOINT]) return rv def away_mode(self, device, key, value): if value is True: self.cancel_boost() self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode]) self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT] - self.AWAY_REDUCTION) else: self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT]) def summer_mode(self, device, key, value): if value is True: self.cancel_boost() self.set(self.KEY_AWAY_MODE, False, [self.away_mode]) self.set(self.KEY_TEMPERATURE_SETPOINT, self.SUMMER_TEMPERATURE) else: self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT]) def boost(self, device, key, data): if self[self.KEY_BOOST_TIMER] == 0: self.heating_valve.logger.info('Starting boost mode with setpoint %.1f°C.', self.BOOST_TEMPERATURE) self.set(self.KEY_BOOST_TIMER, 15*60) self.set(self.KEY_TEMPERATURE_SETPOINT, self.BOOST_TEMPERATURE) else: self.set(self.KEY_BOOST_TIMER, min(self[self.KEY_BOOST_TIMER] + 15 * 60, 60 * 60)) self.set(self.KEY_AWAY_MODE, False, [self.away_mode]) self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode]) def setpoint_to_default(self, device, key, data): self.cancel_boost() self.set(self.KEY_AWAY_MODE, False, [self.away_mode]) self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode]) self.set(self.KEY_USER_TEMPERATURE_SETPOINT, self.default_temperature, [self.user_temperature_setpoint]) self.set(self.KEY_TEMPERATURE_SETPOINT, self.default_temperature) def user_temperature_setpoint(self, device, key, data): self.cancel_boost() self.set(self.KEY_AWAY_MODE, False, [self.away_mode]) self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode]) self.set(self.KEY_TEMPERATURE_SETPOINT, data) def set_heating_setpoint(self, device, key, data): self.heating_valve.set_heating_setpoint(data) def get_radiator_setpoint(self, device, key, data): if self[self.KEY_BOOST_TIMER] == 0 and not self[self.KEY_AWAY_MODE] and not self[self.KEY_SUMMER_MODE]: self.set(self.KEY_USER_TEMPERATURE_SETPOINT, data, block_callback=[self.set_heating_setpoint]) def get_radiator_temperature(self, device, key, data): self.set(self.KEY_TEMPERATURE_CURRENT, data) class motion_sensor_light(common_base): KEY_TIMER = 'timer' KEY_MOTION_SENSOR = 'motion_%d' KEY_MOTION_SENSOR_0 = 'motion_%d' % 0 KEY_MOTION_SENSOR_1 = 'motion_%d' % 1 KEY_MOTION_SENSOR_2 = 'motion_%d' % 2 KEY_MOTION_SENSOR_3 = 'motion_%d' % 3 KEY_MOTION_SENSOR_4 = 'motion_%d' % 4 def __init__(self, sw_device, sw_method, *args, timer_value=30): """ sw_device is the device switching the light, args are 0-n motion sensors """ dv = dict.fromkeys([self.KEY_MOTION_SENSOR % i for i in range(0, len(args))]) for key in dv: dv[key] = False dv[self.KEY_TIMER] = 0 super().__init__(default_values=dv) # self.sw_device = sw_device self.sw_method = sw_method self.args = args self.timer_reload_value = timer_value # sw_device.add_callback(devices.shelly.KEY_OUTPUT_0, True, self.reload_timer, True) sw_device.add_callback(devices.shelly.KEY_OUTPUT_0, False, self.reset_timer, True) for motion_sensor in args: motion_sensor.add_callback(motion_sensor.KEY_OCCUPANCY, None, self.set_motion_detected, True) # self.add_callback(self.KEY_TIMER, 0, self.timer_expired, True) # cyclic_task = task.periodic(1, self.cyclic_task) cyclic_task.run() def reload_timer(self, device, key, data): self.set(self.KEY_TIMER, self.timer_reload_value) def reset_timer(self, device=None, key=None, data=None): self.set(self.KEY_TIMER, 0) def set_motion_detected(self, device, key, data): for sensor_index, arg_device in enumerate(self.args): if arg_device.topic == device.topic: break self.set(self.KEY_MOTION_SENSOR % sensor_index, data) if now() < sunrise_time(60) or now() > sunset_time(-60): if data is True: logger.info("%s: Motion detected - Switching on main light %s", device.topic, self.sw_device.topic) self.sw_method(True) def motion_detected(self): for i in range(0, len(self.args)): if self[self.KEY_MOTION_SENSOR % i]: return True return False def timer_expired(self, device, key, data): logger.info("No motion and time ran out - Switching off main light %s", self.sw_device.topic) self.sw_method(False) def cyclic_task(self, cyclic_task): min_value = 10 if self.motion_detected() else 0 self.set(self.KEY_TIMER, max(min_value, self[self.KEY_TIMER] - cyclic_task.cycle_time))