12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- """ Communication (MQTT)
- shelly
- +- relay
- | +- 0 ["on" / "off"] <- status
- | | +- command ["on"/ "off"] <- command
- | | +- energy [numeric] <- status
- | +- 1 ["on" / "off"] <- status
- | +- command ["on"/ "off"] <- command
- | +- energy [numeric] <- status
- +- input
- | +- 0 [0 / 1] <- status
- | +- 1 [0 / 1] <- status
- +- input_event
- | +- 0 <- status
- | +- 1 <- status
- +- logpush
- | +- 0 [0 / 1] <- status
- | +- 1 [0 / 1] <- status
- +- temperature [numeric] °C <- status
- +- temperature_f [numeric] F <- status
- +- overtemperature [0 / 1] <- status
- +- id <- status
- +- model <- status
- +- mac <- status
- +- ip <- status
- +- new_fw <- status
- +- fw_ver <- status
- """
-
- from devices.base import base
- import json
- import task
-
-
- class shelly_sw1(base):
- """A shelly device with switching functionality
-
- Args:
- mqtt_client (mqtt.mqtt_client): A MQTT Client instance
- topic (str): the base topic for this device
- kwargs (**dict): cd_r0=list of devices connected to relay/0
- """
- KEY_OUTPUT_0 = "relay/0"
- #
- PROPERTIES = [
- KEY_OUTPUT_0,
- ]
-
- def __init__(self, mqtt_client, topic):
- super().__init__(mqtt_client, topic)
- self[self.KEY_OUTPUT_0] = "off"
- #
- self.__auto_off__ = None
- #
- self.mqtt_client.add_callback(self.topic + '/' + self.KEY_OUTPUT_0 + '/command', self.__rx_set__)
- #
- cmd_base = self.topic.replace('/', '.') + '.'
- self.user_cmds = {
- cmd_base + 'toggle': self.__ui_toggle_output_0__,
- }
-
- def __rx_set__(self, client, userdata, message):
- data = message.payload.decode('utf-8')
- key = message.topic.split('/')[-3] + '/' + message.topic.split('/')[-2]
- self.logger.info("Received set data for %s: %s", key, repr(data))
- self.__set__(key, data)
-
- def __set__(self, key, data):
- base.__set__(self, key, data)
- self.send_device_status(key)
- if key == self.KEY_OUTPUT_0:
- if data.lower() == "on":
- self.power_on(key)
- if self.__auto_off__ is not None:
- self.__auto_off__.run()
- else:
- if self.__auto_off__ is not None:
- self.__auto_off__.stop()
-
- def send_device_status(self, key):
- data = self[key]
- self.logger.info("Sending status for %s: %s", key, repr(data))
- self.mqtt_client.send(self.topic + '/' + key, data)
-
- def auto_off(self, sec):
- self.__auto_off__ = task.delayed(sec, self.__auto_off_func__)
-
- def __auto_off_func__(self):
- if self.get(self.KEY_OUTPUT_0).lower() != 'off':
- self.__set__(self.KEY_OUTPUT_0, "off")
- self.send_device_status(self.KEY_OUTPUT_0)
-
- def __ui_toggle_output_0__(self, *args):
- self.__set__(self.KEY_OUTPUT_0, 'off' if self.get(self.KEY_OUTPUT_0).lower() == 'on' else 'on')
|