123 рядки
3.2 KiB
Python
123 рядки
3.2 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
"""
|
|
serial_interface (Serial Interface Module)
|
|
==========================================
|
|
|
|
**Author:**
|
|
|
|
* Dirk Alders <sudo-dirk@mount-mockery.de>
|
|
|
|
**Description:**
|
|
|
|
This module provides a common interface to a serial port.
|
|
|
|
**Submodules:**
|
|
|
|
* :mod:`mmod.module.sub1`
|
|
* :class:`mmod.module.sub2`
|
|
* :func:`mmod.module.sub2`
|
|
|
|
**Unittest:**
|
|
|
|
See also the :download:`unittest <../../serial_interface/_testresults_/unittest.pdf>` documentation.
|
|
"""
|
|
# Names of the non-standard modules this module imports (see the ``import stringtools`` below).
# NOTE(review): presumably consumed by external project tooling; it is not read anywhere in this file.
__DEPENDENCIES__ = ['stringtools']
|
|
|
|
import stringtools
|
|
|
|
import logging
|
|
import serial
|
|
import time
|
|
|
|
# Name of the logger used by this module; pass it to logging.getLogger() to configure the output.
logger_name = 'SERIAL_INTERFACE'
|
|
# Module-level logger obtained from the standard logging registry under the name given by logger_name.
logger = logging.getLogger(logger_name)
|
|
|
|
|
|
class serial_interface(object):
    """
    Class to send and receive data via a serial port.

    The port is opened while the object is created. If opening fails, the object is
    still created, :attr:`com` stays ``None`` and all I/O methods do nothing until a
    later call of :meth:`open` succeeds.

    The object can be used as a context manager; the port is closed when the
    ``with`` block is left.

    .. note:: All parameters are given to the standard :py:class:`serial.Serial` library
    """
    LOG_PREFIX = 'SER:'

    def __init__(self, *args, **kwargs):
        """
        Store the connection parameters and try to open the port.

        :param args: Positional arguments forwarded to :py:class:`serial.Serial`
        :param kwargs: Keyword arguments forwarded to :py:class:`serial.Serial`
        """
        self._args = args
        self._kwargs = kwargs
        # None marks "port not open"; every I/O method checks for it.
        self.com = None
        self.open()

    def __enter__(self):
        """
        Enter the runtime context.

        :returns: The instance itself (the port has already been handled by the constructor)
        :rtype: serial_interface
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Leave the runtime context and close the port. Exceptions are not suppressed.
        """
        self.close()
        return False

    def _port_name(self):
        """
        This returns the port identifier used in log messages.

        The port may have been passed as first positional argument or as keyword
        ``port``; indexing ``self._args[0]`` unconditionally would fail for the latter.

        :returns: The port identifier or None, if no port has been given
        """
        if self._args:
            return self._args[0]
        return self._kwargs.get('port')

    def open(self):
        """
        This opens the serial port. An already opened port is closed first.

        :returns: True if the port has been successfully opened
        :rtype: bool
        """
        if self.com is not None:
            self.close()
        try:
            self.com = serial.Serial(*self._args, **self._kwargs)
        except Exception:
            # Opening is best effort: any failure is reported via the return value.
            # Only Exception is caught, so KeyboardInterrupt / SystemExit still propagate.
            self.com = None
            logger.warning('%s Unable to open comport (%s)', self.LOG_PREFIX, self._port_name())
            return False
        logger.debug('%s Opening comport (%s)', self.LOG_PREFIX, self._port_name())
        return True

    def close(self):
        """
        This closes the serial port. Nothing happens, if the port is not open.
        """
        if self.com is None:
            return
        logger.debug('%s Closing comport (%s)', self.LOG_PREFIX, self._port_name())
        try:
            self.com.close()
        finally:
            # Drop the handle even if closing failed, so the object is never left
            # holding a half-closed port.
            self.com = None

    def bytes_in_queue(self):
        """
        This returns the number of bytes available for reception.

        :returns: Number of bytes in queue (0, if the port is not open)
        :rtype: int
        """
        if self.com is None:
            return 0
        return self.com.inWaiting()

    def receive(self, max_length=None):
        """
        This receives data from the serial interface.

        Only bytes which are already in the receive queue are read.

        :param int max_length: An optional maximum number of bytes to be received. None will return all available bytes.
        :returns: The received data or None, if the port is not open or no data is available
        """
        length = self.bytes_in_queue()
        if max_length is not None:
            length = min(length, max_length)
        if self.com is None or length <= 0:
            return None
        data = self.com.read(length)
        self._handle_debug('RX <- ', data)
        return data

    def send(self, data):
        """
        This sends data via the serial interface. Nothing is sent, if the port is not open.

        :param data: The data to be sent
        """
        if self.com is not None:
            self._handle_debug('TX -> ', data)
            self.com.write(data)

    def _handle_debug(self, msg, data):
        """
        This logs transferred data as a hex dump on debug level.

        :param str msg: The direction marker which is put in front of the hex dump
        :param data: The transferred data
        """
        # The dump is only created if it will be emitted. The data is passed as logging
        # argument and never becomes part of the format string.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug('%s %s%s', self.LOG_PREFIX, msg, stringtools.hexlify(data))

    def __del__(self):
        """
        This closes the port when the object is destroyed.
        """
        try:
            self.close()
        except Exception:
            # A finalizer must not raise: the object may be only partially initialised
            # or the interpreter may already be tearing down module globals.
            pass
|