#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
Communication (MQTT)

    brennenstuhl_heatingvalve {
                |                 "away_mode": ["ON", "OFF"]
                |                 "battery": [0...100] %
                |                 "child_lock": ["LOCK", "UNLOCK"]
                |                 "current_heating_setpoint": [5...30] °C
                |                 "linkquality": [0...255] lqi
                |                 "local_temperature": [numeric] °C
                |                 "preset": ["manual", ...]
                |                 "system_mode": ["heat", ...]
                |                 "valve_detection": ["ON", "OFF"]
                |                 "window_detection": ["ON", "OFF"]
                |             }
                +- set {
                                  "away_mode": ["ON", "OFF", "TOGGLE"]
                                  "child_lock": ["LOCK", "UNLOCK"]
                                  "current_heating_setpoint": [5...30] °C
                                  "preset": ["manual", ...]
                                  "system_mode": ["heat", ...]
                                  "valve_detection": ["ON", "OFF", "TOGGLE"]
                                  "window_detection": ["ON", "OFF", "TOGGLE"]
                              }
"""

import json
import time

from devices.base import base
import task

"""
ANSWER of a device:
{
    "away_mode":"OFF",
    "battery":5,
    "child_lock":"UNLOCK",
    "current_heating_setpoint":21,
    "linkquality":196,
    "local_temperature":21.2,
    "preset":"manual",
    "system_mode":"heat",
    "valve_detection":"ON",
    "window_detection":"ON"
}
"""


class vlv(base):
    """A Brennenstuhl heating valve device using the MQTT layout of the module docstring.

    The instance stores its properties item-style (``self[key]``), listens on
    ``<topic>/set`` for JSON objects with new property values and publishes the
    complete property set as JSON on ``<topic>``.

    Args:
        mqtt_client (mqtt.mqtt_client): A MQTT Client instance
        topic (str): the base topic for this device
    """
    PROPERTIES = [
        "away_mode",
        "battery",
        "child_lock",
        "current_heating_setpoint",
        "linkquality",
        "local_temperature",
        "preset",
        "system_mode",
        "valve_detection",
        "window_detection",
    ]

    def __init__(self, mqtt_client, topic, **kwargs):
        """Set the initial property values, subscribe to ``<topic>/set`` and start the task queue."""
        super().__init__(mqtt_client, topic, **kwargs)
        # Initial values of all properties listed in PROPERTIES.
        self["away_mode"] = "OFF"
        self["battery"] = 87
        self["child_lock"] = "UNLOCK"
        self["current_heating_setpoint"] = 21
        self["linkquality"] = 196
        self["local_temperature"] = 21.2
        self["preset"] = "manual"
        self["system_mode"] = "heat"
        self["valve_detection"] = "ON"
        self["window_detection"] = "ON"
        #
        # Incoming "set" requests are handled by __rx_set__.
        self.mqtt_client.add_callback(self.topic + '/set', self.__rx_set__)
        #
        # Status transmissions are executed from this queue (see send_device_status).
        self.tq = task.threaded_queue()
        self.tq.run()
        #
        # User command names are derived from the topic: "a/b" -> "a.b.setpoint".
        cmd_base = self.topic.replace('/', '.') + '.'
        self.user_cmds = {
            cmd_base + 'setpoint': self.__ui_setpoint__,
        }

    def set_state(self, value):
        """Store ``"on"``/``"off"`` under the key ``"state"`` and schedule a status transmission.

        Args:
            value: Truthy stores ``"on"``, falsy stores ``"off"``.
        """
        # NOTE(review): "state" is not part of PROPERTIES -- confirm this method is
        # still wanted for a heating valve.
        self.__set__("state", "on" if value else "off")
        # The former direct call ``self.send_device_status()`` omitted both required
        # arguments and raised TypeError; enqueue it like the other callers do.
        self.tq.enqueue(1, self.send_device_status, {})

    def __rx_set__(self, client, userdata, message):
        """MQTT callback for ``<topic>/set``: apply the received values and schedule a status message.

        Args:
            client: Unused.
            userdata: Unused.
            message: Object whose ``payload`` attribute holds a JSON object.
        """
        try:
            data = json.loads(message.payload)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError (both are ValueError).
            self.logger.warning("Ignoring set request with invalid JSON payload: %s", repr(message.payload))
            return
        if not isinstance(data, dict):
            self.logger.warning("Ignoring set request which is not a JSON object: %s", repr(data))
            return
        self.logger.info("Received set data: %s", repr(data))
        for key, value in data.items():
            self.__set__(key, value)
        self.tq.enqueue(1, self.send_device_status, data)

    def send_device_status(self, rt=None, data=None):
        """Publish the device status as JSON on the device topic.

        Waits 0.75 s in 10 ms steps before sending. If ``self.tq.qsize()`` reaches 3
        during that time, the call returns without sending (presumably because
        newer status tasks are already pending -- verify against ``task.threaded_queue``).

        Args:
            rt: First argument passed by the task queue when it runs this callback; unused.
            data: Optional mapping whose entries are sent first; every property of
                this device that is missing in it is added. The mapping itself is
                not modified.
        """
        for _ in range(75):
            time.sleep(0.01)
            if self.tq.qsize() >= 3:
                return
        # Work on a private copy so the caller's mapping stays untouched.
        payload = {} if data is None else {key: data[key] for key in data}
        for key in self:
            if key not in payload:
                payload[key] = self[key]
        self.logger.info("Sending status: %s", repr(payload))
        self.mqtt_client.send(self.topic, json.dumps(payload))

    def __ui_setpoint__(self, *args):
        """User command handler: set ``current_heating_setpoint`` from the first argument.

        A decimal comma is accepted (``"21,5"`` is read as ``21.5``). Without a
        numeric first argument a usage hint is printed and nothing is changed.
        """
        try:
            # str() lets non-string arguments end up in the usage hint instead of
            # raising AttributeError on ``.replace``.
            value = float(str(args[0]).replace(',', '.'))
        except (ValueError, IndexError):
            print("You need to give a numeric value as first argument.")
        else:
            self.__set__("current_heating_setpoint", value)
            # An empty dict is completed with all properties by send_device_status.
            self.tq.enqueue(1, self.send_device_status, {})