diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index b37ed3c6ca4370..5265f82b09454c 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -486,6 +486,9 @@ jobs: --target linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test \ --target linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test \ --target linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test \ + --target linux-x64-fabric-admin-rpc-ipv6only-clang \ + --target linux-x64-fabric-bridge-rpc-ipv6only-no-ble-no-wifi-clang \ + --target linux-x64-light-data-model-no-unique-id-ipv6only-no-ble-no-wifi-clang \ --target linux-x64-python-bindings \ build \ --copy-artifacts-to objdir-clone \ @@ -500,6 +503,9 @@ jobs: echo "CHIP_MICROWAVE_OVEN_APP: out/linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-microwave-oven-app" >> /tmp/test_env.yaml echo "CHIP_RVC_APP: out/linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-rvc-app" >> /tmp/test_env.yaml echo "NETWORK_MANAGEMENT_APP: out/linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test/matter-network-manager-app" >> /tmp/test_env.yaml + echo "FABRIC_ADMIN_APP: out/linux-x64-fabric-admin-rpc-ipv6only-clang/fabric-admin" >> /tmp/test_env.yaml + echo "FABRIC_BRIDGE_APP: out/linux-x64-fabric-bridge-rpc-ipv6only-no-ble-no-wifi-clang/fabric-bridge-app" >> /tmp/test_env.yaml + echo "LIGHTING_APP_NO_UNIQUE_ID: out/linux-x64-light-data-model-no-unique-id-ipv6only-no-ble-no-wifi-clang/chip-lighting-app" >> /tmp/test_env.yaml echo "TRACE_APP: out/trace_data/app-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml echo "TRACE_TEST_JSON: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml echo "TRACE_TEST_PERFETTO: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml diff --git a/examples/fabric-admin/scripts/fabric-sync-app.py b/examples/fabric-admin/scripts/fabric-sync-app.py new file mode 100755 index 00000000000000..c6faed8b67ac00 --- /dev/null +++ b/examples/fabric-admin/scripts/fabric-sync-app.py @@ -0,0 +1,318 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2024 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import contextlib +import os +import signal +import sys +from argparse import ArgumentParser +from tempfile import TemporaryDirectory + + +async def asyncio_stdin() -> asyncio.StreamReader: + """Wrap sys.stdin in an asyncio StreamReader.""" + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader() + protocol = asyncio.StreamReaderProtocol(reader) + await loop.connect_read_pipe(lambda: protocol, sys.stdin) + return reader + + +async def asyncio_stdout(file=sys.stdout) -> asyncio.StreamWriter: + """Wrap an IO stream in an asyncio StreamWriter.""" + loop = asyncio.get_event_loop() + transport, protocol = await loop.connect_write_pipe( + lambda: asyncio.streams.FlowControlMixin(loop=loop), + os.fdopen(file.fileno(), 'wb')) + return asyncio.streams.StreamWriter(transport, protocol, None, loop) + + +async def forward_f(prefix: bytes, f_in: asyncio.StreamReader, + f_out: asyncio.StreamWriter, cb=None): + """Forward f_in to f_out with a prefix attached. + + This function can optionally feed received lines to a callback function. + """ + while True: + line = await f_in.readline() + if not line: + break + if cb is not None: + cb(line) + f_out.write(prefix) + f_out.write(line) + await f_out.drain() + + +async def forward_pipe(pipe_path: str, f_out: asyncio.StreamWriter): + """Forward named pipe to f_out. + + Unfortunately, Python does not support async file I/O on named pipes. This + function performs busy waiting with a short asyncio-friendly sleep to read + from the pipe. + """ + fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK) + while True: + try: + data = os.read(fd, 1024) + if data: + f_out.write(data) + if not data: + await asyncio.sleep(0.1) + except BlockingIOError: + await asyncio.sleep(0.1) + + +async def forward_stdin(f_out: asyncio.StreamWriter): + """Forward stdin to f_out.""" + reader = await asyncio_stdin() + while True: + line = await reader.readline() + if not line: + # Exit on Ctrl-D (EOF). + sys.exit(0) + f_out.write(line) + + +class Subprocess: + + def __init__(self, tag: str, program: str, *args, stdout_cb=None): + self.event = asyncio.Event() + self.tag = tag.encode() + self.program = program + self.args = args + self.stdout_cb = stdout_cb + self.expected_output = None + + def _check_output(self, line: bytes): + if self.expected_output is not None and self.expected_output in line: + self.event.set() + + async def run(self): + self.p = await asyncio.create_subprocess_exec(self.program, *self.args, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) + # Add the stdout and stderr processing to the event loop. + asyncio.create_task(forward_f( + self.tag, + self.p.stderr, + await asyncio_stdout(sys.stderr))) + asyncio.create_task(forward_f( + self.tag, + self.p.stdout, + await asyncio_stdout(sys.stdout), + cb=self._check_output)) + + async def send(self, message: str, expected_output: str = None, timeout: float = None): + """Send a message to a process and optionally wait for a response.""" + + if expected_output is not None: + self.expected_output = expected_output.encode() + self.event.clear() + + self.p.stdin.write((message + "\n").encode()) + await self.p.stdin.drain() + + if expected_output is not None: + await asyncio.wait_for(self.event.wait(), timeout=timeout) + self.expected_output = None + + async def wait(self): + await self.p.wait() + + def terminate(self): + self.p.terminate() + + +async def run_admin(program, stdout_cb=None, storage_dir=None, + rpc_admin_port=None, rpc_bridge_port=None, + paa_trust_store_path=None, commissioner_name=None, + commissioner_node_id=None, commissioner_vendor_id=None): + args = [] + if storage_dir is not None: + args.extend(["--storage-directory", storage_dir]) + if rpc_admin_port is not None: + args.extend(["--local-server-port", str(rpc_admin_port)]) + if rpc_bridge_port is not None: + args.extend(["--fabric-bridge-server-port", str(rpc_bridge_port)]) + if paa_trust_store_path is not None: + args.extend(["--paa-trust-store-path", paa_trust_store_path]) + if commissioner_name is not None: + args.extend(["--commissioner-name", commissioner_name]) + if commissioner_node_id is not None: + args.extend(["--commissioner-nodeid", str(commissioner_node_id)]) + if commissioner_vendor_id is not None: + args.extend(["--commissioner-vendor-id", str(commissioner_vendor_id)]) + p = Subprocess("[FS-ADMIN]", program, "interactive", "start", *args, + stdout_cb=stdout_cb) + await p.run() + return p + + +async def run_bridge(program, storage_dir=None, rpc_admin_port=None, + rpc_bridge_port=None, discriminator=None, passcode=None, + secured_device_port=None): + args = [] + if storage_dir is not None: + args.extend(["--KVS", + os.path.join(storage_dir, "chip_fabric_bridge_kvs")]) + if rpc_admin_port is not None: + args.extend(["--fabric-admin-server-port", str(rpc_admin_port)]) + if rpc_bridge_port is not None: + args.extend(["--local-server-port", str(rpc_bridge_port)]) + if discriminator is not None: + args.extend(["--discriminator", str(discriminator)]) + if passcode is not None: + args.extend(["--passcode", str(passcode)]) + if secured_device_port is not None: + args.extend(["--secured-device-port", str(secured_device_port)]) + p = Subprocess("[FS-BRIDGE]", program, *args) + await p.run() + return p + + +async def main(args): + + # Node ID of the bridge on the fabric. + bridge_node_id = 1 + + if args.commissioner_node_id == bridge_node_id: + raise ValueError(f"NodeID={bridge_node_id} is reserved for the local fabric-bridge") + + storage_dir = args.storage_dir + if storage_dir is not None: + os.makedirs(storage_dir, exist_ok=True) + else: + storage = TemporaryDirectory(prefix="fabric-sync-app") + storage_dir = storage.name + + pipe = args.stdin_pipe + if pipe and not os.path.exists(pipe): + os.mkfifo(pipe) + + def terminate(signum, frame): + admin.terminate() + bridge.terminate() + sys.exit(0) + + signal.signal(signal.SIGINT, terminate) + signal.signal(signal.SIGTERM, terminate) + + admin, bridge = await asyncio.gather( + run_admin( + args.app_admin, + storage_dir=storage_dir, + rpc_admin_port=args.app_admin_rpc_port, + rpc_bridge_port=args.app_bridge_rpc_port, + paa_trust_store_path=args.paa_trust_store_path, + commissioner_name=args.commissioner_name, + commissioner_node_id=args.commissioner_node_id, + commissioner_vendor_id=args.commissioner_vendor_id, + ), + run_bridge( + args.app_bridge, + storage_dir=storage_dir, + rpc_admin_port=args.app_admin_rpc_port, + rpc_bridge_port=args.app_bridge_rpc_port, + secured_device_port=args.secured_device_port, + discriminator=args.discriminator, + passcode=args.passcode, + )) + + # Wait a bit for apps to start. + await asyncio.sleep(1) + + try: + # Check whether the bridge is already commissioned. If it is, + # we will get the response, otherwise we will hit timeout. + await admin.send( + f"descriptor read device-type-list {bridge_node_id} 1 --timeout 1", + # Log message which should appear in the fabric-admin output if + # the bridge is already commissioned. + expected_output="Reading attribute: Cluster=0x0000_001D Endpoint=0x1 AttributeId=0x0000_0000", + timeout=1.5) + except asyncio.TimeoutError: + # Commission the bridge to the admin. + cmd = f"fabricsync add-local-bridge {bridge_node_id}" + if args.passcode is not None: + cmd += f" --setup-pin-code {args.passcode}" + if args.secured_device_port is not None: + cmd += f" --local-port {args.secured_device_port}" + await admin.send( + cmd, + # Wait for the log message indicating that the bridge has been + # added to the fabric. + f"Commissioning complete for node ID {bridge_node_id:#018x}: success") + + # Open commissioning window with original setup code for the bridge. + cw_endpoint_id = 0 + cw_option = 0 # 0: Original setup code, 1: New setup code + cw_timeout = 600 + cw_iteration = 1000 + cw_discriminator = 0 + await admin.send(f"pairing open-commissioning-window {bridge_node_id} {cw_endpoint_id}" + f" {cw_option} {cw_timeout} {cw_iteration} {cw_discriminator}") + + try: + await asyncio.gather( + forward_pipe(pipe, admin.p.stdin) if pipe else forward_stdin(admin.p.stdin), + admin.wait(), + bridge.wait(), + ) + except SystemExit: + admin.terminate() + bridge.terminate() + except Exception: + admin.terminate() + bridge.terminate() + raise + + +if __name__ == "__main__": + parser = ArgumentParser(description="Fabric-Sync Example Application") + parser.add_argument("--app-admin", metavar="PATH", + default="out/linux-x64-fabric-admin-rpc/fabric-admin", + help="path to the fabric-admin executable; default=%(default)s") + parser.add_argument("--app-bridge", metavar="PATH", + default="out/linux-x64-fabric-bridge-rpc/fabric-bridge-app", + help="path to the fabric-bridge executable; default=%(default)s") + parser.add_argument("--app-admin-rpc-port", metavar="PORT", type=int, + help="fabric-admin RPC server port") + parser.add_argument("--app-bridge-rpc-port", metavar="PORT", type=int, + help="fabric-bridge RPC server port") + parser.add_argument("--stdin-pipe", metavar="PATH", + help="read input from a named pipe instead of stdin") + parser.add_argument("--storage-dir", metavar="PATH", + help=("directory to place storage files in; by default " + "volatile storage is used")) + parser.add_argument("--paa-trust-store-path", metavar="PATH", + help="path to directory holding PAA certificates") + parser.add_argument("--commissioner-name", metavar="NAME", + help="commissioner name to use for the admin") + parser.add_argument("--commissioner-node-id", metavar="NUM", type=int, + help="commissioner node ID to use for the admin") + parser.add_argument("--commissioner-vendor-id", metavar="NUM", type=int, + help="commissioner vendor ID to use for the admin") + parser.add_argument("--secured-device-port", metavar="NUM", type=int, + help="secure messages listen port to use for the bridge") + parser.add_argument("--discriminator", metavar="NUM", type=int, + help="discriminator to use for the bridge") + parser.add_argument("--passcode", metavar="NUM", type=int, + help="passcode to use for the bridge") + with contextlib.suppress(KeyboardInterrupt): + asyncio.run(main(parser.parse_args())) diff --git a/src/python_testing/TC_MCORE_FS_1_3.py b/src/python_testing/TC_MCORE_FS_1_3.py index 4245bc139ceb38..1a18896c055952 100644 --- a/src/python_testing/TC_MCORE_FS_1_3.py +++ b/src/python_testing/TC_MCORE_FS_1_3.py @@ -15,15 +15,31 @@ # limitations under the License. # -# This test requires a TH_SERVER application that returns UnsupportedAttribute when reading UniqueID from BasicInformation Cluster. Please specify with --string-arg th_server_no_uid_app_path: +# This test requires a TH_SERVER_NO_UID application that returns UnsupportedAttribute +# when reading UniqueID from BasicInformation Cluster. Please specify the app +# location with --string-arg th_server_no_uid_app_path: +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments +# for details about the block below. +# +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: run1 +# test-runner-run/run1/app: examples/fabric-admin/scripts/fabric-sync-app.py +# test-runner-run/run1/app-args: --app-admin=${FABRIC_ADMIN_APP} --app-bridge=${FABRIC_BRIDGE_APP} --stdin-pipe=dut-fsa-stdin --discriminator=1234 +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_server_no_uid_app_path:${LIGHTING_APP_NO_UNIQUE_ID} +# test-runner-run/run1/script-start-delay: 5 +# test-runner-run/run1/quiet: false +# === END CI TEST ARGUMENTS === + +import asyncio import logging import os import random -import signal import subprocess -import time -import uuid +import sys +import tempfile +import threading import chip.clusters as Clusters from chip import ChipDeviceCtrl @@ -32,109 +48,224 @@ from mobly import asserts +# TODO: Make this class more generic. Issue #35348 +class Subprocess(threading.Thread): + + def __init__(self, args: list = [], tag="", **kw): + super().__init__(**kw) + self.tag = f"[{tag}] " if tag else "" + self.args = args + + def forward_f(self, f_in, f_out): + while True: + line = f_in.readline() + if not line: + break + f_out.write(f"{self.tag}{line}") + f_out.flush() + + def run(self): + logging.info("RUN: %s", " ".join(self.args)) + self.p = subprocess.Popen(self.args, errors="ignore", stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Forward stdout and stderr with a tag attached. + t1 = threading.Thread(target=self.forward_f, args=[self.p.stdout, sys.stdout]) + t1.start() + t2 = threading.Thread(target=self.forward_f, args=[self.p.stderr, sys.stderr]) + t2.start() + # Wait for the process to finish. + self.p.wait() + t1.join() + t2.join() + + def stop(self): + self.p.terminate() + self.join() + + +class AppServer: + + def __init__(self, app, storage_dir, port=None, discriminator=None, passcode=None): + + args = [app] + args.extend(["--KVS", tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-")[1]]) + args.extend(['--secured-device-port', str(port)]) + args.extend(["--discriminator", str(discriminator)]) + args.extend(["--passcode", str(passcode)]) + self.app = Subprocess(args, tag="SERVER") + self.app.start() + + def stop(self): + self.app.stop() + + class TC_MCORE_FS_1_3(MatterBaseTest): - @async_test_body - async def setup_class(self): + + @property + def default_timeout(self) -> int: + # This test has some manual steps, so we need a longer timeout. + return 200 + + def setup_class(self): super().setup_class() - self.th_server_nodeid = 1111 - self.th_server_kvs = None - self.th_server_port = 5543 - self.app_process_for_dut_eco = None + self.th_server = None + self.storage = None - # Create a second controller on a new fabric to communicate to the server - new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() - new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2) - paa_path = str(self.matter_test_config.paa_trust_store_path) - self.TH_server_controller = new_fabric_admin.NewController(nodeId=112233, paaTrustStorePath=paa_path) + # Get the path to the TH_SERVER_NO_UID app from the user params. + th_server_app = self.user_params.get("th_server_no_uid_app_path", None) + if not th_server_app: + asserts.fail("This test requires a TH_SERVER_NO_UID app. Specify app path with --string-arg th_server_no_uid_app_path:") + if not os.path.exists(th_server_app): + asserts.fail(f"The path {th_server_app} does not exist") - def teardown_class(self): - if self.app_process_for_dut_eco is not None: - logging.warning("Stopping app with SIGTERM") - self.app_process_for_dut_eco.send_signal(signal.SIGTERM.value) - self.app_process_for_dut_eco.wait() + # Create a temporary storage directory for keeping KVS files. + self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__) + logging.info("Temporary storage directory: %s", self.storage.name) + + self.th_server_port = 5544 + self.th_server_discriminator = random.randint(0, 4095) + self.th_server_passcode = 20202021 - os.remove(self.th_server_kvs) + # Start the TH_SERVER_NO_UID app. + self.th_server = AppServer( + th_server_app, + storage_dir=self.storage.name, + port=self.th_server_port, + discriminator=self.th_server_discriminator, + passcode=self.th_server_passcode) + + def teardown_class(self): + if self.th_server is not None: + self.th_server.stop() + if self.storage is not None: + self.storage.cleanup() super().teardown_class() - async def create_device_and_commission_to_th_fabric(self, kvs, port, node_id_for_th, device_info): - app = self.user_params.get("th_server_no_uid_app_path", None) - if not app: - asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_no_uid_app_path:') + def steps_TC_MCORE_FS_1_3(self) -> list[TestStep]: + return [ + TestStep(0, "Commission DUT if not done", is_commissioning=True), + TestStep(1, "TH commissions TH_SERVER_NO_UID to TH's fabric"), + TestStep(2, "DUT_FSA commissions TH_SERVER_NO_UID to DUT_FSA's fabric and generates a UniqueID.", + "TH verifies a value is visible for the UniqueID from the DUT_FSA's Bridged Device Basic Information Cluster."), + ] - if not os.path.exists(app): - asserts.fail(f'The path {app} does not exist') + async def commission_via_commissioner_control(self, controller_node_id: int, device_node_id: int): + """Commission device_node_id to controller_node_id using CommissionerControl cluster.""" - discriminator = random.randint(0, 4095) - passcode = 20202021 + request_id = random.randint(0, 0xFFFFFFFFFFFFFFFF) - cmd = [app] - cmd.extend(['--secured-device-port', str(port)]) - cmd.extend(['--discriminator', str(discriminator)]) - cmd.extend(['--passcode', str(passcode)]) - cmd.extend(['--KVS', kvs]) + vendor_id = await self.read_single_attribute_check_success( + node_id=device_node_id, + cluster=Clusters.BasicInformation, + attribute=Clusters.BasicInformation.Attributes.VendorID, + ) - # TODO: Determine if we want these logs cooked or pushed to somewhere else - logging.info(f"Starting TH device for {device_info}") - self.app_process_for_dut_eco = subprocess.Popen(cmd) - logging.info(f"Started TH device for {device_info}") - time.sleep(3) + product_id = await self.read_single_attribute_check_success( + node_id=device_node_id, + cluster=Clusters.BasicInformation, + attribute=Clusters.BasicInformation.Attributes.ProductID, + ) - logging.info("Commissioning from separate fabric") - await self.TH_server_controller.CommissionOnNetwork(nodeId=node_id_for_th, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator) - logging.info("Commissioning device for DUT ecosystem onto TH for managing") + await self.send_single_cmd( + node_id=controller_node_id, + cmd=Clusters.CommissionerControl.Commands.RequestCommissioningApproval( + requestId=request_id, + vendorId=vendor_id, + productId=product_id, + ), + ) - def steps_TC_MCORE_FS_1_3(self) -> list[TestStep]: - steps = [TestStep(1, "TH commissions TH_SERVER to TH’s fabric.", is_commissioning=True), - TestStep(2, "DUT_FSA commissions TH_SERVER to DUT_FSA’s fabric and generates a UniqueID.")] - return steps + if not self.is_ci: + self.wait_for_user_input("Approve Commissioning Approval Request on DUT using manufacturer specified mechanism") + + resp = await self.send_single_cmd( + node_id=controller_node_id, + cmd=Clusters.CommissionerControl.Commands.CommissionNode( + requestId=request_id, + responseTimeoutSeconds=30, + ), + ) + + asserts.assert_equal(type(resp), Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow, + "Incorrect response type") + + await self.send_single_cmd( + node_id=device_node_id, + cmd=Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow( + commissioningTimeout=3*60, + PAKEPasscodeVerifier=resp.PAKEPasscodeVerifier, + discriminator=resp.discriminator, + iterations=resp.iterations, + salt=resp.salt, + ), + timedRequestTimeoutMs=5000, + ) @async_test_body async def test_TC_MCORE_FS_1_3(self): self.is_ci = self.check_pics('PICS_SDK_CI_ONLY') - self.print_step(0, "Commissioning DUT to TH, already done") + + # Commissioning - done + self.step(0) self.step(1) - root_node_endpoint = 0 - root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint) - set_of_endpoints_before_adding_device = set(root_part_list) - logging.info(f"Set of endpoints before adding the device: {set_of_endpoints_before_adding_device}") - kvs = f'kvs_{str(uuid.uuid4())}' - device_info = "for TH ecosystem" - await self.create_device_and_commission_to_th_fabric(kvs, self.th_server_port, self.th_server_nodeid, device_info) + th_server_th_node_id = 1 + await self.default_controller.CommissionOnNetwork( + nodeId=th_server_th_node_id, + setupPinCode=self.th_server_passcode, + filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, + filter=self.th_server_discriminator, + ) - self.th_server_kvs = kvs - read_result = await self.TH_server_controller.ReadAttribute(self.th_server_nodeid, [(root_node_endpoint, Clusters.BasicInformation.Attributes.UniqueID)]) - result = read_result[root_node_endpoint][Clusters.BasicInformation][Clusters.BasicInformation.Attributes.UniqueID] - asserts.assert_true(type_matches(result, Clusters.Attribute.ValueDecodeFailure), "We were expecting a value decode failure") - asserts.assert_equal(result.Reason.status, Status.UnsupportedAttribute, "Incorrect error returned from reading UniqueID") + await self.read_single_attribute_expect_error( + cluster=Clusters.BasicInformation, + attribute=Clusters.BasicInformation.Attributes.UniqueID, + node_id=th_server_th_node_id, + error=Status.UnsupportedAttribute, + ) self.step(2) - params = await self.openCommissioningWindow(dev_ctrl=self.TH_server_controller, node_id=self.th_server_nodeid) - - self.wait_for_user_input( - prompt_msg=f"Using the DUT vendor's provided interface, commission the device using the following parameters:\n" - f"- discriminator: {params.randomDiscriminator}\n" - f"- setupPinCode: {params.commissioningParameters.setupPinCode}\n" - f"- setupQRCode: {params.commissioningParameters.setupQRCode}\n" - f"- setupManualcode: {params.commissioningParameters.setupManualCode}\n" - f"If using FabricSync Admin, you may type:\n" - f">>> pairing onnetwork {params.commissioningParameters.setupPinCode}") - - root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint) - set_of_endpoints_after_adding_device = set(root_part_list) - logging.info(f"Set of endpoints after adding the device: {set_of_endpoints_after_adding_device}") - - asserts.assert_true(set_of_endpoints_after_adding_device.issuperset( - set_of_endpoints_before_adding_device), "Expected only new endpoints to be added") - unique_endpoints_set = set_of_endpoints_after_adding_device - set_of_endpoints_before_adding_device - asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint") - newly_added_endpoint = list(unique_endpoints_set)[0] - - th_sed_dut_unique_id = await self.read_single_attribute_check_success(cluster=Clusters.BridgedDeviceBasicInformation, attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID, endpoint=newly_added_endpoint) - asserts.assert_true(type_matches(th_sed_dut_unique_id, str), "UniqueID should be a string") - asserts.assert_true(th_sed_dut_unique_id, "UniqueID should not be an empty string") + + # Get the list of endpoints on the DUT_FSA_BRIDGE before adding the TH_SERVER_NO_UID. + dut_fsa_bridge_endpoints = set(await self.read_single_attribute_check_success( + cluster=Clusters.Descriptor, + attribute=Clusters.Descriptor.Attributes.PartsList, + node_id=self.dut_node_id, + endpoint=0, + )) + + await self.commission_via_commissioner_control( + controller_node_id=self.dut_node_id, + device_node_id=th_server_th_node_id) + + # Wait for the device to appear on the DUT_FSA_BRIDGE. + await asyncio.sleep(2) + + # Get the list of endpoints on the DUT_FSA_BRIDGE after adding the TH_SERVER_NO_UID. + dut_fsa_bridge_endpoints_new = set(await self.read_single_attribute_check_success( + cluster=Clusters.Descriptor, + attribute=Clusters.Descriptor.Attributes.PartsList, + node_id=self.dut_node_id, + endpoint=0, + )) + + # Get the endpoint number for just added TH_SERVER_NO_UID. + logging.info("Endpoints on DUT_FSA_BRIDGE: old=%s, new=%s", dut_fsa_bridge_endpoints, dut_fsa_bridge_endpoints_new) + asserts.assert_true(dut_fsa_bridge_endpoints_new.issuperset(dut_fsa_bridge_endpoints), + "Expected only new endpoints to be added") + unique_endpoints_set = dut_fsa_bridge_endpoints_new - dut_fsa_bridge_endpoints + asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint on DUT_FSA") + dut_fsa_bridge_th_server_endpoint = list(unique_endpoints_set)[0] + + dut_fsa_bridge_th_server_unique_id = await self.read_single_attribute_check_success( + cluster=Clusters.BridgedDeviceBasicInformation, + attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID, + endpoint=dut_fsa_bridge_th_server_endpoint) + asserts.assert_true(type_matches(dut_fsa_bridge_th_server_unique_id, str), "UniqueID should be a string") + asserts.assert_true(dut_fsa_bridge_th_server_unique_id, "UniqueID should not be an empty string") + logging.info("UniqueID generated for TH_SERVER_NO_UID: %s", dut_fsa_bridge_th_server_unique_id) if __name__ == "__main__": diff --git a/src/python_testing/TC_MCORE_FS_1_4.py b/src/python_testing/TC_MCORE_FS_1_4.py new file mode 100644 index 00000000000000..8e05c2dd7e9c3d --- /dev/null +++ b/src/python_testing/TC_MCORE_FS_1_4.py @@ -0,0 +1,442 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This test requires a TH_SERVER_NO_UID application that returns UnsupportedAttribute +# when reading UniqueID from BasicInformation Cluster. Please specify the app +# location with --string-arg th_server_no_uid_app_path: + +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments +# for details about the block below. +# +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: run1 +# test-runner-run/run1/app: examples/fabric-admin/scripts/fabric-sync-app.py +# test-runner-run/run1/app-args: --app-admin=${FABRIC_ADMIN_APP} --app-bridge=${FABRIC_BRIDGE_APP} --stdin-pipe=dut-fsa-stdin --discriminator=1234 +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_fsa_app_path:examples/fabric-admin/scripts/fabric-sync-app.py th_fsa_admin_path:${FABRIC_ADMIN_APP} th_fsa_bridge_path:${FABRIC_BRIDGE_APP} th_server_no_uid_app_path:${LIGHTING_APP_NO_UNIQUE_ID} dut_fsa_stdin_pipe:dut-fsa-stdin +# test-runner-run/run1/script-start-delay: 5 +# test-runner-run/run1/quiet: false +# === END CI TEST ARGUMENTS === + +import asyncio +import logging +import os +import random +import subprocess +import sys +import tempfile +import threading + +import chip.clusters as Clusters +from chip import ChipDeviceCtrl +from chip.interaction_model import Status +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches +from mobly import asserts + + +# TODO: Make this class more generic. Issue #35348 +class Subprocess(threading.Thread): + + def __init__(self, args: list = [], stdout_cb=None, tag="", **kw): + super().__init__(**kw) + self.tag = f"[{tag}] " if tag else "" + self.stdout_cb = stdout_cb + self.args = args + + def forward_f(self, f_in, f_out): + while True: + line = f_in.readline() + if not line: + break + f_out.write(f"{self.tag}{line}") + f_out.flush() + if self.stdout_cb is not None: + self.stdout_cb(line) + + def run(self): + logging.info("RUN: %s", " ".join(self.args)) + self.p = subprocess.Popen(self.args, errors="ignore", stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Forward stdout and stderr with a tag attached. + forwarding_stdout_thread = threading.Thread(target=self.forward_f, args=[self.p.stdout, sys.stdout]) + forwarding_stdout_thread.start() + forwarding_stderr_thread = threading.Thread(target=self.forward_f, args=[self.p.stderr, sys.stderr]) + forwarding_stderr_thread.start() + # Wait for the process to finish. + self.p.wait() + forwarding_stdout_thread.join() + forwarding_stderr_thread.join() + + def stop(self): + self.p.terminate() + self.join() + + +class FabricSyncApp: + + def _process_admin_output(self, line): + if self.wait_for_text_text is not None and self.wait_for_text_text in line: + self.wait_for_text_event.set() + + def wait_for_text(self, timeout=30): + if not self.wait_for_text_event.wait(timeout=timeout): + raise Exception(f"Timeout waiting for text: {self.wait_for_text_text}") + self.wait_for_text_event.clear() + self.wait_for_text_text = None + + def __init__(self, fabric_sync_app_path, fabric_admin_app_path, fabric_bridge_app_path, + storage_dir, fabric_name=None, node_id=None, vendor_id=None, + paa_trust_store_path=None, bridge_port=None, bridge_discriminator=None, + bridge_passcode=None): + + self.wait_for_text_event = threading.Event() + self.wait_for_text_text = None + + args = [fabric_sync_app_path] + args.append(f"--app-admin={fabric_admin_app_path}") + args.append(f"--app-bridge={fabric_bridge_app_path}") + # Override default ports, so it will be possible to run + # our TH_FSA alongside the DUT_FSA during CI testing. + args.append("--app-admin-rpc-port=44000") + args.append("--app-bridge-rpc-port=44001") + # Keep the storage directory in a temporary location. + args.append(f"--storage-dir={storage_dir}") + if paa_trust_store_path is not None: + args.append(f"--paa-trust-store-path={paa_trust_store_path}") + if fabric_name is not None: + args.append(f"--commissioner-name={fabric_name}") + if node_id is not None: + args.append(f"--commissioner-node-id={node_id}") + args.append(f"--commissioner-vendor-id={vendor_id}") + args.append(f"--secured-device-port={bridge_port}") + args.append(f"--discriminator={bridge_discriminator}") + args.append(f"--passcode={bridge_passcode}") + + self.fabric_sync_app = Subprocess(args, stdout_cb=self._process_admin_output) + self.wait_for_text_text = "Successfully opened pairing window on the device" + self.fabric_sync_app.start() + + # Wait for the fabric-sync-app to be ready. + self.wait_for_text() + + def commission_on_network(self, node_id, setup_pin_code=None, filter_type=None, filter=None): + self.wait_for_text_text = f"Commissioning complete for node ID {node_id:#018x}: success" + # Send the commissioning command to the admin. + self.fabric_sync_app.p.stdin.write(f"pairing onnetwork {node_id} {setup_pin_code}\n") + self.fabric_sync_app.p.stdin.flush() + # Wait for success message. + self.wait_for_text() + + def stop(self): + self.fabric_sync_app.stop() + + +class AppServer: + + def __init__(self, app, storage_dir, port=None, discriminator=None, passcode=None): + + args = [app] + args.extend(["--KVS", tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-")[1]]) + args.extend(['--secured-device-port', str(port)]) + args.extend(["--discriminator", str(discriminator)]) + args.extend(["--passcode", str(passcode)]) + self.app = Subprocess(args, tag="SERVER") + self.app.start() + + def stop(self): + self.app.stop() + + +class TC_MCORE_FS_1_4(MatterBaseTest): + + @property + def default_timeout(self) -> int: + # This test has some manual steps, so we need a longer timeout. + return 200 + + def setup_class(self): + super().setup_class() + + self.th_fsa_controller = None + self.th_server = None + self.storage = None + + # Get the path to the TH_FSA (fabric-admin and fabric-bridge) app from the user params. + th_fsa_app_path = self.user_params.get("th_fsa_app_path") + if not th_fsa_app_path: + asserts.fail("This test requires a TH_FSA app. Specify app path with --string-arg th_fsa_app_path:") + if not os.path.exists(th_fsa_app_path): + asserts.fail(f"The path {th_fsa_app_path} does not exist") + th_fsa_admin_path = self.user_params.get("th_fsa_admin_path") + if not th_fsa_admin_path: + asserts.fail("This test requires a TH_FSA_ADMIN app. Specify app path with --string-arg th_fsa_admin_path:") + if not os.path.exists(th_fsa_admin_path): + asserts.fail(f"The path {th_fsa_admin_path} does not exist") + th_fsa_bridge_path = self.user_params.get("th_fsa_bridge_path") + if not th_fsa_bridge_path: + asserts.fail("This test requires a TH_FSA_BRIDGE app. Specify app path with --string-arg th_fsa_bridge_path:") + if not os.path.exists(th_fsa_bridge_path): + asserts.fail(f"The path {th_fsa_bridge_path} does not exist") + + # Get the path to the TH_SERVER_NO_UID app from the user params. + th_server_app = self.user_params.get("th_server_no_uid_app_path", None) + if not th_server_app: + asserts.fail("This test requires a TH_SERVER_NO_UID app. Specify app path with --string-arg th_server_no_uid_app_path:") + if not os.path.exists(th_server_app): + asserts.fail(f"The path {th_server_app} does not exist") + + # Create a temporary storage directory for keeping KVS files. + self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__) + logging.info("Temporary storage directory: %s", self.storage.name) + + self.th_fsa_bridge_address = "::1" + self.th_fsa_bridge_port = 5543 + # Random discriminator between 0 and MAX - 1. The one-less is to save + # a room for the TH_SERVER_NO_UID discriminator. + self.th_fsa_bridge_discriminator = random.randint(0, 4094) + self.th_fsa_bridge_passcode = 20202021 + + self.th_fsa_controller = FabricSyncApp( + th_fsa_app_path, + th_fsa_admin_path, + th_fsa_bridge_path, + storage_dir=self.storage.name, + paa_trust_store_path=self.matter_test_config.paa_trust_store_path, + bridge_port=self.th_fsa_bridge_port, + bridge_discriminator=self.th_fsa_bridge_discriminator, + bridge_passcode=self.th_fsa_bridge_passcode, + vendor_id=0xFFF1) + + # Get the named pipe path for the DUT_FSA app input from the user params. + dut_fsa_stdin_pipe = self.user_params.get("dut_fsa_stdin_pipe", None) + if dut_fsa_stdin_pipe is not None: + self.dut_fsa_stdin = open(dut_fsa_stdin_pipe, "w") + + self.th_server_port = 5544 + self.th_server_discriminator = self.th_fsa_bridge_discriminator + 1 + self.th_server_passcode = 20202021 + + # Start the TH_SERVER_NO_UID app. + self.th_server = AppServer( + th_server_app, + storage_dir=self.storage.name, + port=self.th_server_port, + discriminator=self.th_server_discriminator, + passcode=self.th_server_passcode) + + def teardown_class(self): + if self.th_fsa_controller is not None: + self.th_fsa_controller.stop() + if self.th_server is not None: + self.th_server.stop() + if self.storage is not None: + self.storage.cleanup() + super().teardown_class() + + def steps_TC_MCORE_FS_1_4(self) -> list[TestStep]: + return [ + TestStep(0, "Commission DUT if not done", is_commissioning=True), + TestStep(1, "TH commissions TH_SERVER_NO_UID to TH's fabric.", + "TH verifies that the TH_SERVER_NO_UID does not provide a UniqueID."), + TestStep(2, "TH instructs TH_FSA to commission TH_SERVER_NO_UID to TH_FSA's fabric."), + TestStep(3, "TH instructs TH_FSA to open up commissioning window on it's aggregator."), + TestStep(4, "Follow manufacturer provided instructions to have DUT_FSA commission TH_FSA's aggregator."), + TestStep(5, "Follow manufacturer provided instructions to enable DUT_FSA to synchronize TH_SERVER_NO_UID" + " from TH_FSA onto DUT_FSA's fabric. TH to provide endpoint saved from step 2 in user prompt."), + TestStep(6, "DUT_FSA synchronizes TH_SERVER_NO_UID onto DUT_FSA's fabric and copies the UniqueID presented" + " by TH_FSA's Bridged Device Basic Information Cluster."), + ] + + @async_test_body + async def test_TC_MCORE_FS_1_4(self): + self.is_ci = self.check_pics('PICS_SDK_CI_ONLY') + + # Commissioning - done + self.step(0) + + self.step(1) + + th_server_th_node_id = 1 + await self.default_controller.CommissionOnNetwork( + nodeId=th_server_th_node_id, + setupPinCode=self.th_server_passcode, + filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, + filter=self.th_server_discriminator, + ) + + await self.read_single_attribute_expect_error( + cluster=Clusters.BasicInformation, + attribute=Clusters.BasicInformation.Attributes.UniqueID, + node_id=th_server_th_node_id, + error=Status.UnsupportedAttribute, + ) + + self.step(2) + + th_fsa_bridge_th_node_id = 2 + # Commissioning TH_FSA_BRIDGE to TH fabric. + await self.default_controller.CommissionOnNetwork( + nodeId=th_fsa_bridge_th_node_id, + setupPinCode=self.th_fsa_bridge_passcode, + filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, + filter=self.th_fsa_bridge_discriminator, + ) + + # Get the list of endpoints on the TH_FSA_BRIDGE before adding the TH_SERVER_NO_UID. + th_fsa_bridge_endpoints = set(await self.read_single_attribute_check_success( + cluster=Clusters.Descriptor, + attribute=Clusters.Descriptor.Attributes.PartsList, + node_id=th_fsa_bridge_th_node_id, + endpoint=0, + )) + + discriminator = random.randint(0, 4095) + # Open commissioning window on TH_SERVER_NO_UID. + params = await self.default_controller.OpenCommissioningWindow( + nodeid=th_server_th_node_id, + option=self.default_controller.CommissioningWindowPasscode.kTokenWithRandomPin, + discriminator=discriminator, + iteration=10000, + timeout=600) + + th_server_th_fsa_node_id = 3 + # Commissioning TH_SERVER_NO_UID to TH_FSA. + self.th_fsa_controller.commission_on_network( + node_id=th_server_th_fsa_node_id, + setup_pin_code=params.setupPinCode, + filter_type=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, + filter=discriminator, + ) + + # Wait some time, so the dynamic endpoint will appear on the TH_FSA_BRIDGE. + await asyncio.sleep(5) + + # Get the list of endpoints on the TH_FSA_BRIDGE after adding the TH_SERVER_NO_UID. + th_fsa_bridge_endpoints_new = set(await self.read_single_attribute_check_success( + cluster=Clusters.Descriptor, + attribute=Clusters.Descriptor.Attributes.PartsList, + node_id=th_fsa_bridge_th_node_id, + endpoint=0, + )) + + # Get the endpoint number for just added TH_SERVER_NO_UID. + logging.info("Endpoints on TH_FSA_BRIDGE: old=%s, new=%s", th_fsa_bridge_endpoints, th_fsa_bridge_endpoints_new) + asserts.assert_true(th_fsa_bridge_endpoints_new.issuperset(th_fsa_bridge_endpoints), + "Expected only new endpoints to be added") + unique_endpoints_set = th_fsa_bridge_endpoints_new - th_fsa_bridge_endpoints + asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint") + th_fsa_bridge_th_server_endpoint = list(unique_endpoints_set)[0] + + # Verify that TH_FSA created a UniqueID for TH_SERVER_NO_UID. + th_fsa_bridge_th_server_unique_id = await self.read_single_attribute_check_success( + cluster=Clusters.BridgedDeviceBasicInformation, + attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID, + node_id=th_fsa_bridge_th_node_id, + endpoint=th_fsa_bridge_th_server_endpoint) + asserts.assert_true(type_matches(th_fsa_bridge_th_server_unique_id, str), "UniqueID should be a string") + asserts.assert_true(th_fsa_bridge_th_server_unique_id, "UniqueID should not be an empty string") + logging.info("UniqueID generated for TH_SERVER_NO_UID: %s", th_fsa_bridge_th_server_unique_id) + + self.step(3) + + discriminator = random.randint(0, 4095) + # Open commissioning window on TH_FSA_BRIDGE. + params = await self.default_controller.OpenCommissioningWindow( + nodeid=th_fsa_bridge_th_node_id, + option=self.default_controller.CommissioningWindowPasscode.kTokenWithRandomPin, + discriminator=discriminator, + iteration=10000, + timeout=600) + + self.step(4) + + # Commissioning TH_FSA_BRIDGE to DUT_FSA fabric. + if not self.is_ci: + self.wait_for_user_input( + f"Commission TH_FSA's aggregator on DUT using manufacturer specified mechanism.\n" + f"Use the following parameters:\n" + f"- discriminator: {discriminator}\n" + f"- setupPinCode: {params.setupPinCode}\n" + f"- setupQRCode: {params.setupQRCode}\n" + f"- setupManualCode: {params.setupManualCode}\n" + f"If using FabricSync Admin, you may type:\n" + f">>> fabricsync add-bridge {params.setupPinCode} {self.th_fsa_bridge_port}") + else: + self.dut_fsa_stdin.write( + f"fabricsync add-bridge 10 {params.setupPinCode} {self.th_fsa_bridge_address} {self.th_fsa_bridge_port}\n") + self.dut_fsa_stdin.flush() + # Wait for the commissioning to complete. + await asyncio.sleep(5) + + self.step(5) + + # Get the list of endpoints on the DUT_FSA_BRIDGE before synchronization. + dut_fsa_bridge_endpoints = set(await self.read_single_attribute_check_success( + cluster=Clusters.Descriptor, + attribute=Clusters.Descriptor.Attributes.PartsList, + node_id=self.dut_node_id, + endpoint=0, + )) + + # Synchronize TH_SERVER_NO_UID from TH_FSA to DUT_FSA fabric. + if not self.is_ci: + self.wait_for_user_input( + f"Synchronize endpoint from TH_FSA's aggregator to DUT using manufacturer specified mechanism.\n" + f"Use the following parameters:\n" + f"- endpointID: {th_fsa_bridge_th_server_endpoint}\n" + f"If using FabricSync Admin, you may type:\n" + f">>> fabricsync sync-device {th_fsa_bridge_th_server_endpoint}") + else: + self.dut_fsa_stdin.write(f"fabricsync sync-device {th_fsa_bridge_th_server_endpoint}\n") + self.dut_fsa_stdin.flush() + # Wait for the synchronization to complete. + await asyncio.sleep(5) + + self.step(6) + + # Get the list of endpoints on the DUT_FSA_BRIDGE after synchronization + dut_fsa_bridge_endpoints_new = set(await self.read_single_attribute_check_success( + cluster=Clusters.Descriptor, + attribute=Clusters.Descriptor.Attributes.PartsList, + node_id=self.dut_node_id, + endpoint=0, + )) + + # Get the endpoint number for just synced TH_SERVER_NO_UID. + logging.info("Endpoints on DUT_FSA_BRIDGE: old=%s, new=%s", dut_fsa_bridge_endpoints, dut_fsa_bridge_endpoints_new) + asserts.assert_true(dut_fsa_bridge_endpoints_new.issuperset(dut_fsa_bridge_endpoints), + "Expected only new endpoints to be added") + unique_endpoints_set = dut_fsa_bridge_endpoints_new - dut_fsa_bridge_endpoints + asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint on DUT_FSA") + dut_fsa_bridge_th_server_endpoint = list(unique_endpoints_set)[0] + + # Verify that DUT_FSA copied the TH_SERVER_NO_UID UniqueID from TH_FSA. + dut_fsa_bridge_th_server_unique_id = await self.read_single_attribute_check_success( + cluster=Clusters.BridgedDeviceBasicInformation, + attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID, + endpoint=dut_fsa_bridge_th_server_endpoint) + asserts.assert_true(type_matches(dut_fsa_bridge_th_server_unique_id, str), "UniqueID should be a string") + asserts.assert_true(dut_fsa_bridge_th_server_unique_id, "UniqueID should not be an empty string") + logging.info("UniqueID for TH_SERVER_NO_UID on DUT_FSA: %s", th_fsa_bridge_th_server_unique_id) + + # Make sure that the UniqueID on the DUT_FSA_BRIDGE is the same as the one on the DUT_FSA_BRIDGE. + asserts.assert_equal(dut_fsa_bridge_th_server_unique_id, th_fsa_bridge_th_server_unique_id, + "UniqueID on DUT_FSA and TH_FSA should be the same") + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/execute_python_tests.py b/src/python_testing/execute_python_tests.py index 7e7d7d501e81b2..1b56432b447b5b 100644 --- a/src/python_testing/execute_python_tests.py +++ b/src/python_testing/execute_python_tests.py @@ -75,6 +75,7 @@ def main(search_directory, env_file): "TC_MCORE_FS_1_1.py", "TC_MCORE_FS_1_2.py", "TC_MCORE_FS_1_3.py", + "TC_MCORE_FS_1_4.py", "TC_MCORE_FS_1_5.py", "TC_OCC_3_1.py", "TC_OCC_3_2.py",