#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
- from devices.base import base_output
- from devices.base import base_rpc
- import task
-
-
class shelly(base_output):
    """Two-channel Shelly relay device addressed via MQTT topics.

    Communication (MQTT)

        shelly
            +- relay
            |    +- 0 ["on" / "off"]               <- status
            |    |  +- command ["on"/ "off"]       <- command
            |    |  +- energy [numeric]            <- status
            |    +- 1 ["on" / "off"]               <- status
            |       +- command ["on"/ "off"]       <- command
            |       +- energy [numeric]            <- status
            +- input
            |    +- 0 [0 / 1]                      <- status
            |    +- 1 [0 / 1]                      <- status
            +- input_event
            |    +- 0                              <- status
            |    +- 1                              <- status
            +- longpush
            |    +- 0 [0 / 1]                      <- status
            |    +- 1 [0 / 1]                      <- status
            +- temperature [numeric] °C            <- status
            +- temperature_f [numeric] F           <- status
            +- overtemperature [0 / 1]             <- status
            +- id                                  <- status
            +- model                               <- status
            +- mac                                 <- status
            +- ip                                  <- status
            +- new_fw                              <- status
            +- fw_ver                              <- status

    Besides plain set/toggle access to both relays, the class offers a
    "flash" action: the relay is toggled, and ``flash_task`` toggles it back
    when ``delayed_flash_task`` fires. An all-off request arriving while a
    flash is pending is deferred until the flash has finished.

    NOTE(review): ``get``, ``send_command``, ``all_off`` and ``logger`` are
    not defined in this class; they are presumably provided by
    ``base_output`` -- confirm against devices.base.
    """
    # Sub-topics (relative to the device topic) used as data keys.
    KEY_OUTPUT_0 = "relay/0"
    KEY_OUTPUT_1 = "relay/1"
    KEY_INPUT_0 = "input/0"
    KEY_INPUT_1 = "input/1"
    KEY_LONGPUSH_0 = "longpush/0"
    KEY_LONGPUSH_1 = "longpush/1"
    KEY_TEMPERATURE = "temperature"
    KEY_OVERTEMPERATURE = "overtemperature"
    KEY_ID = "id"
    KEY_MODEL = "model"
    KEY_MAC = "mac"
    KEY_IP = "ip"
    KEY_NEW_FIRMWARE = "new_fw"
    KEY_FIRMWARE_VERSION = "fw_ver"
    #
    # Transmit configuration (interpreted by the base class).
    TX_TOPIC = "command"
    TX_TYPE = base_output.TX_VALUE
    TX_FILTER_DATA_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1]
    #
    # Receive configuration (interpreted by the base class).
    RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OVERTEMPERATURE, KEY_TEMPERATURE,
               KEY_ID, KEY_MODEL, KEY_MAC, KEY_IP, KEY_NEW_FIRMWARE, KEY_FIRMWARE_VERSION]
    RX_IGNORE_TOPICS = [KEY_OUTPUT_0 + '/' + "energy", KEY_OUTPUT_1 + '/' + "energy", 'input_event/0', 'input_event/1']
    RX_IGNORE_KEYS = ['temperature_f']
    RX_FILTER_DATA_KEYS = [KEY_INPUT_0, KEY_INPUT_1, KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OVERTEMPERATURE]

    def __init__(self, mqtt_client, topic):
        """Initialise the device and the helper tasks used for flashing.

        :param mqtt_client: MQTT client instance, handed on to the base class.
        :param topic: Device topic, handed on to the base class.
        """
        super().__init__(mqtt_client, topic)
        #
        # Key of the output which is currently flashing (None: no flash pending).
        self.output_key_delayed = None
        # NOTE(review): task.delayed(0.3, cb) presumably calls cb 0.3 s after
        # .run() -- confirm against the task module.
        self.delayed_flash_task = task.delayed(0.3, self.flash_task)
        self.delayed_off_task = task.delayed(0.3, self.off_task)
        #
        # Set by __all_off__ when an all-off request arrives during a flash.
        self.all_off_requested = False

    def __state_logging__(self, inst, key, data):
        """Log output state changes and input actions at info level.

        Keys other than the relay, input and longpush keys are not logged.

        :param inst: Not used here.
        :param key: Data key the value belongs to.
        :param data: The value reported for ``key``.
        """
        if key in [self.KEY_OUTPUT_0, self.KEY_OUTPUT_1]:
            self.logger.info("State change of '%s' to '%s'", key, repr(data))
        elif key in [self.KEY_INPUT_0, self.KEY_INPUT_1, self.KEY_LONGPUSH_0, self.KEY_LONGPUSH_1]:
            self.logger.info("Input action '%s' with '%s'", key, repr(data))

    def flash_task(self, *args):
        """Finish a pending flash by toggling the flashing output back.

        If an all-off request was deferred while the flash was pending, the
        request is consumed here and the delayed off task is started.

        :param args: Ignored.
        """
        if self.flash_active:
            self.send_command(self.output_key_delayed, not self.get(self.output_key_delayed))
            self.output_key_delayed = None
            if self.all_off_requested:
                # Consume the request; otherwise every later flash would
                # trigger another all-off although none was requested.
                self.all_off_requested = False
                self.delayed_off_task.run()

    def off_task(self, *args):
        """Execute the deferred all-off request.

        :param args: Ignored.
        """
        # NOTE(review): all_off() is presumably implemented in the base class
        # and ends up in __all_off__ -- confirm.
        self.all_off()

    @property
    def flash_active(self):
        """rv: True while a flash is pending (an output key is stored), else False"""
        return self.output_key_delayed is not None

    #
    # RX
    #
    @property
    def output_0(self):
        """Value of relay 0 as returned by get(); rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0)

    @property
    def output_1(self):
        """Value of relay 1 as returned by get(); rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_1)

    @property
    def input_0(self):
        """Value of input 0 as returned by get(); rv: [True, False]"""
        return self.get(self.KEY_INPUT_0)

    @property
    def input_1(self):
        """Value of input 1 as returned by get(); rv: [True, False]"""
        return self.get(self.KEY_INPUT_1)

    @property
    def longpush_0(self):
        """Value of longpush 0 as returned by get(); rv: [True, False]"""
        return self.get(self.KEY_LONGPUSH_0)

    @property
    def longpush_1(self):
        """Value of longpush 1 as returned by get(); rv: [True, False]"""
        return self.get(self.KEY_LONGPUSH_1)

    @property
    def temperature(self):
        """Value of the temperature key as returned by get(); rv: numeric value"""
        return self.get(self.KEY_TEMPERATURE)

    #
    # TX
    #
    def set_output_0(self, state):
        """Send a command for relay 0.

        :param state: [True, False]
        """
        self.send_command(self.KEY_OUTPUT_0, state)

    def set_output_0_mcb(self, device, key, data):
        """Callback variant of set_output_0; ``data`` is used as the new state."""
        self.set_output_0(data)

    def toggle_output_0_mcb(self, device, key, data):
        """Callback which inverts relay 0; all arguments are ignored."""
        self.set_output_0(not self.output_0)

    def set_output_1(self, state):
        """Send a command for relay 1.

        :param state: [True, False]
        """
        self.send_command(self.KEY_OUTPUT_1, state)

    def set_output_1_mcb(self, device, key, data):
        """Callback variant of set_output_1; ``data`` is used as the new state."""
        self.set_output_1(data)

    def toggle_output_1_mcb(self, device, key, data):
        """Callback which inverts relay 1; all arguments are ignored."""
        self.set_output_1(not self.output_1)

    def flash_0_mcb(self, device, key, data):
        """Callback which flashes relay 0: toggle now, toggle back via flash_task."""
        self.output_key_delayed = self.KEY_OUTPUT_0
        self.toggle_output_0_mcb(device, key, data)
        self.delayed_flash_task.run()

    def flash_1_mcb(self, device, key, data):
        """Callback which flashes relay 1: toggle now, toggle back via flash_task."""
        self.output_key_delayed = self.KEY_OUTPUT_1
        self.toggle_output_1_mcb(device, key, data)
        self.delayed_flash_task.run()

    def __all_off__(self):
        """Switch off every relay which is currently on.

        While a flash is pending, nothing is switched; the request is only
        remembered and executed by flash_task once the flash has finished.
        """
        if self.flash_active:
            self.all_off_requested = True
        else:
            if self.output_0:
                self.set_output_0(False)
            if self.output_1:
                self.set_output_1(False)
-
-
class shelly_rpc(base_rpc):
    """Shelly device with up to three switches and inputs, driven via RPC.

    The class defines the data keys for switches, inputs and input push
    events and offers set/toggle helpers for the three switch outputs.

    NOTE(review): ``get``, ``rpc_switch_set`` and ``logger`` are not defined
    in this class; they are presumably provided by ``base_rpc`` -- confirm
    against devices.base.
    """
    # Switch (output) keys.
    KEY_OUTPUT_0 = "switch:0"
    KEY_OUTPUT_1 = "switch:1"
    KEY_OUTPUT_2 = "switch:2"
    # Input state keys.
    KEY_INPUT_0 = "input:0"
    KEY_INPUT_1 = "input:1"
    KEY_INPUT_2 = "input:2"
    # Input push event keys.
    KEY_LONGPUSH_0 = "input:0:long_push"
    KEY_LONGPUSH_1 = "input:1:long_push"
    KEY_LONGPUSH_2 = "input:2:long_push"
    KEY_SINGLEPUSH_0 = "input:0:single_push"
    KEY_SINGLEPUSH_1 = "input:1:single_push"
    KEY_SINGLEPUSH_2 = "input:2:single_push"
    KEY_DOUBLEPUSH_0 = "input:0:double_push"
    KEY_DOUBLEPUSH_1 = "input:1:double_push"
    KEY_DOUBLEPUSH_2 = "input:2:double_push"
    KEY_TRIPLEPUSH_0 = "input:0:triple_push"
    KEY_TRIPLEPUSH_1 = "input:1:triple_push"
    KEY_TRIPLEPUSH_2 = "input:2:triple_push"

    # Receive configuration (interpreted by the base class).
    RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_INPUT_0, KEY_INPUT_1, KEY_INPUT_2,
               KEY_LONGPUSH_0, KEY_LONGPUSH_1, KEY_LONGPUSH_2, KEY_SINGLEPUSH_0, KEY_SINGLEPUSH_1, KEY_SINGLEPUSH_2,
               KEY_DOUBLEPUSH_0, KEY_DOUBLEPUSH_1, KEY_DOUBLEPUSH_2, KEY_TRIPLEPUSH_0, KEY_TRIPLEPUSH_1, KEY_TRIPLEPUSH_2]

    def __state_logging__(self, inst, key, data):
        """Log switch state changes, input states and push events at info level.

        Push events are logged without their data; unknown keys are not logged.

        :param inst: Not used here.
        :param key: Data key the value belongs to.
        :param data: The value reported for ``key``.
        """
        if key in [self.KEY_OUTPUT_0, self.KEY_OUTPUT_1, self.KEY_OUTPUT_2]:
            self.logger.info("State change of '%s' to '%s'", key, repr(data))
        elif key in [self.KEY_INPUT_0, self.KEY_INPUT_1, self.KEY_INPUT_2]:
            self.logger.info("Input action '%s' with '%s'", key, repr(data))
        elif key in [self.KEY_LONGPUSH_0, self.KEY_LONGPUSH_1, self.KEY_LONGPUSH_2,
                     self.KEY_SINGLEPUSH_0, self.KEY_SINGLEPUSH_1, self.KEY_SINGLEPUSH_2,
                     self.KEY_DOUBLEPUSH_0, self.KEY_DOUBLEPUSH_1, self.KEY_DOUBLEPUSH_2,
                     self.KEY_TRIPLEPUSH_0, self.KEY_TRIPLEPUSH_1, self.KEY_TRIPLEPUSH_2]:
            self.logger.info("Input action '%s'", key)

    def set_output_0(self, state):
        """Request ``state`` for switch 0 via rpc_switch_set.

        :param state: [True, False]
        """
        self.rpc_switch_set(self.KEY_OUTPUT_0, state)

    def set_output_0_mcb(self, device, key, data):
        """Callback variant of set_output_0; ``data`` is used as the new state."""
        self.set_output_0(data)

    def toggle_output_0_mcb(self, device, key, data):
        """Callback which inverts switch 0; all arguments are ignored."""
        self.set_output_0(not self.get(self.KEY_OUTPUT_0))

    def set_output_1(self, state):
        """Request ``state`` for switch 1 via rpc_switch_set.

        :param state: [True, False]
        """
        self.rpc_switch_set(self.KEY_OUTPUT_1, state)

    def set_output_1_mcb(self, device, key, data):
        """Callback variant of set_output_1; ``data`` is used as the new state."""
        self.set_output_1(data)

    def toggle_output_1_mcb(self, device, key, data):
        """Callback which inverts switch 1; all arguments are ignored."""
        self.set_output_1(not self.get(self.KEY_OUTPUT_1))

    def set_output_2(self, state):
        """Request ``state`` for switch 2 via rpc_switch_set.

        :param state: [True, False]
        """
        self.rpc_switch_set(self.KEY_OUTPUT_2, state)

    def set_output_2_mcb(self, device, key, data):
        """Callback variant of set_output_2; ``data`` is used as the new state."""
        self.set_output_2(data)

    def toggle_output_2_mcb(self, device, key, data):
        """Callback which inverts switch 2; all arguments are ignored."""
        self.set_output_2(not self.get(self.KEY_OUTPUT_2))
|