Python Library State Machine
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

__init__.py 8.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. #
  4. """
  5. state_machine (State Machine)
  6. =============================
  7. **Author:**
  8. * Dirk Alders <sudo-dirk@mount-mockery.de>
  9. **Description:**
  10. This Module helps implementing state machines.
  11. **Submodules:**
  12. * :class:`state_machine.state_machine`
  13. **Unittest:**
  14. See also the :download:`unittest <state_machine/_testresults_/unittest.pdf>` documentation.
  15. **Module Documentation:**
  16. """
  17. __DEPENDENCIES__ = []
  18. import logging
  19. import time
  20. try:
  21. from config import APP_NAME as ROOT_LOGGER_NAME
  22. except ImportError:
  23. ROOT_LOGGER_NAME = 'root'
  24. logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
  25. __INTERPRETER__ = (3, )
  26. """The supported Interpreter-Versions"""
  27. __DESCRIPTION__ = """This Module helps implementing state machines."""
  28. """The Module description"""
  29. class state_machine(object):
  30. """
  31. :param default_state: The default state which is set on initialisation.
  32. :param log_lvl: The log level, this Module logs to (see Loging-Levels of Module :mod:`logging`)
  33. .. note:: Additional keyword parameters well be stored as varibles of the instance (e.g. to give variables or methods for transition condition calculation).
  34. A state machine class can be created by deriving it from this class. The transitions are defined by overriding the variable `TRANSITIONS`.
  35. This Variable is a dictionary, where the key is the start-state and the content is a tuple or list of transitions. Each transition is a tuple or list
  36. including the following information: (condition-method (str), transition-time (number), target_state (str)).
  37. .. note:: The condition-method needs to be implemented as part of the new class.
  38. .. note:: It is usefull to define the states as variables of this class.
  39. **Example:**
  40. .. literalinclude:: state_machine/_examples_/example.py
  41. .. literalinclude:: state_machine/_examples_/example.log
  42. """
  43. TRANSITIONS = {}
  44. LOG_PREFIX = 'StateMachine:'
  45. def __init__(self, default_state, log_lvl, **kwargs):
  46. self.__state__ = None
  47. self.__last_transition_condition__ = None
  48. self.__conditions_start_time__ = {}
  49. self.__state_change_callbacks__ = {}
  50. self.__log_lvl__ = log_lvl
  51. self.__set_state__(default_state, '__init__')
  52. self.__callback_id__ = 0
  53. for key in kwargs:
  54. setattr(self, key, kwargs.get(key))
  55. def register_state_change_callback(self, state, condition, callback, *args, **kwargs):
  56. """
  57. :param state: The target state. The callback will be executed, if the state machine changes to this state. None means all states.
  58. :type state: str
  59. :param condition: The transition condition. The callback will be executed, if this condition is responsible for the state change. None means all conditions.
  60. :type condition: str
  61. :param callback: The callback to be executed.
  62. .. note:: Additional arguments and keyword parameters are supported. These arguments and parameters will be used as arguments and parameters for the callback execution.
  63. This methods allows to register callbacks which will be executed on state changes.
  64. """
  65. if state not in self.__state_change_callbacks__:
  66. self.__state_change_callbacks__[state] = {}
  67. if condition not in self.__state_change_callbacks__[state]:
  68. self.__state_change_callbacks__[state][condition] = []
  69. self.__state_change_callbacks__[state][condition].append((self.__callback_id__, callback, args, kwargs))
  70. self.__callback_id__ += 1
  71. def this_state(self):
  72. """
  73. :return: The current state.
  74. This method returns the current state of the state machine.
  75. """
  76. return self.__state__
  77. def this_state_is(self, state):
  78. """
  79. :param state: The state to be checked
  80. :type state: str
  81. :return: True if the given state is currently active, else False.
  82. :rtype: bool
  83. This methods returns the boolean information if the state machine is currently in the given state.
  84. """
  85. return self.__state__ == state
  86. def this_state_duration(self):
  87. """
  88. :return: The time how long the current state is active.
  89. :rtype: float
  90. This method returns the time how long the current state is active.
  91. """
  92. return time.time() - self.__time_stamp_state_change__
  93. def last_transition_condition(self):
  94. """
  95. :return: The last transition condition.
  96. :rtype: str
  97. This method returns the last transition condition.
  98. """
  99. return self.__last_transition_condition__
  100. def last_transition_condition_was(self, condition):
  101. """
  102. :param condition: The condition to be checked
  103. :type condition: str
  104. :return: True if the given condition was the last transition condition, else False.
  105. :rtype: bool
  106. This methods returns the boolean information if the last transition condition is equivalent to the given condition.
  107. """
  108. return self.__last_transition_condition__ == condition
  109. def previous_state(self):
  110. """
  111. :return: The previous state.
  112. :rtype: str
  113. This method returns the previous state of the state machine.
  114. """
  115. return self.__prev_state__
  116. def previous_state_was(self, state):
  117. """
  118. :param state: The state to be checked
  119. :type state: str
  120. :return: True if the given state was previously active, else False.
  121. :rtype: bool
  122. This methods returns the boolean information if the state machine was previously in the given state.
  123. """
  124. return self.__prev_state__ == state
  125. def previous_state_duration(self):
  126. """
  127. :return: The time how long the previous state was active.
  128. :rtype: float
  129. This method returns the time how long the previous state was active.
  130. """
  131. return self.__prev_state_dt__
  132. def __set_state__(self, target_state, condition):
  133. logger.log(self.__log_lvl__, "%s State change (%s): %s -> %s", self.LOG_PREFIX, repr(condition), repr(self.__state__), repr(target_state))
  134. timestamp = time.time()
  135. self.__prev_state__ = self.__state__
  136. if self.__prev_state__ is None:
  137. self.__prev_state_dt__ = 0.
  138. else:
  139. self.__prev_state_dt__ = timestamp - self.__time_stamp_state_change__
  140. self.__state__ = target_state
  141. self.__last_transition_condition__ = condition
  142. self.__time_stamp_state_change__ = timestamp
  143. self.__conditions_start_time__ = {}
  144. # Callback collect
  145. this_state_change_callbacks = []
  146. this_state_change_callbacks.extend(self.__state_change_callbacks__.get(None, {}).get(None, []))
  147. this_state_change_callbacks.extend(self.__state_change_callbacks__.get(target_state, {}).get(None, []))
  148. this_state_change_callbacks.extend(self.__state_change_callbacks__.get(None, {}).get(condition, []))
  149. this_state_change_callbacks.extend(self.__state_change_callbacks__.get(target_state, {}).get(condition, []))
  150. # Callback sorting
  151. this_state_change_callbacks.sort()
  152. # Callback execution
  153. for cid, callback, args, kwargs in this_state_change_callbacks:
  154. logger.debug('Executing callback %d - %s.%s', cid, callback.__module__, callback.__name__)
  155. callback(*args, **kwargs)
  156. def work(self):
  157. """
  158. This Method needs to be executed cyclicly to enable the state machine.
  159. """
  160. tm = time.time()
  161. transitions = self.TRANSITIONS.get(self.this_state())
  162. if transitions is not None:
  163. active_transitions = []
  164. cnt = 0
  165. for method_name, transition_delay, target_state in transitions:
  166. method = getattr(self, method_name)
  167. if method():
  168. if method_name not in self.__conditions_start_time__:
  169. self.__conditions_start_time__[method_name] = tm
  170. if tm - self.__conditions_start_time__[method_name] >= transition_delay:
  171. active_transitions.append((transition_delay - tm + self.__conditions_start_time__[method_name], cnt, target_state, method_name))
  172. else:
  173. self.__conditions_start_time__[method_name] = tm
  174. cnt += 1
  175. if len(active_transitions) > 0:
  176. active_transitions.sort()
  177. self.__set_state__(active_transitions[0][2], active_transitions[0][3])