#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
Communication (MQTT)

    shelly
        +- relay
        |   +- 0 ["on" / "off"]              <- status
        |   |  +- command ["on"/ "off"]      <- command
        |   |  +- energy [numeric]           <- status
        |   +- 1 ["on" / "off"]              <- status
        |      +- command ["on"/ "off"]      <- command
        |      +- energy [numeric]           <- status
        +- input
        |   +- 0 [0 / 1]                     <- status
        |   +- 1 [0 / 1]                     <- status
        +- input_event
        |   +- 0                             <- status
        |   +- 1                             <- status
        +- logpush
        |   +- 0 [0 / 1]                     <- status
        |   +- 1 [0 / 1]                     <- status
        +- temperature [numeric] °C          <- status
        +- temperature_f [numeric] F         <- status
        +- overtemperature [0 / 1]           <- status
        +- id                                <- status
        +- model                             <- status
        +- mac                               <- status
        +- ip                                <- status
        +- new_fw                            <- status
        +- fw_ver                            <- status
"""

from devices.base import base
import json
import task


class shelly_sw1(base):
    """A shelly device with switching functionality.

    Only the first relay ("relay/0") is handled by this class. Its state is
    kept as the string "on" or "off" and published on ``<topic>/relay/0``;
    commands are received on ``<topic>/relay/0/command``.

    Args:
        mqtt_client (mqtt.mqtt_client): A MQTT Client instance
        topic (str): the base topic for this device
    """

    KEY_OUTPUT_0 = "relay/0"
    #
    PROPERTIES = [
        KEY_OUTPUT_0,
    ]

    def __init__(self, mqtt_client, topic):
        """Initialise the relay as "off" and register the command callback.

        Args:
            mqtt_client: MQTT client used to subscribe to the command topic
                and to publish status messages.
            topic (str): Base topic of this device.
        """
        super().__init__(mqtt_client, topic)
        self[self.KEY_OUTPUT_0] = "off"
        # No auto-off timer until auto_off() has been called.
        self.__auto_off__ = None
        # Incoming commands arrive on "<topic>/relay/0/command".
        self.mqtt_client.add_callback(self.topic + '/' + self.KEY_OUTPUT_0 + '/command', self.__rx_set__)
        # User command names use '.' instead of '/' as separator.
        cmd_base = self.topic.replace('/', '.') + '.'
        self.user_cmds = {
            cmd_base + 'toggle': self.__ui_toggle_output_0__,
        }

    def __rx_set__(self, client, userdata, message):
        """Handle a message received on a command topic.

        The key is rebuilt from the two topic segments in front of the
        trailing one (e.g. ".../relay/0/command" -> "relay/0") and the
        decoded payload is applied via ``__set__``.

        Args:
            client: Unused.
            userdata: Unused.
            message: Received message; ``payload`` (bytes) and ``topic``
                (str) are read.
        """
        data = message.payload.decode('utf-8')
        # Split once and reuse the segments for both parts of the key.
        topic_parts = message.topic.split('/')
        key = topic_parts[-3] + '/' + topic_parts[-2]
        self.logger.info("Received set data for %s: %s", key, repr(data))
        self.__set__(key, data)

    def __set__(self, key, data):
        """Store ``data`` for ``key``, publish it and drive the auto-off timer.

        Args:
            key (str): Property key, e.g. ``KEY_OUTPUT_0``.
            data (str): New value; for the relay "on" (case-insensitive)
                switches on, any other value is treated as switching off.
        """
        base.__set__(self, key, data)
        self.send_device_status(key)
        if key != self.KEY_OUTPUT_0:
            return
        if data.lower() == "on":
            # NOTE(review): power_on is not defined in this class; presumably
            # provided by the base class - confirm.
            self.power_on(key)
            if self.__auto_off__ is not None:
                self.__auto_off__.run()
        elif self.__auto_off__ is not None:
            # Switched off: a pending auto-off is no longer needed.
            self.__auto_off__.stop()

    def send_device_status(self, key):
        """Publish the current value of ``key`` on ``<topic>/<key>``.

        Args:
            key (str): Property key whose value is sent.
        """
        data = self[key]
        self.logger.info("Sending status for %s: %s", key, repr(data))
        self.mqtt_client.send(self.topic + '/' + key, data)

    def auto_off(self, sec):
        """Enable automatic switch-off of the relay.

        Args:
            sec: Delay handed to ``task.delayed``
                (presumably seconds - TODO confirm against the task module).
        """
        self.__auto_off__ = task.delayed(sec, self.__auto_off_func__)

    def __auto_off_func__(self):
        """Timer callback: switch the relay off if it is not off already."""
        if self.get(self.KEY_OUTPUT_0).lower() != 'off':
            # __set__ already publishes the new status, so no additional
            # send_device_status call is needed here (it would publish the
            # same message a second time).
            self.__set__(self.KEY_OUTPUT_0, "off")

    def __ui_toggle_output_0__(self, *args):
        """User command: invert the state of relay 0.

        Args:
            *args: Ignored.
        """
        currently_on = self.get(self.KEY_OUTPUT_0).lower() == 'on'
        self.__set__(self.KEY_OUTPUT_0, 'off' if currently_on else 'on')