diff --git a/tests/test_api.py b/tests/test_api.py index 2cf1222884..6332134ac5 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -495,14 +495,14 @@ def test_metadata_targets(self): "sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0" } - fileinfo = TargetFile(length=28, hashes=hashes) + fileinfo = TargetFile(length=28, hashes=hashes, path=filename) # Assert that data is not aleady equal self.assertNotEqual( targets.signed.targets[filename].to_dict(), fileinfo.to_dict() ) # Update an already existing fileinfo - targets.signed.update(filename, fileinfo) + targets.signed.update(fileinfo) # Verify that data is updated self.assertEqual( targets.signed.targets[filename].to_dict(), fileinfo.to_dict() diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py index b8a2d818ab..1f5867d2ce 100644 --- a/tests/test_metadata_serialization.py +++ b/tests/test_metadata_serialization.py @@ -340,7 +340,7 @@ def test_delegation_serialization(self, test_case_data: str): def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises(KeyError): - TargetFile.from_dict(copy.deepcopy(case_dict)) + TargetFile.from_dict(copy.deepcopy(case_dict), "file1.txt") valid_targetfiles: DataSet = { @@ -354,7 +354,7 @@ def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]): @run_sub_tests_with_dataset(valid_targetfiles) def test_targetfile_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) - target_file = TargetFile.from_dict(copy.copy(case_dict)) + target_file = TargetFile.from_dict(copy.copy(case_dict), "file1.txt") self.assertDictEqual(case_dict, target_file.to_dict()) diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index 1e2d95c429..9286f32c71 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -162,8 +162,8 @@ def test_refresh_on_consistent_targets(self): targetinfo3 = self.repository_updater.get_one_valid_targetinfo("file3.txt") # Create consistent targets with file path HASH.FILENAME.EXT - target1_hash = list(targetinfo1["fileinfo"].hashes.values())[0] - target3_hash = list(targetinfo3["fileinfo"].hashes.values())[0] + target1_hash = list(targetinfo1.hashes.values())[0] + target3_hash = list(targetinfo3.hashes.values())[0] self._create_consistent_target("file1.txt", target1_hash) self._create_consistent_target("file3.txt", target3_hash) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 01c306100a..8f5f48299d 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -1140,6 +1140,8 @@ class TargetFile(BaseFile): Attributes: length: An integer indicating the length of the target file. hashes: A dictionary of hash algorithm names to hash values. + path: A string denoting the path to a target file relative to a base + URL of targets. unrecognized_fields: Dictionary of all unrecognized fields. """ @@ -1147,6 +1149,7 @@ def __init__( self, length: int, hashes: Dict[str, str], + path: str, unrecognized_fields: Optional[Mapping[str, Any]] = None, ) -> None: @@ -1155,6 +1158,7 @@ def __init__( self.length = length self.hashes = hashes + self.path = path self.unrecognized_fields = unrecognized_fields or {} @property @@ -1162,13 +1166,13 @@ def custom(self) -> Any: return self.unrecognized_fields.get("custom", None) @classmethod - def from_dict(cls, target_dict: Dict[str, Any]) -> "TargetFile": + def from_dict(cls, target_dict: Dict[str, Any], path: str) -> "TargetFile": """Creates TargetFile object from its dict representation.""" length = target_dict.pop("length") hashes = target_dict.pop("hashes") # All fields left in the target_dict are unrecognized. - return cls(length, hashes, target_dict) + return cls(length, hashes, path, target_dict) def to_dict(self) -> Dict[str, Any]: """Returns the JSON-serializable dictionary representation of self.""" @@ -1234,7 +1238,9 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets": delegations = Delegations.from_dict(delegations_dict) res_targets = {} for target_path, target_info in targets.items(): - res_targets[target_path] = TargetFile.from_dict(target_info) + res_targets[target_path] = TargetFile.from_dict( + target_info, target_path + ) # All fields left in the targets_dict are unrecognized. return cls(*common_args, res_targets, delegations, signed_dict) @@ -1250,6 +1256,6 @@ def to_dict(self) -> Dict[str, Any]: return targets_dict # Modification. - def update(self, filename: str, fileinfo: TargetFile) -> None: + def update(self, fileinfo: TargetFile) -> None: """Assigns passed target file info to meta dict.""" - self.targets[filename] = fileinfo + self.targets[fileinfo.path] = fileinfo diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index dd6f86546e..1acf2c40bd 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -62,13 +62,13 @@ import logging import os -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import List, Optional, Set, Tuple from urllib import parse from securesystemslib import util as sslib_util from tuf import exceptions -from tuf.api.metadata import Targets +from tuf.api.metadata import TargetFile, Targets from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set from tuf.ngclient.config import UpdaterConfig from tuf.ngclient.fetcher import FetcherInterface @@ -146,8 +146,8 @@ def refresh(self) -> None: def get_one_valid_targetinfo( self, target_path: str - ) -> Optional[Dict[str, Any]]: - """Returns target information for 'target_path'. + ) -> Optional[TargetFile]: + """Returns TargetFile instance with information for 'target_path'. The return value can be used as an argument to :func:`download_target()` and :func:`updated_targets()`. @@ -174,14 +174,14 @@ def get_one_valid_targetinfo( TODO: download-related errors Returns: - A targetinfo dictionary or None + A TargetFile instance or None. """ return self._preorder_depth_first_walk(target_path) @staticmethod def updated_targets( - targets: List[Dict[str, Any]], destination_directory: str - ) -> List[Dict[str, Any]]: + targets: List[TargetFile], destination_directory: str + ) -> List[TargetFile]: """Checks whether local cached target files are up to date After retrieving the target information for the targets that should be @@ -204,17 +204,14 @@ def updated_targets( # against each hash listed for its fileinfo. Note: join() discards # 'destination_directory' if 'filepath' contains a leading path # separator (i.e., is treated as an absolute path). - filepath = target["filepath"] - target_fileinfo: "TargetFile" = target["fileinfo"] - - target_filepath = os.path.join(destination_directory, filepath) + target_filepath = os.path.join(destination_directory, target.path) if target_filepath in updated_targetpaths: continue try: with open(target_filepath, "rb") as target_file: - target_fileinfo.verify_length_and_hashes(target_file) + target.verify_length_and_hashes(target_file) # If the file does not exist locally or length and hashes # do not match, append to updated targets. except (OSError, exceptions.LengthOrHashMismatchError): @@ -225,15 +222,15 @@ def updated_targets( def download_target( self, - targetinfo: Dict, + targetinfo: TargetFile, destination_directory: str, target_base_url: Optional[str] = None, ): """Downloads the target file specified by 'targetinfo'. Args: - targetinfo: data received from get_one_valid_targetinfo() or - updated_targets(). + targetinfo: TargetFile instance received from + get_one_valid_targetinfo() or updated_targets(). destination_directory: existing local directory to download into. Note that new directories may be created inside destination_directory as required. @@ -254,19 +251,18 @@ def download_target( else: target_base_url = _ensure_trailing_slash(target_base_url) - target_fileinfo: "TargetFile" = targetinfo["fileinfo"] - target_filepath = targetinfo["filepath"] + target_filepath = targetinfo.path consistent_snapshot = self._trusted_set.root.signed.consistent_snapshot if consistent_snapshot and self.config.prefix_targets_with_hash: - hashes = list(target_fileinfo.hashes.values()) + hashes = list(targetinfo.hashes.values()) target_filepath = f"{hashes[0]}.{target_filepath}" full_url = parse.urljoin(target_base_url, target_filepath) with self._fetcher.download_file( - full_url, target_fileinfo.length + full_url, targetinfo.length ) as target_file: try: - target_fileinfo.verify_length_and_hashes(target_file) + targetinfo.verify_length_and_hashes(target_file) except exceptions.LengthOrHashMismatchError as e: raise exceptions.RepositoryError( f"{target_filepath} length or hashes do not match" @@ -274,7 +270,7 @@ def download_target( # Store the target file name without the HASH prefix. local_filepath = os.path.join( - destination_directory, targetinfo["filepath"] + destination_directory, targetinfo.path ) sslib_util.persist_temp_file(target_file, local_filepath) @@ -380,7 +376,7 @@ def _load_targets(self, role: str, parent_role: str) -> None: def _preorder_depth_first_walk( self, target_filepath: str - ) -> Optional[Dict[str, Any]]: + ) -> Optional[TargetFile]: """ Interrogates the tree of target delegations in order of appearance (which implicitly order trustworthiness), and returns the matching @@ -413,7 +409,7 @@ def _preorder_depth_first_walk( if target is not None: logger.debug("Found target in current role %s", role_name) - return {"filepath": target_filepath, "fileinfo": target} + return target # After preorder check, add current role to set of visited roles. visited_role_names.add((role_name, parent_role))