Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify and strengthen AWSSigner.import_ #778

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 46 additions & 65 deletions securesystemslib/signer/_aws_signer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Signer implementation for AWS Key Management Service"""

import logging
from typing import Optional, Tuple
from typing import List, Optional, Tuple
from urllib import parse

from securesystemslib import exceptions
from securesystemslib.exceptions import UnsupportedLibraryError
from securesystemslib.exceptions import (
UnsupportedAlgorithmError,
UnsupportedLibraryError,
)
from securesystemslib.signer._key import Key, SSlibKey
from securesystemslib.signer._signer import SecretsHandler, Signature, Signer
from securesystemslib.signer._utils import compute_default_keyid
Expand Down Expand Up @@ -57,10 +59,12 @@ class AWSSigner(Signer):

SCHEME = "awskms"

aws_signing_algorithms = {
# Ordered dict of securesystemslib schemes to aws signing algorithms
# NOTE: the order matters when choosing a default (see _get_default_scheme)
aws_algos = {
"ecdsa-sha2-nistp256": "ECDSA_SHA_256",
"ecdsa-sha2-nistp384": "ECDSA_SHA_384",
"ecdsa-sha2-nistp512": "ECDSA_SHA_512",
# "ecdsa-sha2-nistp521": "ECDSA_SHA_512", # FIXME: needs SSlibKey support
"rsassa-pss-sha256": "RSASSA_PSS_SHA_256",
"rsassa-pss-sha384": "RSASSA_PSS_SHA_384",
"rsassa-pss-sha512": "RSASSA_PSS_SHA_512",
Expand All @@ -76,9 +80,7 @@ def __init__(self, aws_key_id: str, public_key: Key):
self.aws_key_id = aws_key_id
self._public_key = public_key
self.client = boto3.client("kms")
self.aws_algo = AWSSigner.aws_signing_algorithms.get(
self.public_key.scheme
)
self.aws_algo = self.aws_algos[self.public_key.scheme]

@property
def public_key(self) -> Key:
Expand All @@ -98,6 +100,23 @@ def from_priv_key_uri(

return cls(uri.path, public_key)

@classmethod
def _get_default_scheme(cls, supported_by_key: List[str]) -> Optional[str]:
# Iterate over supported AWS algorithms, pick the **first** that is also
# supported by the key, and return the related securesystemslib scheme.
for scheme, algo in cls.aws_algos.items():
if algo in supported_by_key:
return scheme
return None

@staticmethod
def _get_keytype_for_scheme(scheme: str) -> str:
if scheme.startswith("ecdsa"):
return "ecdsa"
if scheme.startswith("rsa"):
return "rsa"
raise RuntimeError

@classmethod
def import_(
cls, aws_key_id: str, local_scheme: Optional[str] = None
Expand Down Expand Up @@ -126,30 +145,28 @@ def import_(
if AWS_IMPORT_ERROR:
raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

if local_scheme and local_scheme not in cls.aws_signing_algorithms:
raise ValueError(f"Unsupported scheme: {local_scheme}")
if local_scheme:
if local_scheme not in cls.aws_algos:
raise ValueError(f"Unsupported scheme '{local_scheme}'")

try:
client = boto3.client("kms")
request = client.get_public_key(KeyId=aws_key_id)
except (BotoCoreError, ClientError) as e:
logger.error(
"Failed to import key using AWS KMS key ID %s: %s",
aws_key_id,
str(e),
)
raise e
client = boto3.client("kms")
request = client.get_public_key(KeyId=aws_key_id)
key_algos = request["SigningAlgorithms"]

keytype = cls._get_keytype_from_aws_response(request)
aws_scheme = request["SigningAlgorithms"][0]
if local_scheme:
if cls.aws_algos[local_scheme] not in key_algos:
raise UnsupportedAlgorithmError(
f"Unsupported scheme '{local_scheme}' for AWS key"
)

if not local_scheme:
if keytype == "ecdsa":
local_scheme = cls._get_ecdsa_scheme(aws_scheme)
elif keytype == "rsa":
local_scheme = "rsassa-pss-sha256"
else:
raise ValueError(f"Unsupported key type: {keytype}")
else:
local_scheme = cls._get_default_scheme(key_algos)
if not local_scheme:
raise UnsupportedAlgorithmError(
f"Unsupported AWS key algorithms: {key_algos}"
)

keytype = cls._get_keytype_for_scheme(local_scheme)

kms_pubkey = serialization.load_der_public_key(request["PublicKey"])

Expand All @@ -163,42 +180,6 @@ def import_(
public_key = SSlibKey(keyid, keytype, local_scheme, keyval)
return f"{cls.SCHEME}:{aws_key_id}", public_key

@staticmethod
def _get_keytype_from_aws_response(reponse: dict) -> str:
"""Determines the key type from the AWS KMS get_public_key response.

Arguments:
response (dict): The response from AWS KMS get_public_key request.

Returns:
str: The key type, either 'ecdsa' or 'rsa'.
"""
algo = reponse["SigningAlgorithms"][0]
if "ECDSA" in algo:
return "ecdsa"
if "RSASSA" in algo:
return "rsa"
raise exceptions.UnsupportedAlgorithmError(
f"Unsupported algorithm in AWS response: {algo}"
)

@staticmethod
def _get_ecdsa_scheme(aws_algo: str) -> str:
"""Returns ECDSA signing scheme based on AWS signing algorithm.

Arguments:
aws_algo (str): The AWS ECDSA signing algorithm.

Returns:
str: The Secure Systems Library ECDSA scheme.
"""
ecdsa_signing_algorithms = {
"ECDSA_SHA_256": "ecdsa-sha2-nistp256",
"ECDSA_SHA_384": "ecdsa-sha2-nistp384",
"ECDSA_SHA_512": "ecdsa-sha2-nistp512",
}
return ecdsa_signing_algorithms[aws_algo]

def sign(self, payload: bytes) -> Signature:
"""Sign the payload with the AWS KMS key

Expand Down