248 satır
7.5 KiB
Python
248 satır
7.5 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
from devices.base import base
|
|
import logging
|
|
|
|
|
|
class powerplug(base):
|
|
""" Communication (MQTT)
|
|
|
|
my_powerplug
|
|
+- output
|
|
+- 1 [True, False] <- status
|
|
| +- set [True, False, "toggle"] <- command
|
|
+- 2 [True, False] <- status
|
|
| +- set [True, False, "toggle"] <- command
|
|
+- 3 [True, False] <- status
|
|
| +- set [True, False, "toggle"] <- command
|
|
+- 4 [True, False] <- status
|
|
| +- set [True, False, "toggle"] <- command
|
|
+- all
|
|
+- set [True, False, "toggle"] <- command
|
|
"""
|
|
KEY_OUTPUT_0 = "output/1"
|
|
KEY_OUTPUT_1 = "output/2"
|
|
KEY_OUTPUT_2 = "output/3"
|
|
KEY_OUTPUT_3 = "output/4"
|
|
KEY_OUTPUT_ALL = "output/all"
|
|
KEY_OUTPUT_LIST = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]
|
|
#
|
|
TX_TYPE = base.TX_VALUE
|
|
#
|
|
RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]
|
|
|
|
def __init__(self, mqtt_client, topic):
|
|
super().__init__(mqtt_client, topic)
|
|
|
|
#
|
|
# RX
|
|
#
|
|
@property
|
|
def output_0(self):
|
|
"""rv: [True, False]"""
|
|
return self.get(self.KEY_OUTPUT_0)
|
|
|
|
@property
|
|
def output_1(self):
|
|
"""rv: [True, False]"""
|
|
return self.get(self.KEY_OUTPUT_1)
|
|
|
|
@property
|
|
def output_2(self):
|
|
"""rv: [True, False]"""
|
|
return self.get(self.KEY_OUTPUT_2)
|
|
|
|
@property
|
|
def output_3(self):
|
|
"""rv: [True, False]"""
|
|
return self.get(self.KEY_OUTPUT_3)
|
|
|
|
#
|
|
# TX
|
|
#
|
|
def set_output(self, key, state):
|
|
if key in self.KEY_OUTPUT_LIST:
|
|
self.send_command(key, state)
|
|
else:
|
|
logging.error("Unknown key to set the output!")
|
|
|
|
def set_output_0(self, state):
|
|
"""state: [True, False]"""
|
|
self.send_command(self.KEY_OUTPUT_0, state)
|
|
|
|
def set_output_0_mcb(self, device, key, data):
|
|
self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
|
|
self.set_output_0(data)
|
|
|
|
def toggle_output_0_mcb(self, device, key, data):
|
|
self.logger.info("Toggeling output 0")
|
|
self.set_output_0(not self.output_0)
|
|
|
|
def set_output_1(self, state):
|
|
"""state: [True, False]"""
|
|
self.send_command(self.KEY_OUTPUT_1, state)
|
|
|
|
def set_output_1_mcb(self, device, key, data):
|
|
self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data))
|
|
self.set_output_1(data)
|
|
|
|
def toggle_output_1_mcb(self, device, key, data):
|
|
self.logger.info("Toggeling output 1")
|
|
self.set_output_1(not self.output_1)
|
|
|
|
def set_output_2(self, state):
|
|
"""state: [True, False]"""
|
|
self.send_command(self.KEY_OUTPUT_2, state)
|
|
|
|
def set_output_2_mcb(self, device, key, data):
|
|
self.logger.log(logging.INFO if data != self.output_2 else logging.DEBUG, "Changing output 2 to %s", str(data))
|
|
self.set_output_2(data)
|
|
|
|
def toggle_output_2_mcb(self, device, key, data):
|
|
self.logger.info("Toggeling output 2")
|
|
self.set_output_2(not self.output_2)
|
|
|
|
def set_output_3(self, state):
|
|
"""state: [True, False]"""
|
|
self.send_command(self.KEY_OUTPUT_3, state)
|
|
|
|
def set_output_3_mcb(self, device, key, data):
|
|
self.logger.log(logging.INFO if data != self.output_3 else logging.DEBUG, "Changing output 3 to %s", str(data))
|
|
self.set_output_3(data)
|
|
|
|
def toggle_output_3_mcb(self, device, key, data):
|
|
self.logger.info("Toggeling output 3")
|
|
self.set_output_3(not self.output_3)
|
|
|
|
def set_output_all(self, state):
|
|
"""state: [True, False, 'toggle']"""
|
|
self.send_command(self.KEY_OUTPUT_ALL, state)
|
|
|
|
def set_output_all_mcb(self, device, key, data):
|
|
self.logger.info("Changing all outputs to %s", str(data))
|
|
self.set_output_all(data)
|
|
|
|
def all_off(self):
|
|
self.set_output_all(False)
|
|
|
|
|
|
class remote(base):
|
|
""" Communication (MQTT)
|
|
|
|
remote (RAS5) <- command
|
|
+- CD [dc]
|
|
+- LINE1 [dc]
|
|
+- LINE2 [dc]
|
|
+- LINE3 [dc]
|
|
+- MUTE [dc]
|
|
+- POWER [dc]
|
|
+- VOLDOWN [dc]
|
|
+- VOLUP [dc]
|
|
+- PHONO [dc]
|
|
+- DOCK [dc]
|
|
|
|
remote (EUR642100) <- command
|
|
+- OPEN_CLOSE [dc]
|
|
+- VOLDOWN [dc]
|
|
+- VOLUP [dc]
|
|
+- ONE [dc]
|
|
+- TWO [dc]
|
|
+- THREE [dc]
|
|
+- FOUR [dc]
|
|
+- FIVE [dc]
|
|
+- SIX [dc]
|
|
+- SEVEN [dc]
|
|
+- EIGHT [dc]
|
|
+- NINE [dc]
|
|
+- ZERO [dc]
|
|
+- TEN [dc]
|
|
+- TEN_PLUS [dc]
|
|
+- PROGRAM [dc]
|
|
+- CLEAR [dc]
|
|
+- RECALL [dc]
|
|
+- TIME_MODE [dc]
|
|
+- A_B_REPEAT [dc]
|
|
+- REPEAT [dc]
|
|
+- RANDOM [dc]
|
|
+- AUTO_CUE [dc]
|
|
+- TAPE_LENGTH [dc]
|
|
+- SIDE_A_B [dc]
|
|
+- TIME_FADE [dc]
|
|
+- PEAK_SEARCH [dc]
|
|
+- SEARCH_BACK [dc]
|
|
+- SEARCH_FOR [dc]
|
|
+- TRACK_NEXT [dc]
|
|
+- TRACK_PREV [dc]
|
|
+- STOP [dc]
|
|
+- PAUSE [dc]
|
|
+- PLAY [dc]
|
|
"""
|
|
KEY_CD = "CD"
|
|
KEY_LINE1 = "LINE1"
|
|
KEY_LINE3 = "LINE3"
|
|
KEY_MUTE = "MUTE"
|
|
KEY_POWER = "POWER"
|
|
KEY_VOLDOWN = "VOLDOWN"
|
|
KEY_VOLUP = "VOLUP"
|
|
#
|
|
TX_TOPIC = ''
|
|
TX_TYPE = base.TX_VALUE
|
|
#
|
|
RX_IGNORE_TOPICS = [KEY_CD, KEY_LINE1, KEY_LINE3, KEY_MUTE, KEY_POWER, KEY_VOLUP, KEY_VOLDOWN]
|
|
|
|
def set_cd(self, device=None, key=None, data=None):
|
|
self.send_command(self.KEY_CD, None)
|
|
|
|
def set_line1(self, device=None, key=None, data=None):
|
|
self.send_command(self.KEY_LINE1, None)
|
|
|
|
def set_line3(self, device=None, key=None, data=None):
|
|
self.send_command(self.KEY_LINE3, None)
|
|
|
|
def set_mute(self, device=None, key=None, data=None):
|
|
self.send_command(self.KEY_MUTE, None)
|
|
|
|
def set_power(self, device=None, key=None, data=None):
|
|
self.send_command(self.KEY_POWER, None)
|
|
|
|
def set_volume_up(self, data=False):
|
|
"""data: [True, False]"""
|
|
self.send_command(self.KEY_VOLUP, data)
|
|
|
|
def set_volume_down(self, data=False):
|
|
"""data: [True, False]"""
|
|
self.send_command(self.KEY_VOLDOWN, data)
|
|
|
|
def default_inc(self, device=None, key=None, data=None):
|
|
self.set_volume_up(True)
|
|
|
|
def default_dec(self, device=None, key=None, data=None):
|
|
self.set_volume_down(True)
|
|
|
|
def default_stop(self, device=None, key=None, data=None):
|
|
self.set_volume_up(False)
|
|
|
|
|
|
class audio_status(base):
|
|
""" Communication (MQTT)
|
|
|
|
audio_status
|
|
+- state [True, False] <- status
|
|
+- title [text] <- status
|
|
"""
|
|
KEY_STATE = "state"
|
|
KEY_TITLE = "title"
|
|
#
|
|
TX_TYPE = base.TX_VALUE
|
|
#
|
|
RX_KEYS = [KEY_STATE, KEY_TITLE]
|
|
|
|
def set_state(self, num, data):
|
|
"""data: [True, False]"""
|
|
self.send_command(self.KEY_STATE + "/" + str(num), data)
|
|
|
|
def set_state_mcb(self, device, key, data):
|
|
self.logger.info("Changing state to %s", str(data))
|
|
self.set_state(data)
|