85 líneas
3.1 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
""" Communication (MQTT)
shelly
+- relay
| +- 0 ["on" / "off"] <- status
| | +- command ["on"/ "off"] <- command
| | +- energy [numeric] <- status
| +- 1 ["on" / "off"] <- status
| +- command ["on"/ "off"] <- command
| +- energy [numeric] <- status
+- input
| +- 0 [0 / 1] <- status
| +- 1 [0 / 1] <- status
+- input_event
| +- 0 <- status
| +- 1 <- status
+- logpush
| +- 0 [0 / 1] <- status
| +- 1 [0 / 1] <- status
+- temperature [numeric] °C <- status
+- temperature_f [numeric] F <- status
+- overtemperature [0 / 1] <- status
+- id <- status
+- model <- status
+- mac <- status
+- ip <- status
+- new_fw <- status
+- fw_ver <- status
"""
from devices.base import base
import json
import task
class shelly_sw1(base):
"""A shelly device with switching functionality
Args:
mqtt_client (mqtt.mqtt_client): A MQTT Client instance
topic (str): the base topic for this device
kwargs (**dict): cd_r0=list of devices connected to relay/0
"""
PROPERTIES = [
"relay/0",
]
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic)
self["state"] = "off"
#
self.__auto_off__ = None
#
self.mqtt_client.add_callback(self.topic + '/relay/0/command', self.__rx_set__)
def __rx_set__(self, client, userdata, message):
data = message.payload.decode('utf-8')
key = message.topic.split('/')[-3] + '/' + message.topic.split('/')[-2]
self.logger.info("Received set data for %s: %s", key, repr(data))
self.__set__(key, data)
self.send_device_status(key)
if key == "relay/0":
if data.lower() == "on":
self.power_on(key)
if self.__auto_off__ is not None:
self.__auto_off__.run()
else:
if self.__auto_off__ is not None:
self.__auto_off__.stop()
def send_device_status(self, key):
data = self[key]
self.logger.info("Sending status for %s: %s", key, repr(data))
self.mqtt_client.send(self.topic + '/' + key, data)
def auto_off(self, sec):
self.__auto_off__ = task.delayed(sec, self.__auto_off_func__)
def __auto_off_func__(self):
if self.get(self.PROPERTIES[0]).lower() != 'off':
self.__set__(self.PROPERTIES[0], "off")
self.send_device_status(self.PROPERTIES[0])