diff --git a/pyproject.toml b/pyproject.toml index 130e3e72..015b0a34 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ Issues = "https://github.com/secure-systems-lab/securesystemslib/issues" [project.optional-dependencies] crypto = ["cryptography>=37.0.0"] -gcpkms = ["google-cloud-kms"] +gcpkms = ["google-cloud-kms", "cryptography>=37.0.0"] pynacl = ["pynacl>1.2.0"] PySPX = ["PySPX==0.5.0"] asn1 = ["asn1crypto"] diff --git a/securesystemslib/signer/_gcp_signer.py b/securesystemslib/signer/_gcp_signer.py index 1aa61bc4..5b8e7cac 100644 --- a/securesystemslib/signer/_gcp_signer.py +++ b/securesystemslib/signer/_gcp_signer.py @@ -1,19 +1,26 @@ """Signer implementation for Google Cloud KMS""" import logging -from typing import Optional +from typing import Optional, Tuple from urllib import parse import securesystemslib.hash as sslib_hash from securesystemslib import exceptions +from securesystemslib.keys import _get_keyid from securesystemslib.signer._key import Key -from securesystemslib.signer._signer import SecretsHandler, Signature, Signer +from securesystemslib.signer._signer import ( + SecretsHandler, + Signature, + Signer, + SSlibKey, +) logger = logging.getLogger(__name__) GCP_IMPORT_ERROR = None try: from google.cloud import kms + from google.cloud.kms_v1.types import CryptoKeyVersion except ImportError: GCP_IMPORT_ERROR = ( "google-cloud-kms library required to sign with Google Cloud keys." @@ -29,9 +36,14 @@ class GCPSigner(Signer): The signer uses "ambient" credentials: typically environment var GOOGLE_APPLICATION_CREDENTIALS that points to a file with valid credentials. These will be found by google.cloud.kms, see - https://cloud.google.com/docs/authentication/getting-started - (and https://github.com/google-github-actions/auth for the relevant - GitHub action). + https://cloud.google.com/docs/authentication/getting-started. + Some practical authentication options include: + * GitHub Action: https://github.com/google-github-actions/auth + * gcloud CLI: https://cloud.google.com/sdk/gcloud + + The specific permissions that GCPSigner needs are: + * roles/cloudkms.signer for sign() + * roles/cloudkms.publicKeyViewer for import() Arguments: gcp_keyid: Fully qualified GCP KMS key name, like @@ -71,6 +83,79 @@ def from_priv_key_uri( return cls(uri.path, public_key) + @classmethod + def import_(cls, gcp_keyid: str) -> Tuple[str, Key]: + """Load key and signer details from KMS + + Returns the private key uri and the public key. This method should only + be called once per key: the uri and Key should be stored for later use. + """ + if GCP_IMPORT_ERROR: + raise exceptions.UnsupportedLibraryError(GCP_IMPORT_ERROR) + + client = kms.KeyManagementServiceClient() + request = {"name": gcp_keyid} + kms_pubkey = client.get_public_key(request) + try: + keytype, scheme = cls._get_keytype_and_scheme(kms_pubkey.algorithm) + except KeyError as e: + raise exceptions.UnsupportedAlgorithmError( + f"{kms_pubkey.algorithm} is not a supported signing algorithm" + ) from e + + keyval = {"public": kms_pubkey.pem} + keyid = _get_keyid(keytype, scheme, keyval) + public_key = SSlibKey(keyid, keytype, scheme, keyval) + + return f"{cls.SCHEME}:{gcp_keyid}", public_key + + @staticmethod + def _get_keytype_and_scheme(algorithm: int) -> Tuple[str, str]: + """Return keytype and scheme for the KMS algorithm enum""" + keytypes_and_schemes = { + CryptoKeyVersion.CryptoKeyVersionAlgorithm.EC_SIGN_P256_SHA256: ( + "ecdsa", + "ecdsa-sha2-nistp256", + ), + CryptoKeyVersion.CryptoKeyVersionAlgorithm.EC_SIGN_P384_SHA384: ( + "ecdsa", + "ecdsa-sha2-nistp384", + ), + CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_2048_SHA256: ( + "rsa", + "rsassa-pss-sha256", + ), + CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_3072_SHA256: ( + "rsa", + "rsassa-pss-sha256", + ), + CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_4096_SHA256: ( + "rsa", + "rsassa-pss-sha256", + ), + CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_4096_SHA512: ( + "rsa", + "rsassa-pss-sha512", + ), + CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_2048_SHA256: ( + "rsa", + "rsa-pkcs1v15-sha256", + ), + CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_3072_SHA256: ( + "rsa", + "rsa-pkcs1v15-sha256", + ), + CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_4096_SHA256: ( + "rsa", + "rsa-pkcs1v15-sha256", + ), + CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_4096_SHA512: ( + "rsa", + "rsa-pkcs1v15-sha512", + ), + } + return keytypes_and_schemes[algorithm] + @staticmethod def _get_hash_algorithm(public_key: Key) -> str: """Helper function to return payload hash algorithm used for this key""" diff --git a/securesystemslib/signer/_hsm_signer.py b/securesystemslib/signer/_hsm_signer.py index 95343c15..fc83f01f 100644 --- a/securesystemslib/signer/_hsm_signer.py +++ b/securesystemslib/signer/_hsm_signer.py @@ -11,6 +11,7 @@ from securesystemslib import KEY_TYPE_ECDSA from securesystemslib.exceptions import UnsupportedLibraryError +from securesystemslib.keys import _get_keyid from securesystemslib.signer._key import Key, SSlibKey from securesystemslib.signer._signature import Signature from securesystemslib.signer._signer import SecretsHandler, Signer @@ -191,13 +192,14 @@ def _find_key_values( return ECDomainParameters.load(bytes(params)), bytes(point) @classmethod - def pubkey_from_hsm( - cls, sslib_keyid: str, hsm_keyid: Optional[int] = None - ) -> SSlibKey: - """Export public key from HSM. + def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]: + """Import public key and signer details from HSM. + + Returns a private key URI (for Signer.from_priv_key_uri()) and a public + key. import_() should be called once and the returned URI and public + key should be stored for later use. Arguments: - sslib_keyid: Key identifier that is unique within the metadata it is used in. hsm_keyid: Key identifier on the token. Default is 2 (meaning PIV key slot 9c). Raises: @@ -240,12 +242,12 @@ def pubkey_from_hsm( .decode() ) - return SSlibKey( - sslib_keyid, - KEY_TYPE_ECDSA, - _SCHEME_FOR_CURVE[curve], - {"public": public_pem}, - ) + keyval = {"public": public_pem} + scheme = _SCHEME_FOR_CURVE[curve] + keyid = _get_keyid(KEY_TYPE_ECDSA, scheme, keyval) + key = SSlibKey(keyid, KEY_TYPE_ECDSA, scheme, keyval) + + return "hsm:", key @classmethod def from_priv_key_uri( diff --git a/tests/check_kms_signers.py b/tests/check_kms_signers.py index 0674dc86..236660cc 100644 --- a/tests/check_kms_signers.py +++ b/tests/check_kms_signers.py @@ -18,13 +18,25 @@ import unittest from securesystemslib.exceptions import UnverifiedSignatureError -from securesystemslib.signer import Key, Signer +from securesystemslib.signer import GCPSigner, Key, Signer class TestKMSKeys(unittest.TestCase): """Test that KMS keys can be used to sign.""" - def test_gcp(self): + pubkey = Key.from_dict( + "218611b80052667026c221f8774249b0f6b8b310d30a5c45a3b878aa3a02f39e", + { + "keytype": "ecdsa", + "scheme": "ecdsa-sha2-nistp256", + "keyval": { + "public": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/ptvrXYuUc2ZaKssHhtg/IKNbO1X\ncDWlbKqLNpaK62MKdOwDz1qlp5AGHZkTY9tO09iq1F16SvVot1BQ9FJ2dw==\n-----END PUBLIC KEY-----\n" + }, + }, + ) + gcp_id = "projects/python-tuf-kms/locations/global/keyRings/securesystemslib-tests/cryptoKeys/ecdsa-sha2-nistp256/cryptoKeyVersions/1" + + def test_gcp_sign(self): """Test that GCP KMS key works for signing NOTE: The KMS account is setup to only accept requests from the @@ -35,25 +47,27 @@ def test_gcp(self): """ data = "data".encode("utf-8") - pubkey = Key.from_dict( - "abcd", - { - "keyid": "abcd", - "keytype": "ecdsa", - "scheme": "ecdsa-sha2-nistp256", - "keyval": { - "public": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/ptvrXYuUc2ZaKssHhtg/IKNbO1X\ncDWlbKqLNpaK62MKdOwDz1qlp5AGHZkTY9tO09iq1F16SvVot1BQ9FJ2dw==\n-----END PUBLIC KEY-----\n" - }, - }, - ) - gcp_id = "projects/python-tuf-kms/locations/global/keyRings/securesystemslib-tests/cryptoKeys/ecdsa-sha2-nistp256/cryptoKeyVersions/1" - signer = Signer.from_priv_key_uri(f"gcpkms:{gcp_id}", pubkey) + signer = Signer.from_priv_key_uri(f"gcpkms:{self.gcp_id}", self.pubkey) sig = signer.sign(data) - pubkey.verify_signature(sig, data) + self.pubkey.verify_signature(sig, data) with self.assertRaises(UnverifiedSignatureError): - pubkey.verify_signature(sig, b"NOT DATA") + self.pubkey.verify_signature(sig, b"NOT DATA") + + def test_gcp_import(self): + """Test that GCP KMS key can be imported + + NOTE: The KMS account is setup to only accept requests from the + Securesystemslib GitHub Action environment: test cannot pass elsewhere. + + In case of problems with KMS account, please file an issue and + assign @jku. + """ + + uri, key = GCPSigner.import_(self.gcp_id) + self.assertEqual(key, self.pubkey) + self.assertEqual(uri, f"gcpkms:{self.gcp_id}") if __name__ == "__main__": diff --git a/tests/test_hsm_signer.py b/tests/test_hsm_signer.py index 5e8dfbae..6d7cd859 100644 --- a/tests/test_hsm_signer.py +++ b/tests/test_hsm_signer.py @@ -51,7 +51,6 @@ class TestHSM(unittest.TestCase): See .github/workflows/hsm.yml for how this can be done on Linux, macOS and Windows. """ - sslib_keyid = "a" * 64 # Mock SSlibKey conform sha256 hex digest keyid hsm_keyid = 1 hsm_keyid_default = 2 hsm_user_pin = "123456" @@ -139,7 +138,7 @@ def test_hsm(self): """Test HSM key export and signing.""" for hsm_keyid in [self.hsm_keyid, self.hsm_keyid_default]: - key = HSMSigner.pubkey_from_hsm(self.sslib_keyid, hsm_keyid) + _, key = HSMSigner.import_(hsm_keyid) signer = HSMSigner(hsm_keyid, key, lambda sec: self.hsm_user_pin) sig = signer.sign(b"DATA") key.verify_signature(sig, b"DATA") @@ -150,11 +149,9 @@ def test_hsm(self): def test_hsm_uri(self): """Test HSM default key export and signing from URI.""" - key = HSMSigner.pubkey_from_hsm( - self.sslib_keyid, self.hsm_keyid_default - ) + uri, key = HSMSigner.import_(self.hsm_keyid_default) signer = Signer.from_priv_key_uri( - "hsm:", key, lambda sec: self.hsm_user_pin + uri, key, lambda sec: self.hsm_user_pin ) sig = signer.sign(b"DATA") key.verify_signature(sig, b"DATA")