smart_enlife/smart_enlife.py

130 wiersze
4.1 KiB
Python

import config
import devices
import geo
import logging
import geo.sun
import mqtt
import random
import report
import state_machine
import time
logger = logging.getLogger(config.APP_NAME)
class day_states(state_machine.state_machine):
    """State machine cycling through the four phases of a day.

    The phases follow each other in a fixed order:
    night -> morning -> day -> evening -> night.
    Every phase has exactly one outgoing transition, guarded by the
    ``condition_*`` method whose name is stored in the matching
    ``CONDITION_*`` constant.
    """
    LOG_PREFIX = 'Day states:'

    # State identifiers
    STATE_NIGHT = 'state_night'
    STATE_MORNING = 'state_morning'
    STATE_DAY = 'state_day'
    STATE_EVENING = 'state_evening'

    # Condition identifiers; each value equals the name of a method below
    CONDITION_NIGHT_END = 'condition_night_end'
    CONDITION_MORNING_END = 'condition_morning_end'
    CONDITION_DAY_END = 'condition_day_end'
    CONDITION_EVENING_END = 'condition_evening_end'

    # Per state: tuple of (condition, 1, next state).
    # NOTE(review): the meaning of the middle element (1) is defined by the
    # state_machine base class, which is not visible here - confirm there.
    TRANSITIONS = {
        STATE_NIGHT: (
            (CONDITION_NIGHT_END, 1, STATE_MORNING),
        ),
        STATE_MORNING: (
            (CONDITION_MORNING_END, 1, STATE_DAY),
        ),
        STATE_DAY: (
            (CONDITION_DAY_END, 1, STATE_EVENING),
        ),
        STATE_EVENING: (
            (CONDITION_EVENING_END, 1, STATE_NIGHT),
        ),
    }

    def condition_night_end(self):
        """Return True from 05:20 local time until the morning end is reached."""
        ltime = time.localtime(time.time())
        # Compare (hour, minute) as a tuple. Testing hour and minute
        # independently would be False during the first 20 minutes of every
        # later hour (e.g. 06:00-06:19).
        after_wakeup = (ltime.tm_hour, ltime.tm_min) >= (5, 20)
        # 2nd condition to ensure day change: in the evening the clock is
        # still past 05:20, but the morning end has already been passed.
        return after_wakeup and not self.condition_morning_end()

    def condition_morning_end(self):
        """Return True once local time is more than 30 minutes past sunrise."""
        ltime = time.mktime(time.localtime(time.time()))
        # NOTE(review): geo.sun.sunrise presumably returns the local
        # struct_time of the current day's sunrise - confirm in geo.sun.
        sunrise = time.mktime(geo.sun.sunrise(config.GEO_POSITION)) + 30 * 60
        return ltime > sunrise

    def condition_day_end(self):
        """Return True once local time is later than 30 minutes before sunset."""
        ltime = time.mktime(time.localtime(time.time()))
        # NOTE(review): geo.sun.sunset presumably returns the local
        # struct_time of the current day's sunset - confirm in geo.sun.
        sunset = time.mktime(geo.sun.sunset(config.GEO_POSITION)) - 30 * 60
        return ltime > sunset

    def condition_evening_end(self):
        """Return True from 20:30 local time until the end of the day."""
        ltime = time.localtime(time.time())
        # Tuple comparison for the same reason as in condition_night_end:
        # 21:00-21:29 must count as "after 20:30".
        return (ltime.tm_hour, ltime.tm_min) >= (20, 30)
def get_sorted_sw_offs(num):
    """Return ``num`` random switching offsets in ascending order.

    Every offset is an integer drawn with ``random.randint`` from the closed
    range 0..1800, i.e. a window of 30 * 60.
    """
    window = 30 * 60
    return sorted(random.randint(0, window) for _ in range(num))
def switch_x(state: bool, sm: day_states, mydevs: list):
    """Switch all devices in ``mydevs`` to ``state`` at random points in time.

    The list is shuffled and one sorted random offset (seconds since the call)
    is created per device. The loop polls until every device has been switched
    or the state machine leaves the states matching ``state`` (evening/morning
    for True, day/night for False).

    :param state: Output state to set on every device.
    :param sm: State machine; it keeps being worked while this function waits.
    :param mydevs: Devices to switch. The list is shuffled and emptied in place.
    """
    started = time.time()
    random.shuffle(mydevs)
    offsets = get_sorted_sw_offs(len(mydevs))
    logger.info("State changed to %s with offsets = %s", repr(sm.this_state()), repr(offsets))
    if state:
        allowed_states = [sm.STATE_EVENING, sm.STATE_MORNING]
    else:
        allowed_states = [sm.STATE_DAY, sm.STATE_NIGHT]
    while mydevs and sm.this_state() in allowed_states:
        # Keep the state machine running so a state change can end this loop
        sm.work()
        elapsed = time.time() - started
        if elapsed > offsets[0]:
            # The earliest pending offset has passed: switch the next device
            del offsets[0]
            device: devices.tradfri_sw_br_ct = mydevs.pop()
            logger.info("Swiching %s to state %s", device.topic, repr(state))
            device.set_output_0(state)
        time.sleep(0.25)
def switch_on(sm: day_states, devs: list):
    """Switch all devices in ``devs`` on via ``switch_x``.

    A copy of ``devs`` is handed over, so the caller's list stays untouched.
    """
    devices_copy = devs[:]
    switch_x(state=True, sm=sm, mydevs=devices_copy)
def switch_off(sm: day_states, devs: list):
    """Switch all devices in ``devs`` off via ``switch_x``.

    A copy of ``devs`` is handed over, so the caller's list stays untouched.
    """
    devices_copy = devs[:]
    switch_x(state=False, sm=sm, mydevs=devices_copy)
if __name__ == "__main__":
    #
    # Logging
    #
    if not config.DEBUG:
        report.stdoutLoggingConfigure(((config.APP_NAME, config.LOGLEVEL), ), report.SHORT_FMT)
    else:
        report.appLoggingConfigure(
            None,
            'stdout',
            ((config.APP_NAME, logging.DEBUG), ),
            target_level=logging.WARNING,
            fmt=report.SHORT_FMT,
            host='localhost',
            port=19996,
        )
    #
    # MQTT Client
    #
    mc = mqtt.mqtt_client(
        host=config.MQTT_SERVER,
        port=config.MQTT_PORT,
        username=config.MQTT_USER,
        password=config.MQTT_PASSWORD,
        name=config.APP_NAME,
    )
    #
    # Smarthome physical Devices
    #
    window_light_topics = (
        "zigbee_gfe/gfe/room1/window_light",
        "zigbee_gfe/gfe/room2/window_light",
        "zigbee_gfe/gfe/room3/window_light",
    )
    devs = [devices.tradfri_sw_br_ct(mc, topic) for topic in window_light_topics]
    #
    # Functionality
    #
    sm = day_states(day_states.STATE_DAY, logging.DEBUG)
    # Lights go off when night or day begins and on when morning or evening begins
    for target_state, action in (
        (sm.STATE_NIGHT, switch_off),
        (sm.STATE_MORNING, switch_on),
        (sm.STATE_DAY, switch_off),
        (sm.STATE_EVENING, switch_on),
    ):
        sm.register_state_change_callback(target_state, None, action, sm, devs)
    # Poll the state machine forever
    while True:
        sm.work()
        time.sleep(0.25)