#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
from devices.base import base
from devices.base import warning
import logging
import math
import task


class shelly(base):
    """ Communication (MQTT)

        shelly
            +- relay
            |      +- 0 ["on" / "off"]              <- status
            |      |      +- command ["on"/ "off"]  <- command
            |      |      +- energy [numeric]       <- status
            |      +- 1 ["on" / "off"]              <- status
            |             +- command ["on"/ "off"]  <- command
            |             +- energy [numeric]       <- status
            +- input
            |      +- 0 [0 / 1]                     <- status
            |      +- 1 [0 / 1]                     <- status
            +- input_event
            |      +- 0                             <- status
            |      +- 1                             <- status
            +- longpush
            |      +- 0 [0 / 1]                     <- status
            |      +- 1 [0 / 1]                     <- status
            +- temperature [numeric] °C             <- status
            +- temperature_f [numeric] F            <- status
            +- overtemperature [0 / 1]              <- status
            +- id                                   <- status
            +- model                                <- status
            +- mac                                  <- status
            +- ip                                   <- status
            +- new_fw                               <- status
            +- fw_ver                               <- status
    """
    KEY_OUTPUT_0 = "relay/0"
    KEY_OUTPUT_1 = "relay/1"
    KEY_INPUT_0 = "input/0"
    KEY_INPUT_1 = "input/1"
    KEY_LONGPUSH_0 = "longpush/0"
    KEY_LONGPUSH_1 = "longpush/1"
    KEY_TEMPERATURE = "temperature"
    KEY_OVERTEMPERATURE = "overtemperature"
    KEY_ID = "id"
    KEY_MODEL = "model"
    KEY_MAC = "mac"
    KEY_IP = "ip"
    KEY_NEW_FIRMWARE = "new_fw"
    KEY_FIRMWARE_VERSION = "fw_ver"
    #
    # Transmit configuration (presumably interpreted by the base class -- confirm there)
    TX_TOPIC = "command"
    TX_TYPE = base.TX_VALUE
    TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
    #
    # Receive configuration (presumably interpreted by the base class -- confirm there)
    RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1,
               KEY_OVERTEMPERATURE, KEY_TEMPERATURE, KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE,
               KEY_FIRMWARE_VERSION]
    RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1']
    RX_IGNORE_KEYS = ['temperature_f']
    RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1,
                           KEY_OVERTEMPERATURE]

    def __init__(self, mqtt_client, topic):
        """Create the device and prepare the flash / deferred all-off helpers.

        mqtt_client and topic are passed unchanged to the base class constructor.
        """
        super().__init__(mqtt_client, topic)
        #
        # Key of the output which is currently flashing (None: no flash in progress)
        self.output_key_delayed = None
        # NOTE(review): 0.3 is presumably a delay in seconds -- confirm against task.delayed
        self.delayed_flash_task = task.delayed(0.3, self.flash_task)
        self.delayed_off_task = task.delayed(0.3, self.off_task)
        #
        # NOTE(review): presumably calls __warning__ when overtemperature is reported as True;
        # the meaning of the last argument is defined by base.add_callback -- confirm there
        self.add_callback(self.KEY_OVERTEMPERATURE, True, self.__warning__, True)
        #
        # Set by all_off() while a flash is in progress; consumed by flash_task()
        self.all_off_requested = False

    def __state_logging__(self, inst, key, data):
        """Log output state changes and input actions (presumably invoked by the base class)."""
        if key in [self.KEY_OUTPUT_0, self.KEY_OUTPUT_1]:
            self.logger.info("State change of '%s' to '%s'", key, repr(data))
        elif key in [self.KEY_INPUT_0, self.KEY_INPUT_1, self.KEY_LONGPUSH_0, self.KEY_LONGPUSH_1]:
            self.logger.info("Input action '%s' with '%s'", key, repr(data))

    def flash_task(self, *args):
        """Finish a flash: toggle the flashing output again and execute a deferred all-off request."""
        if self.flash_active:
            self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed))
            self.output_key_delayed = None
            if self.all_off_requested:
                # Consume the request, otherwise every following flash would switch all outputs off again
                self.all_off_requested = False
                self.delayed_off_task.run()

    def off_task(self, *args):
        """Delayed task entry point which switches all outputs off."""
        self.all_off()

    @property
    def flash_active(self):
        """rv: True while a flash is in progress, otherwise False"""
        return self.output_key_delayed is not None

    #
    # WARNING CALL
    #
    def __warning__(self, client, key, data):
        """Create, log and store an overtemperature warning.

        The last known temperature is reported; NaN is used if no (or a falsy) value is available.
        """
        w = warning(self.topic, warning.TYPE_OVERTEMPERATURE, "Temperature to high (%.1f°C)", self.get(self.KEY_TEMPERATURE) or math.nan)
        self.logger.warning(w)
        self.set(self.KEY_WARNING, w)

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0)

    @property
    def output_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_1)

    @property
    def input_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_INPUT_0)

    @property
    def input_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_INPUT_1)

    @property
    def longpush_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_LONGPUSH_0)

    @property
    def longpush_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_LONGPUSH_1)

    @property
    def temperature(self):
        """rv: numeric value"""
        return self.get(self.KEY_TEMPERATURE)

    #
    # TX
    #
    def set_output_0(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_0, state)

    def set_output_0_mcb(self, device, key, data):
        """Callback: set output 0 to the received data (device and key are ignored)."""
        self.set_output_0(data)

    def toggle_output_0_mcb(self, device, key, data):
        """Callback: invert output 0 (all arguments are ignored)."""
        self.set_output_0(not self.output_0)

    def set_output_1(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_1, state)

    def set_output_1_mcb(self, device, key, data):
        """Callback: set output 1 to the received data (device and key are ignored)."""
        self.set_output_1(data)

    def toggle_output_1_mcb(self, device, key, data):
        """Callback: invert output 1 (all arguments are ignored)."""
        self.set_output_1(not self.output_1)

    def flash_0_mcb(self, device, key, data):
        """Callback: flash output 0 (toggle now, flash_task toggles back after the delay)."""
        self.output_key_delayed = self.KEY_OUTPUT_0
        self.toggle_output_0_mcb(device, key, data)
        self.delayed_flash_task.run()

    def flash_1_mcb(self, device, key, data):
        """Callback: flash output 1 (toggle now, flash_task toggles back after the delay)."""
        self.output_key_delayed = self.KEY_OUTPUT_1
        self.toggle_output_1_mcb(device, key, data)
        self.delayed_flash_task.run()

    def all_off(self):
        """Switch off all outputs which are on; deferred until the flash has finished if one is active."""
        if self.flash_active:
            # Switching off now would be inverted by the pending flash toggle -> let flash_task handle it
            self.all_off_requested = True
        else:
            if self.output_0:
                self.set_output_0(False)
            if self.output_1:
                self.set_output_1(False)