Initial bluetooth_socket implementation

This commit is contained in:
Dirk Alders 2020-01-26 16:05:31 +01:00
parent 076729d809
commit 8084bf24c9

207
__init__.py Normal file
View File

@ -0,0 +1,207 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
bluetooth_socket (Bluetooth Socket)
===================================
**Author:**
* Dirk Alders <sudo-dirk@mount-mockery.de>
**Description:**
This Module support bluetooth communication
**Submodules:**
* :mod:`mmod.module.sub1`
* :class:`mmod.module.sub2`
* :func:`mmod.module.sub2`
**Unittest:**
See also the :download:`unittest <../../bluetooth_socket/_testresults_/unittest.pdf>` documentation.
"""
import stringtools
import bluetooth
import logging
import time
BT_TIMEOUT = 0.1
def nearby_devices():
return bluetooth.discover_devices()
def services(bt_id):
return bluetooth.find_service(address=bt_id)
class bt_base(object):
def send(self, data, logger=None, log=True):
logit = logger or self.logger
if self.connected():
if log:
logit.debug('BT: TX -> %s', ' '.join(['%02x' % ord(byte) for byte in data]))
try:
self._client_sock.send(data)
except bluetooth.btcommon.BluetoothError, e:
if e.message == 'timed out':
logit.debug('BT: On send - timeout')
return False
else:
logit.warning('BT: On send - %s', str(e))
self._connection_established = False
return False
else:
return False
return True
def receive(self, length=4096, logger=None, log=True):
logit = logger or self.logger
if self.connected():
try:
data = self._client_sock.recv(length)
except bluetooth.btcommon.BluetoothError, e:
if e.message == 'timed out':
# logit.debug('BT: On receive - timeout')
return None
else:
logit.warning('BT: On receive - %s', str(e))
self._connection_established = False
return None
else:
if log:
logit.debug('BT: RX <- %s', ' '.join(['%02x' % ord(byte) for byte in data]))
return data
def connected(self):
return self._connection_established
class stp_bt_base(object):
def send(self, data, logger=None):
logit = logger or self.logger
logit.debug('BT: TX(ETP) -> %s', ' '.join(['%02x' % ord(byte) for byte in data]))
self._bt.send(self, stringtools.stp.build_frame(data), log=False)
def receive(self, timeout=5, logger=None):
logit = logger or self.logger
max_time = time.time() + timeout
while self.connected() and (timeout is None or time.time() < max_time):
data = self._bt.receive(self, 1, log=False)
if data is not None:
max_time += BT_TIMEOUT
msg = self._stp.process(data)
if msg is not None:
logit.debug('BT: RX(ETP) <- %s', ' '.join(['%02x' % ord(byte) for byte in msg]))
return msg
class bt_server(bt_base):
def __init__(self, uuid, port=2, name='BT-RFCOM-Server', logger=None):
self._connection_established = False
self._uuid = uuid
self._name = name
self._server_sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
self._server_sock.bind(("Python", port))
self._server_sock.listen(1)
bluetooth.advertise_service(self._server_sock,
name,
service_classes=[self._uuid, bluetooth.SERIAL_PORT_CLASS],
profiles=[bluetooth.SERIAL_PORT_PROFILE]
)
def listen(self, logger=None):
logit = logger or self.logger
logit.info('BT: %s waiting for a connection on RFCOMM "%s", port=%d, uuid=%s', '??:??:??:??:??', self._name, self._server_sock.getsockname()[1], self._uuid)
# TODO: find method to get my bt_id and use it instead of '??:??:??:??:??'
self._client_sock, client_info = self._server_sock.accept()
self._client_sock.settimeout(BT_TIMEOUT)
logit.info("BT: Accepted connection from %s, port=%d", client_info[0], client_info[1])
self._connection_established = True
def listen_receive_loop(self, callback=None, logger=None):
while True:
self.listen()
while self.connected():
data = self.receive()
if callback is not None:
callback(self, data, logger)
def close(self, logger=None):
if self._client_sock is not None:
self._client_sock.close()
self._client_sock = None
if self._server_sock is not None:
self._server_sock.close()
self._server_sock = None
def __del__(self):
self.close()
class stp_bt_server(stp_bt_base, bt_server):
def __init__(self, *args, **kwargs):
bt_server.__init__(self, *args, **kwargs)
self._stp = stringtools.stp.stp()
self._bt = bt_server
class bt_client(bt_base):
def __init__(self, bt_id, uuid, logger=None):
self.logger = logger or logging.getLogger('%s' % (self.__class__.__name__))
self._connection_established = False
self._bt_id = bt_id
self._uuid = uuid
self._client_sock = None
self._connected = False
def connect(self, logger=None):
logit = logger or self.logger
service_matches = bluetooth.find_service(uuid=self._uuid, address=self._bt_id)
# TODO: find_named_service and remove service_id from classes
if len(service_matches) == 0:
logit.debug("BT: couldn't find the RFCOM service %s", self._uuid)
else:
first_match = service_matches[0]
port = first_match["port"]
name = first_match["name"]
host = first_match["host"]
# Create the client socket
logit.debug("connecting to \"%s\" on %s", name, host)
self._client_sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
try:
self._client_sock.connect((host, port))
except bluetooth.btcommon.BluetoothError, e:
if e.message == "(111, 'Connection refused')":
logit.warning('BT: Connection refused')
else:
raise e
else:
self._client_sock.settimeout(0.1)
logit.debug("connection established")
self._connection_established = True
return True
return False
def close(self, logger=None):
if self._client_sock is not None:
self._client_sock.close()
self._client_sock = None
def __del__(self):
self.close()
class stp_bt_client(stp_bt_base, bt_client):
def __init__(self, *args, **kwargs):
bt_client.__init__(self, *args, **kwargs)
self._stp = stringtools.stp.stp()
self._bt = bt_client