diff --git a/mypy.ini b/mypy.ini index 6a13550c..47e7fce2 100644 --- a/mypy.ini +++ b/mypy.ini @@ -2,6 +2,7 @@ warn_unused_configs = True files = securesystemslib/util.py, + securesystemslib/signer/*.py, securesystemslib/storage.py, securesystemslib/gpg/constants.py diff --git a/securesystemslib/exceptions.py b/securesystemslib/exceptions.py index 2ace2679..5f25fbe0 100755 --- a/securesystemslib/exceptions.py +++ b/securesystemslib/exceptions.py @@ -138,3 +138,12 @@ class StorageError(Error): backend.""" pass # pylint: disable=unnecessary-pass + + +class UnverifiedSignatureError(Error): + """Signature could not be verified: either signature was incorrect or + something failed during process (see VerificationError)""" + + +class VerificationError(UnverifiedSignatureError): + """Signature could not be verified because something failed in the process""" diff --git a/securesystemslib/signer.py b/securesystemslib/signer.py deleted file mode 100644 index 4b9e6c68..00000000 --- a/securesystemslib/signer.py +++ /dev/null @@ -1,268 +0,0 @@ -"""Signer interface and example interface implementations. - -The goal of this module is to provide a signing interface supporting multiple -signing implementations and a couple of example implementations. - -""" - -import abc -from typing import Any, Dict, Mapping, Optional - -import securesystemslib.gpg.functions as gpg -import securesystemslib.keys as sslib_keys - - -class Signature: - """A container class containing information about a signature. - - Contains a signature and the keyid uniquely identifying the key used - to generate the signature. - - Provides utility methods to easily create an object from a dictionary - and return the dictionary representation of the object. - - Attributes: - keyid: HEX string used as a unique identifier of the key. - signature: HEX string representing the signature. - unrecognized_fields: Dictionary of all attributes that are not managed - by securesystemslib. - - """ - - def __init__( - self, - keyid: str, - sig: str, - unrecognized_fields: Optional[Mapping[str, Any]] = None, - ): - self.keyid = keyid - self.signature = sig - self.unrecognized_fields: Mapping[str, Any] = unrecognized_fields or {} - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, Signature): - return False - - return ( - self.keyid == other.keyid - and self.signature == other.signature - and self.unrecognized_fields == other.unrecognized_fields - ) - - @classmethod - def from_dict(cls, signature_dict: Dict) -> "Signature": - """Creates a Signature object from its JSON/dict representation. - - Arguments: - signature_dict: - A dict containing a valid keyid and a signature. - Note that the fields in it should be named "keyid" and "sig" - respectively. - - Raises: - KeyError: If any of the "keyid" and "sig" fields are missing from - the signature_dict. - - Side Effect: - Destroys the metadata dict passed by reference. - - Returns: - A "Signature" instance. - """ - - keyid = signature_dict.pop("keyid") - sig = signature_dict.pop("sig") - # All fields left in the signature_dict are unrecognized. - return cls(keyid, sig, signature_dict) - - def to_dict(self) -> Dict: - """Returns the JSON-serializable dictionary representation of self.""" - - return { - "keyid": self.keyid, - "sig": self.signature, - **self.unrecognized_fields, - } - - -class GPGSignature(Signature): - """A container class containing information about a gpg signature. - - Besides the signature, it also contains other meta information - needed to uniquely identify the key used to generate the signature. - - Attributes: - keyid: HEX string used as a unique identifier of the key. - signature: HEX string representing the signature. - other_headers: HEX representation of additional GPG headers. - """ - - def __init__( - self, - keyid: str, - signature: str, - other_headers: str, - ): - super().__init__(keyid, signature) - self.other_headers = other_headers - - @classmethod - def from_dict(cls, signature_dict: Dict) -> "GPGSignature": - """Creates a GPGSignature object from its JSON/dict representation. - - Args: - signature_dict: Dict containing valid "keyid", "signature" and - "other_fields" fields. - - Raises: - KeyError: If any of the "keyid", "sig" or "other_headers" fields - are missing from the signature_dict. - - Returns: - GPGSignature instance. - """ - - return cls( - signature_dict["keyid"], - signature_dict["signature"], - signature_dict["other_headers"], - ) - - def to_dict(self) -> Dict: - """Returns the JSON-serializable dictionary representation of self.""" - return { - "keyid": self.keyid, - "signature": self.signature, - "other_headers": self.other_headers, - } - - -class Signer: - """Signer interface created to support multiple signing implementations.""" - - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod - def sign(self, payload: bytes) -> Signature: - """Signs a given payload by the key assigned to the Signer instance. - - Arguments: - payload: The bytes to be signed. - - Returns: - Returns a "Signature" class instance. - """ - raise NotImplementedError # pragma: no cover - - -class SSlibSigner(Signer): - """A securesystemslib signer implementation. - - Provides a sign method to generate a cryptographic signature with a - securesystemslib-style rsa, ed25519 or ecdsa private key on the instance. - The signature scheme is determined by the key and must be one of: - - - rsa(ssa-pss|pkcs1v15)-(md5|sha1|sha224|sha256|sha384|sha512) (12 schemes) - - ed25519 - - ecdsa-sha2-nistp256 - - See "securesystemslib.interface" for functions to generate and load keys. - - Attributes: - key_dict: - A securesystemslib-style key dictionary, which includes a keyid, - key type, signature scheme, and the public and private key values, - e.g.:: - - { - "keytype": "rsa", - "scheme": "rsassa-pss-sha256", - "keyid": "f30a0870d026980100c0573bd557394f8c1bbd6...", - "keyval": { - "public": "-----BEGIN RSA PUBLIC KEY----- ...", - "private": "-----BEGIN RSA PRIVATE KEY----- ..." - } - } - - The public and private keys are strings in PEM format. - """ - - def __init__(self, key_dict: Dict): - self.key_dict = key_dict - - def sign(self, payload: bytes) -> Signature: - """Signs a given payload by the key assigned to the SSlibSigner instance. - - Arguments: - payload: The bytes to be signed. - - Raises: - securesystemslib.exceptions.FormatError: Key argument is malformed. - securesystemslib.exceptions.CryptoError, \ - securesystemslib.exceptions.UnsupportedAlgorithmError: - Signing errors. - - Returns: - Returns a "Signature" class instance. - """ - - sig_dict = sslib_keys.create_signature(self.key_dict, payload) - return Signature(**sig_dict) - - -class GPGSigner(Signer): - """A securesystemslib gpg implementation of the "Signer" interface. - - Provides a sign method to generate a cryptographic signature with gpg, using - an RSA, DSA or EdDSA private key identified by the keyid on the instance. - - Args: - keyid: The keyid of the gpg signing keyid. If not passed the default - key in the keyring is used. - - homedir: Path to the gpg keyring. If not passed the default keyring - is used. - - """ - - def __init__( - self, keyid: Optional[str] = None, homedir: Optional[str] = None - ): - self.keyid = keyid - self.homedir = homedir - - def sign(self, payload: bytes) -> GPGSignature: - """Signs a given payload by the key assigned to the GPGSigner instance. - - Calls the gpg command line utility to sign the passed content with the - key identified by the passed keyid from the gpg keyring at the passed - homedir. - - The executed base command is defined in - securesystemslib.gpg.constants.gpg_sign_command. - - Arguments: - payload: The bytes to be signed. - - Raises: - securesystemslib.exceptions.FormatError: - If the keyid was passed and does not match - securesystemslib.formats.KEYID_SCHEMA. - - ValueError: the gpg command failed to create a valid signature. - OSError: the gpg command is not present or non-executable. - securesystemslib.exceptions.UnsupportedLibraryError: the gpg - command is not available, or the cryptography library is - not installed. - securesystemslib.gpg.exceptions.CommandError: the gpg command - returned a non-zero exit code. - securesystemslib.gpg.exceptions.KeyNotFoundError: the used gpg - version is not fully supported and no public key can be found - for short keyid. - - Returns: - Returns a "GPGSignature" class instance. - """ - - sig_dict = gpg.create_signature(payload, self.keyid, self.homedir) - return GPGSignature(**sig_dict) diff --git a/securesystemslib/signer/__init__.py b/securesystemslib/signer/__init__.py new file mode 100644 index 00000000..7924eae3 --- /dev/null +++ b/securesystemslib/signer/__init__.py @@ -0,0 +1,15 @@ +""" +The Signer API + +This module provides extensible interfaces for public keys and signers: +Some implementations are provided by default but more can be added by users. +""" +from securesystemslib.signer._key import KEY_FOR_TYPE_AND_SCHEME, Key, SSlibKey +from securesystemslib.signer._signature import GPGSignature, Signature +from securesystemslib.signer._signer import ( + SIGNER_FOR_URI_SCHEME, + GPGSigner, + SecretsHandler, + Signer, + SSlibSigner, +) diff --git a/securesystemslib/signer/_key.py b/securesystemslib/signer/_key.py new file mode 100644 index 00000000..a5b33b6e --- /dev/null +++ b/securesystemslib/signer/_key.py @@ -0,0 +1,206 @@ +"""Key interface and the default implementations""" +import logging +from abc import ABCMeta, abstractmethod +from typing import Any, Dict, Optional, Tuple, Type + +import securesystemslib.keys as sslib_keys +from securesystemslib import exceptions +from securesystemslib.signer._signature import Signature + +logger = logging.getLogger(__name__) + +# NOTE dict for Key dispatch defined here, but filled at end of file when +# subclass definitions are available. Users can add Key implementations. + +KEY_FOR_TYPE_AND_SCHEME: Dict[Tuple[str, str], Type] = {} + + +class Key(metaclass=ABCMeta): + """Abstract class representing the public portion of a key. + + *All parameters named below are not just constructor arguments but also + instance attributes.* + + Args: + keyid: Key identifier that is unique within the metadata it is used in. + Keyid is not verified to be the hash of a specific representation + of the key. + keytype: Key type, e.g. "rsa", "ed25519" or "ecdsa-sha2-nistp256". + scheme: Signature scheme. For example: + "rsassa-pss-sha256", "ed25519", and "ecdsa-sha2-nistp256". + keyval: Opaque key content + unrecognized_fields: Dictionary of all attributes that are not managed + by Securesystemslib + + Raises: + TypeError: Invalid type for an argument. + """ + + def __init__( + self, + keyid: str, + keytype: str, + scheme: str, + keyval: Dict[str, Any], + unrecognized_fields: Optional[Dict[str, Any]] = None, + ): + if not all( + isinstance(at, str) for at in [keyid, keytype, scheme] + ) or not isinstance(keyval, dict): + raise TypeError("Unexpected Key attributes types!") + self.keyid = keyid + self.keytype = keytype + self.scheme = scheme + self.keyval = keyval + + if unrecognized_fields is None: + unrecognized_fields = {} + + self.unrecognized_fields = unrecognized_fields + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Key): + return False + + return ( + self.keyid == other.keyid + and self.keytype == other.keytype + and self.scheme == other.scheme + and self.keyval == other.keyval + and self.unrecognized_fields == other.unrecognized_fields + ) + + @classmethod + @abstractmethod + def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": + """Creates ``Key`` object from a serialization dict + + Key implementations must override this factory constructor that is used + as a deserialization helper. + + Users should call Key.from_dict(): it dispatches to the actual subclass + implementation based on supported keys in KEY_FOR_TYPE_AND_SCHEME. + + Raises: + KeyError, TypeError: Invalid arguments. + """ + keytype = key_dict.get("keytype") + scheme = key_dict.get("scheme") + if (keytype, scheme) not in KEY_FOR_TYPE_AND_SCHEME: + raise ValueError(f"Unsupported public key {keytype}/{scheme}") + + # NOTE: Explicitly not checking the keytype and scheme types to allow + # intoto to use (None,None) to lookup GPGKey, see issue #450 + key_impl = KEY_FOR_TYPE_AND_SCHEME[(keytype, scheme)] # type: ignore + return key_impl.from_dict(keyid, key_dict) + + @abstractmethod + def to_dict(self) -> Dict[str, Any]: + """Returns a serialization dict. + + Key implementations must override this serialization helper. + """ + raise NotImplementedError + + @abstractmethod + def verify_signature(self, signature: Signature, data: bytes) -> None: + """Raises if verification of signature over data fails. + + Args: + signature: Signature object. + data: Payload bytes. + + Raises: + UnverifiedSignatureError: Failed to verify signature. + VerificationError: Signature verification process error. If you + are only interested in the verify result, just handle + UnverifiedSignatureError: it contains VerificationError as well + """ + raise NotImplementedError + + +class SSlibKey(Key): + """Key implementation for RSA, Ed25519, ECDSA and Sphincs keys""" + + def to_securesystemslib_key(self) -> Dict[str, Any]: + """Internal helper, returns a classic securesystemslib keydict""" + return { + "keyid": self.keyid, + "keytype": self.keytype, + "scheme": self.scheme, + "keyval": self.keyval, + } + + @classmethod + def from_securesystemslib_key(cls, key_dict: Dict[str, Any]) -> "SSlibKey": + """Constructor from classic securesystemslib keydict""" + # ensure possible private keys are not included in keyval + return SSlibKey( + key_dict["keyid"], + key_dict["keytype"], + key_dict["scheme"], + {"public": key_dict["keyval"]["public"]}, + ) + + @classmethod + def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "SSlibKey": + keytype = key_dict.pop("keytype") + scheme = key_dict.pop("scheme") + keyval = key_dict.pop("keyval") + + if "public" not in keyval or not isinstance(keyval["public"], str): + raise ValueError(f"public key string required for scheme {scheme}") + + # All fields left in the key_dict are unrecognized. + return cls(keyid, keytype, scheme, keyval, key_dict) + + def to_dict(self) -> Dict[str, Any]: + return { + "keytype": self.keytype, + "scheme": self.scheme, + "keyval": self.keyval, + **self.unrecognized_fields, + } + + def verify_signature(self, signature: Signature, data: bytes) -> None: + try: + if not sslib_keys.verify_signature( + self.to_securesystemslib_key(), + signature.to_dict(), + data, + ): + raise exceptions.UnverifiedSignatureError( + f"Failed to verify signature by {self.keyid}" + ) + except ( + exceptions.CryptoError, + exceptions.FormatError, + exceptions.UnsupportedAlgorithmError, + ) as e: + logger.info("Key %s failed to verify sig: %s", self.keyid, str(e)) + raise exceptions.VerificationError( + f"Unknown failure to verify signature by {self.keyid}" + ) from e + + +# Supported key types and schemes, and the Keys implementing them +KEY_FOR_TYPE_AND_SCHEME = { + ("ecdsa", "ecdsa-sha2-nistp256"): SSlibKey, + ("ecdsa", "ecdsa-sha2-nistp384"): SSlibKey, + ("ecdsa-sha2-nistp256", "ecdsa-sha2-nistp256"): SSlibKey, + ("ecdsa-sha2-nistp384", "ecdsa-sha2-nistp384"): SSlibKey, + ("ed25519", "ed25519"): SSlibKey, + ("rsa", "rsassa-pss-md5"): SSlibKey, + ("rsa", "rsassa-pss-sha1"): SSlibKey, + ("rsa", "rsassa-pss-sha224"): SSlibKey, + ("rsa", "rsassa-pss-sha256"): SSlibKey, + ("rsa", "rsassa-pss-sha384"): SSlibKey, + ("rsa", "rsassa-pss-sha512"): SSlibKey, + ("rsa", "rsa-pkcs1v15-md5"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha1"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha224"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha256"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha384"): SSlibKey, + ("rsa", "rsa-pkcs1v15-sha512"): SSlibKey, + ("sphincs", "sphincs-shake-128s"): SSlibKey, +} diff --git a/securesystemslib/signer/_signature.py b/securesystemslib/signer/_signature.py new file mode 100644 index 00000000..190afc3c --- /dev/null +++ b/securesystemslib/signer/_signature.py @@ -0,0 +1,135 @@ +"""Signature container class""" + +import logging +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + + +class Signature: + """A container class containing information about a signature. + + Contains a signature and the keyid uniquely identifying the key used + to generate the signature. + + Provides utility methods to easily create an object from a dictionary + and return the dictionary representation of the object. + + Attributes: + keyid: HEX string used as a unique identifier of the key. + signature: HEX string representing the signature. + unrecognized_fields: Dictionary of all attributes that are not managed + by securesystemslib. + + """ + + def __init__( + self, + keyid: str, + sig: str, + unrecognized_fields: Optional[Dict[str, Any]] = None, + ): + self.keyid = keyid + self.signature = sig + + if unrecognized_fields is None: + unrecognized_fields = {} + + self.unrecognized_fields = unrecognized_fields + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Signature): + return False + + return ( + self.keyid == other.keyid + and self.signature == other.signature + and self.unrecognized_fields == other.unrecognized_fields + ) + + @classmethod + def from_dict(cls, signature_dict: Dict) -> "Signature": + """Creates a Signature object from its JSON/dict representation. + + Arguments: + signature_dict: + A dict containing a valid keyid and a signature. + Note that the fields in it should be named "keyid" and "sig" + respectively. + + Raises: + KeyError: If any of the "keyid" and "sig" fields are missing from + the signature_dict. + + Side Effect: + Destroys the metadata dict passed by reference. + + Returns: + A "Signature" instance. + """ + + keyid = signature_dict.pop("keyid") + sig = signature_dict.pop("sig") + # All fields left in the signature_dict are unrecognized. + return cls(keyid, sig, signature_dict) + + def to_dict(self) -> Dict: + """Returns the JSON-serializable dictionary representation of self.""" + + return { + "keyid": self.keyid, + "sig": self.signature, + **self.unrecognized_fields, + } + + +class GPGSignature(Signature): + """A container class containing information about a gpg signature. + + Besides the signature, it also contains other meta information + needed to uniquely identify the key used to generate the signature. + + Attributes: + keyid: HEX string used as a unique identifier of the key. + signature: HEX string representing the signature. + other_headers: HEX representation of additional GPG headers. + """ + + def __init__( + self, + keyid: str, + signature: str, + other_headers: str, + ): + super().__init__(keyid, signature) + self.other_headers = other_headers + + @classmethod + def from_dict(cls, signature_dict: Dict) -> "GPGSignature": + """Creates a GPGSignature object from its JSON/dict representation. + + Args: + signature_dict: Dict containing valid "keyid", "signature" and + "other_fields" fields. + + Raises: + KeyError: If any of the "keyid", "sig" or "other_headers" fields + are missing from the signature_dict. + + Returns: + GPGSignature instance. + """ + + return cls( + signature_dict["keyid"], + signature_dict["signature"], + signature_dict["other_headers"], + ) + + def to_dict(self) -> Dict: + """Returns the JSON-serializable dictionary representation of self.""" + return { + "keyid": self.keyid, + "signature": self.signature, + "other_headers": self.other_headers, + } diff --git a/securesystemslib/signer/_signer.py b/securesystemslib/signer/_signer.py new file mode 100644 index 00000000..d85f3127 --- /dev/null +++ b/securesystemslib/signer/_signer.py @@ -0,0 +1,282 @@ +"""Signer interface and the default implementations""" + +import abc +import logging +import os +from typing import Callable, Dict, Optional, Type +from urllib import parse + +import securesystemslib.gpg.functions as gpg +import securesystemslib.keys as sslib_keys +from securesystemslib.signer._key import Key, SSlibKey +from securesystemslib.signer._signature import GPGSignature, Signature + +logger = logging.getLogger(__name__) + +# NOTE dict for Signer dispatch defined here, but filled at end of file when +# subclass definitions are available. Users can add Signer implementations. +SIGNER_FOR_URI_SCHEME: Dict[str, Type] = {} + + +# SecretsHandler is a function the calling code can provide to Signer: +# SecretsHandler will be called if Signer needs additional secrets. +# The argument is the name of the secret ("PIN", "passphrase", etc). +# Return value is the secret string. +SecretsHandler = Callable[[str], str] + + +class Signer: + """Signer interface that supports multiple signing implementations. + + Usage example: + + signer = Signer.from_priv_key_uri("envvar:MYPRIVKEY", pub_key) + sig = signer.sign(b"data") + + Note that signer implementations may raise errors (during both + Signer.from_priv_key_uri() and Signer.sign()) that are not documented here: + examples could include network errors or file read errors. Applications + should use generic try-except here if unexpected raises are not an option. + + See SIGNER_FOR_URI_SCHEME for supported private key URI schemes. The + currently supported default schemes are: + * envvar: see SSlibSigner for details + * file: see SSlibSigner for details + + Interactive applications may also define a secrets handler that allows + asking for user secrets if they are needed: + + from getpass import getpass + + def sec_handler(secret_name:str) -> str: + return getpass(f"Enter {secret_name}: ") + + # user will not be asked for a passphrase for unencrypted key + uri = "file:keys/mykey?encrypted=false" + signer = Signer.from_priv_key_uri(uri, pub_key, sec_handler) + + # user will be asked for a passphrase for encrypted key + uri2 = "file:keys/myenckey?encrypted=true" + signer2 = Signer.from_priv_key_uri(uri2, pub_key2, sec_handler) + + Applications can provide their own Signer and Key implementations: + + from securesystemslib.signer import Signer, SIGNER_FOR_URI_SCHEME + from mylib import MySigner + + SIGNER_FOR_URI_SCHEME[MySigner.MY_SCHEME] = MySigner + + This way the application code using signer API continues to work with + default signers but now also uses the custom signer when the proper URI is + used. + """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def sign(self, payload: bytes) -> Signature: + """Signs a given payload by the key assigned to the Signer instance. + + Arguments: + payload: The bytes to be signed. + + Returns: + Returns a "Signature" class instance. + """ + raise NotImplementedError # pragma: no cover + + @classmethod + @abc.abstractmethod + def from_priv_key_uri( + cls, + priv_key_uri: str, + public_key: Key, + secrets_handler: Optional[SecretsHandler] = None, + ) -> "Signer": + """Factory constructor for a given private key URI + + Returns a specific Signer instance based on the private key URI and the + supported uri schemes listed in SIGNER_FOR_URI_SCHEME. + + Args: + priv_key_uri: URI that identifies the private key + public_key: Key that is the public portion of this private key + secrets_handler: Optional function that may be called if the + signer needs additional secrets (like a PIN or passphrase). + secrets_handler should return the requested secret string. + + Raises: + ValueError: Incorrect arguments + Other Signer-specific errors: These could include OSErrors for + reading files or network errors for connecting to a KMS. + """ + + scheme, _, _ = priv_key_uri.partition(":") + if scheme not in SIGNER_FOR_URI_SCHEME: + raise ValueError(f"Unsupported private key scheme {scheme}") + + signer = SIGNER_FOR_URI_SCHEME[scheme] + return signer.from_priv_key_uri( + priv_key_uri, public_key, secrets_handler + ) + + +class SSlibSigner(Signer): + """A securesystemslib signer implementation. + + Provides a sign method to generate a cryptographic signature with a + securesystemslib-style rsa, ed25519 or ecdsa key. See keys module + for the supported types, schemes and hash algorithms. + + SSlibSigners should be instantiated with Signer.from_priv_key_uri(). + These private key URI schemes are supported: + * "envvar:": + VAR is an environment variable with unencrypted private key content. + envvar:MYPRIVKEY + * "file:?encrypted=[true|false]": + PATH is a file path to a file with private key content. If + encrypted=true, the file is expected to have been created with + securesystemslib.keys.encrypt_key(). + file:path/to/file?encrypted=true + file:/abs/path/to/file?encrypted=false + + Attributes: + key_dict: + A securesystemslib-style key dictionary. This is an implementation + detail, not part of public API + """ + + ENVVAR_URI_SCHEME = "envvar" + FILE_URI_SCHEME = "file" + + def __init__(self, key_dict: Dict): + self.key_dict = key_dict + + @classmethod + def from_priv_key_uri( + cls, + priv_key_uri: str, + public_key: Key, + secrets_handler: Optional[SecretsHandler] = None, + ) -> "SSlibSigner": + """Constructor for Signer to call + + Please refer to Signer.from_priv_key_uri() documentation. + + Additionally raises: + OSError: Reading the file failed with "file:" URI + """ + if not isinstance(public_key, SSlibKey): + raise ValueError(f"Expected SSlibKey for {priv_key_uri}") + + uri = parse.urlparse(priv_key_uri) + + if uri.scheme == cls.ENVVAR_URI_SCHEME: + # read private key from environment variable + private = os.getenv(uri.path) + if private is None: + raise ValueError(f"Unset env var for {priv_key_uri}") + + elif uri.scheme == cls.FILE_URI_SCHEME: + params = dict(parse.parse_qsl(uri.query)) + if "encrypted" not in params: + raise ValueError(f"{uri.scheme} requires 'encrypted' parameter") + + # read private key (may be encrypted or not) from file + with open(uri.path, "rb") as f: + private = f.read().decode() + + if params["encrypted"] != "false": + if not secrets_handler: + raise ValueError("encrypted key requires a secrets handler") + + secret = secrets_handler("passphrase") + decrypted = sslib_keys.decrypt_key(private, secret) + private = decrypted["keyval"]["private"] + + else: + raise ValueError(f"SSlibSigner does not support {priv_key_uri}") + + keydict = public_key.to_securesystemslib_key() + keydict["keyval"]["private"] = private + return cls(keydict) + + def sign(self, payload: bytes) -> Signature: + """Signs a given payload by the key assigned to the SSlibSigner instance. + + Please see Signer.sign() documentation. + + Additionally raises: + securesystemslib.exceptions.FormatError: Key argument is malformed. + securesystemslib.exceptions.CryptoError, \ + securesystemslib.exceptions.UnsupportedAlgorithmError: + Signing errors. + """ + sig_dict = sslib_keys.create_signature(self.key_dict, payload) + return Signature(**sig_dict) + + +class GPGSigner(Signer): + """A securesystemslib gpg implementation of the "Signer" interface. + + Provides a sign method to generate a cryptographic signature with gpg, using + an RSA, DSA or EdDSA private key identified by the keyid on the instance. + """ + + def __init__( + self, keyid: Optional[str] = None, homedir: Optional[str] = None + ): + self.keyid = keyid + self.homedir = homedir + + @classmethod + def from_priv_key_uri( + cls, + priv_key_uri: str, + public_key: Key, + secrets_handler: Optional[SecretsHandler] = None, + ) -> "GPGSigner": + raise NotImplementedError("Incompatible with private key URIs") + + def sign(self, payload: bytes) -> GPGSignature: + """Signs a given payload by the key assigned to the GPGSigner instance. + + Calls the gpg command line utility to sign the passed content with the + key identified by the passed keyid from the gpg keyring at the passed + homedir. + + The executed base command is defined in + securesystemslib.gpg.constants.gpg_sign_command. + + Arguments: + payload: The bytes to be signed. + + Raises: + securesystemslib.exceptions.FormatError: + If the keyid was passed and does not match + securesystemslib.formats.KEYID_SCHEMA. + + ValueError: the gpg command failed to create a valid signature. + OSError: the gpg command is not present or non-executable. + securesystemslib.exceptions.UnsupportedLibraryError: the gpg + command is not available, or the cryptography library is + not installed. + securesystemslib.gpg.exceptions.CommandError: the gpg command + returned a non-zero exit code. + securesystemslib.gpg.exceptions.KeyNotFoundError: the used gpg + version is not fully supported and no public key can be found + for short keyid. + + Returns: + Returns a "GPGSignature" class instance. + """ + + sig_dict = gpg.create_signature(payload, self.keyid, self.homedir) + return GPGSignature(**sig_dict) + + +# Supported private key uri schemes and the Signers implementing them +SIGNER_FOR_URI_SCHEME = { + SSlibSigner.ENVVAR_URI_SCHEME: SSlibSigner, + SSlibSigner.FILE_URI_SCHEME: SSlibSigner, +} diff --git a/tests/test_signer.py b/tests/test_signer.py index ecdcccd0..5224858b 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -7,43 +7,266 @@ import shutil import tempfile import unittest +from typing import Any, Dict, Optional -import securesystemslib.formats import securesystemslib.keys as KEYS -from securesystemslib.exceptions import FormatError, UnsupportedAlgorithmError +from securesystemslib.exceptions import ( + CryptoError, + FormatError, + UnsupportedAlgorithmError, + UnverifiedSignatureError, +) from securesystemslib.gpg.constants import have_gpg from securesystemslib.gpg.functions import export_pubkey from securesystemslib.gpg.functions import verify_signature as verify_sig from securesystemslib.signer import ( + KEY_FOR_TYPE_AND_SCHEME, + SIGNER_FOR_URI_SCHEME, GPGSignature, GPGSigner, + Key, + SecretsHandler, Signature, + Signer, + SSlibKey, SSlibSigner, ) -class TestSSlibSigner( - unittest.TestCase -): # pylint: disable=missing-class-docstring +class TestKey(unittest.TestCase): + """Key tests. See many more tests in python-tuf test suite""" + + def test_key_from_to_dict(self): + """Test to/from_dict for known keytype/scheme combos""" + for (keytype, scheme), key_impl in KEY_FOR_TYPE_AND_SCHEME.items(): + keydict = { + "keytype": keytype, + "scheme": scheme, + "extra": "somedata", + "keyval": { + "public": "pubkeyval", + "foo": "bar", + }, + } + + key = Key.from_dict("aa", copy.deepcopy(keydict)) + self.assertIsInstance(key, key_impl) + self.assertDictEqual(keydict, key.to_dict()) + + def test_sslib_key_from_dict_invalid(self): + """Test from_dict for invalid data""" + invalid_dicts = [ + {"scheme": "ed25519", "keyval": {"public": "abc"}}, + {"keytype": "ed25519", "keyval": {"public": "abc"}}, + {"keytype": "ed25519", "scheme": "ed25519"}, + {"keytype": "ed25519", "scheme": "ed25519", "keyval": {"x": "y"}}, + { + "keytype": "ed25519", + "scheme": "ed25519", + "keyval": {"public": b"abc"}, + }, + ] + for keydict in invalid_dicts: + with self.assertRaises((KeyError, ValueError)): + Key.from_dict("aa", keydict) + + def test_key_verify_signature(self): + sigdict = { + "keyid": "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", + "sig": "3fc91f5411a567d6a7f28b7fbb9ba6d60b1e2a1b64d8af0b119650015d86bb5a55e57c0e2c995a9b4a332b8f435703e934c0e6ce69fe6674a8ce68719394a40b", + } + keydict = { + "keytype": "ed25519", + "scheme": "ed25519", + "keyval": { + "public": "8ae43d22b8e0fbf4a48fa3490d31b4d389114f5dc1039c918f075427f4100759", + }, + } + key = Key.from_dict( + "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", + keydict, + ) + sig = Signature.from_dict(sigdict) + + key.verify_signature(sig, b"DATA") + with self.assertRaises(UnverifiedSignatureError): + key.verify_signature(sig, b"NOT DATA") + + def test_unsupported_key(self): + keydict = { + "keytype": "custom", + "scheme": "ed25519", + "keyval": { + "public": "8ae43d22b8e0fbf4a48fa3490d31b4d389114f5dc1039c918f075427f4100759", + }, + } + with self.assertRaises(ValueError): + Key.from_dict( + "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", + keydict, + ) + + def test_custom_key(self): + class CustomKey(SSlibKey): + """Fake keytype that actually uses ed25519 under the hood""" + + @classmethod + def from_dict( + cls, keyid: str, key_dict: Dict[str, Any] + ) -> "CustomKey": + assert key_dict.pop("keytype") == "custom" + keytype = "ed25519" + scheme = key_dict.pop("scheme") + keyval = key_dict.pop("keyval") + return cls(keyid, keytype, scheme, keyval, key_dict) + + def to_dict(self) -> Dict[str, Any]: + return { + "keytype": "custom", + "scheme": self.scheme, + "keyval": self.keyval, + **self.unrecognized_fields, + } + + # register custom key type + KEY_FOR_TYPE_AND_SCHEME[("custom", "ed25519")] = CustomKey + + # setup + sig = Signature.from_dict( + { + "keyid": "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", + "sig": "3fc91f5411a567d6a7f28b7fbb9ba6d60b1e2a1b64d8af0b119650015d86bb5a55e57c0e2c995a9b4a332b8f435703e934c0e6ce69fe6674a8ce68719394a40b", + } + ) + + keydict = { + "keytype": "custom", + "scheme": "ed25519", + "keyval": { + "public": "8ae43d22b8e0fbf4a48fa3490d31b4d389114f5dc1039c918f075427f4100759", + }, + } + key = Key.from_dict( + "e33221e745d40465d1efc0215d6db83e5fdb83ea16e1fb894d09d6d96c456f3b", + keydict, + ) + + # test that CustomKey is used and that it works + self.assertIsInstance(key, CustomKey) + key.verify_signature(sig, b"DATA") + with self.assertRaises(UnverifiedSignatureError): + key.verify_signature(sig, b"NOT DATA") + + del KEY_FOR_TYPE_AND_SCHEME[("custom", "ed25519")] + + +class TestSigner(unittest.TestCase): + """Test Signer and SSlibSigner functionality""" + @classmethod def setUpClass(cls): - cls.rsakey_dict = KEYS.generate_rsa_key() - cls.ed25519key_dict = KEYS.generate_ed25519_key() - cls.ecdsakey_dict = KEYS.generate_ecdsa_key() - cls.sphincskey_dict = KEYS.generate_sphincs_key() - cls.DATA_STR = "SOME DATA REQUIRING AUTHENTICITY." - cls.DATA = securesystemslib.formats.encode_canonical( - cls.DATA_STR - ).encode("utf-8") - - def test_sslib_sign(self): - dicts = [ - self.rsakey_dict, - self.ecdsakey_dict, - self.ed25519key_dict, - self.sphincskey_dict, + cls.keys = [ + KEYS.generate_rsa_key(), + KEYS.generate_ed25519_key(), + KEYS.generate_ecdsa_key(), + KEYS.generate_sphincs_key(), ] - for scheme_dict in dicts: + cls.DATA = b"DATA" + + # pylint: disable=consider-using-with + cls.testdir = tempfile.TemporaryDirectory() + + @classmethod + def tearDownClass(cls): + cls.testdir.cleanup() + + def test_signer_sign_with_incorrect_uri(self): + pubkey = SSlibKey.from_securesystemslib_key(self.keys[0]) + with self.assertRaises(ValueError): + # unknown uri + Signer.from_priv_key_uri("unknownscheme:x", pubkey) + + with self.assertRaises(ValueError): + # env variable not defined + Signer.from_priv_key_uri("envvar:NONEXISTENTVAR", pubkey) + + with self.assertRaises(ValueError): + # no "encrypted" param + Signer.from_priv_key_uri("file:path/to/privkey", pubkey) + + with self.assertRaises(OSError): + # file not found + uri = "file:nonexistentfile?encrypted=false" + Signer.from_priv_key_uri(uri, pubkey) + + def test_signer_sign_with_envvar_uri(self): + for key in self.keys: + # setup + pubkey = SSlibKey.from_securesystemslib_key(key) + os.environ["PRIVKEY"] = key["keyval"]["private"] + + # test signing + signer = Signer.from_priv_key_uri("envvar:PRIVKEY", pubkey) + sig = signer.sign(self.DATA) + + pubkey.verify_signature(sig, self.DATA) + with self.assertRaises(UnverifiedSignatureError): + pubkey.verify_signature(sig, b"NOT DATA") + + def test_signer_sign_with_file_uri(self): + for key in self.keys: + # setup + pubkey = SSlibKey.from_securesystemslib_key(key) + # let teardownclass handle the file removal + with tempfile.NamedTemporaryFile( + dir=self.testdir.name, delete=False + ) as f: + f.write(key["keyval"]["private"].encode()) + + # test signing with unencrypted key + uri = f"file:{f.name}?encrypted=false" + signer = Signer.from_priv_key_uri(uri, pubkey) + sig = signer.sign(self.DATA) + + pubkey.verify_signature(sig, self.DATA) + with self.assertRaises(UnverifiedSignatureError): + pubkey.verify_signature(sig, b"NOT DATA") + + def test_signer_sign_with_enc_file_uri(self): + for key in self.keys: + # setup + pubkey = SSlibKey.from_securesystemslib_key(key) + privkey = KEYS.encrypt_key(key, "hunter2") + # let teardownclass handle the file removal + with tempfile.NamedTemporaryFile( + dir=self.testdir.name, delete=False + ) as f: + f.write(privkey.encode()) + + # test signing with encrypted key + def secrets_handler(secret: str) -> str: + if secret != "passphrase": + raise ValueError("Only prepared to return a passphrase") + return "hunter2" + + uri = f"file:{f.name}?encrypted=true" + + signer = Signer.from_priv_key_uri(uri, pubkey, secrets_handler) + sig = signer.sign(self.DATA) + + pubkey.verify_signature(sig, self.DATA) + with self.assertRaises(UnverifiedSignatureError): + pubkey.verify_signature(sig, b"NOT DATA") + + # test wrong passphrase + def fake_handler(_) -> str: + return "12345" + + with self.assertRaises(CryptoError): + signer = Signer.from_priv_key_uri(uri, pubkey, fake_handler) + + def test_sslib_signer_sign(self): + for scheme_dict in self.keys: # Test generation of signatures. sslib_signer = SSlibSigner(scheme_dict) sig_obj = sslib_signer.sign(self.DATA) @@ -74,6 +297,37 @@ def test_sslib_sign(self): scheme_dict["scheme"] = valid_scheme + def test_custom_signer(self): + # setup + key = self.keys[0] + pubkey = SSlibKey.from_securesystemslib_key(key) + + class CustomSigner(SSlibSigner): + """Custom signer with a hard coded key""" + + CUSTOM_SCHEME = "custom" + + @classmethod + def from_priv_key_uri( + cls, + priv_key_uri: str, + public_key: Key, + secrets_handler: Optional[SecretsHandler] = None, + ) -> "CustomSigner": + return cls(key) + + # register custom signer + SIGNER_FOR_URI_SCHEME[CustomSigner.CUSTOM_SCHEME] = CustomSigner + + # test signing + signer = Signer.from_priv_key_uri("custom:foo", pubkey) + self.assertIsInstance(signer, CustomSigner) + sig = signer.sign(self.DATA) + + pubkey.verify_signature(sig, self.DATA) + with self.assertRaises(UnverifiedSignatureError): + pubkey.verify_signature(sig, b"NOT DATA") + def test_signature_from_to_dict(self): signature_dict = { "sig": "30460221009342e4566528fcecf6a7a5d53ebacdb1df151e242f55f8775883469cb01dbc6602210086b426cc826709acfa2c3f9214610cb0a832db94bbd266fd7c5939a48064a851",