Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TC-DA-1.7: Use steps_function, add missing checks #33117

Merged
merged 6 commits into from
Apr 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 81 additions & 22 deletions src/python_testing/TC_DA_1_7.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
import logging
from glob import glob
from pathlib import Path
from typing import Optional
from typing import List, Optional

import chip.clusters as Clusters
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.x509 import AuthorityKeyIdentifier, Certificate, SubjectKeyIdentifier, load_der_x509_certificate
from matter_testing_support import MatterBaseTest, async_test_body, bytes_from_hex, default_matter_test_main, hex_from_bytes
from matter_testing_support import (MatterBaseTest, TestStep, async_test_body, bytes_from_hex, default_matter_test_main,
hex_from_bytes)
from mobly import asserts

# Those are SDK samples that are known to be non-production.
Expand Down Expand Up @@ -107,51 +108,101 @@ class TC_DA_1_7(MatterBaseTest):
--script-args "--storage-path admin_storage.json --commissioning-method on-network \
--discriminator 12 34 --passcode 20202021 20202021 --bool-arg allow_sdk_dac:true"
'''

def setup_class(self):
self.allow_sdk_dac = self.user_params.get("allow_sdk_dac", False)
self.post_cert_test = self.user_params.get("post_cert_test", False)

def expected_number_of_DUTs(self) -> int:
return 1 if (self.allow_sdk_dac or self.post_cert_test) else 2

def steps_one_dut(self, DUT: int) -> List[TestStep]:
return [TestStep(f'{DUT}', f'Test DUT{DUT} DAC chain as follows:'),
TestStep(f'{DUT}.1', f'TH sends CertificateChainRequest Command to DUT{DUT} with the CertificateType set to PAICertificate',
'Verify that the DUT returns a CertificateChainResponse. Save the returned Certificate as `pai_cert`.'),
TestStep(f'{DUT}.2', 'TH sends CertificateChainRequest Command to DUT1 with the CertificateType set to DACCertificate',
'Verify that the DUT returns a CertificateChainResponse. Save the returned Certificate as `dac_cert`.'),
TestStep(f'{DUT}.3', 'TH extracts the Authority Key Identifier from the PAI certificate',
('* Verify that the Authority Key Identifier is signed by a PAA in the DCL. (Ensure that it is not SDK’s test PAAs)\n'
'* Verify that PAI authority key ID must not be one of:\n'
' - 78: 5C: E7: 05: B8: 6B: 8F: 4E: 6F: C7: 93: AA: 60: CB: 43: EA: 69: 68:82: D5\n'
' - 6A: FD: 22: 77: 1F: 51: 1F: EC: BF: 16: 41: 97: 67: 10: DC: DC: 31: A1: 71: 7E\n'
'* Save the selected PAA certificate as `paa_cert`\n')),
TestStep(f'{DUT}.4', 'TH extracts ASN.1 DER bytes for the entire issuer field of `dac_cert` and subject field of `pai_cert`.',
'Verify that the `dac_cert` `issuer field is byte-forbyte equivalent to the `pai_cert`subject field.'),
TestStep(f'{DUT}.5', 'TH extracts ASN.1 DER bytes for the entire issuer field of `pai_cert` and subject field of `paa_cert`.',
'Verify that the `pai_cert` issuer field is byte-forbyte equivalent to the `paa_cert` subject field.'),
TestStep(f'{DUT}.6', f'TH extracts the public key from the DAC and saves as `pk_{DUT}`.')
]

def steps_TC_DA_1_7(self):
steps = [TestStep(0, "Commission DUT if not already done", is_commissioning=True)]
steps += self.steps_one_dut(1)
if self.expected_number_of_DUTs() == 2:
steps += self.steps_one_dut(2)
steps += [TestStep(3, "Verify that `pk_1` is not equal to `pk_2",
"Public keys do not match")]
return steps

@async_test_body
async def test_TC_DA_1_7(self):
# For real tests, we require more than one DUT
# On the CI, this doesn't make sense to do since all the examples use the same DAC
# To specify more than 1 DUT, use a list of discriminators and passcodes
allow_sdk_dac = self.user_params.get("allow_sdk_dac", False)
if allow_sdk_dac:
asserts.assert_equal(len(self.matter_test_config.discriminators), 1, "Only one device can be tested with SDK DAC")
if not allow_sdk_dac:
asserts.assert_equal(len(self.matter_test_config.discriminators), 2, "This test requires 2 DUTs")
# post_cert_tests (or sdk) can use the qr or manual code
# We don't currently support this in cert because the base doesn't support multiple QR/manual
num = 0
if self.matter_test_config.discriminators:
num += len(self.matter_test_config.discriminators)
if self.matter_test_config.qr_code_content:
num += 1
if self.matter_test_config.manual_code:
num += 1

if num != self.expected_number_of_DUTs():
if self.allow_sdk_dac:
msg = "The allow_sdk_dac flag is only for use in CI. When using this test in CI, please specify a single discriminator, manual-code or qr-code-content"
elif self.post_cert_test:
msg = "The post_cert_test flag is only for use post-certification. When using this flag, please specify a single discriminator, manual-code or qr-code-content"
else:
msg = "This test requires two devices for use at certification. Please specify two device discriminators ex. --discriminator 1234 5678"
asserts.fail(msg)

pk = []
# Commissioning - already done.
self.step(0)

for i in range(len(self.matter_test_config.dut_node_ids)):
pk.append(await self.single_DUT(i, self.matter_test_config.dut_node_ids[i]))
pk.append(await self.single_DUT(i+1, self.matter_test_config.dut_node_ids[i]))

self.step(3)
asserts.assert_equal(len(pk), len(set(pk)), "Found matching public keys in different DUTs")

async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes:
# Option to allow SDK roots (skip step 4 check 2)
allow_sdk_dac = self.user_params.get("allow_sdk_dac", False)

logging.info("Pre-condition: load all PAAs SKIDs")
conf = self.matter_test_config
paa_by_skid = load_all_paa(conf.paa_trust_store_path)
logging.info("Found %d PAAs" % len(paa_by_skid))

logging.info("DUT {} Step 1: Commissioning, already done".format(dut_index))
# Test plan step introducing test for each DUT
self.step(f'{dut_index}')
dev_ctrl = self.default_controller

logging.info("DUT {} Step 2: Get PAI of DUT1 with certificate chain request".format(dut_index))
self.step(f'{dut_index}.1')
result = await dev_ctrl.SendCommand(dut_node_id, 0,
Clusters.OperationalCredentials.Commands.CertificateChainRequest(2))
pai = result.certificate
asserts.assert_less_equal(len(pai), 600, "PAI cert must be at most 600 bytes")
key = 'pai_{}'.format(dut_index)
self.record_data({key: hex_from_bytes(pai)})

logging.info("DUT {} Step 3: Get DAC of DUT1 with certificate chain request".format(dut_index))
self.step(f'{dut_index}.2')
result = await dev_ctrl.SendCommand(dut_node_id, 0,
Clusters.OperationalCredentials.Commands.CertificateChainRequest(1))
dac = result.certificate
asserts.assert_less_equal(len(dac), 600, "DAC cert must be at most 600 bytes")
key = 'dac_{}'.format(dut_index)
self.record_data({key: hex_from_bytes(dac)})

logging.info("DUT {} Step 4 check 1: Ensure PAI's AKID matches a PAA and signature is valid".format(dut_index))
self.step(f'{dut_index}.3')
logging.info("DUT {} Step 3 check 1: Ensure PAI's AKID matches a PAA and signature is valid".format(dut_index))
pai_cert = load_der_x509_certificate(pai)
pai_akid = extract_akid(pai_cert)
if pai_akid not in paa_by_skid:
Expand All @@ -168,15 +219,23 @@ async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes:
asserts.fail("DUT %d: Failed to verify PAI signature against PAA public key: %s" % (dut_index, str(e)))
logging.info("Validated PAI signature against PAA")

logging.info("DUT {} Step 4 check 2: Verify PAI AKID not in denylist of SDK PAIs".format(dut_index))
if allow_sdk_dac:
logging.warn("===> TEST STEP SKIPPED: Allowing SDK DACs!")
logging.info("DUT {} Step 3 check 2: Verify PAI AKID not in denylist of SDK PAIs".format(dut_index))
if self.allow_sdk_dac:
logging.warning("===> TEST STEP SKIPPED: Allowing SDK DACs!")
else:
for candidate in FORBIDDEN_AKID:
asserts.assert_not_equal(hex_from_bytes(pai_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist")

logging.info("DUT {} Step 5: Extract subject public key of DAC and save".format(dut_index))
self.step(f'{dut_index}.4')
# dac issuer == pai subject
dac_cert = load_der_x509_certificate(dac)
asserts.assert_equal(dac_cert.issuer, pai_cert.subject, "DAC issuer does not match PAI subject")

self.step(f'{dut_index}.5')
# pai issues == paa subject
asserts.assert_equal(pai_cert.issuer, paa_cert.subject, "PAI issuer does not match PAA subject")

self.step(f'{dut_index}.6')
pk = dac_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint)
logging.info("Subject public key pk: %s" % hex_from_bytes(pk))
key = 'pk_{}'.format(dut_index)
Expand Down
Loading