#!/usr/bin/env python # -*- coding: utf-8 -*- # from devices.base import base import logging class powerplug(base): """ Communication (MQTT) my_powerplug +- output +- 1 [True, False] <- status | +- set [True, False, "toggle"] <- command +- 2 [True, False] <- status | +- set [True, False, "toggle"] <- command +- 3 [True, False] <- status | +- set [True, False, "toggle"] <- command +- 4 [True, False] <- status | +- set [True, False, "toggle"] <- command +- all +- set [True, False, "toggle"] <- command """ KEY_OUTPUT_0 = "output/1" KEY_OUTPUT_1 = "output/2" KEY_OUTPUT_2 = "output/3" KEY_OUTPUT_3 = "output/4" KEY_OUTPUT_ALL = "output/all" KEY_OUTPUT_LIST = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3] # TX_TYPE = base.TX_VALUE # RX_KEYS = [KEY_OUTPUT_0, KEY_OUTPUT_1, KEY_OUTPUT_2, KEY_OUTPUT_3] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) # # RX # @property def output_0(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_0) @property def output_1(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_1) @property def output_2(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_2) @property def output_3(self): """rv: [True, False]""" return self.get(self.KEY_OUTPUT_3) # # TX # def set_output(self, key, state): if key in self.KEY_OUTPUT_LIST: self.send_command(key, state) else: logging.error("Unknown key to set the output!") def set_output_0(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_0, state) def set_output_0_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_0 else logging.DEBUG, "Changing output 0 to %s", str(data)) self.set_output_0(data) def toggle_output_0_mcb(self, device, key, data): self.logger.info("Toggeling output 0") self.set_output_0(not self.output_0) def set_output_1(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_1, state) def set_output_1_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_1 else logging.DEBUG, "Changing output 1 to %s", str(data)) self.set_output_1(data) def toggle_output_1_mcb(self, device, key, data): self.logger.info("Toggeling output 1") self.set_output_1(not self.output_1) def set_output_2(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_2, state) def set_output_2_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_2 else logging.DEBUG, "Changing output 2 to %s", str(data)) self.set_output_2(data) def toggle_output_2_mcb(self, device, key, data): self.logger.info("Toggeling output 2") self.set_output_2(not self.output_2) def set_output_3(self, state): """state: [True, False]""" self.send_command(self.KEY_OUTPUT_3, state) def set_output_3_mcb(self, device, key, data): self.logger.log(logging.INFO if data != self.output_3 else logging.DEBUG, "Changing output 3 to %s", str(data)) self.set_output_3(data) def toggle_output_3_mcb(self, device, key, data): self.logger.info("Toggeling output 3") self.set_output_3(not self.output_3) def set_output_all(self, state): """state: [True, False, 'toggle']""" self.send_command(self.KEY_OUTPUT_ALL, state) def set_output_all_mcb(self, device, key, data): self.logger.info("Changing all outputs to %s", str(data)) self.set_output_all(data) def all_off(self): self.set_output_all(False) class remote(base): """ Communication (MQTT) remote (RAS5) <- command +- CD [dc] +- LINE1 [dc] +- LINE2 [dc] +- LINE3 [dc] +- MUTE [dc] +- POWER [dc] +- VOLDOWN [dc] +- VOLUP [dc] +- PHONO [dc] +- DOCK [dc] remote (EUR642100) <- command +- OPEN_CLOSE [dc] +- VOLDOWN [dc] +- VOLUP [dc] +- ONE [dc] +- TWO [dc] +- THREE [dc] +- FOUR [dc] +- FIVE [dc] +- SIX [dc] +- SEVEN [dc] +- EIGHT [dc] +- NINE [dc] +- ZERO [dc] +- TEN [dc] +- TEN_PLUS [dc] +- PROGRAM [dc] +- CLEAR [dc] +- RECALL [dc] +- TIME_MODE [dc] +- A_B_REPEAT [dc] +- REPEAT [dc] +- RANDOM [dc] +- AUTO_CUE [dc] +- TAPE_LENGTH [dc] +- SIDE_A_B [dc] +- TIME_FADE [dc] +- PEAK_SEARCH [dc] +- SEARCH_BACK [dc] +- SEARCH_FOR [dc] +- TRACK_NEXT [dc] +- TRACK_PREV [dc] +- STOP [dc] +- PAUSE [dc] +- PLAY [dc] """ KEY_CD = "CD" KEY_LINE1 = "LINE1" KEY_LINE3 = "LINE3" KEY_MUTE = "MUTE" KEY_POWER = "POWER" KEY_VOLDOWN = "VOLDOWN" KEY_VOLUP = "VOLUP" # TX_TOPIC = '' TX_TYPE = base.TX_VALUE # RX_IGNORE_TOPICS = [KEY_CD, KEY_LINE1, KEY_LINE3, KEY_MUTE, KEY_POWER, KEY_VOLUP, KEY_VOLDOWN] def set_cd(self, device=None, key=None, data=None): self.send_command(self.KEY_CD, None) def set_line1(self, device=None, key=None, data=None): self.send_command(self.KEY_LINE1, None) def set_line3(self, device=None, key=None, data=None): self.send_command(self.KEY_LINE3, None) def set_mute(self, device=None, key=None, data=None): self.send_command(self.KEY_MUTE, None) def set_power(self, device=None, key=None, data=None): self.send_command(self.KEY_POWER, None) def set_volume_up(self, data=False): """data: [True, False]""" self.send_command(self.KEY_VOLUP, data) def set_volume_down(self, data=False): """data: [True, False]""" self.send_command(self.KEY_VOLDOWN, data) def default_inc(self, device=None, key=None, data=None): self.set_volume_up(True) def default_dec(self, device=None, key=None, data=None): self.set_volume_down(True) def default_stop(self, device=None, key=None, data=None): self.set_volume_up(False) class audio_status(base): """ Communication (MQTT) audio_status +- state [True, False] <- status +- title [text] <- status """ KEY_STATE = "state" KEY_TITLE = "title" # TX_TYPE = base.TX_VALUE # RX_KEYS = [KEY_STATE, KEY_TITLE] def set_state(self, num, data): """data: [True, False]""" self.send_command(self.KEY_STATE + "/" + str(num), data) def set_state_mcb(self, device, key, data): self.logger.info("Changing state to %s", str(data)) self.set_state(data)