state_machine/__init__.py

225 行
8.4 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
state_machine (State Machine)
=============================
**Author:**
* Dirk Alders <sudo-dirk@mount-mockery.de>
**Description:**
This Module helps implementing state machines.
**Submodules:**
* :class:`state_machine.state_machine`
**Unittest:**
See also the :download:`unittest <state_machine/_testresults_/unittest.pdf>` documentation.
**Module Documentation:**
"""
__DEPENDENCIES__ = []

import logging
import time

try:
    # Prefer the application name of a project-level `config` module as the
    # root logger name, so this module logs below the application's logger.
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    # No `config` module available: fall back to the name 'root'.
    ROOT_LOGGER_NAME = 'root'
# Module logger: a child (named after this module) of the selected root logger.
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)

__INTERPRETER__ = (2, 3)
"""The supported Interpreter-Versions"""
__DESCRIPTION__ = """This Module helps implementing state machines."""
"""The Module description"""
class state_machine(object):
    """
    :param default_state: The default state which is set on initialisation.
    :param log_lvl: The log level, this Module logs to (see Logging-Levels of Module :mod:`logging`)

    .. note:: Additional keyword parameters will be stored as variables of the instance (e.g. to give variables or methods for transition condition calculation).

    A state machine class can be created by deriving it from this class. The transitions are defined by overriding the variable `TRANSITIONS`.
    This Variable is a dictionary, where the key is the start-state and the content is a tuple or list of transitions. Each transition is a tuple or list
    including the following information: (condition-method (str), transition-time (number), target_state (str)).

    .. note:: The condition-method needs to be implemented as part of the new class.

    .. note:: It is useful to define the states as variables of this class.

    **Example:**

    .. literalinclude:: ../examples/example.py

    .. literalinclude:: ../examples/example.log
    """
    TRANSITIONS = {}
    LOG_PREFIX = 'StateMachine:'

    def __init__(self, default_state, log_lvl, **kwargs):
        self.__state__ = None
        self.__last_transition_condition__ = None
        self.__conditions_start_time__ = {}
        self.__state_change_callbacks__ = {}
        self.__callback_id__ = 0
        self.__log_lvl__ = log_lvl
        # Entering the default state also initialises the previous-state and
        # timestamp bookkeeping used by the query methods below.
        self.__set_state__(default_state, '__init__')
        # Expose additional keyword parameters as instance attributes, so that
        # condition methods of derived classes can use them.
        for key, value in kwargs.items():
            setattr(self, key, value)

    def register_state_change_callback(self, state, condition, callback, *args, **kwargs):
        """
        :param state: The target state. The callback will be executed, if the state machine changes to this state. None means all states.
        :type state: str
        :param condition: The transition condition. The callback will be executed, if this condition is responsible for the state change. None means all conditions.
        :type condition: str
        :param callback: The callback to be executed.

        .. note:: Additional arguments and keyword parameters are supported. These arguments and parameters will be used as arguments and parameters for the callback execution.

        This method allows to register callbacks which will be executed on state changes.
        """
        callbacks = self.__state_change_callbacks__.setdefault(state, {}).setdefault(condition, [])
        # The running id records the registration order; callbacks are executed in that order.
        callbacks.append((self.__callback_id__, callback, args, kwargs))
        self.__callback_id__ += 1

    def this_state(self):
        """
        :return: The current state.

        This method returns the current state of the state machine.
        """
        return self.__state__

    def this_state_is(self, state):
        """
        :param state: The state to be checked
        :type state: str
        :return: True if the given state is currently active, else False.
        :rtype: bool

        This method returns the boolean information if the state machine is currently in the given state.
        """
        return self.__state__ == state

    def this_state_duration(self):
        """
        :return: The time how long the current state is active.
        :rtype: float

        This method returns the time how long the current state is active.
        """
        return time.time() - self.__time_stamp_state_change__

    def last_transition_condition(self):
        """
        :return: The last transition condition.
        :rtype: str

        This method returns the last transition condition.
        """
        return self.__last_transition_condition__

    def last_transition_condition_was(self, condition):
        """
        :param condition: The condition to be checked
        :type condition: str
        :return: True if the given condition was the last transition condition, else False.
        :rtype: bool

        This method returns the boolean information if the last transition condition is equivalent to the given condition.
        """
        return self.__last_transition_condition__ == condition

    def previous_state(self):
        """
        :return: The previous state.
        :rtype: str

        This method returns the previous state of the state machine.
        """
        return self.__prev_state__

    def previous_state_was(self, state):
        """
        :param state: The state to be checked
        :type state: str
        :return: True if the given state was previously active, else False.
        :rtype: bool

        This method returns the boolean information if the state machine was previously in the given state.
        """
        return self.__prev_state__ == state

    def previous_state_duration(self):
        """
        :return: The time how long the previous state was active.
        :rtype: float

        This method returns the time how long the previous state was active.
        """
        return self.__prev_state_dt__

    def __set_state__(self, target_state, condition):
        """
        :param target_state: The state to be entered.
        :param condition: The name of the condition which caused the state change.

        This method logs and performs the state change, updates the previous-state information, resets the
        condition start times and executes the matching registered callbacks in the order of their registration.
        """
        logger.log(self.__log_lvl__, "%s State change (%s): %s -> %s", self.LOG_PREFIX, repr(condition), repr(self.__state__), repr(target_state))
        timestamp = time.time()
        self.__prev_state__ = self.__state__
        if self.__prev_state__ is None:
            # Initial state change: there was no previous state.
            self.__prev_state_dt__ = 0.
        else:
            self.__prev_state_dt__ = timestamp - self.__time_stamp_state_change__
        self.__state__ = target_state
        self.__last_transition_condition__ = condition
        self.__time_stamp_state_change__ = timestamp
        # The delay measurement of all conditions restarts with the new state.
        self.__conditions_start_time__ = {}
        # Callback collect: (all states | target state) x (all conditions | this condition).
        # Collected by registration id, so an entry is executed only once, even
        # if target_state or condition is None and the lookups therefore overlap.
        pending = {}
        for state_key in (None, target_state):
            for condition_key in (None, condition):
                for entry in self.__state_change_callbacks__.get(state_key, {}).get(condition_key, []):
                    pending[entry[0]] = entry
        # Callback execution in registration order (sorted by id only; the
        # callables themselves are never compared).
        for cid in sorted(pending):
            callback, args, kwargs = pending[cid][1:]
            # Not every callable has a __name__ (e.g. functools.partial objects
            # or callable instances); logging must not break the state change.
            callback_module = getattr(callback, '__module__', None)
            callback_name = getattr(callback, '__name__', repr(callback))
            logger.debug('Executing callback %d - %s.%s', cid, callback_module, callback_name)
            callback(*args, **kwargs)

    def work(self):
        """
        This Method needs to be executed cyclically to enable the state machine.
        """
        tm = time.time()
        transitions = self.TRANSITIONS.get(self.this_state())
        if transitions is None:
            # No transitions defined for the current state.
            return
        active_transitions = []
        for cnt, (method_name, transition_delay, target_state) in enumerate(transitions):
            if getattr(self, method_name)():
                # Remember when the condition was first seen (or last seen False).
                start_time = self.__conditions_start_time__.setdefault(method_name, tm)
                if tm - start_time >= transition_delay:
                    # The first value is the remaining time (<= 0 here): the smaller, the longer overdue.
                    active_transitions.append((transition_delay - tm + start_time, cnt, target_state, method_name))
            else:
                # Condition not fulfilled: restart the delay measurement.
                self.__conditions_start_time__[method_name] = tm
        if active_transitions:
            # Take the most overdue transition; ties are resolved by the definition order in TRANSITIONS.
            winner = min(active_transitions, key=lambda item: item[:2])
            self.__set_state__(winner[2], winner[3])