helpers/__init__.py

555 linhas
18 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
helpers (Helpers)
=================
**Author:**
* Dirk Alders <sudo-dirk@mount-mockery.de>
**Description:**
This module supports functions and classes which don't have an own Module (yet).
**Submodules:**
* :class:`helpers.continues_statistic`
* :class:`helpers.continues_statistic_multivalue`
* :class:`helpers.direct_socket_client`
* :class:`helpers.direct_socket_server`
* :class:`helpers.ringbuffer`
**Unittest:**
See also the :download:`unittest <helpers/_testresults_/unittest.pdf>` documentation.
**Module Documentation:**
"""
import logging
import numbers
import os
import time
import stringtools
import task
__DEPENDENCIES__ = ['stringtools', 'task', ]
try:
from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
__DESCRIPTION__ = """The Module {\\tt %s} is designed to help with some function which don't have an own Module (yet).
For more Information read the documentation.""" % __name__.replace('_', '\_')
"""The Module Description"""
__INTERPRETER__ = (2, 3)
"""The Tested Interpreter-Versions"""
class continues_statistic(dict):
"""
This class stores a statistic for a stream of values. The statistic includes: mean, min, max, quantifier.
.. note:: You can use the mathematic operators "+, +=" to add a value to the statistic.
:param mean: The mean start value, the start value or None.
:type mean: numeric
:param min_val: The min start value or None
:type min_val: numeric
:param max_val: The max start value or None
:type max_val: numeric
:param quantifier: The quantifier start value or 0
:type quantifier: int
.. note:: You are able to initialise this class in the following ways:
* Without arguments to get an empty instance
* With one single value to get an starting instance
* With mean, min_val, max_val, quantifier to initialise an existing statistic
**Example:**
.. literalinclude:: helpers/_examples_/continues_statistic.py
Will result to the following output:
.. literalinclude:: helpers/_examples_/continues_statistic.log
"""
def __init__(self, mean=None, min_val=None, max_val=None, quantifier=0):
dict.__init__(self)
if mean is None:
self.__init_data__(None, None, None, 0)
elif min_val is not None and max_val is not None and quantifier > 0:
self.__init_data__(mean, min_val, max_val, quantifier)
else:
self.__init_data__(mean, mean, mean, 1)
def __init_data__(self, mean, min_val, max_val, quantifier):
self['quantifier'] = quantifier
self['max_val'] = max_val
self['min_val'] = min_val
self['mean'] = mean
def __add__(self, other):
if self.quantifier == 0:
return continues_statistic(**other)
elif other.quantifier == 0:
return continues_statistic(**self)
else:
return continues_statistic(
mean=(self.mean * self.quantifier + other.mean * other.quantifier) / (self.quantifier + other.quantifier),
min_val=min(self.min, other.min),
max_val=max(self.max, other.max),
quantifier=self.quantifier + other.quantifier,
)
@property
def mean(self):
"""
The current mean value.
"""
return self.get('mean')
@property
def min(self):
"""
The current min value.
"""
return self.get('min_val')
@property
def max(self):
"""
The current max value.
"""
return self.get('max_val')
@property
def quantifier(self):
"""
The current quantifier (number of values for continues statistic).
"""
return self.get('quantifier')
def pop(self):
"""
This pops out the current statistic data and returns these as an instance of :class:`helpers.continues_statistic`.
:returns: The current statistic or None, if no value is available.
:rtype: :class:`helpers.continues_statistic` or None
"""
if self.quantifier == 0:
return continues_statistic()
else:
rv = continues_statistic(self.mean, self.min, self.max, self.quantifier)
self.__init_data__(None, None, None, 0)
return rv
def __str__(self):
return "mean=%(mean)s, min=%(min_val)s, max=%(max_val)s, quantifier=%(quantifier)s" % self
class continues_statistic_multivalue(dict):
"""
This class stores multiple statistics of stream values. The statistic of each value is stored as :class:`helpers.continues_statistic`.
.. note:: You can use the mathematic operators "+, +=" to add instances.
.. note:: You are able to initialise this class in the following ways:
* Without arguments to get an empty instance
* With a keword argument(s) which will be passed to :class:`continues_statistic`
* With a dict equivalent of :class:`continues_statistic_multivalue` to initialise an existing statistic
**Example:**
.. literalinclude:: helpers/_examples_/continues_statistic_multivalue.py
Will result to the following output:
.. literalinclude:: helpers/_examples_/continues_statistic_multivalue.log
"""
def __init__(self, *args, **kwargs):
dict.__init__(self)
if len(args) == 0 and len(kwargs) >= 0:
for key in kwargs:
if type(kwargs[key]) is continues_statistic:
self[key] = kwargs[key]
elif type(kwargs[key]) is dict:
self[key] = continues_statistic(**kwargs[key])
else:
self[key] = continues_statistic(kwargs[key])
elif len(args) == 1 and len(kwargs) == 0:
for key in args[0]:
self[key] = continues_statistic(**args[0][key])
else:
raise TypeError("No valid initialisation value(s)!")
def __add__(self, other):
rv_dict = {}
for key in set(list(self.keys()) + list(other.keys())):
rv_dict[key] = self.get(key, continues_statistic()) + other.get(key, continues_statistic())
return continues_statistic_multivalue(**rv_dict)
def pop(self, key=None):
"""
This pops out the current statistic data for a given key and returns these as an instance of :class:`helpers.continues_statistic`.
If no key is given it pops out the current statistic data and returns these as an instance of :class:`helpers.continues_statistic_multiple`.
:param key: The key to get data for
:type key: str or NoneType
:returns: The current statistic or None, if no value is available.
:rtype: :class:`helpers.continues_statistic` or None
"""
if key is None:
if len(self) == 0:
return continues_statistic_multivalue()
else:
rv = continues_statistic_multivalue(**self)
self.clear()
return rv
else:
return self[key].pop()
def __str__(self):
if len(self) == 0:
return '-'
else:
return '\n'.join([key + ': ' + str(self[key]) for key in self])
class direct_socket_base(object):
"""
This is the base class for other classes in this module.
"""
DEFAULT_CHANNEL_NAME = 'all_others'
IS_CLIENT = False
def __init__(self, max_len=None, virtual_rate_bps=None):
self.__max_length__ = max_len
self.__rate_bps__ = virtual_rate_bps
#
self.init_channel_name()
self.__queue__ = task.threaded_queue()
self.__queue__.run()
#
self.__remote_socket__ = None
self.__last_remote_socket__ = None
self.__data_callback__ = None
self.__connect_callback__ = None
self.__disconnect_callback__ = None
#
self.__clean_buffer__()
def __chunks__(self, data):
chunks = []
if self.__max_length__ is None:
chunks.append(data)
else:
for i in range(0, len(data), self.__max_length__):
chunks.append(data[i:i + self.__max_length__])
return chunks
def __clean_buffer__(self):
self.__rx_buffer__ = b''
self.logger.debug('%s Cleaning up receive-buffer', self.__log_prefix__())
def __connect__(self, remote_socket):
if self.__remote_socket__ is None:
self.__remote_socket__ = remote_socket
self.logger.info('%s Connection established...', self.__log_prefix__())
self.__clean_buffer__()
if self.__connect_callback__ is not None:
self.__connect_callback__()
remote_socket.__connect__(self)
def __log_prefix__(self):
return 'comm-client:' if self.IS_CLIENT else 'comm-server:'
def __rx__(self, data):
self.__rx_buffer__ += data
self.logger.info('%s RX <- %s', self.__log_prefix__(), stringtools.hexlify(data))
if self.__data_callback__ is not None:
self.__data_callback__(self)
def __tx__(self, q, data):
self.logger.info('%s TX -> %s', self.__log_prefix__(), stringtools.hexlify(data))
if self.__rate_bps__ is not None:
time.sleep(len(data) / self.__rate_bps__)
self.__remote_socket__.__rx__(data)
def disconnect(self):
"""
Method to disconnect client and server.
"""
if self.__remote_socket__ is not None:
self.__last_remote_socket__ = self.__remote_socket__
self.__remote_socket__ = None
self.logger.info('%s Connection Lost...', self.__log_prefix__())
if self.__disconnect_callback__ is not None:
self.__disconnect_callback__()
self.__last_remote_socket__.disconnect()
def init_channel_name(self, channel_name=None):
"""
With this Method, the channel name for logging can be changed.
:param channel_name: The name for the logging channel
:type channel_name: str
"""
if channel_name is None:
self.logger = logger.getChild(self.DEFAULT_CHANNEL_NAME)
else:
self.logger = logger.getChild(channel_name)
def is_connected(self):
"""
With this Method the connection status can be identified.
:return: True, if a connection is established, otherwise False.
:rtype: bool
"""
return self.__remote_socket__ is not None
def receive(self, timeout=1.0, num=None):
"""
This method returns received data.
:param timeout: The timeout for receiving data (at least after the timeout the method returns data or None).
:type timeout: float
:param num: the number of bytes to receive (use None to get all available data).
:type num: int or None
:return: The received data.
:rtype: bytes
"""
i = 0
while len(self.__rx_buffer__) < (num or 1) and i < timeout * 10:
i += 1
time.sleep(.1)
if len(self.__rx_buffer__) < (num or 1):
return self.__rx_buffer__[0:0]
else:
if num is None:
rv = self.__rx_buffer__
self.__rx_buffer__ = rv[0:0]
else:
rv = self.__rx_buffer__[:num]
self.__rx_buffer__ = self.__rx_buffer__[num:]
return rv
def register_callback(self, callback):
"""
This method stores the callback which is executed, if data is available. You need to execute :func:`receive` of this instance
given as first argument.
:param callback: The callback which will be executed, when data is available.
:type callback:
"""
self.__data_callback__ = callback
def register_connect_callback(self, callback):
"""
This method stores the callback which is executed, if a connection is established.
:param callback: The callback which will be executed, when a connection is established.
:type callback:
"""
self.__connect_callback__ = callback
def register_disconnect_callback(self, callback):
"""
This method stores the callback which is executed, after the connection is lost.
:param callback: The callback which will be executed, after the connection is lost.
:type callback:
"""
self.__disconnect_callback__ = callback
def send(self, data, timeout=1.0):
"""
This method sends data via the initiated communication channel.
:param data: The data to be send over the communication channel.
:type data: bytes
:param timeout: The timeout for sending data (e.g. time to establish new connection).
:type timeout: float
:return: True if data had been sent, otherwise False.
:rtype: bool
"""
i = 0
while not self.is_connected() and i < timeout * 10:
i += 1
time.sleep(.1)
if not self.is_connected():
return False
else:
for tx_data in self.__chunks__(data):
self.__queue__.enqueue(1, self.__tx__, tx_data)
return True
class direct_socket_stp_base(direct_socket_base):
IS_CLIENT = False
def __init__(self, *args, **kwargs):
direct_socket_base.__init__(self, *args, **kwargs)
self.__stp_rx__ = stringtools.stp.stp()
def __chunks__(self, data):
return direct_socket_base.__chunks__(self, stringtools.stp.build_frame(data))
def __clean_buffer__(self):
self.__rx_buffer__ = []
self.logger.debug('%s Cleaning up receive-buffer', self.__log_prefix__())
def __rx__(self, data):
self.logger.debug('%s RX <- %s', self.__log_prefix__(), stringtools.hexlify(data))
msg = self.__stp_rx__.process(data)
if len(msg) > 0:
self.__rx_buffer__.extend(msg)
if len(self.__rx_buffer__) > 0:
if self.__data_callback__ is not None:
self.__data_callback__(self)
def receive(self, timeout=1):
"""
This method returns one received messages via the initiated communication channel.
:param timeout: The timeout for receiving data (at least after the timeout the method returns data or None).
:type timeout: float
:return: The received data.
:rtype: bytes
"""
try:
return direct_socket_base.receive(self, timeout=timeout, num=1)[0]
except TypeError:
return None
class direct_socket_client(direct_socket_base):
"""
Class to create a direct client socket. See also parent :class:`helpers.direct_socket_base`.
**Example:**
.. literalinclude:: helpers/_examples_/direct_socket.py
Will result to the following output:
.. literalinclude:: helpers/_examples_/direct_socket.log
"""
IS_CLIENT = True
def connect(self, remote_socket):
"""
Method to create a connection between this client and a :class:`helpers.direct_socket_server` instance.
:param remote_socket: The remote socket to connect to.
:type remote_socket: :class:`helpers.direct_socket_server`
"""
self.__connect__(remote_socket)
def reconnect(self):
"""
Method to do a reconnect.
.. note:: The :const:`remote_socket` of the prefious :func:`connect` call will be used.
"""
if self.__last_remote_socket__ is not None and self.__remote_socket__ is None:
self.connect(self.__last_remote_socket__)
return True
return False
class direct_socket_server(direct_socket_base):
"""
Class to create a direct server socket. See also parent :class:`helpers.direct_socket_base`.
**Example:**
.. literalinclude:: helpers/_examples_/direct_socket.py
Will result to the following output:
.. literalinclude:: helpers/_examples_/direct_socket.log
"""
IS_CLIENT = False
def __init__(self, *args, **kwargs):
direct_socket_base.__init__(self, *args, **kwargs)
self.logger.info('%s Waiting for incomming connection', self.__log_prefix__())
class direct_socket_stp_client(direct_socket_stp_base):
IS_CLIENT = True
def connect(self, remote_socket):
"""
Method to create a connection between this client and a :class:`helpers.direct_socket_server` instance.
:param remote_socket: The remote socket to connect to.
:type remote_socket: :class:`helpers.direct_socket_server`
"""
self.__connect__(remote_socket)
def reconnect(self):
"""
Method to do a reconnect.
.. note:: The :const:`remote_socket` of the prefious :func:`connect` call will be used.
"""
if self.__last_remote_socket__ is not None and self.__remote_socket__ is None:
self.connect(self.__last_remote_socket__)
return True
return False
class direct_socket_stp_server(direct_socket_stp_base):
IS_CLIENT = False
def __init__(self, *args, **kwargs):
direct_socket_stp_base.__init__(self, *args, **kwargs)
self.logger.info('%s Waiting for incomming connection', self.__log_prefix__())
class ringbuffer(list):
"""
Class for a list with a limited number of elements.
.. note:: On adding Objects, the list will be reduced to the maximum length by deleting entries starting from index 0.
**Example:**
.. literalinclude:: helpers/_examples_/ringbuffer.py
Will result to the following output:
.. literalinclude:: helpers/_examples_/ringbuffer.log
"""
def __init__(self, *args, **kwargs):
self.__max_length__ = kwargs.pop('length')
list.__init__(self, *args, **kwargs)
self.__reduce_list__()
def __reduce_list__(self):
while len(self) > self.__max_length__:
self.pop(0)
def append(self, obj):
rv = list.append(self, obj)
self.__reduce_list__()
return rv
def extend(self, iterable):
rv = list.extend(self, iterable)
self.__reduce_list__()
return rv