import config import devices import geo import logging import geo.sun import mqtt import random import report import state_machine import time logger = logging.getLogger(config.APP_NAME) class day_states(state_machine.state_machine): LOG_PREFIX = 'Day states:' STATE_NIGHT = 'state_night' STATE_MORNING = 'state_morning' STATE_DAY = 'state_day' STATE_EVENING = 'state_evening' CONDITION_NIGHT_END = 'condition_night_end' CONDITION_MORNING_END = 'condition_morning_end' CONDITION_DAY_END = 'condition_day_end' CONDITION_EVENING_END = 'condition_evening_end' TRANSITIONS = { STATE_NIGHT: ( (CONDITION_NIGHT_END, 1, STATE_MORNING), ), STATE_MORNING: ( (CONDITION_MORNING_END, 1, STATE_DAY), ), STATE_DAY: ( (CONDITION_DAY_END, 1, STATE_EVENING), ), STATE_EVENING: ( (CONDITION_EVENING_END, 1, STATE_NIGHT), ), } def condition_night_end(self): ltime = time.localtime(time.time()) return ltime.tm_hour >= 5 and ltime.tm_min >= 20 and not self.condition_morning_end() # 2nd condition to ensure day change def condition_morning_end(self): ltime = time.mktime(time.localtime(time.time())) sunrise = time.mktime(geo.sun.sunrise(config.GEO_POSITION)) + 30 * 60 return ltime > sunrise def condition_day_end(self): ltime = time.mktime(time.localtime(time.time())) sunset = time.mktime(geo.sun.sunset(config.GEO_POSITION)) - 30 * 60 return ltime > sunset def condition_evening_end(self): ltime = time.localtime(time.time()) return ltime.tm_hour >= 20 and ltime.tm_min >= 30 def get_sorted_sw_offs(num): SWITCH_DURATION = 30 * 60 rv = [] for i in range(0, num): rv.append(random.randint(0, SWITCH_DURATION)) rv.sort() return rv def switch_x(state: bool, sm: day_states, mydevs: list): tm = time.time() random.shuffle(mydevs) offsets = get_sorted_sw_offs(len(mydevs)) logger.info("State changed to %s with offsets = %s", repr(sm.this_state()), repr(offsets)) allowed_states = [sm.STATE_EVENING, sm.STATE_MORNING] if state else [sm.STATE_DAY, sm.STATE_NIGHT] while (len(mydevs) > 0 and sm.this_state() in allowed_states): sm.work() dt = time.time() - tm if dt > offsets[0]: offsets.pop(0) d: devices.tradfri_sw_br_ct = mydevs.pop() logger.info("Swiching %s to state %s", d.topic, repr(state)) d.set_output_0(state) time.sleep(0.25) def switch_on(sm: day_states, devs: list): switch_x(True, sm, devs[:]) def switch_off(sm: day_states, devs: list): switch_x(False, sm, devs[:]) if __name__ == "__main__": # # Logging # if config.DEBUG: report.appLoggingConfigure(None, 'stdout', ((config.APP_NAME, logging.DEBUG), ), target_level=logging.WARNING, fmt=report.SHORT_FMT, host='localhost', port=19996) else: report.stdoutLoggingConfigure(((config.APP_NAME, config.LOGLEVEL), ), report.SHORT_FMT) # # MQTT Client # mc = mqtt.mqtt_client(host=config.MQTT_SERVER, port=config.MQTT_PORT, username=config.MQTT_USER, password=config.MQTT_PASSWORD, name=config.APP_NAME) # # Smarthome physical Devices devs = [] devs.append(devices.tradfri_sw_br_ct(mc, "zigbee_gfe/gfe/room1/window_light")) devs.append(devices.tradfri_sw_br_ct(mc, "zigbee_gfe/gfe/room2/window_light")) devs.append(devices.tradfri_sw_br_ct(mc, "zigbee_gfe/gfe/room3/window_light")) # # Functionality # sm = day_states(day_states.STATE_DAY, logging.DEBUG) sm.register_state_change_callback(sm.STATE_NIGHT, None, switch_off, sm, devs) sm.register_state_change_callback(sm.STATE_MORNING, None, switch_on, sm, devs) sm.register_state_change_callback(sm.STATE_DAY, None, switch_off, sm, devs) sm.register_state_change_callback(sm.STATE_EVENING, None, switch_on, sm, devs) while True: sm.work() time.sleep(0.25)