diff --git a/securesystemslib/signer/_aws_signer.py b/securesystemslib/signer/_aws_signer.py index 91516ce8..07ca33ee 100644 --- a/securesystemslib/signer/_aws_signer.py +++ b/securesystemslib/signer/_aws_signer.py @@ -4,8 +4,10 @@ from typing import Optional, Tuple from urllib import parse -from securesystemslib import exceptions -from securesystemslib.exceptions import UnsupportedLibraryError +from securesystemslib.exceptions import ( + UnsupportedAlgorithmError, + UnsupportedLibraryError, +) from securesystemslib.signer._key import Key, SSlibKey from securesystemslib.signer._signer import SecretsHandler, Signature, Signer from securesystemslib.signer._utils import compute_default_keyid @@ -57,10 +59,12 @@ class AWSSigner(Signer): SCHEME = "awskms" - aws_signing_algorithms = { + # Ordered dict of securesystemslib schemes to aws signing algorithms + # NOTE: the order matters when choosing a default (see _get_default_scheme) + aws_algos = { "ecdsa-sha2-nistp256": "ECDSA_SHA_256", "ecdsa-sha2-nistp384": "ECDSA_SHA_384", - "ecdsa-sha2-nistp512": "ECDSA_SHA_512", + # "ecdsa-sha2-nistp521": "ECDSA_SHA_512", # FIXME: needs SSlibKey support "rsassa-pss-sha256": "RSASSA_PSS_SHA_256", "rsassa-pss-sha384": "RSASSA_PSS_SHA_384", "rsassa-pss-sha512": "RSASSA_PSS_SHA_512", @@ -76,9 +80,7 @@ def __init__(self, aws_key_id: str, public_key: Key): self.aws_key_id = aws_key_id self._public_key = public_key self.client = boto3.client("kms") - self.aws_algo = AWSSigner.aws_signing_algorithms.get( - self.public_key.scheme - ) + self.aws_algo = self.aws_algos[self.public_key.scheme] @property def public_key(self) -> Key: @@ -98,6 +100,23 @@ def from_priv_key_uri( return cls(uri.path, public_key) + @classmethod + def _get_default_scheme(cls, supported_by_key: list[str]) -> Optional[str]: + # Iterate over supported AWS algorithms, pick the first one also + # supported by the key, and return the related securesystemslib scheme. + for scheme, algo in cls.aws_algos.items(): + if algo in supported_by_key: + return scheme + return None + + @staticmethod + def _get_keytype_for_scheme(scheme: str) -> str: + if scheme.startswith("ecdsa"): + return "ecdsa" + if scheme.startswith("rsa"): + return "rsa" + raise RuntimeError + @classmethod def import_( cls, aws_key_id: str, local_scheme: Optional[str] = None @@ -126,30 +145,28 @@ def import_( if AWS_IMPORT_ERROR: raise UnsupportedLibraryError(AWS_IMPORT_ERROR) - if local_scheme and local_scheme not in cls.aws_signing_algorithms: - raise ValueError(f"Unsupported scheme: {local_scheme}") + if local_scheme: + if local_scheme not in cls.aws_algos: + raise ValueError(f"Unsupported scheme '{local_scheme}'") - try: - client = boto3.client("kms") - request = client.get_public_key(KeyId=aws_key_id) - except (BotoCoreError, ClientError) as e: - logger.error( - "Failed to import key using AWS KMS key ID %s: %s", - aws_key_id, - str(e), - ) - raise e + client = boto3.client("kms") + request = client.get_public_key(KeyId=aws_key_id) + key_algos = request["SigningAlgorithms"] - keytype = cls._get_keytype_from_aws_response(request) - aws_scheme = request["SigningAlgorithms"][0] + if local_scheme: + if cls.aws_algos[local_scheme] not in key_algos: + raise UnsupportedAlgorithmError( + f"Unsupported scheme '{local_scheme}' for AWS key" + ) - if not local_scheme: - if keytype == "ecdsa": - local_scheme = cls._get_ecdsa_scheme(aws_scheme) - elif keytype == "rsa": - local_scheme = "rsassa-pss-sha256" - else: - raise ValueError(f"Unsupported key type: {keytype}") + else: + local_scheme = cls._get_default_scheme(key_algos) + if not local_scheme: + raise UnsupportedAlgorithmError( + f"Unsupported AWS key algorithms: {key_algos}" + ) + + keytype = cls._get_keytype_for_scheme(local_scheme) kms_pubkey = serialization.load_der_public_key(request["PublicKey"]) @@ -163,42 +180,6 @@ def import_( public_key = SSlibKey(keyid, keytype, local_scheme, keyval) return f"{cls.SCHEME}:{aws_key_id}", public_key - @staticmethod - def _get_keytype_from_aws_response(reponse: dict) -> str: - """Determines the key type from the AWS KMS get_public_key response. - - Arguments: - response (dict): The response from AWS KMS get_public_key request. - - Returns: - str: The key type, either 'ecdsa' or 'rsa'. - """ - algo = reponse["SigningAlgorithms"][0] - if "ECDSA" in algo: - return "ecdsa" - if "RSASSA" in algo: - return "rsa" - raise exceptions.UnsupportedAlgorithmError( - f"Unsupported algorithm in AWS response: {algo}" - ) - - @staticmethod - def _get_ecdsa_scheme(aws_algo: str) -> str: - """Returns ECDSA signing scheme based on AWS signing algorithm. - - Arguments: - aws_algo (str): The AWS ECDSA signing algorithm. - - Returns: - str: The Secure Systems Library ECDSA scheme. - """ - ecdsa_signing_algorithms = { - "ECDSA_SHA_256": "ecdsa-sha2-nistp256", - "ECDSA_SHA_384": "ecdsa-sha2-nistp384", - "ECDSA_SHA_512": "ecdsa-sha2-nistp512", - } - return ecdsa_signing_algorithms[aws_algo] - def sign(self, payload: bytes) -> Signature: """Sign the payload with the AWS KMS key