-
Notifications
You must be signed in to change notification settings - Fork 275
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ngtests: Add top-level-roles update tests
Add ngclient/updater tests following the top-level-roles metadata update from the specification (Detailed client workflow) using RepositorySimulator. Signed-off-by: Teodora Sechkova <[email protected]>
- Loading branch information
Showing
1 changed file
with
361 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,361 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2021, New York University and the TUF contributors | ||
# SPDX-License-Identifier: MIT OR Apache-2.0 | ||
|
||
"""TODO | ||
""" | ||
import os | ||
import sys | ||
import tempfile | ||
import unittest | ||
from datetime import datetime, timedelta | ||
|
||
from securesystemslib import hash as sslib_hash | ||
|
||
from tests import utils | ||
from tests.repository_simulator import RepositorySimulator | ||
from tuf.api.metadata import Metadata | ||
from tuf.exceptions import ( | ||
BadVersionNumberError, | ||
ExpiredMetadataError, | ||
ReplayedMetadataError, | ||
RepositoryError, | ||
UnsignedMetadataError, | ||
) | ||
from tuf.ngclient import Updater | ||
from tuf.ngclient.config import UpdaterConfig | ||
|
||
|
||
class TestRefresh(unittest.TestCase): | ||
"""Test update of top-level metadata following | ||
'Detailed client workflow' in the specification.""" | ||
|
||
def setUp(self): | ||
self.temp_dir = tempfile.TemporaryDirectory() | ||
self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") | ||
self.targets_dir = os.path.join(self.temp_dir.name, "targets") | ||
os.mkdir(self.metadata_dir) | ||
os.mkdir(self.targets_dir) | ||
|
||
self.sim = RepositorySimulator() | ||
# Add one more root version to repository so that | ||
# refresh() always updates from local trusted root (v1) to | ||
# remote root (v2) | ||
self.sim.root.version += 1 | ||
self.sim.publish_root() | ||
|
||
with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: | ||
root = self.sim.download_bytes( | ||
"https://example.com/metadata/1.root.json", 100000 | ||
) | ||
f.write(root) | ||
|
||
def tearDown(self): | ||
self.temp_dir.cleanup() | ||
|
||
def _run_refresh(self) -> Updater: | ||
updater = Updater( | ||
self.metadata_dir, | ||
"https://example.com/metadata/", | ||
"https://example.com/targets/", | ||
self.sim, | ||
) | ||
updater.refresh() | ||
return updater | ||
|
||
def test_first_time_refresh(self): | ||
# Metadata dir contains only the mandatory initial root.json | ||
metadata_files = os.listdir(self.metadata_dir) | ||
self.assertListEqual(metadata_files, ["root.json"]) | ||
|
||
self._run_refresh() | ||
|
||
# Top-level metadata can be found in metadata dir | ||
metadata_files_after_refresh = os.listdir(self.metadata_dir) | ||
self.assertListEqual( | ||
metadata_files_after_refresh, | ||
["root.json", "timestamp.json", "targets.json", "snapshot.json"], | ||
) | ||
|
||
def test_trusted_root_os_error(self): | ||
os.remove(os.path.join(self.metadata_dir, "root.json")) | ||
with self.assertRaises(OSError): | ||
self._run_refresh() | ||
|
||
def test_trusted_root_expired(self): | ||
# Local trusted root is expired | ||
root_path = os.path.join(self.metadata_dir, "root.json") | ||
md_root = Metadata.from_file(root_path) | ||
md_root.signed.expires = datetime.utcnow().replace( | ||
microsecond=0 | ||
) - timedelta(days=5) | ||
for signer in self.sim.signers["root"]: | ||
md_root.sign(signer) | ||
md_root.to_file(root_path) | ||
|
||
# The expiration of the trusted root metadata file does not lead | ||
# to failure in the update workflow and root is successfully updated | ||
# to a valid version. | ||
self._run_refresh() | ||
|
||
md_root = Metadata.from_file(root_path) | ||
self.assertEqual(md_root.signed.version, self.sim.root.version) | ||
|
||
def test_trusted_root_unsigned(self): | ||
# Local trusted root is not signed | ||
root_path = os.path.join(self.metadata_dir, "root.json") | ||
md_root = Metadata.from_file(root_path) | ||
md_root.signatures.clear() | ||
md_root.to_file(root_path) | ||
|
||
with self.assertRaises(UnsignedMetadataError): | ||
self._run_refresh() | ||
|
||
def test_max_root_rotations(self): | ||
# Root must stop looking for new versions after Y number of | ||
# intermediate files were downloaded. | ||
|
||
# Create some big number of root files in the repository | ||
highest_repo_root_version = UpdaterConfig.max_root_rotations + 10 | ||
for version in range( | ||
self.sim.root.version + 1, highest_repo_root_version | ||
): | ||
self.sim.root.version = version | ||
self.sim.publish_root() | ||
|
||
root_path = os.path.join(self.metadata_dir, "root.json") | ||
md_root = Metadata.from_file(root_path) | ||
initial_root_version = md_root.signed.version | ||
|
||
self._run_refresh() | ||
|
||
# Asserts that root version was increased with no more than 'max_root_rotations' | ||
md_root = Metadata.from_file(root_path) | ||
self.assertEqual( | ||
md_root.signed.version, | ||
initial_root_version + UpdaterConfig.max_root_rotations, | ||
) | ||
|
||
def test_intermediate_root_incorrectly_signed(self): | ||
# Check for an arbitrary software attack | ||
|
||
# Intermediate root v3 is unsigned | ||
self.sim.root.version += 1 | ||
root_signers = self.sim.signers["root"] | ||
self.sim.signers["root"].clear() | ||
self.sim.publish_root() | ||
|
||
# Final root v4 is correctly signed | ||
self.sim.root.version += 1 | ||
self.sim.signers["root"] = root_signers | ||
self.sim.publish_root() | ||
|
||
# Incorrectly signed intermediate root is detected | ||
with self.assertRaises(UnsignedMetadataError): | ||
self._run_refresh() | ||
|
||
def test_intermediate_root_expired(self): | ||
# The expiration of the new (intermediate) root metadata file | ||
# does not matter yet | ||
|
||
# Intermediate root v3 is expired | ||
self.sim.root.expires = datetime.utcnow().replace( | ||
microsecond=0 | ||
) - timedelta(days=5) | ||
self.sim.root.version += 1 | ||
self.sim.publish_root() | ||
|
||
# Final root v4 is up to date | ||
self.sim.root.expires = datetime.utcnow().replace( | ||
microsecond=0 | ||
) + timedelta(days=5) | ||
self.sim.root.version += 1 | ||
self.sim.publish_root() | ||
|
||
self._run_refresh() | ||
md_root = Metadata.from_file( | ||
os.path.join(self.metadata_dir, "root.json") | ||
) | ||
self.assertEqual(md_root.signed.version, self.sim.root.version) | ||
|
||
def test_final_root_incorrectly_signed(self): | ||
# Check for an arbitrary software attack | ||
self.sim.root.version += 1 | ||
self.sim.signers["root"].clear() | ||
self.sim.publish_root() | ||
|
||
with self.assertRaises(UnsignedMetadataError): | ||
self._run_refresh() | ||
|
||
def test_new_root_same_version(self): | ||
# Check for a rollback_attack | ||
# Repository serves a root file with the same version as previous | ||
self.sim.publish_root() | ||
with self.assertRaises(ReplayedMetadataError): | ||
self._run_refresh() | ||
|
||
def test_new_root_nonconsecutive_version(self): | ||
# Repository serves non-consecutive root version | ||
self.sim.root.version += 2 | ||
self.sim.publish_root() | ||
with self.assertRaises(ReplayedMetadataError): | ||
self._run_refresh() | ||
|
||
def test_final_root_expired(self): | ||
# Check for a freeze attack | ||
# Final root is expired | ||
self.sim.root.expires = datetime.utcnow().replace( | ||
microsecond=0 | ||
) - timedelta(days=5) | ||
self.sim.root.version += 1 | ||
self.sim.publish_root() | ||
|
||
with self.assertRaises(ExpiredMetadataError): | ||
self._run_refresh() | ||
|
||
def test_new_timestamp_unsigned(self): | ||
# Check for an arbitrary software attack | ||
self.sim.signers["timestamp"].clear() | ||
with self.assertRaises(UnsignedMetadataError): | ||
self._run_refresh() | ||
|
||
def test_new_timestamp_version_rollback(self): | ||
# Check for a rollback attack. | ||
self.sim.timestamp.version = 2 | ||
self._run_refresh() | ||
|
||
self.sim.timestamp.version = 1 | ||
with self.assertRaises(ReplayedMetadataError): | ||
self._run_refresh() | ||
|
||
def test_new_timestamp_snapshot_rollback(self): | ||
# Check for a rollback attack. | ||
self._run_refresh() | ||
|
||
self.sim.snapshot.version = 2 | ||
self.sim.update_timestamp() | ||
self._run_refresh() | ||
|
||
# Snapshot meta version is smaller than previous | ||
self.sim.timestamp.snapshot_meta.version = 1 | ||
self.sim.timestamp.version += 1 | ||
|
||
with self.assertRaises(ReplayedMetadataError): | ||
self._run_refresh() | ||
|
||
def test_new_timestamp_expired(self): | ||
# Check for a freeze attack | ||
self.sim.timestamp.expires = datetime.utcnow().replace( | ||
microsecond=0 | ||
) - timedelta(days=5) | ||
self.sim.update_timestamp() | ||
|
||
with self.assertRaises(ExpiredMetadataError): | ||
self._run_refresh() | ||
|
||
def test_new_snapshot_hash_mismatch(self): | ||
# Check against timestamp role’s snapshot hash | ||
|
||
# Add snapshot hash to timestamp and update | ||
self.sim.compute_metafile_hashes_length = True | ||
self.sim.update_timestamp() | ||
self._run_refresh() | ||
|
||
# Modify the snapshot contents without updating | ||
# timestamp's snapshot hash | ||
self.sim.compute_metafile_hashes_length = False | ||
self.sim.update_snapshot() | ||
|
||
# Hash mismatch error | ||
with self.assertRaises(RepositoryError): | ||
self._run_refresh() | ||
|
||
def test_new_snapshot_unsigned(self): | ||
# Check for an arbitrary software attack | ||
self.sim.signers["snapshot"].clear() | ||
with self.assertRaises(UnsignedMetadataError): | ||
self._run_refresh() | ||
|
||
# TODO: RepositorySimulator works always with consistent snapshot | ||
# enabled which forces the client to look for the snapshot version | ||
# written in timestamp (which leads to "Unknown snapshot version"). | ||
# This fails the test for a snapshot version mismatch. | ||
|
||
# def test_new_snapshot_version_mismatch(self): | ||
# # Check against timestamp role’s snapshot version | ||
|
||
# # Increase snapshot version without updating | ||
# # timestamp's snapshot version | ||
# self.sim.snapshot.version += 1 | ||
# with self.assertRaises(BadVersionNumberError): | ||
# self._run_refresh() | ||
|
||
def test_new_snapshot_version_rollback(self): | ||
self.sim.snapshot.version = 2 | ||
self.sim.update_timestamp() | ||
self._run_refresh() | ||
|
||
self.sim.snapshot.version = 1 | ||
self.sim.update_timestamp() | ||
|
||
with self.assertRaises(ReplayedMetadataError): | ||
self._run_refresh() | ||
|
||
def test_new_snapshot_expired(self): | ||
# Check for a freeze attack. | ||
self.sim.snapshot.expires = datetime.utcnow().replace( | ||
microsecond=0 | ||
) - timedelta(days=5) | ||
self.sim.update_snapshot() | ||
|
||
with self.assertRaises(ExpiredMetadataError): | ||
self._run_refresh() | ||
|
||
def test_new_targets_hash_mismatch(self): | ||
# Check against snapshot role’s targets hash | ||
self.sim.compute_metafile_hashes_length = True | ||
self.sim.update_snapshot() | ||
self._run_refresh() | ||
|
||
# Modify targets contents without updating | ||
# snapshot's targets hash | ||
self.sim.targets.version += 1 | ||
self.sim.compute_metafile_hashes_length = False | ||
self.sim.update_snapshot() | ||
|
||
with self.assertRaises(RepositoryError): | ||
self._run_refresh() | ||
|
||
def test_new_targets_unsigned(self): | ||
# Check for an arbitrary software attack | ||
self.sim.signers["targets"].clear() | ||
with self.assertRaises(UnsignedMetadataError): | ||
self._run_refresh() | ||
|
||
# TODO: RepositorySimulator works always with consistent snapshot | ||
# enabled which forces the client to look for the targets version | ||
# written in snapshot (which leads to "Unknown targets version"). | ||
# This fails the test for a targets version mismatch. | ||
|
||
# def test_new_targets_version_mismatch(self): | ||
# # Check against snapshot role’s targets version | ||
# self.sim.targets.version += 1 | ||
# with self.assertRaises(BadVersionNumberError): | ||
# self._run_refresh() | ||
|
||
def test_new_targets_expired(self): | ||
# Check for a freeze attack. | ||
self.sim.targets.expires = datetime.utcnow().replace( | ||
microsecond=0 | ||
) - timedelta(days=5) | ||
self.sim.update_snapshot() | ||
|
||
with self.assertRaises(ExpiredMetadataError): | ||
self._run_refresh() | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
utils.configure_test_logging(sys.argv) | ||
unittest.main() |