#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
socket_protocol (Socket Protocol)
=================================

**Author:**

* Dirk Alders

**Description:**

    This Module supports point to point communication for client-server issues.

**Submodules:**

* :class:`socket_protocol.struct_json_protocol`
* :class:`socket_protocol.pure_json_protocol`

**Unittest:**

    See also the :download:`unittest <../../socket_protocol/_testresults_/unittest.pdf>` documentation.
"""
__DEPENDENCIES__ = ['stringtools']

import stringtools

import binascii
import hashlib
import json
import logging
import os
import struct
import sys
import time

try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)

# NOTE: '\\_' (escaped backslash) yields the same two characters as the former
# '\_', but without relying on an invalid escape sequence.
__DESCRIPTION__ = (
    """The Module {\\tt %s} is designed to pack and unpack data for serial transportation. """
    """For more Information read the sphinx documentation."""
) % __name__.replace('_', '\\_')
"""The Module Description"""
__INTERPRETER__ = (2, 3)
"""The Tested Interpreter-Versions"""


class callback_storage(dict):
    """
    Two level mapping ``service_id -> data_id -> (callback, args, kwargs)``.

    ``None`` as service_id or data_id is used as wildcard entry, see :func:`get`.

    :param channel_name: The channel name used for the logger name (``None`` results in ``DEFAULT_CHANNEL_NAME``).
    :type channel_name: str or None
    """
    DEFAULT_CHANNEL_NAME = 'all_others'

    def __init__(self, channel_name):
        self.init_channel_name(channel_name)
        dict.__init__(self)

    def init_channel_name(self, channel_name):
        """
        :param channel_name: The new channel name or ``None`` for ``DEFAULT_CHANNEL_NAME``.
        :type channel_name: str or None

        This method (re-)creates the logger of this instance for the given channel name.
        """
        if channel_name is None:
            channel_name = self.DEFAULT_CHANNEL_NAME
        self.logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__ + '.' + channel_name)

    def get(self, service_id, data_id):
        """
        :param service_id: The Service-ID of the message.
        :param data_id: The Data-ID of the message.
        :returns: The tuple ``(callback, args, kwargs)`` or ``(None, None, None)``, if nothing matches.
        :rtype: tuple

        The lookup order is: exact match, ``(None, data_id)``, ``(service_id, None)``, ``(None, None)``.

        .. note:: This method intentionally has a different signature than :func:`dict.get`.
        """
        candidates = []
        if service_id is not None and data_id is not None:
            candidates.append((service_id, data_id))
        if data_id is not None:
            candidates.append((None, data_id))
        if service_id is not None:
            candidates.append((service_id, None))
        candidates.append((None, None))
        for sid, did in candidates:
            try:
                return self[sid][did]
            except KeyError:
                pass    # no callback on this level, try the next (less specific) one
        return (None, None, None)

    def add(self, service_id, data_id, callback, *args, **kwargs):
        """
        :param service_id: The Service-ID to register for (``None`` for all Service-IDs).
        :param data_id: The Data-ID to register for (``None`` for all Data-IDs).
        :param callback: The callback to be stored.
        :param args: Additional positional arguments stored with the callback.
        :param kwargs: Additional keyword arguments stored with the callback.

        This method stores the callback. A warning is logged, if a callback for exactly this
        combination of service_id and data_id is replaced.
        """
        # Look at the exact slot only. Using the wildcard aware get() here would report an
        # overwrite although only a less specific callback exists and nothing gets replaced.
        existing = dict.get(self, service_id, {}).get(data_id)
        if existing is not None:
            self.logger.warning(
                "Overwriting existing callback %s for service_id (%s) and data_id (%s) to %s!",
                repr(getattr(existing[0], '__name__', repr(existing[0]))),
                repr(service_id),
                repr(data_id),
                repr(getattr(callback, '__name__', repr(callback)))
            )
        if service_id not in self:
            self[service_id] = {}
        self[service_id][data_id] = (callback, args, kwargs)


class data_storage(dict):
    """
    A :class:`dict` holding one message with the keys ``KEY_STATUS``, ``KEY_SERVICE_ID``,
    ``KEY_DATA_ID`` and ``KEY_DATA``.
    """
    KEY_STATUS = 'status'
    KEY_SERVICE_ID = 'service_id'
    KEY_DATA_ID = 'data_id'
    KEY_DATA = 'data'

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)

    def get_status(self, default=None):
        """:returns: The status of the message or default, if the key does not exist."""
        return self.get(self.KEY_STATUS, default)

    def get_service_id(self, default=None):
        """:returns: The Service-ID of the message or default, if the key does not exist."""
        return self.get(self.KEY_SERVICE_ID, default)

    def get_data_id(self, default=None):
        """:returns: The Data-ID of the message or default, if the key does not exist."""
        return self.get(self.KEY_DATA_ID, default)

    def get_data(self, default=None):
        """:returns: The data of the message or default, if the key does not exist."""
        return self.get(self.KEY_DATA, default)


class struct_json_protocol(object):
    """
    :param comm_instance: a communication instance supporting at least these functions: :func:`init_channel_name`,
                          :func:`register_callback`, :func:`register_connect_callback`, :func:`register_disconnect_callback`,
                          :func:`send`, :func:`receive`, :func:`is_connected`, :func:`reconnect` and the attribute ``IS_CLIENT``.
    :type comm_instance: instance
    :param secret: A secret (e.g. created by ``binascii.hexlify(os.urandom(24))``).
    :type secret: str or bytes
    :param auto_auth: If True, a client with a secret authentificates automatically after the connection is established.
    :type auto_auth: bool
    :param channel_name: The channel name used for the logger names (``None`` results in ``DEFAULT_CHANNEL_NAME``).
    :type channel_name: str or None

    This communication protocol supports to transfer a Service-ID, Data-ID and Data. The transmitted data is shorter than :class:`pure_json_protocol`.

    .. note::
      This class is here for compatibility reasons. Usage of :class:`pure_json_protocol` is recommended.

    **Example:**

    Server:

    .. literalinclude:: ../../socket_protocol/_examples_/socket_protocol__struct_json_protocol_server.py

    .. literalinclude:: ../../socket_protocol/_examples_/socket_protocol__struct_json_protocol_server.log

    Client:

    .. literalinclude:: ../../socket_protocol/_examples_/socket_protocol__struct_json_protocol_client.py

    .. literalinclude:: ../../socket_protocol/_examples_/socket_protocol__struct_json_protocol_client.log
    """
    DEFAULT_CHANNEL_NAME = 'all_others'

    SID_AUTH_SEED_REQUEST = 1
    SID_AUTH_KEY_REQUEST = 2
    SID_AUTH_KEY_CHECK_REQUEST = 3
    SID_AUTH_KEY_CHECK_RESPONSE = 4
    SID_CHANNEL_NAME_REQUEST = 5
    SID_CHANNEL_NAME_RESPONSE = 6
    SID_READ_REQUEST = 10
    SID_READ_RESPONSE = 11
    SID_WRITE_REQUEST = 20
    SID_WRITE_RESPONSE = 21
    SID_EXECUTE_REQUEST = 30
    SID_EXECUTE_RESPONSE = 31

    # Maps every request Service-ID to the Service-ID used for the answer.
    SID_RESPONSE_DICT = {SID_AUTH_SEED_REQUEST: SID_AUTH_KEY_REQUEST,
                         SID_AUTH_KEY_REQUEST: SID_AUTH_KEY_CHECK_REQUEST,
                         SID_AUTH_KEY_CHECK_REQUEST: SID_AUTH_KEY_CHECK_RESPONSE,
                         SID_CHANNEL_NAME_REQUEST: SID_CHANNEL_NAME_RESPONSE,
                         SID_READ_REQUEST: SID_READ_RESPONSE,
                         SID_WRITE_REQUEST: SID_WRITE_RESPONSE,
                         SID_EXECUTE_REQUEST: SID_EXECUTE_RESPONSE}

    # Service-IDs which are processed without a successful authentification.
    SID_AUTH_LIST = [
        SID_AUTH_SEED_REQUEST, SID_AUTH_KEY_REQUEST, SID_AUTH_KEY_CHECK_REQUEST, SID_AUTH_KEY_CHECK_RESPONSE,
        SID_CHANNEL_NAME_REQUEST, SID_CHANNEL_NAME_RESPONSE
    ]

    STATUS_OKAY = 0
    STATUS_BUFFERING_UNHANDLED_REQUEST = 1
    STATUS_AUTH_REQUIRED = 2
    STATUS_SERVICE_OR_DATA_UNKNOWN = 3
    STATUS_CHECKSUM_ERROR = 4
    STATUS_OPERATION_NOT_PERMITTED = 5
    STATUS_NAMES = {STATUS_OKAY: 'Okay',
                    STATUS_BUFFERING_UNHANDLED_REQUEST: 'Request has no callback. Data buffered.',
                    STATUS_AUTH_REQUIRED: 'Authentification required',
                    STATUS_SERVICE_OR_DATA_UNKNOWN: 'Service or Data unknown',
                    STATUS_CHECKSUM_ERROR: 'Checksum Error',
                    STATUS_OPERATION_NOT_PERMITTED: 'Operation not permitted'}

    AUTH_STATE_UNKNOWN_CLIENT = 0
    AUTH_STATE_SEED_REQUESTED = 1
    AUTH_STATE_SEED_TRANSFERRED = 2
    AUTH_STATE_KEY_TRANSFERRED = 3
    AUTH_STATE_TRUSTED_CLIENT = 4
    AUTH_STATUS_NAMES = {AUTH_STATE_UNKNOWN_CLIENT: 'Unknown Client',
                         AUTH_STATE_SEED_REQUESTED: 'Seed was requested',
                         AUTH_STATE_SEED_TRANSFERRED: 'Seed has been sent',
                         AUTH_STATE_KEY_TRANSFERRED: 'Key has been sent',
                         AUTH_STATE_TRUSTED_CLIENT: 'Trusted Client'}

    def __init__(self, comm_instance, secret=None, auto_auth=False, channel_name=None):
        self.__comm_inst__ = comm_instance
        self.__secret__ = secret
        self.__auto_auth__ = auto_auth
        #
        self.__callbacks__ = callback_storage(channel_name)
        self.__init_channel_name__(channel_name)
        #
        self.__clean_receive_buffer__()
        # Protocol internal callbacks (authentification handshake and channel name exchange)
        self.__callbacks__.add(self.SID_AUTH_SEED_REQUEST, 0, self.__authentificate_create_seed__)
        self.__callbacks__.add(self.SID_AUTH_KEY_REQUEST, 0, self.__authentificate_create_key__)
        self.__callbacks__.add(self.SID_AUTH_KEY_CHECK_REQUEST, 0, self.__authentificate_check_key__)
        self.__callbacks__.add(self.SID_AUTH_KEY_CHECK_RESPONSE, 0, self.__authentificate_process_feedback__)
        self.__callbacks__.add(self.SID_CHANNEL_NAME_REQUEST, 0, self.__channel_name_request__)
        self.__callbacks__.add(self.SID_CHANNEL_NAME_RESPONSE, 0, self.__channel_name_response__)
        self.__authentification_state_reset__()
        self.__seed__ = None
        self.__comm_inst__.register_callback(self.__data_available_callback__)
        self.__comm_inst__.register_connect_callback(self.__connection_established__)
        self.__comm_inst__.register_disconnect_callback(self.__authentification_state_reset__)

    def __init_channel_name__(self, channel_name):
        """Propagate the channel name to the communication instance, the callback storage and the own logger."""
        self.__comm_inst__.init_channel_name(channel_name)
        self.__callbacks__.init_channel_name(channel_name)
        if channel_name is None:
            channel_name = self.DEFAULT_CHANNEL_NAME
        self.logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__ + '.' + channel_name)

    @property
    def __channel_name__(self):
        """The channel name taken from the logger name; ``None`` if the default channel name is in use."""
        cn = self.logger.name.split('.')[-1]
        if cn != self.DEFAULT_CHANNEL_NAME:
            return cn
        return None

    def __log_prefix__(self):
        """:returns: The prefix for log messages depending on the role (client or server) of the communication instance."""
        return ' SP client:' if self.__comm_inst__.IS_CLIENT else ' SP server:'

    def connected(self):
        """:returns: The result of :func:`is_connected` of the communication instance."""
        return self.__comm_inst__.is_connected()

    def connection_established(self):
        """
        :returns: True, if connected and (if a secret is given) the authentification was successfull.
        :rtype: bool
        """
        return self.connected() and (self.__secret__ is None or self.check_authentification_state())

    def reconnect(self):
        """:returns: The result of :func:`reconnect` of the communication instance."""
        return self.__comm_inst__.reconnect()

    def __connection_established__(self):
        """Connect callback: clean the buffer, request the channel name (server) and start auto authentification (client)."""
        self.__clean_receive_buffer__()
        if not self.__comm_inst__.IS_CLIENT:
            self.send(self.SID_CHANNEL_NAME_REQUEST, 0, self.__channel_name__)
        if self.__auto_auth__ and self.__comm_inst__.IS_CLIENT and self.__secret__ is not None:
            self.authentificate()

    def __channel_name_request__(self, msg):
        """Callback for a channel name request: answer with the own name or adopt the transmitted name."""
        data = msg.get_data()
        if data is None:
            return self.STATUS_OKAY, self.__channel_name__
        prev_channel_name = self.__channel_name__
        self.__init_channel_name__(data)
        if prev_channel_name is not None and prev_channel_name != data:
            self.logger.warning('%s overwriting user defined channel name from %s to %s', self.__log_prefix__(), repr(prev_channel_name), repr(data))
        elif prev_channel_name is None:
            self.logger.info('%s channel name is now %s', self.__log_prefix__(), repr(self.__channel_name__))
        return self.STATUS_OKAY, None

    def __channel_name_response__(self, msg):
        """Callback for a channel name response: adopt the transmitted name, if no own name is defined."""
        data = msg.get_data()
        if self.__channel_name__ is None and data is not None:
            self.__init_channel_name__(data)
            self.logger.info('%s channel name is now %s', self.__log_prefix__(), repr(self.__channel_name__))
        return self.STATUS_OKAY, None

    def __authentification_state_reset__(self):
        """Set the authentification state back to ``AUTH_STATE_UNKNOWN_CLIENT`` (also used as disconnect callback)."""
        self.logger.info("%s Resetting authentification state to AUTH_STATE_UNKNOWN_CLIENT", self.__log_prefix__())
        self.__authentification_state__ = self.AUTH_STATE_UNKNOWN_CLIENT

    def __analyse_frame__(self, frame):
        """
        :param frame: The received frame (12 bytes header, json payload, 1 byte checksum).
        :returns: The message as :class:`data_storage`.
        :raises: struct.error or ValueError, if the frame can not be decoded.
        """
        status, service_id, data_id = struct.unpack('>III', frame[0:12])
        payload = frame[12:-1]
        if sys.version_info >= (3, 0):
            payload = payload.decode('utf-8')
        return self.__mk_msg__(status, service_id, data_id, json.loads(payload))

    def __build_frame__(self, service_id, data_id, data, status=STATUS_OKAY):
        """:returns: The frame: three big endian unsigned ints (status, service_id, data_id), json data and checksum."""
        frame = struct.pack('>III', status, service_id, data_id)
        payload = json.dumps(data)
        if sys.version_info >= (3, 0):
            payload = bytes(payload, 'utf-8')
        frame += payload
        frame += self.__calc_chksum__(frame)
        return frame

    def __calc_chksum__(self, raw_data):
        """:returns: One byte holding the XOR of all bytes of raw_data."""
        chksum = 0
        # bytearray iterates over integers on python 2 and python 3
        for b in bytearray(raw_data):
            chksum ^= b
        return bytes(bytearray([chksum]))

    def __check_frame_checksum__(self, frame):
        """:returns: True, if the last byte of the frame is the checksum of the bytes before."""
        return self.__calc_chksum__(frame[:-1]) == frame[-1:]

    def __execute_callback__(self, callback, msg, args, kwargs):
        """
        Execute the callback for the message and return its result ``(status, data)``.

        :raises: TypeError, if the callback raises a TypeError or does not return a pair.
        """
        try:
            self.logger.debug("%s Executing callback %s to process received data", self.__log_prefix__(), callback.__name__)
            status, data = callback(msg, *args, **kwargs)
        except TypeError:
            raise TypeError('Check return value of callback function {callback_name} for service_id {service_id} and data_id {data_id}'.format(callback_name=callback.__name__, service_id=repr(msg.get_service_id()), data_id=repr(msg.get_data_id())))
        return status, data

    def __data_available_callback__(self, comm_inst):
        """
        :param comm_inst: The communication instance holding the received frame.

        Callback of the communication instance. Requests are answered (callback result or negative
        response), responses are passed to the registered callback or buffered for :func:`receive`.
        """
        frame = comm_inst.receive()
        if not self.__check_frame_checksum__(frame):
            self.logger.warning("%s Received message has a wrong checksum and will be ignored: %s.", self.__log_prefix__(), stringtools.hexlify(frame))
            return
        try:
            msg = self.__analyse_frame__(frame)
        except (struct.error, ValueError, TypeError):
            # The frame comes from the peer: a truncated header or a broken json payload must not
            # raise into the communication instance.
            self.logger.warning("%s Received message could not be decoded and will be ignored: %s.", self.__log_prefix__(), stringtools.hexlify(frame))
            return
        self.logger.info(
            '%s RX <- status: %s, service_id: %s, data_id: %s, data: "%s"',
            self.__log_prefix__(),
            repr(msg.get_status()),
            repr(msg.get_service_id()),
            repr(msg.get_data_id()),
            repr(msg.get_data())
        )
        callback, args, kwargs = self.__callbacks__.get(msg.get_service_id(), msg.get_data_id())
        if msg.get_service_id() in self.SID_RESPONSE_DICT:
            #
            # REQUEST RECEIVED
            #
            if self.__secret__ is not None and not self.check_authentification_state() and msg.get_service_id() not in self.SID_AUTH_LIST:
                status = self.STATUS_AUTH_REQUIRED
                data = None
                self.logger.warning("%s Received message needs authentification: %s. Sending negative response.", self.__log_prefix__(), self.AUTH_STATUS_NAMES.get(self.__authentification_state__, 'Unknown authentification status!'))
            elif callback is None:
                self.logger.warning("%s Received message with no registered callback. Sending negative response.", self.__log_prefix__())
                status = self.STATUS_BUFFERING_UNHANDLED_REQUEST
                data = None
            else:
                status, data = self.__execute_callback__(callback, msg, args, kwargs)
            self.send(self.SID_RESPONSE_DICT[msg.get_service_id()], msg.get_data_id(), data, status=status)
        else:
            #
            # RESPONSE RECEIVED
            #
            if msg.get_status() not in [self.STATUS_OKAY]:
                self.logger.warning("%s Received message has a peculiar status: %s", self.__log_prefix__(), self.STATUS_NAMES.get(msg.get_status(), 'Unknown status response!'))
            if callback is None:
                self.__buffer_received_data__(msg)
            else:
                # The result of a response callback is not transmitted, only its form is checked.
                self.__execute_callback__(callback, msg, args, kwargs)

    def __buffer_received_data__(self, msg):
        """Append the message to the buffer (``service_id -> data_id -> list``) read by :func:`receive`."""
        self.__msg_buffer__.setdefault(msg.get_service_id(), {}).setdefault(msg.get_data_id(), []).append(msg)
        self.logger.debug("%s Message data is stored in buffer and is now ready to be retrieved by receive method", self.__log_prefix__())

    def __clean_receive_buffer__(self):
        """Drop all buffered messages."""
        self.logger.debug("%s Cleaning up receive-buffer", self.__log_prefix__())
        self.__msg_buffer__ = {}

    def receive(self, service_id, data_id, timeout=1):
        """
        :param service_id: The Service-ID of the expected message.
        :type service_id: int
        :param data_id: The Data-ID of the expected message.
        :type data_id: int
        :param timeout: The maximum time to wait for the message (polled every 0.1s).
        :type timeout: float
        :returns: The oldest buffered message for service_id and data_id or None on timeout.
        :rtype: data_storage or None

        This method returns a buffered message (a response without a registered callback).
        """
        data = None
        cnt = 0
        while data is None and cnt < timeout * 10:
            try:
                data = self.__msg_buffer__.get(service_id, {}).get(data_id, []).pop(0)
            except IndexError:
                # nothing buffered yet
                data = None
                cnt += 1
                time.sleep(0.1)
        if data is None and cnt >= timeout * 10:
            self.logger.warning('%s TIMEOUT (%ss): Requested data (service_id: %s; data_id: %s) not in buffer.', self.__log_prefix__(), repr(timeout), repr(service_id), repr(data_id))
        return data

    def __mk_msg__(self, status, service_id, data_id, data):
        """:returns: A :class:`data_storage` holding the given message content."""
        return data_storage({data_storage.KEY_DATA_ID: data_id, data_storage.KEY_SERVICE_ID: service_id, data_storage.KEY_STATUS: status, data_storage.KEY_DATA: data})

    def send(self, service_id, data_id, data, status=STATUS_OKAY, timeout=2, log_lvl=logging.INFO):
        """
        :param service_id: The Service-ID for the message. See class definitions starting with ``SID_``.
        :type service_id: int
        :param data_id: The Data-ID for the message.
        :type data_id: int
        :param data: The data to be transfered. The data needs to be json compatible.
        :type data: str
        :param status: The Status for the message. All requests should have ``STATUS_OKAY``.
        :type status: int
        :param timeout: The timeout for sending data (e.g. time to establish new connection).
        :type timeout: float
        :param log_lvl: The log level to log outgoing TX-data
        :type log_lvl: int
        :return: The return value of :func:`send` of the communication instance.

        This methods sends out a message with the given content.
        """
        self.logger.log(log_lvl, '%s TX -> status: %d, service_id: %d, data_id: %d, data: "%s"', self.__log_prefix__(), status, service_id, data_id, repr(data))
        return self.__comm_inst__.send(self.__build_frame__(service_id, data_id, data, status), timeout=timeout, log_lvl=logging.DEBUG)

    def register_callback(self, service_id, data_id, callback, *args, **kwargs):
        """
        :param service_id: The Service-ID for the message. See class definitions starting with ``SID_``.
        :type service_id: int
        :param data_id: The Data-ID for the message.
        :type data_id: int
        :param callback: The callback to be executed.
        :param args: Additional positional arguments passed to the callback after the message.
        :param kwargs: Additional keyword arguments passed to the callback.

        This method registers a callback for the given parameters. Giving ``None`` means, that all Service-IDs or all Data-IDs are used.
        If a message hitting these parameters has been received, the callback will be executed.
        An already registered callback for the same parameters is replaced (a warning is logged). Nothing is returned.

        .. note:: The :func:`callback` is priorised in the following order:

            * Callbacks with defined Service-ID and Data-ID.
            * Callbacks with a defined Data-ID.
            * Callbacks with a defined Service-ID.
            * Unspecific Callbacks

        .. note:: The :func:`callback` is executed with these arguments:

            :param msg: A :class:`dict` containing all message information.
            :returns: status (see class definition starting with ``STATUS_``), response_data (JSON compatible object)
        """
        self.__callbacks__.add(service_id, data_id, callback, *args, **kwargs)

    def authentificate(self, timeout=2):
        """
        :param timeout: The timeout for the authentification (requesting seed, sending key and getting authentification_feedback).
        :type timeout: float
        :returns: True, if authentification was successfull; False, if not.
        :rtype: bool

        This method authentificates the client at the server.

        .. note:: An authentification will only processed, if a secret had been given on initialisation.

        .. note:: Client and Server needs to use the same secret.
        """
        if self.__secret__ is not None:
            self.__authentification_state__ = self.AUTH_STATE_SEED_REQUESTED
            self.logger.info("%s Requesting seed for authentification", self.__log_prefix__())
            self.send(self.SID_AUTH_SEED_REQUEST, 0, None)
            cnt = 0
            while cnt < timeout * 10:
                time.sleep(0.1)
                if self.__authentification_state__ == self.AUTH_STATE_TRUSTED_CLIENT:
                    return True
                elif self.__authentification_state__ == self.AUTH_STATE_UNKNOWN_CLIENT:
                    # negative feedback received or connection lost
                    break
                cnt += 1
        return False

    def check_authentification_state(self):
        """
        :return: True, if authentification state is okay, otherwise False
        :rtype: bool
        """
        return self.__authentification_state__ == self.AUTH_STATE_TRUSTED_CLIENT

    @staticmethod
    def __to_bytes__(value):
        """
        :returns: value as bytes (text is utf-8 encoded, bytes are returned unchanged).
        :raises: TypeError, if value is neither text nor bytes.
        """
        if isinstance(value, bytes):
            return value
        try:
            return value.encode('utf-8')
        except AttributeError:
            raise TypeError('Text or bytes expected, got %s' % type(value).__name__)

    def __authentificate_salt_and_hash__(self, seed):
        """
        :param seed: The seed of the authentification handshake.
        :returns: The hex digest of sha512(seed + secret).
        :rtype: str
        :raises: TypeError, if seed or secret is neither text nor bytes.
        """
        # The secret may be given as text or as bytes (e.g. the result of binascii.hexlify);
        # both forms result in the same key on python 2 and python 3.
        return hashlib.sha512(self.__to_bytes__(seed) + self.__to_bytes__(self.__secret__)).hexdigest()

    def __authentificate_create_seed__(self, msg):
        """Callback for a seed request: create, store and return a new random seed."""
        self.logger.info("%s Got seed request, sending seed for authentification", self.__log_prefix__())
        self.__authentification_state__ = self.AUTH_STATE_SEED_TRANSFERRED
        self.__seed__ = binascii.hexlify(os.urandom(32))
        if sys.version_info >= (3, 0):
            # the seed needs to be text to be json compatible
            self.__seed__ = self.__seed__.decode('utf-8')
        return self.STATUS_OKAY, self.__seed__

    def __authentificate_create_key__(self, msg):
        """Callback for a key request: return the key calculated from the received seed and the secret."""
        self.logger.info("%s Got seed, sending key for authentification", self.__log_prefix__())
        self.__authentification_state__ = self.AUTH_STATE_KEY_TRANSFERRED
        seed = msg.get_data()
        key = self.__authentificate_salt_and_hash__(seed)
        return self.STATUS_OKAY, key

    def __authentificate_check_key__(self, msg):
        """Callback for a key check request: compare the received key with the expected one and return the feedback."""
        key = msg.get_data()
        # A seed is valid for one check only. This prevents replaying a recorded key and
        # rejects a key check request which had not been preceded by a seed request.
        seed, self.__seed__ = self.__seed__, None
        if seed is not None and self.__secret__ is not None and key == self.__authentificate_salt_and_hash__(seed):
            self.__authentification_state__ = self.AUTH_STATE_TRUSTED_CLIENT
            self.logger.info("%s Got correct key, sending positive authentification feedback", self.__log_prefix__())
            return self.STATUS_OKAY, True
        else:
            self.__authentification_state__ = self.AUTH_STATE_UNKNOWN_CLIENT
            self.logger.info("%s Got incorrect key, sending negative authentification feedback", self.__log_prefix__())
            return self.STATUS_OKAY, False

    def __authentificate_process_feedback__(self, msg):
        """Callback for the key check response: set the authentification state depending on the feedback."""
        feedback = msg.get_data()
        if feedback:
            self.__authentification_state__ = self.AUTH_STATE_TRUSTED_CLIENT
            self.logger.info("%s Got positive authentification feedback", self.__log_prefix__())
        else:
            self.__authentification_state__ = self.AUTH_STATE_UNKNOWN_CLIENT
            self.logger.warning("%s Got negative authentification feedback", self.__log_prefix__())
        return self.STATUS_OKAY, None


class pure_json_protocol(struct_json_protocol):
    """
    :param comm_instance: a communication instance supporting at least these functions: :func:`init_channel_name`,
                          :func:`register_callback`, :func:`register_connect_callback`, :func:`register_disconnect_callback`,
                          :func:`send`, :func:`receive`, :func:`is_connected`, :func:`reconnect` and the attribute ``IS_CLIENT``.
    :type comm_instance: instance
    :param secret: A secret (e.g. created by ``binascii.hexlify(os.urandom(24))``).
    :type secret: str or bytes

    This communication protocol supports to transfer a Service-ID, Data-ID and Data.

    **Example:**

    Server:

    .. literalinclude:: ../../socket_protocol/_examples_/socket_protocol__pure_json_protocol_server.py

    .. literalinclude:: ../../socket_protocol/_examples_/socket_protocol__pure_json_protocol_server.log

    Client:

    .. literalinclude:: ../../socket_protocol/_examples_/socket_protocol__pure_json_protocol_client.py

    .. literalinclude:: ../../socket_protocol/_examples_/socket_protocol__pure_json_protocol_client.log
    """
    def __init__(self, *args, **kwargs):
        struct_json_protocol.__init__(self, *args, **kwargs)

    def __build_frame__(self, service_id, data_id, data, status=struct_json_protocol.STATUS_OKAY):
        """:returns: The frame: the json encoded message followed by its 4 byte checksum."""
        data_frame = json.dumps(self.__mk_msg__(status, service_id, data_id, data))
        if sys.version_info >= (3, 0):
            data_frame = bytes(data_frame, 'utf-8')
        return data_frame + self.__calc_chksum__(data_frame)

    def __analyse_frame__(self, frame):
        """
        :param frame: The received frame (json payload followed by 4 bytes checksum).
        :returns: The message as :class:`data_storage`.
        :raises: ValueError or TypeError, if the payload is no json encoded mapping.
        """
        payload = frame[:-4]
        if sys.version_info >= (3, 0):
            payload = payload.decode('utf-8')
        return data_storage(json.loads(payload))

    def __calc_chksum__(self, raw_data):
        """:returns: The CRC32 of raw_data as 4 bytes (big endian unsigned int)."""
        # The mask results in an unsigned value on python 2, too.
        return struct.pack('>I', binascii.crc32(raw_data) & 0xffffffff)

    def __check_frame_checksum__(self, frame):
        """:returns: True, if the last 4 bytes of the frame are the checksum of the bytes before."""
        return self.__calc_chksum__(frame[:-4]) == frame[-4:]