#!/usr/bin/env python # -*- coding: utf-8 -*- # """ tradfri devices =============== **Author:** * Dirk Alders **Description:** Emulation of tradfri devices Communication (MQTT) tradfri_light { | "state": ["ON" / "OFF" / "TOGGLE"] | "linkquality": [0...255] lqi | "brightness": [0...254] | "color_mode": ["color_temp"] | "color_temp": ["coolest", "cool", "neutral", "warm", "warmest", 250...454] | "color_temp_startup": ["coolest", "cool", "neutral", "warm", "warmest", "previous", 250...454] | "update": [] | } +- get { | "state": "" | } +- set { "state": ["ON" / "OFF"] "brightness": [0...256] "color_temp": [250...454] "transition": [0...] seconds "brightness_move": [-X...0...X] X/s "brightness_step": [-X...0...X] "color_temp_move": [-X...0...X] X/s "color_temp_step": [-X...0...X] } """ from devices.base import base import json import task import time class sw(base): """A tradfri device with switching functionality Args: mqtt_client (mqtt.mqtt_client): A MQTT Client instance topic (str): the base topic for this device """ KEY_OUTPUT_0 = "state" PROPERTIES = [ KEY_OUTPUT_0, ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) self[self.KEY_OUTPUT_0] = "off" # self.mqtt_client.add_callback(self.topic + '/set', self.__rx_set__) self.mqtt_client.add_callback(self.topic + '/get', self.__rx_get__) # cmd_base = self.topic.replace('/', '.') + '.' self.user_cmds = { cmd_base + 'toggle': self.__ui_toggle_output_0__, } def set_state(self, value): self.__set__(self.KEY_OUTPUT_0, "on" if value else "off") self.send_device_status() def __rx_set__(self, client, userdata, message): data = json.loads(message.payload) self.logger.info("Received set data: %s", repr(data)) for key in data: self.__set__(key, data[key]) self.send_device_status() if self.KEY_OUTPUT_0 in data and data.get(self.KEY_OUTPUT_0, 'OFF').lower() == "on": self.power_on(self.KEY_OUTPUT_0) def __rx_get__(self, client, userdata, message): self.send_device_status() def power_on_action(self): self[self.KEY_OUTPUT_0] = "on" self.send_device_status() def send_device_status(self): data = json.dumps(self) self.logger.info("Sending status: %s", repr(data)) self.mqtt_client.send(self.topic, data) def __ui_toggle_output_0__(self, *args): self.__set__(self.KEY_OUTPUT_0, 'off' if self.get(self.KEY_OUTPUT_0).lower() == 'on' else 'on') self.send_device_status() class sw_br(sw): """A tradfri device with switching and brightness functionality Args: mqtt_client (mqtt.mqtt_client): A MQTT Client instance topic (str): the base topic for this device """ PROPERTIES = sw.PROPERTIES + [ "brightness", "brightness_move", ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) self.task = task.periodic(0.1, self.__task__) self.task.run() # self["brightness"] = 64 def __task__(self, rt): db = self.get('brightness_move', 0) if db != 0: b = self["brightness"] b += (db / 10) if b < 0: b = 0 elif b > 254: b = 254 if b != self["brightness"]: self["brightness"] = b self.send_device_status() class sw_br_ct(sw_br): """A tradfri device with switching, brightness and colortemp functionality Args: mqtt_client (mqtt.mqtt_client): A MQTT Client instance topic (str): the base topic for this device """ PROPERTIES = sw_br.PROPERTIES + [ "color_temp", ] def __init__(self, mqtt_client, topic): super().__init__(mqtt_client, topic) self["color_temp"] = 413 class button(base): """ Communication (MQTT) tradfri_button { "action": [ "arrow_left_click", "arrow_left_hold", "arrow_left_release", "arrow_right_click", "arrow_right_hold", "arrow_right_release", "brightness_down_click", "brightness_down_hold", "brightness_down_release", "brightness_up_click", "brightness_up_hold", "brightness_up_release", "toggle" ] "action_duration": [0...] s "battery": [0...100] % "linkquality": [0...255] lqi "update": [] } """ ACTION_TOGGLE = "toggle" ACTION_BRIGHTNESS_UP = "brightness_up_click" ACTION_BRIGHTNESS_DOWN = "brightness_down_click" ACTION_RIGHT = "arrow_right_click" ACTION_LEFT = "arrow_left_click" ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold" ACTION_BRIGHTNESS_UP_RELEASE = "brightness_up_release" ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold" ACTION_BRIGHTNESS_DOWN_RELEASE = "brightness_down_release" ACTION_RIGHT_LONG = "arrow_right_hold" ACTION_RIGHT_RELEASE = "arrow_right_release" ACTION_LEFT_LONG = "arrow_left_hold" ACTION_LEFT_RELEASE = "arrow_left_release" ACTION_RELEASE = { ACTION_BRIGHTNESS_DOWN_LONG: ACTION_BRIGHTNESS_DOWN_RELEASE, ACTION_BRIGHTNESS_UP_LONG: ACTION_BRIGHTNESS_UP_RELEASE, ACTION_LEFT_LONG: ACTION_LEFT_RELEASE, ACTION_RIGHT_LONG: ACTION_RIGHT_RELEASE } # KEY_LINKQUALITY = "linkquality" KEY_BATTERY = "battery" KEY_ACTION = "action" KEY_ACTION_DURATION = "action_duration" def __init__(self, mqtt_client, topic, **kwargs): super().__init__(mqtt_client, topic, **kwargs) self.__device_in_use__ = False # cmd_base = self.topic.replace('/', '.') + '.' self.user_cmds = { cmd_base + 'toggle': self.__ui_button_toggle__, cmd_base + 'left': self.__ui_button_left__, cmd_base + 'left_long': self.__ui_button_left_long__, cmd_base + 'right': self.__ui_button_right__, cmd_base + 'right_long': self.__ui_button_right_long__, cmd_base + 'up': self.__ui_button_up__, cmd_base + 'up_long': self.__ui_button_up_long__, cmd_base + 'down': self.__ui_button_down__, cmd_base + 'down_long': self.__ui_button_down_long__, } def send_device_status(self, data): self.logger.info("Sending status: %s", repr(data)) self.mqtt_client.send(self.topic, json.dumps(data)) def __ui_button_toggle__(self, *args): self.send_device_status({self.KEY_ACTION: self.ACTION_TOGGLE}) def __ui_button_left__(self, *args): self.send_device_status({self.KEY_ACTION: self.ACTION_LEFT}) def __ui_button_left_long__(self, *args): self.__ui_button_long_press__(self.ACTION_LEFT_LONG, *args) def __ui_button_right__(self, *args): self.send_device_status({self.KEY_ACTION: self.ACTION_RIGHT}) def __ui_button_right_long__(self, *args): self.__ui_button_long_press__(self.ACTION_RIGHT_LONG, *args) def __ui_button_up__(self, *args): self.send_device_status({self.KEY_ACTION: self.ACTION_BRIGHTNESS_UP}) def __ui_button_up_long__(self, *args): self.__ui_button_long_press__(self.ACTION_BRIGHTNESS_UP_LONG, *args) def __ui_button_down__(self, *args): self.send_device_status({self.KEY_ACTION: self.ACTION_BRIGHTNESS_DOWN}) def __ui_button_down_long__(self, *args): self.__ui_button_long_press__(self.ACTION_BRIGHTNESS_DOWN_LONG, *args) def __ui_button_long_press__(self, action, *args): try: dt = float(args[0]) except (IndexError, ValueError): print("You need to give a numeric argument to define the period.") else: if not self.__device_in_use__: self.__device_in_use__ = True self.send_device_status({self.KEY_ACTION: action}) time.sleep(dt) self.send_device_status({self.KEY_ACTION: self.ACTION_RELEASE[action]}) self.__device_in_use__ = False def __ui_button_long_release__(self): self.__device_in_use__ = False