Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

signer: Add abstract Key class, implement private key uri scheme for Signer #456

Merged
merged 21 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 295 additions & 26 deletions securesystemslib/signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,21 @@
"""

import abc
from typing import Any, Dict, Mapping, Optional
import logging
import os
from typing import Any, Callable, Dict, Mapping, Optional
from urllib import parse

import securesystemslib.gpg.functions as gpg
import securesystemslib.keys as sslib_keys
from securesystemslib.exceptions import FormatError

logger = logging.getLogger(__name__)

# NOTE This dictionary is initialized here so it's available to Signer, but
# filled at end of file when Signer subclass definitions are available.
# Users can add their own Signer implementations into this dictionary
SIGNER_FOR_URI_SCHEME: Dict[str, "Signer"] = {}


class Signature:
Expand Down Expand Up @@ -137,8 +148,154 @@ def to_dict(self) -> Dict:
}


class Key:
jku marked this conversation as resolved.
Show resolved Hide resolved
"""A container class representing the public portion of a key.

*All parameters named below are not just constructor arguments but also
instance attributes.*

Args:
keyid: Key identifier that is unique within the metadata it is used in.
Keyid is not verified to be the hash of a specific representation
of the key.
keytype: Key type, e.g. "rsa", "ed25519" or "ecdsa-sha2-nistp256".
scheme: Signature scheme. For example:
"rsassa-pss-sha256", "ed25519", and "ecdsa-sha2-nistp256".
keyval: Opaque key content
unrecognized_fields: Dictionary of all attributes that are not managed
by Securesystemslib

Raises:
TypeError: Invalid type for an argument.
"""

def __init__(
self,
keyid: str,
keytype: str,
scheme: str,
keyval: Dict[str, Any],
jku marked this conversation as resolved.
Show resolved Hide resolved
unrecognized_fields: Optional[Dict[str, Any]] = None,
):
if not all(
isinstance(at, str) for at in [keyid, keytype, scheme]
) or not isinstance(keyval, dict):
raise TypeError("Unexpected Key attributes types!")
self.keyid = keyid
self.keytype = keytype
self.scheme = scheme
self.keyval = keyval
if unrecognized_fields is None:
unrecognized_fields = {}

self.unrecognized_fields = unrecognized_fields

def __eq__(self, other: Any) -> bool:
if not isinstance(other, Key):
return False

return (
self.keyid == other.keyid
and self.keytype == other.keytype
and self.scheme == other.scheme
and self.keyval == other.keyval
and self.unrecognized_fields == other.unrecognized_fields
)

@classmethod
def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key":
"""Creates ``Key`` object from TUF serialization dict.

Raises:
KeyError, TypeError: Invalid arguments.
"""
keytype = key_dict.pop("keytype")
scheme = key_dict.pop("scheme")
keyval = key_dict.pop("keyval")
# All fields left in the key_dict are unrecognized.
return cls(keyid, keytype, scheme, keyval, key_dict)

def to_dict(self) -> Dict[str, Any]:
"""Returns a dict for TUF serialization."""
return {
"keytype": self.keytype,
"scheme": self.scheme,
"keyval": self.keyval,
**self.unrecognized_fields,
}

def to_securesystemslib_key(self) -> Dict[str, Any]:
"""Returns a classic Securesystemslib keydict"""
return {
"keyid": self.keyid,
"keytype": self.keytype,
"scheme": self.scheme,
"keyval": self.keyval,
}

@classmethod
def from_securesystemslib_key(cls, key_dict: Dict[str, Any]) -> "Key":
"""Creates a ``Key`` object from a classic securesystemlib keydict.

Args:
key_dict: Key in securesystemlib dict representation.

Raises:
ValueError: ``key_dict`` value is not in securesystemslib format.
"""
try:
key_meta = sslib_keys.format_keyval_to_metadata(
key_dict["keytype"],
key_dict["scheme"],
key_dict["keyval"],
)
except FormatError as e:
raise ValueError("keydict not in securesystemslib format") from e

return cls(
key_dict["keyid"],
key_meta["keytype"],
key_meta["scheme"],
key_meta["keyval"],
)

def is_verified(self, signature: Signature, data: bytes) -> bool:
jku marked this conversation as resolved.
Show resolved Hide resolved
"""Verifies the signature over data.

Args:
signature: Signature object.
data: Payload bytes.

Raises:
CryptoError, FormatError, UnsupportedAlgorithmError.

Returns True if signature is valid for this key for given data.
"""
return sslib_keys.verify_signature(
self.to_securesystemslib_key(),
signature.to_dict(),
data,
)


# SecretsHandler is a function the calling code can provide to Signer:
# If Signer needs secrets from user, the function will be called
SecretsHandler = Callable[[str], str]


class Signer:
jku marked this conversation as resolved.
Show resolved Hide resolved
"""Signer interface created to support multiple signing implementations."""
"""Signer interface that supports multiple signing implementations.

Usage example:
signer = Signer.from_priv_key_uri("envvar:MYPRIVKEY", pub_key)
sig = signer.sign(b"data")

See SIGNER_FOR_URI_SCHEME for supported private key URI schemes. The
currently supported default schemes are:
* envvar: see SSlibSigner for details
* file: see SSlibSigner for details
* encfile: see SSlibSigner for details
"""

__metaclass__ = abc.ABCMeta

Expand All @@ -154,42 +311,136 @@ def sign(self, payload: bytes) -> Signature:
"""
raise NotImplementedError # pragma: no cover

@classmethod
@abc.abstractmethod
def new_from_uri(
cls,
priv_key_uri: str,
public_key: Key,
secrets_handler: SecretsHandler,
) -> "Signer":
"""Constructor for given private key URI

This is a semi-private method meant to be called by Signer only.
Implementation is required if the Signer subclass is in
SIGNER_FOR_URI_SCHEME.

Arguments:
priv_key_uri: URI that identifies the private key and signer
public_key: Key object
secrets_handler: Optional function that may be called if the
signer needs additional secrets (like a PIN or passphrase)
"""
raise NotImplementedError # pragma: no cover

@staticmethod
def from_priv_key_uri(
priv_key_uri: str,
public_key: Key,
secrets_handler: Optional[SecretsHandler] = None,
):
"""Returns a concrete Signer implementation based on private key URI

Args:
priv_key_uri: URI that identifies the private key location and signer
public_key: Key object
"""

scheme, _, _ = priv_key_uri.partition(":")
if scheme not in SIGNER_FOR_URI_SCHEME:
raise ValueError(f"Unsupported private key scheme {scheme}")

signer = SIGNER_FOR_URI_SCHEME[scheme]
return signer.new_from_uri(priv_key_uri, public_key, secrets_handler)


class SSlibSigner(Signer):
"""A securesystemslib signer implementation.

Provides a sign method to generate a cryptographic signature with a
securesystemslib-style rsa, ed25519 or ecdsa private key on the instance.
The signature scheme is determined by the key and must be one of:

- rsa(ssa-pss|pkcs1v15)-(md5|sha1|sha224|sha256|sha384|sha512) (12 schemes)
- ed25519
- ecdsa-sha2-nistp256

See "securesystemslib.interface" for functions to generate and load keys.
securesystemslib-style rsa, ed25519 or ecdsa key. See keys module
for the supported types, schemes and hash algorithms.

SSlibSigners should be instantiated with Signer.from_priv_key_uri().
Two private key URI schemes are supported:
jku marked this conversation as resolved.
Show resolved Hide resolved
* envvar:<VAR>:
VAR is an environment variable that contains the private key content.
envvar:MYPRIVKEY
* file:<PATH>:
PATH is a file path to a file that contains private key content.
file:path/to/file
* encfile:<PATH>:
The the private key content in PATH has been encrypted with
keys.encryot_key(). Application provided SecretsHandler will be
called to get the passphrase.
file:/path/to/encrypted/file
jku marked this conversation as resolved.
Show resolved Hide resolved

Attributes:
key_dict:
A securesystemslib-style key dictionary, which includes a keyid,
key type, signature scheme, and the public and private key values,
e.g.::

{
"keytype": "rsa",
"scheme": "rsassa-pss-sha256",
"keyid": "f30a0870d026980100c0573bd557394f8c1bbd6...",
"keyval": {
"public": "-----BEGIN RSA PUBLIC KEY----- ...",
"private": "-----BEGIN RSA PRIVATE KEY----- ..."
}
}

The public and private keys are strings in PEM format.
A securesystemslib-style key dictionary. This is an implementation
detail, not part of public API
"""

ENVVAR_URI_SCHEME = "envvar"
FILE_URI_SCHEME = "file"
ENC_FILE_URI_SCHEME = "encfile"

def __init__(self, key_dict: Dict):
self.key_dict = key_dict

@classmethod
def new_from_uri(
cls,
priv_key_uri: str,
public_key: Key,
secrets_handler: SecretsHandler,
) -> "SSlibSigner":
"""Semi-private Constructor for Signer to call

Arguments:
priv_key_uri: private key URI described in class doc
public_key: Key object.

Raises:
OSError: Reading the file failed with "file:" URI
ValueError: URI is unsupported or environment variable was not set
with "envvar:" URIs

Returns:
SSlibSigner for the given private key URI.
"""
uri = parse.urlparse(priv_key_uri)

if uri.scheme == cls.ENVVAR_URI_SCHEME:
# read private key from environment variable
private = os.getenv(uri.path)
if private is None:
raise ValueError(
f"Unset private key variable for {priv_key_uri}"
)

elif uri.scheme == cls.FILE_URI_SCHEME:
# read private key from file
with open(uri.path, "rb") as f:
private = f.read().decode()

elif uri.scheme == cls.ENC_FILE_URI_SCHEME:
# read key from file, ask for passphrase, decrypt
with open(uri.path, "rb") as f:
enc = f.read().decode()
secret = secrets_handler("passphrase")
decrypted = sslib_keys.decrypt_key(enc, secret)
private = decrypted["keyval"]["private"]

else:
raise ValueError(
f"SSlibSigner does not support priv key uri {priv_key_uri}"
)

keydict = public_key.to_securesystemslib_key()
keydict["keyval"]["private"] = private
return cls(keydict)

def sign(self, payload: bytes) -> Signature:
"""Signs a given payload by the key assigned to the SSlibSigner instance.

Expand All @@ -205,7 +456,6 @@ def sign(self, payload: bytes) -> Signature:
Returns:
Returns a "Signature" class instance.
"""

sig_dict = sslib_keys.create_signature(self.key_dict, payload)
return Signature(**sig_dict)

Expand All @@ -231,6 +481,17 @@ def __init__(
self.keyid = keyid
self.homedir = homedir

@classmethod
def new_from_uri(
cls,
priv_key_uri: str,
public_key: Key,
secrets_handler: SecretsHandler,
) -> Signer:
# GPGSigner uses keys and produces signature dicts that are not
# compliant with TUF or intoto specifications: not useful here
raise NotImplementedError()

def sign(self, payload: bytes) -> GPGSignature:
"""Signs a given payload by the key assigned to the GPGSigner instance.

Expand Down Expand Up @@ -266,3 +527,11 @@ def sign(self, payload: bytes) -> GPGSignature:

sig_dict = gpg.create_signature(payload, self.keyid, self.homedir)
return GPGSignature(**sig_dict)


# signer implementations are now defined: Add them to the lookup table
SIGNER_FOR_URI_SCHEME = {
SSlibSigner.ENVVAR_URI_SCHEME: SSlibSigner,
SSlibSigner.FILE_URI_SCHEME: SSlibSigner,
SSlibSigner.ENC_FILE_URI_SCHEME: SSlibSigner,
}
Loading