From adb15779296e7901fbef2f697b85160b70345bc0 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 22 Jan 2025 16:05:13 -0500 Subject: [PATCH 1/2] Implement network scanning using the standard zigpy interface (#648) * Implement existing scan commands on top of `callback_for_commands` * Implement `network_scan` * `duration` -> `duration_exp` * Use underscored method * Add a unit test --- bellows/ezsp/__init__.py | 48 +++++++++++----- bellows/zigbee/application.py | 41 ++++++++++++++ tests/test_application.py | 101 ++++++++++++++++++++++++++++++++++ 3 files changed, 176 insertions(+), 14 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index c3649462..2f367812 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -223,28 +223,44 @@ async def _list_command( self, name, item_frames, completion_frame, spos, *args, **kwargs ): """Run a command, returning result callbacks as a list""" - fut = asyncio.Future() + queue = asyncio.Queue() results = [] - def cb(frame_name, response): - if frame_name in item_frames: + with self.callback_for_commands( + commands=set(item_frames) | {completion_frame}, + callback=lambda command, response: queue.put_nowait((command, response)), + ): + v = await self._command(name, *args, **kwargs) + if t.sl_Status.from_ember_status(v[0]) != t.sl_Status.OK: + raise Exception(v) + + while True: + command, response = await queue.get() + if command == completion_frame: + if t.sl_Status.from_ember_status(response[spos]) != t.sl_Status.OK: + raise Exception(response) + + break + results.append(response) - elif frame_name == completion_frame: - fut.set_result(response) + + return results + + @contextlib.contextmanager + def callback_for_commands( + self, commands: set[str], callback: Callable + ) -> Generator[None]: + def cb(frame_name, response): + if frame_name in commands: + callback(frame_name, response) cbid = self.add_callback(cb) + try: - v = await self._command(name, *args, **kwargs) - if t.sl_Status.from_ember_status(v[0]) != t.sl_Status.OK: - raise Exception(v) - v = await fut - if t.sl_Status.from_ember_status(v[spos]) != t.sl_Status.OK: - raise Exception(v) + yield finally: self.remove_callback(cbid) - return results - startScan = functools.partialmethod( _list_command, "startScan", @@ -253,7 +269,11 @@ def cb(frame_name, response): 1, ) pollForData = functools.partialmethod( - _list_command, "pollForData", ["pollHandler"], "pollCompleteHandler", 0 + _list_command, + "pollForData", + ["pollHandler"], + "pollCompleteHandler", + 0, ) zllStartScan = functools.partialmethod( _list_command, diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index 13b6386c..068034a6 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -5,6 +5,7 @@ import os import statistics import sys +from typing import AsyncGenerator if sys.version_info[:2] < (3, 11): from async_timeout import timeout as asyncio_timeout # pragma: no cover @@ -711,6 +712,46 @@ async def energy_scan( for channel in list(channels) } + async def _network_scan( + self, channels: t.Channels, duration_exp: int + ) -> AsyncGenerator[zigpy.types.NetworkBeacon]: + """Scans for networks and yields network beacons.""" + queue = asyncio.Queue() + + with self._ezsp.callback_for_commands( + {"networkFoundHandler", "scanCompleteHandler"}, + callback=lambda command, response: queue.put_nowait((command, response)), + ): + # XXX: replace with normal command invocation once overload is removed + (status,) = await self._ezsp._command( + "startScan", + scanType=t.EzspNetworkScanType.ACTIVE_SCAN, + channelMask=channels, + duration=duration_exp, + ) + + if t.sl_Status.from_ember_status(status) != t.sl_Status.OK: + raise ControllerError(f"Failed to start scan: {status!r}") + + while True: + command, response = await queue.get() + + if command == "scanCompleteHandler": + break + + (networkFound, lastHopLqi, lastHopRssi) = response + + yield zigpy.types.NetworkBeacon( + pan_id=networkFound.panId, + extended_pan_id=networkFound.extendedPanId, + channel=networkFound.channel, + nwk_update_id=networkFound.nwkUpdateId, + permit_joining=bool(networkFound.allowingJoin), + stack_profile=networkFound.stackProfile, + lqi=lastHopLqi, + rssi=lastHopRssi, + ) + async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: if not self.is_controller_running: raise ControllerError("ApplicationController is not running") diff --git a/tests/test_application.py b/tests/test_application.py index c3931c3e..873f6921 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -1943,3 +1943,104 @@ async def test_write_network_info( ) ) ] + + +async def test_network_scan(app: ControllerApplication) -> None: + app._ezsp._protocol.startScan.return_value = [t.sl_Status.OK] + + def run_scan() -> None: + app._ezsp._protocol._handle_callback( + "networkFoundHandler", + list( + { + "networkFound": t.EmberZigbeeNetwork( + channel=11, + panId=zigpy_t.PanId(0x1D13), + extendedPanId=t.EUI64.convert("00:07:81:00:00:9a:8f:3b"), + allowingJoin=False, + stackProfile=2, + nwkUpdateId=0, + ), + "lastHopLqi": 152, + "lastHopRssi": -62, + }.values() + ), + ) + app._ezsp._protocol._handle_callback( + "networkFoundHandler", + list( + { + "networkFound": t.EmberZigbeeNetwork( + channel=11, + panId=zigpy_t.PanId(0x2857), + extendedPanId=t.EUI64.convert("00:07:81:00:00:9a:34:1b"), + allowingJoin=False, + stackProfile=2, + nwkUpdateId=0, + ), + "lastHopLqi": 136, + "lastHopRssi": -66, + }.values() + ), + ) + app._ezsp._protocol._handle_callback( + "scanCompleteHandler", + list( + { + "channel": 26, + "status": t.sl_Status.OK, + }.values() + ), + ) + + asyncio.get_running_loop().call_soon(run_scan) + + results = [ + beacon + async for beacon in app.network_scan( + channels=t.Channels.from_channel_list([11, 15, 26]), duration_exp=4 + ) + ] + + assert results == [ + zigpy_t.NetworkBeacon( + pan_id=0x1D13, + extended_pan_id=t.EUI64.convert("00:07:81:00:00:9a:8f:3b"), + channel=11, + permit_joining=False, + stack_profile=2, + nwk_update_id=0, + lqi=152, + src=None, + rssi=-62, + depth=None, + router_capacity=None, + device_capacity=None, + protocol_version=None, + ), + zigpy_t.NetworkBeacon( + pan_id=0x2857, + extended_pan_id=t.EUI64.convert("00:07:81:00:00:9a:34:1b"), + channel=11, + permit_joining=False, + stack_profile=2, + nwk_update_id=0, + lqi=136, + src=None, + rssi=-66, + depth=None, + router_capacity=None, + device_capacity=None, + protocol_version=None, + ), + ] + + +async def test_network_scan_failure(app: ControllerApplication) -> None: + app._ezsp._protocol.startScan.return_value = [t.sl_Status.FAIL] + + with pytest.raises(zigpy.exceptions.ControllerException): + async for beacon in app.network_scan( + channels=t.Channels.from_channel_list([11, 15, 26]), duration_exp=4 + ): + pass From 7d0d18fc4eb3a111e79746c0f3212cf88fde366c Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 22 Jan 2025 19:29:47 -0500 Subject: [PATCH 2/2] Zigpy packet capture interface (#664) * Zigpy packet capture interface * Keep track of the packet capture channel * Compute the timestamp immediately * Bump zigpy * Add a unit test --- bellows/zigbee/application.py | 44 +++++++++++++++++++++ pyproject.toml | 2 +- tests/test_application.py | 72 +++++++++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 1 deletion(-) diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index 068034a6..888ab562 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +from datetime import datetime, timezone import logging import os import statistics @@ -93,6 +94,7 @@ def __init__(self, config: dict): self._watchdog_feed_counter = 0 self._req_lock = asyncio.Lock() + self._packet_capture_channel: int | None = None @property def controller_event(self): @@ -752,6 +754,48 @@ async def _network_scan( rssi=lastHopRssi, ) + def _check_status(self, status: t.sl_Status | t.EmberStatus) -> None: + if t.sl_Status.from_ember_status(status) != t.sl_Status.OK: + raise ControllerError(f"Command failed: {status!r}") + + async def _packet_capture(self, channel: int): + (status,) = await self._ezsp.mfglibStart(rxCallback=True) + self._check_status(status) + + try: + await self._packet_capture_change_channel(channel=channel) + assert self._packet_capture_channel is not None + + queue = asyncio.Queue() + + with self._ezsp.callback_for_commands( + {"mfglibRxHandler"}, + callback=lambda _, response: queue.put_nowait( + (datetime.now(timezone.utc), response) + ), + ): + while True: + timestamp, (linkQuality, rssi, packetContents) = await queue.get() + + # The last two bytes are not a FCS + packetContents = packetContents[:-2] + + yield zigpy.types.CapturedPacket( + timestamp=timestamp, + rssi=rssi, + lqi=linkQuality, + channel=self._packet_capture_channel, + data=packetContents, + ) + finally: + (status,) = await self._ezsp.mfglibEnd() + self._check_status(status) + + async def _packet_capture_change_channel(self, channel: int): + (status,) = await self._ezsp.mfglibSetChannel(channel=channel) + self._check_status(status) + self._packet_capture_channel = channel + async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: if not self.is_controller_running: raise ControllerError("ApplicationController is not running") diff --git a/pyproject.toml b/pyproject.toml index 3a9c9b1a..6256ca23 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ dependencies = [ "click-log>=0.2.1", "pure_pcapy3==1.0.1", "voluptuous", - "zigpy>=0.70.0", + "zigpy>=0.75.0", 'async-timeout; python_version<"3.11"', ] diff --git a/tests/test_application.py b/tests/test_application.py index 873f6921..c0baa95d 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -2044,3 +2044,75 @@ async def test_network_scan_failure(app: ControllerApplication) -> None: channels=t.Channels.from_channel_list([11, 15, 26]), duration_exp=4 ): pass + + +async def test_packet_capture(app: ControllerApplication) -> None: + app._ezsp._protocol.mfglibStart.return_value = [t.sl_Status.OK] + app._ezsp._protocol.mfglibSetChannel.return_value = [t.sl_Status.OK] + app._ezsp._protocol.mfglibEnd.return_value = [t.sl_Status.OK] + + async def receive_packets() -> None: + app._ezsp._protocol._handle_callback( + "mfglibRxHandler", + list( + { + "linkQuality": 150, + "rssi": -70, + "packetContents": b"packet 1\xAB\xCD", + }.values() + ), + ) + + await asyncio.sleep(0.5) + + app._ezsp._protocol._handle_callback( + "mfglibRxHandler", + list( + { + "linkQuality": 200, + "rssi": -50, + "packetContents": b"packet 2\xAB\xCD", + }.values() + ), + ) + + task = asyncio.create_task(receive_packets()) + packets = [] + + async for packet in app.packet_capture(channel=15): + packets.append(packet) + + if len(packets) == 1: + await app.packet_capture_change_channel(channel=20) + elif len(packets) == 2: + break + + assert packets == [ + zigpy_t.CapturedPacket( + timestamp=packets[0].timestamp, + rssi=-70, + lqi=150, + channel=15, + data=b"packet 1", + ), + zigpy_t.CapturedPacket( + timestamp=packets[1].timestamp, + rssi=-50, + lqi=200, + channel=20, # The second packet's channel was changed + data=b"packet 2", + ), + ] + + await task + await asyncio.sleep(0.1) + + assert app._ezsp._protocol.mfglibEnd.mock_calls == [call()] + + +async def test_packet_capture_failure(app: ControllerApplication) -> None: + app._ezsp._protocol.mfglibStart.return_value = [t.sl_Status.FAIL] + + with pytest.raises(zigpy.exceptions.ControllerException): + async for packet in app.packet_capture(channel=15): + pass