From 8084bf24c93cf60931f8b43321bd4b802c2c21e3 Mon Sep 17 00:00:00 2001 From: Dirk Alders Date: Sun, 26 Jan 2020 16:05:31 +0100 Subject: [PATCH] Initial bluetooth_socket implementation --- __init__.py | 207 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 207 insertions(+) create mode 100644 __init__.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..0c7072b --- /dev/null +++ b/__init__.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +""" +bluetooth_socket (Bluetooth Socket) +=================================== + +**Author:** + +* Dirk Alders + +**Description:** + + This Module support bluetooth communication + +**Submodules:** + +* :mod:`mmod.module.sub1` +* :class:`mmod.module.sub2` +* :func:`mmod.module.sub2` + +**Unittest:** + + See also the :download:`unittest <../../bluetooth_socket/_testresults_/unittest.pdf>` documentation. +""" +import stringtools + +import bluetooth +import logging +import time + +BT_TIMEOUT = 0.1 + + +def nearby_devices(): + return bluetooth.discover_devices() + + +def services(bt_id): + return bluetooth.find_service(address=bt_id) + + +class bt_base(object): + def send(self, data, logger=None, log=True): + logit = logger or self.logger + if self.connected(): + if log: + logit.debug('BT: TX -> %s', ' '.join(['%02x' % ord(byte) for byte in data])) + try: + self._client_sock.send(data) + except bluetooth.btcommon.BluetoothError, e: + if e.message == 'timed out': + logit.debug('BT: On send - timeout') + return False + else: + logit.warning('BT: On send - %s', str(e)) + self._connection_established = False + return False + else: + return False + return True + + def receive(self, length=4096, logger=None, log=True): + logit = logger or self.logger + if self.connected(): + try: + data = self._client_sock.recv(length) + except bluetooth.btcommon.BluetoothError, e: + if e.message == 'timed out': + # logit.debug('BT: On receive - timeout') + return None + else: + logit.warning('BT: On receive - %s', str(e)) + self._connection_established = False + return None + else: + if log: + logit.debug('BT: RX <- %s', ' '.join(['%02x' % ord(byte) for byte in data])) + return data + + def connected(self): + return self._connection_established + + +class stp_bt_base(object): + def send(self, data, logger=None): + logit = logger or self.logger + + logit.debug('BT: TX(ETP) -> %s', ' '.join(['%02x' % ord(byte) for byte in data])) + self._bt.send(self, stringtools.stp.build_frame(data), log=False) + + def receive(self, timeout=5, logger=None): + logit = logger or self.logger + max_time = time.time() + timeout + while self.connected() and (timeout is None or time.time() < max_time): + data = self._bt.receive(self, 1, log=False) + if data is not None: + max_time += BT_TIMEOUT + msg = self._stp.process(data) + if msg is not None: + logit.debug('BT: RX(ETP) <- %s', ' '.join(['%02x' % ord(byte) for byte in msg])) + return msg + + +class bt_server(bt_base): + def __init__(self, uuid, port=2, name='BT-RFCOM-Server', logger=None): + self._connection_established = False + self._uuid = uuid + self._name = name + self._server_sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM) + self._server_sock.bind(("Python", port)) + self._server_sock.listen(1) + + bluetooth.advertise_service(self._server_sock, + name, + service_classes=[self._uuid, bluetooth.SERIAL_PORT_CLASS], + profiles=[bluetooth.SERIAL_PORT_PROFILE] + ) + + def listen(self, logger=None): + logit = logger or self.logger + logit.info('BT: %s waiting for a connection on RFCOMM "%s", port=%d, uuid=%s', '??:??:??:??:??', self._name, self._server_sock.getsockname()[1], self._uuid) + # TODO: find method to get my bt_id and use it instead of '??:??:??:??:??' + self._client_sock, client_info = self._server_sock.accept() + self._client_sock.settimeout(BT_TIMEOUT) + logit.info("BT: Accepted connection from %s, port=%d", client_info[0], client_info[1]) + self._connection_established = True + + def listen_receive_loop(self, callback=None, logger=None): + while True: + self.listen() + while self.connected(): + data = self.receive() + if callback is not None: + callback(self, data, logger) + + def close(self, logger=None): + if self._client_sock is not None: + self._client_sock.close() + self._client_sock = None + if self._server_sock is not None: + self._server_sock.close() + self._server_sock = None + + def __del__(self): + self.close() + + +class stp_bt_server(stp_bt_base, bt_server): + def __init__(self, *args, **kwargs): + bt_server.__init__(self, *args, **kwargs) + self._stp = stringtools.stp.stp() + self._bt = bt_server + + +class bt_client(bt_base): + def __init__(self, bt_id, uuid, logger=None): + self.logger = logger or logging.getLogger('%s' % (self.__class__.__name__)) + self._connection_established = False + self._bt_id = bt_id + self._uuid = uuid + self._client_sock = None + self._connected = False + + def connect(self, logger=None): + logit = logger or self.logger + service_matches = bluetooth.find_service(uuid=self._uuid, address=self._bt_id) + # TODO: find_named_service and remove service_id from classes + if len(service_matches) == 0: + logit.debug("BT: couldn't find the RFCOM service %s", self._uuid) + else: + first_match = service_matches[0] + port = first_match["port"] + name = first_match["name"] + host = first_match["host"] + + # Create the client socket + logit.debug("connecting to \"%s\" on %s", name, host) + self._client_sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM) + try: + self._client_sock.connect((host, port)) + except bluetooth.btcommon.BluetoothError, e: + if e.message == "(111, 'Connection refused')": + logit.warning('BT: Connection refused') + else: + raise e + else: + self._client_sock.settimeout(0.1) + logit.debug("connection established") + self._connection_established = True + return True + return False + + def close(self, logger=None): + if self._client_sock is not None: + self._client_sock.close() + self._client_sock = None + + def __del__(self): + self.close() + + +class stp_bt_client(stp_bt_base, bt_client): + def __init__(self, *args, **kwargs): + bt_client.__init__(self, *args, **kwargs) + self._stp = stringtools.stp.stp() + self._bt = bt_client