Skip to content

Commit

Permalink
Added support for Google´s Bumble Bluetooth Controller stack
Browse files Browse the repository at this point in the history
The backend supports direct use with Bumble. The HCI Controller
is managed by the Bumble stack and the transport layer can
be defined by the user (e.g. VHCI, Serial, TCP, android-netsim).
  • Loading branch information
vChavezB committed Dec 29, 2024
1 parent c98883b commit d094cf2
Show file tree
Hide file tree
Showing 20 changed files with 1,745 additions and 445 deletions.
1 change: 1 addition & 0 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Contributors
* David Johansen <[email protected]>
* JP Hutchins <[email protected]>
* Bram Duvigneau <[email protected]>
* Victor Chavez <[email protected]>

Sponsors
--------
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to `Semantic Versioning <https://semver.org/spec/v2.0.0

`Unreleased`_
=============
Added
-----
* Added support for Google's Bumble Bluetooth stack.


`0.22.3`_ (2024-10-05)
======================
Expand Down
133 changes: 133 additions & 0 deletions bleak/backends/bumble/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2024 Victor Chavez
"""Bumble backend."""
import os
from enum import Enum
from typing import Dict, Final, Optional

from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.transport import Transport, open_transport

transports: Dict[str, Transport] = {}
_link: Final = LocalLink()
_scheme_delimiter: Final = ":"

_env_transport_cfg: Final = os.getenv("BLEAK_BUMBLE")
_env_host_mode: Final = os.getenv("BLEAK_BUMBLE_HOST")


class TransportScheme(Enum):
"""The transport schemes supported by bumble.
https://google.github.io/bumble/transports
"""

SERIAL = "serial"
""": The serial transport implements sending/receiving HCI
packets over a UART (a.k.a serial port).
"""
UDP = "udp"
""": The UDP transport is a UDP socket, receiving packets on a specified port number,
and sending packets to a specified host and port number.
"""
TCP_CLIENT = "tcp-client"
""": The TCP Client transport uses an outgoing TCP connection.
"""
TCP_SERVER = "tcp-server"
""": The TCP Server transport uses an incoming TCP connection.
"""
WS_CLIENT = "ws-client"
""": The WebSocket Client transport is WebSocket connection
to a WebSocket server over which HCI packets are sent and received.
"""
WS_SERVER = "ws-server"
""": The WebSocket Server transport is WebSocket server that accepts
connections from a WebSocket client. HCI packets are sent and received over the connection.
"""
PTY = "pty"
""": The PTY transport uses a Unix pseudo-terminal device to communicate
with another process on the host, as if it were over a serial port.
"""
FILE = "file"
""": The File transport allows opening any named entry on a filesystem
and use it for HCI transport I/O. This is typically used to open a PTY,
or unix driver, not for real files.
"""
VHCI = "vhci"
""": The VHCI transport allows attaching a virtual controller
to the Bluetooth stack on operating systems that offer a
VHCI driver (Linux, if enabled, maybe others).
"""
HCI_SOCKET = "hci-socket"
""": An HCI Socket can send/receive HCI packets to/from a
Bluetooth HCI controller managed by the host OS.
This is only supported on some platforms (currently only tested on Linux).
"""
USB = "usb"
""": The USB transport interfaces with a local Bluetooth USB dongle.
"""
ANDROID_NETSIM = "android-netsim"
""": The Android "netsim" transport either connects, as a host, to a
Netsim virtual controller ("host" mode), or acts as a virtual
controller itself ("controller" mode) accepting host connections.
"""

@classmethod
def from_string(cls, value: str) -> "TransportScheme":
try:
return cls(value)
except ValueError:
raise ValueError(f"'{value}' is not a valid TransportScheme")


class BumbleTransportCfg:
"""Transport configuration for bumble.
Args:
scheme (TransportScheme): The transport scheme supported by bumble.
args (Optional[str]): The arguments used to initialize the transport.
See https://google.github.io/bumble/transports/index.html
"""

def __init__(self, scheme: TransportScheme, args: Optional[str] = None):
self.scheme: Final = scheme
self.args: Final = args

def __str__(self):
return f"{self.scheme.value}:{self.args}" if self.args else self.scheme.value


def get_default_transport_cfg() -> BumbleTransportCfg:
if _env_transport_cfg:
scheme_val, *args = _env_transport_cfg.split(_scheme_delimiter, 1)
return BumbleTransportCfg(
TransportScheme.from_string(scheme_val), args[0] if args else None
)

return BumbleTransportCfg(TransportScheme.TCP_SERVER, "127.0.0.1:1234")


def get_default_host_mode() -> bool:
return True if _env_host_mode else False


async def start_transport(
cfg: BumbleTransportCfg, host_mode: bool = get_default_host_mode()
) -> Transport:
transport_cmd = str(cfg)
if transport_cmd not in transports.keys():
transports[transport_cmd] = await open_transport(transport_cmd)
if not host_mode:
Controller(
"ext",
host_source=transports[transport_cmd].source,
host_sink=transports[transport_cmd].sink,
link=_link,
)
return transports[transport_cmd]


def get_link():
# Assume all transports are linked
return _link
82 changes: 82 additions & 0 deletions bleak/backends/bumble/characteristic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2024 Victor Chavez

from typing import Callable, Final, List, Union
from uuid import UUID

from bumble.gatt import Characteristic
from bumble.gatt_client import CharacteristicProxy, ServiceProxy

from bleak import normalize_uuid_str
from bleak.backends.bumble.utils import bumble_uuid_to_str
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.descriptor import BleakGATTDescriptor


class BleakGATTCharacteristicBumble(BleakGATTCharacteristic):
"""GATT Characteristic implementation for the Bumble backend."""

def __init__(
self,
obj: CharacteristicProxy,
max_write_without_response_size: Callable[[], int],
svc: ServiceProxy,
):
super().__init__(obj, max_write_without_response_size)
self.__descriptors: List[BleakGATTDescriptor] = []
props = [flag for flag in Characteristic.Properties if flag in obj.properties]
self.__props: Final = [str(prop) for prop in props]
self.__svc: Final = svc
uuid = bumble_uuid_to_str(obj.uuid)
self.__uuid: Final = normalize_uuid_str(uuid)

@property
def service_uuid(self) -> str:
"""The uuid of the Service containing this characteristic"""
return bumble_uuid_to_str(self.__svc.uuid)

@property
def service_handle(self) -> int:
"""The integer handle of the Service containing this characteristic"""
return self.__svc.handle

@property
def handle(self) -> int:
"""The handle of this characteristic"""
return int(self.obj.handle)

@property
def uuid(self) -> str:
"""The uuid of this characteristic"""
return self.__uuid

@property
def properties(self) -> List[str]:
"""Properties of this characteristic"""
return self.__props

@property
def descriptors(self) -> List[BleakGATTDescriptor]:
"""List of descriptors for this characteristic"""
return self.__descriptors

def get_descriptor(
self, specifier: Union[int, str, UUID]
) -> Union[BleakGATTDescriptor, None]:
"""Get a descriptor by handle (int) or UUID (str or uuid.UUID)"""
try:
if isinstance(specifier, int):
return next(filter(lambda x: x.handle == specifier, self.descriptors))
else:
return next(
filter(lambda x: x.uuid == str(specifier), self.descriptors)
)
except StopIteration:
return None

def add_descriptor(self, descriptor: BleakGATTDescriptor):
"""Add a :py:class:`~BleakGATTDescriptor` to the characteristic.
Should not be used by end user, but rather by `bleak` itself.
"""
self.__descriptors.append(descriptor)
Loading

0 comments on commit d094cf2

Please sign in to comment.