Skip to content

Commit

Permalink
Zigpy packet capture interface (#664)
Browse files Browse the repository at this point in the history
* Zigpy packet capture interface

* Keep track of the packet capture channel

* Compute the timestamp immediately

* Bump zigpy

* Add a unit test
  • Loading branch information
puddly authored Jan 23, 2025
1 parent adb1577 commit 7d0d18f
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 1 deletion.
44 changes: 44 additions & 0 deletions bellows/zigbee/application.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
from datetime import datetime, timezone
import logging
import os
import statistics
Expand Down Expand Up @@ -93,6 +94,7 @@ def __init__(self, config: dict):
self._watchdog_feed_counter = 0

self._req_lock = asyncio.Lock()
self._packet_capture_channel: int | None = None

@property
def controller_event(self):
Expand Down Expand Up @@ -752,6 +754,48 @@ async def _network_scan(
rssi=lastHopRssi,
)

def _check_status(self, status: t.sl_Status | t.EmberStatus) -> None:
if t.sl_Status.from_ember_status(status) != t.sl_Status.OK:
raise ControllerError(f"Command failed: {status!r}")

async def _packet_capture(self, channel: int):
(status,) = await self._ezsp.mfglibStart(rxCallback=True)
self._check_status(status)

try:
await self._packet_capture_change_channel(channel=channel)
assert self._packet_capture_channel is not None

queue = asyncio.Queue()

with self._ezsp.callback_for_commands(
{"mfglibRxHandler"},
callback=lambda _, response: queue.put_nowait(
(datetime.now(timezone.utc), response)
),
):
while True:
timestamp, (linkQuality, rssi, packetContents) = await queue.get()

# The last two bytes are not a FCS
packetContents = packetContents[:-2]

yield zigpy.types.CapturedPacket(
timestamp=timestamp,
rssi=rssi,
lqi=linkQuality,
channel=self._packet_capture_channel,
data=packetContents,
)
finally:
(status,) = await self._ezsp.mfglibEnd()
self._check_status(status)

async def _packet_capture_change_channel(self, channel: int):
(status,) = await self._ezsp.mfglibSetChannel(channel=channel)
self._check_status(status)
self._packet_capture_channel = channel

async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
if not self.is_controller_running:
raise ControllerError("ApplicationController is not running")
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ dependencies = [
"click-log>=0.2.1",
"pure_pcapy3==1.0.1",
"voluptuous",
"zigpy>=0.70.0",
"zigpy>=0.75.0",
'async-timeout; python_version<"3.11"',
]

Expand Down
72 changes: 72 additions & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -2044,3 +2044,75 @@ async def test_network_scan_failure(app: ControllerApplication) -> None:
channels=t.Channels.from_channel_list([11, 15, 26]), duration_exp=4
):
pass


async def test_packet_capture(app: ControllerApplication) -> None:
app._ezsp._protocol.mfglibStart.return_value = [t.sl_Status.OK]
app._ezsp._protocol.mfglibSetChannel.return_value = [t.sl_Status.OK]
app._ezsp._protocol.mfglibEnd.return_value = [t.sl_Status.OK]

async def receive_packets() -> None:
app._ezsp._protocol._handle_callback(
"mfglibRxHandler",
list(
{
"linkQuality": 150,
"rssi": -70,
"packetContents": b"packet 1\xAB\xCD",
}.values()
),
)

await asyncio.sleep(0.5)

app._ezsp._protocol._handle_callback(
"mfglibRxHandler",
list(
{
"linkQuality": 200,
"rssi": -50,
"packetContents": b"packet 2\xAB\xCD",
}.values()
),
)

task = asyncio.create_task(receive_packets())
packets = []

async for packet in app.packet_capture(channel=15):
packets.append(packet)

if len(packets) == 1:
await app.packet_capture_change_channel(channel=20)
elif len(packets) == 2:
break

assert packets == [
zigpy_t.CapturedPacket(
timestamp=packets[0].timestamp,
rssi=-70,
lqi=150,
channel=15,
data=b"packet 1",
),
zigpy_t.CapturedPacket(
timestamp=packets[1].timestamp,
rssi=-50,
lqi=200,
channel=20, # The second packet's channel was changed
data=b"packet 2",
),
]

await task
await asyncio.sleep(0.1)

assert app._ezsp._protocol.mfglibEnd.mock_calls == [call()]


async def test_packet_capture_failure(app: ControllerApplication) -> None:
app._ezsp._protocol.mfglibStart.return_value = [t.sl_Status.FAIL]

with pytest.raises(zigpy.exceptions.ControllerException):
async for packet in app.packet_capture(channel=15):
pass

0 comments on commit 7d0d18f

Please sign in to comment.