123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- """
- state_machine (State Machine)
- =============================
-
- **Author:**
-
- * Dirk Alders <sudo-dirk@mount-mockery.de>
-
- **Description:**
-
- This module helps to implement state machines.
-
- **Submodules:**
-
- * :class:`state_machine.state_machine`
-
- **Unittest:**
-
- See also the :download:`unittest <state_machine/_testresults_/unittest.pdf>` documentation.
-
- **Module Documentation:**
-
- """
- __DEPENDENCIES__ = []  # declared module dependencies (the list is empty)
-
- import logging
- import time
-
-
- logger_name = 'STATE_MACHINE'  # name under which this module's logger is obtained
- logger = logging.getLogger(logger_name)  # module-wide logger; state_machine logs state changes and callback execution to it
-
-
- __INTERPRETER__ = (2, 3)  # interpreter versions declared as supported (see the string below)
- """The supported Interpreter-Versions"""
- __DESCRIPTION__ = """This Module helps implementing state machines."""  # short description text of the module
- """The Module description"""
-
-
class state_machine(object):
    """
    Base class for state machines with time-delayed transitions and state change callbacks.

    :param default_state: The default state which is set on initialisation.
    :param log_lvl: The log level this module uses for state change messages (see the logging levels of module :mod:`logging`).

    .. note:: Additional keyword parameters will be stored as variables of the instance (e.g. to give variables or methods for transition condition calculation).

    A state machine class can be created by deriving it from this class. The transitions are defined by overriding the variable `TRANSITIONS`.
    This variable is a dictionary, where the key is the start-state and the content is a tuple or list of transitions. Each transition is a tuple or list
    including the following information: (condition-method (str), transition-time (number), target_state (str)).

    .. note:: The condition-method needs to be implemented as part of the new class.

    .. note:: It is useful to define the states as variables of this class.


    **Example:**

    .. literalinclude:: ../examples/example.py

    .. literalinclude:: ../examples/example.log
    """
    TRANSITIONS = {}
    LOG_PREFIX = 'StateMachine:'

    def __init__(self, default_state, log_lvl, **kwargs):
        self.__state__ = None
        self.__last_transition_condition__ = None
        self.__conditions_start_time__ = {}
        self.__state_change_callbacks__ = {}
        self.__log_lvl__ = log_lvl
        self.__callback_id__ = 0
        # Entering the default state is logged like every other state change (condition '__init__').
        self.__set_state__(default_state, '__init__')
        for key, value in kwargs.items():
            setattr(self, key, value)

    def register_state_change_callback(self, state, condition, callback, *args, **kwargs):
        """
        :param state: The target state. The callback will be executed, if the state machine changes to this state. None means all states.
        :type state: str
        :param condition: The transition condition. The callback will be executed, if this condition is responsible for the state change. None means all conditions.
        :type condition: str
        :param callback: The callback to be executed.

        .. note:: Additional arguments and keyword parameters are supported. These arguments and parameters will be used as arguments and parameters for the callback execution.

        This method allows to register callbacks which will be executed on state changes.
        Callbacks matching a state change are executed in the order of their registration.
        """
        callbacks_by_condition = self.__state_change_callbacks__.setdefault(state, {})
        # The running id is stored with each entry to restore the registration order on execution.
        callbacks_by_condition.setdefault(condition, []).append((self.__callback_id__, callback, args, kwargs))
        self.__callback_id__ += 1

    def this_state(self):
        """
        :return: The current state.

        This method returns the current state of the state machine.
        """
        return self.__state__

    def this_state_is(self, state):
        """
        :param state: The state to be checked
        :type state: str
        :return: True if the given state is currently active, else False.
        :rtype: bool

        This method returns the boolean information if the state machine is currently in the given state.
        """
        return self.__state__ == state

    def this_state_duration(self):
        """
        :return: The time how long the current state is active.
        :rtype: float

        This method returns the time how long the current state is active.
        """
        return time.time() - self.__time_stamp_state_change__

    def last_transition_condition(self):
        """
        :return: The last transition condition.
        :rtype: str

        This method returns the last transition condition.
        """
        return self.__last_transition_condition__

    def last_transition_condition_was(self, condition):
        """
        :param condition: The condition to be checked
        :type condition: str
        :return: True if the given condition was the last transition condition, else False.
        :rtype: bool

        This method returns the boolean information if the last transition condition is equivalent to the given condition.
        """
        return self.__last_transition_condition__ == condition

    def previous_state(self):
        """
        :return: The previous state.
        :rtype: str

        This method returns the previous state of the state machine.
        """
        return self.__prev_state__

    def previous_state_was(self, state):
        """
        :param state: The state to be checked
        :type state: str
        :return: True if the given state was previously active, else False.
        :rtype: bool

        This method returns the boolean information if the state machine was previously in the given state.
        """
        return self.__prev_state__ == state

    def previous_state_duration(self):
        """
        :return: The time how long the previous state was active.
        :rtype: float

        This method returns the time how long the previous state was active.
        """
        return self.__prev_state_dt__

    def __set_state__(self, target_state, condition):
        """
        :param target_state: The state to be entered.
        :param condition: The transition condition which is responsible for the state change.

        This method performs the state change: It logs the change, updates the state, the previous state
        and the timing information, resets the condition timers and executes the matching callbacks.
        """
        logger.log(self.__log_lvl__, "%s State change (%s): %s -> %s", self.LOG_PREFIX, repr(condition), repr(self.__state__), repr(target_state))
        timestamp = time.time()
        self.__prev_state__ = self.__state__
        if self.__prev_state__ is None:
            # No state change timestamp exists before the first state has been set.
            self.__prev_state_dt__ = 0.
        else:
            self.__prev_state_dt__ = timestamp - self.__time_stamp_state_change__
        self.__state__ = target_state
        self.__last_transition_condition__ = condition
        self.__time_stamp_state_change__ = timestamp
        # The condition timers belong to the state which has just been left.
        self.__conditions_start_time__ = {}
        # Callback collect: Always fill a fresh list, because extending a list taken from the
        # registry would permanently add foreign callbacks to that registration.
        lookup_keys = []
        for key in ((None, None), (target_state, None), (None, condition), (target_state, condition)):
            # Keys collapse if target_state or condition is None; each registration shall run only once.
            if key not in lookup_keys:
                lookup_keys.append(key)
        this_state_change_callbacks = []
        for state_key, condition_key in lookup_keys:
            this_state_change_callbacks.extend(self.__state_change_callbacks__.get(state_key, {}).get(condition_key, []))
        # Callback sorting: by registration id only, so that callables are never compared.
        this_state_change_callbacks.sort(key=lambda entry: entry[0])
        # Callback execution
        for cid, callback, args, kwargs in this_state_change_callbacks:
            # Not every callable has __module__ or __name__ (e.g. functools.partial objects).
            logger.debug('Executing callback %d - %s.%s', cid, getattr(callback, '__module__', None), getattr(callback, '__name__', repr(callback)))
            callback(*args, **kwargs)

    def work(self):
        """
        This method needs to be executed cyclically to enable the state machine.

        It evaluates the condition-methods of all transitions of the current state. A transition becomes active,
        if its condition is fulfilled for at least the transition-time. If several transitions are active, the one
        which is overdue for the longest time is executed; on equal times, the order in `TRANSITIONS` decides.
        """
        tm = time.time()
        transitions = self.TRANSITIONS.get(self.this_state())
        if transitions is None:
            return
        active_transitions = []
        for index, (method_name, transition_delay, target_state) in enumerate(transitions):
            if getattr(self, method_name)():
                # Remember the first time the condition has been seen fulfilled.
                start_time = self.__conditions_start_time__.setdefault(method_name, tm)
                if tm - start_time >= transition_delay:
                    # The first element is the remaining time (<= 0), so the smallest tuple is the most overdue one.
                    active_transitions.append((transition_delay - tm + start_time, index, target_state, method_name))
            else:
                # Restart the timer while the condition is not fulfilled.
                self.__conditions_start_time__[method_name] = tm
        if active_transitions:
            _, _, target_state, method_name = min(active_transitions)
            self.__set_state__(target_state, method_name)
|