diff --git a/in_toto/models/_signer.py b/in_toto/models/_signer.py new file mode 100644 index 000000000..5f8b5800e --- /dev/null +++ b/in_toto/models/_signer.py @@ -0,0 +1,258 @@ +# Copyright New York University and the in-toto contributors +# SPDX-License-Identifier: Apache-2.0 + +""" + + _signer.py + + + Pradyumna Krishna + + + Jan 26, 2023 + + + See LICENSE for licensing information. + + + Provides in-toto flavored GPGSigner, GPGSignature and GPGKey. + +""" + +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +import securesystemslib.gpg.exceptions as gpg_exceptions +import securesystemslib.gpg.functions as gpg +from securesystemslib import exceptions +from securesystemslib.signer import Key, Signature, Signer, SecretsHandler + + +class GPGSignature(Signature): + """A container class containing information about a gpg signature. + Besides the signature, it also contains other meta information + needed to uniquely identify the key used to generate the signature. + + Attributes: + keyid: HEX string used as a unique identifier of the key. + signature: HEX string representing the signature. + other_headers: HEX representation of additional GPG headers. + """ + + def __init__( + self, + keyid: str, + signature: str, + other_headers: str, + ): + super().__init__(keyid, signature) + self.other_headers = other_headers + + @classmethod + def from_dict(cls, signature_dict: Dict) -> "GPGSignature": + """Creates a ``GPGSignature`` object from its JSON/dict + representation. + + Arguments: + signature_dict: Dict containing valid "keyid", "signature" and + "other_fields" fields. + Raises: + KeyError: If any of the "keyid", "sig" or "other_headers" fields + are missing from the signature_dict. + Returns: + ``GPGSignature`` instance. + """ + + return cls( + signature_dict["keyid"], + signature_dict["signature"], + signature_dict["other_headers"], + ) + + def to_dict(self) -> Dict: + """Returns the JSON-serializable dictionary representation of self.""" + return { + "keyid": self.keyid, + "signature": self.signature, + "other_headers": self.other_headers, + } + + +class GPGSigner(Signer): + """A in-toto gpg implementation of the ``Signer`` interface. + Provides a sign method to generate a cryptographic signature with gpg, using + an RSA, DSA or EdDSA private key identified by the keyid on the instance. + + Arguments: + keyid: The keyid of the gpg signing keyid. If not passed the default + key in the keyring is used. + homedir: Path to the gpg keyring. If not passed the default keyring + is used. + """ + + def __init__( + self, + keyid: Optional[str] = None, + homedir: Optional[str] = None + ): + self.keyid = keyid + self.homedir = homedir + + @classmethod + def from_priv_key_uri( + cls, + priv_key_uri: str, + public_key: Key, + secrets_handler: Optional[SecretsHandler] = None + ) -> "GPGSigner": + + raise NotImplementedError( + "Incompatible with private key URIs") # pragma: no cover + + + def sign(self, payload: bytes) -> GPGSignature: + """Signs a given payload by the key assigned to the ``GPGSigner`` + instance. Calls the gpg command line utility to sign the passed content + with the key identified by the passed keyid from the gpg keyring at the + passed homedir. + + Arguments: + payload: The bytes to be signed. + Raises: + securesystemslib.exceptions.FormatError: + If the keyid was passed and does not match + securesystemslib.formats.KEYID_SCHEMA. + ValueError: the gpg command failed to create a valid signature. + OSError: the gpg command is not present or non-executable. + securesystemslib.exceptions.UnsupportedLibraryError: the gpg command is + not available, or the cryptography library is not installed. + securesystemslib.gpg.exceptions.CommandError: the gpg command returned a + non-zero exit code. + securesystemslib.gpg.exceptions.KeyNotFoundError: the used gpg version is + not fully supported and no public key can be found for short keyid. + Returns: + Returns a ``GPGSignature`` class instance. + """ + + sig_dict = gpg.create_signature(payload, self.keyid, self.homedir) + return GPGSignature(**sig_dict) + + +@dataclass +class GPGKey(Key): + """A container class representing public key portion of a GPG key. + Provides a verify method to verify a cryptographic signature with a + gpg-style rsa, dsa or ecdsa public key on the instance. + + Attributes: + type: Key type, e.g. "rsa", "dsa" or "ecdsa". + method: GPG Key Scheme, For example: + "pgp+rsa-pkcsv1.5", "pgp+dsa-fips-180-2", and "pgp+eddsa-ed25519". + hashes: list of GPG Hash Algorithms, e.g. "pgp+SHA2". + keyval: Opaque key content. + keyid: Key identifier that is unique within the metadata it is used in. + Keyid is not verified to be the hash of a specific representation + of the key. + creation_time: Unix timestamp when GPG key was created. + validity_period: Validity of the GPG Keys in days. + subkeys: A dictionary containing keyid and GPG subkey. + """ + + type: str + method: str + hashes: List[str] + keyval: Dict[str, str] + keyid: str + creation_time: Optional[int] = None + validity_period: Optional[int] = None + subkeys: Optional[Dict[str, "GPGKey"]] = None + + @classmethod + def from_dict(cls, keyid: str, key_dict: Dict[str, Any]): + """Creates ``GPGKey`` object from its json/dict representation. + Raises: + KeyError, TypeError: Invalid arguments. + """ + subkeys_dict = key_dict.get("subkeys") + + gpg_subkeys = None + if subkeys_dict: + gpg_subkeys = { + _keyid: GPGKey.from_dict(_keyid, subkey_dict) + for (_keyid, subkey_dict) in subkeys_dict.items() + } + + return cls( + key_dict["type"], + key_dict["method"], + key_dict["hashes"], + key_dict["keyval"], + keyid, + key_dict.get("creation_time"), + key_dict.get("validity_period"), + gpg_subkeys, + ) + + @classmethod + def from_legacy_dict(cls, key_dict: Dict[str, Any]): + """Create GPGKey from legacy dictionary representation.""" + + keyid = key_dict["keyid"] + return cls.from_dict(keyid, key_dict) + + def to_dict(self): + """Returns the dictionary representation of self.""" + + key_dict = { + "method": self.method, + "type": self.type, + "hashes": self.hashes, + "keyid": self.keyid, + "keyval": self.keyval, + } + + if self.creation_time: + key_dict["creation_time"] = self.creation_time + if self.validity_period: + key_dict["validity_period"] = self.validity_period + if self.subkeys: + subkeys_dict = { + keyid: subkey.to_dict() + for (keyid, subkey) in self.subkeys.items() + } + key_dict["subkeys"] = subkeys_dict + + return key_dict + + @classmethod + def from_keyring(cls, keyid, homedir=None): + """Creates ``GPGKey`` object from GnuPG Keyring.""" + + pubkey_dict = gpg.export_pubkey(keyid, homedir) + return cls.from_dict(keyid, pubkey_dict) + + def verify_signature( + self, + signature: GPGSignature, + data: bytes + ) -> None: + """Verifies a given payload by the key assigned to the GPGKey + instance. + + Arguments: + signature: A ``GPGSignature`` class instance. + data: The bytes to be verified. + """ + + try: + if not gpg.verify_signature(signature.to_dict(), self.to_dict(), data): + raise exceptions.UnverifiedSignatureError( + f"Failed to verify signature by {self.keyid}") + except ( + exceptions.FormatError, + exceptions.UnsupportedLibraryError, + gpg_exceptions.KeyExpirationError, + ) as e: + raise exceptions.VerificationError( + f"Unknown failure to verify signature by {self.keyid}" + ) from e diff --git a/tests/models/test_signer.py b/tests/models/test_signer.py new file mode 100644 index 000000000..bdab7915e --- /dev/null +++ b/tests/models/test_signer.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python + +# Copyright New York University and the in-toto contributors +# SPDX-License-Identifier: Apache-2.0 + +""" + + test_signer.py + + + Pradyumna Krishna + + + Jan 28, 2023 + + + See LICENSE for licensing information. + + + Test GPGKey, GPGSigner and GPGSignature class methods. +""" + +import unittest + +from securesystemslib.gpg.functions import export_pubkey +from securesystemslib.gpg.constants import have_gpg +from securesystemslib.exceptions import ( + UnverifiedSignatureError, VerificationError) + +from in_toto.models._signer import (GPGKey, GPGSignature, + GPGSigner) + +from tests.common import GPGKeysMixin, TmpDirMixin + + +@unittest.skipIf(not have_gpg(), "gpg not found") +class TestLegacyGPGKeyAndSigner(unittest.TestCase, TmpDirMixin, GPGKeysMixin): + """Test RSA gpg signature creation and verification.""" + + @classmethod + def setUpClass(cls): + cls.set_up_test_dir() + cls.set_up_gpg_keys() + + cls.test_data = b"test_data" + cls.wrong_data = b"something malicious" + + cls.default_keyid = cls.gpg_key_0C8A17 + cls.signing_subkey_keyid = cls.gpg_key_D924E9 + cls.expired_keyid = "e8ac80c924116dabb51d4b987cb07d6d2c199c7c" + + cls.default_key_dict = export_pubkey(cls.default_keyid, cls.gnupg_home) + + @classmethod + def tearDownClass(cls): + cls.tear_down_test_dir() + + def test_gpg_sign_and_verify_object_with_default_key(self): + """Create and verify a signature using the default key on the keyring.""" + + # Create a signature. + signer = GPGSigner(homedir=self.gnupg_home) + signature = signer.sign(self.test_data) + + # Generate Key from gnupg keyring. + key = GPGKey.from_keyring(self.default_keyid, self.gnupg_home) + + key.verify_signature(signature, self.test_data) + with self.assertRaises(UnverifiedSignatureError): + key.verify_signature(signature, self.wrong_data) + + # Generate Key from dict. + key = GPGKey.from_legacy_dict(self.default_key_dict) + + key.verify_signature(signature, self.test_data) + with self.assertRaises(UnverifiedSignatureError): + key.verify_signature(signature, self.wrong_data) + + def test_gpg_sign_and_verify_object(self): + """Create and verify a signature using the specific key on the keyring.""" + + # Create a signature. + signer = GPGSigner(self.signing_subkey_keyid, self.gnupg_home) + signature = signer.sign(self.test_data) + + # Generate Key from gnupg keyring. + key = GPGKey.from_keyring(self.signing_subkey_keyid, self.gnupg_home) + + key.verify_signature(signature, self.test_data) + with self.assertRaises(UnverifiedSignatureError): + key.verify_signature(signature, self.wrong_data) + + # Generate Key from dict. + key_dict = export_pubkey(self.signing_subkey_keyid, self.gnupg_home) + key = GPGKey.from_dict(key_dict["keyid"], key_dict) + + key.verify_signature(signature, self.test_data) + with self.assertRaises(UnverifiedSignatureError): + key.verify_signature(signature, self.wrong_data) + + def test_verify_using_expired_keyid(self): + """Creates and verifies a signature using expired key on the keyring.""" + + # Create a signature. + signer = GPGSigner(self.signing_subkey_keyid, self.gnupg_home) + signature = signer.sign(self.test_data) + + # Verify signature using expired key. + key = GPGKey.from_keyring(self.expired_keyid, self.gnupg_home) + with self.assertRaises(VerificationError): + key.verify_signature(signature, self.test_data) + + def test_gpg_signature_serialization(self): + """Tests from_dict and to_dict methods of GPGSignature.""" + + sig_dict = { + "keyid": "f4f90403af58eef6", + "signature": "c39f86e70e12e70e11d87eb7e3ab7d3b", + "other_headers": "d8f8a89b5d71f07b842a", + } + + signature = GPGSignature.from_dict(sig_dict) + self.assertEqual(sig_dict, signature.to_dict()) + + def test_gpg_key_serialization(self): + """Test to check serialization methods of GPGKey.""" + + # Test loading and dumping of GPGKey. + key = GPGKey.from_legacy_dict(self.default_key_dict) + self.assertEqual(key.to_dict(), self.default_key_dict) + + # Test loading and dumping of GPGKey from keyring. + key = GPGKey.from_keyring(self.default_keyid, self.gnupg_home) + self.assertEqual(key.to_dict(), self.default_key_dict) + + def test_gpg_key_equality(self): + """Test to check equality between two GPGKey.""" + + # Generate two GPGkey. + key1 = GPGKey.from_legacy_dict(self.default_key_dict) + key2 = GPGKey.from_legacy_dict(self.default_key_dict) + + self.assertNotEqual(self.default_key_dict, key1) + self.assertEqual(key2, key1) + + # Assert equality of key created from dict of first GPGKey. + key2 = GPGKey.from_legacy_dict(key1.to_dict()) + self.assertEqual(key2, key1) + + # Assert Inequalities. + key2.type = "invalid" + self.assertNotEqual(key2, key1) + key2.type = key1.type + + key2.subkeys = {} + self.assertNotEqual(key2, key1) + key2.subkeys = key1.subkeys + + key2.keyval = {} + self.assertNotEqual(key2, key1) + key2.keyval = key1.keyval + + self.assertEqual(key2, key1)