Skip to content

Commit

Permalink
Split TC_MCORE_FS_1_3 to align with the Test Spec (#35274)
Browse files Browse the repository at this point in the history
* [TC_MCORE_FS_1_3] Fix test script according to test plan update

* Separate storage for all used components

* Open commissioning window on TH_FSA_BRIDGE

* Python wrapper for running fabric-admin and fabric-bridge together

* Customize fabric-admin and fabric-bridge RPC ports

* Create storage directory

* Use fabric-sync-app in the TC-MCORE-FS-1.3 script

* Use CommissionerControlCluster to commission TH_SERVER onto DUT

* Auto-link bridge with admin

* Test automation setup

* Terminate apps on SIGTERM and SIGINT

* Open commissioning window on fabric-bridge after adding to FSA

* Commissioning TH_FSA_BRIDGE to DUT_FSA fabric

* Synchronize server from TH to DUT

* Start another instance of app server

* Test if unique ID was synced

* Allow customization for fabric-sync app components

* Final cleanup

* Split test case into two test cases

* Simplify TC_MCORE_FS_1_3 script

* Simplify TC_MCORE_FS_1_4 steps

* Use volatile storage for fabric-sync-app by default

* Add TC_MCORE_FS_1_4 to exceptions

* Get rid of defaults

* Document used options in open commissioning window

* Speed up the pipe read busy loop

* Refactor local output processing

* Improve wait for output

* Add FS-sync tests to CI

* Improve Python code style

* Fix wait for fabric-sync-app start

* Fix asyncio forwarder

* Fixes for review comments
  • Loading branch information
arkq authored Sep 3, 2024
1 parent 50cfda6 commit ce3b4d9
Show file tree
Hide file tree
Showing 5 changed files with 981 additions and 83 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,9 @@ jobs:
--target linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test \
--target linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test \
--target linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test \
--target linux-x64-fabric-admin-rpc-ipv6only-clang \
--target linux-x64-fabric-bridge-rpc-ipv6only-no-ble-no-wifi-clang \
--target linux-x64-light-data-model-no-unique-id-ipv6only-no-ble-no-wifi-clang \
--target linux-x64-python-bindings \
build \
--copy-artifacts-to objdir-clone \
Expand All @@ -500,6 +503,9 @@ jobs:
echo "CHIP_MICROWAVE_OVEN_APP: out/linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-microwave-oven-app" >> /tmp/test_env.yaml
echo "CHIP_RVC_APP: out/linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-rvc-app" >> /tmp/test_env.yaml
echo "NETWORK_MANAGEMENT_APP: out/linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test/matter-network-manager-app" >> /tmp/test_env.yaml
echo "FABRIC_ADMIN_APP: out/linux-x64-fabric-admin-rpc-ipv6only-clang/fabric-admin" >> /tmp/test_env.yaml
echo "FABRIC_BRIDGE_APP: out/linux-x64-fabric-bridge-rpc-ipv6only-no-ble-no-wifi-clang/fabric-bridge-app" >> /tmp/test_env.yaml
echo "LIGHTING_APP_NO_UNIQUE_ID: out/linux-x64-light-data-model-no-unique-id-ipv6only-no-ble-no-wifi-clang/chip-lighting-app" >> /tmp/test_env.yaml
echo "TRACE_APP: out/trace_data/app-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
echo "TRACE_TEST_JSON: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
echo "TRACE_TEST_PERFETTO: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
Expand Down
318 changes: 318 additions & 0 deletions examples/fabric-admin/scripts/fabric-sync-app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
#!/usr/bin/env python3

# Copyright (c) 2024 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import contextlib
import os
import signal
import sys
from argparse import ArgumentParser
from tempfile import TemporaryDirectory


async def asyncio_stdin() -> asyncio.StreamReader:
"""Wrap sys.stdin in an asyncio StreamReader."""
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
return reader


async def asyncio_stdout(file=sys.stdout) -> asyncio.StreamWriter:
"""Wrap an IO stream in an asyncio StreamWriter."""
loop = asyncio.get_event_loop()
transport, protocol = await loop.connect_write_pipe(
lambda: asyncio.streams.FlowControlMixin(loop=loop),
os.fdopen(file.fileno(), 'wb'))
return asyncio.streams.StreamWriter(transport, protocol, None, loop)


async def forward_f(prefix: bytes, f_in: asyncio.StreamReader,
f_out: asyncio.StreamWriter, cb=None):
"""Forward f_in to f_out with a prefix attached.
This function can optionally feed received lines to a callback function.
"""
while True:
line = await f_in.readline()
if not line:
break
if cb is not None:
cb(line)
f_out.write(prefix)
f_out.write(line)
await f_out.drain()


async def forward_pipe(pipe_path: str, f_out: asyncio.StreamWriter):
"""Forward named pipe to f_out.
Unfortunately, Python does not support async file I/O on named pipes. This
function performs busy waiting with a short asyncio-friendly sleep to read
from the pipe.
"""
fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK)
while True:
try:
data = os.read(fd, 1024)
if data:
f_out.write(data)
if not data:
await asyncio.sleep(0.1)
except BlockingIOError:
await asyncio.sleep(0.1)


async def forward_stdin(f_out: asyncio.StreamWriter):
"""Forward stdin to f_out."""
reader = await asyncio_stdin()
while True:
line = await reader.readline()
if not line:
# Exit on Ctrl-D (EOF).
sys.exit(0)
f_out.write(line)


class Subprocess:

def __init__(self, tag: str, program: str, *args, stdout_cb=None):
self.event = asyncio.Event()
self.tag = tag.encode()
self.program = program
self.args = args
self.stdout_cb = stdout_cb
self.expected_output = None

def _check_output(self, line: bytes):
if self.expected_output is not None and self.expected_output in line:
self.event.set()

async def run(self):
self.p = await asyncio.create_subprocess_exec(self.program, *self.args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
# Add the stdout and stderr processing to the event loop.
asyncio.create_task(forward_f(
self.tag,
self.p.stderr,
await asyncio_stdout(sys.stderr)))
asyncio.create_task(forward_f(
self.tag,
self.p.stdout,
await asyncio_stdout(sys.stdout),
cb=self._check_output))

async def send(self, message: str, expected_output: str = None, timeout: float = None):
"""Send a message to a process and optionally wait for a response."""

if expected_output is not None:
self.expected_output = expected_output.encode()
self.event.clear()

self.p.stdin.write((message + "\n").encode())
await self.p.stdin.drain()

if expected_output is not None:
await asyncio.wait_for(self.event.wait(), timeout=timeout)
self.expected_output = None

async def wait(self):
await self.p.wait()

def terminate(self):
self.p.terminate()


async def run_admin(program, stdout_cb=None, storage_dir=None,
rpc_admin_port=None, rpc_bridge_port=None,
paa_trust_store_path=None, commissioner_name=None,
commissioner_node_id=None, commissioner_vendor_id=None):
args = []
if storage_dir is not None:
args.extend(["--storage-directory", storage_dir])
if rpc_admin_port is not None:
args.extend(["--local-server-port", str(rpc_admin_port)])
if rpc_bridge_port is not None:
args.extend(["--fabric-bridge-server-port", str(rpc_bridge_port)])
if paa_trust_store_path is not None:
args.extend(["--paa-trust-store-path", paa_trust_store_path])
if commissioner_name is not None:
args.extend(["--commissioner-name", commissioner_name])
if commissioner_node_id is not None:
args.extend(["--commissioner-nodeid", str(commissioner_node_id)])
if commissioner_vendor_id is not None:
args.extend(["--commissioner-vendor-id", str(commissioner_vendor_id)])
p = Subprocess("[FS-ADMIN]", program, "interactive", "start", *args,
stdout_cb=stdout_cb)
await p.run()
return p


async def run_bridge(program, storage_dir=None, rpc_admin_port=None,
rpc_bridge_port=None, discriminator=None, passcode=None,
secured_device_port=None):
args = []
if storage_dir is not None:
args.extend(["--KVS",
os.path.join(storage_dir, "chip_fabric_bridge_kvs")])
if rpc_admin_port is not None:
args.extend(["--fabric-admin-server-port", str(rpc_admin_port)])
if rpc_bridge_port is not None:
args.extend(["--local-server-port", str(rpc_bridge_port)])
if discriminator is not None:
args.extend(["--discriminator", str(discriminator)])
if passcode is not None:
args.extend(["--passcode", str(passcode)])
if secured_device_port is not None:
args.extend(["--secured-device-port", str(secured_device_port)])
p = Subprocess("[FS-BRIDGE]", program, *args)
await p.run()
return p


async def main(args):

# Node ID of the bridge on the fabric.
bridge_node_id = 1

if args.commissioner_node_id == bridge_node_id:
raise ValueError(f"NodeID={bridge_node_id} is reserved for the local fabric-bridge")

storage_dir = args.storage_dir
if storage_dir is not None:
os.makedirs(storage_dir, exist_ok=True)
else:
storage = TemporaryDirectory(prefix="fabric-sync-app")
storage_dir = storage.name

pipe = args.stdin_pipe
if pipe and not os.path.exists(pipe):
os.mkfifo(pipe)

def terminate(signum, frame):
admin.terminate()
bridge.terminate()
sys.exit(0)

signal.signal(signal.SIGINT, terminate)
signal.signal(signal.SIGTERM, terminate)

admin, bridge = await asyncio.gather(
run_admin(
args.app_admin,
storage_dir=storage_dir,
rpc_admin_port=args.app_admin_rpc_port,
rpc_bridge_port=args.app_bridge_rpc_port,
paa_trust_store_path=args.paa_trust_store_path,
commissioner_name=args.commissioner_name,
commissioner_node_id=args.commissioner_node_id,
commissioner_vendor_id=args.commissioner_vendor_id,
),
run_bridge(
args.app_bridge,
storage_dir=storage_dir,
rpc_admin_port=args.app_admin_rpc_port,
rpc_bridge_port=args.app_bridge_rpc_port,
secured_device_port=args.secured_device_port,
discriminator=args.discriminator,
passcode=args.passcode,
))

# Wait a bit for apps to start.
await asyncio.sleep(1)

try:
# Check whether the bridge is already commissioned. If it is,
# we will get the response, otherwise we will hit timeout.
await admin.send(
f"descriptor read device-type-list {bridge_node_id} 1 --timeout 1",
# Log message which should appear in the fabric-admin output if
# the bridge is already commissioned.
expected_output="Reading attribute: Cluster=0x0000_001D Endpoint=0x1 AttributeId=0x0000_0000",
timeout=1.5)
except asyncio.TimeoutError:
# Commission the bridge to the admin.
cmd = f"fabricsync add-local-bridge {bridge_node_id}"
if args.passcode is not None:
cmd += f" --setup-pin-code {args.passcode}"
if args.secured_device_port is not None:
cmd += f" --local-port {args.secured_device_port}"
await admin.send(
cmd,
# Wait for the log message indicating that the bridge has been
# added to the fabric.
f"Commissioning complete for node ID {bridge_node_id:#018x}: success")

# Open commissioning window with original setup code for the bridge.
cw_endpoint_id = 0
cw_option = 0 # 0: Original setup code, 1: New setup code
cw_timeout = 600
cw_iteration = 1000
cw_discriminator = 0
await admin.send(f"pairing open-commissioning-window {bridge_node_id} {cw_endpoint_id}"
f" {cw_option} {cw_timeout} {cw_iteration} {cw_discriminator}")

try:
await asyncio.gather(
forward_pipe(pipe, admin.p.stdin) if pipe else forward_stdin(admin.p.stdin),
admin.wait(),
bridge.wait(),
)
except SystemExit:
admin.terminate()
bridge.terminate()
except Exception:
admin.terminate()
bridge.terminate()
raise


if __name__ == "__main__":
parser = ArgumentParser(description="Fabric-Sync Example Application")
parser.add_argument("--app-admin", metavar="PATH",
default="out/linux-x64-fabric-admin-rpc/fabric-admin",
help="path to the fabric-admin executable; default=%(default)s")
parser.add_argument("--app-bridge", metavar="PATH",
default="out/linux-x64-fabric-bridge-rpc/fabric-bridge-app",
help="path to the fabric-bridge executable; default=%(default)s")
parser.add_argument("--app-admin-rpc-port", metavar="PORT", type=int,
help="fabric-admin RPC server port")
parser.add_argument("--app-bridge-rpc-port", metavar="PORT", type=int,
help="fabric-bridge RPC server port")
parser.add_argument("--stdin-pipe", metavar="PATH",
help="read input from a named pipe instead of stdin")
parser.add_argument("--storage-dir", metavar="PATH",
help=("directory to place storage files in; by default "
"volatile storage is used"))
parser.add_argument("--paa-trust-store-path", metavar="PATH",
help="path to directory holding PAA certificates")
parser.add_argument("--commissioner-name", metavar="NAME",
help="commissioner name to use for the admin")
parser.add_argument("--commissioner-node-id", metavar="NUM", type=int,
help="commissioner node ID to use for the admin")
parser.add_argument("--commissioner-vendor-id", metavar="NUM", type=int,
help="commissioner vendor ID to use for the admin")
parser.add_argument("--secured-device-port", metavar="NUM", type=int,
help="secure messages listen port to use for the bridge")
parser.add_argument("--discriminator", metavar="NUM", type=int,
help="discriminator to use for the bridge")
parser.add_argument("--passcode", metavar="NUM", type=int,
help="passcode to use for the bridge")
with contextlib.suppress(KeyboardInterrupt):
asyncio.run(main(parser.parse_args()))
Loading

0 comments on commit ce3b4d9

Please sign in to comment.