#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
serial_interface (Serial Interface Module)
==========================================

**Author:**

* Dirk Alders

**Description:**

    This Module supports a common interface for a serial port

**Submodules:**

* :mod:`mmod.module.sub1`
* :class:`mmod.module.sub2`
* :func:`mmod.module.sub2`

**Unittest:**

    See also the :download:`unittest <../../serial_interface/_testresults_/unittest.pdf>` documentation.
"""
__DEPENDENCIES__ = ['stringtools']

import stringtools
import logging
import serial
import time

try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)


class serial_interface(object):
    """
    Class to send and receive data via a serial port.

    The constructor stores its arguments and immediately tries to open the port.
    A failed open is logged and leaves the instance in a "closed" state
    (``self.com is None``), in which :meth:`send` does nothing,
    :meth:`bytes_in_queue` returns 0 and :meth:`receive` returns None.

    .. note:: All parameters are given to the standard :py:class:`serial.Serial` library
    """
    LOG_PREFIX = 'SER:'

    def __init__(self, *args, **kwargs):
        # Keep the constructor arguments so that open() can (re-)create the port later.
        self._args = args
        self._kwargs = kwargs
        self.com = None
        self.open()

    def _port_name(self):
        """
        This returns the port identifier used in log messages.

        :returns: The first positional argument if there is one, otherwise the
                  ``port`` keyword argument (None if neither was given)
        """
        if self._args:
            return self._args[0]
        # NOTE(review): assumes 'port' is the keyword name of serial.Serial's first parameter.
        return self._kwargs.get('port')

    def open(self):
        """
        This opens the serial port. An already opened port is closed first.

        :returns: True if the port has been successfully opened
        :rtype: bool
        """
        if self.com is not None:
            self.close()
        try:
            self.com = serial.Serial(*self._args, **self._kwargs)
        except Exception:
            # Not a bare "except:", so KeyboardInterrupt and SystemExit still propagate.
            self.com = None
            logger.warning('%s Unable to open comport (%s)', self.LOG_PREFIX, self._port_name())
            return False
        logger.debug('%s Opening comport (%s)', self.LOG_PREFIX, self._port_name())
        return True

    def close(self):
        """
        This closes the serial port. Nothing happens if the port is not open.
        """
        if self.com is not None:
            logger.debug('%s Closing comport (%s)', self.LOG_PREFIX, self._port_name())
            try:
                self.com.close()
            finally:
                # Drop the handle even if closing failed, so the instance is
                # consistently treated as "closed" afterwards.
                self.com = None

    def bytes_in_queue(self):
        """
        This returns the number of available bytes for reception.

        :returns: Number of bytes in queue (0 if the port is not open)
        :rtype: int
        """
        if self.com is None:
            return 0
        return self.com.inWaiting()

    def receive(self, max_length=None):
        """
        This receives data from the serial interface.

        :param int max_length: An optional maximum number of bytes to be received.
                               None will return all available bytes.
        :returns: The received data or None, if the port is not open or no data is available
        """
        length = self.bytes_in_queue()
        if max_length is not None:
            length = min(length, max_length)
        if self.com is None or length <= 0:
            # Explicit result for "nothing to read".
            return None
        data = self.com.read(length)
        self._handle_debug('RX <- ', data)
        return data

    def send(self, data):
        """
        This sends data via the serial interface. Nothing is sent if the port is not open.

        :param str data: The data to be sent
        """
        if self.com is not None:
            self._handle_debug('TX -> ', data)
            self.com.write(data)

    def _handle_debug(self, msg, data):
        """
        This logs a hex dump of transferred data on debug level.

        :param str msg: Direction marker which is put in front of the hex dump
        :param data: The transferred data
        """
        # Only build the hex dump if it will actually be emitted.
        if logger.isEnabledFor(logging.DEBUG):
            # msg and the dump are passed as arguments (not merged into the format
            # string), so a '%' inside of them can't break the log formatting.
            logger.debug('%s %s%s', self.LOG_PREFIX, msg, stringtools.hexlify(data))

    def __del__(self):
        # Best effort cleanup: an exception can't be propagated usefully out of a
        # finalizer, so a failing close is deliberately ignored here.
        try:
            self.close()
        except Exception:
            pass