From 54969e8c76f4892cf2868c5fb9a257a69a026755 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 8 Nov 2021 15:50:38 +0200 Subject: [PATCH 1/3] RepositorySimulator: add non-consistent snapshot support Extend URL partitioning to support serving non-versioned metadata and non-prefixed target files. Signed-off-by: Teodora Sechkova --- tests/repository_simulator.py | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index cf74748bf3..ec8a899d5f 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -111,9 +111,12 @@ def __init__(self): # target downloads are served from this dict self.target_files: Dict[str, RepositoryTarget] = {} - # Whether to compute hashes and legth for meta in snapshot/timestamp + # Whether to compute hashes and length for meta in snapshot/timestamp self.compute_metafile_hashes_length = False + # Enable hash-prefixed target file names + self.prefix_targets_with_hash = True + self.dump_dir = None self.dump_version = 0 @@ -192,24 +195,32 @@ def fetch(self, url: str) -> Iterator[bytes]: """Fetches data from the given url and returns an Iterator (or yields bytes). """ - if not self.root.consistent_snapshot: - raise NotImplementedError("non-consistent snapshot not supported") path = parse.urlparse(url).path if path.startswith("/metadata/") and path.endswith(".json"): + # figure out rolename and version ver_and_name = path[len("/metadata/") :][: -len(".json")] - # only consistent_snapshot supported ATM: timestamp is special case - if ver_and_name == "timestamp": - version = None - role = "timestamp" - else: - version, _, role = ver_and_name.partition(".") + version, _, role = ver_and_name.partition(".") + # root is always version-prefixed while timestamp is always NOT + if role == "root" or ( + self.root.consistent_snapshot and ver_and_name != "timestamp" + ): version = int(version) + else: + # the file is not version-prefixed + role = ver_and_name + version = None + yield self._fetch_metadata(role, version) elif path.startswith("/targets/"): # figure out target path and hash prefix target_path = path[len("/targets/") :] dir_parts, sep, prefixed_filename = target_path.rpartition("/") - prefix, _, filename = prefixed_filename.partition(".") + # extract the hash prefix, if any + if self.root.consistent_snapshot and self.prefix_targets_with_hash: + prefix, _, filename = prefixed_filename.partition(".") + else: + filename = prefixed_filename + prefix = None target_path = f"{dir_parts}{sep}{filename}" yield self._fetch_target(target_path, prefix) @@ -257,7 +268,7 @@ def _fetch_metadata( elif role == "targets": md = self.md_targets else: - md = self.md_delegates[role] + md = self.md_delegates.get(role) if md is None: raise FetcherHTTPError(f"Unknown role {role}", 404) From a72fa62906903df5379e839b47faca76136c5873 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Fri, 12 Nov 2021 17:26:42 +0200 Subject: [PATCH 2/3] ngtests: Add consistent_snapshot tests Add tests for ngclient.Updater toggling 'consitent_snapshot' and 'prefix_targets_with_hash'. Signed-off-by: Teodora Sechkova --- tests/test_updater_consistent_snapshot.py | 237 ++++++++++++++++++++++ 1 file changed, 237 insertions(+) create mode 100644 tests/test_updater_consistent_snapshot.py diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py new file mode 100644 index 0000000000..4289d7b860 --- /dev/null +++ b/tests/test_updater_consistent_snapshot.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Test ngclient Updater toggling consistent snapshot""" + +import os +import sys +import tempfile +import unittest +from typing import Any, Dict, Iterable, List, Optional +from unittest.mock import call, patch + +from tests import utils +from tests.repository_simulator import RepositorySimulator +from tuf.api.metadata import ( + SPECIFICATION_VERSION, + TOP_LEVEL_ROLE_NAMES, + Targets, +) +from tuf.ngclient import Updater + + +class TestConsistentSnapshot(unittest.TestCase): + """Test different combinations of 'consistent_snapshot' and + 'prefix_targets_with_hash' and verify that the correct URLs + are formed for each combination""" + + def setUp(self) -> None: + self.temp_dir = tempfile.TemporaryDirectory() + self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") + self.targets_dir = os.path.join(self.temp_dir.name, "targets") + os.mkdir(self.metadata_dir) + os.mkdir(self.targets_dir) + + def tearDown(self) -> None: + self.temp_dir.cleanup() + + def _init_repo( + self, consistent_snapshot: bool, prefix_targets: bool = True + ) -> RepositorySimulator: + """Create a new RepositorySimulator instance""" + sim = RepositorySimulator() + sim.root.consistent_snapshot = consistent_snapshot + sim.root.version += 1 + sim.publish_root() + sim.prefix_targets_with_hash = prefix_targets + + # Init trusted root with the latest consistent_snapshot + with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: + root = sim.download_bytes( + "https://example.com/metadata/2.root.json", 100000 + ) + f.write(root) + + return sim + + def _init_updater(self, sim: RepositorySimulator) -> Updater: + """Create a new Updater instance""" + return Updater( + self.metadata_dir, + "https://example.com/metadata/", + self.targets_dir, + "https://example.com/targets/", + sim, + ) + + @staticmethod + def _cleanup_dir(path: str) -> None: + """Delete all files inside a directory""" + for filepath in [ + os.path.join(path, filename) for filename in os.listdir(path) + ]: + os.remove(filepath) + + def _assert_metadata_files_exist(self, roles: Iterable[str]) -> None: + """Assert that local metadata files exist for 'roles'""" + local_metadata_files = os.listdir(self.metadata_dir) + for role in roles: + self.assertIn(f"{role}.json", local_metadata_files) + + def _assert_targets_files_exist(self, filenames: Iterable[str]) -> None: + """Assert that local files with 'filenames' exist""" + local_target_files = os.listdir(self.targets_dir) + for filename in filenames: + self.assertIn(filename, local_target_files) + + top_level_roles_data: utils.DataSet = { + "consistent_snaphot disabled": { + "consistent_snapshot": False, + "calls": [ + call("root", 3), + call("timestamp", None), + call("snapshot", None), + call("targets", None), + ], + }, + "consistent_snaphot enabled": { + "consistent_snapshot": True, + "calls": [ + call("root", 3), + call("timestamp", None), + call("snapshot", 1), + call("targets", 1), + ], + }, + } + + @utils.run_sub_tests_with_dataset(top_level_roles_data) + def test_top_level_roles_update(self, test_case_data: Dict[str, Any]): + # Test if the client fetches and stores metadata files with the + # correct version prefix, depending on 'consistent_snapshot' config + consistent_snapshot: bool = test_case_data["consistent_snapshot"] + expected_calls: List[Any] = test_case_data["calls"] + + sim = self._init_repo(consistent_snapshot) + updater = self._init_updater(sim) + + with patch.object( + sim, "_fetch_metadata", wraps=sim._fetch_metadata + ) as wrapped_fetch: + updater.refresh() + + # metadata files are fetched with the expected version (or None) + self.assertListEqual(wrapped_fetch.call_args_list, expected_calls) + # metadata files are always persisted without a version prefix + self._assert_metadata_files_exist(TOP_LEVEL_ROLE_NAMES) + + self._cleanup_dir(self.metadata_dir) + + delegated_roles_data: utils.DataSet = { + "consistent_snaphot disabled": { + "consistent_snapshot": False, + "expected_version": None, + }, + "consistent_snaphot enabled": { + "consistent_snapshot": True, + "expected_version": 1, + }, + } + + @utils.run_sub_tests_with_dataset(delegated_roles_data) + def test_delegated_roles_update(self, test_case_data: Dict[str, Any]): + # Test if the client fetches and stores delegated metadata files with + # the correct version prefix, depending on 'consistent_snapshot' config + consistent_snapshot: bool = test_case_data["consistent_snapshot"] + expected_version: Optional[int] = test_case_data["expected_version"] + rolenames = ["role1", "..", "."] + expected_calls = [call(role, expected_version) for role in rolenames] + + sim = self._init_repo(consistent_snapshot) + # Add new delegated targets + spec_version = ".".join(SPECIFICATION_VERSION) + targets = Targets(1, spec_version, sim.safe_expiry, {}, None) + for role in rolenames: + sim.add_delegation("targets", role, targets, False, ["*"], None) + sim.update_snapshot() + updater = self._init_updater(sim) + updater.refresh() + + with patch.object( + sim, "_fetch_metadata", wraps=sim._fetch_metadata + ) as wrapped_fetch: + # trigger updater to fetch the delegated metadata + updater.get_targetinfo("anything") + # metadata files are fetched with the expected version (or None) + self.assertListEqual(wrapped_fetch.call_args_list, expected_calls) + # metadata files are always persisted without a version prefix + self._assert_metadata_files_exist(rolenames) + + self._cleanup_dir(self.metadata_dir) + + targets_download_data: utils.DataSet = { + "consistent_snaphot disabled": { + "consistent_snapshot": False, + "prefix_targets": True, + "hash_algo": None, + }, + "consistent_snaphot enabled without prefixed targets": { + "consistent_snapshot": True, + "prefix_targets": False, + "hash_algo": None, + }, + "consistent_snaphot enabled with prefixed targets": { + "consistent_snapshot": True, + "prefix_targets": True, + "hash_algo": "sha256", + }, + } + + @utils.run_sub_tests_with_dataset(targets_download_data) + def test_download_targets(self, test_case_data: Dict[str, Any]): + # Test if the client fetches and stores target files with + # the correct hash prefix, depending on 'consistent_snapshot' + # and 'prefix_targets_with_hash' config + consistent_snapshot: bool = test_case_data["consistent_snapshot"] + prefix_targets_with_hash: bool = test_case_data["prefix_targets"] + hash_algo: Optional[str] = test_case_data["hash_algo"] + targetpaths = ["file", "file.txt", "..file.ext", "f.le"] + + sim = self._init_repo(consistent_snapshot, prefix_targets_with_hash) + # Add targets to repository + for targetpath in targetpaths: + sim.targets.version += 1 + sim.add_target("targets", b"content", targetpath) + sim.update_snapshot() + + updater = self._init_updater(sim) + updater.config.prefix_targets_with_hash = prefix_targets_with_hash + updater.refresh() + + with patch.object( + sim, "_fetch_target", wraps=sim._fetch_target + ) as wrapped_fetch_target: + + for targetpath in targetpaths: + info = updater.get_targetinfo(targetpath) + updater.download_target(info) + expected_prefix = ( + None if not hash_algo else info.hashes[hash_algo] + ) + # files are fetched with the expected hash prefix (or None) + wrapped_fetch_target.assert_called_once_with( + info.path, expected_prefix + ) + # target files are always persisted without hash prefix + self._assert_targets_files_exist([info.path]) + wrapped_fetch_target.reset_mock() + + self._cleanup_dir(self.targets_dir) + + +if __name__ == "__main__": + + utils.configure_test_logging(sys.argv) + unittest.main() From da1d975db08d39862b1556ca10f3b84f337b7241 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Thu, 18 Nov 2021 12:40:57 +0200 Subject: [PATCH 3/3] Remove test_refresh_on_consistent_targets Consistent snapshot and consistent targets are now extensively tested in test_updater_consistent_snapshot.py. Signed-off-by: Teodora Sechkova --- tests/test_updater_ng.py | 65 ---------------------------------------- 1 file changed, 65 deletions(-) diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index 607dd87580..321be5bd97 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -131,26 +131,6 @@ def tearDown(self): # Logs stdout and stderr from the sever subprocess. self.server_process_handler.flush_log() - def _create_consistent_target( - self, targetname: str, target_hash: str - ) -> None: - """Create consistent targets copies of their non-consistent counterparts - inside the repository directory. - - Args: - targetname: A string denoting the name of the target file. - target_hash: A string denoting the hash of the target. - - """ - consistent_target_name = f"{target_hash}.{targetname}" - source_path = os.path.join( - self.repository_directory, "targets", targetname - ) - destination_path = os.path.join( - self.repository_directory, "targets", consistent_target_name - ) - shutil.copy(source_path, destination_path) - def _modify_repository_root( self, modification_func, bump_version=False ) -> None: @@ -185,51 +165,6 @@ def _assert_files(self, roles: List[str]): client_files = sorted(os.listdir(self.client_directory)) self.assertEqual(client_files, expected_files) - # pylint: disable=protected-access - def test_refresh_on_consistent_targets(self): - # Generate a new root version where consistent_snapshot is set to true - def consistent_snapshot_modifier(root): - root.signed.consistent_snapshot = True - - self._modify_repository_root( - consistent_snapshot_modifier, bump_version=True - ) - updater = ngclient.Updater( - self.client_directory, - self.metadata_url, - self.dl_dir, - self.targets_url, - ) - - # All metadata is in local directory already - updater.refresh() - # Make sure that consistent snapshot is enabled - self.assertTrue(updater._trusted_set.root.signed.consistent_snapshot) - - # Get targetinfos, assert cache does not contain the files - info1 = updater.get_targetinfo("file1.txt") - info3 = updater.get_targetinfo("file3.txt") - self.assertIsNone(updater.find_cached_target(info1)) - self.assertIsNone(updater.find_cached_target(info3)) - - # Create consistent targets with file path HASH.FILENAME.EXT - target1_hash = list(info1.hashes.values())[0] - target3_hash = list(info3.hashes.values())[0] - self._create_consistent_target("file1.txt", target1_hash) - self._create_consistent_target("file3.txt", target3_hash) - - # Download files, assert that cache has correct files - updater.download_target(info1) - path = updater.find_cached_target(info1) - self.assertEqual(path, os.path.join(self.dl_dir, info1.path)) - self.assertIsNone(updater.find_cached_target(info3)) - - updater.download_target(info3) - path = updater.find_cached_target(info1) - self.assertEqual(path, os.path.join(self.dl_dir, info1.path)) - path = updater.find_cached_target(info3) - self.assertEqual(path, os.path.join(self.dl_dir, info3.path)) - def test_refresh_and_download(self): # Test refresh without consistent targets - targets without hash prefix.