1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- """ Communication (MQTT)
- shelly
- +- relay
- | +- 0 ["on" / "off"] <- status
- | | +- command ["on"/ "off"] <- command
- | | +- energy [numeric] <- status
- | +- 1 ["on" / "off"] <- status
- | +- command ["on"/ "off"] <- command
- | +- energy [numeric] <- status
- +- input
- | +- 0 [0 / 1] <- status
- | +- 1 [0 / 1] <- status
- +- input_event
- | +- 0 <- status
- | +- 1 <- status
- +- logpush
- | +- 0 [0 / 1] <- status
- | +- 1 [0 / 1] <- status
- +- temperature [numeric] °C <- status
- +- temperature_f [numeric] F <- status
- +- overtemperature [0 / 1] <- status
- +- id <- status
- +- model <- status
- +- mac <- status
- +- ip <- status
- +- new_fw <- status
- +- fw_ver <- status
- """
-
- from devices.base import base
- import json
- import task
-
-
- class shelly_sw1(base):
- """A shelly device with switching functionality
-
- Args:
- mqtt_client (mqtt.mqtt_client): A MQTT Client instance
- topic (str): the base topic for this device
- kwargs (**dict): cd_r0=list of devices connected to relay/0
- """
- PROPERTIES = [
- "relay/0",
- ]
- def __init__(self, mqtt_client, topic):
- super().__init__(mqtt_client, topic)
- self["state"] = "off"
- #
- self.__auto_off__ = None
- #
- self.mqtt_client.add_callback(self.topic + '/relay/0/command', self.__rx_set__)
-
- def __rx_set__(self, client, userdata, message):
- data = message.payload.decode('utf-8')
- key = message.topic.split('/')[-3] + '/' + message.topic.split('/')[-2]
- self.logger.info("Received set data for %s: %s", key, repr(data))
- self.__set__(key, data)
- self.send_device_status(key)
- if key == "relay/0":
- if data.lower() == "on":
- self.power_on()
- if self.__auto_off__ is not None:
- self.__auto_off__.run()
- else:
- if self.__auto_off__ is not None:
- self.__auto_off__.stop()
-
- def send_device_status(self, key):
- data = self[key]
- self.logger.info("Sending status for %s: %s", key, repr(data))
- self.mqtt_client.send(self.topic + '/' + key, data)
-
- def auto_off(self, sec):
- self.__auto_off__ = task.delayed(sec, self.__auto_off_func__)
-
- def __auto_off_func__(self):
- if self.get(self.PROPERTIES[0]).lower() != 'off':
- self.__set__(self.PROPERTIES[0], "off")
- self.send_device_status(self.PROPERTIES[0])
|