smart_brain/devices/mydevices.py

253 rader
7.5 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
from devices.base import base
import logging
class powerplug(base):
    """Power plug with four individually switchable outputs.

    Communication (MQTT)

        my_powerplug
            +- output
                +- 1 [True, False]                  <- status
                |  +- set [True, False, "toggle"]   <- command
                +- 2 [True, False]                  <- status
                |  +- set [True, False, "toggle"]   <- command
                +- 3 [True, False]                  <- status
                |  +- set [True, False, "toggle"]   <- command
                +- 4 [True, False]                  <- status
                |  +- set [True, False, "toggle"]   <- command
                +- all
                   +- set [True, False, "toggle"]   <- command

    The attribute and method names count the outputs from 0, while the MQTT
    keys count from 1 (``output_0`` <-> ``"output/1"``).

    The ``*_mcb`` methods share the ``(device, key, data)`` signature so they
    can be handed over as callbacks (presumably to the callback registration
    of ``base`` -- confirm against devices.base).
    """
    # MQTT keys of the single outputs and of the "all outputs" command
    KEY_OUTPUT_0 = "output/1"
    KEY_OUTPUT_1 = "output/2"
    KEY_OUTPUT_2 = "output/3"
    KEY_OUTPUT_3 = "output/4"
    KEY_OUTPUT_ALL = "output/all"
    # Keys accepted by set_output() and reported by __state_logging__()
    KEY_OUTPUT_LIST = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]
    #
    TX_TYPE = base.TX_VALUE
    #
    RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3]

    def __state_logging__(self, inst, key, data):
        """Log the new value `data` if `key` is one of the four output keys."""
        if key in self.KEY_OUTPUT_LIST:
            self.logger.info("State change of '%s' to '%s'", key, repr(data))

    #
    # RX
    #
    @property
    def output_0(self):
        """Value stored for KEY_OUTPUT_0; rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_0)

    @property
    def output_1(self):
        """Value stored for KEY_OUTPUT_1; rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_1)

    @property
    def output_2(self):
        """Value stored for KEY_OUTPUT_2; rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_2)

    @property
    def output_3(self):
        """Value stored for KEY_OUTPUT_3; rv: [True, False]"""
        return self.get(self.KEY_OUTPUT_3)

    #
    # TX
    #
    def set_output(self, key, state):
        """Send `state` to the output addressed by `key`.

        key:   one of KEY_OUTPUT_LIST (KEY_OUTPUT_ALL is not accepted here,
               use set_output_all() instead)
        state: [True, False]

        An unknown key sends nothing and is reported as an error.
        """
        if key in self.KEY_OUTPUT_LIST:
            self.send_command(key, state)
        else:
            # Report via the instance logger (like __state_logging__ does)
            # instead of the root logger, and name the rejected key.
            self.logger.error("Unknown key '%s' to set the output!", key)

    def set_output_0(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_0, state)

    def set_output_0_mcb(self, device, key, data):
        """Callback form of set_output_0(); only `data` is used."""
        self.set_output_0(data)

    def toggle_output_0_mcb(self, device, key, data):
        """Callback sending the negation of the stored output_0 value."""
        self.set_output_0(not self.output_0)

    def set_output_1(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_1, state)

    def set_output_1_mcb(self, device, key, data):
        """Callback form of set_output_1(); only `data` is used."""
        self.set_output_1(data)

    def toggle_output_1_mcb(self, device, key, data):
        """Callback sending the negation of the stored output_1 value."""
        self.set_output_1(not self.output_1)

    def set_output_2(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_2, state)

    def set_output_2_mcb(self, device, key, data):
        """Callback form of set_output_2(); only `data` is used."""
        self.set_output_2(data)

    def toggle_output_2_mcb(self, device, key, data):
        """Callback sending the negation of the stored output_2 value."""
        self.set_output_2(not self.output_2)

    def set_output_3(self, state):
        """state: [True, False]"""
        self.send_command(self.KEY_OUTPUT_3, state)

    def set_output_3_mcb(self, device, key, data):
        """Callback form of set_output_3(); only `data` is used."""
        self.set_output_3(data)

    def toggle_output_3_mcb(self, device, key, data):
        """Callback sending the negation of the stored output_3 value."""
        self.set_output_3(not self.output_3)

    def set_output_all(self, state):
        """state: [True, False, 'toggle']"""
        self.send_command(self.KEY_OUTPUT_ALL, state)

    def set_output_all_mcb(self, device, key, data):
        """Callback form of set_output_all(); only `data` is used."""
        self.set_output_all(data)

    def all_off(self):
        """Send False to the "all outputs" key."""
        self.set_output_all(False)
class remote(base):
    """Transmit-only wrapper for remote control commands of an amplifier.

    Communication (MQTT)

        remote (RAS5)                               <- command
            +- CD [dc]
            +- LINE1 [dc]
            +- LINE2 [dc]
            +- LINE3 [dc]
            +- MUTE [dc]
            +- POWER [dc]
            +- VOLDOWN [dc]
            +- VOLUP [dc]
            +- PHONO [dc]
            +- DOCK [dc]

        remote (EUR642100)                          <- command
            +- OPEN_CLOSE [dc]
            +- VOLDOWN [dc]
            +- VOLUP [dc]
            +- ONE [dc]
            +- TWO [dc]
            +- THREE [dc]
            +- FOUR [dc]
            +- FIVE [dc]
            +- SIX [dc]
            +- SEVEN [dc]
            +- EIGHT [dc]
            +- NINE [dc]
            +- ZERO [dc]
            +- TEN [dc]
            +- TEN_PLUS [dc]
            +- PROGRAM [dc]
            +- CLEAR [dc]
            +- RECALL [dc]
            +- TIME_MODE [dc]
            +- A_B_REPEAT [dc]
            +- REPEAT [dc]
            +- RANDOM [dc]
            +- AUTO_CUE [dc]
            +- TAPE_LENGTH [dc]
            +- SIDE_A_B [dc]
            +- TIME_FADE [dc]
            +- PEAK_SEARCH [dc]
            +- SEARCH_BACK [dc]
            +- SEARCH_FOR [dc]
            +- TRACK_NEXT [dc]
            +- TRACK_PREV [dc]
            +- STOP [dc]
            +- PAUSE [dc]
            +- PLAY [dc]

    Only a subset of the commands listed above has a KEY_* constant and a
    sender method in this class. "[dc]" presumably marks a payload that is
    not evaluated (the source/mute/power senders transmit None) -- TODO
    confirm.

    Most senders accept the optional ``(device, key, data)`` arguments so they
    can be called directly or be used as callbacks; those arguments are not
    evaluated.
    """
    # Command keys implemented by this class
    KEY_CD = "CD"
    KEY_LINE1 = "LINE1"
    KEY_LINE3 = "LINE3"
    KEY_MUTE = "MUTE"
    KEY_POWER = "POWER"
    KEY_VOLDOWN = "VOLDOWN"
    KEY_VOLUP = "VOLUP"
    #
    TX_TOPIC = ""
    TX_TYPE = base.TX_VALUE
    #
    RX_IGNORE_TOPICS = [
        KEY_CD,
        KEY_LINE1,
        KEY_LINE3,
        KEY_MUTE,
        KEY_POWER,
        KEY_VOLUP,
        KEY_VOLDOWN,
    ]

    def __state_logging__(self, inst, key, data):
        """Intentionally log nothing."""
        # This is just a TX device using self.set_*
        return None

    #
    # TX: source selection, mute and power (payload is always None)
    #
    def set_cd(self, device=None, key=None, data=None):
        """Log the source change and send the CD command."""
        self.logger.info("Changing amplifier source to CD")
        self.send_command(self.KEY_CD, None)

    def set_line1(self, device=None, key=None, data=None):
        """Log the source change and send the LINE1 command."""
        self.logger.info("Changing amplifier source to LINE1")
        self.send_command(self.KEY_LINE1, None)

    def set_line3(self, device=None, key=None, data=None):
        """Log the source change and send the LINE3 command."""
        self.logger.info("Changing amplifier source to LINE3")
        self.send_command(self.KEY_LINE3, None)

    def set_mute(self, device=None, key=None, data=None):
        """Log and send the MUTE command."""
        self.logger.info("Muting / Unmuting amplifier")
        self.send_command(self.KEY_MUTE, None)

    def set_power(self, device=None, key=None, data=None):
        """Log and send the POWER command."""
        self.logger.info("Power on/off amplifier")
        self.send_command(self.KEY_POWER, None)

    #
    # TX: volume
    #
    def set_volume_up(self, data=False):
        """Log and send `data` with the VOLUP command.

        data: [True, False]
        """
        # NOTE(review): the same message is logged for data=False as well.
        self.logger.info("Increasing amplifier volume")
        self.send_command(self.KEY_VOLUP, data)

    def set_volume_down(self, data=False):
        """Log and send `data` with the VOLDOWN command.

        data: [True, False]
        """
        # NOTE(review): the same message is logged for data=False as well.
        self.logger.info("Decreasing amplifier volume")
        self.send_command(self.KEY_VOLDOWN, data)

    #
    # Callback style helpers; the arguments are not evaluated
    #
    def default_inc(self, device=None, key=None, data=None):
        """Call set_volume_up(True)."""
        self.set_volume_up(True)

    def default_dec(self, device=None, key=None, data=None):
        """Call set_volume_down(True)."""
        self.set_volume_down(True)

    def default_stop(self, device=None, key=None, data=None):
        """Call set_volume_up(False)."""
        # NOTE(review): only VOLUP receives False here; confirm that this is
        # sufficient after default_dec() (VOLDOWN is never sent False).
        self.set_volume_up(False)
class audio_status(base):
    """Audio status device with a playback state and a title.

    Communication (MQTT)

        audio_status
            +- state [True, False]                  <- status
            +- title [text]                         <- status
    """
    KEY_STATE = "state"
    KEY_TITLE = "title"
    #
    TX_TYPE = base.TX_VALUE
    #
    RX_KEYS = [KEY_STATE, KEY_TITLE]

    def __state_logging__(self, inst, key, data):
        """Log the new value `data` if `key` is the state or the title key."""
        if key in [self.KEY_STATE, self.KEY_TITLE]:
            self.logger.info("State change of '%s' to '%s'", key, repr(data))

    def set_state(self, num, data):
        """Send `data` with the key "state/<num>".

        num:  appended to the state key via str(num)
        data: [True, False]
        """
        self.send_command(self.KEY_STATE + "/" + str(num), data)

    def set_state_mcb(self, device, key, data):
        """Callback sending `data` with the plain "state" key.

        The callback signature carries no `num`, so set_state() cannot be
        used here: the previous call ``self.set_state(data)`` always raised a
        TypeError because of the missing second positional argument.

        NOTE(review): the un-numbered "state" key follows the topic tree in
        the class docstring -- confirm that this is the intended target.
        """
        self.send_command(self.KEY_STATE, data)