Skip to content

Commit

Permalink
ngtests: Add addtional asserts for files on disk
Browse files Browse the repository at this point in the history
Extend the TestRefresh cases with additional checks
for expected metadata files and their content written
on the file system.

Signed-off-by: Teodora Sechkova <[email protected]>
  • Loading branch information
sechkova committed Nov 9, 2021
1 parent 954331c commit 8a2c785
Showing 1 changed file with 94 additions and 22 deletions.
116 changes: 94 additions & 22 deletions tests/test_updater_top_level_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import os
import sys
import tempfile
from typing import List, Optional
import unittest
from datetime import datetime, timedelta

from tests import utils
from tests.repository_simulator import RepositorySimulator
from tuf.api.metadata import Metadata
from tuf.api.metadata import Metadata, TOP_LEVEL_ROLE_NAMES
from tuf.exceptions import (
BadVersionNumberError,
ExpiredMetadataError,
Expand Down Expand Up @@ -69,10 +70,21 @@ def _init_updater(self) -> Updater:
self.sim,
)

def _assert_files_exist(self, roles: List[str]) -> None:
"""Assert that local metadata files exist for 'roles'"""
expected_files = sorted([f"{role}.json" for role in roles])
local_metadata_files = sorted(os.listdir(self.metadata_dir))
self.assertListEqual(local_metadata_files, expected_files)

def _assert_content_equals(self, role: str, version: Optional[int]=None) -> None:
"""Assert that local file content is the expected"""
expected_content = self.sim._fetch_metadata(role, version)
with open(os.path.join(self.metadata_dir, f"{role}.json"), "rb") as f:
self.assertEqual(f.read(), expected_content)

def test_first_time_refresh(self) -> None:
# Metadata dir contains only the mandatory initial root.json
metadata_files = os.listdir(self.metadata_dir)
self.assertListEqual(metadata_files, ["root.json"])
self._assert_files_exist(["root"])

# Add one more root version to repository so that
# refresh() updates from local trusted root (v1) to
Expand All @@ -82,19 +94,19 @@ def test_first_time_refresh(self) -> None:

self._run_refresh()

# Top-level metadata can be found in metadata dir
metadata_files_after_refresh = os.listdir(self.metadata_dir)
metadata_files_after_refresh.sort()
self.assertListEqual(
metadata_files_after_refresh,
["root.json", "snapshot.json", "targets.json", "timestamp.json"],
)
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
for role in TOP_LEVEL_ROLE_NAMES:
version = 2 if role == "root" else None
self._assert_content_equals(role, version)

def test_trusted_root_missing(self) -> None:
os.remove(os.path.join(self.metadata_dir, "root.json"))
with self.assertRaises(OSError):
self._run_refresh()

# Metadata dir is empty
self.assertFalse(os.listdir(self.metadata_dir))

def test_trusted_root_expired(self) -> None:
# Create an expired root version
self.sim.root.expires = self.past_datetime
Expand All @@ -107,8 +119,8 @@ def test_trusted_root_expired(self) -> None:
with self.assertRaises(ExpiredMetadataError):
updater.refresh()

with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f:
self.assertEqual(f.read(), self.sim.signed_roots[-1])
self._assert_files_exist(["root"])
self._assert_content_equals("root", 2)

# Local root metadata can be loaded even if expired
updater = self._init_updater()
Expand All @@ -117,11 +129,11 @@ def test_trusted_root_expired(self) -> None:
self.sim.root.expires = self.sim.safe_expiry
self.sim.root.version += 1
self.sim.publish_root()

updater.refresh()

with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f:
self.assertEqual(f.read(), self.sim.signed_roots[-1])
# Root is successfully updated to latest version
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
self._assert_content_equals("root", 3)

def test_trusted_root_unsigned(self) -> None:
# Local trusted root is not signed
Expand All @@ -133,6 +145,11 @@ def test_trusted_root_unsigned(self) -> None:
with self.assertRaises(UnsignedMetadataError):
self._run_refresh()

# The update failed, no changes in metadata
self._assert_files_exist(["root"])
md_root_after = Metadata.from_file(root_path)
self.assertEqual(md_root.to_bytes(), md_root_after.to_bytes())

def test_max_root_rotations(self) -> None:
# Root must stop looking for new versions after Y number of
# intermediate files were downloaded.
Expand Down Expand Up @@ -175,6 +192,11 @@ def test_intermediate_root_incorrectly_signed(self) -> None:
with self.assertRaises(UnsignedMetadataError):
self._run_refresh()

# The update failed, latest root version is v1
self._assert_files_exist(["root"])
self._assert_content_equals("root", 1)


def test_intermediate_root_expired(self) -> None:
# The expiration of the new (intermediate) root metadata file
# does not matter yet
Expand All @@ -190,34 +212,46 @@ def test_intermediate_root_expired(self) -> None:
self.sim.publish_root()

self._run_refresh()
md_root = Metadata.from_file(
os.path.join(self.metadata_dir, "root.json")
)
self.assertEqual(md_root.signed.version, self.sim.root.version)

# Successfully updated to root v3
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
self._assert_content_equals("root", 3)

def test_final_root_incorrectly_signed(self) -> None:
# Check for an arbitrary software attack
self.sim.root.version += 1
self.sim.root.version += 1 # root v2
self.sim.signers["root"].clear()
self.sim.publish_root()

with self.assertRaises(UnsignedMetadataError):
self._run_refresh()

# The update failed, latest root version is v1
self._assert_files_exist(["root"])
self._assert_content_equals("root", 1)

def test_new_root_same_version(self) -> None:
# Check for a rollback_attack
# Repository serves a root file with the same version as previous
self.sim.publish_root()
with self.assertRaises(ReplayedMetadataError):
self._run_refresh()

# The update failed, latest root version is v1
self._assert_files_exist(["root"])
self._assert_content_equals("root", 1)

def test_new_root_nonconsecutive_version(self) -> None:
# Repository serves non-consecutive root version
self.sim.root.version += 2
self.sim.publish_root()
with self.assertRaises(ReplayedMetadataError):
self._run_refresh()

# The update failed, latest root version is v1
self._assert_files_exist(["root"])
self._assert_content_equals("root", 1)

def test_final_root_expired(self) -> None:
# Check for a freeze attack
# Final root is expired
Expand All @@ -228,12 +262,18 @@ def test_final_root_expired(self) -> None:
with self.assertRaises(ExpiredMetadataError):
self._run_refresh()

# The update failed but final root is persisted on the file system
self._assert_files_exist(["root"])
self._assert_content_equals("root", 2)

def test_new_timestamp_unsigned(self) -> None:
# Check for an arbitrary software attack
self.sim.signers["timestamp"].clear()
with self.assertRaises(UnsignedMetadataError):
self._run_refresh()

self._assert_files_exist(["root"])

def test_new_timestamp_version_rollback(self) -> None:
# Check for a rollback attack
self.sim.timestamp.version = 2
Expand All @@ -243,19 +283,25 @@ def test_new_timestamp_version_rollback(self) -> None:
with self.assertRaises(ReplayedMetadataError):
self._run_refresh()

md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json"))
self.assertEqual(md_timestamp.signed.version, 2)

def test_new_timestamp_snapshot_rollback(self) -> None:
# Check for a rollback attack.
self.sim.snapshot.version = 2
self.sim.update_timestamp()
self.sim.update_timestamp() # timestamp v2
self._run_refresh()

# Snapshot meta version is smaller than previous
self.sim.timestamp.snapshot_meta.version = 1
self.sim.timestamp.version += 1
self.sim.timestamp.version += 1 # timestamp v3

with self.assertRaises(ReplayedMetadataError):
self._run_refresh()

md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json"))
self.assertEqual(md_timestamp.signed.version, 2)

def test_new_timestamp_expired(self) -> None:
# Check for a freeze attack
self.sim.timestamp.expires = self.past_datetime
Expand All @@ -264,6 +310,8 @@ def test_new_timestamp_expired(self) -> None:
with self.assertRaises(ExpiredMetadataError):
self._run_refresh()

self._assert_files_exist(["root"])

def test_new_snapshot_hash_mismatch(self) -> None:
# Check against timestamp role’s snapshot hash

Expand All @@ -283,12 +331,20 @@ def test_new_snapshot_hash_mismatch(self) -> None:
with self.assertRaises(RepositoryError):
self._run_refresh()

md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json"))
self.assertEqual(md_timestamp.signed.version, 3)

md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json"))
self.assertEqual(md_snapshot.signed.version, 1)

def test_new_snapshot_unsigned(self) -> None:
# Check for an arbitrary software attack
self.sim.signers["snapshot"].clear()
with self.assertRaises(UnsignedMetadataError):
self._run_refresh()

self._assert_files_exist(["root", "timestamp"])

# TODO: RepositorySimulator works always with consistent snapshot
# enabled which forces the client to look for the snapshot version
# written in timestamp (which leads to "Unknown snapshot version").
Expand All @@ -315,6 +371,9 @@ def test_new_snapshot_version_rollback(self) -> None:
with self.assertRaises(ReplayedMetadataError):
self._run_refresh()

md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json"))
self.assertEqual(md_snapshot.signed.version, 2)

def test_new_snapshot_expired(self) -> None:
# Check for a freeze attack
self.sim.snapshot.expires = self.past_datetime
Expand All @@ -323,6 +382,9 @@ def test_new_snapshot_expired(self) -> None:
with self.assertRaises(ExpiredMetadataError):
self._run_refresh()

self._assert_files_exist(["root", "timestamp"])


def test_new_targets_hash_mismatch(self) -> None:
# Check against snapshot role’s targets hashes

Expand All @@ -341,12 +403,20 @@ def test_new_targets_hash_mismatch(self) -> None:
with self.assertRaises(RepositoryError):
self._run_refresh()

md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json"))
self.assertEqual(md_snapshot.signed.version, 3)

md_targets = Metadata.from_file(os.path.join(self.metadata_dir, "targets.json"))
self.assertEqual(md_targets.signed.version, 1)

def test_new_targets_unsigned(self) -> None:
# Check for an arbitrary software attack
self.sim.signers["targets"].clear()
with self.assertRaises(UnsignedMetadataError):
self._run_refresh()

self._assert_files_exist(["root", "timestamp", "snapshot"])

# TODO: RepositorySimulator works always with consistent snapshot
# enabled which forces the client to look for the targets version
# written in snapshot (which leads to "Unknown targets version").
Expand All @@ -366,6 +436,8 @@ def test_new_targets_expired(self) -> None:
with self.assertRaises(ExpiredMetadataError):
self._run_refresh()

self._assert_files_exist(["root", "timestamp", "snapshot"])


if __name__ == "__main__":

Expand Down

0 comments on commit 8a2c785

Please sign in to comment.