From c2cfd3b26338ec812a448bf2fbc145fbe93e0f33 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Mon, 24 Oct 2022 16:07:54 +0300 Subject: [PATCH 01/21] Signer: Implement a private key uri scheme The scheme allows instantiating signers and signing with them without knowing the signer or privtae key type beforehand., Signers can now be used like this: # demo setup pubkey = generate_ecdsa_key() privkey = pubkey["keyval"].pop("private") os.environ["myprivatekey"] = privkey # sign: Correct Signer subclass is chosen based on the URI signer = Signer.from_priv_key_uri("envvar://myprivatekey", pubkey) sig = signer.sign(b"data") Just three schemes are supported by default but more can be added here and exernally, in applications. Default schemes: envvar: read private key content from environment variable file: read private key content from file encfile: read encrypted private key content from file, decrypt with passphrase from application --- securesystemslib/signer.py | 189 ++++++++++++++++++++++++++++++++----- tests/test_signer.py | 108 ++++++++++++++++----- 2 files changed, 251 insertions(+), 46 deletions(-) diff --git a/securesystemslib/signer.py b/securesystemslib/signer.py index 4b9e6c68..2fa28886 100644 --- a/securesystemslib/signer.py +++ b/securesystemslib/signer.py @@ -6,10 +6,19 @@ """ import abc -from typing import Any, Dict, Mapping, Optional +import copy +import os +from typing import Any, Callable, Dict, Mapping, Optional +from urllib import parse import securesystemslib.gpg.functions as gpg import securesystemslib.keys as sslib_keys +from securesystemslib import formats + +# NOTE This dictionary is initialized here so it's available to Signer, but +# filled at end of file when Signer subclass definitions are available. +# Users can add their own Signer implementations into this dictionary +SIGNER_FOR_URI_SCHEME: Dict[str, "Signer"] = {} class Signature: @@ -137,8 +146,24 @@ def to_dict(self) -> Dict: } +# SecretsHandler is a function the calling code can provide to Signer: +# If Signer needs secrets from user, the function will be called +SecretsHandler = Callable[[str], str] + + class Signer: - """Signer interface created to support multiple signing implementations.""" + """Signer interface that supports multiple signing implementations. + + Usage example: + signer = Signer.from_priv_key_uri("envvar:MYPRIVKEY", pub_key) + sig = signer.sign(b"data") + + See SIGNER_FOR_URI_SCHEME for supported private key URI schemes. The + currently supported default schemes are: + * envvar: see SSlibSigner for details + * file: see SSlibSigner for details + * encfile: see SSlibSigner for details + """ __metaclass__ = abc.ABCMeta @@ -154,42 +179,138 @@ def sign(self, payload: bytes) -> Signature: """ raise NotImplementedError # pragma: no cover + @classmethod + @abc.abstractmethod + def new_from_uri( + cls, + priv_key_uri: str, + public_key: Dict[str, Any], + secrets_handler: SecretsHandler, + ) -> "Signer": + """Constructor for given private key URI + + This is a semi-private method meant to be called by Signer only. + Implementation is required if the Signer subclass is in + SIGNER_FOR_URI_SCHEME. + + Arguments: + priv_key_uri: URI that identifies the private key and signer + public_key: Public key metadata conforming to PUBLIC_KEY_SCHEMA + secrets_handler: Optional function that may be called if the + signer needs additional secrets (like a PIN or passphrase) + """ + raise NotImplementedError # pragma: no cover + + @staticmethod + def from_priv_key_uri( + priv_key_uri: str, + public_key: Dict[str, Any], + secrets_handler: Optional[SecretsHandler] = None, + ): + """Returns a concrete Signer implementation based on private key URI + + Args: + priv_key_uri: URI that identifies the private key location and signer + public_key: Public key metadata conforming to PUBLIC_KEY_SCHEMA + """ + + formats.PUBLIC_KEY_SCHEMA.check_match(public_key) + scheme, _, _ = priv_key_uri.partition(":") + if scheme not in SIGNER_FOR_URI_SCHEME: + raise ValueError(f"Unsupported private key scheme {scheme}") + + signer = SIGNER_FOR_URI_SCHEME[scheme] + return signer.new_from_uri(priv_key_uri, public_key, secrets_handler) + class SSlibSigner(Signer): """A securesystemslib signer implementation. Provides a sign method to generate a cryptographic signature with a - securesystemslib-style rsa, ed25519 or ecdsa private key on the instance. - The signature scheme is determined by the key and must be one of: - - - rsa(ssa-pss|pkcs1v15)-(md5|sha1|sha224|sha256|sha384|sha512) (12 schemes) - - ed25519 - - ecdsa-sha2-nistp256 - - See "securesystemslib.interface" for functions to generate and load keys. + securesystemslib-style rsa, ed25519 or ecdsa key. See keys module + for the supported types, schemes and hash algorithms. + + SSlibSigners should be instantiated with Signer.from_priv_key_uri(). + Two private key URI schemes are supported: + * envvar:: + VAR is an environment variable that contains the private key content. + envvar:MYPRIVKEY + * file:: + PATH is a file path to a file that contains private key content. + file:path/to/file + * encfile:: + The the private key content in PATH has been encrypted with + keys.encryot_key(). Application provided SecretsHandler will be + called to get the passphrase. + file:/path/to/encrypted/file Attributes: key_dict: A securesystemslib-style key dictionary, which includes a keyid, - key type, signature scheme, and the public and private key values, - e.g.:: - - { - "keytype": "rsa", - "scheme": "rsassa-pss-sha256", - "keyid": "f30a0870d026980100c0573bd557394f8c1bbd6...", - "keyval": { - "public": "-----BEGIN RSA PUBLIC KEY----- ...", - "private": "-----BEGIN RSA PRIVATE KEY----- ..." - } - } - - The public and private keys are strings in PEM format. + key type, scheme, and keyval with both private and public parts. """ + ENVVAR_URI_SCHEME = "envvar" + FILE_URI_SCHEME = "file" + ENC_FILE_URI_SCHEME = "encfile" + def __init__(self, key_dict: Dict): self.key_dict = key_dict + @classmethod + def new_from_uri( + cls, + priv_key_uri: str, + public_key: Dict[str, Any], + secrets_handler: SecretsHandler, + ) -> "SSlibSigner": + """Semi-private Constructor for Signer to call + + Arguments: + priv_key_uri: private key URI described in class doc + public_key: securesystemslib-style key dict, which includes keyid, + type, scheme, and keyval the public key. + + Raises: + OSError: Reading the file failed with "file:" URI + ValueError: URI is unsupported or environment variable was not set + with "envvar:" URIs + + Returns: + SSlibSigner for the given private key URI. + """ + keydict = copy.deepcopy(public_key) + uri = parse.urlparse(priv_key_uri) + + if uri.scheme == cls.ENVVAR_URI_SCHEME: + # read private key from environment variable + private = os.getenv(uri.path) + if private is None: + raise ValueError( + f"Unset private key variable for {priv_key_uri}" + ) + + elif uri.scheme == cls.FILE_URI_SCHEME: + # read private key from file + with open(uri.path, "rb") as f: + private = f.read().decode() + + elif uri.scheme == cls.ENC_FILE_URI_SCHEME: + # read key from file, ask for passphrase, decrypt + with open(uri.path, "rb") as f: + enc = f.read().decode() + secret = secrets_handler("passphrase") + decrypted = sslib_keys.decrypt_key(enc, secret) + private = decrypted["keyval"]["private"] + + else: + raise ValueError( + f"SSlibSigner does not support priv key uri {priv_key_uri}" + ) + + keydict["keyval"]["private"] = private + return cls(keydict) + def sign(self, payload: bytes) -> Signature: """Signs a given payload by the key assigned to the SSlibSigner instance. @@ -205,7 +326,6 @@ def sign(self, payload: bytes) -> Signature: Returns: Returns a "Signature" class instance. """ - sig_dict = sslib_keys.create_signature(self.key_dict, payload) return Signature(**sig_dict) @@ -231,6 +351,17 @@ def __init__( self.keyid = keyid self.homedir = homedir + @classmethod + def new_from_uri( + cls, + priv_key_uri: str, + public_key: Dict[str, Any], + secrets_handler: SecretsHandler, + ) -> Signer: + # GPGSigner uses keys and produces signature dicts that are not + # compliant with TUF or intoto specifications: not useful here + raise NotImplementedError() + def sign(self, payload: bytes) -> GPGSignature: """Signs a given payload by the key assigned to the GPGSigner instance. @@ -266,3 +397,11 @@ def sign(self, payload: bytes) -> GPGSignature: sig_dict = gpg.create_signature(payload, self.keyid, self.homedir) return GPGSignature(**sig_dict) + + +# signer implementations are now defined: Add them to the lookup table +SIGNER_FOR_URI_SCHEME = { + SSlibSigner.ENVVAR_URI_SCHEME: SSlibSigner, + SSlibSigner.FILE_URI_SCHEME: SSlibSigner, + SSlibSigner.ENC_FILE_URI_SCHEME: SSlibSigner, +} diff --git a/tests/test_signer.py b/tests/test_signer.py index ecdcccd0..9c654e34 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -8,9 +8,12 @@ import tempfile import unittest -import securesystemslib.formats import securesystemslib.keys as KEYS -from securesystemslib.exceptions import FormatError, UnsupportedAlgorithmError +from securesystemslib.exceptions import ( + CryptoError, + FormatError, + UnsupportedAlgorithmError, +) from securesystemslib.gpg.constants import have_gpg from securesystemslib.gpg.functions import export_pubkey from securesystemslib.gpg.functions import verify_signature as verify_sig @@ -18,32 +21,95 @@ GPGSignature, GPGSigner, Signature, + Signer, SSlibSigner, ) -class TestSSlibSigner( - unittest.TestCase -): # pylint: disable=missing-class-docstring +class TestSigner(unittest.TestCase): + """Test Signer and SSlibSigner functionality""" + @classmethod def setUpClass(cls): - cls.rsakey_dict = KEYS.generate_rsa_key() - cls.ed25519key_dict = KEYS.generate_ed25519_key() - cls.ecdsakey_dict = KEYS.generate_ecdsa_key() - cls.sphincskey_dict = KEYS.generate_sphincs_key() - cls.DATA_STR = "SOME DATA REQUIRING AUTHENTICITY." - cls.DATA = securesystemslib.formats.encode_canonical( - cls.DATA_STR - ).encode("utf-8") - - def test_sslib_sign(self): - dicts = [ - self.rsakey_dict, - self.ecdsakey_dict, - self.ed25519key_dict, - self.sphincskey_dict, + cls.keys = [ + KEYS.generate_rsa_key(), + KEYS.generate_ed25519_key(), + KEYS.generate_ecdsa_key(), + KEYS.generate_sphincs_key(), ] - for scheme_dict in dicts: + cls.DATA = b"DATA" + + # pylint: disable=consider-using-with + cls.testdir = tempfile.TemporaryDirectory() + + @classmethod + def tearDownClass(cls): + cls.testdir.cleanup() + + def test_signer_sign_with_envvar_uri(self): + for key in self.keys: + # setup + pubkey = copy.deepcopy(key) + privkey = pubkey["keyval"].pop("private") + os.environ["PRIVKEY"] = privkey + + # test signing + signer = Signer.from_priv_key_uri("envvar:PRIVKEY", pubkey) + sig = signer.sign(self.DATA).to_dict() + + self.assertTrue(KEYS.verify_signature(pubkey, sig, self.DATA)) + self.assertFalse(KEYS.verify_signature(pubkey, sig, b"NOT DATA")) + + def test_signer_sign_with_file_uri(self): + for key in self.keys: + # setup + pubkey = copy.deepcopy(key) + privkey = pubkey["keyval"].pop("private") + # let teardownclass handle the file removal + with tempfile.NamedTemporaryFile( + dir=self.testdir.name, delete=False + ) as f: + f.write(privkey.encode()) + + # test signing + signer = Signer.from_priv_key_uri(f"file:{f.name}", pubkey) + sig = signer.sign(self.DATA).to_dict() + + self.assertTrue(KEYS.verify_signature(pubkey, sig, self.DATA)) + self.assertFalse(KEYS.verify_signature(pubkey, sig, b"NOT DATA")) + + def test_signer_sign_with_enc_file_uri(self): + for key in self.keys: + # setup + pubkey = copy.deepcopy(key) + pubkey["keyval"].pop("private") + privkey = KEYS.encrypt_key(key, "hunter2") + # let teardownclass handle the file removal + with tempfile.NamedTemporaryFile( + dir=self.testdir.name, delete=False + ) as f: + f.write(privkey.encode()) + + # test signing + def secrets_handler(secret: str) -> str: + return "hunter2" if secret == "passphrase" else "???" + + uri = f"encfile:{f.name}" + signer = Signer.from_priv_key_uri(uri, pubkey, secrets_handler) + sig = signer.sign(self.DATA).to_dict() + + self.assertTrue(KEYS.verify_signature(pubkey, sig, self.DATA)) + self.assertFalse(KEYS.verify_signature(pubkey, sig, b"NOT DATA")) + + # test wrong passphrase + def fake_handler(_) -> str: + return "12345" + + with self.assertRaises(CryptoError): + signer = Signer.from_priv_key_uri(uri, pubkey, fake_handler) + + def test_sslib_signer_sign(self): + for scheme_dict in self.keys: # Test generation of signatures. sslib_signer = SSlibSigner(scheme_dict) sig_obj = sslib_signer.sign(self.DATA) From bfeda3c6a7b386c0c6acfe5c68380516a12e3f69 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Thu, 3 Nov 2022 20:08:56 +0200 Subject: [PATCH 02/21] Signer: Add Key class This is copied from python-tuf with following changes * verify_signature() is left in python-tuf as Metadata-related * is_verified() is added instead * type of keyval is changed from Dict[str, str] to Dict[str, Any] * Some docstrings made more reasonable for SSLib Signer.from_priv_key_uri() now takes a Key as argument. SSlibSigner constructor still takes a classic keydict as constructor argument and not a Key (in order to not break API) but that is now documented as an implementation detail. --- securesystemslib/signer.py | 158 +++++++++++++++++++++++++++++++++---- tests/test_signer.py | 32 ++++---- 2 files changed, 159 insertions(+), 31 deletions(-) diff --git a/securesystemslib/signer.py b/securesystemslib/signer.py index 2fa28886..6f20cad0 100644 --- a/securesystemslib/signer.py +++ b/securesystemslib/signer.py @@ -6,14 +6,16 @@ """ import abc -import copy +import logging import os from typing import Any, Callable, Dict, Mapping, Optional from urllib import parse import securesystemslib.gpg.functions as gpg import securesystemslib.keys as sslib_keys -from securesystemslib import formats +from securesystemslib.exceptions import FormatError + +logger = logging.getLogger(__name__) # NOTE This dictionary is initialized here so it's available to Signer, but # filled at end of file when Signer subclass definitions are available. @@ -146,6 +148,136 @@ def to_dict(self) -> Dict: } +class Key: + """A container class representing the public portion of a key. + + *All parameters named below are not just constructor arguments but also + instance attributes.* + + Args: + keyid: Key identifier that is unique within the metadata it is used in. + Keyid is not verified to be the hash of a specific representation + of the key. + keytype: Key type, e.g. "rsa", "ed25519" or "ecdsa-sha2-nistp256". + scheme: Signature scheme. For example: + "rsassa-pss-sha256", "ed25519", and "ecdsa-sha2-nistp256". + keyval: Opaque key content + unrecognized_fields: Dictionary of all attributes that are not managed + by Securesystemslib + + Raises: + TypeError: Invalid type for an argument. + """ + + def __init__( + self, + keyid: str, + keytype: str, + scheme: str, + keyval: Dict[str, Any], + unrecognized_fields: Optional[Dict[str, Any]] = None, + ): + if not all( + isinstance(at, str) for at in [keyid, keytype, scheme] + ) or not isinstance(keyval, dict): + raise TypeError("Unexpected Key attributes types!") + self.keyid = keyid + self.keytype = keytype + self.scheme = scheme + self.keyval = keyval + if unrecognized_fields is None: + unrecognized_fields = {} + + self.unrecognized_fields = unrecognized_fields + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Key): + return False + + return ( + self.keyid == other.keyid + and self.keytype == other.keytype + and self.scheme == other.scheme + and self.keyval == other.keyval + and self.unrecognized_fields == other.unrecognized_fields + ) + + @classmethod + def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": + """Creates ``Key`` object from TUF serialization dict. + + Raises: + KeyError, TypeError: Invalid arguments. + """ + keytype = key_dict.pop("keytype") + scheme = key_dict.pop("scheme") + keyval = key_dict.pop("keyval") + # All fields left in the key_dict are unrecognized. + return cls(keyid, keytype, scheme, keyval, key_dict) + + def to_dict(self) -> Dict[str, Any]: + """Returns a dict for TUF serialization.""" + return { + "keytype": self.keytype, + "scheme": self.scheme, + "keyval": self.keyval, + **self.unrecognized_fields, + } + + def to_securesystemslib_key(self) -> Dict[str, Any]: + """Returns a classic Securesystemslib keydict""" + return { + "keyid": self.keyid, + "keytype": self.keytype, + "scheme": self.scheme, + "keyval": self.keyval, + } + + @classmethod + def from_securesystemslib_key(cls, key_dict: Dict[str, Any]) -> "Key": + """Creates a ``Key`` object from a classic securesystemlib keydict. + + Args: + key_dict: Key in securesystemlib dict representation. + + Raises: + ValueError: ``key_dict`` value is not in securesystemslib format. + """ + try: + key_meta = sslib_keys.format_keyval_to_metadata( + key_dict["keytype"], + key_dict["scheme"], + key_dict["keyval"], + ) + except FormatError as e: + raise ValueError("keydict not in securesystemslib format") from e + + return cls( + key_dict["keyid"], + key_meta["keytype"], + key_meta["scheme"], + key_meta["keyval"], + ) + + def is_verified(self, signature: Signature, data: bytes) -> bool: + """Verifies the signature over data. + + Args: + signature: Signature object. + data: Payload bytes. + + Raises: + CryptoError, FormatError, UnsupportedAlgorithmError. + + Returns True if signature is valid for this key for given data. + """ + return sslib_keys.verify_signature( + self.to_securesystemslib_key(), + signature.to_dict(), + data, + ) + + # SecretsHandler is a function the calling code can provide to Signer: # If Signer needs secrets from user, the function will be called SecretsHandler = Callable[[str], str] @@ -184,7 +316,7 @@ def sign(self, payload: bytes) -> Signature: def new_from_uri( cls, priv_key_uri: str, - public_key: Dict[str, Any], + public_key: Key, secrets_handler: SecretsHandler, ) -> "Signer": """Constructor for given private key URI @@ -195,7 +327,7 @@ def new_from_uri( Arguments: priv_key_uri: URI that identifies the private key and signer - public_key: Public key metadata conforming to PUBLIC_KEY_SCHEMA + public_key: Key object secrets_handler: Optional function that may be called if the signer needs additional secrets (like a PIN or passphrase) """ @@ -204,17 +336,16 @@ def new_from_uri( @staticmethod def from_priv_key_uri( priv_key_uri: str, - public_key: Dict[str, Any], + public_key: Key, secrets_handler: Optional[SecretsHandler] = None, ): """Returns a concrete Signer implementation based on private key URI Args: priv_key_uri: URI that identifies the private key location and signer - public_key: Public key metadata conforming to PUBLIC_KEY_SCHEMA + public_key: Key object """ - formats.PUBLIC_KEY_SCHEMA.check_match(public_key) scheme, _, _ = priv_key_uri.partition(":") if scheme not in SIGNER_FOR_URI_SCHEME: raise ValueError(f"Unsupported private key scheme {scheme}") @@ -246,8 +377,8 @@ class SSlibSigner(Signer): Attributes: key_dict: - A securesystemslib-style key dictionary, which includes a keyid, - key type, scheme, and keyval with both private and public parts. + A securesystemslib-style key dictionary. This is an implementation + detail, not part of public API """ ENVVAR_URI_SCHEME = "envvar" @@ -261,15 +392,14 @@ def __init__(self, key_dict: Dict): def new_from_uri( cls, priv_key_uri: str, - public_key: Dict[str, Any], + public_key: Key, secrets_handler: SecretsHandler, ) -> "SSlibSigner": """Semi-private Constructor for Signer to call Arguments: priv_key_uri: private key URI described in class doc - public_key: securesystemslib-style key dict, which includes keyid, - type, scheme, and keyval the public key. + public_key: Key object. Raises: OSError: Reading the file failed with "file:" URI @@ -279,7 +409,6 @@ def new_from_uri( Returns: SSlibSigner for the given private key URI. """ - keydict = copy.deepcopy(public_key) uri = parse.urlparse(priv_key_uri) if uri.scheme == cls.ENVVAR_URI_SCHEME: @@ -308,6 +437,7 @@ def new_from_uri( f"SSlibSigner does not support priv key uri {priv_key_uri}" ) + keydict = public_key.to_securesystemslib_key() keydict["keyval"]["private"] = private return cls(keydict) @@ -355,7 +485,7 @@ def __init__( def new_from_uri( cls, priv_key_uri: str, - public_key: Dict[str, Any], + public_key: Key, secrets_handler: SecretsHandler, ) -> Signer: # GPGSigner uses keys and produces signature dicts that are not diff --git a/tests/test_signer.py b/tests/test_signer.py index 9c654e34..b5edcddd 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -20,6 +20,7 @@ from securesystemslib.signer import ( GPGSignature, GPGSigner, + Key, Signature, Signer, SSlibSigner, @@ -49,40 +50,37 @@ def tearDownClass(cls): def test_signer_sign_with_envvar_uri(self): for key in self.keys: # setup - pubkey = copy.deepcopy(key) - privkey = pubkey["keyval"].pop("private") - os.environ["PRIVKEY"] = privkey + pubkey = Key.from_securesystemslib_key(key) + os.environ["PRIVKEY"] = key["keyval"]["private"] # test signing signer = Signer.from_priv_key_uri("envvar:PRIVKEY", pubkey) - sig = signer.sign(self.DATA).to_dict() + sig = signer.sign(self.DATA) - self.assertTrue(KEYS.verify_signature(pubkey, sig, self.DATA)) - self.assertFalse(KEYS.verify_signature(pubkey, sig, b"NOT DATA")) + self.assertTrue(pubkey.is_verified(sig, self.DATA)) + self.assertFalse(pubkey.is_verified(sig, b"NOT DATA")) def test_signer_sign_with_file_uri(self): for key in self.keys: # setup - pubkey = copy.deepcopy(key) - privkey = pubkey["keyval"].pop("private") + pubkey = Key.from_securesystemslib_key(key) # let teardownclass handle the file removal with tempfile.NamedTemporaryFile( dir=self.testdir.name, delete=False ) as f: - f.write(privkey.encode()) + f.write(key["keyval"]["private"].encode()) # test signing signer = Signer.from_priv_key_uri(f"file:{f.name}", pubkey) - sig = signer.sign(self.DATA).to_dict() + sig = signer.sign(self.DATA) - self.assertTrue(KEYS.verify_signature(pubkey, sig, self.DATA)) - self.assertFalse(KEYS.verify_signature(pubkey, sig, b"NOT DATA")) + self.assertTrue(pubkey.is_verified(sig, self.DATA)) + self.assertFalse(pubkey.is_verified(sig, b"NOT DATA")) def test_signer_sign_with_enc_file_uri(self): for key in self.keys: # setup - pubkey = copy.deepcopy(key) - pubkey["keyval"].pop("private") + pubkey = Key.from_securesystemslib_key(key) privkey = KEYS.encrypt_key(key, "hunter2") # let teardownclass handle the file removal with tempfile.NamedTemporaryFile( @@ -96,10 +94,10 @@ def secrets_handler(secret: str) -> str: uri = f"encfile:{f.name}" signer = Signer.from_priv_key_uri(uri, pubkey, secrets_handler) - sig = signer.sign(self.DATA).to_dict() + sig = signer.sign(self.DATA) - self.assertTrue(KEYS.verify_signature(pubkey, sig, self.DATA)) - self.assertFalse(KEYS.verify_signature(pubkey, sig, b"NOT DATA")) + self.assertTrue(pubkey.is_verified(sig, self.DATA)) + self.assertFalse(pubkey.is_verified(sig, b"NOT DATA")) # test wrong passphrase def fake_handler(_) -> str: From 61f177427285d668f7064f741550fef81c4e4263 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 4 Nov 2022 15:51:09 +0200 Subject: [PATCH 03/21] tests: Add basic smoke tests for Key python-tuf has an extensive test suite for Metadata in general: * I don't think python-tuf should stop testing Key just because it's now in securesystemslib * Maybe it's not necessary to copy e.g. all serialization tests here? Still, added a few basic tests for Key. Key is also used in the Signer tests. --- tests/test_signer.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/test_signer.py b/tests/test_signer.py index b5edcddd..23e70597 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -27,6 +27,45 @@ ) +class TestKey(unittest.TestCase): + """Key tests. See many more tests in python-tuf test suite""" + + def test_key_from_to_dict(self): + keydict = { + "keytype": "rsa", + "scheme": "rsassa-pss-sha256", + "extra": "somedata", + "keyval": { + "public": "pubkeyval", + "foo": "bar", + }, + } + + key = Key.from_dict("aa", copy.deepcopy(keydict)) + self.assertDictEqual(keydict, key.to_dict()) + + def test_key_is_verified(self): + sigdict = { + "keyid": "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", + "sig": "3fc91f5411a567d6a7f28b7fbb9ba6d60b1e2a1b64d8af0b119650015d86bb5a55e57c0e2c995a9b4a332b8f435703e934c0e6ce69fe6674a8ce68719394a40b", + } + keydict = { + "keytype": "ed25519", + "scheme": "ed25519", + "keyval": { + "public": "8ae43d22b8e0fbf4a48fa3490d31b4d389114f5dc1039c918f075427f4100759", + }, + } + key = Key.from_dict( + "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", + keydict, + ) + sig = Signature.from_dict(sigdict) + + self.assertTrue(key.is_verified(sig, b"DATA")) + self.assertFalse(key.is_verified(sig, b"NOTDATA")) + + class TestSigner(unittest.TestCase): """Test Signer and SSlibSigner functionality""" From 4dd2f7b8ec4dcb2d29d4bbacbbcda41c5f5ab7f0 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Wed, 9 Nov 2022 14:10:31 +0200 Subject: [PATCH 04/21] Key: Make Key an abtract class The idea is to allow more Key implementations, but SSlibKey provides the current implementations. --- securesystemslib/exceptions.py | 4 + securesystemslib/signer.py | 180 ++++++++++++++++++++++++--------- tests/test_signer.py | 61 ++++++----- 3 files changed, 170 insertions(+), 75 deletions(-) diff --git a/securesystemslib/exceptions.py b/securesystemslib/exceptions.py index 2ace2679..a0ef8ab9 100755 --- a/securesystemslib/exceptions.py +++ b/securesystemslib/exceptions.py @@ -138,3 +138,7 @@ class StorageError(Error): backend.""" pass # pylint: disable=unnecessary-pass + + +class UnverifiedSignatureError(Error): + """Signature could not be verified""" diff --git a/securesystemslib/signer.py b/securesystemslib/signer.py index 6f20cad0..bce3b6c6 100644 --- a/securesystemslib/signer.py +++ b/securesystemslib/signer.py @@ -8,19 +8,20 @@ import abc import logging import os -from typing import Any, Callable, Dict, Mapping, Optional +from typing import Any, Callable, Dict, Mapping, Optional, Tuple from urllib import parse import securesystemslib.gpg.functions as gpg import securesystemslib.keys as sslib_keys -from securesystemslib.exceptions import FormatError +from securesystemslib import exceptions logger = logging.getLogger(__name__) -# NOTE This dictionary is initialized here so it's available to Signer, but -# filled at end of file when Signer subclass definitions are available. -# Users can add their own Signer implementations into this dictionary +# NOTE dicts for Key and Signer dispatch are defined here, but +# filled at end of file when subclass definitions are available. +# Users can add their own implementations into these dictionaries SIGNER_FOR_URI_SCHEME: Dict[str, "Signer"] = {} +KEY_FOR_TYPE_AND_SCHEME: Dict[Tuple[str, str], "Key"] = {} class Signature: @@ -149,7 +150,7 @@ def to_dict(self) -> Dict: class Key: - """A container class representing the public portion of a key. + """Abstract class representing the public portion of a key. *All parameters named below are not just constructor arguments but also instance attributes.* @@ -169,6 +170,8 @@ class Key: TypeError: Invalid type for an argument. """ + __metaclass__ = abc.ABCMeta + def __init__( self, keyid: str, @@ -203,20 +206,29 @@ def __eq__(self, other: Any) -> bool: ) @classmethod + @abc.abstractmethod def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": """Creates ``Key`` object from TUF serialization dict. + Key implementations must override this factory constructor. + Raises: KeyError, TypeError: Invalid arguments. """ - keytype = key_dict.pop("keytype") - scheme = key_dict.pop("scheme") - keyval = key_dict.pop("keyval") - # All fields left in the key_dict are unrecognized. - return cls(keyid, keytype, scheme, keyval, key_dict) + keytype = key_dict["keytype"] + scheme = key_dict["scheme"] + + if (keytype, scheme) not in KEY_FOR_TYPE_AND_SCHEME: + raise ValueError(f"Unsupported public key {keytype}/{scheme}") + + key_impl = KEY_FOR_TYPE_AND_SCHEME[(keytype, scheme)] + return key_impl.from_dict(keyid, key_dict) def to_dict(self) -> Dict[str, Any]: - """Returns a dict for TUF serialization.""" + """Returns a dict for TUF serialization. + + Key implementations may override this method. + """ return { "keytype": self.keytype, "scheme": self.scheme, @@ -224,8 +236,50 @@ def to_dict(self) -> Dict[str, Any]: **self.unrecognized_fields, } + @abc.abstractmethod + def verify_signature(self, signature: Signature, data: bytes) -> None: + """Verifies the signature over data. + + Args: + signature: Signature object. + data: Payload bytes. + + Raises: + UnverifiedSignatureError: Failed to verify signature, either + because it was incorrect or because of a verification problem. + """ + raise NotImplementedError + + @abc.abstractmethod + def get_payload_hash_algorithm(self) -> str: + """Return the payload hash algorithm + + This is used by Signers where the actual signing system only accepts + hashes of payloads: e.g. HSM and KMS + + """ + # TODO Do all signing systems support payload prehash? like ed25519? + raise NotImplementedError + + @abc.abstractmethod + def match_keyid(self, keyid: str) -> bool: + """Does given keyid match this keys keyid + + This is a workaround for GPSignature design features. + """ + # TODO is this reeally really needed? + raise NotImplementedError + + +# TODO verify_signature software errors should have a error of their own? +# Maybe something deriving from UnverifiedSignature? + + +class SSlibKey(Key): + """Key implementation for RSA, Ed25519, ECDSA and Sphincs keys""" + def to_securesystemslib_key(self) -> Dict[str, Any]: - """Returns a classic Securesystemslib keydict""" + """Internal helper function""" return { "keyid": self.keyid, "keytype": self.keytype, @@ -234,48 +288,49 @@ def to_securesystemslib_key(self) -> Dict[str, Any]: } @classmethod - def from_securesystemslib_key(cls, key_dict: Dict[str, Any]) -> "Key": - """Creates a ``Key`` object from a classic securesystemlib keydict. - - Args: - key_dict: Key in securesystemlib dict representation. - - Raises: - ValueError: ``key_dict`` value is not in securesystemslib format. - """ - try: - key_meta = sslib_keys.format_keyval_to_metadata( - key_dict["keytype"], - key_dict["scheme"], - key_dict["keyval"], - ) - except FormatError as e: - raise ValueError("keydict not in securesystemslib format") from e - - return cls( + def from_securesystemslib_key(cls, key_dict: Dict[str, Any]) -> "SSlibKey": + """Constructor from classic securesystemslib keydict""" + return SSlibKey( key_dict["keyid"], - key_meta["keytype"], - key_meta["scheme"], - key_meta["keyval"], + key_dict["keytype"], + key_dict["scheme"], + key_dict["keyval"], ) - def is_verified(self, signature: Signature, data: bytes) -> bool: - """Verifies the signature over data. + @classmethod + def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "SSlibKey": + keytype = key_dict.pop("keytype") + scheme = key_dict.pop("scheme") + keyval = key_dict.pop("keyval") + # All fields left in the key_dict are unrecognized. + return cls(keyid, keytype, scheme, keyval, key_dict) - Args: - signature: Signature object. - data: Payload bytes. + def verify_signature(self, signature: Signature, data: bytes) -> None: + try: + if not sslib_keys.verify_signature( + self.to_securesystemslib_key(), + signature.to_dict(), + data, + ): + raise exceptions.UnverifiedSignatureError( + f"Failed to verify signature by {self.keyid}" + ) + except ( + exceptions.CryptoError, + exceptions.FormatError, + exceptions.UnsupportedAlgorithmError, + ) as e: + # Log unexpected failure, but continue as if there was no signature + logger.info("Key %s failed to verify sig: %s", self.keyid, str(e)) + raise exceptions.UnverifiedSignatureError( + f"Unknown failure to verify signature by {self.keyid}" + ) from e - Raises: - CryptoError, FormatError, UnsupportedAlgorithmError. + def get_payload_hash_algorithm(self) -> str: + raise NotImplementedError - Returns True if signature is valid for this key for given data. - """ - return sslib_keys.verify_signature( - self.to_securesystemslib_key(), - signature.to_dict(), - data, - ) + def match_keyid(self, keyid: str) -> bool: + raise NotImplementedError("TODO -- this seems pointless...") # SecretsHandler is a function the calling code can provide to Signer: @@ -409,6 +464,11 @@ def new_from_uri( Returns: SSlibSigner for the given private key URI. """ + if not isinstance(public_key, SSlibKey): + raise ValueError( + f"Expected SSlibKey public key for private key {priv_key_uri}" + ) + uri = parse.urlparse(priv_key_uri) if uri.scheme == cls.ENVVAR_URI_SCHEME: @@ -529,9 +589,29 @@ def sign(self, payload: bytes) -> GPGSignature: return GPGSignature(**sig_dict) -# signer implementations are now defined: Add them to the lookup table +# signer and key implementations are now defined: Add them to the lookup table SIGNER_FOR_URI_SCHEME = { SSlibSigner.ENVVAR_URI_SCHEME: SSlibSigner, SSlibSigner.FILE_URI_SCHEME: SSlibSigner, SSlibSigner.ENC_FILE_URI_SCHEME: SSlibSigner, } +KEY_FOR_TYPE_AND_SCHEME = { + ("ecdsa", "ecdsa-sha2-nistp256"): SSlibKey, + ("ecdsa", "ecdsa-sha2-nistp384"): SSlibKey, + ("ecdsa-sha2-nistp256", "ecdsa-sha2-nistp256"): SSlibKey, + ("ecdsa-sha2-nistp384", "ecdsa-sha2-nistp384"): SSlibKey, + ("ed25519", "ed25519"): SSlibKey, + ("rsa", "rsassa-pss-md5"): SSlibKey, + ("rsa", "rsassa-pss-sha1"): SSlibKey, + ("rsa", "rsassa-pss-sha224"): SSlibKey, + ("rsa", "rsassa-pss-sha256"): SSlibKey, + ("rsa", "rsassa-pss-sha384"): SSlibKey, + ("rsa", "rsassa-pss-sha512"): SSlibKey, + ("rsa", "rsa-pkcs1v15-md5"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha1"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha224"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha256"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha384"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha512"): SSlibKey, + ("sphincs", "sphincs-shake-128s"): SSlibKey, +} diff --git a/tests/test_signer.py b/tests/test_signer.py index 23e70597..a2ab2df0 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -13,16 +13,19 @@ CryptoError, FormatError, UnsupportedAlgorithmError, + UnverifiedSignatureError, ) from securesystemslib.gpg.constants import have_gpg from securesystemslib.gpg.functions import export_pubkey from securesystemslib.gpg.functions import verify_signature as verify_sig from securesystemslib.signer import ( + KEY_FOR_TYPE_AND_SCHEME, GPGSignature, GPGSigner, Key, Signature, Signer, + SSlibKey, SSlibSigner, ) @@ -31,20 +34,23 @@ class TestKey(unittest.TestCase): """Key tests. See many more tests in python-tuf test suite""" def test_key_from_to_dict(self): - keydict = { - "keytype": "rsa", - "scheme": "rsassa-pss-sha256", - "extra": "somedata", - "keyval": { - "public": "pubkeyval", - "foo": "bar", - }, - } - - key = Key.from_dict("aa", copy.deepcopy(keydict)) - self.assertDictEqual(keydict, key.to_dict()) - - def test_key_is_verified(self): + """Test to/from_dict for known keytype/scheme combos""" + for (keytype, scheme), key_impl in KEY_FOR_TYPE_AND_SCHEME.items(): + keydict = { + "keytype": keytype, + "scheme": scheme, + "extra": "somedata", + "keyval": { + "public": "pubkeyval", + "foo": "bar", + }, + } + + key = Key.from_dict("aa", copy.deepcopy(keydict)) + self.assertIsInstance(key, key_impl) + self.assertDictEqual(keydict, key.to_dict()) + + def test_key_verify_signature(self): sigdict = { "keyid": "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", "sig": "3fc91f5411a567d6a7f28b7fbb9ba6d60b1e2a1b64d8af0b119650015d86bb5a55e57c0e2c995a9b4a332b8f435703e934c0e6ce69fe6674a8ce68719394a40b", @@ -62,8 +68,9 @@ def test_key_is_verified(self): ) sig = Signature.from_dict(sigdict) - self.assertTrue(key.is_verified(sig, b"DATA")) - self.assertFalse(key.is_verified(sig, b"NOTDATA")) + key.verify_signature(sig, b"DATA") + with self.assertRaises(UnverifiedSignatureError): + key.verify_signature(sig, b"NOT DATA") class TestSigner(unittest.TestCase): @@ -89,20 +96,21 @@ def tearDownClass(cls): def test_signer_sign_with_envvar_uri(self): for key in self.keys: # setup - pubkey = Key.from_securesystemslib_key(key) + pubkey = SSlibKey.from_securesystemslib_key(key) os.environ["PRIVKEY"] = key["keyval"]["private"] # test signing signer = Signer.from_priv_key_uri("envvar:PRIVKEY", pubkey) sig = signer.sign(self.DATA) - self.assertTrue(pubkey.is_verified(sig, self.DATA)) - self.assertFalse(pubkey.is_verified(sig, b"NOT DATA")) + pubkey.verify_signature(sig, self.DATA) + with self.assertRaises(UnverifiedSignatureError): + pubkey.verify_signature(sig, b"NOT DATA") def test_signer_sign_with_file_uri(self): for key in self.keys: # setup - pubkey = Key.from_securesystemslib_key(key) + pubkey = SSlibKey.from_securesystemslib_key(key) # let teardownclass handle the file removal with tempfile.NamedTemporaryFile( dir=self.testdir.name, delete=False @@ -113,13 +121,14 @@ def test_signer_sign_with_file_uri(self): signer = Signer.from_priv_key_uri(f"file:{f.name}", pubkey) sig = signer.sign(self.DATA) - self.assertTrue(pubkey.is_verified(sig, self.DATA)) - self.assertFalse(pubkey.is_verified(sig, b"NOT DATA")) + pubkey.verify_signature(sig, self.DATA) + with self.assertRaises(UnverifiedSignatureError): + pubkey.verify_signature(sig, b"NOT DATA") def test_signer_sign_with_enc_file_uri(self): for key in self.keys: # setup - pubkey = Key.from_securesystemslib_key(key) + pubkey = SSlibKey.from_securesystemslib_key(key) privkey = KEYS.encrypt_key(key, "hunter2") # let teardownclass handle the file removal with tempfile.NamedTemporaryFile( @@ -132,11 +141,13 @@ def secrets_handler(secret: str) -> str: return "hunter2" if secret == "passphrase" else "???" uri = f"encfile:{f.name}" + signer = Signer.from_priv_key_uri(uri, pubkey, secrets_handler) sig = signer.sign(self.DATA) - self.assertTrue(pubkey.is_verified(sig, self.DATA)) - self.assertFalse(pubkey.is_verified(sig, b"NOT DATA")) + pubkey.verify_signature(sig, self.DATA) + with self.assertRaises(UnverifiedSignatureError): + pubkey.verify_signature(sig, b"NOT DATA") # test wrong passphrase def fake_handler(_) -> str: From 707e0def3f20fc69978603956a73adca62c29181 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Wed, 9 Nov 2022 15:28:41 +0200 Subject: [PATCH 05/21] Add GPGKey, use it in GPGSigner GPGSigner and GPGKey are not in the default dispatch list as the serialization formats they produce are not compatible with TUF or in-toto specifications Tests are very smoke testy so far --- securesystemslib/signer.py | 109 ++++++++++++++++++++++++++++++++++--- tests/test_signer.py | 45 +++++++++++++++ 2 files changed, 146 insertions(+), 8 deletions(-) diff --git a/securesystemslib/signer.py b/securesystemslib/signer.py index bce3b6c6..303575ca 100644 --- a/securesystemslib/signer.py +++ b/securesystemslib/signer.py @@ -6,9 +6,10 @@ """ import abc +from dataclasses import dataclass import logging import os -from typing import Any, Callable, Dict, Mapping, Optional, Tuple +from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple from urllib import parse import securesystemslib.gpg.functions as gpg @@ -215,9 +216,8 @@ def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": Raises: KeyError, TypeError: Invalid arguments. """ - keytype = key_dict["keytype"] - scheme = key_dict["scheme"] - + keytype = key_dict.get("keytype") + scheme = key_dict.get("scheme") if (keytype, scheme) not in KEY_FOR_TYPE_AND_SCHEME: raise ValueError(f"Unsupported public key {keytype}/{scheme}") @@ -332,6 +332,91 @@ def get_payload_hash_algorithm(self) -> str: def match_keyid(self, keyid: str) -> bool: raise NotImplementedError("TODO -- this seems pointless...") +@dataclass +class GPGKey(Key): + """Public GPG key. + + Provides a verify method to verify a cryptographic signature with a + gpg-style rsa, dsa or ecdsa public key on the instance. + + Note that this implementation is not TUF or in-toto spec compliant + + Attributes: + type: Key type, e.g. "rsa", "dsa" or "ecdsa". + method: GPG Key Scheme + hashes: list of GPG Hash Algorithms, e.g. "pgp+SHA2". + keyval: Opaque key content. + keyid: Key identifier + creation_time: Unix timestamp when GPG key was created. + validity_period: Validity of the GPG Keys in days. + subkeys: A dictionary containing keyid and GPG subkey. + """ + + type: str + method: str + hashes: List[str] + keyval: Dict[str, str] + keyid: str + creation_time: Optional[int] = None + validity_period: Optional[int] = None + subkeys: Optional[Dict[str, "GPGKey"]] = None + + @classmethod + def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "GPGKey": + """Creates ``GPGKey`` object from its json/dict representation. + Raises: + KeyError, TypeError: Invalid arguments. + """ + subkeys_dict = key_dict.get("subkeys") + + gpg_subkeys = None + if subkeys_dict: + gpg_subkeys = { + subkeyid: GPGKey.from_dict(subkeyid, subkey_dict) + for (subkeyid, subkey_dict) in subkeys_dict.items() + } + + return cls( + key_dict["type"], + key_dict["method"], + key_dict["hashes"], + key_dict["keyval"], + key_dict["keyid"], + key_dict.get("creation_time"), + key_dict.get("validity_period"), + gpg_subkeys, + ) + + def to_dict(self): + """Returns the dictionary representation of self.""" + + key_dict = { + "method": self.method, + "type": self.type, + "hashes": self.hashes, + "keyid": self.keyid, + "keyval": self.keyval, + } + + if self.creation_time: + key_dict["creation_time"] = self.creation_time + if self.validity_period: + key_dict["validity_period"] = self.validity_period + if self.subkeys: + subkeys_dict = { + keyid: subkey.to_dict() + for (keyid, subkey) in self.subkeys.items() + } + key_dict["subkeys"] = subkeys_dict + + return key_dict + + def verify_signature(self, signature: Signature, data: bytes) -> None: + if not gpg.verify_signature(signature.to_dict(), self.to_dict(), data): + raise exceptions.UnverifiedSignatureError( + f"Failed to verify signature by {self.keyid}" + ) + # SecretsHandler is a function the calling code can provide to Signer: # If Signer needs secrets from user, the function will be called @@ -534,6 +619,7 @@ class GPGSigner(Signer): is used. """ + GPG_SCHEME = "gpg" def __init__( self, keyid: Optional[str] = None, homedir: Optional[str] = None @@ -545,12 +631,17 @@ def __init__( def new_from_uri( cls, priv_key_uri: str, - public_key: Key, + public_key: GPGKey, secrets_handler: SecretsHandler, ) -> Signer: - # GPGSigner uses keys and produces signature dicts that are not - # compliant with TUF or intoto specifications: not useful here - raise NotImplementedError() + # TODO design the URI structure + # TODO public_key is now unused: the signature keyid won't necessarily even match it + uri = parse.urlparse(priv_key_uri) + if uri.scheme != cls.GPG_SCHEME: + raise ValueError(f"Invalid private key uri {priv_key_uri}") + + homedir = uri.path if uri.path else None + return GPGSigner(homedir=homedir) def sign(self, payload: bytes) -> GPGSignature: """Signs a given payload by the key assigned to the GPGSigner instance. @@ -594,6 +685,7 @@ def sign(self, payload: bytes) -> GPGSignature: SSlibSigner.ENVVAR_URI_SCHEME: SSlibSigner, SSlibSigner.FILE_URI_SCHEME: SSlibSigner, SSlibSigner.ENC_FILE_URI_SCHEME: SSlibSigner, + # GPGSigner.GPG_SCHEME: GPGSigner, # Disabled by default: not compliant with TUF or in-toto specifications } KEY_FOR_TYPE_AND_SCHEME = { ("ecdsa", "ecdsa-sha2-nistp256"): SSlibKey, @@ -614,4 +706,5 @@ def sign(self, payload: bytes) -> GPGSignature: ("rsa", "rsa-pkcs1v15-sha384"): SSlibKey, ("rsa", "rsa-pkcs1v15-sha512"): SSlibKey, ("sphincs", "sphincs-shake-128s"): SSlibKey, + # (None, None): GPGKey, # Disabled by default: not compliant with TUF or in-toto specifications } diff --git a/tests/test_signer.py b/tests/test_signer.py index a2ab2df0..8bfd9780 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -20,6 +20,8 @@ from securesystemslib.gpg.functions import verify_signature as verify_sig from securesystemslib.signer import ( KEY_FOR_TYPE_AND_SCHEME, + SIGNER_FOR_URI_SCHEME, + GPGKey, GPGSignature, GPGSigner, Key, @@ -50,6 +52,31 @@ def test_key_from_to_dict(self): self.assertIsInstance(key, key_impl) self.assertDictEqual(keydict, key.to_dict()) + # test GPG as a special non-default case + keydict = { + "hashes": ["pgp+SHA2"], + "keyid": "7b3abb26b97b655ab9296bd15b0bd02e1c768c43", + "keyval": { + "private": "", + "public": { + "e": "010001", + "n": "e9ad391502ae32bd4fcc41a0f9970f8901ed6ad1c5c128c02add22721cdc22318b64bec9f9467b6949b19fc2e98ce41906125ad45d0b138f1ad6c5da7bde38092d9e3e697ce8b8373b150b57342dd921d634b873f258f5c15559b52921fa4bb7f482ec43a1c85c3385bd520cedbdc16b2524a64aecf32ac5690e6dd4ee0210a975e1b6c5af164ea69ca64533422432070511068730594793885567bb8f7cffacf6eb5ffdc640e898e599579b21b15e497f5c052112c5fdf7974e7056cd1564fe84f207cb946d1efc521e5031299e6275936e6f9464a735bd4edc8e0cde3fe5b1bf6d3bc1ed12993b865d8fcb9d9a2b2ef2df30cb7f0ab4c0dea819ea017ff195", + }, + }, + "method": "pgp+rsa-pkcsv1.5", + "type": "rsa", + } + + # Add non-default keytype + KEY_FOR_TYPE_AND_SCHEME[(None, None)] = GPGKey + key = Key.from_dict( + "7b3abb26b97b655ab9296bd15b0bd02e1c768c43", copy.deepcopy(keydict) + ) + del KEY_FOR_TYPE_AND_SCHEME[(None, None)] + + self.assertIsInstance(key, GPGKey) + self.assertDictEqual(keydict, key.to_dict()) + def test_key_verify_signature(self): sigdict = { "keyid": "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", @@ -249,10 +276,15 @@ def setUpClass(cls): shutil.copytree(gpg_keyring_path, cls.gnupg_home) os.chdir(cls.test_dir) + # add signer that is by default not supported + SIGNER_FOR_URI_SCHEME[GPGSigner.GPG_SCHEME] = GPGSigner + @classmethod def tearDownClass(cls): """Change back to initial working dir and remove temp test directory.""" + del SIGNER_FOR_URI_SCHEME[GPGSigner.GPG_SCHEME] + os.chdir(cls.working_dir) shutil.rmtree(cls.test_dir) @@ -292,6 +324,19 @@ def test_gpg_serialization(self): signature = GPGSignature.from_dict(sig_dict) self.assertEqual(sig_dict, signature.to_dict()) + def test_signer_dispatch_with_gpg_signer(self): + + key_data = export_pubkey(self.signing_subkey_keyid, self.gnupg_home) + pubkey = GPGKey.from_dict(key_data["keyid"], key_data) + + signer = Signer.from_priv_key_uri(f"gpg:{self.gnupg_home}", pubkey) + + signature = signer.sign(self.test_data) + signature_dict = signature.to_dict() + + self.assertTrue(verify_sig(signature_dict, key_data, self.test_data)) + self.assertFalse(verify_sig(signature_dict, key_data, self.wrong_data)) + # Run the unit tests. if __name__ == "__main__": From d151606e15e3335c66968af6a6b2fc225b3f4d1f Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Thu, 10 Nov 2022 14:27:24 +0200 Subject: [PATCH 06/21] Signer, Key: Tweak exceptions, improve docs verify_signature now has two exceptions: class UnverifiedSignatureError(Error): class VerificationError(UnverifiedSignatureError): First one for those who just want to know if verify succeeded. Second one for those who want to also know if process failed somehow. --- securesystemslib/exceptions.py | 7 +++- securesystemslib/signer.py | 68 ++++++++++------------------------ 2 files changed, 25 insertions(+), 50 deletions(-) diff --git a/securesystemslib/exceptions.py b/securesystemslib/exceptions.py index a0ef8ab9..5f25fbe0 100755 --- a/securesystemslib/exceptions.py +++ b/securesystemslib/exceptions.py @@ -141,4 +141,9 @@ class StorageError(Error): class UnverifiedSignatureError(Error): - """Signature could not be verified""" + """Signature could not be verified: either signature was incorrect or + something failed during process (see VerificationError)""" + + +class VerificationError(UnverifiedSignatureError): + """Signature could not be verified because something failed in the process""" diff --git a/securesystemslib/signer.py b/securesystemslib/signer.py index 303575ca..8a6bd29a 100644 --- a/securesystemslib/signer.py +++ b/securesystemslib/signer.py @@ -6,9 +6,9 @@ """ import abc -from dataclasses import dataclass import logging import os +from dataclasses import dataclass from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple from urllib import parse @@ -245,36 +245,14 @@ def verify_signature(self, signature: Signature, data: bytes) -> None: data: Payload bytes. Raises: - UnverifiedSignatureError: Failed to verify signature, either - because it was incorrect or because of a verification problem. - """ - raise NotImplementedError - - @abc.abstractmethod - def get_payload_hash_algorithm(self) -> str: - """Return the payload hash algorithm - - This is used by Signers where the actual signing system only accepts - hashes of payloads: e.g. HSM and KMS - - """ - # TODO Do all signing systems support payload prehash? like ed25519? - raise NotImplementedError - - @abc.abstractmethod - def match_keyid(self, keyid: str) -> bool: - """Does given keyid match this keys keyid - - This is a workaround for GPSignature design features. + UnverifiedSignatureError: Failed to verify signature. + VerificationError: Signature verification process failed. If you + are only interested in the verify result, just handle + UnverifiedSignatureError: it contains VerificationError as well """ - # TODO is this reeally really needed? raise NotImplementedError -# TODO verify_signature software errors should have a error of their own? -# Maybe something deriving from UnverifiedSignature? - - class SSlibKey(Key): """Key implementation for RSA, Ed25519, ECDSA and Sphincs keys""" @@ -320,20 +298,14 @@ def verify_signature(self, signature: Signature, data: bytes) -> None: exceptions.FormatError, exceptions.UnsupportedAlgorithmError, ) as e: - # Log unexpected failure, but continue as if there was no signature logger.info("Key %s failed to verify sig: %s", self.keyid, str(e)) - raise exceptions.UnverifiedSignatureError( + raise exceptions.VerificationError( f"Unknown failure to verify signature by {self.keyid}" ) from e - def get_payload_hash_algorithm(self) -> str: - raise NotImplementedError - - def match_keyid(self, keyid: str) -> bool: - raise NotImplementedError("TODO -- this seems pointless...") @dataclass -class GPGKey(Key): +class GPGKey(Key): # pylint: disable=too-many-instance-attributes """Public GPG key. Provides a verify method to verify a cryptographic signature with a @@ -502,18 +474,18 @@ class SSlibSigner(Signer): for the supported types, schemes and hash algorithms. SSlibSigners should be instantiated with Signer.from_priv_key_uri(). - Two private key URI schemes are supported: + These private key URI schemes are supported: * envvar:: - VAR is an environment variable that contains the private key content. + VAR is an environment variable with unencrypted private key content. envvar:MYPRIVKEY * file:: - PATH is a file path to a file that contains private key content. + PATH is a file path to a file with unencrypted private key content. file:path/to/file * encfile:: The the private key content in PATH has been encrypted with - keys.encryot_key(). Application provided SecretsHandler will be + keys.encrypt_key(). Application provided SecretsHandler will be called to get the passphrase. - file:/path/to/encrypted/file + encfile:/path/to/encrypted/file Attributes: key_dict: @@ -611,14 +583,13 @@ class GPGSigner(Signer): Provides a sign method to generate a cryptographic signature with gpg, using an RSA, DSA or EdDSA private key identified by the keyid on the instance. - Args: - keyid: The keyid of the gpg signing keyid. If not passed the default - key in the keyring is used. - - homedir: Path to the gpg keyring. If not passed the default keyring - is used. + GPGSigners should be instantiated with Signer.from_priv_key_uri(). + Two private key URI schemes are supported: + * gpg:: + HOMEDIR: Optional filesystem path to GPG home directory """ + GPG_SCHEME = "gpg" def __init__( @@ -634,8 +605,6 @@ def new_from_uri( public_key: GPGKey, secrets_handler: SecretsHandler, ) -> Signer: - # TODO design the URI structure - # TODO public_key is now unused: the signature keyid won't necessarily even match it uri = parse.urlparse(priv_key_uri) if uri.scheme != cls.GPG_SCHEME: raise ValueError(f"Invalid private key uri {priv_key_uri}") @@ -680,13 +649,14 @@ def sign(self, payload: bytes) -> GPGSignature: return GPGSignature(**sig_dict) -# signer and key implementations are now defined: Add them to the lookup table +# Supported private key uri schemes and the Signers implementing them SIGNER_FOR_URI_SCHEME = { SSlibSigner.ENVVAR_URI_SCHEME: SSlibSigner, SSlibSigner.FILE_URI_SCHEME: SSlibSigner, SSlibSigner.ENC_FILE_URI_SCHEME: SSlibSigner, # GPGSigner.GPG_SCHEME: GPGSigner, # Disabled by default: not compliant with TUF or in-toto specifications } +# Supported key types and schemes, and the Keys implementing them KEY_FOR_TYPE_AND_SCHEME = { ("ecdsa", "ecdsa-sha2-nistp256"): SSlibKey, ("ecdsa", "ecdsa-sha2-nistp384"): SSlibKey, From 748fff50a52bd22186f0282496804465a8219155 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Tue, 15 Nov 2022 14:04:23 +0200 Subject: [PATCH 07/21] signer: Remove GPGSigner/Key from the dispatching Signer and Key are now abstract and can have multiple implementations that get dispatched by the base classes. The current GPG implementation is just non-compliant enough that it is not included in the dispatch. Plan is: * move GPGSigner to intoto, add GPGKey in intoto: this allows intoto to keep using this implementation without too much pain * Possibly implement a more compliant GPG signer and key in securesystemslib --- securesystemslib/signer.py | 105 ++----------------------------------- tests/test_signer.py | 45 ---------------- 2 files changed, 5 insertions(+), 145 deletions(-) diff --git a/securesystemslib/signer.py b/securesystemslib/signer.py index 8a6bd29a..109b074b 100644 --- a/securesystemslib/signer.py +++ b/securesystemslib/signer.py @@ -8,8 +8,7 @@ import abc import logging import os -from dataclasses import dataclass -from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple +from typing import Any, Callable, Dict, Mapping, Optional, Tuple from urllib import parse import securesystemslib.gpg.functions as gpg @@ -304,92 +303,6 @@ def verify_signature(self, signature: Signature, data: bytes) -> None: ) from e -@dataclass -class GPGKey(Key): # pylint: disable=too-many-instance-attributes - """Public GPG key. - - Provides a verify method to verify a cryptographic signature with a - gpg-style rsa, dsa or ecdsa public key on the instance. - - Note that this implementation is not TUF or in-toto spec compliant - - Attributes: - type: Key type, e.g. "rsa", "dsa" or "ecdsa". - method: GPG Key Scheme - hashes: list of GPG Hash Algorithms, e.g. "pgp+SHA2". - keyval: Opaque key content. - keyid: Key identifier - creation_time: Unix timestamp when GPG key was created. - validity_period: Validity of the GPG Keys in days. - subkeys: A dictionary containing keyid and GPG subkey. - """ - - type: str - method: str - hashes: List[str] - keyval: Dict[str, str] - keyid: str - creation_time: Optional[int] = None - validity_period: Optional[int] = None - subkeys: Optional[Dict[str, "GPGKey"]] = None - - @classmethod - def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "GPGKey": - """Creates ``GPGKey`` object from its json/dict representation. - Raises: - KeyError, TypeError: Invalid arguments. - """ - subkeys_dict = key_dict.get("subkeys") - - gpg_subkeys = None - if subkeys_dict: - gpg_subkeys = { - subkeyid: GPGKey.from_dict(subkeyid, subkey_dict) - for (subkeyid, subkey_dict) in subkeys_dict.items() - } - - return cls( - key_dict["type"], - key_dict["method"], - key_dict["hashes"], - key_dict["keyval"], - key_dict["keyid"], - key_dict.get("creation_time"), - key_dict.get("validity_period"), - gpg_subkeys, - ) - - def to_dict(self): - """Returns the dictionary representation of self.""" - - key_dict = { - "method": self.method, - "type": self.type, - "hashes": self.hashes, - "keyid": self.keyid, - "keyval": self.keyval, - } - - if self.creation_time: - key_dict["creation_time"] = self.creation_time - if self.validity_period: - key_dict["validity_period"] = self.validity_period - if self.subkeys: - subkeys_dict = { - keyid: subkey.to_dict() - for (keyid, subkey) in self.subkeys.items() - } - key_dict["subkeys"] = subkeys_dict - - return key_dict - - def verify_signature(self, signature: Signature, data: bytes) -> None: - if not gpg.verify_signature(signature.to_dict(), self.to_dict(), data): - raise exceptions.UnverifiedSignatureError( - f"Failed to verify signature by {self.keyid}" - ) - - # SecretsHandler is a function the calling code can provide to Signer: # If Signer needs secrets from user, the function will be called SecretsHandler = Callable[[str], str] @@ -590,8 +503,6 @@ class GPGSigner(Signer): """ - GPG_SCHEME = "gpg" - def __init__( self, keyid: Optional[str] = None, homedir: Optional[str] = None ): @@ -602,15 +513,10 @@ def __init__( def new_from_uri( cls, priv_key_uri: str, - public_key: GPGKey, + public_key: Key, secrets_handler: SecretsHandler, - ) -> Signer: - uri = parse.urlparse(priv_key_uri) - if uri.scheme != cls.GPG_SCHEME: - raise ValueError(f"Invalid private key uri {priv_key_uri}") - - homedir = uri.path if uri.path else None - return GPGSigner(homedir=homedir) + ) -> "GPGSigner": + raise NotImplementedError("Incompatible with private key URIs") def sign(self, payload: bytes) -> GPGSignature: """Signs a given payload by the key assigned to the GPGSigner instance. @@ -654,8 +560,8 @@ def sign(self, payload: bytes) -> GPGSignature: SSlibSigner.ENVVAR_URI_SCHEME: SSlibSigner, SSlibSigner.FILE_URI_SCHEME: SSlibSigner, SSlibSigner.ENC_FILE_URI_SCHEME: SSlibSigner, - # GPGSigner.GPG_SCHEME: GPGSigner, # Disabled by default: not compliant with TUF or in-toto specifications } + # Supported key types and schemes, and the Keys implementing them KEY_FOR_TYPE_AND_SCHEME = { ("ecdsa", "ecdsa-sha2-nistp256"): SSlibKey, @@ -676,5 +582,4 @@ def sign(self, payload: bytes) -> GPGSignature: ("rsa", "rsa-pkcs1v15-sha384"): SSlibKey, ("rsa", "rsa-pkcs1v15-sha512"): SSlibKey, ("sphincs", "sphincs-shake-128s"): SSlibKey, - # (None, None): GPGKey, # Disabled by default: not compliant with TUF or in-toto specifications } diff --git a/tests/test_signer.py b/tests/test_signer.py index 8bfd9780..a2ab2df0 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -20,8 +20,6 @@ from securesystemslib.gpg.functions import verify_signature as verify_sig from securesystemslib.signer import ( KEY_FOR_TYPE_AND_SCHEME, - SIGNER_FOR_URI_SCHEME, - GPGKey, GPGSignature, GPGSigner, Key, @@ -52,31 +50,6 @@ def test_key_from_to_dict(self): self.assertIsInstance(key, key_impl) self.assertDictEqual(keydict, key.to_dict()) - # test GPG as a special non-default case - keydict = { - "hashes": ["pgp+SHA2"], - "keyid": "7b3abb26b97b655ab9296bd15b0bd02e1c768c43", - "keyval": { - "private": "", - "public": { - "e": "010001", - "n": "e9ad391502ae32bd4fcc41a0f9970f8901ed6ad1c5c128c02add22721cdc22318b64bec9f9467b6949b19fc2e98ce41906125ad45d0b138f1ad6c5da7bde38092d9e3e697ce8b8373b150b57342dd921d634b873f258f5c15559b52921fa4bb7f482ec43a1c85c3385bd520cedbdc16b2524a64aecf32ac5690e6dd4ee0210a975e1b6c5af164ea69ca64533422432070511068730594793885567bb8f7cffacf6eb5ffdc640e898e599579b21b15e497f5c052112c5fdf7974e7056cd1564fe84f207cb946d1efc521e5031299e6275936e6f9464a735bd4edc8e0cde3fe5b1bf6d3bc1ed12993b865d8fcb9d9a2b2ef2df30cb7f0ab4c0dea819ea017ff195", - }, - }, - "method": "pgp+rsa-pkcsv1.5", - "type": "rsa", - } - - # Add non-default keytype - KEY_FOR_TYPE_AND_SCHEME[(None, None)] = GPGKey - key = Key.from_dict( - "7b3abb26b97b655ab9296bd15b0bd02e1c768c43", copy.deepcopy(keydict) - ) - del KEY_FOR_TYPE_AND_SCHEME[(None, None)] - - self.assertIsInstance(key, GPGKey) - self.assertDictEqual(keydict, key.to_dict()) - def test_key_verify_signature(self): sigdict = { "keyid": "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", @@ -276,15 +249,10 @@ def setUpClass(cls): shutil.copytree(gpg_keyring_path, cls.gnupg_home) os.chdir(cls.test_dir) - # add signer that is by default not supported - SIGNER_FOR_URI_SCHEME[GPGSigner.GPG_SCHEME] = GPGSigner - @classmethod def tearDownClass(cls): """Change back to initial working dir and remove temp test directory.""" - del SIGNER_FOR_URI_SCHEME[GPGSigner.GPG_SCHEME] - os.chdir(cls.working_dir) shutil.rmtree(cls.test_dir) @@ -324,19 +292,6 @@ def test_gpg_serialization(self): signature = GPGSignature.from_dict(sig_dict) self.assertEqual(sig_dict, signature.to_dict()) - def test_signer_dispatch_with_gpg_signer(self): - - key_data = export_pubkey(self.signing_subkey_keyid, self.gnupg_home) - pubkey = GPGKey.from_dict(key_data["keyid"], key_data) - - signer = Signer.from_priv_key_uri(f"gpg:{self.gnupg_home}", pubkey) - - signature = signer.sign(self.test_data) - signature_dict = signature.to_dict() - - self.assertTrue(verify_sig(signature_dict, key_data, self.test_data)) - self.assertFalse(verify_sig(signature_dict, key_data, self.wrong_data)) - # Run the unit tests. if __name__ == "__main__": From d40527e1d6330b39d6e821bd427e493f66dce494 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 18 Nov 2022 13:40:54 +0200 Subject: [PATCH 08/21] signer: refactor, move to a new submodule Public API stays the same: * module is still securesystemslib.signer * but now we can split implementation to internal submodules * __init__.py now clearly defines the public API --- securesystemslib/signer/__init__.py | 16 ++++++++++++++++ .../{signer.py => signer/_signer.py} | 0 2 files changed, 16 insertions(+) create mode 100644 securesystemslib/signer/__init__.py rename securesystemslib/{signer.py => signer/_signer.py} (100%) diff --git a/securesystemslib/signer/__init__.py b/securesystemslib/signer/__init__.py new file mode 100644 index 00000000..43a3ef71 --- /dev/null +++ b/securesystemslib/signer/__init__.py @@ -0,0 +1,16 @@ +""" +The Signer API +""" + +from securesystemslib.signer._signer import ( + KEY_FOR_TYPE_AND_SCHEME, + SIGNER_FOR_URI_SCHEME, + GPGSignature, + GPGSigner, + Key, + SecretsHandler, + Signature, + Signer, + SSlibKey, + SSlibSigner, +) diff --git a/securesystemslib/signer.py b/securesystemslib/signer/_signer.py similarity index 100% rename from securesystemslib/signer.py rename to securesystemslib/signer/_signer.py From bbcd02cdbd1ea26d006e6e0c0d808057f9a9f100 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 18 Nov 2022 14:03:20 +0200 Subject: [PATCH 09/21] signer API: refactor, split to three internal submodules This could be split even further (separate the interfaces from the provided implementations) but this is already useful as it shows our abstractions are not leaky: there are no cyclical dependencies. --- securesystemslib/signer/__init__.py | 11 +- securesystemslib/signer/_key.py | 191 +++++++++++++++ securesystemslib/signer/_signature.py | 131 +++++++++++ securesystemslib/signer/_signer.py | 319 +------------------------- 4 files changed, 333 insertions(+), 319 deletions(-) create mode 100644 securesystemslib/signer/_key.py create mode 100644 securesystemslib/signer/_signature.py diff --git a/securesystemslib/signer/__init__.py b/securesystemslib/signer/__init__.py index 43a3ef71..7924eae3 100644 --- a/securesystemslib/signer/__init__.py +++ b/securesystemslib/signer/__init__.py @@ -1,16 +1,15 @@ """ The Signer API -""" +This module provides extensible interfaces for public keys and signers: +Some implementations are provided by default but more can be added by users. +""" +from securesystemslib.signer._key import KEY_FOR_TYPE_AND_SCHEME, Key, SSlibKey +from securesystemslib.signer._signature import GPGSignature, Signature from securesystemslib.signer._signer import ( - KEY_FOR_TYPE_AND_SCHEME, SIGNER_FOR_URI_SCHEME, - GPGSignature, GPGSigner, - Key, SecretsHandler, - Signature, Signer, - SSlibKey, SSlibSigner, ) diff --git a/securesystemslib/signer/_key.py b/securesystemslib/signer/_key.py new file mode 100644 index 00000000..f7c88f64 --- /dev/null +++ b/securesystemslib/signer/_key.py @@ -0,0 +1,191 @@ +"""Key interface and the default implementations""" +import abc +import logging +from typing import Any, Dict, Optional, Tuple + +import securesystemslib.keys as sslib_keys +from securesystemslib import exceptions +from securesystemslib.signer._signature import Signature + +logger = logging.getLogger(__name__) + +# NOTE dict for Key dispatch defined here, but filled at end of file when +# subclass definitions are available. Users can add Key implementations. +KEY_FOR_TYPE_AND_SCHEME: Dict[Tuple[str, str], "Key"] = {} + + +class Key: + """Abstract class representing the public portion of a key. + + *All parameters named below are not just constructor arguments but also + instance attributes.* + + Args: + keyid: Key identifier that is unique within the metadata it is used in. + Keyid is not verified to be the hash of a specific representation + of the key. + keytype: Key type, e.g. "rsa", "ed25519" or "ecdsa-sha2-nistp256". + scheme: Signature scheme. For example: + "rsassa-pss-sha256", "ed25519", and "ecdsa-sha2-nistp256". + keyval: Opaque key content + unrecognized_fields: Dictionary of all attributes that are not managed + by Securesystemslib + + Raises: + TypeError: Invalid type for an argument. + """ + + __metaclass__ = abc.ABCMeta + + def __init__( + self, + keyid: str, + keytype: str, + scheme: str, + keyval: Dict[str, Any], + unrecognized_fields: Optional[Dict[str, Any]] = None, + ): + if not all( + isinstance(at, str) for at in [keyid, keytype, scheme] + ) or not isinstance(keyval, dict): + raise TypeError("Unexpected Key attributes types!") + self.keyid = keyid + self.keytype = keytype + self.scheme = scheme + self.keyval = keyval + if unrecognized_fields is None: + unrecognized_fields = {} + + self.unrecognized_fields = unrecognized_fields + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Key): + return False + + return ( + self.keyid == other.keyid + and self.keytype == other.keytype + and self.scheme == other.scheme + and self.keyval == other.keyval + and self.unrecognized_fields == other.unrecognized_fields + ) + + @classmethod + @abc.abstractmethod + def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": + """Creates ``Key`` object from TUF serialization dict. + + Key implementations must override this factory constructor. + + Raises: + KeyError, TypeError: Invalid arguments. + """ + keytype = key_dict.get("keytype") + scheme = key_dict.get("scheme") + if (keytype, scheme) not in KEY_FOR_TYPE_AND_SCHEME: + raise ValueError(f"Unsupported public key {keytype}/{scheme}") + + key_impl = KEY_FOR_TYPE_AND_SCHEME[(keytype, scheme)] + return key_impl.from_dict(keyid, key_dict) + + def to_dict(self) -> Dict[str, Any]: + """Returns a dict for TUF serialization. + + Key implementations may override this method. + """ + return { + "keytype": self.keytype, + "scheme": self.scheme, + "keyval": self.keyval, + **self.unrecognized_fields, + } + + @abc.abstractmethod + def verify_signature(self, signature: Signature, data: bytes) -> None: + """Verifies the signature over data. + + Args: + signature: Signature object. + data: Payload bytes. + + Raises: + UnverifiedSignatureError: Failed to verify signature. + VerificationError: Signature verification process failed. If you + are only interested in the verify result, just handle + UnverifiedSignatureError: it contains VerificationError as well + """ + raise NotImplementedError + + +class SSlibKey(Key): + """Key implementation for RSA, Ed25519, ECDSA and Sphincs keys""" + + def to_securesystemslib_key(self) -> Dict[str, Any]: + """Internal helper function""" + return { + "keyid": self.keyid, + "keytype": self.keytype, + "scheme": self.scheme, + "keyval": self.keyval, + } + + @classmethod + def from_securesystemslib_key(cls, key_dict: Dict[str, Any]) -> "SSlibKey": + """Constructor from classic securesystemslib keydict""" + return SSlibKey( + key_dict["keyid"], + key_dict["keytype"], + key_dict["scheme"], + key_dict["keyval"], + ) + + @classmethod + def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "SSlibKey": + keytype = key_dict.pop("keytype") + scheme = key_dict.pop("scheme") + keyval = key_dict.pop("keyval") + # All fields left in the key_dict are unrecognized. + return cls(keyid, keytype, scheme, keyval, key_dict) + + def verify_signature(self, signature: Signature, data: bytes) -> None: + try: + if not sslib_keys.verify_signature( + self.to_securesystemslib_key(), + signature.to_dict(), + data, + ): + raise exceptions.UnverifiedSignatureError( + f"Failed to verify signature by {self.keyid}" + ) + except ( + exceptions.CryptoError, + exceptions.FormatError, + exceptions.UnsupportedAlgorithmError, + ) as e: + logger.info("Key %s failed to verify sig: %s", self.keyid, str(e)) + raise exceptions.VerificationError( + f"Unknown failure to verify signature by {self.keyid}" + ) from e + + +# Supported key types and schemes, and the Keys implementing them +KEY_FOR_TYPE_AND_SCHEME = { + ("ecdsa", "ecdsa-sha2-nistp256"): SSlibKey, + ("ecdsa", "ecdsa-sha2-nistp384"): SSlibKey, + ("ecdsa-sha2-nistp256", "ecdsa-sha2-nistp256"): SSlibKey, + ("ecdsa-sha2-nistp384", "ecdsa-sha2-nistp384"): SSlibKey, + ("ed25519", "ed25519"): SSlibKey, + ("rsa", "rsassa-pss-md5"): SSlibKey, + ("rsa", "rsassa-pss-sha1"): SSlibKey, + ("rsa", "rsassa-pss-sha224"): SSlibKey, + ("rsa", "rsassa-pss-sha256"): SSlibKey, + ("rsa", "rsassa-pss-sha384"): SSlibKey, + ("rsa", "rsassa-pss-sha512"): SSlibKey, + ("rsa", "rsa-pkcs1v15-md5"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha1"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha224"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha256"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha384"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha512"): SSlibKey, + ("sphincs", "sphincs-shake-128s"): SSlibKey, +} diff --git a/securesystemslib/signer/_signature.py b/securesystemslib/signer/_signature.py new file mode 100644 index 00000000..a94c36a9 --- /dev/null +++ b/securesystemslib/signer/_signature.py @@ -0,0 +1,131 @@ +"""Signature container class""" + +import logging +from typing import Any, Dict, Mapping, Optional + +logger = logging.getLogger(__name__) + + +class Signature: + """A container class containing information about a signature. + + Contains a signature and the keyid uniquely identifying the key used + to generate the signature. + + Provides utility methods to easily create an object from a dictionary + and return the dictionary representation of the object. + + Attributes: + keyid: HEX string used as a unique identifier of the key. + signature: HEX string representing the signature. + unrecognized_fields: Dictionary of all attributes that are not managed + by securesystemslib. + + """ + + def __init__( + self, + keyid: str, + sig: str, + unrecognized_fields: Optional[Mapping[str, Any]] = None, + ): + self.keyid = keyid + self.signature = sig + self.unrecognized_fields: Mapping[str, Any] = unrecognized_fields or {} + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Signature): + return False + + return ( + self.keyid == other.keyid + and self.signature == other.signature + and self.unrecognized_fields == other.unrecognized_fields + ) + + @classmethod + def from_dict(cls, signature_dict: Dict) -> "Signature": + """Creates a Signature object from its JSON/dict representation. + + Arguments: + signature_dict: + A dict containing a valid keyid and a signature. + Note that the fields in it should be named "keyid" and "sig" + respectively. + + Raises: + KeyError: If any of the "keyid" and "sig" fields are missing from + the signature_dict. + + Side Effect: + Destroys the metadata dict passed by reference. + + Returns: + A "Signature" instance. + """ + + keyid = signature_dict.pop("keyid") + sig = signature_dict.pop("sig") + # All fields left in the signature_dict are unrecognized. + return cls(keyid, sig, signature_dict) + + def to_dict(self) -> Dict: + """Returns the JSON-serializable dictionary representation of self.""" + + return { + "keyid": self.keyid, + "sig": self.signature, + **self.unrecognized_fields, + } + + +class GPGSignature(Signature): + """A container class containing information about a gpg signature. + + Besides the signature, it also contains other meta information + needed to uniquely identify the key used to generate the signature. + + Attributes: + keyid: HEX string used as a unique identifier of the key. + signature: HEX string representing the signature. + other_headers: HEX representation of additional GPG headers. + """ + + def __init__( + self, + keyid: str, + signature: str, + other_headers: str, + ): + super().__init__(keyid, signature) + self.other_headers = other_headers + + @classmethod + def from_dict(cls, signature_dict: Dict) -> "GPGSignature": + """Creates a GPGSignature object from its JSON/dict representation. + + Args: + signature_dict: Dict containing valid "keyid", "signature" and + "other_fields" fields. + + Raises: + KeyError: If any of the "keyid", "sig" or "other_headers" fields + are missing from the signature_dict. + + Returns: + GPGSignature instance. + """ + + return cls( + signature_dict["keyid"], + signature_dict["signature"], + signature_dict["other_headers"], + ) + + def to_dict(self) -> Dict: + """Returns the JSON-serializable dictionary representation of self.""" + return { + "keyid": self.keyid, + "signature": self.signature, + "other_headers": self.other_headers, + } diff --git a/securesystemslib/signer/_signer.py b/securesystemslib/signer/_signer.py index 109b074b..66236144 100644 --- a/securesystemslib/signer/_signer.py +++ b/securesystemslib/signer/_signer.py @@ -1,306 +1,21 @@ -"""Signer interface and example interface implementations. - -The goal of this module is to provide a signing interface supporting multiple -signing implementations and a couple of example implementations. - -""" +"""Signer interface and the default implementations""" import abc import logging import os -from typing import Any, Callable, Dict, Mapping, Optional, Tuple +from typing import Callable, Dict, Optional from urllib import parse import securesystemslib.gpg.functions as gpg import securesystemslib.keys as sslib_keys -from securesystemslib import exceptions +from securesystemslib.signer._key import Key, SSlibKey +from securesystemslib.signer._signature import GPGSignature, Signature logger = logging.getLogger(__name__) -# NOTE dicts for Key and Signer dispatch are defined here, but -# filled at end of file when subclass definitions are available. -# Users can add their own implementations into these dictionaries +# NOTE dict for Signer dispatch defined here, but filled at end of file when +# subclass definitions are available. Users can add Signer implementations. SIGNER_FOR_URI_SCHEME: Dict[str, "Signer"] = {} -KEY_FOR_TYPE_AND_SCHEME: Dict[Tuple[str, str], "Key"] = {} - - -class Signature: - """A container class containing information about a signature. - - Contains a signature and the keyid uniquely identifying the key used - to generate the signature. - - Provides utility methods to easily create an object from a dictionary - and return the dictionary representation of the object. - - Attributes: - keyid: HEX string used as a unique identifier of the key. - signature: HEX string representing the signature. - unrecognized_fields: Dictionary of all attributes that are not managed - by securesystemslib. - - """ - - def __init__( - self, - keyid: str, - sig: str, - unrecognized_fields: Optional[Mapping[str, Any]] = None, - ): - self.keyid = keyid - self.signature = sig - self.unrecognized_fields: Mapping[str, Any] = unrecognized_fields or {} - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, Signature): - return False - - return ( - self.keyid == other.keyid - and self.signature == other.signature - and self.unrecognized_fields == other.unrecognized_fields - ) - - @classmethod - def from_dict(cls, signature_dict: Dict) -> "Signature": - """Creates a Signature object from its JSON/dict representation. - - Arguments: - signature_dict: - A dict containing a valid keyid and a signature. - Note that the fields in it should be named "keyid" and "sig" - respectively. - - Raises: - KeyError: If any of the "keyid" and "sig" fields are missing from - the signature_dict. - - Side Effect: - Destroys the metadata dict passed by reference. - - Returns: - A "Signature" instance. - """ - - keyid = signature_dict.pop("keyid") - sig = signature_dict.pop("sig") - # All fields left in the signature_dict are unrecognized. - return cls(keyid, sig, signature_dict) - - def to_dict(self) -> Dict: - """Returns the JSON-serializable dictionary representation of self.""" - - return { - "keyid": self.keyid, - "sig": self.signature, - **self.unrecognized_fields, - } - - -class GPGSignature(Signature): - """A container class containing information about a gpg signature. - - Besides the signature, it also contains other meta information - needed to uniquely identify the key used to generate the signature. - - Attributes: - keyid: HEX string used as a unique identifier of the key. - signature: HEX string representing the signature. - other_headers: HEX representation of additional GPG headers. - """ - - def __init__( - self, - keyid: str, - signature: str, - other_headers: str, - ): - super().__init__(keyid, signature) - self.other_headers = other_headers - - @classmethod - def from_dict(cls, signature_dict: Dict) -> "GPGSignature": - """Creates a GPGSignature object from its JSON/dict representation. - - Args: - signature_dict: Dict containing valid "keyid", "signature" and - "other_fields" fields. - - Raises: - KeyError: If any of the "keyid", "sig" or "other_headers" fields - are missing from the signature_dict. - - Returns: - GPGSignature instance. - """ - - return cls( - signature_dict["keyid"], - signature_dict["signature"], - signature_dict["other_headers"], - ) - - def to_dict(self) -> Dict: - """Returns the JSON-serializable dictionary representation of self.""" - return { - "keyid": self.keyid, - "signature": self.signature, - "other_headers": self.other_headers, - } - - -class Key: - """Abstract class representing the public portion of a key. - - *All parameters named below are not just constructor arguments but also - instance attributes.* - - Args: - keyid: Key identifier that is unique within the metadata it is used in. - Keyid is not verified to be the hash of a specific representation - of the key. - keytype: Key type, e.g. "rsa", "ed25519" or "ecdsa-sha2-nistp256". - scheme: Signature scheme. For example: - "rsassa-pss-sha256", "ed25519", and "ecdsa-sha2-nistp256". - keyval: Opaque key content - unrecognized_fields: Dictionary of all attributes that are not managed - by Securesystemslib - - Raises: - TypeError: Invalid type for an argument. - """ - - __metaclass__ = abc.ABCMeta - - def __init__( - self, - keyid: str, - keytype: str, - scheme: str, - keyval: Dict[str, Any], - unrecognized_fields: Optional[Dict[str, Any]] = None, - ): - if not all( - isinstance(at, str) for at in [keyid, keytype, scheme] - ) or not isinstance(keyval, dict): - raise TypeError("Unexpected Key attributes types!") - self.keyid = keyid - self.keytype = keytype - self.scheme = scheme - self.keyval = keyval - if unrecognized_fields is None: - unrecognized_fields = {} - - self.unrecognized_fields = unrecognized_fields - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, Key): - return False - - return ( - self.keyid == other.keyid - and self.keytype == other.keytype - and self.scheme == other.scheme - and self.keyval == other.keyval - and self.unrecognized_fields == other.unrecognized_fields - ) - - @classmethod - @abc.abstractmethod - def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": - """Creates ``Key`` object from TUF serialization dict. - - Key implementations must override this factory constructor. - - Raises: - KeyError, TypeError: Invalid arguments. - """ - keytype = key_dict.get("keytype") - scheme = key_dict.get("scheme") - if (keytype, scheme) not in KEY_FOR_TYPE_AND_SCHEME: - raise ValueError(f"Unsupported public key {keytype}/{scheme}") - - key_impl = KEY_FOR_TYPE_AND_SCHEME[(keytype, scheme)] - return key_impl.from_dict(keyid, key_dict) - - def to_dict(self) -> Dict[str, Any]: - """Returns a dict for TUF serialization. - - Key implementations may override this method. - """ - return { - "keytype": self.keytype, - "scheme": self.scheme, - "keyval": self.keyval, - **self.unrecognized_fields, - } - - @abc.abstractmethod - def verify_signature(self, signature: Signature, data: bytes) -> None: - """Verifies the signature over data. - - Args: - signature: Signature object. - data: Payload bytes. - - Raises: - UnverifiedSignatureError: Failed to verify signature. - VerificationError: Signature verification process failed. If you - are only interested in the verify result, just handle - UnverifiedSignatureError: it contains VerificationError as well - """ - raise NotImplementedError - - -class SSlibKey(Key): - """Key implementation for RSA, Ed25519, ECDSA and Sphincs keys""" - - def to_securesystemslib_key(self) -> Dict[str, Any]: - """Internal helper function""" - return { - "keyid": self.keyid, - "keytype": self.keytype, - "scheme": self.scheme, - "keyval": self.keyval, - } - - @classmethod - def from_securesystemslib_key(cls, key_dict: Dict[str, Any]) -> "SSlibKey": - """Constructor from classic securesystemslib keydict""" - return SSlibKey( - key_dict["keyid"], - key_dict["keytype"], - key_dict["scheme"], - key_dict["keyval"], - ) - - @classmethod - def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "SSlibKey": - keytype = key_dict.pop("keytype") - scheme = key_dict.pop("scheme") - keyval = key_dict.pop("keyval") - # All fields left in the key_dict are unrecognized. - return cls(keyid, keytype, scheme, keyval, key_dict) - - def verify_signature(self, signature: Signature, data: bytes) -> None: - try: - if not sslib_keys.verify_signature( - self.to_securesystemslib_key(), - signature.to_dict(), - data, - ): - raise exceptions.UnverifiedSignatureError( - f"Failed to verify signature by {self.keyid}" - ) - except ( - exceptions.CryptoError, - exceptions.FormatError, - exceptions.UnsupportedAlgorithmError, - ) as e: - logger.info("Key %s failed to verify sig: %s", self.keyid, str(e)) - raise exceptions.VerificationError( - f"Unknown failure to verify signature by {self.keyid}" - ) from e # SecretsHandler is a function the calling code can provide to Signer: @@ -561,25 +276,3 @@ def sign(self, payload: bytes) -> GPGSignature: SSlibSigner.FILE_URI_SCHEME: SSlibSigner, SSlibSigner.ENC_FILE_URI_SCHEME: SSlibSigner, } - -# Supported key types and schemes, and the Keys implementing them -KEY_FOR_TYPE_AND_SCHEME = { - ("ecdsa", "ecdsa-sha2-nistp256"): SSlibKey, - ("ecdsa", "ecdsa-sha2-nistp384"): SSlibKey, - ("ecdsa-sha2-nistp256", "ecdsa-sha2-nistp256"): SSlibKey, - ("ecdsa-sha2-nistp384", "ecdsa-sha2-nistp384"): SSlibKey, - ("ed25519", "ed25519"): SSlibKey, - ("rsa", "rsassa-pss-md5"): SSlibKey, - ("rsa", "rsassa-pss-sha1"): SSlibKey, - ("rsa", "rsassa-pss-sha224"): SSlibKey, - ("rsa", "rsassa-pss-sha256"): SSlibKey, - ("rsa", "rsassa-pss-sha384"): SSlibKey, - ("rsa", "rsassa-pss-sha512"): SSlibKey, - ("rsa", "rsa-pkcs1v15-md5"): SSlibKey, - ("rsa", "rsa-pkcs1v15-sha1"): SSlibKey, - ("rsa", "rsa-pkcs1v15-sha224"): SSlibKey, - ("rsa", "rsa-pkcs1v15-sha256"): SSlibKey, - ("rsa", "rsa-pkcs1v15-sha384"): SSlibKey, - ("rsa", "rsa-pkcs1v15-sha512"): SSlibKey, - ("sphincs", "sphincs-shake-128s"): SSlibKey, -} From 4a877cbf2c84de4731699f3fbdbc28006293a2b4 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Mon, 21 Nov 2022 12:19:55 +0200 Subject: [PATCH 10/21] signer: Improve docstrings, annotations * better docstrings * enable annotations for signer API * Fix annotation issues * Add a missing type check based on the annotations --- mypy.ini | 1 + securesystemslib/signer/_key.py | 28 +++++++++++++-------- securesystemslib/signer/_signer.py | 40 +++++++++++++++++++----------- 3 files changed, 45 insertions(+), 24 deletions(-) diff --git a/mypy.ini b/mypy.ini index 6a13550c..47e7fce2 100644 --- a/mypy.ini +++ b/mypy.ini @@ -2,6 +2,7 @@ warn_unused_configs = True files = securesystemslib/util.py, + securesystemslib/signer/*.py, securesystemslib/storage.py, securesystemslib/gpg/constants.py diff --git a/securesystemslib/signer/_key.py b/securesystemslib/signer/_key.py index f7c88f64..93eb97b3 100644 --- a/securesystemslib/signer/_key.py +++ b/securesystemslib/signer/_key.py @@ -1,7 +1,7 @@ """Key interface and the default implementations""" import abc import logging -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, Optional, Tuple, Type import securesystemslib.keys as sslib_keys from securesystemslib import exceptions @@ -11,7 +11,8 @@ # NOTE dict for Key dispatch defined here, but filled at end of file when # subclass definitions are available. Users can add Key implementations. -KEY_FOR_TYPE_AND_SCHEME: Dict[Tuple[str, str], "Key"] = {} + +KEY_FOR_TYPE_AND_SCHEME: Dict[Tuple[str, str], Type] = {} class Key: @@ -73,9 +74,13 @@ def __eq__(self, other: Any) -> bool: @classmethod @abc.abstractmethod def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": - """Creates ``Key`` object from TUF serialization dict. + """Creates ``Key`` object from a serialization dict + + Key implementations must override this factory constructor that is used + as a deserialization helper. - Key implementations must override this factory constructor. + Users should call Key.from_dict(): it dispacthes to the actual subclass + implementation based on supported keys in KEY_FOR_TYPE_AND_SCHEME. Raises: KeyError, TypeError: Invalid arguments. @@ -85,13 +90,16 @@ def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": if (keytype, scheme) not in KEY_FOR_TYPE_AND_SCHEME: raise ValueError(f"Unsupported public key {keytype}/{scheme}") - key_impl = KEY_FOR_TYPE_AND_SCHEME[(keytype, scheme)] + # NOTE: Explicitly not checking the keytype and scheme types to allow + # intoto to use (None,None) to lookup GPGKey, see issue #450 + key_impl = KEY_FOR_TYPE_AND_SCHEME[(keytype, scheme)] # type: ignore return key_impl.from_dict(keyid, key_dict) def to_dict(self) -> Dict[str, Any]: - """Returns a dict for TUF serialization. + """Returns a serialization dict. - Key implementations may override this method. + Key implementations may override this method. This method is a + deserialization helper. """ return { "keytype": self.keytype, @@ -102,7 +110,7 @@ def to_dict(self) -> Dict[str, Any]: @abc.abstractmethod def verify_signature(self, signature: Signature, data: bytes) -> None: - """Verifies the signature over data. + """Raises if verification of signature over data fails. Args: signature: Signature object. @@ -110,7 +118,7 @@ def verify_signature(self, signature: Signature, data: bytes) -> None: Raises: UnverifiedSignatureError: Failed to verify signature. - VerificationError: Signature verification process failed. If you + VerificationError: Signature verification process error. If you are only interested in the verify result, just handle UnverifiedSignatureError: it contains VerificationError as well """ @@ -121,7 +129,7 @@ class SSlibKey(Key): """Key implementation for RSA, Ed25519, ECDSA and Sphincs keys""" def to_securesystemslib_key(self) -> Dict[str, Any]: - """Internal helper function""" + """Internal helper, returns a classic securesystemslib keydict""" return { "keyid": self.keyid, "keytype": self.keytype, diff --git a/securesystemslib/signer/_signer.py b/securesystemslib/signer/_signer.py index 66236144..94b3ebeb 100644 --- a/securesystemslib/signer/_signer.py +++ b/securesystemslib/signer/_signer.py @@ -3,7 +3,7 @@ import abc import logging import os -from typing import Callable, Dict, Optional +from typing import Callable, Dict, Optional, Type from urllib import parse import securesystemslib.gpg.functions as gpg @@ -15,11 +15,13 @@ # NOTE dict for Signer dispatch defined here, but filled at end of file when # subclass definitions are available. Users can add Signer implementations. -SIGNER_FOR_URI_SCHEME: Dict[str, "Signer"] = {} +SIGNER_FOR_URI_SCHEME: Dict[str, Type] = {} # SecretsHandler is a function the calling code can provide to Signer: -# If Signer needs secrets from user, the function will be called +# SecretsHandler will be called if Signer needs additional secrets. +# The argument is the name of the secret ("PIN", "passphrase", etc). +# Return value is the secret string. SecretsHandler = Callable[[str], str] @@ -57,17 +59,17 @@ def new_from_uri( cls, priv_key_uri: str, public_key: Key, - secrets_handler: SecretsHandler, + secrets_handler: Optional[SecretsHandler], ) -> "Signer": - """Constructor for given private key URI + """Constructor implementation for given private key URI This is a semi-private method meant to be called by Signer only. - Implementation is required if the Signer subclass is in + Method implementation is required if the Signer subclass is added to SIGNER_FOR_URI_SCHEME. Arguments: - priv_key_uri: URI that identifies the private key and signer - public_key: Key object + priv_key_uri: URI that identifies the private key + public_key: Key that is the public portion of this private key secrets_handler: Optional function that may be called if the signer needs additional secrets (like a PIN or passphrase) """ @@ -78,12 +80,18 @@ def from_priv_key_uri( priv_key_uri: str, public_key: Key, secrets_handler: Optional[SecretsHandler] = None, - ): - """Returns a concrete Signer implementation based on private key URI + ) -> "Signer": + """Factory constructor for a given private key URI + + Returns a specific Signer instance based on the private key URI and the + supported uri schemes listed in SIGNER_FOR_URI_SCHEME. Args: - priv_key_uri: URI that identifies the private key location and signer - public_key: Key object + priv_key_uri: URI that identifies the private key + public_key: Key that is the public portion of this private key + secrets_handler: Optional function that may be called if the + signer needs additional secrets (like a PIN or passphrase). + secrets_handler should return the requested secret string. """ scheme, _, _ = priv_key_uri.partition(":") @@ -133,7 +141,7 @@ def new_from_uri( cls, priv_key_uri: str, public_key: Key, - secrets_handler: SecretsHandler, + secrets_handler: Optional[SecretsHandler], ) -> "SSlibSigner": """Semi-private Constructor for Signer to call @@ -170,6 +178,10 @@ def new_from_uri( private = f.read().decode() elif uri.scheme == cls.ENC_FILE_URI_SCHEME: + if not secrets_handler: + raise ValueError( + f"{uri.scheme} requires a SecretsHandler" + ) # read key from file, ask for passphrase, decrypt with open(uri.path, "rb") as f: enc = f.read().decode() @@ -229,7 +241,7 @@ def new_from_uri( cls, priv_key_uri: str, public_key: Key, - secrets_handler: SecretsHandler, + secrets_handler: Optional[SecretsHandler], ) -> "GPGSigner": raise NotImplementedError("Incompatible with private key URIs") From aea4b31d94f02c11803eb29e283fbec49c59aa12 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Mon, 21 Nov 2022 14:26:03 +0200 Subject: [PATCH 11/21] signer: Refactor file uri scheme Remove the "encfile" uri scheme: instead have a single "file" scheme that has a required parameter "encrypted". This makes the uris slightly longer but this way there is never any confusion whether a file private key is encrypted or not: The URI makes it clear in all cases. --- securesystemslib/signer/_signer.py | 58 ++++++++++++++---------------- tests/test_signer.py | 28 ++++++++++++--- 2 files changed, 51 insertions(+), 35 deletions(-) diff --git a/securesystemslib/signer/_signer.py b/securesystemslib/signer/_signer.py index 94b3ebeb..d7d97f40 100644 --- a/securesystemslib/signer/_signer.py +++ b/securesystemslib/signer/_signer.py @@ -36,7 +36,6 @@ class Signer: currently supported default schemes are: * envvar: see SSlibSigner for details * file: see SSlibSigner for details - * encfile: see SSlibSigner for details """ __metaclass__ = abc.ABCMeta @@ -92,6 +91,11 @@ def from_priv_key_uri( secrets_handler: Optional function that may be called if the signer needs additional secrets (like a PIN or passphrase). secrets_handler should return the requested secret string. + + Raises: + ValueError: Incorrect arguments + Other Signer-specific errors: These could include OSErrors for + reading files or network errors for connecting to a KMS. """ scheme, _, _ = priv_key_uri.partition(":") @@ -111,17 +115,13 @@ class SSlibSigner(Signer): SSlibSigners should be instantiated with Signer.from_priv_key_uri(). These private key URI schemes are supported: - * envvar:: + * "envvar:": VAR is an environment variable with unencrypted private key content. envvar:MYPRIVKEY - * file:: + * "file:?encrypted=[true|false]": PATH is a file path to a file with unencrypted private key content. - file:path/to/file - * encfile:: - The the private key content in PATH has been encrypted with - keys.encrypt_key(). Application provided SecretsHandler will be - called to get the passphrase. - encfile:/path/to/encrypted/file + file:path/to/file?encrypted=true + file:/abs/path/to/file?encrypted=false Attributes: key_dict: @@ -131,7 +131,6 @@ class SSlibSigner(Signer): ENVVAR_URI_SCHEME = "envvar" FILE_URI_SCHEME = "file" - ENC_FILE_URI_SCHEME = "encfile" def __init__(self, key_dict: Dict): self.key_dict = key_dict @@ -158,9 +157,7 @@ def new_from_uri( SSlibSigner for the given private key URI. """ if not isinstance(public_key, SSlibKey): - raise ValueError( - f"Expected SSlibKey public key for private key {priv_key_uri}" - ) + raise ValueError(f"Expected SSlibKey for {priv_key_uri}") uri = parse.urlparse(priv_key_uri) @@ -168,31 +165,31 @@ def new_from_uri( # read private key from environment variable private = os.getenv(uri.path) if private is None: + raise ValueError(f"Unset env var for {priv_key_uri}") + + elif uri.scheme == cls.FILE_URI_SCHEME: + params = dict(parse.parse_qsl(uri.query)) + if "encrypted" not in params: raise ValueError( - f"Unset private key variable for {priv_key_uri}" + f"{uri.scheme} requires 'encrypted' parameter" ) - elif uri.scheme == cls.FILE_URI_SCHEME: - # read private key from file + # read private key (may be encrypted or not) from file with open(uri.path, "rb") as f: private = f.read().decode() - elif uri.scheme == cls.ENC_FILE_URI_SCHEME: - if not secrets_handler: - raise ValueError( - f"{uri.scheme} requires a SecretsHandler" - ) - # read key from file, ask for passphrase, decrypt - with open(uri.path, "rb") as f: - enc = f.read().decode() - secret = secrets_handler("passphrase") - decrypted = sslib_keys.decrypt_key(enc, secret) - private = decrypted["keyval"]["private"] + if params["encrypted"] != "false": + if not secrets_handler: + raise ValueError( + f"encrypted private key requires a SecretsHandler" + ) + + secret = secrets_handler("passphrase") + decrypted = sslib_keys.decrypt_key(private, secret) + private = decrypted["keyval"]["private"] else: - raise ValueError( - f"SSlibSigner does not support priv key uri {priv_key_uri}" - ) + raise ValueError(f"SSlibSigner does not support {priv_key_uri}") keydict = public_key.to_securesystemslib_key() keydict["keyval"]["private"] = private @@ -286,5 +283,4 @@ def sign(self, payload: bytes) -> GPGSignature: SIGNER_FOR_URI_SCHEME = { SSlibSigner.ENVVAR_URI_SCHEME: SSlibSigner, SSlibSigner.FILE_URI_SCHEME: SSlibSigner, - SSlibSigner.ENC_FILE_URI_SCHEME: SSlibSigner, } diff --git a/tests/test_signer.py b/tests/test_signer.py index a2ab2df0..585dcb19 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -93,6 +93,25 @@ def setUpClass(cls): def tearDownClass(cls): cls.testdir.cleanup() + def test_signer_sign_with_incorrect_uri(self): + pubkey = SSlibKey.from_securesystemslib_key(self.keys[0]) + with self.assertRaises(ValueError): + # unknown uri + Signer.from_priv_key_uri("unknownscheme:x", pubkey) + + with self.assertRaises(ValueError): + # env variable not defined + Signer.from_priv_key_uri("envvar:NONEXISTENTVAR", pubkey) + + with self.assertRaises(ValueError): + # no "encrypted" param + Signer.from_priv_key_uri("file:path/to/privkey", pubkey) + + with self.assertRaises(OSError): + # file not found + uri = "file:nonexistentfile?encrypted=false" + Signer.from_priv_key_uri(uri, pubkey) + def test_signer_sign_with_envvar_uri(self): for key in self.keys: # setup @@ -117,8 +136,9 @@ def test_signer_sign_with_file_uri(self): ) as f: f.write(key["keyval"]["private"].encode()) - # test signing - signer = Signer.from_priv_key_uri(f"file:{f.name}", pubkey) + # test signing with unencrypted key + uri = f"file:{f.name}?encrypted=false" + signer = Signer.from_priv_key_uri(uri, pubkey) sig = signer.sign(self.DATA) pubkey.verify_signature(sig, self.DATA) @@ -136,11 +156,11 @@ def test_signer_sign_with_enc_file_uri(self): ) as f: f.write(privkey.encode()) - # test signing + # test signing with encrypted key def secrets_handler(secret: str) -> str: return "hunter2" if secret == "passphrase" else "???" - uri = f"encfile:{f.name}" + uri = f"file:{f.name}?encrypted=true" signer = Signer.from_priv_key_uri(uri, pubkey, secrets_handler) sig = signer.sign(self.DATA) From 1d5931edbc791c38081aaac50ec52b451dfd8f64 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Mon, 21 Nov 2022 14:44:13 +0200 Subject: [PATCH 12/21] signer: Refactor uri support There used to be two Signer methods: * from_priv_key_uri -- the public dispatcher or factory constructor * new_from_uri -- the abstract method that subclassess implement There is no need for these to be separate: remove new_from_uri() --- securesystemslib/signer/_signer.py | 44 ++++++++---------------------- 1 file changed, 11 insertions(+), 33 deletions(-) diff --git a/securesystemslib/signer/_signer.py b/securesystemslib/signer/_signer.py index d7d97f40..19ad7694 100644 --- a/securesystemslib/signer/_signer.py +++ b/securesystemslib/signer/_signer.py @@ -54,28 +54,8 @@ def sign(self, payload: bytes) -> Signature: @classmethod @abc.abstractmethod - def new_from_uri( - cls, - priv_key_uri: str, - public_key: Key, - secrets_handler: Optional[SecretsHandler], - ) -> "Signer": - """Constructor implementation for given private key URI - - This is a semi-private method meant to be called by Signer only. - Method implementation is required if the Signer subclass is added to - SIGNER_FOR_URI_SCHEME. - - Arguments: - priv_key_uri: URI that identifies the private key - public_key: Key that is the public portion of this private key - secrets_handler: Optional function that may be called if the - signer needs additional secrets (like a PIN or passphrase) - """ - raise NotImplementedError # pragma: no cover - - @staticmethod def from_priv_key_uri( + cls, priv_key_uri: str, public_key: Key, secrets_handler: Optional[SecretsHandler] = None, @@ -103,7 +83,9 @@ def from_priv_key_uri( raise ValueError(f"Unsupported private key scheme {scheme}") signer = SIGNER_FOR_URI_SCHEME[scheme] - return signer.new_from_uri(priv_key_uri, public_key, secrets_handler) + return signer.from_priv_key_uri( + priv_key_uri, public_key, secrets_handler + ) class SSlibSigner(Signer): @@ -136,13 +118,13 @@ def __init__(self, key_dict: Dict): self.key_dict = key_dict @classmethod - def new_from_uri( + def from_priv_key_uri( cls, priv_key_uri: str, public_key: Key, - secrets_handler: Optional[SecretsHandler], + secrets_handler: Optional[SecretsHandler] = None, ) -> "SSlibSigner": - """Semi-private Constructor for Signer to call + """Constructor for Signer to call Arguments: priv_key_uri: private key URI described in class doc @@ -170,9 +152,7 @@ def new_from_uri( elif uri.scheme == cls.FILE_URI_SCHEME: params = dict(parse.parse_qsl(uri.query)) if "encrypted" not in params: - raise ValueError( - f"{uri.scheme} requires 'encrypted' parameter" - ) + raise ValueError(f"{uri.scheme} requires 'encrypted' parameter") # read private key (may be encrypted or not) from file with open(uri.path, "rb") as f: @@ -180,9 +160,7 @@ def new_from_uri( if params["encrypted"] != "false": if not secrets_handler: - raise ValueError( - f"encrypted private key requires a SecretsHandler" - ) + raise ValueError("encrypted key requires a secrets handler") secret = secrets_handler("passphrase") decrypted = sslib_keys.decrypt_key(private, secret) @@ -234,11 +212,11 @@ def __init__( self.homedir = homedir @classmethod - def new_from_uri( + def from_priv_key_uri( cls, priv_key_uri: str, public_key: Key, - secrets_handler: Optional[SecretsHandler], + secrets_handler: Optional[SecretsHandler] = None, ) -> "GPGSigner": raise NotImplementedError("Incompatible with private key URIs") From 8a7951a1c1440c981d92cc73e5b7d16500e46e96 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Wed, 23 Nov 2022 11:36:05 +0200 Subject: [PATCH 13/21] Improve docstrings Various docstring and test improvements based on review comments. The general theme is: * Avoid documenting too much in Signer implementations: Users will be using Signer dispatch so they will not read the subclass documentation * Try to document Signer itself better The open questions are * exceptions from subclasses: this has been left in subclass docs for now. * documenting the URIs: this is also still in subclass docs. I think this issue becomes more tangible once we have multiple subclasses. --- securesystemslib/signer/_signer.py | 46 ++++++++++++++++-------------- tests/test_signer.py | 4 ++- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/securesystemslib/signer/_signer.py b/securesystemslib/signer/_signer.py index 19ad7694..e05ffe34 100644 --- a/securesystemslib/signer/_signer.py +++ b/securesystemslib/signer/_signer.py @@ -29,6 +29,7 @@ class Signer: """Signer interface that supports multiple signing implementations. Usage example: + signer = Signer.from_priv_key_uri("envvar:MYPRIVKEY", pub_key) sig = signer.sign(b"data") @@ -36,6 +37,22 @@ class Signer: currently supported default schemes are: * envvar: see SSlibSigner for details * file: see SSlibSigner for details + + Interactive applications may also define a secrets handler that allows + asking for user secrets if they are needed: + + from getpass import getpass + + def sec_handler(secret_name:str) -> str: + return getpass(f"Enter {secret_name}: ") + + # user will not be asked for a passphrase for unencrypted key + uri = "file:keys/mykey?encrypted=false" + signer = Signer.from_priv_key_uri(uri, pub_key, sec_handler) + + # user will be asked for a passphrase for encrypted key + uri2 = "file:keys/myenckey?encrypted=true" + signer2 = Signer.from_priv_key_uri(uri2, pub_key2, sec_handler) """ __metaclass__ = abc.ABCMeta @@ -101,7 +118,9 @@ class SSlibSigner(Signer): VAR is an environment variable with unencrypted private key content. envvar:MYPRIVKEY * "file:?encrypted=[true|false]": - PATH is a file path to a file with unencrypted private key content. + PATH is a file path to a file with private key content. If + encrypted=true, the file is expected to have been created with + securesystemslib.keys.encrypt_key(). file:path/to/file?encrypted=true file:/abs/path/to/file?encrypted=false @@ -126,17 +145,10 @@ def from_priv_key_uri( ) -> "SSlibSigner": """Constructor for Signer to call - Arguments: - priv_key_uri: private key URI described in class doc - public_key: Key object. + Please refer to Signer.from_priv_key_uri() documentation. - Raises: + Additionally raises: OSError: Reading the file failed with "file:" URI - ValueError: URI is unsupported or environment variable was not set - with "envvar:" URIs - - Returns: - SSlibSigner for the given private key URI. """ if not isinstance(public_key, SSlibKey): raise ValueError(f"Expected SSlibKey for {priv_key_uri}") @@ -176,17 +188,13 @@ def from_priv_key_uri( def sign(self, payload: bytes) -> Signature: """Signs a given payload by the key assigned to the SSlibSigner instance. - Arguments: - payload: The bytes to be signed. + Please see Signer.sign() documentation. - Raises: + Additionally raises: securesystemslib.exceptions.FormatError: Key argument is malformed. securesystemslib.exceptions.CryptoError, \ securesystemslib.exceptions.UnsupportedAlgorithmError: Signing errors. - - Returns: - Returns a "Signature" class instance. """ sig_dict = sslib_keys.create_signature(self.key_dict, payload) return Signature(**sig_dict) @@ -197,12 +205,6 @@ class GPGSigner(Signer): Provides a sign method to generate a cryptographic signature with gpg, using an RSA, DSA or EdDSA private key identified by the keyid on the instance. - - GPGSigners should be instantiated with Signer.from_priv_key_uri(). - Two private key URI schemes are supported: - * gpg:: - HOMEDIR: Optional filesystem path to GPG home directory - """ def __init__( diff --git a/tests/test_signer.py b/tests/test_signer.py index 585dcb19..233dcdae 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -158,7 +158,9 @@ def test_signer_sign_with_enc_file_uri(self): # test signing with encrypted key def secrets_handler(secret: str) -> str: - return "hunter2" if secret == "passphrase" else "???" + if secret != "passphrase": + raise ValueError("Only prepared to return a passphrase") + return "hunter2" uri = f"file:{f.name}?encrypted=true" From d9bc6559edcdb6f16e8b0fcc2d7694b07b269fd8 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 25 Nov 2022 10:16:43 +0200 Subject: [PATCH 14/21] Key: Make to_dict abstract, fix bug in SSlibKey Based on review comments: * Move to_dict() implementation to SSlibKey to prevent future accidents * improve docstrings * Fix a bug in SSlibKey.from_securesystemslib_key() --- securesystemslib/signer/_key.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/securesystemslib/signer/_key.py b/securesystemslib/signer/_key.py index 93eb97b3..f0c04be0 100644 --- a/securesystemslib/signer/_key.py +++ b/securesystemslib/signer/_key.py @@ -79,7 +79,7 @@ def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": Key implementations must override this factory constructor that is used as a deserialization helper. - Users should call Key.from_dict(): it dispacthes to the actual subclass + Users should call Key.from_dict(): it dispatches to the actual subclass implementation based on supported keys in KEY_FOR_TYPE_AND_SCHEME. Raises: @@ -95,18 +95,13 @@ def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": key_impl = KEY_FOR_TYPE_AND_SCHEME[(keytype, scheme)] # type: ignore return key_impl.from_dict(keyid, key_dict) + @abc.abstractmethod def to_dict(self) -> Dict[str, Any]: """Returns a serialization dict. - Key implementations may override this method. This method is a - deserialization helper. + Key implementations must override this serialization helper. """ - return { - "keytype": self.keytype, - "scheme": self.scheme, - "keyval": self.keyval, - **self.unrecognized_fields, - } + raise NotImplementedError @abc.abstractmethod def verify_signature(self, signature: Signature, data: bytes) -> None: @@ -140,11 +135,12 @@ def to_securesystemslib_key(self) -> Dict[str, Any]: @classmethod def from_securesystemslib_key(cls, key_dict: Dict[str, Any]) -> "SSlibKey": """Constructor from classic securesystemslib keydict""" + # ensure possible private keys are not included in keyval return SSlibKey( key_dict["keyid"], key_dict["keytype"], key_dict["scheme"], - key_dict["keyval"], + {"public": key_dict["keyval"]["public"]}, ) @classmethod @@ -155,6 +151,14 @@ def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "SSlibKey": # All fields left in the key_dict are unrecognized. return cls(keyid, keytype, scheme, keyval, key_dict) + def to_dict(self) -> Dict[str, Any]: + return { + "keytype": self.keytype, + "scheme": self.scheme, + "keyval": self.keyval, + **self.unrecognized_fields, + } + def verify_signature(self, signature: Signature, data: bytes) -> None: try: if not sslib_keys.verify_signature( From ce07f34f88fa8dcae26bd6014d8dd69b3dbe4088 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 25 Nov 2022 10:48:49 +0200 Subject: [PATCH 15/21] Key: Add minimal deserialization type check The TUF/intoto specs specify that the keytypes that SSlibKey implements must have a keyval.public that is a string. Check that that this is true on deserialization. Also test deserialization with some invalid key data. --- securesystemslib/signer/_key.py | 4 ++++ tests/test_signer.py | 17 +++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/securesystemslib/signer/_key.py b/securesystemslib/signer/_key.py index f0c04be0..746fd217 100644 --- a/securesystemslib/signer/_key.py +++ b/securesystemslib/signer/_key.py @@ -148,6 +148,10 @@ def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "SSlibKey": keytype = key_dict.pop("keytype") scheme = key_dict.pop("scheme") keyval = key_dict.pop("keyval") + + if "public" not in keyval or not isinstance(keyval["public"], str): + raise ValueError(f"public key string required for scheme {scheme}") + # All fields left in the key_dict are unrecognized. return cls(keyid, keytype, scheme, keyval, key_dict) diff --git a/tests/test_signer.py b/tests/test_signer.py index 233dcdae..52f5529a 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -50,6 +50,23 @@ def test_key_from_to_dict(self): self.assertIsInstance(key, key_impl) self.assertDictEqual(keydict, key.to_dict()) + def test_sslib_key_from_dict_invalid(self): + """Test from_dict for invalid data""" + invalid_dicts = [ + {"scheme": "ed25519", "keyval": {"public": "abc"}}, + {"keytype": "ed25519", "keyval": {"public": "abc"}}, + {"keytype": "ed25519", "scheme": "ed25519"}, + {"keytype": "ed25519", "scheme": "ed25519", "keyval": {"x": "y"}}, + { + "keytype": "ed25519", + "scheme": "ed25519", + "keyval": {"public": b"abc"}, + }, + ] + for keydict in invalid_dicts: + with self.assertRaises((KeyError, ValueError)): + Key.from_dict("aa", keydict) + def test_key_verify_signature(self): sigdict = { "keyid": "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", From 549affd5e78a80f1720db556925b1aa4c6a34ad8 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Tue, 29 Nov 2022 11:35:13 +0200 Subject: [PATCH 16/21] signer API: Make Key actually abstract * Fix the incorrect metaclass syntax * Also refactor constructor so it is shorter --- securesystemslib/signer/_key.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/securesystemslib/signer/_key.py b/securesystemslib/signer/_key.py index 746fd217..b984a5f0 100644 --- a/securesystemslib/signer/_key.py +++ b/securesystemslib/signer/_key.py @@ -1,6 +1,6 @@ """Key interface and the default implementations""" -import abc import logging +from abc import ABCMeta, abstractmethod from typing import Any, Dict, Optional, Tuple, Type import securesystemslib.keys as sslib_keys @@ -15,7 +15,7 @@ KEY_FOR_TYPE_AND_SCHEME: Dict[Tuple[str, str], Type] = {} -class Key: +class Key(metaclass=ABCMeta): """Abstract class representing the public portion of a key. *All parameters named below are not just constructor arguments but also @@ -36,8 +36,6 @@ class Key: TypeError: Invalid type for an argument. """ - __metaclass__ = abc.ABCMeta - def __init__( self, keyid: str, @@ -54,10 +52,7 @@ def __init__( self.keytype = keytype self.scheme = scheme self.keyval = keyval - if unrecognized_fields is None: - unrecognized_fields = {} - - self.unrecognized_fields = unrecognized_fields + self.unrecognized_fields = unrecognized_fields or {} def __eq__(self, other: Any) -> bool: if not isinstance(other, Key): @@ -72,7 +67,7 @@ def __eq__(self, other: Any) -> bool: ) @classmethod - @abc.abstractmethod + @abstractmethod def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": """Creates ``Key`` object from a serialization dict @@ -95,7 +90,7 @@ def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": key_impl = KEY_FOR_TYPE_AND_SCHEME[(keytype, scheme)] # type: ignore return key_impl.from_dict(keyid, key_dict) - @abc.abstractmethod + @abstractmethod def to_dict(self) -> Dict[str, Any]: """Returns a serialization dict. @@ -103,7 +98,7 @@ def to_dict(self) -> Dict[str, Any]: """ raise NotImplementedError - @abc.abstractmethod + @abstractmethod def verify_signature(self, signature: Signature, data: bytes) -> None: """Raises if verification of signature over data fails. From b69303e0cb7684264fa7ce551bf680f1b70130e1 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Tue, 29 Nov 2022 11:46:52 +0200 Subject: [PATCH 17/21] signer API: Add doc example of using a custom signer --- securesystemslib/signer/_signer.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/securesystemslib/signer/_signer.py b/securesystemslib/signer/_signer.py index e05ffe34..a4e9ad9e 100644 --- a/securesystemslib/signer/_signer.py +++ b/securesystemslib/signer/_signer.py @@ -53,6 +53,17 @@ def sec_handler(secret_name:str) -> str: # user will be asked for a passphrase for encrypted key uri2 = "file:keys/myenckey?encrypted=true" signer2 = Signer.from_priv_key_uri(uri2, pub_key2, sec_handler) + + Applications can provide their own Signer and Key implementations: + + from securesystemslib.signer import Signer, SIGNER_FOR_URI_SCHEME + from mylib import MySigner + + SIGNER_FOR_URI_SCHEME[MySigner.MY_SCHEME] = MySigner + + This way the application code using signer API continues to work with + default signers but now also uses the custom signer when the proper URI is + used. """ __metaclass__ = abc.ABCMeta From acd99fd0c8dcac3f730cf72570a646ebaa97ce81 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Tue, 29 Nov 2022 12:05:11 +0200 Subject: [PATCH 18/21] tests: Add a test for custom Signer --- tests/test_signer.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/test_signer.py b/tests/test_signer.py index 52f5529a..58b15d8f 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -7,6 +7,7 @@ import shutil import tempfile import unittest +from typing import Optional import securesystemslib.keys as KEYS from securesystemslib.exceptions import ( @@ -20,9 +21,11 @@ from securesystemslib.gpg.functions import verify_signature as verify_sig from securesystemslib.signer import ( KEY_FOR_TYPE_AND_SCHEME, + SIGNER_FOR_URI_SCHEME, GPGSignature, GPGSigner, Key, + SecretsHandler, Signature, Signer, SSlibKey, @@ -227,6 +230,37 @@ def test_sslib_signer_sign(self): scheme_dict["scheme"] = valid_scheme + def test_custom_signer(self): + # setup + key = self.keys[0] + pubkey = SSlibKey.from_securesystemslib_key(key) + + class CustomSigner(SSlibSigner): + """Custom signer with a hard coded key""" + + CUSTOM_SCHEME = "custom" + + @classmethod + def from_priv_key_uri( + cls, + priv_key_uri: str, + public_key: Key, + secrets_handler: Optional[SecretsHandler] = None, + ) -> "CustomSigner": + return cls(key) + + # register custom signer + SIGNER_FOR_URI_SCHEME[CustomSigner.CUSTOM_SCHEME] = CustomSigner + + # test signing + signer = Signer.from_priv_key_uri("custom:foo", pubkey) + self.assertIsInstance(signer, CustomSigner) + sig = signer.sign(self.DATA) + + pubkey.verify_signature(sig, self.DATA) + with self.assertRaises(UnverifiedSignatureError): + pubkey.verify_signature(sig, b"NOT DATA") + def test_signature_from_to_dict(self): signature_dict = { "sig": "30460221009342e4566528fcecf6a7a5d53ebacdb1df151e242f55f8775883469cb01dbc6602210086b426cc826709acfa2c3f9214610cb0a832db94bbd266fd7c5939a48064a851", From c3c1ca87f17fc605adfa0673cb958e94069e7ac9 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Tue, 29 Nov 2022 15:22:49 +0200 Subject: [PATCH 19/21] tests: Add test for custom Key --- tests/test_signer.py | 69 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/tests/test_signer.py b/tests/test_signer.py index 58b15d8f..5224858b 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -7,7 +7,7 @@ import shutil import tempfile import unittest -from typing import Optional +from typing import Any, Dict, Optional import securesystemslib.keys as KEYS from securesystemslib.exceptions import ( @@ -92,6 +92,73 @@ def test_key_verify_signature(self): with self.assertRaises(UnverifiedSignatureError): key.verify_signature(sig, b"NOT DATA") + def test_unsupported_key(self): + keydict = { + "keytype": "custom", + "scheme": "ed25519", + "keyval": { + "public": "8ae43d22b8e0fbf4a48fa3490d31b4d389114f5dc1039c918f075427f4100759", + }, + } + with self.assertRaises(ValueError): + Key.from_dict( + "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", + keydict, + ) + + def test_custom_key(self): + class CustomKey(SSlibKey): + """Fake keytype that actually uses ed25519 under the hood""" + + @classmethod + def from_dict( + cls, keyid: str, key_dict: Dict[str, Any] + ) -> "CustomKey": + assert key_dict.pop("keytype") == "custom" + keytype = "ed25519" + scheme = key_dict.pop("scheme") + keyval = key_dict.pop("keyval") + return cls(keyid, keytype, scheme, keyval, key_dict) + + def to_dict(self) -> Dict[str, Any]: + return { + "keytype": "custom", + "scheme": self.scheme, + "keyval": self.keyval, + **self.unrecognized_fields, + } + + # register custom key type + KEY_FOR_TYPE_AND_SCHEME[("custom", "ed25519")] = CustomKey + + # setup + sig = Signature.from_dict( + { + "keyid": "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", + "sig": "3fc91f5411a567d6a7f28b7fbb9ba6d60b1e2a1b64d8af0b119650015d86bb5a55e57c0e2c995a9b4a332b8f435703e934c0e6ce69fe6674a8ce68719394a40b", + } + ) + + keydict = { + "keytype": "custom", + "scheme": "ed25519", + "keyval": { + "public": "8ae43d22b8e0fbf4a48fa3490d31b4d389114f5dc1039c918f075427f4100759", + }, + } + key = Key.from_dict( + "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", + keydict, + ) + + # test that CustomKey is used and that it works + self.assertIsInstance(key, CustomKey) + key.verify_signature(sig, b"DATA") + with self.assertRaises(UnverifiedSignatureError): + key.verify_signature(sig, b"NOT DATA") + + del KEY_FOR_TYPE_AND_SCHEME[("custom", "ed25519")] + class TestSigner(unittest.TestCase): """Test Signer and SSlibSigner functionality""" From 98442f5cb17204cdbb726561f2a953ee704eadba Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Tue, 29 Nov 2022 15:28:15 +0200 Subject: [PATCH 20/21] signer: Add note about unhandled exceptions There is (at least currently) no promise that Signer implementations do not raise unexpected errors. --- securesystemslib/signer/_signer.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/securesystemslib/signer/_signer.py b/securesystemslib/signer/_signer.py index a4e9ad9e..d85f3127 100644 --- a/securesystemslib/signer/_signer.py +++ b/securesystemslib/signer/_signer.py @@ -33,6 +33,11 @@ class Signer: signer = Signer.from_priv_key_uri("envvar:MYPRIVKEY", pub_key) sig = signer.sign(b"data") + Note that signer implementations may raise errors (during both + Signer.from_priv_key_uri() and Signer.sign()) that are not documented here: + examples could include network errors or file read errors. Applications + should use generic try-except here if unexpected raises are not an option. + See SIGNER_FOR_URI_SCHEME for supported private key URI schemes. The currently supported default schemes are: * envvar: see SSlibSigner for details From 10edc1a3112315b44d4b20bef0b00c401398f18a Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Wed, 30 Nov 2022 20:19:16 +0200 Subject: [PATCH 21/21] signer API: Fix unrecognized_fields * do not check falsyness of argument containers: the results are unexpected if the argument container is empty * Do not use Mapping annotation for unrecognized_fields. The original idea was that Metadata API is not changing them: this is true but we don't want to prevent users from modifying the fields... --- securesystemslib/signer/_key.py | 6 +++++- securesystemslib/signer/_signature.py | 10 +++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/securesystemslib/signer/_key.py b/securesystemslib/signer/_key.py index b984a5f0..a5b33b6e 100644 --- a/securesystemslib/signer/_key.py +++ b/securesystemslib/signer/_key.py @@ -52,7 +52,11 @@ def __init__( self.keytype = keytype self.scheme = scheme self.keyval = keyval - self.unrecognized_fields = unrecognized_fields or {} + + if unrecognized_fields is None: + unrecognized_fields = {} + + self.unrecognized_fields = unrecognized_fields def __eq__(self, other: Any) -> bool: if not isinstance(other, Key): diff --git a/securesystemslib/signer/_signature.py b/securesystemslib/signer/_signature.py index a94c36a9..190afc3c 100644 --- a/securesystemslib/signer/_signature.py +++ b/securesystemslib/signer/_signature.py @@ -1,7 +1,7 @@ """Signature container class""" import logging -from typing import Any, Dict, Mapping, Optional +from typing import Any, Dict, Optional logger = logging.getLogger(__name__) @@ -27,11 +27,15 @@ def __init__( self, keyid: str, sig: str, - unrecognized_fields: Optional[Mapping[str, Any]] = None, + unrecognized_fields: Optional[Dict[str, Any]] = None, ): self.keyid = keyid self.signature = sig - self.unrecognized_fields: Mapping[str, Any] = unrecognized_fields or {} + + if unrecognized_fields is None: + unrecognized_fields = {} + + self.unrecognized_fields = unrecognized_fields def __eq__(self, other: Any) -> bool: if not isinstance(other, Signature):