#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
state_machine (State Machine)
=============================

**Author:**

* Dirk Alders

**Description:**

    This Module helps implementing state machines.

**Submodules:**

* :class:`state_machine.state_machine`

**Unittest:**

    See also the :download:`unittest ` documentation.

**Module Documentation:**

"""
__DEPENDENCIES__ = []

import logging
import time

try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)

__INTERPRETER__ = (2, 3)
"""The supported Interpreter-Versions"""
__DESCRIPTION__ = """This Module helps implementing state machines."""
"""The Module description"""


class state_machine(object):
    """
    :param default_state: The default state which is set on initialisation.
    :param log_lvl: The log level, this Module logs to (see Logging-Levels of Module :mod:`logging`)

    .. note:: Additional keyword parameters will be stored as variables of the instance (e.g. to give variables or methods for transition condition calculation).

    A state machine class can be created by deriving it from this class. The transitions are defined by overriding the variable `TRANSITIONS`.
    This Variable is a dictionary, where the key is the start-state and the content is a tuple or list of transitions. Each transition is a tuple or list
    including the following information: (condition-method (str), transition-time (number), target_state (str)).

    .. note:: The condition-method needs to be implemented as part of the new class.

    .. note:: It is useful to define the states as variables of this class.

    **Example:**

    .. literalinclude:: ../examples/example.py

    .. literalinclude:: ../examples/example.log
    """
    # Mapping: start-state -> sequence of (condition-method name, delay, target state).
    TRANSITIONS = {}
    # Prefix of every state change log message.
    LOG_PREFIX = 'StateMachine:'

    def __init__(self, default_state, log_lvl, **kwargs):
        """
        Initialise the state machine and enter `default_state`.

        The initial state change is reported with the condition ``'__init__'``.
        """
        self.__state__ = None
        self.__last_transition_condition__ = None
        self.__conditions_start_time__ = {}
        self.__state_change_callbacks__ = {}
        self.__log_lvl__ = log_lvl
        # No callback can be registered yet, so the initial state change only logs and initialises the bookkeeping.
        self.__set_state__(default_state, '__init__')
        self.__callback_id__ = 0
        for key, value in kwargs.items():
            setattr(self, key, value)

    def register_state_change_callback(self, state, condition, callback, *args, **kwargs):
        """
        :param state: The target state. The callback will be executed, if the state machine changes to this state. None means all states.
        :type state: str
        :param condition: The transition condition. The callback will be executed, if this condition is responsible for the state change. None means all conditions.
        :type condition: str
        :param callback: The callback to be executed.

        .. note:: Additional arguments and keyword parameters are supported. These arguments and parameters will be used as arguments and parameters for the callback execution.

        This method allows to register callbacks which will be executed on state changes.
        """
        callbacks = self.__state_change_callbacks__.setdefault(state, {}).setdefault(condition, [])
        # The running id preserves the registration order for the execution on a state change.
        callbacks.append((self.__callback_id__, callback, args, kwargs))
        self.__callback_id__ += 1

    def this_state(self):
        """
        :return: The current state.

        This method returns the current state of the state machine.
        """
        return self.__state__

    def this_state_is(self, state):
        """
        :param state: The state to be checked
        :type state: str
        :return: True if the given state is currently active, else False.
        :rtype: bool

        This method returns the boolean information if the state machine is currently in the given state.
        """
        return self.__state__ == state

    def this_state_duration(self):
        """
        :return: The time how long the current state is active.
        :rtype: float

        This method returns the time how long the current state is active.
        """
        return time.time() - self.__time_stamp_state_change__

    def last_transition_condition(self):
        """
        :return: The last transition condition.
        :rtype: str

        This method returns the last transition condition.
        """
        return self.__last_transition_condition__

    def last_transition_condition_was(self, condition):
        """
        :param condition: The condition to be checked
        :type condition: str
        :return: True if the given condition was the last transition condition, else False.
        :rtype: bool

        This method returns the boolean information if the last transition condition is equivalent to the given condition.
        """
        return self.__last_transition_condition__ == condition

    def previous_state(self):
        """
        :return: The previous state.
        :rtype: str

        This method returns the previous state of the state machine.
        """
        return self.__prev_state__

    def previous_state_was(self, state):
        """
        :param state: The state to be checked
        :type state: str
        :return: True if the given state was previously active, else False.
        :rtype: bool

        This method returns the boolean information if the state machine was previously in the given state.
        """
        return self.__prev_state__ == state

    def previous_state_duration(self):
        """
        :return: The time how long the previous state was active.
        :rtype: float

        This method returns the time how long the previous state was active.
        """
        return self.__prev_state_dt__

    def __set_state__(self, target_state, condition):
        """
        :param target_state: The state to be entered.
        :param condition: The condition which is responsible for the state change.

        This method logs and executes the state change, updates the previous-state information, resets all
        condition timers and executes the matching registered callbacks in the order of their registration.
        """
        logger.log(self.__log_lvl__, "%s State change (%s): %s -> %s", self.LOG_PREFIX, repr(condition), repr(self.__state__), repr(target_state))
        timestamp = time.time()
        self.__prev_state__ = self.__state__
        if self.__prev_state__ is None:
            # No state change timestamp exists before the very first state has been set.
            self.__prev_state_dt__ = 0.
        else:
            self.__prev_state_dt__ = timestamp - self.__time_stamp_state_change__
        self.__state__ = target_state
        self.__last_transition_condition__ = condition
        self.__time_stamp_state_change__ = timestamp
        # The condition timers are only valid for the state which has been left.
        self.__conditions_start_time__ = {}
        # Callback collect: The key None is the wildcard for state and condition. The collection is keyed by the unique
        # callback id, so that a registration is executed only once, even if target_state or condition is None and
        # therefore the wildcard entry and the specific entry are identical.
        this_state_change_callbacks = {}
        for state_key in (None, target_state):
            callbacks_for_state = self.__state_change_callbacks__.get(state_key, {})
            for condition_key in (None, condition):
                for entry in callbacks_for_state.get(condition_key, []):
                    this_state_change_callbacks[entry[0]] = entry
        # Callback execution (sorted by id, which is the order of registration)
        for cid in sorted(this_state_change_callbacks):
            callback, args, kwargs = this_state_change_callbacks[cid][1:]
            # Callables like functools.partial objects or callable instances do not have a __name__.
            logger.debug('Executing callback %d - %s.%s', cid, getattr(callback, '__module__', None), getattr(callback, '__name__', repr(callback)))
            callback(*args, **kwargs)

    def work(self):
        """
        This Method needs to be executed cyclically to enable the state machine.

        All transition conditions of the current state are evaluated. A transition gets active, if its condition is
        True and the transition-time is expired. The timer of a condition is restarted with every execution in which
        the condition is False. If more than one transition is active, the transition which is overdue the longest
        will be executed. In case of identical values, the order of the definition in `TRANSITIONS` is decisive.
        """
        tm = time.time()
        transitions = self.TRANSITIONS.get(self.this_state())
        if transitions is None:
            return
        active_transitions = []
        for cnt, (method_name, transition_delay, target_state) in enumerate(transitions):
            if getattr(self, method_name)():
                start_time = self.__conditions_start_time__.setdefault(method_name, tm)
                if tm - start_time >= transition_delay:
                    # The first entry is the negative overdue time, so the smallest tuple is the transition which is overdue the longest.
                    active_transitions.append((transition_delay - tm + start_time, cnt, target_state, method_name))
            else:
                self.__conditions_start_time__[method_name] = tm
        if active_transitions:
            # cnt is unique, so the comparison never reaches target_state or method_name.
            target_state, method_name = min(active_transitions)[2:]
            self.__set_state__(target_state, method_name)