diff --git a/docs/registration_policies.md b/docs/registration_policies.md index ed1bb3c3..42f84187 100644 --- a/docs/registration_policies.md +++ b/docs/registration_policies.md @@ -93,6 +93,7 @@ from jsonschema import validate, ValidationError from scitt_emulator.scitt import ClaimInvalidError, CWTClaims from scitt_emulator.verify_statement import verify_statement +from scitt_emulator.key_helpers import verification_key_to_object def main(): @@ -107,16 +108,22 @@ def main(): f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}" ) - cwt_cose_key, _pycose_cose_key = verify_statement(msg) + verification_key = verify_statement(msg) unittest.TestCase().assertTrue( - cwt_cose_key, + verification_key, "Failed to verify signature on statement", ) - cwt_protected = cwt.decode(msg.phdr[CWTClaims], cwt_cose_key) + cwt_protected = cwt.decode(msg.phdr[CWTClaims], verification_key.cwt) issuer = cwt_protected[1] subject = cwt_protected[2] + issuer_key_as_object = verification_key_to_object(verification_key) + unittest.TestCase().assertTrue( + issuer_key_as_object, + "Failed to convert issuer key to JSON schema verifiable object", + ) + SCHEMA = json.loads(pathlib.Path(os.environ["SCHEMA_PATH"]).read_text()) try: @@ -124,6 +131,7 @@ def main(): instance={ "$schema": "https://schema.example.com/scitt-policy-engine-jsonschema.schema.json", "issuer": issuer, + "issuer_key": issuer_key_as_object, "subject": subject, "claim": json.loads(msg.payload.decode()), }, diff --git a/scitt_emulator/key_helper_dataclasses.py b/scitt_emulator/key_helper_dataclasses.py new file mode 100644 index 00000000..505afe88 --- /dev/null +++ b/scitt_emulator/key_helper_dataclasses.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass, field +from typing import List, Any, Union + +import cwt +import pycose.keys.ec2 + + +@dataclass +class VerificationKey: + transforms: List[Any] + original: Any + original_content_type: str + original_bytes: bytes + original_bytes_encoding: str + usable: bool + cwt: Union[cwt.COSEKey, None] + cose: Union[pycose.keys.ec2.EC2Key, None] diff --git a/scitt_emulator/key_helpers.py b/scitt_emulator/key_helpers.py new file mode 100644 index 00000000..5d692ee9 --- /dev/null +++ b/scitt_emulator/key_helpers.py @@ -0,0 +1,41 @@ +import itertools +import importlib.metadata +from typing import Optional, Callable, List, Tuple + +from scitt_emulator.key_helper_dataclasses import VerificationKey + + +ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT = "scitt_emulator.key_helpers.verification_key_to_object" + + +def verification_key_to_object( + verification_key: VerificationKey, + *, + key_transforms: Optional[List[Callable[[VerificationKey], dict]]] = None, +) -> bool: + """ + Resolve keys for statement issuer and verify signature on COSESign1 + statement and embedded CWT + """ + if key_transforms is None: + key_transforms = [] + # There is some difference in the return value of entry_points across + # Python versions/envs (conda vs. non-conda). Python 3.8 returns a dict. + entrypoints = importlib.metadata.entry_points() + if isinstance(entrypoints, dict): + for entrypoint in entrypoints.get(ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT, []): + key_transforms.append(entrypoint.load()) + elif isinstance(entrypoints, getattr(importlib.metadata, "EntryPoints", list)): + for entrypoint in entrypoints: + if entrypoint.group == ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT: + key_transforms.append(entrypoint.load()) + else: + raise TypeError(f"importlib.metadata.entry_points returned unknown type: {type(entrypoints)}: {entrypoints!r}") + + for key_transform in key_transforms: + verification_key_as_object = key_transform(verification_key) + # Skip keys that we couldn't derive COSE keys for + if verification_key_as_object: + return verification_key_as_object + + return None diff --git a/scitt_emulator/key_loader_format_did_key.py b/scitt_emulator/key_loader_format_did_key.py index 152965b5..528b298e 100644 --- a/scitt_emulator/key_loader_format_did_key.py +++ b/scitt_emulator/key_loader_format_did_key.py @@ -4,45 +4,48 @@ import cwt.algs.ec2 import pycose import pycose.keys.ec2 +import cryptography.hazmat.primitives.asymmetric.ec from cryptography.hazmat.primitives import serialization # TODO Remove this once we have a example flow for proper key verification import jwcrypto.jwk from scitt_emulator.did_helpers import DID_KEY_METHOD, did_key_to_cryptography_key +from scitt_emulator.key_helper_dataclasses import VerificationKey + + +# TODO What is the correct content type? Should we differ if it's been expanded? +CONTENT_TYPE = "application/key+did" def key_loader_format_did_key( unverified_issuer: str, -) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]: - jwk_keys = [] - cwt_cose_keys = [] - pycose_cose_keys = [] - cryptography_keys = [] - +) -> List[VerificationKey]: if not unverified_issuer.startswith(DID_KEY_METHOD): - return pycose_cose_keys - - cryptography_keys.append(did_key_to_cryptography_key(unverified_issuer)) - - for cryptography_key in cryptography_keys: - jwk_keys.append( - jwcrypto.jwk.JWK.from_pem( - cryptography_key.public_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo, - ) - ) + return [] + key = did_key_to_cryptography_key(unverified_issuer) + return [ + VerificationKey( + transforms=[key], + original=key, + original_content_type=CONTENT_TYPE, + original_bytes=unverified_issuer.encode("utf-8"), + original_bytes_encoding="utf-8", + usable=False, + cwt=None, + cose=None, ) - - for jwk_key in jwk_keys: - cwt_cose_key = cwt.COSEKey.from_pem( - jwk_key.export_to_pem(), - kid=jwk_key.thumbprint(), + ] + + +def transform_key_instance_cryptography_ecc_public_to_jwcrypto_jwk( + key: cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey, +) -> jwcrypto.jwk.JWK: + if not isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey): + raise TypeError(key) + return jwcrypto.jwk.JWK.from_pem( + key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, ) - cwt_cose_keys.append(cwt_cose_key) - cwt_ec2_key_as_dict = cwt_cose_key.to_dict() - pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict) - pycose_cose_keys.append((cwt_cose_key, pycose_cose_key)) - - return pycose_cose_keys + ) diff --git a/scitt_emulator/key_loader_format_url_referencing_oidc_issuer.py b/scitt_emulator/key_loader_format_url_referencing_oidc_issuer.py index 76a4547c..a1d9932b 100644 --- a/scitt_emulator/key_loader_format_url_referencing_oidc_issuer.py +++ b/scitt_emulator/key_loader_format_url_referencing_oidc_issuer.py @@ -13,20 +13,22 @@ import jwcrypto.jwk from scitt_emulator.did_helpers import did_web_to_url +from scitt_emulator.key_helper_dataclasses import VerificationKey + + +CONTENT_TYPE = "application/jwk+json" def key_loader_format_url_referencing_oidc_issuer( unverified_issuer: str, ) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]: - jwk_keys = [] - cwt_cose_keys = [] - pycose_cose_keys = [] + keys = [] if unverified_issuer.startswith("did:web:"): unverified_issuer = did_web_to_url(unverified_issuer) if "://" not in unverified_issuer or unverified_issuer.startswith("file://"): - return pycose_cose_keys + return keys # TODO Logging for URLErrors # Check if OIDC issuer @@ -44,18 +46,40 @@ def key_loader_format_url_referencing_oidc_issuer( jwks = json.loads(response.read()) for jwk_key_as_dict in jwks["keys"]: jwk_key_as_string = json.dumps(jwk_key_as_dict) - jwk_keys.append( - jwcrypto.jwk.JWK.from_json(jwk_key_as_string), + jwk_key = jwcrypto.jwk.JWK.from_json(jwk_key_as_string) + keys.append( + VerificationKey( + transforms=[jwk_key], + original=jwk_key, + original_content_type=CONTENT_TYPE, + original_bytes=jwk_key_as_string.encode("utf-8"), + original_bytes_encoding="utf-8", + usable=False, + cwt=None, + cose=None, + ) ) - for jwk_key in jwk_keys: - cwt_cose_key = cwt.COSEKey.from_pem( - jwk_key.export_to_pem(), - kid=jwk_key.thumbprint(), - ) - cwt_cose_keys.append(cwt_cose_key) - cwt_ec2_key_as_dict = cwt_cose_key.to_dict() - pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict) - pycose_cose_keys.append((cwt_cose_key, pycose_cose_key)) - - return pycose_cose_keys + return keys + + +def transform_key_instance_jwcrypto_jwk_to_cwt_cose( + key: jwcrypto.jwk.JWK, +) -> cwt.COSEKey: + if not isinstance(key, jwcrypto.jwk.JWK): + raise TypeError(key) + return cwt.COSEKey.from_pem( + key.export_to_pem(), + kid=key.thumbprint(), + ) + + +def to_object_oidc_issuer(verification_key: VerificationKey) -> dict: + if verification_key.original_content_type != CONTENT_TYPE: + return + + return { + **verification_key.original.export_public(as_dict=True), + "use": "sig", + "kid": verification_key.original.thumbprint(), + } diff --git a/scitt_emulator/key_loader_format_url_referencing_ssh_authorized_keys.py b/scitt_emulator/key_loader_format_url_referencing_ssh_authorized_keys.py index aea7e38b..5edd69e9 100644 --- a/scitt_emulator/key_loader_format_url_referencing_ssh_authorized_keys.py +++ b/scitt_emulator/key_loader_format_url_referencing_ssh_authorized_keys.py @@ -19,17 +19,13 @@ def key_loader_format_url_referencing_ssh_authorized_keys( unverified_issuer: str, ) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]: - jwk_keys = [] - cwt_cose_keys = [] - pycose_cose_keys = [] - - cryptography_ssh_keys = [] + keys = [] if unverified_issuer.startswith("did:web:"): unverified_issuer = did_web_to_url(unverified_issuer) if "://" not in unverified_issuer or unverified_issuer.startswith("file://"): - return pycose_cose_keys + return keys # Try loading ssh keys. Example: https://github.com/username.keys with contextlib.suppress(urllib.request.URLError): @@ -38,28 +34,18 @@ def key_loader_format_url_referencing_ssh_authorized_keys( with contextlib.suppress( (ValueError, cryptography.exceptions.UnsupportedAlgorithm) ): - cryptography_ssh_keys.append( - serialization.load_ssh_public_key(line) + key = serialization.load_ssh_public_key(line) + keys.append( + VerificationKey( + transforms=[key], + original=key, + original_content_type=CONTENT_TYPE, + original_bytes=line.encode("utf-8"), + original_bytes_encoding="utf-8", + usable=False, + cwt=None, + cose=None, + ) ) - for cryptography_ssh_key in cryptography_ssh_keys: - jwk_keys.append( - jwcrypto.jwk.JWK.from_pem( - cryptography_ssh_key.public_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo, - ) - ) - ) - - for jwk_key in jwk_keys: - cwt_cose_key = cwt.COSEKey.from_pem( - jwk_key.export_to_pem(), - kid=jwk_key.thumbprint(), - ) - cwt_cose_keys.append(cwt_cose_key) - cwt_ec2_key_as_dict = cwt_cose_key.to_dict() - pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict) - pycose_cose_keys.append((cwt_cose_key, pycose_cose_key)) - - return pycose_cose_keys + return keys diff --git a/scitt_emulator/key_loader_format_url_referencing_x509.py b/scitt_emulator/key_loader_format_url_referencing_x509.py index 2c92db79..0f46dff4 100644 --- a/scitt_emulator/key_loader_format_url_referencing_x509.py +++ b/scitt_emulator/key_loader_format_url_referencing_x509.py @@ -14,22 +14,22 @@ import jwcrypto.jwk from scitt_emulator.did_helpers import did_web_to_url +from scitt_emulator.key_helper_dataclasses import VerificationKey + + +CONTENT_TYPE = "application/pkix-cert" def key_loader_format_url_referencing_x509( unverified_issuer: str, ) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]: - jwk_keys = [] - cwt_cose_keys = [] - pycose_cose_keys = [] - - cryptography_ssh_keys = [] + keys = [] if unverified_issuer.startswith("did:web:"): unverified_issuer = did_web_to_url(unverified_issuer) if "://" not in unverified_issuer or unverified_issuer.startswith("file://"): - return pycose_cose_keys + return keys with contextlib.suppress(urllib.request.URLError): with urllib.request.urlopen(unverified_issuer) as response: @@ -40,26 +40,28 @@ def key_loader_format_url_referencing_x509( for certificate in cryptography.x509.load_pem_x509_certificates( contents ): - cryptography_ssh_keys.append(certificate.public_key()) + key = certificate.public_key() + keys.append( + VerificationKey( + transforms=[key], + original=key, + original_content_type=CONTENT_TYPE, + original_bytes=contents, + original_bytes_encoding="utf-8", + usable=False, + cwt=None, + cose=None, + ) + ) + + return keys + - for cryptography_ssh_key in cryptography_ssh_keys: - jwk_keys.append( - jwcrypto.jwk.JWK.from_pem( - cryptography_ssh_key.public_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo, - ) - ) - ) +def to_object_x509(verification_key: VerificationKey) -> dict: + if verification_key.original_content_type != CONTENT_TYPE: + return - for jwk_key in jwk_keys: - cwt_cose_key = cwt.COSEKey.from_pem( - jwk_key.export_to_pem(), - kid=jwk_key.thumbprint(), - ) - cwt_cose_keys.append(cwt_cose_key) - cwt_ec2_key_as_dict = cwt_cose_key.to_dict() - pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict) - pycose_cose_keys.append((cwt_cose_key, pycose_cose_key)) + # TODO to dict + verification_key.original - return pycose_cose_keys + return {} diff --git a/scitt_emulator/key_transforms.py b/scitt_emulator/key_transforms.py new file mode 100644 index 00000000..67eb9ed1 --- /dev/null +++ b/scitt_emulator/key_transforms.py @@ -0,0 +1,89 @@ +import inspect +import itertools +import importlib.metadata +from typing import Optional, Callable, List, Tuple + +import cwt +import pycose.keys.ec2 + +from scitt_emulator.key_helper_dataclasses import VerificationKey + + +ENTRYPOINT_KEY_TRANSFORMS_KEY_INSTANCES = "scitt_emulator.key_helpers.transforms_key_instances" + + +def preform_verification_key_transforms( + verification_keys: List[VerificationKey], + *, + key_transforms: Optional[List[Callable[[VerificationKey], dict]]] = None, +) -> None: + """ + Resolve keys for statement issuer and verify signature on COSESign1 + statement and embedded CWT + """ + # In case of iterators since we have to loop multiple times + verification_keys = list(verification_keys) + + if key_transforms is None: + key_transforms = [] + # There is some difference in the return value of entry_points across + # Python versions/envs (conda vs. non-conda). Python 3.8 returns a dict. + entrypoints = importlib.metadata.entry_points() + if isinstance(entrypoints, dict): + for entrypoint in entrypoints.get(ENTRYPOINT_KEY_TRANSFORMS_KEY_INSTANCES, []): + key_transforms.append(entrypoint.load()) + elif isinstance(entrypoints, getattr(importlib.metadata, "EntryPoints", list)): + for entrypoint in entrypoints: + if entrypoint.group == ENTRYPOINT_KEY_TRANSFORMS_KEY_INSTANCES: + key_transforms.append(entrypoint.load()) + else: + raise TypeError(f"importlib.metadata.entry_points returned unknown type: {type(entrypoints)}: {entrypoints!r}") + + key_transform_types = tuple( + [ + list(inspect.signature(key_transform).parameters.values())[0].annotation + for key_transform in key_transforms + ] + ) + + for verification_key in verification_keys: + while not verification_key.usable: + # Attempt key transforms + for key_transform in key_transforms: + key = verification_key.transforms[-1] + if isinstance(key, list(inspect.signature(key_transform).parameters.values())[0].annotation): + transformed_key = key_transform(key) + if transformed_key: + verification_key.transforms.append(transformed_key) + # Check if key is usable yet + for key in reversed(verification_key.transforms): + if not verification_key.cwt and isinstance(key, cwt.algs.ec2.EC2Key): + verification_key.cwt = key + if ( + not verification_key.cose + and isinstance( + key, + ( + pycose.keys.ec2.EC2Key, + ) + ) + ): + verification_key.cose = key + if verification_key.cwt and verification_key.cose: + verification_key.usable = True + break + # If we are unable to transform further, raise exception + key = verification_key.transforms[-1] + if not isinstance(key, key_transform_types): + raise NotImplementedError(f"Unable to transform {type(key)} into CWT and COSE keys needed. Transforms available: {key_transforms}. Transform types accepted: {key_transform_types}. Transforms completed: {verification_key.transforms}") + + return verification_keys + + +def transform_key_instance_cwt_cose_ec2_to_pycose_ec2( + key: cwt.algs.ec2.EC2Key, +) -> pycose.keys.ec2.EC2Key: + if not isinstance(key, cwt.algs.ec2.EC2Key): + raise TypeError(key) + cwt_ec2_key_as_dict = key.to_dict() + return pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict) diff --git a/scitt_emulator/scitt.py b/scitt_emulator/scitt.py index c2b2c062..2ce68c42 100644 --- a/scitt_emulator/scitt.py +++ b/scitt_emulator/scitt.py @@ -237,10 +237,10 @@ def _create_receipt(self, claim: bytes, entry_id: str): raise ClaimInvalidError("Claim does not have a CWTClaims header parameter") try: - cwt_cose_key, _pycose_cose_key = verify_statement(msg) + verification_key = verify_statement(msg) except Exception as e: raise ClaimInvalidError("Failed to verify signature on statement") from e - if not cwt_cose_key: + if not verification_key: raise ClaimInvalidError("Failed to verify signature on statement") # Extract fields of COSE_Sign1 for countersigning diff --git a/scitt_emulator/verify_statement.py b/scitt_emulator/verify_statement.py index f913720e..f7c56f8f 100644 --- a/scitt_emulator/verify_statement.py +++ b/scitt_emulator/verify_statement.py @@ -1,6 +1,7 @@ import os import itertools import contextlib +import dataclasses import urllib.parse import urllib.request import importlib.metadata @@ -14,6 +15,8 @@ from scitt_emulator.did_helpers import did_web_to_url from scitt_emulator.create_statement import CWTClaims +from scitt_emulator.key_helper_dataclasses import VerificationKey +from scitt_emulator.key_transforms import preform_verification_key_transforms ENTRYPOINT_KEY_LOADERS = "scitt_emulator.verify_signature.key_loaders" @@ -22,9 +25,7 @@ def verify_statement( msg: Sign1Message, *, - key_loaders: Optional[ - List[Callable[[str], List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]]] - ] = None, + key_loaders: Optional[List[Callable[[str], List[VerificationKey]]]] = None, ) -> bool: """ Resolve keys for statement issuer and verify signature on COSESign1 @@ -52,15 +53,20 @@ def verify_statement( ) unverified_issuer = cwt_unverified_protected[1] - # Load keys from issuer and attempt verification. Return keys used to verify - # as tuple of cwt.COSEKey and pycose.keys formats - for cwt_cose_key, pycose_cose_key in itertools.chain( - *[key_loader(unverified_issuer) for key_loader in key_loaders] + # Load keys from issuer and attempt verification. Return key used to verify + for verification_key in preform_verification_key_transforms( + itertools.chain( + *[key_loader(unverified_issuer) for key_loader in key_loaders] + ) ): - msg.key = pycose_cose_key + # Skip keys that we couldn't derive COSE keys for + if not verification_key.usable: + # TODO Logging + continue + msg.key = verification_key.cose with contextlib.suppress(Exception): verify_signature = msg.verify_signature() if verify_signature: - return cwt_cose_key, pycose_cose_key + return verification_key - return None, None + return None diff --git a/setup.py b/setup.py index 7e4ab2a3..165daac3 100644 --- a/setup.py +++ b/setup.py @@ -17,6 +17,17 @@ 'url_referencing_ssh_authorized_keys=scitt_emulator.key_loader_format_url_referencing_ssh_authorized_keys:key_loader_format_url_referencing_ssh_authorized_keys', 'url_referencing_x509=scitt_emulator.key_loader_format_url_referencing_x509:key_loader_format_url_referencing_x509', ], + 'scitt_emulator.key_helpers.transforms_key_instances': [ + 'transform_key_instance_cwt_cose_ec2_to_pycose_ec2=scitt_emulator.key_transforms:transform_key_instance_cwt_cose_ec2_to_pycose_ec2', + 'transform_key_instance_cryptography_ecc_public_to_jwcrypto_jwk=scitt_emulator:key_loader_format_did_key.transform_key_instance_cryptography_ecc_public_to_jwcrypto_jwk', + 'transform_key_instance_jwcrypto_jwk_to_cwt_cose=scitt_emulator.key_loader_format_url_referencing_oidc_issuer:transform_key_instance_jwcrypto_jwk_to_cwt_cose', + ], + 'scitt_emulator.key_helpers.verification_key_to_object': [ + # TODO 'to_object_did_key=scitt_emulator.key_loader_format_did_key:to_object_did_key', + 'to_object_x509=scitt_emulator.key_loader_format_url_referencing_x509:to_object_x509', + # TODO 'to_object_ssh_authorized_keys=scitt_emulator.key_loader_format_url_referencing_ssh_authorized_keys:to_object_ssh_authorized_keys', + 'to_object_oidc_issuer=scitt_emulator.key_loader_format_url_referencing_oidc_issuer:to_object_oidc_issuer', + ], }, python_requires=">=3.8", install_requires=[ diff --git a/tests/test_docs.py b/tests/test_docs.py index 78398e3e..508b23d6 100644 --- a/tests/test_docs.py +++ b/tests/test_docs.py @@ -263,7 +263,8 @@ def test_docs_registration_policies(tmp_path): check_error = error assert check_error assert "error" in check_error.operation - assert check_error.operation["error"] == claim_denied_error_blocked + if check_error.operation["error"] != claim_denied_error_blocked: + raise check_error assert not os.path.exists(receipt_path) assert not os.path.exists(entry_id_path)