#!/usr/bin/env python # -*- coding: utf-8 -*- # import config import devices from function.db import get_gui_radiator_data, set_gui_radiator_data from function.rooms import room_shelly import logging import task try: from config import APP_NAME as ROOT_LOGGER_NAME except ImportError: ROOT_LOGGER_NAME = 'root' logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__) class brightness_choose_n_action(object): def __init__(self, mqtt_client, button_tradfri, topic_led): self.gui_led_active_device = devices.nodered_gui_leds(mqtt_client, topic_led) # brightness change button_tradfri.add_callback(devices.tradfri_button.KEY_ACTION, devices.tradfri_button.ACTION_BRIGHTNESS_DOWN_LONG, self.brightness_action) button_tradfri.add_callback(devices.tradfri_button.KEY_ACTION, devices.tradfri_button.ACTION_BRIGHTNESS_UP_LONG, self.brightness_action) button_tradfri.add_callback(devices.tradfri_button.KEY_ACTION, devices.tradfri_button.ACTION_BRIGHTNESS_DOWN_RELEASE, self.brightness_action) button_tradfri.add_callback(devices.tradfri_button.KEY_ACTION, devices.tradfri_button.ACTION_BRIGHTNESS_UP_RELEASE, self.brightness_action) # device change button_tradfri.add_callback(devices.tradfri_button.KEY_ACTION, devices.tradfri_button.ACTION_BRIGHTNESS_UP, self.choose_next_device) button_tradfri.add_callback(devices.tradfri_button.KEY_ACTION, devices.tradfri_button.ACTION_BRIGHTNESS_DOWN, self.choose_prev_device) # self.brightness_device_list = [] self.callback_device_list = [] self.device_states = [] self.active_device_state = None self.update_active_device_led() def add(self, brightness_device, callback_device, callback_key): """ brightness_device: A device for brightness function needs to have the following methods: * .default_inc() * .default_dec() * .default_stop() callback_device: A device for installing callback which are executed, when the device is switched on or off. It needs the following method: * .add_callback(key, data or None, callback, on_changes_only) """ if len(self.brightness_device_list) >= len(devices.nodered_gui_leds.KEY_LED_LIST): raise ValueError("Number of devices is limited by number of leds in devices.nodered_gui_leds.") self.brightness_device_list.append(brightness_device) self.callback_device_list.append((callback_device, callback_key)) self.device_states.append(False) callback_device.add_callback(callback_key, True, self.device_state_action, True) callback_device.add_callback(callback_key, False, self.device_state_action, True) def device_state_action(self, device, key, data): self.device_states[self.callback_device_list.index((device, key))] = data if data is True: self.active_device_state = self.callback_device_list.index((device, key)) self.update_active_device_led() else: self.choose_next_device() def update_active_device_led(self): for i in range(0, len(self.brightness_device_list)): self.gui_led_active_device.set_led(devices.nodered_gui_leds.KEY_LED_LIST[i], self.active_device_state == i) def choose_prev_device(self, device=None, key=None, data=None): if self.active_device_state is not None: start_value = self.active_device_state for i in range(0, len(self.brightness_device_list)): target_state = (start_value - i - 1) % (len(self.brightness_device_list)) if self.device_states[target_state]: self.active_device_state = target_state self.update_active_device_led() return self.active_device_state = None self.update_active_device_led() def choose_next_device(self, device=None, key=None, data=None): if self.active_device_state is not None: start_value = self.active_device_state for i in range(0, len(self.brightness_device_list)): target_state = (start_value + i + 1) % (len(self.brightness_device_list)) if self.device_states[target_state]: self.active_device_state = target_state self.update_active_device_led() return self.active_device_state = None self.update_active_device_led() def brightness_action(self, device, key, data): if self.active_device_state is not None: target = self.brightness_device_list[self.active_device_state] if data == devices.tradfri_button.ACTION_BRIGHTNESS_UP_LONG: logger.info("Increasing \"%s\" - %s", type(self).__name__, target.topic) target.default_inc() elif data == devices.tradfri_button.ACTION_BRIGHTNESS_DOWN_LONG: logger.info("Decreasing \"%s\" - %s", type(self).__name__, target.topic) target.default_dec() elif data in [devices.tradfri_button.ACTION_BRIGHTNESS_UP_RELEASE, devices.tradfri_button.ACTION_BRIGHTNESS_DOWN_RELEASE]: target.default_stop() class circulation_pump(room_shelly): def __init__(self, mqtt_client): super().__init__(mqtt_client, config.TOPIC_FFE_KITCHEN_CIRCULATION_PUMP_SHELLY, config.TOPIC_FFE_KITCHEN_CIRCULATION_PUMP_GUI) # self.main_light_shelly.add_callback(devices.shelly.KEY_OUTPUT_0, None, self.circ_pump_actions, True) # self.gui_main_light.set_timer('-') # self.ct = task.periodic(6, self.cyclic_task) self.pump_timer = None # self.ct.run() def circ_pump_actions(self, device, key, data): if data is True: self.pump_timer = 10 self.gui_main_light.set_timer(self.pump_timer) else: self.pump_timer = None self.gui_main_light.set_timer('-') def cyclic_task(self, rt): if self.pump_timer is not None: if self.pump_timer <= 0: self.pump_timer = None self.gui_main_light.set_timer('-') else: self.gui_main_light.set_timer(self.pump_timer) self.pump_timer -= self.ct.cycle_time / 60 class radiator_function(object): BOOST_TEMPERATURE = 30 AWAY_REDUCTION = 5 SUMMER_TEMPERATURE = 5 def __init__(self, mqtt_client, topic_valve, topic_gui, default_temperature): self.default_temperature = default_temperature self.boost_timer = None # device initialisation self.heating_valve = devices.brennenstuhl_heatingvalve(mqtt_client, topic_valve) self.gui_heating = devices.nodered_gui_radiator(mqtt_client, topic_gui) # db-stored data initialisation db_data = get_gui_radiator_data(topic_gui) self.__away_mode__ = db_data[0] or False self.__summer_mode__ = db_data[1] or False self.__user_temperature_setpoint__ = db_data[2] or default_temperature if self.__away_mode__: self.away_mode(None, None, True) elif self.__summer_mode__: self.summer_mode(None, None, True) else: self.set_heating_setpoint(None, None, self.__user_temperature_setpoint__) # callback initialisation self.heating_valve.add_callback(devices.brennenstuhl_heatingvalve.KEY_TEMPERATURE, None, self.gui_heating.set_temperature_mcb) self.heating_valve.add_callback(devices.brennenstuhl_heatingvalve.KEY_HEATING_SETPOINT, None, self.get_radiator_setpoint) # self.gui_heating.add_callback(devices.nodered_gui_radiator.KEY_SETPOINT_TEMP, None, self.set_heating_setpoint) self.gui_heating.add_callback(devices.nodered_gui_radiator.KEY_BOOST, None, self.boost) self.gui_heating.add_callback(devices.nodered_gui_radiator.KEY_SETPOINT_TO_DEFAULT, None, self.setpoint_to_default) self.gui_heating.add_callback(devices.nodered_gui_radiator.KEY_AWAY, None, self.away_mode) self.gui_heating.add_callback(devices.nodered_gui_radiator.KEY_SUMMER, None, self.summer_mode) # cyclic task initialisation self.ct = task.periodic(1, self.cyclic_task) self.ct.run() def cyclic_task(self, rt): if self.boost_timer is not None: self.gui_heating.set_timer(round(self.boost_timer / 60, 1)) # self.boost_timer -= self.ct.cycle_time if self.boost_timer <= 0: self.cancel_boost() self.heating_valve.set_heating_setpoint(self.__user_temperature_setpoint__) self.heating_valve.logger.info('Timer expired. returning to regular temperature setpoint %.1f°C.', self.__user_temperature_setpoint__) def cancel_boost(self, device=None, key=None, data=None): if self.boost_timer is not None: self.boost_timer = None self.gui_heating.set_timer('-') def update_states(self, away_mode=None, summer_mode=None, user_temperature_setpoint=None): if away_mode is not None: self.__away_mode__ = away_mode if summer_mode is not None: self.__summer_mode__ = summer_mode if user_temperature_setpoint is not None: self.__user_temperature_setpoint__ = user_temperature_setpoint set_gui_radiator_data(self.gui_heating.topic, self.__away_mode__, self.__summer_mode__, self.__user_temperature_setpoint__) # self.gui_heating.set_away(self.__away_mode__) self.gui_heating.set_summer(self.__summer_mode__) self.gui_heating.set_enable(not self.__away_mode__ and not self.__summer_mode__) def away_mode(self, device, key, value): if value is True: self.cancel_boost() self.update_states(away_mode=value, summer_mode=False) self.heating_valve.set_heating_setpoint(self.__user_temperature_setpoint__ - self.AWAY_REDUCTION) else: self.update_states(away_mode=value) self.heating_valve.set_heating_setpoint(self.__user_temperature_setpoint__) def summer_mode(self, device, key, value): if value is True: self.cancel_boost() self.update_states(away_mode=False, summer_mode=value) self.heating_valve.set_heating_setpoint(self.SUMMER_TEMPERATURE) else: self.update_states(summer_mode=value) self.heating_valve.set_heating_setpoint(self.__user_temperature_setpoint__) def boost(self, device, key, data): if self.boost_timer is None: self.heating_valve.logger.info('Starting boost mode with setpoint %.1f°C.', self.BOOST_TEMPERATURE) self.boost_timer = 15*60 self.heating_valve.set_heating_setpoint(self.BOOST_TEMPERATURE) else: self.boost_timer += 15 * 60 if self.boost_timer > 60 * 60: self.boost_timer = 60 * 60 self.update_states(away_mode=False, summer_mode=False) def setpoint_to_default(self, device, key, data): self.set_heating_setpoint(device, key, self.default_temperature) def set_heating_setpoint(self, device, key, data): self.cancel_boost() self.update_states(away_mode=False, summer_mode=False, user_temperature_setpoint=data) self.heating_valve.set_heating_setpoint(data) def get_radiator_setpoint(self, device, key, data): if self.boost_timer is None and not self.__away_mode__ and not self.__summer_mode__: self.update_states(user_temperature_setpoint=data) else: self.update_states() self.gui_heating.set_setpoint_temperature(data)