smart_brain/devices/mydevices.py

248 行
7.5 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
from devices.base import base
import logging
class powerplug(base):
    """Device wrapper for a power plug with four outputs.

    Communication (MQTT)

        my_powerplug
            +- output
                +- 1 [True, False]                     <- status
                |  +- set [True, False, "toggle"]      <- command
                +- 2 [True, False]                     <- status
                |  +- set [True, False, "toggle"]      <- command
                +- 3 [True, False]                     <- status
                |  +- set [True, False, "toggle"]      <- command
                +- 4 [True, False]                     <- status
                |  +- set [True, False, "toggle"]      <- command
                +- all
                   +- set [True, False, "toggle"]      <- command

    The python side counts the outputs from 0 (output_0 .. output_3), the
    topic keys count them from 1 ("output/1" .. "output/4").
    """
    KEY_OUTPUT_0 = "output/1"
    KEY_OUTPUT_1 = "output/2"
    KEY_OUTPUT_2 = "output/3"
    KEY_OUTPUT_3 = "output/4"
    KEY_OUTPUT_ALL = "output/all"
    # Keys accepted by set_output(); KEY_OUTPUT_ALL is deliberately not part of it.
    KEY_OUTPUT_LIST = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]
    #
    TX_TYPE = base.TX_VALUE
    #
    RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]

    def __init__(self, mqtt_client, topic):
        """Pass mqtt_client and topic on to the base class unchanged."""
        super().__init__(mqtt_client, topic)

    #
    # RX
    #
    @property
    def output_0(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0)

    @property
    def output_1(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_1)

    @property
    def output_2(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_2)

    @property
    def output_3(self):
        """rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_3)

    #
    # TX
    #
    def set_output(self, key, state):
        """Send state for the output addressed by key.

        key: one of KEY_OUTPUT_LIST; any other key is logged as an error and
             nothing is sent.
        state: [True, False]
        """
        if key not in self.KEY_OUTPUT_LIST:
            # Use the instance logger like every other method of this class and
            # name the rejected key, so the log entry can be traced to a caller.
            self.logger.error("Unknown key to set the output! (%r)", key)
            return
        self.send_command(key, state)

    def set_output_0(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_0, state)

    def set_output_0_mcb(self, device, key, data):
        """Callback: send data to output 0 (device and key are not used).

        Logged at INFO if data differs from the current output_0, else at DEBUG.
        """
        self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data))
        self.set_output_0(data)

    def toggle_output_0_mcb(self, device, key, data):
        """Callback: send the negation of the current output_0 (arguments are not used)."""
        self.logger.info("Toggeling output 0")
        self.set_output_0(not self.output_0)

    def set_output_1(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_1, state)

    def set_output_1_mcb(self, device, key, data):
        """Callback: send data to output 1 (device and key are not used).

        Logged at INFO if data differs from the current output_1, else at DEBUG.
        """
        self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data))
        self.set_output_1(data)

    def toggle_output_1_mcb(self, device, key, data):
        """Callback: send the negation of the current output_1 (arguments are not used)."""
        self.logger.info("Toggeling output 1")
        self.set_output_1(not self.output_1)

    def set_output_2(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_2, state)

    def set_output_2_mcb(self, device, key, data):
        """Callback: send data to output 2 (device and key are not used).

        Logged at INFO if data differs from the current output_2, else at DEBUG.
        """
        self.logger.log(logging.INFO if data != self.output_2 else logging.DEBUG, "Changing output 2 to %s", str(data))
        self.set_output_2(data)

    def toggle_output_2_mcb(self, device, key, data):
        """Callback: send the negation of the current output_2 (arguments are not used)."""
        self.logger.info("Toggeling output 2")
        self.set_output_2(not self.output_2)

    def set_output_3(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_3, state)

    def set_output_3_mcb(self, device, key, data):
        """Callback: send data to output 3 (device and key are not used).

        Logged at INFO if data differs from the current output_3, else at DEBUG.
        """
        self.logger.log(logging.INFO if data != self.output_3 else logging.DEBUG, "Changing output 3 to %s", str(data))
        self.set_output_3(data)

    def toggle_output_3_mcb(self, device, key, data):
        """Callback: send the negation of the current output_3 (arguments are not used)."""
        self.logger.info("Toggeling output 3")
        self.set_output_3(not self.output_3)

    def set_output_all(self, state):
        """state: [True, False, 'toggle']"""
        self.send_command(self.KEY_OUTPUT_ALL, state)

    def set_output_all_mcb(self, device, key, data):
        """Callback: send data to all outputs at once (device and key are not used)."""
        self.logger.info("Changing all outputs to %s", str(data))
        self.set_output_all(data)

    def all_off(self):
        """Send False to all outputs."""
        self.set_output_all(False)
class remote(base):
    """Device wrapper sending remote control commands.

    Communication (MQTT)

        remote (RAS5)       <- command
            +- CD [dc]
            +- LINE1 [dc]
            +- LINE2 [dc]
            +- LINE3 [dc]
            +- MUTE [dc]
            +- POWER [dc]
            +- VOLDOWN [dc]
            +- VOLUP [dc]
            +- PHONO [dc]
            +- DOCK [dc]

        remote (EUR642100)  <- command
            +- OPEN_CLOSE [dc]
            +- VOLDOWN [dc]
            +- VOLUP [dc]
            +- ONE [dc]
            +- TWO [dc]
            +- THREE [dc]
            +- FOUR [dc]
            +- FIVE [dc]
            +- SIX [dc]
            +- SEVEN [dc]
            +- EIGHT [dc]
            +- NINE [dc]
            +- ZERO [dc]
            +- TEN [dc]
            +- TEN_PLUS [dc]
            +- PROGRAM [dc]
            +- CLEAR [dc]
            +- RECALL [dc]
            +- TIME_MODE [dc]
            +- A_B_REPEAT [dc]
            +- REPEAT [dc]
            +- RANDOM [dc]
            +- AUTO_CUE [dc]
            +- TAPE_LENGTH [dc]
            +- SIDE_A_B [dc]
            +- TIME_FADE [dc]
            +- PEAK_SEARCH [dc]
            +- SEARCH_BACK [dc]
            +- SEARCH_FOR [dc]
            +- TRACK_NEXT [dc]
            +- TRACK_PREV [dc]
            +- STOP [dc]
            +- PAUSE [dc]
            +- PLAY [dc]

    Only CD, LINE1, LINE3, MUTE, POWER, VOLDOWN and VOLUP have a key constant
    and a method in this class.
    NOTE(review): [dc] presumably means "don't care" for the payload - confirm.
    """
    # Keys of the commands this class is able to send.
    KEY_CD = "CD"
    KEY_LINE1 = "LINE1"
    KEY_LINE3 = "LINE3"
    KEY_MUTE = "MUTE"
    KEY_POWER = "POWER"
    KEY_VOLDOWN = "VOLDOWN"
    KEY_VOLUP = "VOLUP"
    #
    TX_TOPIC = ''
    TX_TYPE = base.TX_VALUE
    #
    RX_IGNORE_TOPICS = [
        KEY_CD,
        KEY_LINE1,
        KEY_LINE3,
        KEY_MUTE,
        KEY_POWER,
        KEY_VOLUP,
        KEY_VOLDOWN,
    ]

    def _send_keypress(self, command_key):
        """Send command_key with None as payload."""
        self.send_command(command_key, None)

    def set_cd(self, device=None, key=None, data=None):
        """Send the CD command; all arguments are ignored (callback compatible)."""
        self._send_keypress(self.KEY_CD)

    def set_line1(self, device=None, key=None, data=None):
        """Send the LINE1 command; all arguments are ignored (callback compatible)."""
        self._send_keypress(self.KEY_LINE1)

    def set_line3(self, device=None, key=None, data=None):
        """Send the LINE3 command; all arguments are ignored (callback compatible)."""
        self._send_keypress(self.KEY_LINE3)

    def set_mute(self, device=None, key=None, data=None):
        """Send the MUTE command; all arguments are ignored (callback compatible)."""
        self._send_keypress(self.KEY_MUTE)

    def set_power(self, device=None, key=None, data=None):
        """Send the POWER command; all arguments are ignored (callback compatible)."""
        self._send_keypress(self.KEY_POWER)

    def set_volume_up(self, data=False):
        """data: [True, False]"""
        volume_key = self.KEY_VOLUP
        self.send_command(volume_key, data)

    def set_volume_down(self, data=False):
        """data: [True, False]"""
        volume_key = self.KEY_VOLDOWN
        self.send_command(volume_key, data)

    def default_inc(self, device=None, key=None, data=None):
        """Send VOLUP with True; all arguments are ignored."""
        self.set_volume_up(data=True)

    def default_dec(self, device=None, key=None, data=None):
        """Send VOLDOWN with True; all arguments are ignored."""
        self.set_volume_down(data=True)

    def default_stop(self, device=None, key=None, data=None):
        """Send VOLUP with False; all arguments are ignored.

        NOTE(review): VOLDOWN is never sent with False here, also not after
        default_dec - confirm that releasing VOLUP is sufficient for the receiver.
        """
        self.set_volume_up(data=False)
class audio_status(base):
    """Device wrapper for an audio status topic.

    Communication (MQTT)

        audio_status
            +- state [True, False]                  <- status
            +- title [text]                         <- status
    """
    KEY_STATE = "state"
    KEY_TITLE = "title"
    #
    TX_TYPE = base.TX_VALUE
    #
    RX_KEYS = [KEY_STATE, KEY_TITLE]

    def set_state(self, num, data):
        """Send data on the sub key "state/<num>".

        num: appended to KEY_STATE after a "/" (converted with str())
        data: [True, False]
        """
        self.send_command(self.KEY_STATE + "/" + str(num), data)

    def set_state_mcb(self, device, key, data):
        """Callback: send data on the "state" key (device and key are not used).

        The callback signature carries no num, so set_state(num, data) can not
        be used here; calling it with data only raised a TypeError.
        NOTE(review): the plain "state" key is taken from the topic tree in the
        class docstring - confirm that this is the intended target.
        """
        self.logger.info("Changing state to %s", str(data))
        self.send_command(self.KEY_STATE, data)