#!/usr/bin/env python # -*- coding: utf-8 -*- # from devices.base import base_output from devices.base import base_rpc import task class shelly(base_output): """ Communication (MQTT) shelly +- relay | +- 0 ["on" / "off"] <- status | | +- command ["on"/ "off"] <- command | | +- energy [numeric] <- status | +- 1 ["on" / "off"] <- status | +- command ["on"/ "off"] <- command | +- energy [numeric] <- status +- input | +- 0 [0 / 1] <- status | +- 1 [0 / 1] <- status +- input_event | +- 0 <- status | +- 1 <- status +- logpush | +- 0 [0 / 1] <- status | +- 1 [0 / 1] <- status +- temperature [numeric] °C <- status +- temperature_f [numeric] F <- status +- overtemperature [0 / 1] <- status +- id <- status +- model <- status +- mac <- status +- ip <- status +- new_fw <- status +- fw_ver <- status """ KEY_OUTPUT_0 = "relay/0" KEY_OUTPUT_1 = "relay/1" KEY_INPUT_0 = "input/0" KEY_INPUT_1 = "input/1" KEY_LONGPUSH_0 = "longpush/0" KEY_LONGPUSH_1 = "longpush/1" KEY_TEMPERATURE = "temperature" KEY_OVERTEMPERATURE = "overtemperature" KEY_ID = "id" KEY_MODEL = "model" KEY_MAC = "mac" KEY_IP = "ip" KEY_NEW_FIRMWARE = "new_fw" KEY_FIRMWARE_VERSION = "fw_ver" # TX_TOPIC = "command" TX_TYPE = base_output.TX_VALUE TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1] # RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE, KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION] RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1'] RX_IGNORE_KEYS = ['temperature_f'] RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # self.output_key_delayed = None self.delayed_flash_task = task.delayed(0.3, self.flash_task) self.delayed_off_task = task.delayed(0.3, self.off_task) # self.all_off_requested = False def __state_logging__(self, inst, key, data): if key in [self.KEY_OUTPUT_0, self.KEY_OUTPUT_1]: self.logger.info("State change of '%s' to '%s'", key, repr(data)) elif key in [self.KEY_INPUT_0, self.KEY_INPUT_1, self.KEY_LONGPUSH_0, self.KEY_LONGPUSH_1]: self.logger.info("Input action '%s' with '%s'", key, repr(data)) def flash_task(self, *args): if self.flash_active: self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed)) self.output_key_delayed = None if self.all_off_requested: self.delayed_off_task.run() def off_task(self, *args): self.all_off() @property def flash_active(self): return self.output_key_delayed is not None # # RX # @property def output_0(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_0) @property def output_1(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_1) @property def input_0(self): """rv: [True, False]""" return self.get(self.KEY_INPUT_0) @property def input_1(self): """rv: [True, False]""" return self.get(self.KEY_INPUT_1) @property def longpush_0(self): """rv: [True, False]""" return self.get(self.KEY_LONGPUSH_0) @property def longpush_1(self): """rv: [True, False]""" return self.get(self.KEY_LONGPUSH_1) @property def temperature(self): """rv: numeric value""" return self.get(self.KEY_TEMPERATURE) # # TX # def set_output_0(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_0, state) def set_output_0_mcb(self, device, key, data): self.set_output_0(data) def toggle_output_0_mcb(self, device, key, data): self.set_output_0(not self.output_0) def set_output_1(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_1, state) def set_output_1_mcb(self, device, key, data): self.set_output_1(data) def toggle_output_1_mcb(self, device, key, data): self.set_output_1(not self.output_1) def flash_0_mcb(self, device, key, data): self.output_key_delayed = self.KEY_OUTPUT_0 self.toggle_output_0_mcb(device, key, data) self.delayed_flash_task.run() def flash_1_mcb(self, device, key, data): self.output_key_delayed = self.KEY_OUTPUT_1 self.toggle_output_1_mcb(device, key, data) self.delayed_flash_task.run() def __all_off__(self): if self.flash_active: self.all_off_requested = True else: if self.output_0: self.set_output_0(False) if self.output_1: self.set_output_1(False) class shelly_rpc(base_rpc): KEY_OUTPUT_0 = "switch:0" KEY_OUTPUT_1 = "switch:1" KEY_OUTPUT_2 = "switch:2" KEY_INPUT_0 = "input:0" KEY_INPUT_1 = "input:1" KEY_INPUT_2 = "input:2" KEY_LONGPUSH_0 = "input:0:long_push" KEY_LONGPUSH_1 = "input:1:long_push" KEY_LONGPUSH_2 = "input:2:long_push" KEY_SINGLEPUSH_0 = "input:0:single_push" KEY_SINGLEPUSH_1 = "input:1:single_push" KEY_SINGLEPUSH_2 = "input:2:single_push" KEY_DOUBLEPUSH_0 = "input:0:double_push" KEY_DOUBLEPUSH_1 = "input:1:double_push" KEY_DOUBLEPUSH_2 = "input:2:double_push" KEY_TRIPLEPUSH_0 = "input:0:triple_push" KEY_TRIPLEPUSH_1 = "input:1:triple_push" KEY_TRIPLEPUSH_2 = "input:2:triple_push" RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_INPUT_0, KEY_INPUT_1, KEY_INPUT_2, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_LONGPUSH_2, KEY_SINGLEPUSH_0, KEY_SINGLEPUSH_1, KEY_SINGLEPUSH_2, KEY_DOUBLEPUSH_0, KEY_DOUBLEPUSH_1, KEY_DOUBLEPUSH_2, KEY_TRIPLEPUSH_0, KEY_TRIPLEPUSH_1, KEY_TRIPLEPUSH_2] def __state_logging__(self, inst, key, data): if key in [self.KEY_OUTPUT_0, self.KEY_OUTPUT_1, self.KEY_OUTPUT_2]: self.logger.info("State change of '%s' to '%s'", key, repr(data)) elif key in [self.KEY_INPUT_0, self.KEY_INPUT_1, self.KEY_INPUT_2]: self.logger.info("Input action '%s' with '%s'", key, repr(data)) elif key in [self.KEY_LONGPUSH_0, self.KEY_LONGPUSH_1, self.KEY_LONGPUSH_2, self.KEY_SINGLEPUSH_0, self.KEY_SINGLEPUSH_1, self.KEY_SINGLEPUSH_2, self.KEY_DOUBLEPUSH_0, self.KEY_DOUBLEPUSH_1, self.KEY_DOUBLEPUSH_2, self.KEY_TRIPLEPUSH_0, self.KEY_TRIPLEPUSH_1, self.KEY_TRIPLEPUSH_2]: self.logger.info("Input action '%s'", key) def set_output_0(self, state): """state: [True, False]""" self.rpc_switch_set(self.KEY_OUTPUT_0, state) def set_output_0_mcb(self, device, key, data): self.set_output_0(data) def toggle_output_0_mcb(self, device, key, data): self.set_output_0(not self.get(self.KEY_OUTPUT_0)) def set_output_1(self, state): """state: [True, False]""" self.rpc_switch_set(self.KEY_OUTPUT_1, state) def set_output_1_mcb(self, device, key, data): self.set_output_1(data) def toggle_output_1_mcb(self, device, key, data): print(self.get(self.KEY_OUTPUT_1)) self.set_output_1(not self.get(self.KEY_OUTPUT_1)) def set_output_2(self, state): """state: [True, False]""" self.rpc_switch_set(self.KEY_OUTPUT_2, state) def set_output_2_mcb(self, device, key, data): self.set_output_2(data) def toggle_output_2_mcb(self, device, key, data): self.set_output_2(not self.get(self.KEY_OUTPUT_2))