Skip to content

Commit

Permalink
Add TC_MCORE_FS_1_3.py test implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tehampson committed Jul 30, 2024
1 parent b6cfeac commit 40a0b94
Showing 1 changed file with 157 additions and 0 deletions.
157 changes: 157 additions & 0 deletions src/python_testing/TC_MCORE_FS_1_3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import pathlib
import random
import signal
import subprocess
import time
import uuid

import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches
from mobly import asserts

class TC_MCORE_FS_1_3(MatterBaseTest):
@async_test_body
async def setup_class(self):
super().setup_class()
self.device_for_th_eco_kvs = None
self.device_for_th_eco_port = 5543
self.app_process_for_th_eco = None

self.device_for_dut_eco_kvs = None
self.device_for_dut_eco_port = 5544
self.app_process_for_dut_eco = None


def teardown_class(self):
logging.warning("Stopping app with SIGTERM")
self.app_process_for_dut_eco.send_signal(signal.SIGTERM.value)
self.app_process_for_dut_eco.wait()
# TODO: Use timeout, if term doesn't work, try SIGINT
if self.app_process_for_th_eco is not None:
self.app_process_for_th_eco.send_signal(signal.SIGTERM.value)
self.app_process_for_th_eco.wait()

os.remove(self.device_for_dut_eco_kvs)
if self.device_for_th_eco_kvs is not None:
os.remove(self.device_for_th_eco_kvs)
super().teardown_class()


async def create_device_for_dut_ecosystem(self):
# TODO: This needs to come from an arg and needs to be something available on the TH
# TODO: confirm whether we can open processes like this on the TH
app = os.path.join(pathlib.Path(__file__).resolve().parent, '..', '..', 'out',
'linux-x64-all-clusters-no-ble-clang', 'chip-all-clusters-app-no-unique-id')

self.device_for_dut_eco_kvs = f'kvs_{str(uuid.uuid4())}'
discriminator = random.randint(0, 4095)
discriminator = 3840
passcode = 20202021
app_args = f'--secured-device-port {self.device_for_dut_eco_port} --discriminator {discriminator} --passcode {passcode} --KVS {self.device_for_dut_eco_kvs}'
cmd = f'{app} {app_args}'
# TODO: Determine if we want these logs cooked or pushed to somewhere else
logging.info("Starting TH device for DUT ecosystem")
self.app_process_for_dut_eco = subprocess.Popen(cmd, bufsize=0, shell=True)
logging.info("Started TH device for DUT ecosystem")
time.sleep(3)
return discriminator, passcode


async def create_and_commission_device_for_th_ecosystem(self):
# TODO: This needs to come from an arg and needs to be something available on the TH
# TODO: confirm whether we can open processes like this on the TH
app = os.path.join(pathlib.Path(__file__).resolve().parent, '..', '..', 'out',
'linux-x64-all-clusters-no-ble-clang', 'chip-all-clusters-app-no-unique-id')

self.device_for_th_eco_kvs = f'kvs_{str(uuid.uuid4())}'
discriminator = random.randint(0, 4095)
discriminator = 3840
passcode = 20202021
app_args = f'--secured-device-port {self.device_for_th_eco_port} --discriminator {discriminator} --passcode {passcode} --KVS {self.device_for_th_eco_kvs}'
cmd = f'{app} {app_args}'
# TODO: Determine if we want these logs cooked or pushed to somewhere else
logging.info("Starting TH device for TH ecosystem")
self.app_process_for_th_eco = subprocess.Popen(cmd, bufsize=0, shell=True)
logging.info("Started TH device for TH ecosystem")
time.sleep(3)

logging.info("Commissioning from separate fabric")
# Create a second controller on a new fabric to communicate to the server
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
paa_path = str(self.matter_test_config.paa_trust_store_path)
self.TH_server_controller = new_fabric_admin.NewController(nodeId=112233, paaTrustStorePath=paa_path)
self.server_nodeid = 1111
await self.TH_server_controller.CommissionOnNetwork(nodeId=self.server_nodeid, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator)
logging.info("Commissioning TH device for TH ecosystem")

def steps_TC_MCORE_FS_1_3(self) -> list[TestStep]:
steps = [TestStep(1, "DUT_FSA commissions TH_SED_DUT to DUT_FSAs fabric and generates a UniqueID", is_commissioning=True),
TestStep(2, "TH_FSA commissions TH_SED_TH onto TH_FSAs fabric and generates a UniqueID."),
TestStep(3, "Follow manufacturer provided instructions to enable DUT_FSA to synchronize TH_SED_TH onto DUT_FSAs fabric."),
TestStep(4, "DUT_FSA synchronizes TH_SED_TH onto DUT_FSAs fabric and copies the UniqueID presented by TH_FSAs Bridged Device Basic Information Cluster.")]
return steps

async def test_TC_MCORE_FS_1_3(self):
self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')
self.print_step(0, "Commissioning DUT to TH, already done")
self.step(1)
# These steps are not explicitly in step 1, but they help identify the dynamically added endpoint in step 1.
root_node_endpoint = 0
root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint)
set_of_endpoints_before_adding_device = set(root_part_list[root_node_endpoint]
[Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList])

discriminator, setup_pin_code = await self.create_device_for_dut_ecosystem()
self.wait_for_user_input(
prompt_msg=f"Using the DUT vendor's provided interface, commission the device using the following parameters:\n"
f"- discriminator: {discriminator}\n"
f"- setupPinCode: {setup_pin_code}\n"
f"If using FabricSync Admin, you may type:\n"
f">>> pairing onnetwork 111 {setup_pin_code}")

root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint)
set_of_endpoints_after_adding_device = set(root_part_list[root_node_endpoint]
[Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList])

asserts.assert_true(set_of_endpoints_after_adding_device.issuperset(set_of_endpoints_before_adding_device), "Expected only new endpoints to be added")
unique_endpoints_set = set_of_endpoints_after_adding_device - set_of_endpoints_before_adding_device
asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint")
newly_added_endpoint = list(unique_endpoints_set)[0]

th_sed_dut_unique_id = await self.read_single_attribute_check_success(cluster=Clusters.BasicInformation, attribute=Clusters.BasicInformation.Attributes.UniqueID, endpoint=newly_added_endpoint)
asserts.assert_true(type_matches(th_sed_dut_unique_id, str), "UniqueID should be a string")
asserts.assert_true(th_sed_dut_unique_id, "UniqueID should not be an empty string")

self.step(2)
await self.create_and_commission_device_for_th_ecosystem()
# TODO During setup we need to create the TH_FSA device where we would commission device created
# in create_and_commission_device_for_th_ecosystem to be commissioned into TH_FSA.

# TODO Because we cannot create a TH_FSA and there is no way to mock it the following 2 test steps are skipped for now.
self.skip_step(3)
self.skip_step(4)


if __name__ == "__main__":
default_matter_test_main()

0 comments on commit 40a0b94

Please sign in to comment.