From cb08ef628be2cdca97327d740ea9a96920d90845 Mon Sep 17 00:00:00 2001 From: Cecille Freeman Date: Tue, 23 Apr 2024 13:39:43 -0400 Subject: [PATCH 1/6] TC-DA-1.7: Use steps_function, add missing checks Also adding in a new flag that will let us use this test in post-cert (just introducing flag now, checks will be updated in an upcoming PR). --- src/python_testing/TC_DA_1_7.py | 88 ++++++++++++++++++++++++++------- 1 file changed, 71 insertions(+), 17 deletions(-) diff --git a/src/python_testing/TC_DA_1_7.py b/src/python_testing/TC_DA_1_7.py index 634dbb8885da2c..71a669ff18e7ce 100644 --- a/src/python_testing/TC_DA_1_7.py +++ b/src/python_testing/TC_DA_1_7.py @@ -18,7 +18,7 @@ import logging from glob import glob from pathlib import Path -from typing import Optional +from typing import List, Optional import chip.clusters as Clusters from cryptography.exceptions import InvalidSignature @@ -26,7 +26,7 @@ from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from cryptography.x509 import AuthorityKeyIdentifier, Certificate, SubjectKeyIdentifier, load_der_x509_certificate -from matter_testing_support import MatterBaseTest, async_test_body, bytes_from_hex, default_matter_test_main, hex_from_bytes +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, bytes_from_hex, default_matter_test_main, hex_from_bytes from mobly import asserts # Those are SDK samples that are known to be non-production. @@ -107,20 +107,64 @@ class TC_DA_1_7(MatterBaseTest): --script-args "--storage-path admin_storage.json --commissioning-method on-network \ --discriminator 12 34 --passcode 20202021 20202021 --bool-arg allow_sdk_dac:true" ''' - @async_test_body - async def test_TC_DA_1_7(self): + def expected_number_of_DUTs(self): # For real tests, we require more than one DUT # On the CI, this doesn't make sense to do since all the examples use the same DAC # To specify more than 1 DUT, use a list of discriminators and passcodes allow_sdk_dac = self.user_params.get("allow_sdk_dac", False) - if allow_sdk_dac: - asserts.assert_equal(len(self.matter_test_config.discriminators), 1, "Only one device can be tested with SDK DAC") - if not allow_sdk_dac: - asserts.assert_equal(len(self.matter_test_config.discriminators), 2, "This test requires 2 DUTs") - pk = [] + post_cert_test = self.user_params.get("post_cert_test", False) + if allow_sdk_dac or post_cert_test: + return 1 + return 2 + + def steps_one_dut(self, DUT: int) -> List[TestStep]: + return [TestStep(f'{DUT}', f'Test DUT{DUT} DAC chain as follows:'), + TestStep(f'{DUT}.1', f'TH sends CertificateChainRequest Command to DUT{DUT} with the CertificateType set to PAICertificate', + 'Verify that the DUT returns a CertificateChainResponse. Save the returned Certificate as `pai_cert`.'), + TestStep(f'{DUT}.2', 'TH sends CertificateChainRequest Command to DUT1 with the CertificateType set to DACCertificate', + 'Verify that the DUT returns a CertificateChainResponse. Save the returned Certificate as `dac_cert`.'), + TestStep(f'{DUT}.3', 'TH extracts the Authority Key Identifier from the PAI certificate', + ('* Verify that the Authority Key Identifier is signed by a PAA in the DCL. (Ensure that it is not SDK’s test PAAs)\n' + '* Verify that PAI authority key ID must not be one of:\n' + ' - 78: 5C: E7: 05: B8: 6B: 8F: 4E: 6F: C7: 93: AA: 60: CB: 43: EA: 69: 68:82: D5\n' + ' - 6A: FD: 22: 77: 1F: 51: 1F: EC: BF: 16: 41: 97: 67: 10: DC: DC: 31: A1: 71: 7E\n' + '* Save the selected PAA certificate as `paa_cert`\n')), + TestStep(f'{DUT}.4', 'TH extracts ASN.1 DER bytes for the entire issuer field of `dac_cert` and subject field of `pai_cert`.', + 'Verify that the `dac_cert` `issuer field is byte-forbyte equivalent to the `pai_cert`subject field.'), + TestStep(f'{DUT}.5', 'TH extracts ASN.1 DER bytes for the entire issuer field of `pai_cert` and subject field of `paa_cert`.', + 'Verify that the `pai_cert` issuer field is byte-forbyte equivalent to the `paa_cert` subject field.'), + TestStep(f'{DUT}.6', f'TH extracts the public key from the DAC and saves as `pk_{DUT}`.') + ] + + def steps_TC_DA_1_7(self): + steps = [TestStep(0, "Commission DUT if not already done", is_commissioning=True)] + steps += self.steps_one_dut(1) + if self.expected_number_of_DUTs() == 2: + steps += self.steps_one_dut(2) + steps += [TestStep(3, "Verify that `pk_1` is not equal to `pk_2", + "Public keys do not match")] + return steps + + @async_test_body + async def test_TC_DA_1_7(self): + # post_cert_tests (or sdk) can use the qr or manual code + # We don't currently support this in cert because the base doesn't support multiple QR/manual + num= 0 + if self.matter_test_config.discriminators: + num += len(self.matter_test_config.discriminators) + if self.matter_test_config.qr_code_content: + num += 1 + if self.matter_test_config.manual_code: + num += 1 + asserts.assert_equal(num, self.expected_number_of_DUTs(), "Unexpected number of devices specified - this test expects two DUTs at cert, one device for CI") + pk= [] + # Commissioning - already done. + self.step(0) + for i in range(len(self.matter_test_config.dut_node_ids)): - pk.append(await self.single_DUT(i, self.matter_test_config.dut_node_ids[i])) + pk.append(await self.single_DUT(i+1, self.matter_test_config.dut_node_ids[i])) + self.step(3) asserts.assert_equal(len(pk), len(set(pk)), "Found matching public keys in different DUTs") async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes: @@ -132,10 +176,11 @@ async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes: paa_by_skid = load_all_paa(conf.paa_trust_store_path) logging.info("Found %d PAAs" % len(paa_by_skid)) - logging.info("DUT {} Step 1: Commissioning, already done".format(dut_index)) + # Test plan step introducing test for each DUT + self.step(f'{dut_index}') dev_ctrl = self.default_controller - logging.info("DUT {} Step 2: Get PAI of DUT1 with certificate chain request".format(dut_index)) + self.step(f'{dut_index}.1') result = await dev_ctrl.SendCommand(dut_node_id, 0, Clusters.OperationalCredentials.Commands.CertificateChainRequest(2)) pai = result.certificate @@ -143,7 +188,7 @@ async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes: key = 'pai_{}'.format(dut_index) self.record_data({key: hex_from_bytes(pai)}) - logging.info("DUT {} Step 3: Get DAC of DUT1 with certificate chain request".format(dut_index)) + self.step(f'{dut_index}.2') result = await dev_ctrl.SendCommand(dut_node_id, 0, Clusters.OperationalCredentials.Commands.CertificateChainRequest(1)) dac = result.certificate @@ -151,7 +196,8 @@ async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes: key = 'dac_{}'.format(dut_index) self.record_data({key: hex_from_bytes(dac)}) - logging.info("DUT {} Step 4 check 1: Ensure PAI's AKID matches a PAA and signature is valid".format(dut_index)) + self.step(f'{dut_index}.3') + logging.info("DUT {} Step 3 check 1: Ensure PAI's AKID matches a PAA and signature is valid".format(dut_index)) pai_cert = load_der_x509_certificate(pai) pai_akid = extract_akid(pai_cert) if pai_akid not in paa_by_skid: @@ -168,15 +214,23 @@ async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes: asserts.fail("DUT %d: Failed to verify PAI signature against PAA public key: %s" % (dut_index, str(e))) logging.info("Validated PAI signature against PAA") - logging.info("DUT {} Step 4 check 2: Verify PAI AKID not in denylist of SDK PAIs".format(dut_index)) + logging.info("DUT {} Step 3 check 2: Verify PAI AKID not in denylist of SDK PAIs".format(dut_index)) if allow_sdk_dac: - logging.warn("===> TEST STEP SKIPPED: Allowing SDK DACs!") + logging.warning("===> TEST STEP SKIPPED: Allowing SDK DACs!") else: for candidate in FORBIDDEN_AKID: asserts.assert_not_equal(hex_from_bytes(pai_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist") - logging.info("DUT {} Step 5: Extract subject public key of DAC and save".format(dut_index)) + self.step(f'{dut_index}.4') + # dac issuer == pai subject dac_cert = load_der_x509_certificate(dac) + asserts.assert_equal(dac_cert.issuer, pai_cert.subject, "DAC issuer does not match PAI subject") + + self.step(f'{dut_index}.5') + # pai issues == paa subject + asserts.assert_equal(pai_cert.issuer, paa_cert.subject, "PAI issuer does not match PAA subject") + + self.step(f'{dut_index}.6') pk = dac_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint) logging.info("Subject public key pk: %s" % hex_from_bytes(pk)) key = 'pk_{}'.format(dut_index) From 5fbaa03b0c42c3aa0a1f249c1bcf7df985a64393 Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Tue, 23 Apr 2024 17:43:04 +0000 Subject: [PATCH 2/6] Restyled by autopep8 --- src/python_testing/TC_DA_1_7.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/python_testing/TC_DA_1_7.py b/src/python_testing/TC_DA_1_7.py index 71a669ff18e7ce..7b45d5fea4fc14 100644 --- a/src/python_testing/TC_DA_1_7.py +++ b/src/python_testing/TC_DA_1_7.py @@ -107,6 +107,7 @@ class TC_DA_1_7(MatterBaseTest): --script-args "--storage-path admin_storage.json --commissioning-method on-network \ --discriminator 12 34 --passcode 20202021 20202021 --bool-arg allow_sdk_dac:true" ''' + def expected_number_of_DUTs(self): # For real tests, we require more than one DUT # On the CI, this doesn't make sense to do since all the examples use the same DAC @@ -132,9 +133,9 @@ def steps_one_dut(self, DUT: int) -> List[TestStep]: TestStep(f'{DUT}.4', 'TH extracts ASN.1 DER bytes for the entire issuer field of `dac_cert` and subject field of `pai_cert`.', 'Verify that the `dac_cert` `issuer field is byte-forbyte equivalent to the `pai_cert`subject field.'), TestStep(f'{DUT}.5', 'TH extracts ASN.1 DER bytes for the entire issuer field of `pai_cert` and subject field of `paa_cert`.', - 'Verify that the `pai_cert` issuer field is byte-forbyte equivalent to the `paa_cert` subject field.'), + 'Verify that the `pai_cert` issuer field is byte-forbyte equivalent to the `paa_cert` subject field.'), TestStep(f'{DUT}.6', f'TH extracts the public key from the DAC and saves as `pk_{DUT}`.') - ] + ] def steps_TC_DA_1_7(self): steps = [TestStep(0, "Commission DUT if not already done", is_commissioning=True)] @@ -149,15 +150,16 @@ def steps_TC_DA_1_7(self): async def test_TC_DA_1_7(self): # post_cert_tests (or sdk) can use the qr or manual code # We don't currently support this in cert because the base doesn't support multiple QR/manual - num= 0 + num = 0 if self.matter_test_config.discriminators: num += len(self.matter_test_config.discriminators) if self.matter_test_config.qr_code_content: num += 1 if self.matter_test_config.manual_code: num += 1 - asserts.assert_equal(num, self.expected_number_of_DUTs(), "Unexpected number of devices specified - this test expects two DUTs at cert, one device for CI") - pk= [] + asserts.assert_equal(num, self.expected_number_of_DUTs(), + "Unexpected number of devices specified - this test expects two DUTs at cert, one device for CI") + pk = [] # Commissioning - already done. self.step(0) From b34d869d46c8b0249f9c06b0a08000704052dac3 Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Tue, 23 Apr 2024 17:43:05 +0000 Subject: [PATCH 3/6] Restyled by isort --- src/python_testing/TC_DA_1_7.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/python_testing/TC_DA_1_7.py b/src/python_testing/TC_DA_1_7.py index 7b45d5fea4fc14..323dce1ebb6c8e 100644 --- a/src/python_testing/TC_DA_1_7.py +++ b/src/python_testing/TC_DA_1_7.py @@ -26,7 +26,8 @@ from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from cryptography.x509 import AuthorityKeyIdentifier, Certificate, SubjectKeyIdentifier, load_der_x509_certificate -from matter_testing_support import MatterBaseTest, TestStep, async_test_body, bytes_from_hex, default_matter_test_main, hex_from_bytes +from matter_testing_support import (MatterBaseTest, TestStep, async_test_body, bytes_from_hex, default_matter_test_main, + hex_from_bytes) from mobly import asserts # Those are SDK samples that are known to be non-production. From c16e08a086780399f9da5a9f3ca26e8d62cfd479 Mon Sep 17 00:00:00 2001 From: cecille Date: Mon, 29 Apr 2024 17:08:16 -0400 Subject: [PATCH 4/6] Make error message more specific --- src/python_testing/TC_DA_1_7.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/python_testing/TC_DA_1_7.py b/src/python_testing/TC_DA_1_7.py index 323dce1ebb6c8e..bac54315dd66bd 100644 --- a/src/python_testing/TC_DA_1_7.py +++ b/src/python_testing/TC_DA_1_7.py @@ -109,15 +109,12 @@ class TC_DA_1_7(MatterBaseTest): --discriminator 12 34 --passcode 20202021 20202021 --bool-arg allow_sdk_dac:true" ''' - def expected_number_of_DUTs(self): - # For real tests, we require more than one DUT - # On the CI, this doesn't make sense to do since all the examples use the same DAC - # To specify more than 1 DUT, use a list of discriminators and passcodes - allow_sdk_dac = self.user_params.get("allow_sdk_dac", False) - post_cert_test = self.user_params.get("post_cert_test", False) - if allow_sdk_dac or post_cert_test: - return 1 - return 2 + def setup_class(self): + self.allow_sdk_dac = self.user_params.get("allow_sdk_dac", False) + self.post_cert_test = self.user_params.get("post_cert_test", False) + + def expected_number_of_DUTs() -> int: + return 1 if (self.allow_sdk_dac or self.post_cert_test) else 2 def steps_one_dut(self, DUT: int) -> List[TestStep]: return [TestStep(f'{DUT}', f'Test DUT{DUT} DAC chain as follows:'), @@ -158,8 +155,16 @@ async def test_TC_DA_1_7(self): num += 1 if self.matter_test_config.manual_code: num += 1 - asserts.assert_equal(num, self.expected_number_of_DUTs(), - "Unexpected number of devices specified - this test expects two DUTs at cert, one device for CI") + + if num != expected_num: + if self.allow_sdk_dac: + msg = "The allow_sdk_dac flag is only for use in CI. When using this test in CI, please specify a single discriminator, manual-code or qr-code-content" + elif self.post_cert_test: + msg = "The post_cert_test flag is only for use post-certification. When using this flag, please specify a single discriminator, manual-code or qr-code-content" + else: + msg = "This test requires two devices for use at certification. Please specify two device discriminators ex. --discriminator 1234 5678" + asserts.fail(msg) + pk = [] # Commissioning - already done. self.step(0) From 77e791072deade7e6f91ada72b4bfc827e0abe2f Mon Sep 17 00:00:00 2001 From: cecille Date: Mon, 29 Apr 2024 17:13:24 -0400 Subject: [PATCH 5/6] whoops, missed some --- src/python_testing/TC_DA_1_7.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/python_testing/TC_DA_1_7.py b/src/python_testing/TC_DA_1_7.py index bac54315dd66bd..9125452805ce0d 100644 --- a/src/python_testing/TC_DA_1_7.py +++ b/src/python_testing/TC_DA_1_7.py @@ -113,7 +113,7 @@ def setup_class(self): self.allow_sdk_dac = self.user_params.get("allow_sdk_dac", False) self.post_cert_test = self.user_params.get("post_cert_test", False) - def expected_number_of_DUTs() -> int: + def expected_number_of_DUTs(self) -> int: return 1 if (self.allow_sdk_dac or self.post_cert_test) else 2 def steps_one_dut(self, DUT: int) -> List[TestStep]: @@ -156,7 +156,7 @@ async def test_TC_DA_1_7(self): if self.matter_test_config.manual_code: num += 1 - if num != expected_num: + if num != self.expected_number_of_DUTs(): if self.allow_sdk_dac: msg = "The allow_sdk_dac flag is only for use in CI. When using this test in CI, please specify a single discriminator, manual-code or qr-code-content" elif self.post_cert_test: From 0cf621c2624f3480f1d70e5deb63c21de737f51c Mon Sep 17 00:00:00 2001 From: cecille Date: Mon, 29 Apr 2024 17:37:15 -0400 Subject: [PATCH 6/6] Use the flag from setup --- src/python_testing/TC_DA_1_7.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/python_testing/TC_DA_1_7.py b/src/python_testing/TC_DA_1_7.py index 9125452805ce0d..1d23883237eeeb 100644 --- a/src/python_testing/TC_DA_1_7.py +++ b/src/python_testing/TC_DA_1_7.py @@ -176,9 +176,6 @@ async def test_TC_DA_1_7(self): asserts.assert_equal(len(pk), len(set(pk)), "Found matching public keys in different DUTs") async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes: - # Option to allow SDK roots (skip step 4 check 2) - allow_sdk_dac = self.user_params.get("allow_sdk_dac", False) - logging.info("Pre-condition: load all PAAs SKIDs") conf = self.matter_test_config paa_by_skid = load_all_paa(conf.paa_trust_store_path) @@ -223,7 +220,7 @@ async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes: logging.info("Validated PAI signature against PAA") logging.info("DUT {} Step 3 check 2: Verify PAI AKID not in denylist of SDK PAIs".format(dut_index)) - if allow_sdk_dac: + if self.allow_sdk_dac: logging.warning("===> TEST STEP SKIPPED: Allowing SDK DACs!") else: for candidate in FORBIDDEN_AKID: