123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- """
- Functional Modules
-
- Targets:
- * Device-like structure to be compatible with videv
- - KEY_* as part of the class for all parameters which need to be accessed from videv
- - Method *.set(key, data) to pass data from videv to the module
- - Method *.add_callback(key, data, callback, on_change_only=False) to register videv updates on changes
- """
-
- from base import common_base
- import config
- import devices
- from function.helpers import day_state
- import logging
- import task
- import time
-
# Derive the module logger from the application's root logger name.
try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    # config does not provide APP_NAME: fall back to the name 'root'
    ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
-
-
class switched_light(object):
    """
    Couple a light device to a switch output.

    When the value of sw_key on sw_device changes to True, li_device.request_data is executed as callback.
    """

    def __init__(self, sw_device, sw_key, li_device):
        """
        sw_device: Device which switches the light. It needs the following method:
            * .add_callback(key, data or None, callback, on_changes_only)
        sw_key: Key of sw_device which is observed.
        li_device: Light device. It needs the following method:
            * .request_data (used as callback)
        """
        # Register on the key handed over by the caller instead of a hard-coded sw_device.KEY_OUTPUT_0,
        # otherwise the parameter sw_key has no effect.
        sw_device.add_callback(sw_key, True, li_device.request_data, True)
-
-
class brightness_choose_n_action(common_base):
    """
    Forward the brightness actions of a button to one out of several registered devices.

    The index of the device which currently receives the brightness actions is stored in KEY_ACTIVE_DEVICE
    (None, if no device is selected). Devices are registered with .add().
    """
    KEY_ACTIVE_DEVICE = 'active_device'
    #
    DEFAULT_VALUES = {KEY_ACTIVE_DEVICE: None}

    def __init__(self, button_tradfri):
        """
        button_tradfri: Button device delivering the actions. It needs KEY_ACTION, the ACTION_BRIGHTNESS_*
                        constants used below and the method .add_callback(key, data, callback).
        """
        super().__init__()
        action_key = button_tradfri.KEY_ACTION
        # brightness change
        button_tradfri.add_callback(action_key, button_tradfri.ACTION_BRIGHTNESS_DOWN_LONG, self.brightness_action)
        button_tradfri.add_callback(action_key, button_tradfri.ACTION_BRIGHTNESS_UP_LONG, self.brightness_action)
        button_tradfri.add_callback(action_key, button_tradfri.ACTION_BRIGHTNESS_DOWN_RELEASE, self.brightness_action)
        button_tradfri.add_callback(action_key, button_tradfri.ACTION_BRIGHTNESS_UP_RELEASE, self.brightness_action)
        # device change
        button_tradfri.add_callback(action_key, button_tradfri.ACTION_BRIGHTNESS_UP, self.choose_next_device)
        button_tradfri.add_callback(action_key, button_tradfri.ACTION_BRIGHTNESS_DOWN, self.choose_prev_device)
        # three lists sharing the same index: target device, (callback device, key) and last known on/off state
        self.brightness_device_list = []
        self.callback_device_list = []
        self.device_states = []

    def add(self, brightness_device, callback_device, callback_key):
        """
        brightness_device: A device for brightness function needs to have the following methods:
            * .default_inc()
            * .default_dec()
            * .default_stop()
        callback_device: A device for installing callback which are executed, when the device is switched on or off. It needs the following method:
            * .add_callback(key, data or None, callback, on_changes_only)
        """
        self.brightness_device_list.append(brightness_device)
        self.callback_device_list.append((callback_device, callback_key))
        # a newly registered device is handled as switched off until a callback reports something else
        self.device_states.append(False)
        for state in (True, False):
            callback_device.add_callback(callback_key, state, self.device_state_action, True)

    def device_state_action(self, device, key, data):
        """
        Callback for the on/off state of a registered device.

        Stores the state. A device switched on becomes the active device; if the active device is
        switched off, the next device which is switched on gets selected.
        """
        index = self.callback_device_list.index((device, key))
        self.device_states[index] = data
        if data is True:
            self.set(self.KEY_ACTIVE_DEVICE, index)
            return
        active = self[self.KEY_ACTIVE_DEVICE]
        if active is not None and self.callback_device_list[active][0] == device:
            self.choose_next_device()

    def _choose_device(self, step):
        """
        Select the nearest switched on device starting at the active one (step=1: upwards, step=-1: downwards).

        Nothing is done, if no device is active. KEY_ACTIVE_DEVICE is set to None, if no device is switched on.
        """
        active = self[self.KEY_ACTIVE_DEVICE]
        if active is None:
            return
        count = len(self.brightness_device_list)
        # the last offset wraps around to the active device itself
        for offset in range(1, count + 1):
            candidate = (active + step * offset) % count
            if self.device_states[candidate]:
                self.set(self.KEY_ACTIVE_DEVICE, candidate)
                return
        self.set(self.KEY_ACTIVE_DEVICE, None)

    def choose_prev_device(self, device=None, key=None, data=None):
        """
        Callback selecting the previous switched on device (the arguments are not used).
        """
        self._choose_device(-1)

    def choose_next_device(self, device=None, key=None, data=None):
        """
        Callback selecting the next switched on device (the arguments are not used).
        """
        self._choose_device(1)

    def brightness_action(self, device, key, data):
        """
        Callback translating a button action (data) to default_inc, default_dec or default_stop of the active device.
        """
        active = self[self.KEY_ACTIVE_DEVICE]
        if active is None:
            return
        target = self.brightness_device_list[active]
        button = devices.tradfri_button
        if data == button.ACTION_BRIGHTNESS_UP_LONG:
            logger.info("Increasing \"%s\" - %s", type(self).__name__, target.topic)
            target.default_inc()
        elif data == button.ACTION_BRIGHTNESS_DOWN_LONG:
            logger.info("Decreasing \"%s\" - %s", type(self).__name__, target.topic)
            target.default_dec()
        elif data in [button.ACTION_BRIGHTNESS_UP_RELEASE, button.ACTION_BRIGHTNESS_DOWN_RELEASE]:
            target.default_stop()
-
-
class timer_on_activation(common_base):
    """
    Countdown timer which is loaded while a switch is on.

    KEY_TIMER is set to the reload value when the observed key reports True and to 0 for any other value.
    A periodic task counts the timer down to 0.
    """
    KEY_TIMER = 'timer'
    #
    DEFAULT_VALUES = {KEY_TIMER: 0}

    def __init__(self, sw_device, sw_key, timer_reload_value):
        """
        sw_device: Device which is observed. It needs the following method:
            * .add_callback(key, data or None, callback, on_changes_only)
        sw_key: Key of sw_device which is observed.
        timer_reload_value: Value stored to KEY_TIMER, when the observed key gets True.
        """
        super().__init__()
        #
        self.timer_reload_value = timer_reload_value
        #
        sw_device.add_callback(sw_key, None, self.circ_pump_actions, True)
        # periodic countdown (first argument of task.periodic is presumably the cycle time - TODO confirm)
        self.ct = task.periodic(6, self.cyclic_task)
        self.ct.run()

    def circ_pump_actions(self, device, key, data):
        """
        Callback of the observed key: Reload the timer on True, clear it otherwise.
        """
        self.set(self.KEY_TIMER, self.timer_reload_value if data is True else 0)

    def cyclic_task(self, rt):
        """
        Periodic callback: Reduce the timer by the cycle time of the task, never going below 0.
        """
        remaining = self[self.KEY_TIMER] - self.ct.cycle_time
        self.set(self.KEY_TIMER, 0 if remaining <= 0 else remaining)
-
-
class heating_function(common_base):
    """
    Heating logic on top of a heating valve device.

    The class holds the user setpoint, the effective setpoint, the current temperature, away mode,
    summer mode and a boost timer as keyed values. Changes of the effective setpoint are transmitted
    to the heating valve; setpoint and temperature reported by the valve are read back via callbacks.
    """
    KEY_USER_TEMPERATURE_SETPOINT = 'user_temperature_setpoint'
    KEY_TEMPERATURE_SETPOINT = 'temperature_setpoint'
    KEY_TEMPERATURE_CURRENT = 'temperature_current'
    KEY_AWAY_MODE = 'away_mode'
    KEY_SUMMER_MODE = 'summer_mode'
    KEY_START_BOOST = 'start_boost'
    KEY_SET_DEFAULT_TEMPERATURE = 'set_default_temperature'
    KEY_BOOST_TIMER = 'boost_timer'
    #
    BOOST_TEMPERATURE = 30
    AWAY_REDUCTION = 5
    SUMMER_TEMPERATURE = 5

    class value_timeout_list(object):
        """
        Memory for the values which have been sent within the last MAX_DELAY seconds.

        It is used to recognise a setpoint reported by the valve as the echo of an own transmission.
        """
        MAX_DELAY = 10

        def __init__(self):
            # two lists sharing the same index: value and time of storage
            self.__data__ = []
            self.__time__ = []

        def __cleanup__(self):
            """
            Remove all entries which are older than MAX_DELAY seconds.
            """
            now = time.time()
            # Build new lists instead of deleting while iterating; deleting inside the loop skips
            # the entry following each removed one and leaves expired values in the list.
            remaining = [
                (item, tm) for item, tm in zip(self.__data__, self.__time__)
                if tm + self.MAX_DELAY >= now
            ]
            self.__data__ = [item for item, _ in remaining]
            self.__time__ = [tm for _, tm in remaining]

        def new(self, item):
            """
            Store item together with the current time.
            """
            self.__cleanup__()
            self.__data__.append(item)
            self.__time__.append(time.time())

        def is_valid_value(self, data):
            """
            Return True, if data is not one of the values stored within the last MAX_DELAY seconds.
            """
            self.__cleanup__()
            return data not in self.__data__

    def __init__(self, heating_valve, default_temperature, **kwargs):
        """
        heating_valve: The valve device. It needs KEY_HEATING_SETPOINT, KEY_TEMPERATURE, a logger and the methods:
            * .set_heating_setpoint(value)
            * .add_callback(key, data or None, callback)
        default_temperature: Setpoint used by setpoint_to_default and as initial setpoint.
        kwargs: Optional initial values, using the KEY_* strings of this class as keyword.
        """
        self.heating_valve = heating_valve
        self.default_temperature = default_temperature
        #
        self.valve_value = self.value_timeout_list()
        #
        super().__init__({
            self.KEY_USER_TEMPERATURE_SETPOINT: kwargs.get(self.KEY_USER_TEMPERATURE_SETPOINT, self.default_temperature),
            self.KEY_TEMPERATURE_SETPOINT: kwargs.get(self.KEY_TEMPERATURE_SETPOINT, self.default_temperature),
            self.KEY_TEMPERATURE_CURRENT: kwargs.get(self.KEY_TEMPERATURE_CURRENT, None),
            self.KEY_AWAY_MODE: kwargs.get(self.KEY_AWAY_MODE, False),
            self.KEY_SUMMER_MODE: kwargs.get(self.KEY_SUMMER_MODE, False),
            self.KEY_START_BOOST: kwargs.get(self.KEY_START_BOOST, True),
            self.KEY_SET_DEFAULT_TEMPERATURE: kwargs.get(self.KEY_SET_DEFAULT_TEMPERATURE, False),
            self.KEY_BOOST_TIMER: kwargs.get(self.KEY_BOOST_TIMER, 0)
        })
        # transmit the initial setpoint to the valve
        self.heating_valve.set_heating_setpoint(self[self.KEY_TEMPERATURE_SETPOINT])
        # values reported by the valve
        self.heating_valve.add_callback(self.heating_valve.KEY_HEATING_SETPOINT, None, self.get_radiator_setpoint)
        self.heating_valve.add_callback(self.heating_valve.KEY_TEMPERATURE, None, self.get_radiator_temperature)
        # reactions on changes of the own values
        self.add_callback(self.KEY_USER_TEMPERATURE_SETPOINT, None, self.user_temperature_setpoint, False)
        self.add_callback(self.KEY_TEMPERATURE_SETPOINT, None, self.set_heating_setpoint, True)
        self.add_callback(self.KEY_AWAY_MODE, None, self.away_mode, True)
        self.add_callback(self.KEY_SUMMER_MODE, None, self.summer_mode, True)
        self.add_callback(self.KEY_SET_DEFAULT_TEMPERATURE, None, self.setpoint_to_default)
        self.add_callback(self.KEY_START_BOOST, True, self.boost, False)
        self.add_callback(self.KEY_BOOST_TIMER, 0, self.timer_expired, True)
        # cyclic task initialisation
        self.ct = task.periodic(1, self.cyclic_task)
        self.ct.run()
        self.ct2 = task.periodic(5 * 60, self.cyclic_task_tx_setpoint)
        self.ct2.run()

    # NOTE(review): the parameter names data and key are swapped compared to the other callbacks.
    # This is harmless as long as they stay unused; kept unchanged to preserve the interface.
    def timer_expired(self, device, data, key):
        """
        Callback for the boost timer reaching 0: Return to the user setpoint.
        """
        self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT])
        self.heating_valve.logger.info('Timer expired. returning to regular temperature setpoint %.1f°C.',
                                       self[self.KEY_TEMPERATURE_SETPOINT])

    def cyclic_task_tx_setpoint(self, rt):
        """
        Periodic callback: Transmit the effective setpoint to the valve again.
        """
        self.heating_valve.set_heating_setpoint(self.get(self.KEY_TEMPERATURE_SETPOINT))

    def cyclic_task(self, rt):
        """
        Periodic callback: Reduce the boost timer by the cycle time of the task, never going below 0.
        """
        timer_value = self[self.KEY_BOOST_TIMER] - self.ct.cycle_time
        # Check the reduced value (not the stored one), otherwise the timer gets negative
        # and the "== 0" check in boost() as well as the callback on 0 are affected.
        if timer_value <= 0:
            self.set(self.KEY_BOOST_TIMER, 0)
        else:
            self.set(self.KEY_BOOST_TIMER, timer_value)

    def cancel_boost(self):
        """
        Clear the boost timer without executing timer_expired.
        """
        self.set(self.KEY_BOOST_TIMER, 0, block_callback=[self.timer_expired])

    def send_command(self, key, data, block_callback=None):
        """
        Pass key, data and block_callback to .set() of the parent class and return its result.

        block_callback: List of callbacks handed over to the parent .set(); an empty list is used, if omitted.
        """
        # None as default avoids one list instance being shared between all calls
        if block_callback is None:
            block_callback = []
        return super().set(key, data, block_callback)

    def away_mode(self, device, key, value):
        """
        Callback for KEY_AWAY_MODE: Reduce the setpoint by AWAY_REDUCTION (True) or restore the user setpoint.
        """
        if value is True:
            self.cancel_boost()
            self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode])
            self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT] - self.AWAY_REDUCTION)
        else:
            self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT])

    def summer_mode(self, device, key, value):
        """
        Callback for KEY_SUMMER_MODE: Set the setpoint to SUMMER_TEMPERATURE (True) or restore the user setpoint.
        """
        if value is True:
            self.cancel_boost()
            self.set(self.KEY_AWAY_MODE, False, [self.away_mode])
            self.set(self.KEY_TEMPERATURE_SETPOINT, self.SUMMER_TEMPERATURE)
        else:
            self.set(self.KEY_TEMPERATURE_SETPOINT, self[self.KEY_USER_TEMPERATURE_SETPOINT])

    def boost(self, device, key, data):
        """
        Callback for KEY_START_BOOST: Start the boost or extend a running one.

        A new boost loads the timer with 15 * 60 and sets the setpoint to BOOST_TEMPERATURE.
        A running boost is extended by 15 * 60 up to a maximum of 60 * 60. Away and summer mode are cleared.
        """
        if self[self.KEY_BOOST_TIMER] == 0:
            self.heating_valve.logger.info('Starting boost mode with setpoint %.1f°C.', self.BOOST_TEMPERATURE)
            self.set(self.KEY_BOOST_TIMER, 15*60)
            self.set(self.KEY_TEMPERATURE_SETPOINT, self.BOOST_TEMPERATURE)
        else:
            self.set(self.KEY_BOOST_TIMER, min(self[self.KEY_BOOST_TIMER] + 15 * 60, 60 * 60))
        self.set(self.KEY_AWAY_MODE, False, [self.away_mode])
        self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode])

    def setpoint_to_default(self, device, key, data):
        """
        Callback for KEY_SET_DEFAULT_TEMPERATURE: Leave all special modes and use the default temperature.
        """
        self.cancel_boost()
        self.set(self.KEY_AWAY_MODE, False, [self.away_mode])
        self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode])
        self.set(self.KEY_USER_TEMPERATURE_SETPOINT, self.default_temperature, [self.user_temperature_setpoint])
        self.set(self.KEY_TEMPERATURE_SETPOINT, self.default_temperature)

    def user_temperature_setpoint(self, device, key, data):
        """
        Callback for KEY_USER_TEMPERATURE_SETPOINT: Leave all special modes and use data as setpoint.
        """
        self.cancel_boost()
        self.set(self.KEY_AWAY_MODE, False, [self.away_mode])
        self.set(self.KEY_SUMMER_MODE, False, [self.summer_mode])
        self.set(self.KEY_TEMPERATURE_SETPOINT, data)

    def set_heating_setpoint(self, device, key, data):
        """
        Callback for KEY_TEMPERATURE_SETPOINT: Transmit the setpoint to the valve.
        """
        # remember the value to recognise it, when the valve reports it back
        self.valve_value.new(data)
        self.heating_valve.set_heating_setpoint(data)

    def get_radiator_setpoint(self, device, key, data):
        """
        Callback for the setpoint reported by the valve.

        The value is taken over as user setpoint, if it has not been sent recently by this class and
        neither boost, away mode nor summer mode is active.
        """
        if self.valve_value.is_valid_value(data):
            if self[self.KEY_BOOST_TIMER] == 0 and not self[self.KEY_AWAY_MODE] and not self[self.KEY_SUMMER_MODE]:
                self.set(self.KEY_USER_TEMPERATURE_SETPOINT, data, block_callback=[self.set_heating_setpoint])

    def get_radiator_temperature(self, device, key, data):
        """
        Callback for the temperature reported by the valve: Store it as KEY_TEMPERATURE_CURRENT.
        """
        self.set(self.KEY_TEMPERATURE_CURRENT, data)
-
-
class motion_sensor_light(common_base):
    """
    Light control by motion sensors with a switch off timer.

    The state of each motion sensor is stored with the key KEY_MOTION_SENSOR % index, the remaining
    time with KEY_TIMER. The light is switched off via sw_method, when the timer changes to 0.
    """
    KEY_TIMER = 'timer'
    KEY_MOTION_SENSOR = 'motion_%d'
    KEY_MOTION_SENSOR_0 = 'motion_%d' % 0
    KEY_MOTION_SENSOR_1 = 'motion_%d' % 1
    KEY_MOTION_SENSOR_2 = 'motion_%d' % 2
    KEY_MOTION_SENSOR_3 = 'motion_%d' % 3
    KEY_MOTION_SENSOR_4 = 'motion_%d' % 4

    def __init__(self, sw_device, sw_method, *args, timer_value=30):
        """
        sw_device is the device switching the light, args are 0-n motion sensors

        sw_method is called with True or False to switch the light,
        timer_value is stored to KEY_TIMER, when the light gets switched on.
        """
        # one state per motion sensor followed by the timer
        dv = {self.KEY_MOTION_SENSOR % index: False for index in range(len(args))}
        dv[self.KEY_TIMER] = 0
        super().__init__(default_values=dv)
        #
        self.sw_device = sw_device
        self.sw_method = sw_method
        self.motion_sensors = args
        self.timer_reload_value = timer_value
        #
        output_key = devices.shelly_sw1.KEY_OUTPUT_0
        sw_device.add_callback(output_key, True, self.reload_timer, True)
        sw_device.add_callback(output_key, False, self.reset_timer, True)
        for sensor in args:
            sensor.add_callback(sensor.KEY_OCCUPANCY, None, self.set_motion_detected, True)
        #
        self.add_callback(self.KEY_TIMER, 0, self.timer_expired, True)
        #
        periodic = task.periodic(1, self.cyclic_task)
        periodic.run()

    def reload_timer(self, device, key, data):
        """
        Callback for the light switched on: Load the timer with the reload value.
        """
        self.set(self.KEY_TIMER, self.timer_reload_value)

    def reset_timer(self, device=None, key=None, data=None):
        """
        Callback for the light switched off: Clear the timer.
        """
        self.set(self.KEY_TIMER, 0)

    def set_motion_detected(self, device, key, data):
        """
        Callback of a motion sensor: Store its state and switch on the light, if required.

        The sensor is identified by comparing its topic with the topic of the reporting device.
        """
        # the index stays at the last sensor, if no topic matches
        for sensor_index, sensor in enumerate(self.motion_sensors):
            if sensor.topic == device.topic:
                break
        self.set(self.KEY_MOTION_SENSOR % sensor_index, data)
        # auto light on only while day_state reports sunset
        # (the offsets 60 and -60 presumably extend that period - TODO confirm against day_state)
        if day_state(None, None, 60, -60).get_state() == day_state.KEY_SUNSET and data is True:
            logger.info("%s: Motion detected - Switching on main light %s", device.topic, self.sw_device.topic)
            self.sw_method(True)

    def motion_detected(self):
        """
        Return True, if at least one motion sensor state is set.
        """
        return any(self[self.KEY_MOTION_SENSOR % index] for index in range(len(self.motion_sensors)))

    def timer_expired(self, device, key, data):
        """
        Callback for the timer changing to 0: Switch off the light.
        """
        logger.info("No motion and time ran out - Switching off main light %s", self.sw_device.topic)
        self.sw_method(False)

    def cyclic_task(self, cyclic_task):
        """
        Periodic callback: Count down a running timer; it is held at 10 or above while motion is detected.
        """
        lower_limit = 10 if self.motion_detected() else 0
        if self[self.KEY_TIMER] == 0:
            return
        self.set(self.KEY_TIMER, max(lower_limit, self[self.KEY_TIMER] - cyclic_task.cycle_time))
|