From fd40dfc094ca33f00f153fd4ae44bf5f77441d5a Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 8 Oct 2021 10:30:31 +0300 Subject: [PATCH 1/3] tests: Refactor simulator signer handling Store signers with their keyids so they are easier to remove. The signers structure now looks like: { "role1": { "keyidA": SSlibSigner, "keyidB": SSlibSigner, } } Add convenience method for adding a signer. Signed-off-by: Jussi Kukkonen --- tests/repository_simulator.py | 24 +++++++++++++----------- tests/test_updater_with_simulator.py | 11 ++++++----- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index 3565fbe07f..c17e719d91 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -98,7 +98,8 @@ def __init__(self): self.signed_roots: List[bytes] = [] # signers are used on-demand at fetch time to sign metadata - self.signers: Dict[str, List[SSlibSigner]] = {} + # keys are roles, values are dicts of {keyid: signer} + self.signers: Dict[str, Dict[str, SSlibSigner]] = {} # target downloads are served from this dict self.target_files: Dict[str, RepositoryTarget] = {} @@ -139,6 +140,11 @@ def create_key(self) -> Tuple[Key, SSlibSigner]: sslib_key = generate_ed25519_key() return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key) + def add_signer(self, role:str, signer: SSlibSigner): + if role not in self.signers: + self.signers[role] = {} + self.signers[role][signer.key_dict["keyid"]] = signer + def _initialize(self): """Setup a minimal valid repository""" @@ -159,18 +165,16 @@ def _initialize(self): for role in TOP_LEVEL_ROLE_NAMES: key, signer = self.create_key() root.add_key(role, key) - # store the private key - if role not in self.signers: - self.signers[role] = [] - self.signers[role].append(signer) + self.add_signer(role, signer) + self.md_root = Metadata(root, OrderedDict()) self.publish_root() def publish_root(self): """Sign and store a new serialized version of root""" self.md_root.signatures.clear() - for signer in self.signers["root"]: - self.md_root.sign(signer) + for signer in self.signers["root"].values(): + self.md_root.sign(signer, append=True) self.signed_roots.append(self.md_root.to_bytes(JSONSerializer())) logger.debug("Published root v%d", self.root.version) @@ -242,7 +246,7 @@ def _fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: raise FetcherHTTPError(f"Unknown {role} version {version}", 404) md.signatures.clear() - for signer in self.signers[role]: + for signer in self.signers[role].values(): md.sign(signer, append=True) logger.debug( @@ -320,9 +324,7 @@ def add_delegation( # By default add one new key for the role key, signer = self.create_key() delegator.add_key(role.name, key) - if role.name not in self.signers: - self.signers[role.name] = [] - self.signers[role.name].append(signer) + self.add_signer(role.name, signer) # Add metadata for the role self.md_delegates[role.name] = Metadata(targets, OrderedDict()) diff --git a/tests/test_updater_with_simulator.py b/tests/test_updater_with_simulator.py index 8cfb9de72a..65727aa8c4 100644 --- a/tests/test_updater_with_simulator.py +++ b/tests/test_updater_with_simulator.py @@ -156,10 +156,10 @@ def test_keys_and_signatures(self): # Update top level metadata self._run_refresh() - # New targets: signed with a new key that is not in roles keys - old_signer = self.sim.signers["targets"].pop() + # New targets: signed with only a new key that is not in roles keys + old_signers = self.sim.signers.pop("targets") key, signer = self.sim.create_key() - self.sim.signers["targets"] = [signer] + self.sim.add_signer("targets", signer) self.sim.targets.version += 1 self.sim.update_snapshot() @@ -182,8 +182,9 @@ def test_keys_and_signatures(self): with self.assertRaises(UnsignedMetadataError): self._run_refresh() - # New targets: sign with both new and old key - self.sim.signers["targets"] = [signer, old_signer] + # New targets: sign with both new and any original keys + for signer in old_signers.values(): + self.sim.add_signer("targets", signer) self.sim.targets.version += 1 self.sim.update_snapshot() From ad80bd96c6b1c5bad8fa844389b242d8cb678472 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 8 Oct 2021 22:52:46 +0300 Subject: [PATCH 2/3] tests: Mark RepositorySimulator. create_key() static Signed-off-by: Jussi Kukkonen --- tests/repository_simulator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index c17e719d91..d9e5885515 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -136,7 +136,8 @@ def all_targets(self) -> Iterator[Tuple[str, Targets]]: for role, md in self.md_delegates.items(): yield role, md.signed - def create_key(self) -> Tuple[Key, SSlibSigner]: + @staticmethod + def create_key() -> Tuple[Key, SSlibSigner]: sslib_key = generate_ed25519_key() return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key) From e817473e3c8fdde0d1c8017ac29c2dac628b5d02 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 8 Oct 2021 13:15:52 +0300 Subject: [PATCH 3/3] tests: Add root key rotation tests Add one test with 1 subtests for various root key rotation situations. The test data definition format is a bit tricky but I tried to document that in the test function docstring. Signed-off-by: Jussi Kukkonen --- tests/test_updater_key_rotations.py | 211 ++++++++++++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 tests/test_updater_key_rotations.py diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py new file mode 100644 index 0000000000..0dad301ecd --- /dev/null +++ b/tests/test_updater_key_rotations.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Test ngclient Updater key rotation handling""" + +from dataclasses import dataclass +from typing import List, Optional, Type +import os +import sys +import tempfile +import unittest + +from securesystemslib.signer import SSlibSigner + +from tuf.api.metadata import Key +from tuf.exceptions import UnsignedMetadataError +from tuf.ngclient import Updater + +from tests import utils +from tests.repository_simulator import RepositorySimulator +from tests.utils import run_sub_tests_with_dataset + + +@dataclass +class RootVersion: + keys: List[int] + threshold: int + signatures: List[int] + result: Optional[Type[Exception]] = None + + +class TestUpdaterKeyRotations(unittest.TestCase): + """Test ngclient root rotation handling""" + # set dump_dir to trigger repository state dumps + dump_dir: Optional[str] = None + + def setUp(self) -> None: + self.sim = None + self.metadata_dir = None + self.subtest_count = 0 + # pylint: disable-next=consider-using-with + self.temp_dir = tempfile.TemporaryDirectory() + + # Pre-create a bunch of keys and signers + self.keys: List[Key] = [] + self.signers: List[SSlibSigner] = [] + for _ in range(10): + key, signer = RepositorySimulator.create_key() + self.keys.append(key) + self.signers.append(signer) + + def tearDown(self) -> None: + self.temp_dir.cleanup() + + def setup_subtest(self) -> None: + self.subtest_count += 1 + + # Setup repository for subtest: make sure no roots have been published + self.sim = RepositorySimulator() + self.sim.signed_roots.clear() + self.sim.root.version = 0 + + if self.dump_dir is not None: + # create subtest dumpdir + name = f"{self.id().split('.')[-1]}-{self.subtest_count}" + self.sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(self.sim.dump_dir) + + def _run_refresh(self) -> None: + """Create new updater, run refresh""" + if self.sim.dump_dir is not None: + self.sim.write() + + # bootstrap with initial root + self.metadata_dir = tempfile.mkdtemp(dir=self.temp_dir.name) + with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: + f.write(self.sim.signed_roots[0]) + + updater = Updater( + self.metadata_dir, + "https://example.com/metadata/", + fetcher=self.sim, + ) + updater.refresh() + + root_rotation_cases = { + "1-of-1 key rotation": [ + RootVersion([1], 1, [1]), + RootVersion([2], 1, [2, 1]), + RootVersion([2], 1, [2]), + ], + "1-of-1 key rotation, unused signatures": [ + RootVersion([1], 1, [3, 1, 4]), + RootVersion([2], 1, [3, 2, 1, 4]), + RootVersion([2], 1, [3, 2, 4]), + ], + "1-of-1 key rotation fail: not signed with old key": [ + RootVersion([1], 1, [1]), + RootVersion([2], 1, [2, 3, 4], UnsignedMetadataError), + ], + "1-of-1 key rotation fail: not signed with new key": [ + RootVersion([1], 1, [1]), + RootVersion([2], 1, [1, 3, 4], UnsignedMetadataError), + ], + "3-of-5, sign with different keycombos": [ + RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]), + RootVersion([0, 1, 2, 3, 4], 3, [0, 4, 1]), + RootVersion([0, 1, 2, 3, 4], 3, [0, 1, 3]), + RootVersion([0, 1, 2, 3, 4], 3, [0, 1, 3]), + ], + "3-of-5, one key rotated": [ + RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]), + RootVersion([0, 1, 3, 4, 5], 3, [0, 4, 1]), + ], + "3-of-5, one key rotate fails: not signed with 3 new keys": [ + RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]), + RootVersion([0, 1, 3, 4, 5], 3, [0, 2, 4], UnsignedMetadataError), + ], + "3-of-5, one key rotate fails: not signed with 3 old keys": [ + RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]), + RootVersion([0, 1, 3, 4, 5], 3, [0, 4, 5], UnsignedMetadataError), + ], + "3-of-5, one key rotated, with intermediate step": [ + RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]), + RootVersion([0, 1, 3, 4, 5], 3, [0, 2, 4, 5]), + RootVersion([0, 1, 3, 4, 5], 3, [0, 4, 5]), + ], + "3-of-5, all keys rotated, with intermediate step": [ + RootVersion([0, 1, 2, 3, 4], 3, [0, 2, 4]), + RootVersion([5, 6, 7, 8, 9], 3, [0, 2, 4, 5, 6, 7]), + RootVersion([5, 6, 7, 8, 9], 3, [5, 6, 7]), + ], + "1-of-3 threshold increase to 2-of-3": [ + RootVersion([1, 2, 3], 1, [1]), + RootVersion([1, 2, 3], 2, [1, 2]), + ], + "1-of-3 threshold bump to 2-of-3 fails: new threshold not reached": [ + RootVersion([1, 2, 3], 1, [1]), + RootVersion([1, 2, 3], 2, [2], UnsignedMetadataError), + ], + "2-of-3 threshold decrease to 1-of-3": [ + RootVersion([1, 2, 3], 2, [1, 2]), + RootVersion([1, 2, 3], 1, [1, 2]), + RootVersion([1, 2, 3], 1, [1]), + ], + "2-of-3 threshold decr. to 1-of-3 fails: old threshold not reached": [ + RootVersion([1, 2, 3], 2, [1, 2]), + RootVersion([1, 2, 3], 1, [1], UnsignedMetadataError), + ], + "1-of-2 threshold increase to 2-of-2": [ + RootVersion([1], 1, [1]), + RootVersion([1, 2], 2, [1, 2]), + ], + } + + @run_sub_tests_with_dataset(root_rotation_cases) + def test_root_rotation(self, root_versions: List[RootVersion]) -> None: + """Test Updater.refresh() with various sequences of root updates + + Each RootVersion in the list describes root keys and signatures of a + remote root metadata version. As an example: + RootVersion([1,2,3], 2, [1,2]) + defines a root that contains keys 1, 2 and 3 with threshold 2. The + metadata is signed with keys 1 and 2. + + Assert that refresh() result is expected and that local root on disk is + the expected one after all roots have been loaded from remote using the + standard client update workflow. + """ + self.setup_subtest() + + # Publish all remote root versions defined in root_versions + for rootver in root_versions: + # clear root keys, signers + self.sim.root.roles["root"].keyids.clear() + self.sim.signers["root"].clear() + + self.sim.root.roles["root"].threshold = rootver.threshold + for i in rootver.keys: + self.sim.root.add_key("root", self.keys[i]) + for i in rootver.signatures: + self.sim.add_signer("root", self.signers[i]) + self.sim.root.version += 1 + self.sim.publish_root() + + # run client workflow, assert success/failure + expected_result = root_versions[-1].result + if expected_result is None: + self._run_refresh() + expected_local_root = self.sim.signed_roots[-1] + else: + # failure expected: local root should be the root before last + with self.assertRaises(expected_result): + self._run_refresh() + expected_local_root = self.sim.signed_roots[-2] + + # assert local root on disk is expected + with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f: + self.assertEqual(f.read(), expected_local_root) + + +if __name__ == "__main__": + if "--dump" in sys.argv: + TestUpdaterKeyRotations.dump_dir = tempfile.mkdtemp() + print(f"Repository dumps in {TestUpdaterKeyRotations.dump_dir}") + sys.argv.remove("--dump") + + utils.configure_test_logging(sys.argv) + unittest.main()