-
Notifications
You must be signed in to change notification settings - Fork 275
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1408 from jku/merge-ngclient
Merge ngclient: a new client library implementation
- Loading branch information
Showing
12 changed files
with
1,839 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
import json | ||
import logging | ||
import os | ||
import shutil | ||
import sys | ||
import tempfile | ||
import unittest | ||
|
||
from tuf import exceptions | ||
from tuf.api.metadata import Metadata | ||
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet | ||
|
||
from tests import utils | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
class TestTrustedMetadataSet(unittest.TestCase): | ||
|
||
def test_update(self): | ||
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') | ||
|
||
with open(os.path.join(repo_dir, "root.json"), "rb") as f: | ||
trusted_set = TrustedMetadataSet(f.read()) | ||
trusted_set.root_update_finished() | ||
|
||
with open(os.path.join(repo_dir, "timestamp.json"), "rb") as f: | ||
trusted_set.update_timestamp(f.read()) | ||
with open(os.path.join(repo_dir, "snapshot.json"), "rb") as f: | ||
trusted_set.update_snapshot(f.read()) | ||
with open(os.path.join(repo_dir, "targets.json"), "rb") as f: | ||
trusted_set.update_targets(f.read()) | ||
with open(os.path.join(repo_dir, "role1.json"), "rb") as f: | ||
trusted_set.update_delegated_targets(f.read(), "role1", "targets") | ||
with open(os.path.join(repo_dir, "role2.json"), "rb") as f: | ||
trusted_set.update_delegated_targets(f.read(), "role2", "role1") | ||
|
||
def test_out_of_order_ops(self): | ||
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') | ||
data={} | ||
for md in ["root", "timestamp", "snapshot", "targets", "role1"]: | ||
with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f: | ||
data[md] = f.read() | ||
|
||
trusted_set = TrustedMetadataSet(data["root"]) | ||
|
||
# Update timestamp before root is finished | ||
with self.assertRaises(RuntimeError): | ||
trusted_set.update_timestamp(data["timestamp"]) | ||
|
||
trusted_set.root_update_finished() | ||
with self.assertRaises(RuntimeError): | ||
trusted_set.root_update_finished() | ||
|
||
# Update snapshot before timestamp | ||
with self.assertRaises(RuntimeError): | ||
trusted_set.update_snapshot(data["snapshot"]) | ||
|
||
trusted_set.update_timestamp(data["timestamp"]) | ||
|
||
# Update targets before snapshot | ||
with self.assertRaises(RuntimeError): | ||
trusted_set.update_targets(data["targets"]) | ||
|
||
trusted_set.update_snapshot(data["snapshot"]) | ||
|
||
#update timestamp after snapshot | ||
with self.assertRaises(RuntimeError): | ||
trusted_set.update_timestamp(data["timestamp"]) | ||
|
||
# Update delegated targets before targets | ||
with self.assertRaises(RuntimeError): | ||
trusted_set.update_delegated_targets(data["role1"], "role1", "targets") | ||
|
||
trusted_set.update_targets(data["targets"]) | ||
trusted_set.update_delegated_targets(data["role1"], "role1", "targets") | ||
|
||
def test_update_with_invalid_json(self): | ||
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') | ||
data={} | ||
for md in ["root", "timestamp", "snapshot", "targets", "role1"]: | ||
with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f: | ||
data[md] = f.read() | ||
|
||
# root.json not a json file at all | ||
with self.assertRaises(exceptions.RepositoryError): | ||
TrustedMetadataSet(b"") | ||
# root.json is invalid | ||
root = Metadata.from_bytes(data["root"]) | ||
root.signed.version += 1 | ||
with self.assertRaises(exceptions.RepositoryError): | ||
TrustedMetadataSet(json.dumps(root.to_dict()).encode()) | ||
|
||
trusted_set = TrustedMetadataSet(data["root"]) | ||
trusted_set.root_update_finished() | ||
|
||
top_level_md = [ | ||
(data["timestamp"], trusted_set.update_timestamp), | ||
(data["snapshot"], trusted_set.update_snapshot), | ||
(data["targets"], trusted_set.update_targets), | ||
] | ||
for metadata, update_func in top_level_md: | ||
# metadata is not json | ||
with self.assertRaises(exceptions.RepositoryError): | ||
update_func(b"") | ||
# metadata is invalid | ||
md = Metadata.from_bytes(metadata) | ||
md.signed.version += 1 | ||
with self.assertRaises(exceptions.RepositoryError): | ||
update_func(json.dumps(md.to_dict()).encode()) | ||
|
||
# metadata is of wrong type | ||
with self.assertRaises(exceptions.RepositoryError): | ||
update_func(data["root"]) | ||
|
||
update_func(metadata) | ||
|
||
|
||
# TODO test updating over initial metadata (new keys, newer timestamp, etc) | ||
# TODO test the actual specification checks | ||
|
||
|
||
if __name__ == '__main__': | ||
utils.configure_test_logging(sys.argv) | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2021, New York University and the TUF contributors | ||
# SPDX-License-Identifier: MIT OR Apache-2.0 | ||
|
||
"""Test Updater class | ||
""" | ||
|
||
import os | ||
import shutil | ||
import tempfile | ||
import logging | ||
import sys | ||
import unittest | ||
import tuf.unittest_toolbox as unittest_toolbox | ||
|
||
from tests import utils | ||
from tuf import ngclient | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class TestUpdater(unittest_toolbox.Modified_TestCase): | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
# Create a temporary directory to store the repository, metadata, and target | ||
# files. 'temporary_directory' must be deleted in TearDownModule() so that | ||
# temporary files are always removed, even when exceptions occur. | ||
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) | ||
|
||
# Needed because in some tests simple_server.py cannot be found. | ||
# The reason is that the current working directory | ||
# has been changed when executing a subprocess. | ||
cls.SIMPLE_SERVER_PATH = os.path.join(os.getcwd(), 'simple_server.py') | ||
|
||
# Launch a SimpleHTTPServer (serves files in the current directory). | ||
# Test cases will request metadata and target files that have been | ||
# pre-generated in 'tuf/tests/repository_data', which will be served | ||
# by the SimpleHTTPServer launched here. The test cases of 'test_updater.py' | ||
# assume the pre-generated metadata files have a specific structure, such | ||
# as a delegated role 'targets/role1', three target files, five key files, | ||
# etc. | ||
cls.server_process_handler = utils.TestServerProcess(log=logger, | ||
server=cls.SIMPLE_SERVER_PATH) | ||
|
||
|
||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
# Cleans the resources and flush the logged lines (if any). | ||
cls.server_process_handler.clean() | ||
|
||
# Remove the temporary repository directory, which should contain all the | ||
# metadata, targets, and key files generated for the test cases | ||
shutil.rmtree(cls.temporary_directory) | ||
|
||
|
||
|
||
def setUp(self): | ||
# We are inheriting from custom class. | ||
unittest_toolbox.Modified_TestCase.setUp(self) | ||
|
||
# Copy the original repository files provided in the test folder so that | ||
# any modifications made to repository files are restricted to the copies. | ||
# The 'repository_data' directory is expected to exist in 'tuf.tests/'. | ||
original_repository_files = os.path.join(os.getcwd(), 'repository_data') | ||
temporary_repository_root = \ | ||
self.make_temp_directory(directory=self.temporary_directory) | ||
|
||
# The original repository, keystore, and client directories will be copied | ||
# for each test case. | ||
original_repository = os.path.join(original_repository_files, 'repository') | ||
original_keystore = os.path.join(original_repository_files, 'keystore') | ||
original_client = os.path.join(original_repository_files, 'client', 'test_repository1', 'metadata', 'current') | ||
|
||
# Save references to the often-needed client repository directories. | ||
# Test cases need these references to access metadata and target files. | ||
self.repository_directory = \ | ||
os.path.join(temporary_repository_root, 'repository') | ||
self.keystore_directory = \ | ||
os.path.join(temporary_repository_root, 'keystore') | ||
|
||
self.client_directory = os.path.join(temporary_repository_root, 'client') | ||
|
||
# Copy the original 'repository', 'client', and 'keystore' directories | ||
# to the temporary repository the test cases can use. | ||
shutil.copytree(original_repository, self.repository_directory) | ||
shutil.copytree(original_client, self.client_directory) | ||
shutil.copytree(original_keystore, self.keystore_directory) | ||
|
||
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. | ||
repository_basepath = self.repository_directory[len(os.getcwd()):] | ||
url_prefix = 'http://' + utils.TEST_HOST_ADDRESS + ':' \ | ||
+ str(self.server_process_handler.port) + repository_basepath | ||
|
||
metadata_url = f"{url_prefix}/metadata/" | ||
targets_url = f"{url_prefix}/targets/" | ||
# Creating a repository instance. The test cases will use this client | ||
# updater to refresh metadata, fetch target files, etc. | ||
self.repository_updater = ngclient.Updater(self.client_directory, | ||
metadata_url, | ||
targets_url) | ||
|
||
def tearDown(self): | ||
# We are inheriting from custom class. | ||
unittest_toolbox.Modified_TestCase.tearDown(self) | ||
|
||
# Logs stdout and stderr from the sever subprocess. | ||
self.server_process_handler.flush_log() | ||
|
||
def test_refresh(self): | ||
# All metadata is in local directory already | ||
self.repository_updater.refresh() | ||
|
||
# Get targetinfo for 'file1.txt' listed in targets | ||
targetinfo1 = self.repository_updater.get_one_valid_targetinfo('file1.txt') | ||
# Get targetinfo for 'file3.txt' listed in the delegated role1 | ||
targetinfo3= self.repository_updater.get_one_valid_targetinfo('file3.txt') | ||
|
||
destination_directory = self.make_temp_directory() | ||
updated_targets = self.repository_updater.updated_targets([targetinfo1, targetinfo3], | ||
destination_directory) | ||
|
||
self.assertListEqual(updated_targets, [targetinfo1, targetinfo3]) | ||
|
||
self.repository_updater.download_target(targetinfo1, destination_directory) | ||
updated_targets = self.repository_updater.updated_targets(updated_targets, | ||
destination_directory) | ||
|
||
self.assertListEqual(updated_targets, [targetinfo3]) | ||
|
||
|
||
self.repository_updater.download_target(targetinfo3, destination_directory) | ||
updated_targets = self.repository_updater.updated_targets(updated_targets, | ||
destination_directory) | ||
|
||
self.assertListEqual(updated_targets, []) | ||
|
||
def test_refresh_with_only_local_root(self): | ||
os.remove(os.path.join(self.client_directory, "timestamp.json")) | ||
os.remove(os.path.join(self.client_directory, "snapshot.json")) | ||
os.remove(os.path.join(self.client_directory, "targets.json")) | ||
os.remove(os.path.join(self.client_directory, "role1.json")) | ||
|
||
self.repository_updater.refresh() | ||
|
||
# Get targetinfo for 'file3.txt' listed in the delegated role1 | ||
targetinfo3= self.repository_updater.get_one_valid_targetinfo('file3.txt') | ||
|
||
if __name__ == '__main__': | ||
utils.configure_test_logging(sys.argv) | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.