123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
-
-
-
- """
- bluetooth_socket (Bluetooth Socket)
- ===================================
-
- **Author:**
-
- * Dirk Alders <sudo-dirk@mount-mockery.de>
-
- **Description:**
-
- This Module support bluetooth communication
-
- **Submodules:**
-
- * :mod:`mmod.module.sub1`
- * :class:`mmod.module.sub2`
- * :func:`mmod.module.sub2`
-
- **Unittest:**
-
- See also the :download:`unittest <../../bluetooth_socket/_testresults_/unittest.pdf>` documentation.
- """
- import stringtools
-
- import bluetooth
- import logging
- import time
-
- BT_TIMEOUT = 0.1
-
-
- def nearby_devices():
- return bluetooth.discover_devices()
-
-
- def services(bt_id):
- return bluetooth.find_service(address=bt_id)
-
-
- class bt_base(object):
- def send(self, data, logger=None, log=True):
- logit = logger or self.logger
- if self.connected():
- if log:
- logit.debug('BT: TX -> %s', ' '.join(['%02x' % ord(byte) for byte in data]))
- try:
- self._client_sock.send(data)
- except bluetooth.btcommon.BluetoothError, e:
- if e.message == 'timed out':
- logit.debug('BT: On send - timeout')
- return False
- else:
- logit.warning('BT: On send - %s', str(e))
- self._connection_established = False
- return False
- else:
- return False
- return True
-
- def receive(self, length=4096, logger=None, log=True):
- logit = logger or self.logger
- if self.connected():
- try:
- data = self._client_sock.recv(length)
- except bluetooth.btcommon.BluetoothError, e:
- if e.message == 'timed out':
-
- return None
- else:
- logit.warning('BT: On receive - %s', str(e))
- self._connection_established = False
- return None
- else:
- if log:
- logit.debug('BT: RX <- %s', ' '.join(['%02x' % ord(byte) for byte in data]))
- return data
-
- def connected(self):
- return self._connection_established
-
-
- class stp_bt_base(object):
- def send(self, data, logger=None):
- logit = logger or self.logger
-
- logit.debug('BT: TX(ETP) -> %s', ' '.join(['%02x' % ord(byte) for byte in data]))
- self._bt.send(self, stringtools.stp.build_frame(data), log=False)
-
- def receive(self, timeout=5, logger=None):
- logit = logger or self.logger
- max_time = time.time() + timeout
- while self.connected() and (timeout is None or time.time() < max_time):
- data = self._bt.receive(self, 1, log=False)
- if data is not None:
- max_time += BT_TIMEOUT
- msg = self._stp.process(data)
- if msg is not None:
- logit.debug('BT: RX(ETP) <- %s', ' '.join(['%02x' % ord(byte) for byte in msg]))
- return msg
-
-
- class bt_server(bt_base):
- def __init__(self, uuid, port=2, name='BT-RFCOM-Server', logger=None):
- self._connection_established = False
- self._uuid = uuid
- self._name = name
- self._server_sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
- self._server_sock.bind(("Python", port))
- self._server_sock.listen(1)
-
- bluetooth.advertise_service(self._server_sock,
- name,
- service_classes=[self._uuid, bluetooth.SERIAL_PORT_CLASS],
- profiles=[bluetooth.SERIAL_PORT_PROFILE]
- )
-
- def listen(self, logger=None):
- logit = logger or self.logger
- logit.info('BT: %s waiting for a connection on RFCOMM "%s", port=%d, uuid=%s', '??:??:??:??:??', self._name, self._server_sock.getsockname()[1], self._uuid)
-
- self._client_sock, client_info = self._server_sock.accept()
- self._client_sock.settimeout(BT_TIMEOUT)
- logit.info("BT: Accepted connection from %s, port=%d", client_info[0], client_info[1])
- self._connection_established = True
-
- def listen_receive_loop(self, callback=None, logger=None):
- while True:
- self.listen()
- while self.connected():
- data = self.receive()
- if callback is not None:
- callback(self, data, logger)
-
- def close(self, logger=None):
- if self._client_sock is not None:
- self._client_sock.close()
- self._client_sock = None
- if self._server_sock is not None:
- self._server_sock.close()
- self._server_sock = None
-
- def __del__(self):
- self.close()
-
-
- class stp_bt_server(stp_bt_base, bt_server):
- def __init__(self, *args, **kwargs):
- bt_server.__init__(self, *args, **kwargs)
- self._stp = stringtools.stp.stp()
- self._bt = bt_server
-
-
- class bt_client(bt_base):
- def __init__(self, bt_id, uuid, logger=None):
- self.logger = logger or logging.getLogger('%s' % (self.__class__.__name__))
- self._connection_established = False
- self._bt_id = bt_id
- self._uuid = uuid
- self._client_sock = None
- self._connected = False
-
- def connect(self, logger=None):
- logit = logger or self.logger
- service_matches = bluetooth.find_service(uuid=self._uuid, address=self._bt_id)
-
- if len(service_matches) == 0:
- logit.debug("BT: couldn't find the RFCOM service %s", self._uuid)
- else:
- first_match = service_matches[0]
- port = first_match["port"]
- name = first_match["name"]
- host = first_match["host"]
-
-
- logit.debug("connecting to \"%s\" on %s", name, host)
- self._client_sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
- try:
- self._client_sock.connect((host, port))
- except bluetooth.btcommon.BluetoothError, e:
- if e.message == "(111, 'Connection refused')":
- logit.warning('BT: Connection refused')
- else:
- raise e
- else:
- self._client_sock.settimeout(0.1)
- logit.debug("connection established")
- self._connection_established = True
- return True
- return False
-
- def close(self, logger=None):
- if self._client_sock is not None:
- self._client_sock.close()
- self._client_sock = None
-
- def __del__(self):
- self.close()
-
-
- class stp_bt_client(stp_bt_base, bt_client):
- def __init__(self, *args, **kwargs):
- bt_client.__init__(self, *args, **kwargs)
- self._stp = stringtools.stp.stp()
- self._bt = bt_client
|