Skip to content

Commit

Permalink
Merge pull request secure-systems-lab#778 from lukpueh/simplify-aws-s…
Browse files Browse the repository at this point in the history
…igner

Simplify and strengthen AWSSigner.import_
  • Loading branch information
lukpueh authored Apr 15, 2024
2 parents 1b672dc + 587cf38 commit 420b38c
Showing 1 changed file with 46 additions and 65 deletions.
111 changes: 46 additions & 65 deletions securesystemslib/signer/_aws_signer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Signer implementation for AWS Key Management Service"""

import logging
from typing import Optional, Tuple
from typing import List, Optional, Tuple
from urllib import parse

from securesystemslib import exceptions
from securesystemslib.exceptions import UnsupportedLibraryError
from securesystemslib.exceptions import (
UnsupportedAlgorithmError,
UnsupportedLibraryError,
)
from securesystemslib.signer._key import Key, SSlibKey
from securesystemslib.signer._signer import SecretsHandler, Signature, Signer
from securesystemslib.signer._utils import compute_default_keyid
Expand Down Expand Up @@ -57,10 +59,12 @@ class AWSSigner(Signer):

SCHEME = "awskms"

aws_signing_algorithms = {
# Ordered dict of securesystemslib schemes to aws signing algorithms
# NOTE: the order matters when choosing a default (see _get_default_scheme)
aws_algos = {
"ecdsa-sha2-nistp256": "ECDSA_SHA_256",
"ecdsa-sha2-nistp384": "ECDSA_SHA_384",
"ecdsa-sha2-nistp512": "ECDSA_SHA_512",
# "ecdsa-sha2-nistp521": "ECDSA_SHA_512", # FIXME: needs SSlibKey support
"rsassa-pss-sha256": "RSASSA_PSS_SHA_256",
"rsassa-pss-sha384": "RSASSA_PSS_SHA_384",
"rsassa-pss-sha512": "RSASSA_PSS_SHA_512",
Expand All @@ -76,9 +80,7 @@ def __init__(self, aws_key_id: str, public_key: Key):
self.aws_key_id = aws_key_id
self._public_key = public_key
self.client = boto3.client("kms")
self.aws_algo = AWSSigner.aws_signing_algorithms.get(
self.public_key.scheme
)
self.aws_algo = self.aws_algos[self.public_key.scheme]

@property
def public_key(self) -> Key:
Expand All @@ -98,6 +100,23 @@ def from_priv_key_uri(

return cls(uri.path, public_key)

@classmethod
def _get_default_scheme(cls, supported_by_key: List[str]) -> Optional[str]:
# Iterate over supported AWS algorithms, pick the **first** that is also
# supported by the key, and return the related securesystemslib scheme.
for scheme, algo in cls.aws_algos.items():
if algo in supported_by_key:
return scheme
return None

@staticmethod
def _get_keytype_for_scheme(scheme: str) -> str:
if scheme.startswith("ecdsa"):
return "ecdsa"
if scheme.startswith("rsa"):
return "rsa"
raise RuntimeError

@classmethod
def import_(
cls, aws_key_id: str, local_scheme: Optional[str] = None
Expand Down Expand Up @@ -126,30 +145,28 @@ def import_(
if AWS_IMPORT_ERROR:
raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

if local_scheme and local_scheme not in cls.aws_signing_algorithms:
raise ValueError(f"Unsupported scheme: {local_scheme}")
if local_scheme:
if local_scheme not in cls.aws_algos:
raise ValueError(f"Unsupported scheme '{local_scheme}'")

try:
client = boto3.client("kms")
request = client.get_public_key(KeyId=aws_key_id)
except (BotoCoreError, ClientError) as e:
logger.error(
"Failed to import key using AWS KMS key ID %s: %s",
aws_key_id,
str(e),
)
raise e
client = boto3.client("kms")
request = client.get_public_key(KeyId=aws_key_id)
key_algos = request["SigningAlgorithms"]

keytype = cls._get_keytype_from_aws_response(request)
aws_scheme = request["SigningAlgorithms"][0]
if local_scheme:
if cls.aws_algos[local_scheme] not in key_algos:
raise UnsupportedAlgorithmError(
f"Unsupported scheme '{local_scheme}' for AWS key"
)

if not local_scheme:
if keytype == "ecdsa":
local_scheme = cls._get_ecdsa_scheme(aws_scheme)
elif keytype == "rsa":
local_scheme = "rsassa-pss-sha256"
else:
raise ValueError(f"Unsupported key type: {keytype}")
else:
local_scheme = cls._get_default_scheme(key_algos)
if not local_scheme:
raise UnsupportedAlgorithmError(
f"Unsupported AWS key algorithms: {key_algos}"
)

keytype = cls._get_keytype_for_scheme(local_scheme)

kms_pubkey = serialization.load_der_public_key(request["PublicKey"])

Expand All @@ -163,42 +180,6 @@ def import_(
public_key = SSlibKey(keyid, keytype, local_scheme, keyval)
return f"{cls.SCHEME}:{aws_key_id}", public_key

@staticmethod
def _get_keytype_from_aws_response(reponse: dict) -> str:
"""Determines the key type from the AWS KMS get_public_key response.
Arguments:
response (dict): The response from AWS KMS get_public_key request.
Returns:
str: The key type, either 'ecdsa' or 'rsa'.
"""
algo = reponse["SigningAlgorithms"][0]
if "ECDSA" in algo:
return "ecdsa"
if "RSASSA" in algo:
return "rsa"
raise exceptions.UnsupportedAlgorithmError(
f"Unsupported algorithm in AWS response: {algo}"
)

@staticmethod
def _get_ecdsa_scheme(aws_algo: str) -> str:
"""Returns ECDSA signing scheme based on AWS signing algorithm.
Arguments:
aws_algo (str): The AWS ECDSA signing algorithm.
Returns:
str: The Secure Systems Library ECDSA scheme.
"""
ecdsa_signing_algorithms = {
"ECDSA_SHA_256": "ecdsa-sha2-nistp256",
"ECDSA_SHA_384": "ecdsa-sha2-nistp384",
"ECDSA_SHA_512": "ecdsa-sha2-nistp512",
}
return ecdsa_signing_algorithms[aws_algo]

def sign(self, payload: bytes) -> Signature:
"""Sign the payload with the AWS KMS key
Expand Down

0 comments on commit 420b38c

Please sign in to comment.