tcp_socket/__init__.py

468 line
17 KiB
Python

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
tcp_socket (TCP Socket)
=======================
**Author:**
* Dirk Alders <sudo-dirk@mount-mockery.de>
**Description:**
This module supports client/server TCP socket connections.
**Submodules:**
* :class:`tcp_socket.tcp_client`
* :class:`tcp_socket.tcp_client_stp`
* :class:`tcp_socket.tcp_server`
* :class:`tcp_socket.tcp_server_stp`
**Unittest:**
See also the :download:`unittest <../../tcp_socket/_testresults_/unittest.pdf>` documentation.
"""
__DEPENDENCIES__ = ['stringtools', 'task', ]

import errno
import logging
import socket
import time

import stringtools
import task
# Use the application's name as the root logger name when a ``config`` module
# providing APP_NAME is importable; otherwise fall back to 'root'.
try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    ROOT_LOGGER_NAME = 'root'
# Module-level logger: a child (named after this module) of the selected root logger.
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
class tcp_base(object):
    """
    :param host: The host IP for the TCP socket functionality
    :type host: str
    :param port: The port for the TCP socket functionality
    :type port: int
    :param rx_log_lvl: The log level to log incoming RX-data
    :type rx_log_lvl: int

    This is the base class for other classes in this module. It owns the receive buffer, the callbacks and a
    task queue which repeatedly executes the receive task. Subclasses have to provide :func:`__connect__`.

    .. note:: This class is not designed for direct usage.
    """
    DEFAULT_CHANNEL_NAME = 'all_others'
    RX_LENGTH = 0xff
    COM_TIMEOUT = 0.5
    IS_CLIENT = False

    def __init__(self, host, port, rx_log_lvl=logging.INFO):
        self.host = host
        self.port = port
        # The logger is needed by __clean_receive_buffer__, so it has to be created first.
        self.init_channel_name()
        self.__socket__ = None
        self.__data_available_callback__ = None
        self.__supress_data_available_callback__ = False
        self.__connect_callback__ = None
        self.__disconnect_callback__ = None
        self.__clean_receive_buffer__()
        self.__connection__ = None
        self.__listening_message_displayed__ = False
        self.__client_address__ = None
        # The receive task re-enqueues itself, so one initial enqueue keeps it going.
        self.__queue__ = task.threaded_queue()
        self.__queue__.enqueue(5, self.__receive_task__, rx_log_lvl)
        self.__queue__.run()

    def init_channel_name(self, channel_name=None):
        """
        :param channel_name: The name appended to the logger name (use None for :const:`DEFAULT_CHANNEL_NAME`).
        :type channel_name: str

        This method (re-)creates the logger of this instance for the given channel name.
        """
        if channel_name is None:
            channel_name = self.DEFAULT_CHANNEL_NAME
        self.logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__ + '.' + channel_name)

    def is_connected(self):
        """
        :return: True if a connection is currently established, otherwise False.
        :rtype: bool
        """
        return self.__connection__ is not None

    def client_address(self):
        """
        :return: The client address or None, if no client address is known.
        :rtype: str

        This method returns the address of the connected client.
        """
        address = self.__client_address__
        if address is None:
            # No client known (not connected, or a client instance which never stores an address).
            return None
        return address[0]

    def receive(self, timeout=1, num=None):
        """
        :param timeout: The timeout for receiving data (at least after the timeout the method returns data or None).
        :type timeout: float
        :param num: the number of bytes to receive (use None to get all available data).
        :type num: int
        :return: The received data or None (not connected, connection lost while waiting, or timeout).
        :rtype: bytes

        This method returns data received via the initiated communication channel.
        """
        if self.__connection__ is None:
            return None
        tm = time.time()
        # Wait until the requested amount (or at least one element for num=None) is buffered.
        while (num is not None and len(self.__receive_buffer__) < num) or (num is None and len(self.__receive_buffer__) < 1):
            if self.__connection__ is None:
                return None
            if time.time() > tm + timeout:
                self.logger.warning('TIMEOUT (%ss): Not enough data in buffer. Requested %s and buffer size is %d.', repr(timeout), repr(num or 'all'), len(self.__receive_buffer__))
                return None
            time.sleep(0.05)
        # NOTE(review): the buffer is also written by the receive task; if that task runs in another
        # thread, this read-modify-write is not synchronised -- confirm whether a lock is needed.
        if num is None:
            rv = self.__receive_buffer__
            self.__clean_receive_buffer__()
        else:
            rv = self.__receive_buffer__[:num]
            self.__receive_buffer__ = self.__receive_buffer__[num:]
        return rv

    def send(self, data, timeout=1, log_lvl=logging.INFO):
        """
        :param data: The data to be send over the communication channel.
        :type data: bytes
        :param timeout: The timeout for sending data (e.g. time to establish new connection).
        :type timeout: float
        :param log_lvl: The log level to log outgoing TX-data
        :type log_lvl: int
        :return: True if data had been sent, otherwise False.
        :rtype: bool

        This method sends data via the initiated communication channel.
        """
        tm = time.time()
        sent = 0
        active_connection = None
        while time.time() - tm < timeout:
            # Work on a local reference: the attribute may be reset while we are sending.
            connection = self.__connection__
            if connection is None:
                time.sleep(.1)  # give some time to establish the connection
                continue
            if connection is not active_connection:
                # New (or first) connection: the message has to start from the beginning.
                active_connection = connection
                sent = 0
            try:
                # send() reports how much was accepted; retrying sendall() after a BlockingIOError
                # would transmit the already accepted part a second time.
                sent += connection.send(data[sent:])
            except BlockingIOError:
                time.sleep(.1)  # try again till timeout exceeds
            except BrokenPipeError:
                self.logger.exception('Exception while sending data')
                self.__connection_lost__()
                return False
            else:
                if sent >= len(data):
                    if self.logger.isEnabledFor(log_lvl):
                        self.logger.log(log_lvl, 'TX -> "%s"', stringtools.hexlify(data))
                    return True
        # The message text (including its spelling) is kept verbatim for existing log consumers.
        self.logger.warning('Cound NOT send -> "%s"', stringtools.hexlify(data))
        return False

    def register_callback(self, callback):
        """
        :param callback: The callback which will be executed, when data is available.
        :type callback: function

        This method sets the callback which is executed, if data is available. You need to execute :func:`receive` of the instance
        given with the first argument.

        .. note:: The :func:`callback` is executed with these arguments:

            :param self: This communication instance
        """
        self.__data_available_callback__ = callback

    def register_connect_callback(self, callback):
        """
        :param callback: The callback which will be executed, when a connect is identified.
        :type callback: function

        This method sets the callback which is executed (without arguments), if a connect is identified.
        """
        self.__connect_callback__ = callback

    def register_disconnect_callback(self, callback):
        """
        :param callback: The callback which will be executed, when a disconnect is identified.
        :type callback: function

        This method sets the callback which is executed (without arguments), if a disconnect is identified.
        """
        self.__disconnect_callback__ = callback

    def __receive_task__(self, queue_inst, rx_log_lvl):
        """
        One polling cycle: read pending data (or try to connect, if not connected) and re-enqueue this task.
        """
        connection = self.__connection__
        if connection is None:
            self.__connect__()
        else:
            try:
                data = connection.recv(self.RX_LENGTH)
            except socket.error as e:
                if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # Non-blocking socket without pending data.
                    time.sleep(.05)
                elif e.errno == errno.ECONNRESET:
                    # Peer reset the connection (handled the same way in tcp_base_stp).
                    self.__connection_lost__()
                else:
                    raise
            else:
                if len(data) > 0:
                    if self.logger.isEnabledFor(rx_log_lvl):
                        self.logger.log(rx_log_lvl, 'RX <- "%s"', stringtools.hexlify(data))
                    self.__receive_buffer__ += data
                else:
                    # recv() returning no data signals that the peer closed the connection.
                    self.__connection_lost__()
            # NOTE(review): executed on every polling cycle while connected, so data left in the
            # buffer is signalled again -- confirm this matches the intended nesting.
            self.__call_data_available_callback__()
        queue_inst.enqueue(5, self.__receive_task__, rx_log_lvl)

    def __call_data_available_callback__(self):
        """
        Execute the data-available callback, if data is buffered and the callback is not already active.
        """
        if len(self.__receive_buffer__) > 0 and not self.__supress_data_available_callback__ and self.__data_available_callback__ is not None:
            self.__supress_data_available_callback__ = True
            try:
                self.__data_available_callback__(self)
            finally:
                # Always release the guard, otherwise one failing callback would mute all later ones.
                self.__supress_data_available_callback__ = False

    def __connection_lost__(self):
        """
        Close the active connection, reset the connection state and execute the disconnect callback.
        """
        self.__listening_message_displayed__ = False
        connection = self.__connection__
        if connection is None:
            # Already handled (e.g. detected by sender and receiver at the same time).
            return
        connection.close()
        self.__connection__ = None
        self.__client_address__ = None
        self.logger.info('Connection lost...')
        if self.__disconnect_callback__ is not None:
            self.__disconnect_callback__()

    def __clean_receive_buffer__(self):
        """
        Reset the receive buffer to an empty byte string.
        """
        self.logger.debug("Cleaning up receive-buffer")
        # recv() delivers bytes, so the buffer has to be bytes as well ("" + bytes fails on Python 3).
        self.__receive_buffer__ = b""

    def close(self):
        """
        This method closes the active communication channel, if channel exists.
        """
        # getattr keeps close() usable from __del__ even if __init__ did not complete.
        queue = getattr(self, '__queue__', None)
        if queue is not None:
            queue.stop()
            queue.join()
        if getattr(self, '__connection__', None) is not None:
            self.__connection_lost__()
        sock = getattr(self, '__socket__', None)
        if sock is not None:
            sock.close()

    def __del__(self):
        self.close()
class tcp_server(tcp_base):
    """
    :param host: The host IP for the TCP socket functionality
    :type host: str
    :param port: The port for the TCP socket functionality
    :type port: int
    :param rx_log_lvl: The log level to log incoming RX-data
    :type rx_log_lvl: int

    This class supports a tcp-server transferring a serial stream of bytes (characters).

    .. note:: You need a :class:`tcp_client` to communicate with the server.

    **Example:**

    .. literalinclude:: ../../tcp_socket/_examples_/tcp_socket__tcp_server.py

    .. literalinclude:: ../../tcp_socket/_examples_/tcp_socket__tcp_server.log
    """

    def __connect__(self):
        """
        Create the listening socket (once) and try to accept one client without blocking.
        """
        if self.__socket__ is None:
            # Create a TCP/IP socket
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                # Bind the socket to the port
                listener.bind((self.host, self.port))
                # Listen for incoming connections
                listener.listen(1)
                # Non-blocking mode; a previously set timeout would be overridden by this call anyway.
                listener.setblocking(False)
            except Exception:
                # Do not keep (or leak) a socket which is not able to accept connections.
                listener.close()
                raise
            self.__socket__ = listener
        if not self.__listening_message_displayed__:
            self.logger.info('Server listening to %s:%d', self.host, self.port)
            self.__listening_message_displayed__ = True
        try:
            connection, client_address = self.__socket__.accept()
        except socket.error as e:
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
            # No client is waiting yet.
            time.sleep(.05)
        else:
            self.__client_address__ = client_address
            self.logger.info('Connection established... (from %s)', self.client_address())
            self.__clean_receive_buffer__()
            connection.setblocking(False)
            # Publish the connection only after it is completely prepared.
            self.__connection__ = connection
            if self.__connect_callback__ is not None:
                self.__connect_callback__()
class tcp_client(tcp_base):
    """
    :param host: The host IP for the TCP socket functionality
    :type host: str
    :param port: The port for the TCP socket functionality
    :type port: int
    :param rx_log_lvl: The log level to log incoming RX-data
    :type rx_log_lvl: int

    This class supports a tcp-client transferring a serial stream of bytes (characters).

    .. note:: You need a running :class:`tcp_server` listening at the given IP and port to communicate.

    **Example:**

    .. literalinclude:: ../../tcp_socket/_examples_/tcp_socket__tcp_client.py

    .. literalinclude:: ../../tcp_socket/_examples_/tcp_socket__tcp_client.log
    """
    IS_CLIENT = True

    def __connect__(self):
        """
        Create the socket (if needed) and try to connect to the server without blocking.
        """
        if self.__socket__ is None:
            # Create a TCP/IP socket
            self.__socket__ = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.__socket__.setblocking(False)
        # Connect the socket to the port where the server is listening
        try:
            self.__socket__.connect((self.host, self.port))
        except socket.error as e:
            if e.errno == errno.EBADF:
                # The socket has been closed: drop it, so that the next attempt creates a new one
                # instead of failing on the dead descriptor again and again.
                self.__socket__.close()
                self.__socket__ = None
            elif e.errno in (errno.EINPROGRESS, errno.ECONNREFUSED, errno.EALREADY):
                # Connect is still pending or has been refused: try again with the next call.
                self.__connection__ = None
                time.sleep(.05)
            else:
                raise
        else:
            self.logger.info('Connection to %s:%s established', self.host, self.port)
            self.__clean_receive_buffer__()
            self.__connection__ = self.__socket__
            if self.__connect_callback__ is not None:
                self.__connect_callback__()

    def __connection_lost__(self):
        """
        Forget the socket (a new one is created with the next connect) and do the common clean-up.
        """
        self.__socket__ = None
        tcp_base.__connection_lost__(self)

    def reconnect(self):
        """
        This method executes one connect attempt.
        """
        self.__connect__()
class tcp_base_stp(tcp_base):
    """
    :param host: The host IP for the TCP socket functionality
    :type host: str
    :param port: The port for the TCP socket functionality
    :type port: int
    :param rx_log_lvl: The log level to log incoming RX-data
    :type rx_log_lvl: int

    This is the base class for the message based (stp) classes in this module. Received bytes are
    processed by an stp instance and the resulting messages are buffered in a list.

    .. note:: This class is not designed for direct usage.
    """

    def __init__(self, host, port, rx_log_lvl=logging.INFO):
        # Create the parser first: tcp_base.__init__ already enqueues the receive task and runs the
        # queue, and that task needs self.__stp__.
        self.__stp__ = stringtools.stp.stp()
        tcp_base.__init__(self, host, port, rx_log_lvl=rx_log_lvl)

    def __clean_receive_buffer__(self):
        """
        Reset the receive buffer to an empty list of messages.
        """
        self.logger.debug("Cleaning up receive-buffer")
        self.__receive_buffer__ = []

    def receive(self, timeout=1):
        """
        :param timeout: The timeout for receiving data (at least after the timeout the method returns data or None).
        :type timeout: float
        :return: The received message or None, if no message is available.
        :rtype: str

        This method returns one received message via the initiated communication channel.
        """
        messages = tcp_base.receive(self, timeout=timeout, num=1)
        if not messages:
            # None: not connected, connection lost or timeout.
            return None
        return messages[0]

    def send(self, data, timeout=1, log_lvl=logging.INFO):
        """
        :param data: The message to be send over the communication channel.
        :type data: str
        :param timeout: The timeout for sending data (e.g. time to establish new connection).
        :type timeout: float
        :param log_lvl: The log level to log outgoing TX-data
        :type log_lvl: int
        :return: True if data had been sent, otherwise False.
        :rtype: bool

        This method sends one message via the initiated communication channel.
        """
        # The framed bytes are logged with debug level by the base class, the message itself with log_lvl.
        if not tcp_base.send(self, stringtools.stp.build_frame(data), timeout=timeout, log_lvl=logging.DEBUG):
            return False
        self.logger.log(log_lvl, 'TX -> "%s"', stringtools.hexlify(data))
        return True

    def __receive_task__(self, queue_inst, rx_log_lvl):
        """
        One polling cycle: read pending data and buffer the completed messages (or try to connect,
        if not connected) and re-enqueue this task.
        """
        connection = self.__connection__
        if connection is None:
            self.__connect__()
        else:
            try:
                data = connection.recv(self.RX_LENGTH)
            except socket.error as e:
                if e.errno == errno.ECONNRESET:
                    # Peer reset the connection.
                    self.__connection_lost__()
                elif e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # Non-blocking socket without pending data.
                    time.sleep(.05)
                else:
                    raise
            else:
                if len(data) > 0:
                    self.logger.debug('-- <- "%s"', stringtools.hexlify(data))
                    for msg in self.__stp__.process(data):
                        self.logger.log(rx_log_lvl, 'RX <- "%s"', stringtools.hexlify(msg))
                        self.__receive_buffer__.append(msg)
                else:
                    # recv() returning no data signals that the peer closed the connection.
                    self.__connection_lost__()
            # NOTE(review): executed on every polling cycle while connected, so messages left in the
            # buffer are signalled again -- confirm this matches the intended nesting.
            self.__call_data_available_callback__()
        queue_inst.enqueue(5, self.__receive_task__, rx_log_lvl)
class tcp_server_stp(tcp_server, tcp_base_stp):
    """
    :param host: The host IP for the TCP socket functionality
    :type host: str
    :param port: The port for the TCP socket functionality
    :type port: int
    :param rx_log_lvl: The log level to log incoming RX-data
    :type rx_log_lvl: int

    This class supports a tcp-server transferring strings (messages). Each message is packed on send and
    unpacked on receive. The connection handling is taken from :class:`tcp_server`, the message handling
    from :class:`tcp_base_stp`.

    See :mod:`stp` for more information on packing and unpacking.

    .. note:: You need a :class:`tcp_client_stp` to communicate with the server.

    **Example:**

    .. literalinclude:: ../../tcp_socket/_examples_/tcp_socket__tcp_server_stp.py

    .. literalinclude:: ../../tcp_socket/_examples_/tcp_socket__tcp_server_stp.log
    """
class tcp_client_stp(tcp_client, tcp_base_stp):
    """
    :param host: The host IP for the TCP socket functionality
    :type host: str
    :param port: The port for the TCP socket functionality
    :type port: int

    This class supports a tcp-client transferring strings (messages). Each message is packed on send and
    unpacked on receive. The connection handling is taken from :class:`tcp_client`, the message handling
    from :class:`tcp_base_stp`.

    See :mod:`stp` for more information on packing and unpacking.

    .. note:: You need a running :class:`tcp_server_stp` listening at the given IP and port to communicate.

    **Example:**

    .. literalinclude:: ../../tcp_socket/_examples_/tcp_socket__tcp_server_stp.py

    .. literalinclude:: ../../tcp_socket/_examples_/tcp_socket__tcp_server_stp.log
    """
    # NOTE(review): the example includes above reference the *server* example files; presumably a
    # copy/paste slip -- confirm whether dedicated tcp_client_stp example files exist.