181 lines
6.1 KiB
Python
181 lines
6.1 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
from devices.base import base
|
|
from devices.base import warning
|
|
import logging
|
|
import task
|
|
|
|
|
|
class shelly(base):
    """Shelly relay device with two outputs and two inputs, spoken to via MQTT.

    Communication (MQTT)

        shelly
            +- relay
            |      +- 0 ["on" / "off"]              <- status
            |      |  +- command ["on"/ "off"]      <- command
            |      |  +- energy [numeric]           <- status
            |      +- 1 ["on" / "off"]              <- status
            |         +- command ["on"/ "off"]      <- command
            |         +- energy [numeric]           <- status
            +- input
            |      +- 0 [0 / 1]                     <- status
            |      +- 1 [0 / 1]                     <- status
            +- input_event
            |      +- 0                             <- status
            |      +- 1                             <- status
            +- longpush
            |      +- 0 [0 / 1]                     <- status
            |      +- 1 [0 / 1]                     <- status
            +- temperature [numeric] °C             <- status
            +- temperature_f [numeric] F            <- status
            +- overtemperature [0 / 1]              <- status
            +- id                                   <- status
            +- model                                <- status
            +- mac                                  <- status
            +- ip                                   <- status
            +- new_fw                               <- status
            +- fw_ver                               <- status
    """
    # Data keys (MQTT sub-topics below the device topic)
    KEY_OUTPUT_0 = "relay/0"
    KEY_OUTPUT_1 = "relay/1"
    KEY_INPUT_0 = "input/0"
    KEY_INPUT_1 = "input/1"
    KEY_LONGPUSH_0 = "longpush/0"
    KEY_LONGPUSH_1 = "longpush/1"
    KEY_TEMPERATURE = "temperature"
    KEY_OVERTEMPERATURE = "overtemperature"
    KEY_ID = "id"
    KEY_MODEL = "model"
    KEY_MAC = "mac"
    KEY_IP = "ip"
    KEY_NEW_FIRMWARE = "new_fw"
    KEY_FIRMWARE_VERSION = "fw_ver"
    #
    # TX configuration (evaluated by the base class; only the relay outputs are commandable)
    TX_TOPIC = "command"
    TX_TYPE = base.TX_VALUE
    TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
    #
    # RX configuration (evaluated by the base class)
    RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE,
               KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION]
    RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1']
    RX_IGNORE_KEYS = ['temperature_f']
    RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE]

    def __init__(self, mqtt_client, topic):
        """Set up the device, its delayed tasks and the overtemperature callback.

        mqtt_client: client object handed through to the base class
        topic: device topic handed through to the base class
        """
        super().__init__(mqtt_client, topic)
        #
        # Key of the output that is currently being flashed (None: no flash running)
        self.output_key_delayed = None
        # NOTE(review): 0.3 is presumably a delay in seconds - confirm against task.delayed
        self.delayed_flash_task = task.delayed(0.3, self.flash_task)
        self.delayed_off_task = task.delayed(0.3, self.off_task)
        #
        # NOTE(review): presumably calls __warning__ when overtemperature becomes True - confirm against base.add_callback
        self.add_callback(self.KEY_OVERTEMPERATURE, True, self.__warning__, True)
        #
        # Set by all_off() while a flash is running; consumed by flash_task()
        self.all_off_requested = False

    def flash_task(self, *args):
        """Finish a running flash by inverting the flashed output once more.

        Registered as target of `delayed_flash_task`; positional arguments are ignored.
        If `all_off` was requested during the flash, the switch-off is scheduled afterwards.
        """
        if not self.flash_active:
            return
        self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed))
        self.output_key_delayed = None
        if self.all_off_requested:
            # Consume the request, otherwise every later flash would end with an unrequested all_off
            self.all_off_requested = False
            self.delayed_off_task.run()

    def off_task(self, *args):
        """Target of `delayed_off_task`: execute `all_off`; positional arguments are ignored."""
        self.all_off()

    @property
    def flash_active(self):
        """rv: [True, False] - True while an output key is stored for a pending flash"""
        return self.output_key_delayed is not None

    #
    # WARNING CALL
    #
    def __warning__(self, client, key, data):
        """Create an overtemperature warning, log it and store it under KEY_WARNING.

        client, key, data: callback arguments (unused)
        """
        # Imported here because the module does not import math at file level
        import math

        temperature = self.get(self.KEY_TEMPERATURE)
        if temperature is None:
            # No temperature stored: report NaN so that the "%.1f" format stays valid
            temperature = math.nan
        w = warning(self.topic, warning.TYPE_OVERTEMPERATURE, "Temperature to high (%.1f°C)", temperature)
        self.logger.warning(w)
        self.set(self.KEY_WARNING, w)

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0)

    @property
    def output_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_1)

    @property
    def input_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_INPUT_0)

    @property
    def input_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_INPUT_1)

    @property
    def longpush_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_LONGPUSH_0)

    @property
    def longpush_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_LONGPUSH_1)

    @property
    def temperature(self):
        """rv: numeric value"""
        return self.get(self.KEY_TEMPERATURE)

    #
    # TX
    #
    def set_output_0(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_0, state)

    def set_output_0_mcb(self, device, key, data):
        """Callback: set output 0 to `data` (device and key are unused).

        Logged with INFO if `data` differs from the current state, otherwise with DEBUG.
        """
        self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
        self.set_output_0(data)

    def toggle_output_0_mcb(self, device, key, data):
        """Callback: invert output 0 (all arguments are unused)."""
        self.logger.info("Toggeling output 0")
        self.set_output_0(not self.output_0)

    def set_output_1(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_1, state)

    def set_output_1_mcb(self, device, key, data):
        """Callback: set output 1 to `data` (device and key are unused).

        Logged with INFO if `data` differs from the current state, otherwise with DEBUG.
        """
        self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data))
        self.set_output_1(data)

    def toggle_output_1_mcb(self, device, key, data):
        """Callback: invert output 1 (all arguments are unused)."""
        self.logger.info("Toggeling output 1")
        self.set_output_1(not self.output_1)

    def flash_0_mcb(self, device, key, data):
        """Callback: flash output 0 - toggle it now, `flash_task` inverts it again later."""
        # Store the key first: this marks the flash as active and tells flash_task what to invert
        self.output_key_delayed = self.KEY_OUTPUT_0
        self.toggle_output_0_mcb(device, key, data)
        self.delayed_flash_task.run()

    def flash_1_mcb(self, device, key, data):
        """Callback: flash output 1 - toggle it now, `flash_task` inverts it again later."""
        # Store the key first: this marks the flash as active and tells flash_task what to invert
        self.output_key_delayed = self.KEY_OUTPUT_1
        self.toggle_output_1_mcb(device, key, data)
        self.delayed_flash_task.run()

    def all_off(self):
        """Switch off every output that is currently on.

        While a flash is running, the request is only remembered; `flash_task`
        schedules the switch-off once the flash is finished.
        """
        if self.flash_active:
            self.all_off_requested = True
        else:
            if self.output_0:
                self.set_output_0(False)
            if self.output_1:
                self.set_output_1(False)