Skip to content

Commit

Permalink
fix: make sure down adapters are still listed (#71)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Jul 11, 2023
1 parent e382e3c commit 0411edd
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 1 deletion.
12 changes: 12 additions & 0 deletions example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import asyncio

from bluetooth_adapters import get_adapters


async def go() -> None:
bluetooth_adapters = get_adapters()
await bluetooth_adapters.refresh()
print(bluetooth_adapters.adapters)


asyncio.run(go())
1 change: 0 additions & 1 deletion src/bluetooth_adapters/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

@dataclass
class AdvertisementHistory:

device: BLEDevice
advertisement_data: AdvertisementData
source: str
Expand Down
19 changes: 19 additions & 0 deletions src/bluetooth_adapters/systems/linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import contextlib
import logging
from typing import Any

import aiohttp
import async_timeout
Expand All @@ -14,6 +15,7 @@
from ..dbus import BlueZDBusObjects
from ..history import AdvertisementHistory
from ..models import AdapterDetails
from .linux_hci import get_adapters_from_hci

_LOGGER = logging.getLogger(__name__)

Expand All @@ -27,6 +29,7 @@ def __init__(self) -> None:
self._adapters: dict[str, AdapterDetails] | None = None
self._devices: dict[str, BluetoothDevice] = {}
self._mac_vendor_lookup: AsyncMacLookup | None = None
self._hci_output: dict[int, dict[str, Any]] | None = None

async def refresh(self) -> None:
"""Refresh the adapters."""
Expand Down Expand Up @@ -57,6 +60,7 @@ def _async_get_vendor(self, mac_address: str) -> str | None:

def _create_bluetooth_devices(self) -> None:
"""Create the bluetooth devices."""
self._hci_output = get_adapters_from_hci()
self._devices = {}
for adapter in self._bluez.adapter_details:
i = int(adapter[3:])
Expand All @@ -81,6 +85,21 @@ def adapters(self) -> dict[str, AdapterDetails]:
"""Get the adapter details."""
if self._adapters is None:
adapters: dict[str, AdapterDetails] = {}
if self._hci_output:
for hci_details in self._hci_output.values():
name = hci_details["name"]
mac_address = hci_details["bdaddr"].upper()
manufacturer = self._async_get_vendor(mac_address)
adapters[name] = AdapterDetails(
address=mac_address,
sw_version="Unknown",
hw_version=None,
passive_scan=False, # assume false if we don't know
manufacturer=manufacturer,
product=None,
vendor_id=None,
product_id=None,
)
adapter_details = self._bluez.adapter_details
for adapter, details in adapter_details.items():
if not (adapter1 := details.get("org.bluez.Adapter1")):
Expand Down
100 changes: 100 additions & 0 deletions src/bluetooth_adapters/systems/linux_hci.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from __future__ import annotations

import ctypes
import fcntl
import logging
import socket
from typing import Any

_LOGGER = logging.getLogger(__name__)

AF_BLUETOOTH = 31
PF_BLUETOOTH = AF_BLUETOOTH
BTPROTO_HCI = 1
HCI_MAX_DEV = 16
HCIGETDEVLIST = 0x800448D2 # _IOR('H', 210, int)
HCIGETDEVINFO = 0x800448D3 # _IOR('H', 211, int)


class hci_dev_req(ctypes.Structure):
_fields_ = [("dev_id", ctypes.c_uint16), ("dev_opt", ctypes.c_uint32)]


class hci_dev_list_req(ctypes.Structure):
_fields_ = [("dev_num", ctypes.c_uint16), ("dev_req", hci_dev_req * HCI_MAX_DEV)]


class bdaddr_t(ctypes.Structure):
_fields_ = [("b", ctypes.c_uint8 * 6)]

def __str__(self) -> str:
return ":".join(["%02X" % x for x in reversed(self.b)])


class hci_dev_stats(ctypes.Structure):
_fields_ = [
("err_rx", ctypes.c_uint32),
("err_tx", ctypes.c_uint32),
("cmd_tx", ctypes.c_uint32),
("evt_rx", ctypes.c_uint32),
("acl_tx", ctypes.c_uint32),
("acl_rx", ctypes.c_uint32),
("sco_tx", ctypes.c_uint32),
("sco_rx", ctypes.c_uint32),
("byte_rx", ctypes.c_uint32),
("byte_tx", ctypes.c_uint32),
]


class hci_dev_info(ctypes.Structure):
_fields_ = [
("dev_id", ctypes.c_uint16),
("name", ctypes.c_char * 8),
("bdaddr", bdaddr_t),
("flags", ctypes.c_uint32),
("type", ctypes.c_uint8),
("features", ctypes.c_uint8 * 8),
("pkt_type", ctypes.c_uint32),
("link_policy", ctypes.c_uint32),
("link_mode", ctypes.c_uint32),
("acl_mtu", ctypes.c_uint16),
("acl_pkts", ctypes.c_uint16),
("sco_mtu", ctypes.c_uint16),
("sco_pkts", ctypes.c_uint16),
("stat", hci_dev_stats),
]


hci_dev_info_p = ctypes.POINTER(hci_dev_info)


def get_adapters_from_hci() -> dict[int, dict[str, Any]]:
"""Get bluetooth adapters from HCI."""
out: dict[int, dict[str, Any]] = {}
sock: socket.socket | None = None
try:
sock = socket.socket(AF_BLUETOOTH, socket.SOCK_RAW, BTPROTO_HCI)
buf = hci_dev_list_req()
buf.dev_num = HCI_MAX_DEV
ret = fcntl.ioctl(sock.fileno(), HCIGETDEVLIST, buf)
if ret < 0:
raise OSError(f"HCIGETDEVLIST failed: {ret}")
for i in range(buf.dev_num):
dev_req = buf.dev_req[i]
dev = hci_dev_info()
dev.dev_id = dev_req.dev_id
ret = fcntl.ioctl(sock.fileno(), HCIGETDEVINFO, dev)
info = {str(k): getattr(dev, k) for k, v_ in dev._fields_}
info["bdaddr"] = str(info["bdaddr"])
info["name"] = info["name"].decode()
out[int(dev.dev_id)] = info
except OSError as error:
_LOGGER.debug("Error while getting HCI devices: %s", error)
return out
except Exception as error: # pylint: disable=broad-except
_LOGGER.exception("Unexpected error while getting HCI devices: %s", error)
return out
finally:
if sock:
sock.close()
return out

0 comments on commit 0411edd

Please sign in to comment.