Skip to content

Commit

Permalink
tests: Use CancellableReader for PW RPC clients (#29567)
Browse files Browse the repository at this point in the history
The CancellableReader provides a clean teardown of an RpcClient
reader thread at exit. Switching from using readers and RpcClients
directly to context statements makes sure the teardown is done.
This change is adapts to a recent change in Pigweed's RPC Python library:
https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/172051
  • Loading branch information
ChinchillaWithGoggles authored Oct 5, 2023
1 parent f03e9ef commit 3f34c52
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 124 deletions.
12 changes: 6 additions & 6 deletions src/test_driver/efr32/py/nl_test_runner/nl_test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from typing import Any

import serial # type: ignore
from pw_hdlc.rpc import HdlcRpcClient, default_channels, write_to_file
from pw_hdlc import rpc

# RPC Protos
from nl_test_service import nl_test_pb2 # isort:skip
Expand Down Expand Up @@ -81,10 +81,10 @@ def flash_device(device: str, flash_image: str, **kwargs):
def get_hdlc_rpc_client(device: str, baudrate: int, output: Any, **kwargs):
"""Get the HdlcRpcClient based on arguments."""
serial_device = serial.Serial(device, baudrate, timeout=1)
def read(): return serial_device.read(8192)
reader = rpc.SerialReader(serial_device, 8192)
write = serial_device.write
return HdlcRpcClient(read, PROTOS, default_channels(write),
lambda data: write_to_file(data, output))
return rpc.HdlcRpcClient(reader, PROTOS, rpc.default_channels(write),
lambda data: rpc.write_to_file(data, output))


def runner(client) -> int:
Expand Down Expand Up @@ -133,8 +133,8 @@ def main() -> int:
if args.flash_image:
flash_device(**vars(args))
time.sleep(1) # Give time for device to boot
client = get_hdlc_rpc_client(**vars(args))
return runner(client)
with get_hdlc_rpc_client(**vars(args)) as client:
return runner(client)


if __name__ == '__main__':
Expand Down
18 changes: 15 additions & 3 deletions src/test_driver/mbed/integration_tests/common/pigweed_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pw_hdlc.rpc import HdlcRpcClient, default_channels
from pw_hdlc import rpc


class PigweedClient:
Expand All @@ -28,8 +28,11 @@ def __init__(self, device, protos):
self.device.stop()
self.last_timeout = self.device.serial.get_timeout()
self.device.serial.set_timeout(0.01)
self._pw_rpc_client = HdlcRpcClient(lambda: self.device.serial.read(4096),
protos, default_channels(self.device.serial.write))
reader = rpc.Serialreader(self.device.serial, 4096)
self._pw_rpc_client = rpc.HdlcRpcClient(
reader,
protos,
rpc.default_channels(self.device.serial.write))
self._rpcs = self._pw_rpc_client.rpcs()

def __del__(self):
Expand All @@ -39,3 +42,12 @@ def __del__(self):
@property
def rpcs(self):
return self._rpcs

def __enter__(self) -> 'PigweedClient':
return self

def __exit__(self, *exc_info) -> None:
self.stop()

def stop(self) -> None:
self._pw_rpc_client.stop()
112 changes: 57 additions & 55 deletions src/test_driver/mbed/integration_tests/lighting-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,77 +146,79 @@ def test_light_ctrl(device, network):


def test_device_info_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None

device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0
device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0

assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code
assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code


def test_device_factory_reset_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True


def test_device_reboot_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED


def test_device_ota_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED


def test_ligth_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check light on
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is True
# Check light on
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is True

# Check light off
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is False
# Check light off
status, payload = pw_client.rpcs.chip.rpc.Lighting.Set(on=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on is False


def test_button_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)

# Check button 0 (lighting)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
initial_state = bool(payload.on)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check button 0 (lighting)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
initial_state = bool(payload.on)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Lighting.Get()
assert status.ok() is True
assert payload.on == compare_state
112 changes: 57 additions & 55 deletions src/test_driver/mbed/integration_tests/lock-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,77 +136,79 @@ def test_lock_ctrl(device, network):


def test_device_info_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.GetDeviceInfo()
assert status.ok() is True
assert payload.vendor_id is not None and payload.product_id is not None and payload.serial_number is not None

device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0
device_details = get_device_details(device)
assert device_details is not None and len(device_details) != 0

assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code
assert int(device_details["VendorID"]) == payload.vendor_id
assert int(device_details["ProductID"]) == payload.product_id
assert int(device_details["Discriminator"]
) == payload.pairing_info.discriminator
assert int(device_details["SetUpPINCode"]) == payload.pairing_info.code


def test_device_factory_reset_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.FactoryReset()
assert status.ok() is True


def test_device_reboot_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.Reboot()
assert status == Status.UNIMPLEMENTED


def test_device_ota_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.chip.rpc.Device.TriggerOta()
assert status == Status.UNIMPLEMENTED


def test_lock_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check locked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is True
# Check locked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=True)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is True

# Check unlocked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is False
# Check unlocked
status, payload = pw_client.rpcs.chip.rpc.Locking.Set(locked=False)
assert status.ok() is True
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is False


def test_button_ctrl_rpc(device):
pw_client = PigweedClient(device, RPC_PROTOS)

# Check button 0 (locking)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
initial_state = bool(payload.locked)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0, pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked == compare_state
with PigweedClient(device, RPC_PROTOS) as pw_client:

# Check button 0 (locking)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
initial_state = bool(payload.locked)

compare_state = not initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked is compare_state

compare_state = initial_state
status, payload = pw_client.rpcs.chip.rpc.Button.Event(idx=0,
pushed=True)
assert status.ok() is True
sleep(2)
status, payload = pw_client.rpcs.chip.rpc.Locking.Get()
assert status.ok() is True
assert payload.locked == compare_state
10 changes: 5 additions & 5 deletions src/test_driver/mbed/integration_tests/pigweed-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def test_smoke_test(device):


def test_echo(device):
pw_client = PigweedClient(device, RPC_PROTOS)
status, payload = pw_client.rpcs.pw.rpc.EchoService.Echo(
msg=PW_ECHO_TEST_MESSAGE)
assert status.ok() is True
assert payload.msg == PW_ECHO_TEST_MESSAGE
with PigweedClient(device, RPC_PROTOS) as pw_client:
status, payload = pw_client.rpcs.pw.rpc.EchoService.Echo(
msg=PW_ECHO_TEST_MESSAGE)
assert status.ok() is True
assert payload.msg == PW_ECHO_TEST_MESSAGE

0 comments on commit 3f34c52

Please sign in to comment.