diff --git a/examples/repo_example/basic_repo.py b/examples/repo_example/basic_repo.py index 20ebe09ab0..aa002d0f2f 100644 --- a/examples/repo_example/basic_repo.py +++ b/examples/repo_example/basic_repo.py @@ -27,13 +27,12 @@ from typing import Any, Dict from securesystemslib.keys import generate_ed25519_key -from securesystemslib.signer import SSlibSigner +from securesystemslib.signer import SSlibKey, SSlibSigner from tuf.api.metadata import ( SPECIFICATION_VERSION, DelegatedRole, Delegations, - Key, Metadata, MetaFile, Root, @@ -157,7 +156,7 @@ def _in(days: float) -> datetime: for name in ["targets", "snapshot", "timestamp", "root"]: keys[name] = generate_ed25519_key() roles["root"].signed.add_key( - Key.from_securesystemslib_key(keys[name]), name + SSlibKey.from_securesystemslib_key(keys[name]), name ) # NOTE: We only need the public part to populate root, so it is possible to use @@ -173,7 +172,7 @@ def _in(days: float) -> datetime: # required signature threshold. another_root_key = generate_ed25519_key() roles["root"].signed.add_key( - Key.from_securesystemslib_key(another_root_key), "root" + SSlibKey.from_securesystemslib_key(another_root_key), "root" ) roles["root"].signed.roles["root"].threshold = 2 @@ -271,7 +270,7 @@ def _in(days: float) -> datetime: # https://theupdateframework.github.io/specification/latest/#delegations roles["targets"].signed.delegations = Delegations( keys={ - keys[delegatee_name]["keyid"]: Key.from_securesystemslib_key( + keys[delegatee_name]["keyid"]: SSlibKey.from_securesystemslib_key( keys[delegatee_name] ) }, @@ -345,7 +344,7 @@ def _in(days: float) -> datetime: roles["root"].signed.revoke_key(keys["root"]["keyid"], "root") roles["root"].signed.add_key( - Key.from_securesystemslib_key(new_root_key), "root" + SSlibKey.from_securesystemslib_key(new_root_key), "root" ) roles["root"].signed.version += 1 diff --git a/examples/repo_example/hashed_bin_delegation.py b/examples/repo_example/hashed_bin_delegation.py index 5a4e2a008c..eb2d81d79e 100644 --- a/examples/repo_example/hashed_bin_delegation.py +++ b/examples/repo_example/hashed_bin_delegation.py @@ -23,12 +23,11 @@ from typing import Any, Dict, Iterator, List, Tuple from securesystemslib.keys import generate_ed25519_key -from securesystemslib.signer import SSlibSigner +from securesystemslib.signer import SSlibKey, SSlibSigner from tuf.api.metadata import ( DelegatedRole, Delegations, - Key, Metadata, TargetFile, Targets, @@ -146,7 +145,7 @@ def find_hash_bin(path: str) -> str: # Create preliminary delegating targets role (bins) and add public key for # delegated targets (bin_n) to key store. Delegation details are update below. 
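The hunks above all follow the same migration: the project-local `Key` class gives way to `securesystemslib.signer.SSlibKey`, while signing still goes through `SSlibSigner`. As a quick orientation, here is a condensed, hypothetical sketch of that pattern; the script structure is invented, but every call appears in the updated example scripts.

```python
# Hypothetical, condensed version of the key-handling pattern used by the
# updated example scripts: the public half of a key is wrapped in SSlibKey,
# the private half stays in the legacy dict consumed by SSlibSigner.
from securesystemslib.keys import generate_ed25519_key
from securesystemslib.signer import SSlibKey, SSlibSigner

from tuf.api.metadata import Metadata, Root

key_dict = generate_ed25519_key()  # legacy dict, contains the private key
public_key = SSlibKey.from_securesystemslib_key(key_dict)  # public part only
signer = SSlibSigner(key_dict)

root_md = Metadata(Root())
# Note the argument order: key first, role name second.
root_md.signed.add_key(public_key, Root.type)
root_md.sign(signer)
```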
roles["bins"] = Metadata(Targets(expires=_in(365))) -bin_n_key = Key.from_securesystemslib_key(keys["bin-n"]) +bin_n_key = SSlibKey.from_securesystemslib_key(keys["bin-n"]) roles["bins"].signed.delegations = Delegations( keys={bin_n_key.keyid: bin_n_key}, roles={}, diff --git a/examples/repo_example/succinct_hash_bin_delegations.py b/examples/repo_example/succinct_hash_bin_delegations.py index 6e86c0d6c9..4c4ffdb9ec 100644 --- a/examples/repo_example/succinct_hash_bin_delegations.py +++ b/examples/repo_example/succinct_hash_bin_delegations.py @@ -25,7 +25,7 @@ from typing import Dict, Tuple from securesystemslib.keys import generate_ed25519_key -from securesystemslib.signer import SSlibSigner +from securesystemslib.signer import SSlibKey, SSlibSigner from tuf.api.metadata import ( Delegations, @@ -82,7 +82,7 @@ def create_key() -> Tuple[Key, SSlibSigner]: """Generates a new Key and Signer.""" sslib_key = generate_ed25519_key() - return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key) + return SSlibKey.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key) # Create one signing key for all bins, and one for the delegating targets role. diff --git a/tests/generated_data/generate_md.py b/tests/generated_data/generate_md.py index fef33678d3..df459c1d6d 100644 --- a/tests/generated_data/generate_md.py +++ b/tests/generated_data/generate_md.py @@ -8,7 +8,7 @@ from datetime import datetime from typing import Dict, List, Optional -from securesystemslib.signer import SSlibSigner +from securesystemslib.signer import SSlibKey, SSlibSigner from tests import utils from tuf.api.metadata import Key, Metadata, Root, Snapshot, Targets, Timestamp @@ -36,7 +36,7 @@ keys: Dict[str, Key] = {} for index in range(4): - keys[f"ed25519_{index}"] = Key.from_securesystemslib_key( + keys[f"ed25519_{index}"] = SSlibKey.from_securesystemslib_key( { "keytype": "ed25519", "scheme": "ed25519", diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index abb7f37141..1e8bebe93b 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -54,7 +54,7 @@ import securesystemslib.hash as sslib_hash from securesystemslib.keys import generate_ed25519_key -from securesystemslib.signer import SSlibSigner +from securesystemslib.signer import SSlibKey, SSlibSigner from tuf.api.exceptions import DownloadHTTPError from tuf.api.metadata import ( @@ -156,8 +156,8 @@ def all_targets(self) -> Iterator[Tuple[str, Targets]]: @staticmethod def create_key() -> Tuple[Key, SSlibSigner]: - sslib_key = generate_ed25519_key() - return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key) + key = generate_ed25519_key() + return SSlibKey.from_securesystemslib_key(key), SSlibSigner(key) def add_signer(self, role: str, signer: SSlibSigner) -> None: if role not in self.signers: diff --git a/tests/test_api.py b/tests/test_api.py index 1fd0a44689..87ea8a4085 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -17,23 +17,27 @@ from datetime import datetime, timedelta from typing import Any, ClassVar, Dict +from securesystemslib import exceptions as sslib_exceptions from securesystemslib import hash as sslib_hash from securesystemslib.interface import ( import_ed25519_privatekey_from_file, import_ed25519_publickey_from_file, ) from securesystemslib.keys import generate_ed25519_key -from securesystemslib.signer import Signature, SSlibSigner +from securesystemslib.signer import SSlibKey, SSlibSigner from tests import utils from tuf.api import exceptions from tuf.api.metadata import 
( TOP_LEVEL_ROLE_NAMES, + BaseMetadata, DelegatedRole, Delegations, + Envelope, Key, Metadata, Root, + Signature, Snapshot, SuccinctRoles, TargetFile, @@ -46,6 +50,47 @@ logger = logging.getLogger(__name__) +class TestEnvelope(unittest.TestCase): + """Smoke test for Envelope (DSSE) and common metadata abstraction.""" + + def test_envelope(self) -> None: + # Generate key and root metadata, and sign and serialize as dsse + sslib_key = generate_ed25519_key() + root = Root() + envelope = Envelope.from_signed(root) + signer = SSlibSigner(sslib_key) + envelope.sign(signer) + data = envelope.to_bytes() + + # Deserialize dsse and verify signature successfully + envelope2 = Envelope.from_bytes(data) + self.assertEqual(envelope2, envelope) + key = SSlibKey.from_securesystemslib_key(sslib_key) + envelope2.verify([key], 1) + + # Create new envelope with bad signature, and fail + envelope3 = Envelope.from_signed(Targets()) + envelope3.signatures = envelope2.signatures + with self.assertRaises(sslib_exceptions.VerificationError): + envelope3.verify([key], 1) + + # Add root key to root so that we can verify with 'verify_delegate' + root.add_key(key, Root.type) + + # Sign and serialize traditional metadata and dsse w/ common interface + metadata = [] + for object_ in [Metadata(root), Envelope.from_signed(root)]: + object_.sign(signer) + metadata.append((type(object_), object_.to_bytes())) + + # Deserialize and verify both metadata types w/ common interface + for type_, bytes_ in metadata: + object_ = BaseMetadata.from_bytes(bytes_) + object_.verify_delegate(Root.type, object_) + # Assert we get the correct instance + self.assertTrue(isinstance(object_, type_)) + + # pylint: disable=too-many-public-methods class TestMetadata(unittest.TestCase): """Tests for public API of all classes in 'tuf/api/metadata.py'.""" @@ -187,8 +232,8 @@ def test_to_from_bytes(self) -> None: self.assertEqual(metadata_obj_2.to_bytes(), obj_bytes) def test_sign_verify(self) -> None: - root_path = os.path.join(self.repo_dir, "metadata", "root.json") - root = Metadata[Root].from_file(root_path).signed + path = os.path.join(self.repo_dir, "metadata") + root = Metadata[Root].from_file(os.path.join(path, "root.json")).signed # Locate the public keys we need from root targets_keyid = next(iter(root.roles[Targets.type].keyids)) @@ -199,41 +244,37 @@ def test_sign_verify(self) -> None: timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (targets) and assert ... - path = os.path.join(self.repo_dir, "metadata", "targets.json") - md_obj = Metadata.from_file(path) + md_obj = Metadata.from_file(os.path.join(path, "targets.json")) + sig = md_obj.signatures[targets_keyid] + data = CanonicalJSONSerializer().serialize(md_obj.signed) # ... it has a single existing signature, self.assertEqual(len(md_obj.signatures), 1) # ... which is valid for the correct key. - targets_key.verify_signature(md_obj) - with self.assertRaises(exceptions.UnsignedMetadataError): - snapshot_key.verify_signature(md_obj) - - # Test verifying with explicitly set serializer - targets_key.verify_signature(md_obj, CanonicalJSONSerializer()) - with self.assertRaises(exceptions.UnsignedMetadataError): - targets_key.verify_signature(md_obj, JSONSerializer()) # type: ignore[arg-type] + targets_key.verify_signature(sig, data) + with self.assertRaises(sslib_exceptions.VerificationError): + snapshot_key.verify_signature(sig, data) sslib_signer = SSlibSigner(self.keystore[Snapshot.type]) # Append a new signature with the unrelated key and assert that ... 
- sig = md_obj.sign(sslib_signer, append=True) + snapshot_sig = md_obj.sign(sslib_signer, append=True) # ... there are now two signatures, and self.assertEqual(len(md_obj.signatures), 2) # ... both are valid for the corresponding keys. - targets_key.verify_signature(md_obj) - snapshot_key.verify_signature(md_obj) + targets_key.verify_signature(sig, data) + snapshot_key.verify_signature(snapshot_sig, data) # ... the returned (appended) signature is for snapshot key - self.assertEqual(sig.keyid, snapshot_keyid) + self.assertEqual(snapshot_sig.keyid, snapshot_keyid) sslib_signer = SSlibSigner(self.keystore[Timestamp.type]) # Create and assign (don't append) a new signature and assert that ... - md_obj.sign(sslib_signer, append=False) + ts_sig = md_obj.sign(sslib_signer, append=False) # ... there now is only one signature, self.assertEqual(len(md_obj.signatures), 1) # ... valid for that key. - timestamp_key.verify_signature(md_obj) - with self.assertRaises(exceptions.UnsignedMetadataError): - targets_key.verify_signature(md_obj) + timestamp_key.verify_signature(ts_sig, data) + with self.assertRaises(sslib_exceptions.VerificationError): + targets_key.verify_signature(ts_sig, data) def test_sign_failures(self) -> None: # Test throwing UnsignedMetadataError because of signing problems @@ -248,7 +289,7 @@ def test_sign_failures(self) -> None: with self.assertRaises(exceptions.UnsignedMetadataError): md.sign(sslib_signer) - def test_verify_failures(self) -> None: + def test_key_verify_failures(self) -> None: root_path = os.path.join(self.repo_dir, "metadata", "root.json") root = Metadata[Root].from_file(root_path).signed @@ -259,36 +300,36 @@ def test_verify_failures(self) -> None: # Load sample metadata (timestamp) path = os.path.join(self.repo_dir, "metadata", "timestamp.json") md_obj = Metadata.from_file(path) + sig = md_obj.signatures[timestamp_keyid] + data = CanonicalJSONSerializer().serialize(md_obj.signed) # Test failure on unknown scheme (securesystemslib # UnsupportedAlgorithmError) scheme = timestamp_key.scheme timestamp_key.scheme = "foo" - with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(md_obj) + with self.assertRaises(sslib_exceptions.VerificationError): + timestamp_key.verify_signature(sig, data) timestamp_key.scheme = scheme # Test failure on broken public key data (securesystemslib # CryptoError) public = timestamp_key.keyval["public"] timestamp_key.keyval["public"] = "ffff" - with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(md_obj) + with self.assertRaises(sslib_exceptions.VerificationError): + timestamp_key.verify_signature(sig, data) timestamp_key.keyval["public"] = public # Test failure with invalid signature (securesystemslib # FormatError) - sig = md_obj.signatures[timestamp_keyid] - correct_sig = sig.signature - sig.signature = "foo" - with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(md_obj) + incorrect_sig = copy(sig) + incorrect_sig.signature = "foo" + with self.assertRaises(sslib_exceptions.VerificationError): + timestamp_key.verify_signature(incorrect_sig, data) # Test failure with valid but incorrect signature - sig.signature = "ff" * 64 - with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(md_obj) - sig.signature = correct_sig + incorrect_sig.signature = "ff" * 64 + with self.assertRaises(sslib_exceptions.UnverifiedSignatureError): + timestamp_key.verify_signature(incorrect_sig, data) def 
test_metadata_signed_is_expired(self) -> None: # Use of Snapshot is arbitrary, we're just testing the base class @@ -355,6 +396,15 @@ def test_metadata_verify_delegate(self) -> None: root.verify_delegate(Snapshot.type, snapshot) snapshot.signed.expires = expires + # verify fails if sslib verify fails with VerificationError + # (in this case signature is malformed) + keyid = next(iter(root.signed.roles[Snapshot.type].keyids)) + good_sig = snapshot.signatures[keyid].signature + snapshot.signatures[keyid].signature = "foo" + with self.assertRaises(exceptions.UnsignedMetadataError): + root.verify_delegate(Snapshot.type, snapshot) + snapshot.signatures[keyid].signature = good_sig + # verify fails if roles keys do not sign the metadata with self.assertRaises(exceptions.UnsignedMetadataError): root.verify_delegate(Timestamp.type, snapshot) @@ -382,14 +432,9 @@ def test_key_class(self) -> None: # Test if from_securesystemslib_key removes the private key from keyval # of a securesystemslib key dictionary. sslib_key = generate_ed25519_key() - key = Key.from_securesystemslib_key(sslib_key) + key = SSlibKey.from_securesystemslib_key(sslib_key) self.assertFalse("private" in key.keyval.keys()) - # Test raising ValueError with non-existent keytype - sslib_key["keytype"] = "bad keytype" - with self.assertRaises(ValueError): - Key.from_securesystemslib_key(sslib_key) - def test_root_add_key_and_revoke_key(self) -> None: root_path = os.path.join(self.repo_dir, "metadata", "root.json") root = Metadata[Root].from_file(root_path) @@ -399,7 +444,7 @@ def test_root_add_key_and_revoke_key(self) -> None: os.path.join(self.keystore_dir, "root_key2.pub") ) keyid = root_key2["keyid"] - key_metadata = Key( + key_metadata = SSlibKey( keyid, root_key2["keytype"], root_key2["scheme"], @@ -412,7 +457,7 @@ def test_root_add_key_and_revoke_key(self) -> None: # Assert that add_key with old argument order will raise an error with self.assertRaises(ValueError): - root.signed.add_key(Root.type, key_metadata) # type: ignore + root.signed.add_key(Root.type, key_metadata) # Add new root key root.signed.add_key(key_metadata, Root.type) @@ -513,7 +558,7 @@ def test_targets_key_api(self) -> None: # Assert that add_key with old argument order will raise an error with self.assertRaises(ValueError): - targets.add_key("role1", key) # type: ignore + targets.add_key("role1", key) # Assert that delegated role "role1" does not contain the new key self.assertNotIn(key.keyid, targets.delegations.roles["role1"].keyids) @@ -650,7 +695,7 @@ def test_length_and_hash_validation(self) -> None: # Test wrong algorithm format (sslib.FormatError) snapshot_metafile.hashes = { - 256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" # type: ignore[dict-item] + 256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" } with self.assertRaises(exceptions.LengthOrHashMismatchError): snapshot_metafile.verify_length_and_hashes(data) diff --git a/tests/test_metadata_eq_.py b/tests/test_metadata_eq_.py index a3b3f9fd91..c8de6147bf 100644 --- a/tests/test_metadata_eq_.py +++ b/tests/test_metadata_eq_.py @@ -12,17 +12,17 @@ import unittest from typing import Any, ClassVar, Dict -from securesystemslib.signer import Signature +from securesystemslib.signer import SSlibKey from tests import utils from tuf.api.metadata import ( TOP_LEVEL_ROLE_NAMES, DelegatedRole, Delegations, - Key, Metadata, MetaFile, Role, + Signature, SuccinctRoles, TargetFile, ) @@ -50,7 +50,7 @@ def setUpClass(cls) -> None: cls.objects["Metadata"] = 
Metadata(cls.objects["Timestamp"], {}) cls.objects["Signed"] = cls.objects["Timestamp"] - cls.objects["Key"] = Key( + cls.objects["Key"] = SSlibKey( "id", "rsa", "rsassa-pss-sha256", {"public": "foo"} ) cls.objects["Role"] = Role(["keyid1", "keyid2"], 3) diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py index 65d410bf3a..04c53775de 100644 --- a/tests/test_metadata_serialization.py +++ b/tests/test_metadata_serialization.py @@ -168,7 +168,7 @@ def test_valid_key_serialization(self, test_case_data: str) -> None: @utils.run_sub_tests_with_dataset(invalid_keys) def test_invalid_key_serialization(self, test_case_data: str) -> None: case_dict = json.loads(test_case_data) - with self.assertRaises((TypeError, KeyError)): + with self.assertRaises((TypeError, KeyError, ValueError)): keyid = case_dict.pop("keyid") Key.from_dict(keyid, case_dict) diff --git a/tox.ini b/tox.ini index d61df9390e..2f1f53a7a1 100644 --- a/tox.ini +++ b/tox.ini @@ -31,7 +31,7 @@ install_command = python3 -m pip install {opts} {packages} # Must to be invoked explicitly with, e.g. `tox -e with-sslib-master` [testenv:with-sslib-master] commands_pre = - python3 -m pip install git+https://github.com/secure-systems-lab/securesystemslib.git@master#egg=securesystemslib[crypto,pynacl] + python3 -m pip install --force-reinstall git+https://github.com/secure-systems-lab/securesystemslib.git@master#egg=securesystemslib[crypto,pynacl] commands = python3 -m coverage run aggregate_tests.py diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index afd3e53175..f6af5b6a44 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -31,7 +31,6 @@ import fnmatch import io import logging -import tempfile from datetime import datetime from typing import ( IO, @@ -52,16 +51,16 @@ from securesystemslib import exceptions as sslib_exceptions from securesystemslib import hash as sslib_hash -from securesystemslib import keys as sslib_keys -from securesystemslib.signer import Signature, Signer -from securesystemslib.storage import FilesystemBackend, StorageBackendInterface -from securesystemslib.util import persist_temp_file +from securesystemslib.metadata import Envelope as BaseEnvelope +from securesystemslib.serialization import JSONSerializable +from securesystemslib.signer import Key, Signature, Signer -from tuf.api import exceptions +from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError from tuf.api.serialization import ( - MetadataDeserializer, - MetadataSerializer, - SerializationError, + BaseDeserializer, + BaseSerializer, + SerializationMixin, + SignedDeserializer, SignedSerializer, ) @@ -83,7 +82,138 @@ T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets") -class Metadata(Generic[T]): +class BaseMetadata(SerializationMixin, JSONSerializable, metaclass=abc.ABCMeta): + """A common metadata interface for Envelope (DSSE) and Metadata objects.""" + + @staticmethod + def _default_deserializer() -> BaseDeserializer: + """Default deserializer for Serialization Mixin.""" + # pylint: disable=import-outside-toplevel + from tuf.api.serialization.json import JSONDeserializer + + return JSONDeserializer() + + @staticmethod + def _default_serializer() -> BaseSerializer: + """Default serializer for Serialization Mixin.""" + # pylint: disable=import-outside-toplevel + from tuf.api.serialization.json import JSONSerializer + + return JSONSerializer(compact=True) + + @staticmethod + def _get_role_and_keys( + signed: "Signed", delegated_role: str + ) -> Tuple["Role", Dict[str, Key]]: + 
"""Return the keys and role for delegated_role""" + + role: Optional[Role] = None + if isinstance(signed, Root): + keys = signed.keys + role = signed.roles.get(delegated_role) + elif isinstance(signed, Targets): + if signed.delegations is None: + raise ValueError(f"No delegation found for {delegated_role}") + + keys = signed.delegations.keys + if signed.delegations.roles is not None: + role = signed.delegations.roles.get(delegated_role) + elif signed.delegations.succinct_roles is not None: + if signed.delegations.succinct_roles.is_delegated_role( + delegated_role + ): + role = signed.delegations.succinct_roles + else: + raise TypeError("Call is valid only on delegator metadata") + + if role is None: + raise ValueError(f"No delegation found for {delegated_role}") + + return (role, keys) + + @abc.abstractmethod + def get_signed(self) -> "Signed": + raise NotImplementedError + + @abc.abstractmethod + def sign( + self, + signer: Signer, + ) -> Signature: + raise NotImplementedError + + @abc.abstractmethod + def verify_delegate( + self, + delegated_role: str, + delegated_metadata: "BaseMetadata", + ) -> None: + raise NotImplementedError + + +class Envelope(Generic[T], BaseMetadata, BaseEnvelope): + """DSSE Envelope for tuf payloads.""" + + DEFAULT_PAYLOAD_TYPE = "application/vnd.tuf" + + @classmethod + def from_signed( + cls, signed: "Signed", serializer: SignedSerializer = None + ) -> "Envelope": + """Creates DSSE envelope with signed bytes as payload.""" + + if serializer is None: + # Use local scope import to avoid circular import errors + # pylint: disable=import-outside-toplevel + from tuf.api.serialization.json import JSONSerializer + + serializer = JSONSerializer(compact=True) + + return cls( + payload=serializer.serialize(signed), + payload_type=cls.DEFAULT_PAYLOAD_TYPE, + signatures=[], + ) + + def get_signed(self, deserializer: SignedDeserializer = None) -> "Signed": + if deserializer is None: + # Use local scope import to avoid circular import errors + # pylint: disable=import-outside-toplevel + from tuf.api.serialization.json import SignedJSONDeserializer + + deserializer = SignedJSONDeserializer() + + return BaseEnvelope.get_payload(self, deserializer) + + def sign( + self, + signer: Signer, + append: bool = False, + ) -> Signature: + + if not append: + self.signatures.clear() + + return BaseEnvelope.sign(self, signer) + + def verify_delegate( + self, + delegated_role: str, + delegated_metadata: "BaseMetadata", + ) -> None: + signed = self.get_signed(None) + role, keys = self._get_role_and_keys(signed, delegated_role) + + try: + _ = BaseEnvelope.verify( + delegated_metadata, keys.values(), role.threshold + ) + + except sslib_exceptions.UnverifiedSignatureError as e: + raise UnsignedMetadataError from e + + +class Metadata(Generic[T], BaseMetadata): """A container for signed TUF metadata. Provides methods to convert to and from dictionary, read and write to and @@ -152,6 +282,9 @@ def __eq__(self, other: Any) -> bool: and self.unrecognized_fields == other.unrecognized_fields ) + def get_signed(self) -> "Signed": + return self.signed + @classmethod def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]": """Creates ``Metadata`` object from its json/dict representation. 
@@ -201,98 +334,6 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]": unrecognized_fields=metadata, ) - @classmethod - def from_file( - cls, - filename: str, - deserializer: Optional[MetadataDeserializer] = None, - storage_backend: Optional[StorageBackendInterface] = None, - ) -> "Metadata[T]": - """Loads TUF metadata from file storage. - - Args: - filename: Path to read the file from. - deserializer: ``MetadataDeserializer`` subclass instance that - implements the desired wireline format deserialization. Per - default a ``JSONDeserializer`` is used. - storage_backend: Object that implements - ``securesystemslib.storage.StorageBackendInterface``. - Default is ``FilesystemBackend`` (i.e. a local file). - Raises: - exceptions.StorageError: The file cannot be read. - tuf.api.serialization.DeserializationError: - The file cannot be deserialized. - - Returns: - TUF ``Metadata`` object. - """ - - if storage_backend is None: - storage_backend = FilesystemBackend() - - with storage_backend.get(filename) as file_obj: - return cls.from_bytes(file_obj.read(), deserializer) - - @classmethod - def from_bytes( - cls, - data: bytes, - deserializer: Optional[MetadataDeserializer] = None, - ) -> "Metadata[T]": - """Loads TUF metadata from raw data. - - Args: - data: Metadata content. - deserializer: ``MetadataDeserializer`` implementation to use. - Default is ``JSONDeserializer``. - - Raises: - tuf.api.serialization.DeserializationError: - The file cannot be deserialized. - - Returns: - TUF ``Metadata`` object. - """ - - if deserializer is None: - # Use local scope import to avoid circular import errors - # pylint: disable=import-outside-toplevel - from tuf.api.serialization.json import JSONDeserializer - - deserializer = JSONDeserializer() - - return deserializer.deserialize(data) - - def to_bytes( - self, serializer: Optional[MetadataSerializer] = None - ) -> bytes: - """Return the serialized TUF file format as bytes. - - Note that if bytes are first deserialized into ``Metadata`` and then - serialized with ``to_bytes()``, the two are not required to be - identical even though the signatures are guaranteed to stay valid. If - byte-for-byte equivalence is required (which is the case when content - hashes are used in other metadata), the original content should be used - instead of re-serializing. - - Args: - serializer: ``MetadataSerializer`` instance that implements the - desired serialization format. Default is ``JSONSerializer``. - - Raises: - tuf.api.serialization.SerializationError: - The metadata object cannot be serialized. - """ - - if serializer is None: - # Use local scope import to avoid circular import errors - # pylint: disable=import-outside-toplevel - from tuf.api.serialization.json import JSONSerializer - - serializer = JSONSerializer(compact=True) - - return serializer.serialize(self) - def to_dict(self) -> Dict[str, Any]: """Returns the dict representation of self.""" @@ -304,40 +345,6 @@ def to_dict(self) -> Dict[str, Any]: **self.unrecognized_fields, } - def to_file( - self, - filename: str, - serializer: Optional[MetadataSerializer] = None, - storage_backend: Optional[StorageBackendInterface] = None, - ) -> None: - """Writes TUF metadata to file storage. - - Note that if a file is first deserialized into ``Metadata`` and then - serialized with ``to_file()``, the two files are not required to be - identical even though the signatures are guaranteed to stay valid. 
If - byte-for-byte equivalence is required (which is the case when file - hashes are used in other metadata), the original file should be used - instead of re-serializing. - - Args: - filename: Path to write the file to. - serializer: ``MetadataSerializer`` instance that implements the - desired serialization format. Default is ``JSONSerializer``. - storage_backend: ``StorageBackendInterface`` implementation. Default - is ``FilesystemBackend`` (i.e. a local file). - - Raises: - tuf.api.serialization.SerializationError: - The metadata object cannot be serialized. - exceptions.StorageError: The file cannot be written. - """ - - bytes_data = self.to_bytes(serializer) - - with tempfile.TemporaryFile() as temp_file: - temp_file.write(bytes_data) - persist_temp_file(temp_file, filename, storage_backend) - # Signatures. def sign( self, @@ -360,7 +367,7 @@ def sign( Raises: tuf.api.serialization.SerializationError: ``signed`` cannot be serialized. - exceptions.UnsignedMetadataError: Signing errors. + UnsignedMetadataError: Signing errors. Returns: ``securesystemslib.signer.Signature`` object that was added into @@ -379,9 +386,7 @@ def sign( try: signature = signer.sign(bytes_data) except Exception as e: - raise exceptions.UnsignedMetadataError( - "Problem signing the metadata" - ) from e + raise UnsignedMetadataError("Problem signing the metadata") from e if not append: self.signatures.clear() @@ -412,41 +417,35 @@ def verify_delegate( TypeError: called this function on non-delegating metadata class. """ - # Find the keys and role in delegator metadata - role: Optional[Role] = None - if isinstance(self.signed, Root): - keys = self.signed.keys - role = self.signed.roles.get(delegated_role) - elif isinstance(self.signed, Targets): - if self.signed.delegations is None: - raise ValueError(f"No delegation found for {delegated_role}") + if signed_serializer is None: + # pylint: disable=import-outside-toplevel + from tuf.api.serialization.json import CanonicalJSONSerializer - keys = self.signed.delegations.keys - if self.signed.delegations.roles is not None: - role = self.signed.delegations.roles.get(delegated_role) - elif self.signed.delegations.succinct_roles is not None: - if self.signed.delegations.succinct_roles.is_delegated_role( - delegated_role - ): - role = self.signed.delegations.succinct_roles - else: - raise TypeError("Call is valid only on delegator metadata") + signed_serializer = CanonicalJSONSerializer() - if role is None: - raise ValueError(f"No delegation found for {delegated_role}") + data = signed_serializer.serialize(delegated_metadata.signed) + role, keys = self._get_role_and_keys(self.signed, delegated_role) # verify that delegated_metadata is signed by threshold of unique keys signing_keys = set() for keyid in role.keyids: - key = keys[keyid] + if keyid not in keys: + logger.info("No key for keyid %s", keyid) + continue + + if keyid not in delegated_metadata.signatures: + logger.info("No signature for keyid %s", keyid) + continue + + sig = delegated_metadata.signatures[keyid] try: - key.verify_signature(delegated_metadata, signed_serializer) - signing_keys.add(key.keyid) - except exceptions.UnsignedMetadataError: + keys[keyid].verify_signature(sig, data) + signing_keys.add(keyid) + except sslib_exceptions.UnverifiedSignatureError: logger.info("Key %s failed to verify %s", keyid, delegated_role) if len(signing_keys) < role.threshold: - raise exceptions.UnsignedMetadataError( + raise UnsignedMetadataError( f"{delegated_role} was signed by {len(signing_keys)}/" f"{role.threshold} 
keys", ) @@ -612,176 +611,6 @@ def is_expired(self, reference_time: Optional[datetime] = None) -> bool: return reference_time >= self.expires -class Key: - """A container class representing the public portion of a Key. - - Supported key content (type, scheme and keyval) is defined in - `` Securesystemslib``. - - *All parameters named below are not just constructor arguments but also - instance attributes.* - - Args: - keyid: Key identifier that is unique within the metadata it is used in. - Keyid is not verified to be the hash of a specific representation - of the key. - keytype: Key type, e.g. "rsa", "ed25519" or "ecdsa-sha2-nistp256". - scheme: Signature scheme. For example: - "rsassa-pss-sha256", "ed25519", and "ecdsa-sha2-nistp256". - keyval: Opaque key content - unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API - - Raises: - TypeError: Invalid type for an argument. - """ - - def __init__( - self, - keyid: str, - keytype: str, - scheme: str, - keyval: Dict[str, str], - unrecognized_fields: Optional[Dict[str, Any]] = None, - ): - if not all( - isinstance(at, str) for at in [keyid, keytype, scheme] - ) or not isinstance(keyval, dict): - raise TypeError("Unexpected Key attributes types!") - self.keyid = keyid - self.keytype = keytype - self.scheme = scheme - self.keyval = keyval - if unrecognized_fields is None: - unrecognized_fields = {} - - self.unrecognized_fields = unrecognized_fields - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, Key): - return False - - return ( - self.keyid == other.keyid - and self.keytype == other.keytype - and self.scheme == other.scheme - and self.keyval == other.keyval - and self.unrecognized_fields == other.unrecognized_fields - ) - - @classmethod - def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "Key": - """Creates ``Key`` object from its json/dict representation. - - Raises: - KeyError, TypeError: Invalid arguments. - """ - keytype = key_dict.pop("keytype") - scheme = key_dict.pop("scheme") - keyval = key_dict.pop("keyval") - # All fields left in the key_dict are unrecognized. - return cls(keyid, keytype, scheme, keyval, key_dict) - - def to_dict(self) -> Dict[str, Any]: - """Returns the dictionary representation of self.""" - return { - "keytype": self.keytype, - "scheme": self.scheme, - "keyval": self.keyval, - **self.unrecognized_fields, - } - - def to_securesystemslib_key(self) -> Dict[str, Any]: - """Returns a ``Securesystemslib`` compatible representation of self.""" - return { - "keyid": self.keyid, - "keytype": self.keytype, - "scheme": self.scheme, - "keyval": self.keyval, - } - - @classmethod - def from_securesystemslib_key(cls, key_dict: Dict[str, Any]) -> "Key": - """Creates a ``Key`` object from a securesystemlib key json/dict representation - removing the private key from keyval. - - Args: - key_dict: Key in securesystemlib dict representation. - - Raises: - ValueError: ``key_dict`` value is not following the securesystemslib - format. 
- """ - try: - key_meta = sslib_keys.format_keyval_to_metadata( - key_dict["keytype"], - key_dict["scheme"], - key_dict["keyval"], - ) - except sslib_exceptions.FormatError as e: - raise ValueError( - "key_dict value is not following the securesystemslib format" - ) from e - - return cls( - key_dict["keyid"], - key_meta["keytype"], - key_meta["scheme"], - key_meta["keyval"], - ) - - def verify_signature( - self, - metadata: Metadata, - signed_serializer: Optional[SignedSerializer] = None, - ) -> None: - """Verifies that the ``metadata.signatures`` contains a signature made - with this key, correctly signing ``metadata.signed``. - - Args: - metadata: Metadata to verify - signed_serializer: ``SignedSerializer`` to serialize - ``metadata.signed`` with. Default is ``CanonicalJSONSerializer``. - - Raises: - UnsignedMetadataError: The signature could not be verified for a - variety of possible reasons: see error message. - """ - try: - signature = metadata.signatures[self.keyid] - except KeyError: - raise exceptions.UnsignedMetadataError( - f"No signature for key {self.keyid} found in metadata" - ) from None - - if signed_serializer is None: - # pylint: disable=import-outside-toplevel - from tuf.api.serialization.json import CanonicalJSONSerializer - - signed_serializer = CanonicalJSONSerializer() - - try: - if not sslib_keys.verify_signature( - self.to_securesystemslib_key(), - signature.to_dict(), - signed_serializer.serialize(metadata.signed), - ): - raise exceptions.UnsignedMetadataError( - f"Failed to verify {self.keyid} signature" - ) - except ( - sslib_exceptions.CryptoError, - sslib_exceptions.FormatError, - sslib_exceptions.UnsupportedAlgorithmError, - SerializationError, - ) as e: - # Log unexpected failure, but continue as if there was no signature - logger.info("Key %s failed to verify sig: %s", self.keyid, str(e)) - raise exceptions.UnsignedMetadataError( - f"Failed to verify {self.keyid} signature" - ) from e - - class Role: """Container that defines which keys are required to sign roles metadata. 
@@ -1013,13 +842,13 @@ def _verify_hashes( sslib_exceptions.UnsupportedAlgorithmError, sslib_exceptions.FormatError, ) as e: - raise exceptions.LengthOrHashMismatchError( + raise LengthOrHashMismatchError( f"Unsupported algorithm '{algo}'" ) from e observed_hash = digest_object.hexdigest() if observed_hash != exp_hash: - raise exceptions.LengthOrHashMismatchError( + raise LengthOrHashMismatchError( f"Observed hash {observed_hash} does not match " f"expected hash {exp_hash}" ) @@ -1037,7 +866,7 @@ def _verify_length( observed_length = data.tell() if observed_length != expected_length: - raise exceptions.LengthOrHashMismatchError( + raise LengthOrHashMismatchError( f"Observed length {observed_length} does not match " f"expected length {expected_length}" ) diff --git a/tuf/api/serialization/__init__.py b/tuf/api/serialization/__init__.py index 7aef8b9884..6f071be1c6 100644 --- a/tuf/api/serialization/__init__.py +++ b/tuf/api/serialization/__init__.py @@ -15,7 +15,13 @@ """ import abc -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, TypeAlias + +from securesystemslib.serialization import ( + BaseDeserializer, + BaseSerializer, + SerializationMixin, +) from tuf.api.exceptions import RepositoryError @@ -23,6 +29,11 @@ # pylint: disable=cyclic-import from tuf.api.metadata import Metadata, Signed +MetadataSerializer: TypeAlias = BaseSerializer +MetadataDeserializer: TypeAlias = BaseDeserializer +SignedSerializer: TypeAlias = BaseSerializer +SignedDeserializer: TypeAlias = BaseDeserializer + class SerializationError(RepositoryError): """Error during serialization.""" @@ -30,30 +41,3 @@ class SerializationError(RepositoryError): class DeserializationError(RepositoryError): """Error during deserialization.""" - - -class MetadataDeserializer(metaclass=abc.ABCMeta): - """Abstract base class for deserialization of Metadata objects.""" - - @abc.abstractmethod - def deserialize(self, raw_data: bytes) -> "Metadata": - """Deserialize bytes to Metadata object.""" - raise NotImplementedError - - -class MetadataSerializer(metaclass=abc.ABCMeta): - """Abstract base class for serialization of Metadata objects.""" - - @abc.abstractmethod - def serialize(self, metadata_obj: "Metadata") -> bytes: - """Serialize Metadata object to bytes.""" - raise NotImplementedError - - -class SignedSerializer(metaclass=abc.ABCMeta): - """Abstract base class for serialization of Signed objects.""" - - @abc.abstractmethod - def serialize(self, signed_obj: "Signed") -> bytes: - """Serialize Signed object to bytes.""" - raise NotImplementedError diff --git a/tuf/api/serialization/json.py b/tuf/api/serialization/json.py index 3355511a66..1c2b0fb5e0 100644 --- a/tuf/api/serialization/json.py +++ b/tuf/api/serialization/json.py @@ -7,43 +7,61 @@ metadata to the OLPC Canonical JSON format for signature generation and verification. """ - -import json -from typing import Optional +from typing import Optional, Type from securesystemslib.formats import encode_canonical +from securesystemslib.serialization import ( + JSONDeserializer as BaseJSONDeserializer, +) +from securesystemslib.serialization import JSONSerializer as BaseJSONSerializer # pylint: disable=cyclic-import # ... to allow de/serializing Metadata and Signed objects here, while also # creating default de/serializers there (see metadata local scope imports). # NOTE: A less desirable alternative would be to add more abstraction layers. 
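Since `MetadataSerializer`, `MetadataDeserializer`, `SignedSerializer` and `SignedDeserializer` become plain aliases for the securesystemslib base classes, any `BaseSerializer` can be passed where python-tuf previously required its own abstract types. A small sketch under that assumption:

```python
# Small sketch under that assumption: one helper accepts any serializer that
# satisfies the (now aliased) SignedSerializer interface.
from securesystemslib.serialization import BaseSerializer

from tuf.api.metadata import Root, Signed
from tuf.api.serialization import SignedSerializer
from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer


def signed_bytes(signed: Signed, serializer: SignedSerializer) -> bytes:
    """Serialize a payload with whichever serializer the caller supplies."""
    return serializer.serialize(signed)


root = Root()
canonical = signed_bytes(root, CanonicalJSONSerializer())  # signature checks
wire = signed_bytes(root, JSONSerializer(compact=True))    # DSSE payload bytes

# Both serializers satisfy the same aliased interface.
assert isinstance(CanonicalJSONSerializer(), BaseSerializer)
```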
-from tuf.api.metadata import Metadata, Signed +from tuf.api.metadata import ( + BaseMetadata, + Envelope, + Metadata, + Root, + Signed, + Snapshot, + Targets, + Timestamp, +) from tuf.api.serialization import ( DeserializationError, - MetadataDeserializer, - MetadataSerializer, SerializationError, SignedSerializer, ) -class JSONDeserializer(MetadataDeserializer): - """Provides JSON to Metadata deserialize method.""" +class JSONDeserializer(BaseJSONDeserializer): + """Provides JSON to ``BaseMetadata`` deserialize method.""" + + def deserialize(self, raw_data: bytes) -> BaseMetadata: + """Deserialize utf-8 encoded JSON bytes into ``BaseMetadata`` instance. + + Creates ``Metadata`` or ``Envelope`` instance based on presence of + ``payload`` or ``signed`` field.""" - def deserialize(self, raw_data: bytes) -> Metadata: - """Deserialize utf-8 encoded JSON bytes into Metadata object.""" try: - json_dict = json.loads(raw_data.decode("utf-8")) - metadata_obj = Metadata.from_dict(json_dict) + json_dict = super().deserialize(raw_data) + + if "payload" in json_dict: + return Envelope.from_dict(json_dict) + + if "signed" in json_dict: + return Metadata.from_dict(json_dict) + + raise ValueError("unrecognized metadata") except Exception as e: raise DeserializationError("Failed to deserialize JSON") from e - return metadata_obj - -class JSONSerializer(MetadataSerializer): - """Provides Metadata to JSON serialize method. +class JSONSerializer(BaseJSONSerializer): + """Provides ``BaseMetadata`` to JSON serialize method. Args: compact: A boolean indicating if the JSON bytes generated in @@ -55,26 +73,19 @@ class JSONSerializer(MetadataSerializer): """ def __init__(self, compact: bool = False, validate: Optional[bool] = False): - self.compact = compact + super().__init__(compact) self.validate = validate - def serialize(self, metadata_obj: Metadata) -> bytes: - """Serialize Metadata object into utf-8 encoded JSON bytes.""" + def serialize(self, obj: BaseMetadata) -> bytes: + """Serialize ``BaseMetadata`` object into utf-8 encoded JSON bytes.""" try: - indent = None if self.compact else 1 - separators = (",", ":") if self.compact else (",", ": ") - json_bytes = json.dumps( - metadata_obj.to_dict(), - indent=indent, - separators=separators, - sort_keys=True, - ).encode("utf-8") + json_bytes = BaseJSONSerializer.serialize(self, obj) if self.validate: try: new_md_obj = JSONDeserializer().deserialize(json_bytes) - if metadata_obj != new_md_obj: + if obj != new_md_obj: raise ValueError( "Metadata changes if you serialize and deserialize." ) @@ -87,15 +98,45 @@ def serialize(self, metadata_obj: Metadata) -> bytes: return json_bytes +class SignedJSONDeserializer(BaseJSONDeserializer): + """Provides JSON to ``Signed`` deserialize method.""" + + def deserialize(self, raw_data: bytes) -> Signed: + """Deserialize utf-8 encoded JSON bytes into ``Signed`` instance. 
+ + Creates ``Targets``, ``Snapshot``, ``Timestamp`` or ``Root`` instance + based on value in ``_type`` field.""" + try: + json_dict = super().deserialize(raw_data) + + _type = json_dict["_type"] + + if _type == Targets.type: + _cls: Type[Signed] = Targets + elif _type == Snapshot.type: + _cls = Snapshot + elif _type == Timestamp.type: + _cls = Timestamp + elif _type == Root.type: + _cls = Root + else: + raise ValueError(f'unrecognized metadata type "{_type}"') + + except Exception as e: + raise SerializationError("Failed to serialize JSON") from e + + return _cls.from_dict(json_dict) + + class CanonicalJSONSerializer(SignedSerializer): """Provides Signed to OLPC Canonical JSON serialize method.""" - def serialize(self, signed_obj: Signed) -> bytes: + def serialize(self, obj: Signed) -> bytes: """Serialize Signed object into utf-8 encoded OLPC Canonical JSON bytes. """ try: - signed_dict = signed_obj.to_dict() + signed_dict = obj.to_dict() canonical_bytes = encode_canonical(signed_dict).encode("utf-8") except Exception as e:
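To round out the json.py changes, a short sketch of pulling a payload back out of a DSSE envelope; it assumes the default wiring shown above, where `Envelope.get_signed()` falls back to `SignedJSONDeserializer` and dispatch happens on the payload's `_type` field.

```python
# Sketch: recovering the payload of a DSSE envelope. This assumes the default
# wiring shown above, where Envelope.get_signed() falls back to
# SignedJSONDeserializer and dispatch happens on the payload's "_type" field.
from tuf.api.metadata import Envelope, Targets
from tuf.api.serialization.json import SignedJSONDeserializer

envelope = Envelope.from_signed(Targets())

# Default and explicitly supplied deserializer give the same result.
assert isinstance(envelope.get_signed(), Targets)
assert isinstance(envelope.get_signed(SignedJSONDeserializer()), Targets)
```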