123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- import config
- import devices
- import geo
- import logging
- import geo.sun
- import mqtt
- import random
- import report
- import state_machine
- import time
-
- logger = logging.getLogger(config.APP_NAME)
-
-
- class day_states(state_machine.state_machine):
- LOG_PREFIX = 'Day states:'
-
- STATE_NIGHT = 'state_night'
- STATE_MORNING = 'state_morning'
- STATE_DAY = 'state_day'
- STATE_EVENING = 'state_evening'
-
- CONDITION_NIGHT_END = 'condition_night_end'
- CONDITION_MORNING_END = 'condition_morning_end'
- CONDITION_DAY_END = 'condition_day_end'
- CONDITION_EVENING_END = 'condition_evening_end'
-
- TRANSITIONS = {
- STATE_NIGHT: (
- (CONDITION_NIGHT_END, 1, STATE_MORNING),
- ),
- STATE_MORNING: (
- (CONDITION_MORNING_END, 1, STATE_DAY),
- ),
- STATE_DAY: (
- (CONDITION_DAY_END, 1, STATE_EVENING),
- ),
- STATE_EVENING: (
- (CONDITION_EVENING_END, 1, STATE_NIGHT),
- ),
- }
-
- def condition_night_end(self):
- ltime = time.localtime(time.time())
- return ltime.tm_hour >= 5 and ltime.tm_min >= 20 and not self.condition_morning_end()
-
- def condition_morning_end(self):
- ltime = time.mktime(time.localtime(time.time()))
- sunrise = time.mktime(geo.sun.sunrise(config.GEO_POSITION)) + 30 * 60
- return ltime > sunrise
-
- def condition_day_end(self):
- ltime = time.mktime(time.localtime(time.time()))
- sunset = time.mktime(geo.sun.sunset(config.GEO_POSITION)) - 30 * 60
- return ltime > sunset
-
- def condition_evening_end(self):
- ltime = time.localtime(time.time())
- return ltime.tm_hour >= 20 and ltime.tm_min >= 30
-
-
- def get_sorted_sw_offs(num):
- SWITCH_DURATION = 30 * 60
- rv = []
- for i in range(0, num):
- rv.append(random.randint(0, SWITCH_DURATION))
- rv.sort()
- return rv
-
-
- def switch_x(state: bool, sm: day_states, mydevs: list):
- tm = time.time()
- random.shuffle(mydevs)
- offsets = get_sorted_sw_offs(len(mydevs))
- logger.info("State changed to %s with offsets = %s", repr(sm.this_state()), repr(offsets))
- allowed_states = [sm.STATE_EVENING, sm.STATE_MORNING] if state else [sm.STATE_DAY, sm.STATE_NIGHT]
- while (len(mydevs) > 0 and sm.this_state() in allowed_states):
- sm.work()
- dt = time.time() - tm
- if dt > offsets[0]:
- offsets.pop(0)
- d: devices.tradfri_sw_br_ct = mydevs.pop()
- logger.info("Swiching %s to state %s", d.topic, repr(state))
- d.set_output_0(state)
- time.sleep(0.25)
-
-
- def switch_on(sm: day_states, devs: list):
- switch_x(True, sm, devs[:])
-
-
- def switch_off(sm: day_states, devs: list):
- switch_x(False, sm, devs[:])
-
-
- if __name__ == "__main__":
-
-
-
- if config.DEBUG:
- report.appLoggingConfigure(None, 'stdout', ((config.APP_NAME, logging.DEBUG), ),
- target_level=logging.WARNING, fmt=report.SHORT_FMT, host='localhost', port=19996)
- else:
- report.stdoutLoggingConfigure(((config.APP_NAME, config.LOGLEVEL), ), report.SHORT_FMT)
-
-
-
-
- mc = mqtt.mqtt_client(host=config.MQTT_SERVER, port=config.MQTT_PORT, username=config.MQTT_USER,
- password=config.MQTT_PASSWORD, name=config.APP_NAME)
-
-
-
- devs = []
- devs.append(devices.tradfri_sw_br_ct(mc, "zigbee_gfe/gfe/room1/window_light"))
- devs.append(devices.tradfri_sw_br_ct(mc, "zigbee_gfe/gfe/room2/window_light"))
- devs.append(devices.tradfri_sw_br_ct(mc, "zigbee_gfe/gfe/room3/window_light"))
-
-
-
-
- sm = day_states(day_states.STATE_DAY, logging.DEBUG)
- sm.register_state_change_callback(sm.STATE_NIGHT, None, switch_off, sm, devs)
- sm.register_state_change_callback(sm.STATE_MORNING, None, switch_on, sm, devs)
- sm.register_state_change_callback(sm.STATE_DAY, None, switch_off, sm, devs)
- sm.register_state_change_callback(sm.STATE_EVENING, None, switch_on, sm, devs)
- while True:
- sm.work()
- time.sleep(0.25)
|