Skip to content

Commit

Permalink
Simplify and strengthen AWSSigner.import_ (WIP)
Browse files Browse the repository at this point in the history
**Simplify**
- remove redundant keytype/scheme switches
- remove try/except, which catches an exception only to re-raise it

**Strengthen**
- choose default scheme based on schemes supported by the key

Roughly implements the algorithm suggested in:
#724 (review)

**Unrelated changes**
- fix #765 fix typo AND disable scheme until SSlibKey adds support
- fail in __init__ if public_key.scheme is not in aws_algos

TODO:
* add tests

Many thanks to @ianhundere for the solid ground work!

Signed-off-by: Lukas Puehringer <[email protected]>
  • Loading branch information
lukpueh committed Apr 11, 2024
1 parent 94d58a6 commit bcfae9d
Showing 1 changed file with 45 additions and 64 deletions.
109 changes: 45 additions & 64 deletions securesystemslib/signer/_aws_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from typing import Optional, Tuple
from urllib import parse

from securesystemslib import exceptions
from securesystemslib.exceptions import UnsupportedLibraryError
from securesystemslib.exceptions import (
UnsupportedAlgorithmError,
UnsupportedLibraryError,
)
from securesystemslib.signer._key import Key, SSlibKey
from securesystemslib.signer._signer import SecretsHandler, Signature, Signer
from securesystemslib.signer._utils import compute_default_keyid
Expand Down Expand Up @@ -57,10 +59,12 @@ class AWSSigner(Signer):

SCHEME = "awskms"

aws_signing_algorithms = {
# Ordered dict of securesystemslib schemes to aws signing algorithms
# NOTE: the order matters when choosing a default (see _get_default_scheme)
aws_algos = {
"ecdsa-sha2-nistp256": "ECDSA_SHA_256",
"ecdsa-sha2-nistp384": "ECDSA_SHA_384",
"ecdsa-sha2-nistp512": "ECDSA_SHA_512",
# "ecdsa-sha2-nistp521": "ECDSA_SHA_512", # FIXME: needs SSlibKey support
"rsassa-pss-sha256": "RSASSA_PSS_SHA_256",
"rsassa-pss-sha384": "RSASSA_PSS_SHA_384",
"rsassa-pss-sha512": "RSASSA_PSS_SHA_512",
Expand All @@ -76,9 +80,7 @@ def __init__(self, aws_key_id: str, public_key: Key):
self.aws_key_id = aws_key_id
self._public_key = public_key
self.client = boto3.client("kms")
self.aws_algo = AWSSigner.aws_signing_algorithms.get(
self.public_key.scheme
)
self.aws_algo = self.aws_algos[self.public_key.scheme]

@property
def public_key(self) -> Key:
Expand All @@ -98,6 +100,23 @@ def from_priv_key_uri(

return cls(uri.path, public_key)

@classmethod
def _get_default_scheme(cls, supported_by_key: list[str]) -> Optional[str]:
# Iterate over supported AWS algorithms, pick the first one also
# supported by the key, and return the related securesystemslib scheme.
for scheme, algo in cls.aws_algos.items():
if algo in supported_by_key:
return scheme
return None

@staticmethod
def _get_keytype_for_scheme(scheme: str) -> str:
if scheme.startswith("ecdsa"):
return "ecdsa"
if scheme.startswith("rsa"):
return "rsa"
raise RuntimeError

@classmethod
def import_(
cls, aws_key_id: str, local_scheme: Optional[str] = None
Expand Down Expand Up @@ -126,30 +145,28 @@ def import_(
if AWS_IMPORT_ERROR:
raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

if local_scheme and local_scheme not in cls.aws_signing_algorithms:
raise ValueError(f"Unsupported scheme: {local_scheme}")
if local_scheme:
if local_scheme not in cls.aws_algos:
raise ValueError(f"Unsupported scheme '{local_scheme}'")

try:
client = boto3.client("kms")
request = client.get_public_key(KeyId=aws_key_id)
except (BotoCoreError, ClientError) as e:
logger.error(
"Failed to import key using AWS KMS key ID %s: %s",
aws_key_id,
str(e),
)
raise e
client = boto3.client("kms")
request = client.get_public_key(KeyId=aws_key_id)
key_algos = request["SigningAlgorithms"]

keytype = cls._get_keytype_from_aws_response(request)
aws_scheme = request["SigningAlgorithms"][0]
if local_scheme:
if cls.aws_algos[local_scheme] not in key_algos:
raise UnsupportedAlgorithmError(
f"Unsupported scheme '{local_scheme}' for AWS key"
)

if not local_scheme:
if keytype == "ecdsa":
local_scheme = cls._get_ecdsa_scheme(aws_scheme)
elif keytype == "rsa":
local_scheme = "rsassa-pss-sha256"
else:
raise ValueError(f"Unsupported key type: {keytype}")
else:
local_scheme = cls._get_default_scheme(key_algos)
if not local_scheme:
raise UnsupportedAlgorithmError(
f"Unsupported AWS key algorithms: {key_algos}"
)

keytype = cls._get_keytype_for_scheme(local_scheme)

kms_pubkey = serialization.load_der_public_key(request["PublicKey"])

Expand All @@ -163,42 +180,6 @@ def import_(
public_key = SSlibKey(keyid, keytype, local_scheme, keyval)
return f"{cls.SCHEME}:{aws_key_id}", public_key

@staticmethod
def _get_keytype_from_aws_response(reponse: dict) -> str:
"""Determines the key type from the AWS KMS get_public_key response.
Arguments:
response (dict): The response from AWS KMS get_public_key request.
Returns:
str: The key type, either 'ecdsa' or 'rsa'.
"""
algo = reponse["SigningAlgorithms"][0]
if "ECDSA" in algo:
return "ecdsa"
if "RSASSA" in algo:
return "rsa"
raise exceptions.UnsupportedAlgorithmError(
f"Unsupported algorithm in AWS response: {algo}"
)

@staticmethod
def _get_ecdsa_scheme(aws_algo: str) -> str:
"""Returns ECDSA signing scheme based on AWS signing algorithm.
Arguments:
aws_algo (str): The AWS ECDSA signing algorithm.
Returns:
str: The Secure Systems Library ECDSA scheme.
"""
ecdsa_signing_algorithms = {
"ECDSA_SHA_256": "ecdsa-sha2-nistp256",
"ECDSA_SHA_384": "ecdsa-sha2-nistp384",
"ECDSA_SHA_512": "ecdsa-sha2-nistp512",
}
return ecdsa_signing_algorithms[aws_algo]

def sign(self, payload: bytes) -> Signature:
"""Sign the payload with the AWS KMS key
Expand Down

0 comments on commit bcfae9d

Please sign in to comment.