123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- """
- serial_interface (Serial Interface Module)
- ==========================================
-
- **Author:**
-
- * Dirk Alders <sudo-dirk@mount-mockery.de>
-
- **Description:**
-
- This Module supports a common interface for a serial port
-
- **Submodules:**
-
- * :mod:`mmod.module.sub1`
- * :class:`mmod.module.sub2`
- * :func:`mmod.module.sub2`
-
- **Unittest:**
-
- See also the :download:`unittest <../../serial_interface/_testresults_/unittest.pdf>` documentation.
- """
- __DEPENDENCIES__ = ['stringtools']
-
- import stringtools
-
- import logging
- import serial
- import time
-
# Use the application's name as the parent logger name when a ``config``
# module exporting APP_NAME can be imported; otherwise fall back to 'root'.
try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    ROOT_LOGGER_NAME = 'root'
# Module logger: a child of the ROOT_LOGGER_NAME logger, named after this module.
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
-
-
class serial_interface(object):
    """
    Class to send and receive data via a serial port.

    The constructor stores its arguments and immediately tries to open the
    port. If opening fails, the instance is still created, :attr:`com` stays
    ``None`` and all I/O methods become no-ops until :meth:`open` succeeds.

    The instance can be used as a context manager; the port is closed on exit.

    .. note:: All parameters are given to the standard :py:class:`serial.Serial` library
    """
    LOG_PREFIX = 'SER:'

    def __init__(self, *args, **kwargs):
        # Keep the constructor arguments so that open() can (re)create the
        # underlying serial.Serial object at any time.
        self._args = args
        self._kwargs = kwargs
        self.com = None
        self.open()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the with-block.
        return False

    def _port_name(self):
        """
        Return the port identifier used in log messages.

        The port may have been given positionally or as ``port=...``; if it
        was not given at all, ``None`` is returned.
        """
        if self._args:
            return self._args[0]
        return self._kwargs.get('port')

    def open(self):
        """
        This opens the serial port. An already opened port is closed first.

        :returns: True if the port has been successfully opened
        :rtype: bool
        """
        if self.com is not None:
            self.close()
        try:
            self.com = serial.Serial(*self._args, **self._kwargs)
        except Exception:
            # Any failure to create the port is reported via the return value.
            # Unlike a bare except, KeyboardInterrupt and SystemExit propagate.
            self.com = None
            logger.warning('%s Unable to open comport (%s)', self.LOG_PREFIX, self._port_name())
            return False
        logger.debug('%s Opening comport (%s)', self.LOG_PREFIX, self._port_name())
        return True

    def close(self):
        """
        This closes the serial port. Nothing happens if the port is not open.
        """
        if self.com is None:
            return
        logger.debug('%s Closing comport (%s)', self.LOG_PREFIX, self._port_name())
        try:
            self.com.close()
        finally:
            # Drop the reference even if closing raised, so that the instance
            # does not keep reporting a half-closed port as open.
            self.com = None

    def bytes_in_queue(self):
        """
        This returns the number of available bytes for reception.

        :returns: Number of bytes in queue (0 if the port is not open)
        :rtype: int
        """
        if self.com is None:
            return 0
        try:
            # Preferred property of current pySerial versions.
            return self.com.in_waiting
        except AttributeError:
            # Port objects that only provide the older method name.
            return self.com.inWaiting()

    def receive(self, max_length=None):
        """
        This receives data from the serial interface.

        :param int max_length: An optional maximum number of bytes to be received. None will return all available bytes.
        :returns: The received data or None, if the port is not open or no data is available
        """
        length = self.bytes_in_queue()
        if max_length is not None:
            length = min(length, max_length)
        if self.com is None or length <= 0:
            return None
        data = self.com.read(length)
        self._handle_debug('RX <- ', data)
        return data

    def send(self, data):
        """
        This sends data via the serial interface. Nothing is sent if the port is not open.

        :param data: The data to be sent
        """
        if self.com is not None:
            self._handle_debug('TX -> ', data)
            self.com.write(data)

    def _handle_debug(self, msg, data):
        """
        Log a hex dump of transferred data on debug level.

        :param str msg: Direction marker placed in front of the dump
        :param data: The transferred data
        """
        # Skip building the dump when debug logging is disabled.
        if logger.isEnabledFor(logging.DEBUG):
            # The dump is passed as an argument and not made part of the format
            # string, so a '%' in it cannot break the log formatting.
            logger.debug('%s %s%s', self.LOG_PREFIX, msg, stringtools.hexlify(data))

    def __del__(self):
        try:
            self.close()
        except Exception:
            # Finalizers may run during interpreter shutdown when module
            # globals are already gone; there is nothing useful left to do.
            pass
|