Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds AWS KMS signing. #609

Merged
merged 23 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e50fee4
feat: Adds AWS KMS signing.
ianhundere Jul 19, 2023
6c30de2
fix: Adds logic to use the chosen signing algorithm instead of defaul…
ianhundere Jul 20, 2023
30f8ee6
refactor: Removes unnecessary imports.
ianhundere Jul 20, 2023
d27cd6d
refactor: Formats aws_signing_algos list.
ianhundere Jul 20, 2023
99b0ffc
fix: Ensures that the keytype and scheme passed in import_ are consis…
ianhundere Jul 22, 2023
b24f7b6
fix: Resolves pipeline failures.
ianhundere Jul 22, 2023
a753fd0
docs: Adds better docstrings including information on how to setup AW…
ianhundere Jul 22, 2023
cf20f58
chore: Reformats and corrects issues for tox to pass except for error…
ianhundere Jul 22, 2023
cca1f52
docs: Updates docstrings.
ianhundere Jul 22, 2023
63ee289
fix: Updates pyproject.toml.
ianhundere Jul 22, 2023
de818ea
fix: Ensures that parsing of PKCS1 algos is correct, all tests passin…
ianhundere Jul 24, 2023
31b5665
Merge branch 'add-aws-support' of github.com:ianhundere/securesystems…
ianhundere Jul 24, 2023
666c54a
refactor: Makes _parse_rsa algo output consistent with ecdsa algo out…
ianhundere Jul 24, 2023
382cc8d
refactor: Adds _get_aws_signing_algo to cleanup _get_keytype_and_sche…
ianhundere Jul 31, 2023
d9affda
docs: Updates docstring.
ianhundere Jul 31, 2023
080c7e1
refactor: Clarifies return of aws_signing_scheme.
ianhundere Jul 31, 2023
18f75dc
refactor: Simplifies logic and cleans up docstrings.
ianhundere Aug 1, 2023
12e8829
refactor: Resolves tests.
ianhundere Aug 1, 2023
e314c02
refactor: Resolves tests.
ianhundere Aug 1, 2023
51c54d4
refactor: Resolves tests.
ianhundere Aug 1, 2023
1c4261f
refactor: Removes unnecessary keys from keytypes_and_schemes in _get_…
ianhundere Aug 1, 2023
cb4fb63
refactor: Simplifies _get_keytype_and_scheme and renames it to _get_k…
ianhundere Aug 2, 2023
e616b12
docs: Updates docstring.
ianhundere Aug 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ ignore_missing_imports = True

[mypy-azure.*]
ignore_missing_imports = True

[mypy-boto3.*]
ignore_missing_imports = True

[mypy-botocore.*]
ignore_missing_imports = True
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Issues = "https://github.com/secure-systems-lab/securesystemslib/issues"
crypto = ["cryptography>=40.0.0"]
gcpkms = ["google-cloud-kms", "cryptography>=40.0.0"]
azurekms = ["azure-identity", "azure-keyvault-keys", "cryptography>=40.0.0"]
awskms = ["boto3", "botocore", "cryptography>=40.0.0"]
hsm = ["asn1crypto", "cryptography>=40.0.0", "PyKCS11"]
pynacl = ["pynacl>1.2.0"]
PySPX = ["PySPX>=0.5.0"]
Expand Down
2 changes: 2 additions & 0 deletions securesystemslib/signer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
This module provides extensible interfaces for public keys and signers:
Some implementations are provided by default but more can be added by users.
"""
from securesystemslib.signer._aws_signer import AWSSigner
from securesystemslib.signer._azure_signer import AzureSigner
from securesystemslib.signer._gcp_signer import GCPSigner
from securesystemslib.signer._gpg_signer import GPGKey, GPGSigner
Expand Down Expand Up @@ -32,6 +33,7 @@
HSMSigner.SCHEME: HSMSigner,
GPGSigner.SCHEME: GPGSigner,
AzureSigner.SCHEME: AzureSigner,
AWSSigner.SCHEME: AWSSigner,
}
)

Expand Down
240 changes: 240 additions & 0 deletions securesystemslib/signer/_aws_signer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
"""Signer implementation for AWS Key Management Service"""

import logging
from typing import Optional, Tuple
from urllib import parse

import securesystemslib.hash as sslib_hash
from securesystemslib import exceptions
from securesystemslib.exceptions import UnsupportedLibraryError
from securesystemslib.signer._key import Key
from securesystemslib.signer._signer import (
SecretsHandler,
Signature,
Signer,
SSlibKey,
)

logger = logging.getLogger(__name__)

AWS_IMPORT_ERROR = None
try:
import boto3
from botocore.exceptions import BotoCoreError, ClientError
from cryptography.hazmat.primitives import serialization
except ImportError:
AWS_IMPORT_ERROR = "Signing with AWS KMS requires aws-kms and cryptography."


class AWSSigner(Signer):
"""AWS Key Management Service Signer

This Signer uses AWS KMS to sign and supports signing with RSA/EC keys and
uses "ambient" credentials typically environment variables such as
AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN. These will
be recognized by the boto3 SDK, which underlies the aws_kms Python module.

For more details on AWS authentication, refer to the AWS Command Line
Interface User Guide:
https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html

Some practical authentication options include:
AWS CLI: https://aws.amazon.com/cli/
AWS SDKs: https://aws.amazon.com/tools/

The specific permissions that AWS KMS signer needs are:
kms:Sign for sign()
kms:GetPublicKey for import()

Arguments:
aws_key_id (str): AWS KMS key ID or alias.
public_key (Key): The related public key instance.

Returns:
AWSSigner: An instance of the AWSSigner class.

Raises:
UnsupportedAlgorithmError: If the payload hash algorithm is unsupported.
BotoCoreError: Errors from the botocore.exceptions library.
ClientError: Errors related to AWS KMS client.
UnsupportedLibraryError: If necessary libraries for AWS KMS are not available.
"""

SCHEME = "awskms"

def __init__(self, aws_key_id: str, public_key: Key):
if AWS_IMPORT_ERROR:
raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

self.hash_algorithm = self._get_hash_algorithm(public_key)
self.aws_key_id = aws_key_id
self.public_key = public_key
self.client = boto3.client("kms")
self.aws_algo = self._get_aws_signing_algo(self.public_key.scheme)

@classmethod
def from_priv_key_uri(
cls,
priv_key_uri: str,
public_key: Key,
secrets_handler: Optional[SecretsHandler] = None,
) -> "AWSSigner":
uri = parse.urlparse(priv_key_uri)

if uri.scheme != cls.SCHEME:
raise ValueError(f"AWSSigner does not support {priv_key_uri}")

return cls(uri.path, public_key)

@classmethod
def import_(cls, aws_key_id: str, local_scheme: str) -> Tuple[str, Key]:
"""Loads a key and signer details from AWS KMS.

Returns the private key uri and the public key. This method should only
be called once per key: the uri and Key should be stored for later use.

Arguments:
aws_key_id (str): AWS KMS key ID.
local_scheme (str): Local scheme to use.

Returns:
Tuple[str, Key]: A tuple where the first element is a string
representing the private key URI, and the second element is an
instance of the public key.

Raises:
UnsupportedAlgorithmError: If the AWS KMS signing algorithm is
unsupported.
BotoCoreError: Errors from the botocore.exceptions library.
ClientError: Errors related to AWS KMS client.
"""
if AWS_IMPORT_ERROR:
raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

client = boto3.client("kms")
request = client.get_public_key(KeyId=aws_key_id)
kms_pubkey = serialization.load_der_public_key(request["PublicKey"])

public_key_pem = kms_pubkey.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("utf-8")
try:
keytype = cls._get_keytype_for_scheme(local_scheme)
except KeyError as e:
raise exceptions.UnsupportedAlgorithmError(
f"{local_scheme} is not a supported signing algorithm"
) from e

keyval = {"public": public_key_pem}
keyid = cls._get_keyid(keytype, local_scheme, keyval)
public_key = SSlibKey(keyid, keytype, local_scheme, keyval)
return f"{cls.SCHEME}:{aws_key_id}", public_key

@staticmethod
def _get_keytype_for_scheme(
scheme: str,
) -> str:
"""Returns the Secure Systems Library key type.

Arguments:
(str): The Secure Systems Library scheme.

Returns:
str: The Secure Systems Library key type.
"""
keytype_for_scheme = {
"ecdsa-sha2-nistp256": "ecdsa",
"ecdsa-sha2-nistp384": "ecdsa",
"ecdsa-sha2-nistp512": "ecdsa",
"rsassa-pss-sha256": "rsa",
"rsassa-pss-sha384": "rsa",
"rsassa-pss-sha512": "rsa",
"rsa-pkcs1v15-sha256": "rsa",
"rsa-pkcs1v15-sha384": "rsa",
"rsa-pkcs1v15-sha512": "rsa",
}
return keytype_for_scheme[scheme]

@staticmethod
def _get_aws_signing_algo(
scheme: str,
) -> str:
"""Returns AWS signing algorithm

Arguments:
scheme (str): The Secure Systems Library signing scheme.

Returns:
str: AWS signing scheme.
"""
aws_signing_algorithms = {
"ecdsa-sha2-nistp256": "ECDSA_SHA_256",
"ecdsa-sha2-nistp384": "ECDSA_SHA_384",
"ecdsa-sha2-nistp512": "ECDSA_SHA_512",
"rsassa-pss-sha256": "RSASSA_PSS_SHA_256",
"rsassa-pss-sha384": "RSASSA_PSS_SHA_384",
"rsassa-pss-sha512": "RSASSA_PSS_SHA_512",
"rsa-pkcs1v15-sha256": "RSASSA_PKCS1_V1_5_SHA_256",
"rsa-pkcs1v15-sha384": "RSASSA_PKCS1_V1_5_SHA_384",
"rsa-pkcs1v15-sha512": "RSASSA_PKCS1_V1_5_SHA_512",
}
return aws_signing_algorithms[scheme]

@staticmethod
def _get_hash_algorithm(public_key: Key) -> str:
"""Helper function to return payload hash algorithm used for this key

Arguments:
public_key (Key): Public key object

Returns:
str: Hash algorithm
"""
if public_key.keytype == "rsa":
# hash algorithm is encoded as last scheme portion
algo = public_key.scheme.split("-")[-1]
if public_key.keytype in [
"ecdsa",
"ecdsa-sha2-nistp256",
"ecdsa-sha2-nistp384",
]:
# nistp256 uses sha-256, nistp384 uses sha-384
bits = public_key.scheme.split("-nistp")[-1]
algo = f"sha{bits}"

# trigger UnsupportedAlgorithm if appropriate
_ = sslib_hash.digest(algo)
return algo

def sign(self, payload: bytes) -> Signature:
"""Sign the payload with the AWS KMS key

Arguments:
payload: bytes to be signed.

Raises:
BotoCoreError: Errors from the botocore.exceptions library.
ClientError: Errors related to AWS KMS client.

Returns:
Signature.
"""
try:
request = self.client.sign(
KeyId=self.aws_key_id,
Message=payload,
MessageType="RAW",
SigningAlgorithm=self.aws_algo,
)

hasher = sslib_hash.digest(self.hash_algorithm)
hasher.update(payload)
logger.debug("signing response %s", request)
response = request["Signature"]
logger.debug("signing response %s", response)

return Signature(self.public_key.keyid, response.hex())
except (BotoCoreError, ClientError) as e:
logger.error("Failed to sign with AWS KMS: %s", str(e))
raise e
62 changes: 62 additions & 0 deletions tests/check_aws_signer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""This module confirms that signing using AWS KMS keys works.

The purpose is to do a smoke test, not to exhaustively test every possible key
and environment combination.

For AWS, the requirements to successfully test are:
* AWS authentication details
have to be available in the environment
* The key defined in the test has to be
available to the authenticated user

Remember to replace the REDACTED fields to include the necessary values:
* keyid: Hash of the public key
* public: The public key, refer to other KMS tests to see the format
* aws_id: AWS KMS ID or alias
"""

import unittest

from securesystemslib.exceptions import UnverifiedSignatureError
from securesystemslib.signer import AWSSigner, Key, Signer


class TestAWSKMSKeys(unittest.TestCase):
"""Test that AWS KMS keys can be used to sign."""

pubkey = Key.from_dict(
"REDACTED",
{
"keytype": "rsa",
"scheme": "rsassa-pss-sha256",
"keyval": {
"public": "-----BEGIN PUBLIC KEY-----\nREDACTED\n-----END PUBLIC KEY-----\n"
},
},
)
aws_key_id = "REDACTED"

def test_aws_sign(self):
"""Test that AWS KMS key works for signing"""

data = "data".encode("utf-8")

signer = Signer.from_priv_key_uri(
f"awskms:{self.aws_key_id}", self.pubkey
)
sig = signer.sign(data)

self.pubkey.verify_signature(sig, data)
with self.assertRaises(UnverifiedSignatureError):
self.pubkey.verify_signature(sig, b"NOT DATA")

def test_aws_import(self):
"""Test that AWS KMS key can be imported"""

uri, key = AWSSigner.import_(self.aws_key_id, self.pubkey.scheme)
self.assertEqual(key.keytype, self.pubkey.keytype)
self.assertEqual(uri, f"awskms:{self.aws_key_id}")


if __name__ == "__main__":
unittest.main(verbosity=1)
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ deps =
commands =
python -m tests.check_gpg_available
coverage run tests/aggregate_tests.py
coverage report -m --fail-under 85
coverage report -m --fail-under 83

[testenv:purepy311]
deps =
Expand Down