Skip to content

Commit

Permalink
adds param validation to adv/scan/conn intervals, adds type annotatio…
Browse files Browse the repository at this point in the history
…ns, fixes couple potential issues
  • Loading branch information
ThomasGerstenberg committed Apr 30, 2020
1 parent 3217fa0 commit df75487
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 51 deletions.
2 changes: 1 addition & 1 deletion blatann/examples/centeral_uart_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def main(serial_port):

# Set scan duration for 4 seconds
ble_device.scanner.set_default_scan_params(timeout_seconds=4)
ble_device.set_default_peripheral_connection_params(10, 30, 4000)
ble_device.set_default_peripheral_connection_params(7.5, 15, 4000)
logger.info("Scanning for peripherals advertising UUID {}".format(nordic_uart.NORDIC_UART_SERVICE_UUID))

target_address = None
Expand Down
8 changes: 5 additions & 3 deletions blatann/examples/peripheral_uart_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def on_connect(peer, event_args):
:param event_args: None
"""
if peer:
logger.info("Connected to peer")
logger.info("Connected to peer, initiating MTU exchange")
peer.exchange_mtu()
else:
logger.warning("Connection timed out")
Expand All @@ -52,6 +52,8 @@ def on_mtu_size_update(peer, event_args):
:type event_args: blatann.event_args.MtuSizeUpdatedEventArgs
"""
logger.info("MTU size updated from {} to {}".format(event_args.previous_mtu_size, event_args.current_mtu_size))
# Request that the connection parameters be re-negotiated using our preferred parameters
peer.update_connection_parameters()


def on_data_rx(service, data):
Expand Down Expand Up @@ -92,7 +94,7 @@ def main(serial_port):

# Configure the client to prefer the max MTU size
ble_device.client.preferred_mtu_size = ble_device.max_mtu_size
ble_device.client.set_connection_parameters(10, 30, 4000)
ble_device.client.set_connection_parameters(7.5, 15, 4000)

# Advertise the service UUID
adv_data = advertising.AdvertisingData(flags=0x06, local_name="Nordic UART Server")
Expand All @@ -115,4 +117,4 @@ def main(serial_port):


if __name__ == '__main__':
main("COM8")
main("COM7")
36 changes: 32 additions & 4 deletions blatann/gap/advertising.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

AdvertisingMode = nrf_types.BLEGapAdvType

MIN_ADVERTISING_INTERVAL_MS = nrf_types.adv_interval_range.min
MAX_ADVERTISING_INTERVAL_MS = nrf_types.adv_interval_range.max


class Advertiser(object):
# Constant used to indicate that the BLE device should advertise indefinitely, until
Expand Down Expand Up @@ -48,8 +51,28 @@ def on_advertising_timeout(self):

@property
def is_advertising(self):
"""
Current state of advertising
:return:
"""
return self._is_advertising

@property
def min_interval_ms(self) -> float:
"""
The minimum allowed advertising interval, in millseconds.
This is defined by the Bluetooth specification.
"""
return MIN_ADVERTISING_INTERVAL_MS

@property
def max_interval_ms(self) -> float:
"""
The maximum allowed advertising interval, in milliseconds.
This is defined by the Bluetooth specification.
"""
return MAX_ADVERTISING_INTERVAL_MS

def set_advertise_data(self, advertise_data=AdvertisingData(), scan_response=AdvertisingData()):
"""
Sets the advertising and scan response data which will be broadcasted to peers during advertising
Expand Down Expand Up @@ -79,11 +102,13 @@ def set_default_advertise_params(self, advertise_interval_ms, timeout_seconds, a
"""
Sets the default advertising parameters so they do not need to be specified on each start
:param advertise_interval_ms: The advertising interval, in milliseconds
:param timeout_seconds: How long to advertise for before timing out, in seconds
:param advertise_interval_ms: The advertising interval, in milliseconds.
Should be a multiple of 0.625ms, otherwise it'll be rounded down to the nearest 0.625ms
:param timeout_seconds: How long to advertise for before timing out, in seconds. For no timeout, use ADVERTISE_FOREVER (0)
:param advertise_mode: The mode the advertiser should use
:type advertise_mode: AdvertisingMode
"""
nrf_types.adv_interval_range.validate(advertise_interval_ms)
self._advertise_interval = advertise_interval_ms
self._timeout = timeout_seconds
self._advertise_mode = advertise_mode
Expand All @@ -92,8 +117,9 @@ def start(self, adv_interval_ms=None, timeout_sec=None, auto_restart=None, adver
"""
Starts advertising with the given parameters. If none given, will use the default
:param adv_interval_ms: The interval at which to send out advertise packets, in milliseconds
:param timeout_sec: The duration which to advertise for
:param adv_interval_ms: The interval at which to send out advertise packets, in milliseconds.
Should be a multiple of 0.625ms, otherwise it'll be rounde down to the nearest 0.625ms
:param timeout_sec: The duration which to advertise for. For no timeout, use ADVERTISE_FOREVER (0)
:param auto_restart: Flag indicating that advertising should restart automatically when the timeout expires, or
when the client disconnects
:param advertise_mode: The mode the advertiser should use
Expand All @@ -105,6 +131,8 @@ def start(self, adv_interval_ms=None, timeout_sec=None, auto_restart=None, adver
self._stop()
if adv_interval_ms is None:
adv_interval_ms = self._advertise_interval
else:
nrf_types.adv_interval_range.validate(adv_interval_ms)
if timeout_sec is None:
timeout_sec = self._timeout
if advertise_mode is None:
Expand Down
48 changes: 39 additions & 9 deletions blatann/gap/scanning.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,34 @@
logger = logging.getLogger(__name__)


MIN_SCAN_INTERVAL_MS = nrf_types.scan_interval_range.min
MAX_SCAN_INTERVAL_MS = nrf_types.scan_interval_range.max
MIN_SCAN_WINDOW_MS = nrf_types.scan_window_range.min
MAX_SCAN_WINDOW_MS = nrf_types.scan_window_range.max
MIN_SCAN_TIMEOUT_S = nrf_types.scan_timeout_range.min
MAX_SCAN_TIMEOUT_S = nrf_types.scan_timeout_range.max


class ScanParameters(nrf_types.BLEGapScanParams):
pass
def validate(self):
self._validate(self.window_ms, self.interval_ms, self.timeout_s)

def update(self, window_ms, interval_ms, timeout_s, active):
self._validate(window_ms, interval_ms, timeout_s)
self.window_ms = window_ms
self.interval_ms = interval_ms
self.timeout_s = timeout_s,
self.active = active

def _validate(self, window_ms, interval_ms, timeout_s):
# Check against absolute limits
nrf_types.scan_window_range.validate(window_ms)
nrf_types.scan_interval_range.validate(interval_ms)
if timeout_s:
nrf_types.scan_timeout_range.validate(timeout_s)
# Verify that the window is not larger than the interval
if window_ms > interval_ms:
raise ValueError(f"Window cannot be greater than the interval (window: {window_ms}, interval: {interval_ms}")


class Scanner(object):
Expand All @@ -19,7 +45,7 @@ def __init__(self, ble_device):
"""
self.ble_device = ble_device
self._default_scan_params = ScanParameters(200, 150, 10)
self.scanning = False
self._is_scanning = False
ble_device.ble_driver.event_subscribe(self._on_adv_report, nrf_events.GapEvtAdvReport)
ble_device.ble_driver.event_subscribe(self._on_timeout_event, nrf_events.GapEvtTimeout)
self.scan_report = ScanReportCollection()
Expand All @@ -42,6 +68,10 @@ def on_scan_timeout(self) -> Event[Scanner, ScanReportCollection]:
"""
return self._on_scan_timeout

@property
def is_scanning(self) -> bool:
return self._is_scanning

def set_default_scan_params(self, interval_ms=200, window_ms=150, timeout_seconds=10, active_scanning=True):
"""
Sets the default scan parameters so they do not have to be specified each time a scan is started.
Expand All @@ -53,10 +83,7 @@ def set_default_scan_params(self, interval_ms=200, window_ms=150, timeout_second
:param timeout_seconds: How long to advertise for, in seconds
:param active_scanning: Whether or not to fetch scan response packets from advertisers
"""
self._default_scan_params.interval_ms = interval_ms
self._default_scan_params.window_ms = window_ms
self._default_scan_params.timeout_s = timeout_seconds
self._default_scan_params.active = active_scanning
self._default_scan_params.update(window_ms, interval_ms, timeout_seconds, active_scanning)

def start_scan(self, scan_parameters=None, clear_scan_reports=True):
"""
Expand All @@ -74,15 +101,18 @@ def start_scan(self, scan_parameters=None, clear_scan_reports=True):
self.scan_report = ScanReportCollection()
if not scan_parameters:
scan_parameters = self._default_scan_params
else:
# Make sure the scan parameters are valid
scan_parameters.validate()
self.ble_device.ble_driver.ble_gap_scan_start(scan_parameters)
self.scanning = True
self._is_scanning = True
return scan_waitable.ScanFinishedWaitable(self.ble_device)

def stop(self):
"""
Stops an active scan
"""
self.scanning = False
self._is_scanning = False

try:
self.ble_device.ble_driver.ble_gap_scan_stop()
Expand All @@ -98,5 +128,5 @@ def _on_timeout_event(self, driver, event):
:type event: nrf_events.GapEvtTimeout
"""
if event.src == nrf_events.BLEGapTimeoutSrc.scan:
self.scanning = False
self._is_scanning = False
self._on_scan_timeout.notify(self.ble_device, self.scan_report)
2 changes: 1 addition & 1 deletion blatann/gatt/service_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def __init__(self, ble_device, peer):
@property
def on_discovery_complete(self):
"""
:rtype: Event
:rtype: Event[blatann.peer.Peripheral, DatabaseDiscoveryCompleteEventArgs]
"""
return self._on_discovery_complete

Expand Down
48 changes: 48 additions & 0 deletions blatann/nrf/nrf_types/gap.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,54 @@
logger = logging.getLogger(__name__)


class TimeRange(object):

def __init__(self, name, val_min, val_max, unit_ms_conversion, divisor=1.0, units="ms"):
self._name = name
self._units = units
self._min = util.units_to_msec(val_min, unit_ms_conversion) / divisor
self._max = util.units_to_msec(val_max, unit_ms_conversion) / divisor

@property
def name(self) -> str:
return self._name

@property
def min(self) -> float:
return self._min

@property
def max(self) -> float:
return self._max

@property
def units(self) -> str:
return self._units

def is_in_range(self, value):
return self._min <= value <= self._max

def validate(self, value):
if value < self._min:
raise ValueError(f"Minimum {self.name} is {self._min}{self.units} (Got {value})")
if value > self._max:
raise ValueError(f"Maximum {self.name} is {self._max}{self.units} (Got {value})")


adv_interval_range = TimeRange("Advertising Interval",
driver.BLE_GAP_ADV_INTERVAL_MIN, driver.BLE_GAP_ADV_INTERVAL_MAX, util.UNIT_0_625_MS)
scan_window_range = TimeRange("Scan Window",
driver.BLE_GAP_SCAN_WINDOW_MIN, driver.BLE_GAP_SCAN_WINDOW_MAX, util.UNIT_0_625_MS)
scan_interval_range = TimeRange("Scan Interval",
driver.BLE_GAP_SCAN_INTERVAL_MIN, driver.BLE_GAP_SCAN_INTERVAL_MAX, util.UNIT_0_625_MS)
scan_timeout_range = TimeRange("Scan Timeout",
driver.BLE_GAP_SCAN_TIMEOUT_MIN, driver.BLE_GAP_SCAN_TIMEOUT_MAX, util.UNIT_10_MS, 1000.0, "s")
conn_interval_range = TimeRange("Connection Interval",
driver.BLE_GAP_CP_MIN_CONN_INTVL_MIN, driver.BLE_GAP_CP_MAX_CONN_INTVL_MAX, util.UNIT_1_25_MS)
conn_timeout_range = TimeRange("Connection Timeout",
driver.BLE_GAP_CP_CONN_SUP_TIMEOUT_MIN, driver.BLE_GAP_CP_CONN_SUP_TIMEOUT_MAX, util.UNIT_10_MS)


class BLEGapAdvParams(object):
def __init__(self, interval_ms, timeout_s, advertising_type=BLEGapAdvType.connectable_undirected):
self.interval_ms = interval_ms
Expand Down
Loading

0 comments on commit df75487

Please sign in to comment.