#!/usr/bin/env python # -*- coding: utf-8 -*- # import config import devices from function.helpers import now, sunset_time, sunrise_time import logging import task try: from config import APP_NAME as ROOT_LOGGER_NAME except ImportError: ROOT_LOGGER_NAME = 'root' logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__) class room(object): def __init__(self, mqtt_client): self.mqtt_client = mqtt_client class room_shelly(room): def __init__(self, mqtt_client, topic_shelly, topic_gui): super().__init__(mqtt_client) self.main_light_shelly = devices.shelly(mqtt_client, topic_shelly) # self.gui_main_light = devices.nodered_gui_light(mqtt_client, topic_gui) # # Callback initialisation # self.gui_main_light.add_callback(devices.nodered_gui_light.KEY_STATE, None, self.main_light_shelly.set_output_0_mcb) self.main_light_shelly.add_callback(devices.shelly.KEY_OUTPUT_0, None, self.gui_main_light.set_state_mcb) # self.block_all_off = False self.last_flash_data = None self.delayed_task = task.delayed(.25, self.main_light_shelly.toggle_output_0_mcb, None, None, None) def all_off(self, device=None, key=None, data=None): if not self.block_all_off: logger.info("Switching all off \"%s\"", type(self).__name__) self.main_light_shelly.set_output_0(False) self.main_light_shelly.set_output_1(False) self.block_all_off = False def all_off_feedback(self, device=None, key=None, data=None): logger.info("Flashing \"%s\" main light", type(self).__name__) if self.main_light_shelly.output_0 is False: self.main_light_shelly.set_output_0(True) self.block_all_off = True self.delayed_task.run() def flash_main_light(self, device, key, data): if self.last_flash_data != data and data is True: logger.info("Flashing \"%s\" main light", type(self).__name__) self.main_light_shelly.toggle_output_0_mcb(device, key, data) self.delayed_task.run() self.last_flash_data = data class room_shelly_motion_sensor(room_shelly): def __init__(self, mqtt_client, topic_shelly, topic_gui, topic_motion_sensor_1, topic_motion_sensor_2=None, timer_value=30): super().__init__(mqtt_client, topic_shelly, topic_gui) self.timer_value = timer_value self.motion_detected_1 = False self.motion_detected_2 = False # self.main_light_shelly.add_callback(devices.shelly.KEY_OUTPUT_0, True, self.reload_timer, True) self.main_light_shelly.add_callback(devices.shelly.KEY_OUTPUT_0, False, self.reset_timer, True) # self.motion_sensor_silvercrest_1 = devices.silvercrest_motion_sensor(mqtt_client, topic_motion_sensor_1) self.motion_sensor_silvercrest_1.add_callback(devices.silvercrest_motion_sensor.KEY_OCCUPANCY, None, self.set_motion_detected) # if topic_motion_sensor_2 is not None: self.motion_sensor_silvercrest_2 = devices.silvercrest_motion_sensor(mqtt_client, topic_motion_sensor_2) self.motion_sensor_silvercrest_2.add_callback(devices.silvercrest_motion_sensor.KEY_OCCUPANCY, None, self.set_motion_detected) # self.reset_timer() # cyclic_task = task.periodic(1, self.cyclic_task) cyclic_task.run() def set_motion_detected(self, device, key, data): if device == self.motion_sensor_silvercrest_1: self.motion_detected_1 = data elif device == self.motion_sensor_silvercrest_2: self.motion_detected_2 = data self.gui_main_light.set_led(devices.nodered_gui_light.KEY_LED_0, self.motion_detected_1) self.gui_main_light.set_led(devices.nodered_gui_light.KEY_LED_1, self.motion_detected_2) if now() < sunrise_time(60) or now() > sunset_time(-60): if data is True: logger.info("%s: Motion detected - Switching on main light %s", device.topic, self.main_light_shelly.topic) self.main_light_shelly.set_output_0(True) def reload_timer(self, device, key, data): self.main_light_timer = self.timer_value def reset_timer(self, device=None, key=None, data=None): self.main_light_timer = None self.gui_main_light.set_timer('-') def cyclic_task(self, cyclic_task): if self.main_light_timer is not None: if self.main_light_timer <= 0: logger.info("No motion and time ran out - Switching off main light %s", self.main_light_shelly.topic) self.main_light_shelly.set_output_0(False) self.main_light_timer = None self.gui_main_light.set_timer('-') else: self.gui_main_light.set_timer(self.main_light_timer) if (self.motion_detected_1 or self.motion_detected_2) and self.main_light_timer <= self.timer_value / 10: self.main_light_timer = self.timer_value / 10 else: self.main_light_timer -= cyclic_task.cycle_time class room_shelly_tradfri_light(room_shelly): def __init__(self, mqtt_client, topic_shelly, topic_gui, topic_tradfri_light): super().__init__(mqtt_client, topic_shelly, topic_gui) self.main_light_tradfri = devices.tradfri_light(mqtt_client, topic_tradfri_light) # # Callback initialisation # self.main_light_shelly.add_callback(devices.shelly.KEY_OUTPUT_0, None, self.gui_main_light.set_state_mcb) self.main_light_shelly.add_callback(devices.shelly.KEY_OUTPUT_0, None, self.gui_main_light.set_enable_mcb) self.main_light_tradfri.add_callback(devices.tradfri_light.KEY_BRIGHTNESS, None, self.gui_main_light.set_brightness_mcb) self.main_light_tradfri.add_callback(devices.tradfri_light.KEY_COLOR_TEMP, None, self.gui_main_light.set_color_temp_mcb) self.gui_main_light.add_callback(devices.nodered_gui_light.KEY_BRIGHTNESS, None, self.main_light_tradfri.set_brightness_mcb) self.gui_main_light.add_callback(devices.nodered_gui_light.KEY_COLOR_TEMP, None, self.main_light_tradfri.set_color_temp_mcb) class room_shelly_silvercrest_light(room_shelly_tradfri_light): def __init__(self, mqtt_client, topic_shelly, topic_gui, topic_tradfri_light): super().__init__(mqtt_client, topic_shelly, topic_gui, topic_tradfri_light) # # Callback initialisation # self.main_light_shelly.add_callback(devices.shelly.KEY_OUTPUT_0, None, self.get_initial_main_light_data) # self.main_light_shelly_last = None def get_initial_main_light_data(self, device, key, data): if data is True and self.main_light_shelly_last != data: self.send_init_message_main_light() self.main_light_shelly_last = data def send_init_message_main_light(self): self.main_light_tradfri.request_data()