2025-09-02 20:26:06 +02:00

195 lines
7.0 KiB
Python

import logging
from simulation.devices import tradfri_light # indirect by classes tradfri_..., livarno_...
from simulation.devices import silvercrest_powerplug
from simulation.devices import shelly as shelly_sw1
from simulation.devices import brennenstuhl_heatingvalve
from simulation.devices import my_powerplug
from simulation.devices import videv_heating as videv_hea
from simulation.devices import videv_light # indirect by classes videv_
try:
from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
# TODO: Not yet implemented devices ###################################
def shelly_pro3(mqtt_client, topic):
logger.warning("Device type shelly_pro3 is not yet implemented. Topic %s will not be supported", topic)
return None
def tradfri_button(mqtt_client, topic):
logger.warning("Device type tradfri_button is not yet implemented. Topic %s will not be supported", topic)
return None
def silvercrest_button(mqtt_client, topic):
logger.warning("Device type silvercrest_button is not yet implemented. Topic %s will not be supported", topic)
return None
def silvercrest_motion_sensor(mqtt_client, topic):
logger.warning("Device type silvercrest_motion_sensor is not yet implemented. Topic %s will not be supported", topic)
return None
def audio_status(mqtt_client, topic):
logger.warning("Device type audio_status is not yet implemented. Topic %s will not be supported", topic)
return None
def remote(mqtt_client, topic):
logger.warning("Device type remote is not yet implemented. Topic %s will not be supported", topic)
return None
def my_ambient(mqtt_client, topic):
logger.warning("Device type my_ambient is not yet implemented. Topic %s will not be supported", topic)
return None
def videv_pure_switch(mqtt_client, topic):
logger.warning("Device type videv_pure_switch is not yet implemented. Topic %s will not be supported", topic)
return None
def videv_multistate(mqtt_client, topic):
logger.warning("Device type videv_multistate is not yet implemented. Topic %s will not be supported", topic)
return None
def videv_audio_player(mqtt_client, topic):
logger.warning("Device type videv_audio_player is not yet implemented. Topic %s will not be supported", topic)
return None
def videv_all_off(mqtt_client, topic):
logger.warning("Device type videv_all_off is not yet implemented. Topic %s will not be supported", topic)
return None
# TODO: Not yet implemented devices ###################################
class group(object):
def __init__(self, *args):
super().__init__()
self._members = args
self._iter_counter = 0
#
self.methods = []
self.variables = []
for name in [m for m in args[0].__class__.__dict__.keys()]:
if not name.startswith('_') and callable(getattr(args[0], name)): # add all public callable attributes to the list
self.methods.append(name)
if not name.startswith('_') and not callable(getattr(args[0], name)): # add all public callable attributes to the list
self.variables.append(name)
#
for member in self:
methods = [m for m in member.__class__.__dict__.keys() if not m.startswith(
'_') if not m.startswith('_') and callable(getattr(args[0], m))]
if self.methods != methods:
raise ValueError("All given instances needs to have same methods:", self.methods, methods)
#
variables = [v for v in member.__class__.__dict__.keys() if not v.startswith(
'_') if not v.startswith('_') and not callable(getattr(args[0], v))]
if self.variables != variables:
raise ValueError("All given instances needs to have same variables:", self.variables, variables)
def __iter__(self):
return self
def __next__(self):
if self._iter_counter < len(self):
self._iter_counter += 1
return self._members[self._iter_counter - 1]
self._iter_counter = 0
raise StopIteration
def __getitem__(self, i):
return self._members[i]
def __len__(self):
return len(self._members)
def __getattribute__(self, name):
def group_execution(*args, **kwargs):
rv_list = []
for member in self[:]:
m = getattr(member, name)
rv_list.append(m(*args, **kwargs))
rv = rv_list[0]
for other in rv_list[1:]:
if other != rv:
if name == "get_name":
# ge_name will not be tested, it is only for test documentation
return rv_list[0].replace("_1", "")
else:
return None # Hopefully None was not expected ;-)
return rv # If all return values are identical
try:
rv = super().__getattribute__(name)
except AttributeError:
if callable(getattr(self[0], name)):
return group_execution
else:
return getattr(self[0], name)
else:
return rv
class tradfri_sw(tradfri_light):
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, enable_state=True, enable_brightness=False, enable_color_temp=False, send_on_power_on=True)
class tradfri_sw_br(tradfri_light):
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, enable_state=True, enable_brightness=True, enable_color_temp=False, send_on_power_on=True)
class tradfri_sw_br_ct(tradfri_light):
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, enable_state=True, enable_brightness=True, enable_color_temp=True, send_on_power_on=True)
class livarno_sw_br_ct(tradfri_light):
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, enable_state=True, enable_brightness=True, enable_color_temp=True, send_on_power_on=False)
class hue_sw_br_ct(tradfri_light):
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, enable_state=True, enable_brightness=True, enable_color_temp=True, send_on_power_on=False)
class videv_sw(videv_light):
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, True, False, False, False)
class videv_sw_br(videv_light):
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, True, True, False, False)
class videv_sw_br_ct(videv_light):
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, True, True, True, False)
class videv_sw_tm(videv_light):
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, True, True, True, True)
class videv_sw_mo(videv_light):
# TODO: Motion functions not included yet
def __init__(self, mqtt_client, topic):
super().__init__(mqtt_client, topic, True, True, True, True)