264 lines
9.3 KiB
Python
264 lines
9.3 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
"""
|
|
tradfri devices
|
|
===============
|
|
|
|
**Author:**
|
|
|
|
* Dirk Alders <sudo-dirk@mount-mockery.de>
|
|
|
|
**Description:**
|
|
|
|
Emulation of tradfri devices
|
|
|
|
Communication (MQTT)
|
|
|
|
tradfri_light {
|
|
| "state": ["ON" / "OFF" / "TOGGLE"]
|
|
| "linkquality": [0...255] lqi
|
|
| "brightness": [0...254]
|
|
| "color_mode": ["color_temp"]
|
|
| "color_temp": ["coolest", "cool", "neutral", "warm", "warmest", 250...454]
|
|
| "color_temp_startup": ["coolest", "cool", "neutral", "warm", "warmest", "previous", 250...454]
|
|
| "update": []
|
|
| }
|
|
+- get {
|
|
| "state": ""
|
|
| }
|
|
+- set {
|
|
"state": ["ON" / "OFF"]
|
|
"brightness": [0...254]
|
|
"color_temp": [250...454]
|
|
"transition": [0...] seconds
|
|
"brightness_move": [-X...0...X] X/s
|
|
"brightness_step": [-X...0...X]
|
|
"color_temp_move": [-X...0...X] X/s
|
|
"color_temp_step": [-X...0...X]
|
|
}
|
|
"""
|
|
|
|
from devices.base import base
|
|
import json
|
|
import task
|
|
import time
|
|
|
|
|
|
class sw(base):
    """Emulated tradfri device offering a single switchable output.

    The device keeps its output under the "state" key, listens on the
    ``<topic>/set`` and ``<topic>/get`` MQTT topics and publishes its own
    content, JSON encoded, on ``<topic>``.

    Args:
        mqtt_client (mqtt.mqtt_client): A MQTT Client instance
        topic (str): the base topic for this device
    """
    KEY_OUTPUT_0 = "state"
    PROPERTIES = [KEY_OUTPUT_0]

    def __init__(self, mqtt_client, topic):
        """Initialise the output as "off" and hook up MQTT and user commands."""
        super().__init__(mqtt_client, topic)
        self[self.KEY_OUTPUT_0] = "off"

        # Incoming MQTT requests ('/set' is registered before '/get').
        set_topic = self.topic + '/set'
        self.mqtt_client.add_callback(set_topic, self.__rx_set__)
        get_topic = self.topic + '/get'
        self.mqtt_client.add_callback(get_topic, self.__rx_get__)

        # User commands are named after the topic with '/' replaced by '.'.
        prefix = self.topic.replace('/', '.') + '.'
        self.user_cmds = {}
        self.user_cmds[prefix + 'toggle'] = self.__ui_toggle_output_0__

    def set_state(self, value):
        """Switch the output "on" for a truthy value, "off" otherwise, and publish."""
        if value:
            new_state = "on"
        else:
            new_state = "off"
        self.__set__(self.KEY_OUTPUT_0, new_state)
        self.send_device_status()

    def __rx_set__(self, client, userdata, message):
        """Handle a message on ``<topic>/set``.

        Every key of the decoded JSON payload is handed to ``__set__``; the
        status is published once afterwards. If the payload switched the
        output to "on" (compared case-insensitively), ``power_on`` is called
        for the output key.
        """
        request = json.loads(message.payload)
        self.logger.info("Received set data: %s", repr(request))
        for name in request:
            self.__set__(name, request[name])
        self.send_device_status()

        if self.KEY_OUTPUT_0 not in request:
            return
        requested_state = request.get(self.KEY_OUTPUT_0, 'OFF')
        if requested_state.lower() == "on":
            self.power_on(self.KEY_OUTPUT_0)

    def __rx_get__(self, client, userdata, message):
        """Handle a message on ``<topic>/get`` by publishing the current status."""
        self.send_device_status()

    def power_on_action(self):
        """Force the output to "on" and publish the status.

        NOTE(review): presumably invoked by the base class as part of
        ``power_on`` handling - confirm against devices.base.
        """
        self[self.KEY_OUTPUT_0] = "on"
        self.send_device_status()

    def send_device_status(self):
        """Publish this device, serialised with ``json.dumps``, on the base topic."""
        payload = json.dumps(self)
        self.logger.info("Sending status: %s", repr(payload))
        self.mqtt_client.send(self.topic, payload)

    def __ui_toggle_output_0__(self, *args):
        """User command: invert the output state and publish the status."""
        currently_on = self.get(self.KEY_OUTPUT_0).lower() == 'on'
        if currently_on:
            toggled = 'off'
        else:
            toggled = 'on'
        self.__set__(self.KEY_OUTPUT_0, toggled)
        self.send_device_status()
|
|
|
|
|
|
class sw_br(sw):
    """Emulated tradfri device with a switchable output and a brightness value.

    A periodic task (0.1 s period) applies a pending "brightness_move" to
    the "brightness" value.

    Args:
        mqtt_client (mqtt.mqtt_client): A MQTT Client instance
        topic (str): the base topic for this device
    """
    PROPERTIES = sw.PROPERTIES + ["brightness", "brightness_move"]

    def __init__(self, mqtt_client, topic):
        """Start the periodic brightness task and set the initial brightness to 64."""
        super().__init__(mqtt_client, topic)
        # The task is started before "brightness" exists; __task__ does not
        # touch "brightness" while no "brightness_move" is set.
        self.task = task.periodic(0.1, self.__task__)
        self.task.run()

        self["brightness"] = 64

    def __task__(self, rt):
        """Apply one step of a pending brightness move.

        Each call adds a tenth of "brightness_move" to "brightness", limits
        the result to 0...254 and publishes the status if the stored value
        changes.

        Args:
            rt: argument supplied by the periodic task; not used here.
        """
        rate = self.get('brightness_move', 0)
        if rate == 0:
            return

        level = self["brightness"] + (rate / 10)
        # Keep the level within the valid brightness range.
        level = min(max(level, 0), 254)

        if level == self["brightness"]:
            return
        self["brightness"] = level
        self.send_device_status()
|
|
|
|
|
|
class sw_br_ct(sw_br):
    """Emulated tradfri device with output, brightness and color temperature.

    Args:
        mqtt_client (mqtt.mqtt_client): A MQTT Client instance
        topic (str): the base topic for this device
    """
    PROPERTIES = sw_br.PROPERTIES + ["color_temp"]

    def __init__(self, mqtt_client, topic):
        """Initialise the parent device and set the initial "color_temp" to 413."""
        super().__init__(mqtt_client, topic)
        self["color_temp"] = 413
|
|
|
|
|
|
class button(base):
    """Emulation of a tradfri button.

    Communication (MQTT)

        tradfri_button {
                            "action": [
                                        "arrow_left_click",
                                        "arrow_left_hold",
                                        "arrow_left_release",
                                        "arrow_right_click",
                                        "arrow_right_hold",
                                        "arrow_right_release",
                                        "brightness_down_click",
                                        "brightness_down_hold",
                                        "brightness_down_release",
                                        "brightness_up_click",
                                        "brightness_up_hold",
                                        "brightness_up_release",
                                        "toggle"
                                      ]
                            "action_duration": [0...] s
                            "battery": [0...100] %
                            "linkquality": [0...255] lqi
                            "update": []
                        }

    Args:
        mqtt_client (mqtt.mqtt_client): A MQTT Client instance
        topic (str): the base topic for this device
        **kwargs: passed through unchanged to the base class
    """
    ACTION_TOGGLE = "toggle"
    ACTION_BRIGHTNESS_UP = "brightness_up_click"
    ACTION_BRIGHTNESS_DOWN = "brightness_down_click"
    ACTION_RIGHT = "arrow_right_click"
    ACTION_LEFT = "arrow_left_click"
    ACTION_BRIGHTNESS_UP_LONG = "brightness_up_hold"
    ACTION_BRIGHTNESS_UP_RELEASE = "brightness_up_release"
    ACTION_BRIGHTNESS_DOWN_LONG = "brightness_down_hold"
    ACTION_BRIGHTNESS_DOWN_RELEASE = "brightness_down_release"
    ACTION_RIGHT_LONG = "arrow_right_hold"
    ACTION_RIGHT_RELEASE = "arrow_right_release"
    ACTION_LEFT_LONG = "arrow_left_hold"
    ACTION_LEFT_RELEASE = "arrow_left_release"
    # Maps every "hold" action to the "release" action that ends it.
    ACTION_RELEASE = {
        ACTION_BRIGHTNESS_DOWN_LONG: ACTION_BRIGHTNESS_DOWN_RELEASE,
        ACTION_BRIGHTNESS_UP_LONG: ACTION_BRIGHTNESS_UP_RELEASE,
        ACTION_LEFT_LONG: ACTION_LEFT_RELEASE,
        ACTION_RIGHT_LONG: ACTION_RIGHT_RELEASE
    }
    #
    KEY_LINKQUALITY = "linkquality"
    KEY_BATTERY = "battery"
    KEY_ACTION = "action"
    KEY_ACTION_DURATION = "action_duration"

    def __init__(self, mqtt_client, topic, **kwargs):
        """Initialise the button and register its user commands."""
        super().__init__(mqtt_client, topic, **kwargs)
        # True while a long press is in progress (see __ui_button_long_press__).
        self.__device_in_use__ = False
        #
        # User commands are named after the topic with '/' replaced by '.'.
        cmd_base = self.topic.replace('/', '.') + '.'
        self.user_cmds = {
            cmd_base + 'toggle': self.__ui_button_toggle__,
            cmd_base + 'left': self.__ui_button_left__,
            cmd_base + 'left_long': self.__ui_button_left_long__,
            cmd_base + 'right': self.__ui_button_right__,
            cmd_base + 'right_long': self.__ui_button_right_long__,
            cmd_base + 'up': self.__ui_button_up__,
            cmd_base + 'up_long': self.__ui_button_up_long__,
            cmd_base + 'down': self.__ui_button_down__,
            cmd_base + 'down_long': self.__ui_button_down_long__,
        }

    def send_device_status(self, data):
        """Publish `data`, JSON encoded, on the base topic of this device.

        Args:
            data: JSON serialisable object (here: a dict holding the action)
        """
        self.logger.info("Sending status: %s", repr(data))
        self.mqtt_client.send(self.topic, json.dumps(data))

    def __ui_button_toggle__(self, *args):
        """User command: send the "toggle" action."""
        self.send_device_status({self.KEY_ACTION: self.ACTION_TOGGLE})

    def __ui_button_left__(self, *args):
        """User command: send the left click action."""
        self.send_device_status({self.KEY_ACTION: self.ACTION_LEFT})

    def __ui_button_left_long__(self, *args):
        """User command: long press left; args[0] is the hold period in seconds."""
        self.__ui_button_long_press__(self.ACTION_LEFT_LONG, *args)

    def __ui_button_right__(self, *args):
        """User command: send the right click action."""
        self.send_device_status({self.KEY_ACTION: self.ACTION_RIGHT})

    def __ui_button_right_long__(self, *args):
        """User command: long press right; args[0] is the hold period in seconds."""
        self.__ui_button_long_press__(self.ACTION_RIGHT_LONG, *args)

    def __ui_button_up__(self, *args):
        """User command: send the brightness up click action."""
        self.send_device_status({self.KEY_ACTION: self.ACTION_BRIGHTNESS_UP})

    def __ui_button_up_long__(self, *args):
        """User command: long press brightness up; args[0] is the hold period in seconds."""
        self.__ui_button_long_press__(self.ACTION_BRIGHTNESS_UP_LONG, *args)

    def __ui_button_down__(self, *args):
        """User command: send the brightness down click action."""
        self.send_device_status({self.KEY_ACTION: self.ACTION_BRIGHTNESS_DOWN})

    def __ui_button_down_long__(self, *args):
        """User command: long press brightness down; args[0] is the hold period in seconds."""
        self.__ui_button_long_press__(self.ACTION_BRIGHTNESS_DOWN_LONG, *args)

    def __ui_button_long_press__(self, action, *args):
        """Emulate a long press: send the hold action, wait, send the release action.

        The call blocks for the given period. It is ignored while another
        long press is in progress.

        Args:
            action (str): one of the hold actions (a key of ACTION_RELEASE)
            *args: args[0] is the hold period in seconds (anything accepted by float())
        """
        try:
            dt = float(args[0])
        except (IndexError, ValueError, TypeError):
            print("You need to give a numeric argument to define the period.")
        else:
            if not self.__device_in_use__:
                self.__device_in_use__ = True
                try:
                    self.send_device_status({self.KEY_ACTION: action})
                    time.sleep(dt)
                    self.send_device_status({self.KEY_ACTION: self.ACTION_RELEASE[action]})
                finally:
                    # Always free the device again; otherwise a failure while
                    # sending or sleeping would block every later long press.
                    self.__device_in_use__ = False

    def __ui_button_long_release__(self):
        """Mark the device as no longer in use."""
        self.__device_in_use__ = False
|