From 20662aa83bc64fed5a7ccc78a21d360883cb078b Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Sun, 11 Jul 2021 11:04:58 +0300 Subject: [PATCH] WIP: Add rolename to Metadata * Lot of changes in API * Advantages are smaller than expected * from_bytes() requires a rolename: this could be inferred for other roles but not for targets... but this seems hard to express in the API * If Signed implementations had Metadata constructors (as in the Generics branch) the root/snapshot/timestamp implementations would automatically be able to infer the rolename but again... targets would still need it in the API so either the targets constructor would be different or there would be an optional rolename (that you would only need for delegated targets) Signed-off-by: Jussi Kukkonen --- tests/test_api.py | 39 +++++++++---------- tests/test_trusted_metadata_set.py | 14 +++---- tuf/api/metadata.py | 38 ++++++++++++------ tuf/api/serialization/__init__.py | 2 +- tuf/api/serialization/json.py | 4 +- .../_internal/trusted_metadata_set.py | 8 ++-- 6 files changed, 59 insertions(+), 46 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index fb50faff46..ce967f5585 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -107,7 +107,7 @@ def test_generic_read(self): path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') metadata_obj = Metadata.from_file(path) with open(path, 'rb') as f: - metadata_obj2 = Metadata.from_bytes(f.read()) + metadata_obj2 = Metadata.from_bytes(metadata, f.read()) # Assert that both methods instantiate the right inner class for # each metadata type and ... @@ -130,7 +130,7 @@ def test_generic_read(self): with self.assertRaises(DeserializationError): Metadata.from_file(bad_metadata_path) with self.assertRaises(DeserializationError): - Metadata.from_bytes(bad_string) + Metadata.from_bytes(metadata, bad_string) os.remove(bad_metadata_path) @@ -150,7 +150,8 @@ def test_read_write_read_compare(self): path_2 = path + '.tmp' metadata_obj.to_file(path_2) - metadata_obj_2 = Metadata.from_file(path_2) + # unusual filename: specify rolename explicitly + metadata_obj_2 = Metadata.from_file(path_2, rolename=metadata) self.assertDictEqual( metadata_obj.to_dict(), @@ -274,7 +275,7 @@ def test_metadata_base(self): data = md.to_dict() data["signatures"].append({"keyid": data["signatures"][0]["keyid"], "sig": "foo"}) with self.assertRaises(ValueError): - Metadata.from_dict(data) + Metadata.from_dict(md.rolename, data) def test_metadata_snapshot(self): @@ -351,35 +352,31 @@ def test_metadata_verify_delegate(self): role2 = Metadata.from_file(role2_path) # test the expected delegation tree - root.verify_delegate('root', root) - root.verify_delegate('snapshot', snapshot) - root.verify_delegate('targets', targets) - targets.verify_delegate('role1', role1) - role1.verify_delegate('role2', role2) + root.verify_delegate(root) + root.verify_delegate(snapshot) + root.verify_delegate(targets) + targets.verify_delegate(role1) + role1.verify_delegate(role2) # only root and targets can verify delegates with self.assertRaises(TypeError): - snapshot.verify_delegate('snapshot', snapshot) + snapshot.verify_delegate(snapshot) # verify fails for roles that are not delegated by delegator with self.assertRaises(ValueError): - root.verify_delegate('role1', role1) + root.verify_delegate(role1) with self.assertRaises(ValueError): - targets.verify_delegate('targets', targets) + targets.verify_delegate(targets) # verify fails when delegator has no delegations with self.assertRaises(ValueError): - role2.verify_delegate('role1', role1) + role2.verify_delegate(role1) # verify fails when delegate content is modified expires = snapshot.signed.expires snapshot.signed.bump_expiration() with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('snapshot', snapshot) + root.verify_delegate(snapshot) snapshot.signed.expires = expires - # verify fails if roles keys do not sign the metadata - with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('timestamp', snapshot) - # Add a key to snapshot role, make sure the new sig fails to verify ts_keyid = next(iter(root.signed.roles["timestamp"].keyids)) root.signed.add_key("snapshot", root.signed.keys[ts_keyid]) @@ -387,17 +384,17 @@ def test_metadata_verify_delegate(self): # verify succeeds if threshold is reached even if some signatures # fail to verify - root.verify_delegate('snapshot', snapshot) + root.verify_delegate(snapshot) # verify fails if threshold of signatures is not reached root.signed.roles['snapshot'].threshold = 2 with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('snapshot', snapshot) + root.verify_delegate(snapshot) # verify succeeds when we correct the new signature and reach the # threshold of 2 keys snapshot.sign(SSlibSigner(self.keystore['timestamp']), append=True) - root.verify_delegate('snapshot', snapshot) + root.verify_delegate(snapshot) def test_key_class(self): diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index b59e9de78b..46facede0b 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -85,7 +85,7 @@ def test_update_with_invalid_json(self): with self.assertRaises(exceptions.RepositoryError): TrustedMetadataSet(b"") # root.json is invalid - root = Metadata.from_bytes(data["root"]) + root = Metadata.from_bytes("root", data["root"]) root.signed.version += 1 with self.assertRaises(exceptions.RepositoryError): TrustedMetadataSet(json.dumps(root.to_dict()).encode()) @@ -94,16 +94,16 @@ def test_update_with_invalid_json(self): trusted_set.root_update_finished() top_level_md = [ - (data["timestamp"], trusted_set.update_timestamp), - (data["snapshot"], trusted_set.update_snapshot), - (data["targets"], trusted_set.update_targets), + ("timestamp", trusted_set.update_timestamp), + ("snapshot", trusted_set.update_snapshot), + ("targets", trusted_set.update_targets), ] - for metadata, update_func in top_level_md: + for rolename, update_func in top_level_md: # metadata is not json with self.assertRaises(exceptions.RepositoryError): update_func(b"") # metadata is invalid - md = Metadata.from_bytes(metadata) + md = Metadata.from_bytes(rolename, data[rolename]) md.signed.version += 1 with self.assertRaises(exceptions.RepositoryError): update_func(json.dumps(md.to_dict()).encode()) @@ -112,7 +112,7 @@ def test_update_with_invalid_json(self): with self.assertRaises(exceptions.RepositoryError): update_func(data["root"]) - update_func(metadata) + update_func(data[rolename]) # TODO test updating over initial metadata (new keys, newer timestamp, etc) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index db5fd89c92..cfbc8473e3 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -18,6 +18,7 @@ import abc import io import logging +import os.path import tempfile from collections import OrderedDict from datetime import datetime, timedelta @@ -68,19 +69,22 @@ class Metadata: i.e. one of Targets, Snapshot, Timestamp or Root. signatures: An ordered dictionary of keyids to Signature objects, each signing the canonical serialized representation of 'signed'. + rolename: Name of the metadata role (e.g. "root") """ def __init__( - self, signed: "Signed", signatures: "OrderedDict[str, Signature]" + self, rolename: str, signed: "Signed", signatures: "OrderedDict[str, Signature]" ): + self.rolename = rolename self.signed = signed self.signatures = signatures @classmethod - def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata": + def from_dict(cls, rolename: str, metadata: Dict[str, Any]) -> "Metadata": """Creates Metadata object from its dict representation. Arguments: + rolename: Name of the metadata role metadata: TUF metadata in dict representation. Raises: @@ -99,14 +103,16 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata": if _type == "targets": inner_cls: Type[Signed] = Targets - elif _type == "snapshot": + elif _type == "snapshot" and rolename == "snapshot": inner_cls = Snapshot - elif _type == "timestamp": + elif _type == "timestamp" and rolename == "timestamp": inner_cls = Timestamp - elif _type == "root": + elif _type == "root" and rolename == "root": inner_cls = Root else: - raise ValueError(f'unrecognized metadata type "{_type}"') + raise ValueError( + f'unrecognized metadata type "{_type}" for role "{rolename}"' + ) # Make sure signatures are unique signatures: "OrderedDict[str, Signature]" = OrderedDict() @@ -119,6 +125,7 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata": signatures[sig.keyid] = sig return cls( + rolename=rolename, signed=inner_cls.from_dict(metadata.pop("signed")), signatures=signatures, ) @@ -129,17 +136,21 @@ def from_file( filename: str, deserializer: Optional[MetadataDeserializer] = None, storage_backend: Optional[StorageBackendInterface] = None, + rolename: Optional[str] = None, ) -> "Metadata": """Loads TUF metadata from file storage. Arguments: - filename: The path to read the file from. + filename: The path to read the file from. Filename is expected to be of type + .: e.g. "targets.json" deserializer: A MetadataDeserializer subclass instance that implements the desired wireline format deserialization. Per default a JSONDeserializer is used. storage_backend: An object that implements securesystemslib.storage.StorageBackendInterface. Per default a (local) FilesystemBackend is used. + rolename: Name of the metadata role. By default rolename is the + basename of the filename Raises: securesystemslib.exceptions.StorageError: The file cannot be read. @@ -153,17 +164,23 @@ def from_file( if storage_backend is None: storage_backend = FilesystemBackend() + if rolename is None: + basename = os.path.basename(filename) + rolename, ext = os.path.splitext(basename) + with storage_backend.get(filename) as file_obj: - return cls.from_bytes(file_obj.read(), deserializer) + return cls.from_bytes(rolename, file_obj.read(), deserializer) @staticmethod def from_bytes( + rolename: str, data: bytes, deserializer: Optional[MetadataDeserializer] = None, ) -> "Metadata": """Loads TUF metadata from raw data. Arguments: + rolename: Name of the metadata role data: metadata content as bytes. deserializer: Optional; A MetadataDeserializer instance that implements deserialization. Default is JSONDeserializer. @@ -183,7 +200,7 @@ def from_bytes( deserializer = JSONDeserializer() - return deserializer.deserialize(data) + return deserializer.deserialize(rolename, data) def to_dict(self) -> Dict[str, Any]: """Returns the dict representation of self.""" @@ -271,7 +288,6 @@ def sign( def verify_delegate( self, - delegated_role: str, delegated_metadata: "Metadata", signed_serializer: Optional[SignedSerializer] = None, ) -> None: @@ -279,7 +295,6 @@ def verify_delegate( threshold of keys for the delegated role 'delegated_role'. Args: - delegated_role: Name of the delegated role to verify delegated_metadata: The Metadata object for the delegated role signed_serializer: Optional; serializer used for delegate serialization. Default is CanonicalJSONSerializer. @@ -291,6 +306,7 @@ def verify_delegate( # Find the keys and role in delegator metadata role = None + delegated_role = delegated_metadata.rolename if isinstance(self.signed, Root): keys = self.signed.keys role = self.signed.roles.get(delegated_role) diff --git a/tuf/api/serialization/__init__.py b/tuf/api/serialization/__init__.py index 4ec0a4aef1..52913595fd 100644 --- a/tuf/api/serialization/__init__.py +++ b/tuf/api/serialization/__init__.py @@ -35,7 +35,7 @@ class MetadataDeserializer(metaclass=abc.ABCMeta): """Abstract base class for deserialization of Metadata objects.""" @abc.abstractmethod - def deserialize(self, raw_data: bytes) -> "Metadata": + def deserialize(self, rolename: str, raw_data: bytes) -> "Metadata": """Deserialize passed bytes to Metadata object.""" raise NotImplementedError diff --git a/tuf/api/serialization/json.py b/tuf/api/serialization/json.py index 3c223d0053..d2e5bb5ebd 100644 --- a/tuf/api/serialization/json.py +++ b/tuf/api/serialization/json.py @@ -30,11 +30,11 @@ class JSONDeserializer(MetadataDeserializer): """Provides JSON to Metadata deserialize method.""" - def deserialize(self, raw_data: bytes) -> Metadata: + def deserialize(self, rolename: str, raw_data: bytes) -> Metadata: """Deserialize utf-8 encoded JSON bytes into Metadata object.""" try: json_dict = json.loads(raw_data.decode("utf-8")) - metadata_obj = Metadata.from_dict(json_dict) + metadata_obj = Metadata.from_dict(rolename, json_dict) except Exception as e: # pylint: disable=broad-except raise DeserializationError from e diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index 8f580e0e6a..04f5ee774d 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -196,7 +196,7 @@ def update_root(self, data: bytes): logger.debug("Updating root") try: - new_root = Metadata.from_bytes(data) + new_root = Metadata.from_bytes("root", data) except DeserializationError as e: raise exceptions.RepositoryError("Failed to load root") from e @@ -261,7 +261,7 @@ def update_timestamp(self, data: bytes): raise RuntimeError("Cannot update timestamp after snapshot") try: - new_timestamp = Metadata.from_bytes(data) + new_timestamp = Metadata.from_bytes("timestamp", data) except DeserializationError as e: raise exceptions.RepositoryError("Failed to load timestamp") from e @@ -330,7 +330,7 @@ def update_snapshot(self, data: bytes): ) from e try: - new_snapshot = Metadata.from_bytes(data) + new_snapshot = Metadata.from_bytes("snapshot", data) except DeserializationError as e: raise exceptions.RepositoryError("Failed to load snapshot") from e @@ -429,7 +429,7 @@ def update_delegated_targets( ) from e try: - new_delegate = Metadata.from_bytes(data) + new_delegate = Metadata.from_bytes(role_name, data) except DeserializationError as e: raise exceptions.RepositoryError("Failed to load snapshot") from e