diff --git a/tests/test_api.py b/tests/test_api.py
new file mode 100644
index 0000000000..e21514af76
--- /dev/null
+++ b/tests/test_api.py
@@ -0,0 +1,159 @@
+#!/usr/bin/env python
+
+# Copyright 2020, New York University and the TUF contributors
+# SPDX-License-Identifier: MIT OR Apache-2.0
+"""Unit tests for api/metadata.py
+"""
+import copy
+import logging
+import os
+import shutil
+import tempfile
+import unittest
+
+from datetime import timedelta
+from dateutil.relativedelta import relativedelta
+
+from tuf.api.metadata import (
+    Snapshot,
+    Timestamp,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class TestMetadata(unittest.TestCase):
+    # TODO: Start Vault in dev mode, and export VAULT_ADDR as well as
+    # VAULT_TOKEN.
+    # TODO: Enable the Vault Transit secrets engine.
+    @classmethod
+    def setUpClass(cls):
+
+        # Create a temporary directory to store the repository, metadata, and
+        # target files. 'temporary_directory' must be deleted in
+        # tearDownClass() so that temporary files are always removed, even
+        # when exceptions occur.
+        cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
+
+        test_repo_data = os.path.join(
+            os.path.dirname(os.path.realpath(__file__)), 'repository_data')
+
+        cls.repo_dir = os.path.join(cls.temporary_directory, 'repository')
+        shutil.copytree(
+            os.path.join(test_repo_data, 'repository'), cls.repo_dir)
+
+        cls.keystore_dir = os.path.join(cls.temporary_directory, 'keystore')
+        shutil.copytree(
+            os.path.join(test_repo_data, 'keystore'), cls.keystore_dir)
+
+
+    # TODO: Shut down Vault.
+    @classmethod
+    def tearDownClass(cls):
+
+        # Remove the temporary repository directory, which should contain all
+        # the metadata, targets, and key files generated for the test cases.
+        shutil.rmtree(cls.temporary_directory)
+
+
+
+    # def _load_key_ring(self):
+    #     key_list = []
+    #     root_key = RAMKey.read_from_file(
+    #         os.path.join(self.keystore_dir, 'root_key'),
+    #         'rsassa-pss-sha256', 'password')
+    #     key_list.append(root_key)
+
+    #     for key_file in os.listdir(self.keystore_dir):
+    #         if key_file.endswith('.pub'):
+    #             # ignore public keys
+    #             continue
+
+    #         if key_file.startswith('root_key'):
+    #             # root key is loaded
+    #             continue
+
+    #         key = RAMKey.read_from_file(
+    #             os.path.join(self.keystore_dir, key_file),
+    #             'ed25519', 'password')
+    #         key_list.append(key)
+    #     threshold = Threshold(1, 5)
+    #     return KeyRing(threshold=threshold, keys=key_list)
+
+    def test_metadata_base(self):
+        # The use of Snapshot here is arbitrary; we are just testing the base
+        # class features with real data.
+        snapshot_path = os.path.join(self.repo_dir, 'metadata', 'snapshot.json')
+        md = Snapshot.read_from_json(snapshot_path)
+
+        self.assertEqual(md.signed.version, 1)
+        md.signed.bump_version()
+        self.assertEqual(md.signed.version, 2)
+        self.assertEqual(md.signed.expires, '2030-01-01T00:00:00Z')
+        md.signed.bump_expiration()
+        self.assertEqual(md.signed.expires, '2030-01-02T00:00:00Z')
+        md.signed.bump_expiration(timedelta(days=365))
+        self.assertEqual(md.signed.expires, '2031-01-02T00:00:00Z')
+
+
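+    # A possible round-trip check for write_to_json()/read_from_json(). This
+    # is an illustrative sketch: it assumes write_to_json() persists the
+    # canonical JSON signable as implemented below, and the file name is
+    # arbitrary (it has no version prefix, so the prefix check in
+    # read_from_json does not apply).
+    def test_write_read_roundtrip(self):
+        snapshot_path = os.path.join(self.repo_dir, 'metadata', 'snapshot.json')
+        snapshot = Snapshot.read_from_json(snapshot_path)
+
+        roundtrip_path = os.path.join(
+            self.temporary_directory, 'api_snapshot.json')
+        snapshot.write_to_json(roundtrip_path)
+
+        # Reading the written file back should yield the same signed content.
+        roundtrip = Snapshot.read_from_json(roundtrip_path)
+        self.assertEqual(roundtrip.signed.version, snapshot.signed.version)
+        self.assertEqual(roundtrip.signed.expires, snapshot.signed.expires)
+        self.assertEqual(roundtrip.signed.meta, snapshot.signed.meta)
+
+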
+    def test_metadata_snapshot(self):
+        snapshot_path = os.path.join(self.repo_dir, 'metadata', 'snapshot.json')
+        snapshot = Snapshot.read_from_json(snapshot_path)
+
+        # key_ring = self._load_key_ring()
+        # snapshot.verify(key_ring)
+
+        # Create a dict representing what we expect the updated data to be.
+        # NOTE: Take a deep copy; otherwise 'fileinfo' would alias
+        # 'snapshot.signed.meta' and the assertion below would be vacuous.
+        fileinfo = copy.deepcopy(snapshot.signed.meta)
+        hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'}
+        fileinfo['role1.json']['version'] = 2
+        fileinfo['role1.json']['hashes'] = hashes
+        fileinfo['role1.json']['length'] = 123
+
+        snapshot.signed.update('role1', 2, 123, hashes)
+        self.assertEqual(snapshot.signed.meta, fileinfo)
+
+        # snapshot.signable()
+
+        # snapshot.sign()
+
+        # snapshot.verify()
+
+        # snapshot.write_to_json(os.path.join(self.temporary_directory, 'api_snapshot.json'))
+
+
+    def test_metadata_timestamp(self):
+        timestamp_path = os.path.join(self.repo_dir, 'metadata', 'timestamp.json')
+        timestamp = Timestamp.read_from_json(timestamp_path)
+
+        # key_ring = self._load_key_ring()
+        # timestamp.verify(key_ring)
+
+        self.assertEqual(timestamp.signed.version, 1)
+        timestamp.signed.bump_version()
+        self.assertEqual(timestamp.signed.version, 2)
+
+        self.assertEqual(timestamp.signed.expires, '2030-01-01T00:00:00Z')
+        timestamp.signed.bump_expiration()
+        self.assertEqual(timestamp.signed.expires, '2030-01-02T00:00:00Z')
+        timestamp.signed.bump_expiration(timedelta(days=365))
+        self.assertEqual(timestamp.signed.expires, '2031-01-02T00:00:00Z')
+
+        # Test that dateutil.relativedelta works as well; it provides a much
+        # friendlier interface for callers.
+        delta = relativedelta(days=1)
+        timestamp.signed.bump_expiration(delta)
+        self.assertEqual(timestamp.signed.expires, '2031-01-03T00:00:00Z')
+        delta = relativedelta(years=5)
+        timestamp.signed.bump_expiration(delta)
+        self.assertEqual(timestamp.signed.expires, '2036-01-03T00:00:00Z')
+
+        hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'}
+        # NOTE: Again, deep copy to avoid aliasing 'timestamp.signed.meta'.
+        fileinfo = copy.deepcopy(timestamp.signed.meta['snapshot.json'])
+        fileinfo['hashes'] = hashes
+        fileinfo['version'] = 2
+        fileinfo['length'] = 520
+        timestamp.signed.update(2, 520, hashes)
+        self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo)
+
+        # timestamp.sign()
+
+        # timestamp.write_to_json()
+
+
+# Run unit test.
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py
new file mode 100644
index 0000000000..b059a1358f
--- /dev/null
+++ b/tuf/api/metadata.py
@@ -0,0 +1,276 @@
+"""TUF role metadata model.
+
+This module provides container classes for TUF role metadata, including
+methods to read/serialize/write from and to JSON, to perform TUF-compliant
+metadata updates, and to create and verify signatures.
+
+TODO:
+
+ * Add docstrings
+
+ * Finalize/document the verify/sign functions (I am not fully sure about
+   the expected behavior)
+
+ * Validation (some thoughts ...)
+   - Avoid schema, see secure-systems-lab/securesystemslib#183
+   - Provide methods to validate the JSON representation (at the user
+     boundary)
+   - Fail on bad JSON metadata in the read_from_json method
+   - Be lenient on bad/invalid metadata objects in memory; they might be
+     work in progress. E.g. it might be convenient to create empty metadata
+     and assign attributes later on.
+   - Fail on bad JSON metadata in the write_to_json method, but with an
+     option to disable the check, as there might be a justified reason to
+     write WIP metadata to JSON.
+
+ * It might be nice to have a generic Metadata.read_from_json that can load
+   any TUF role metadata and instantiate the appropriate object based on the
+   json '_type' field.
+
+ * Add Root metadata class
+
+"""
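+
+# A quick, illustrative tour of the API defined below (file names and update
+# values are made up):
+#
+#   snapshot = Snapshot.read_from_json('snapshot.json')
+#   snapshot.signed.bump_version()
+#   snapshot.signed.bump_expiration(timedelta(days=7))
+#   snapshot.signed.update('role1', 2, 123, {'sha256': '...'})
+#   snapshot.write_to_json('snapshot.json')
+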
+# Imports
+
+from datetime import datetime, timedelta
+from typing import Any, Dict, Optional
+
+import json
+import logging
+import tempfile
+
+from securesystemslib.formats import encode_canonical
+from securesystemslib.util import load_json_file, persist_temp_file
+from securesystemslib.storage import StorageBackendInterface
+from tuf.repository_lib import (
+    _get_written_metadata,
+    _strip_version_number
+)
+
+import iso8601
+import tuf.formats
+
+
+# Types
+
+JsonDict = Dict[str, Any]
+
+
+# Classes.
+
+class Metadata:
+    def __init__(
+            self, signed: 'Signed' = None, signatures: list = None) -> None:
+        # TODO: How much init magic do we want?
+        self.signed = signed
+        self.signatures = signatures
+
+    def as_dict(self) -> JsonDict:
+        return {
+            'signatures': self.signatures,
+            'signed': self.signed.as_dict()
+        }
+
+    def __update_signature(self, signatures, keyid, signature):
+        # Replace an existing signature from 'keyid', or append a new one.
+        updated = False
+        keyid_signature = {'keyid': keyid, 'sig': signature}
+        for idx, keyid_sig in enumerate(signatures):
+            if keyid_sig['keyid'] == keyid:
+                signatures[idx] = keyid_signature
+                updated = True
+        if not updated:
+            signatures.append(keyid_signature)
+
+    # def sign(self, key_ring: ???) -> JsonDict:
+    #     # FIXME: Needs documentation of expected behavior
+    #     signed_bytes = self.signed_bytes
+    #     signatures = self.__signatures
+
+    #     for key in key_ring.keys:
+    #         signature = key.sign(self.signed_bytes)
+    #         self.__update_signature(signatures, key.keyid, signature)
+
+    #     self.__signatures = signatures
+    #     return self.signable
+
+    # def verify(self, key_ring: ???) -> bool:
+    #     # FIXME: Needs documentation of expected behavior
+    #     signed_bytes = self.signed.signed_bytes
+    #     signatures = self.signatures
+    #     verified_keyids = set()
+
+    #     for signature in signatures:
+    #         # TODO: handle an empty keyring
+    #         for key in key_ring.keys:
+    #             keyid = key.keyid
+    #             if keyid == signature['keyid']:
+    #                 try:
+    #                     verified = key.verify(signed_bytes, signature)
+    #                 except:
+    #                     logging.exception(
+    #                         f'Could not verify signature for key {keyid}')
+    #                     continue
+    #                 else:
+    #                     # Avoid https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-6174
+    #                     verified_keyids.add(keyid)
+
+    #                 break
+
+    #     return len(verified_keyids) >= key_ring.threshold.least
+
+    def write_to_json(
+            self, filename: str,
+            storage_backend: StorageBackendInterface = None) -> None:
+        with tempfile.TemporaryFile() as f:
+            # NOTE: sign() is not implemented yet (see the commented-out
+            # draft above), so write the current signable dict as-is.
+            # _get_written_metadata already returns canonical JSON bytes.
+            f.write(_get_written_metadata(self.as_dict()))
+            persist_temp_file(f, filename, storage_backend)
+
+
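+# For orientation: Metadata.as_dict() produces the usual TUF signable
+# wrapper. The shape below is illustrative, with made-up values:
+#
+#   {
+#       'signatures': [{'keyid': '1234...', 'sig': 'abcd...'}],
+#       'signed': {'_type': 'snapshot', 'version': 1, 'spec_version': '1.0',
+#                  'expires': '2030-01-01T00:00:00Z', 'meta': {...}}
+#   }
+
+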
+class Signed:
+    # NOTE: 'Signed' is a bit of a misnomer, because the content might not be
+    # signed yet, but we keep the name to match spec terminology (I often
+    # refer to this as "payload" or "inner metadata").
+
+    # TODO: Re-think default values. It might be better to pass some things
+    # as args and not as kwargs. Then we'd need to pop those from
+    # signable["signed"] in read_from_json and pass them explicitly, which
+    # some say is better than implicit. :)
+    def __init__(
+            self, _type: str = None, version: int = 0,
+            spec_version: str = None, expires: str = None
+            ) -> None:
+        # TODO: How much init magic do we want?
+
+        self._type = _type
+        self.spec_version = spec_version
+
+        # We always intend times to be UTC.
+        # NOTE: We could use datetime.fromisoformat() here, but it is not
+        # available in Python 2.7's datetime.
+        # NOTE: Store as a datetime object for convenient handling; use the
+        # 'expires' property to get the TUF metadata format representation.
+        self.__expiration = iso8601.parse_date(expires).replace(tzinfo=None)
+
+        if version < 0:
+            raise ValueError(f'version must be >= 0, got {version}')
+        self.version = version
+
+    @property
+    def signed_bytes(self) -> bytes:
+        return encode_canonical(self.as_dict()).encode('UTF-8')
+
+    @property
+    def expires(self) -> str:
+        """The expiration property in TUF metadata format."""
+        return self.__expiration.isoformat() + 'Z'
+
+    def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None:
+        self.__expiration = self.__expiration + delta
+
+    def bump_version(self) -> None:
+        self.version += 1
+
+    def as_dict(self) -> JsonDict:
+        # NOTE: The classes should be the single source of truth about
+        # metadata, so let's define the dict representation here and not in
+        # some dubious build_dict_conforming_to_schema.
+        return {
+            '_type': self._type,
+            'version': self.version,
+            'spec_version': self.spec_version,
+            'expires': self.expires
+        }
+
+    @classmethod
+    def read_from_json(
+            cls, filename: str,
+            storage_backend: Optional[StorageBackendInterface] = None
+            ) -> Metadata:
+        signable = load_json_file(filename, storage_backend)
+
+        # FIXME: It feels dirty to access signable['signed']['version'] here
+        # in order to do this check, and also a bit random (there are likely
+        # other things to check), but later we don't have the filename
+        # anymore. If we want to stick to the check, which seems reasonable,
+        # we should maybe think of a better place.
+        # NOTE: fn_prefix is a string, so compare it as an int.
+        _, fn_prefix = _strip_version_number(filename, True)
+        if fn_prefix and int(fn_prefix) != signable['signed']['version']:
+            raise ValueError(
+                f'version filename prefix ({fn_prefix}) must align with '
+                f'version in metadata ({signable["signed"]["version"]}).')
+
+        return Metadata(
+            signed=cls(**signable['signed']),
+            signatures=signable['signatures'])
+
+
+class Timestamp(Signed):
+    def __init__(self, meta: JsonDict = None, **kwargs) -> None:
+        super().__init__(**kwargs)
+        # TODO: How much init magic do we want?
+        # TODO: Is there merit in creating classes for dict fields?
+        self.meta = meta
+
+    def as_dict(self) -> JsonDict:
+        json_dict = super().as_dict()
+        json_dict.update({
+            'meta': self.meta
+        })
+        return json_dict
+
+    # Update metadata about the snapshot metadata.
+    def update(self, version: int, length: int, hashes: JsonDict) -> None:
+        fileinfo = self.meta.get('snapshot.json', {})
+        fileinfo['version'] = version
+        fileinfo['length'] = length
+        fileinfo['hashes'] = hashes
+        self.meta['snapshot.json'] = fileinfo
+
+
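+# Illustrative call sequence (values are made up): after a new snapshot.json
+# has been written, the timestamp role records its new version, length and
+# hashes, and is then bumped itself:
+#
+#   timestamp.signed.update(2, 520, {'sha256': '0ae9...'})
+#   timestamp.signed.bump_version()
+#   timestamp.signed.bump_expiration()
+
+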
+class Snapshot(Signed):
+    def __init__(self, meta: JsonDict = None, **kwargs) -> None:
+        # TODO: How much init magic do we want?
+        # TODO: Is there merit in creating classes for dict fields?
+        super().__init__(**kwargs)
+        self.meta = meta
+
+    def as_dict(self) -> JsonDict:
+        json_dict = super().as_dict()
+        json_dict.update({
+            'meta': self.meta
+        })
+        return json_dict
+
+    # Add or update metadata about the targets metadata.
+    def update(
+            self, rolename: str, version: int, length: Optional[int] = None,
+            hashes: Optional[JsonDict] = None) -> None:
+        metadata_fn = f'{rolename}.json'
+
+        self.meta[metadata_fn] = {'version': version}
+        if length is not None:
+            self.meta[metadata_fn]['length'] = length
+
+        if hashes is not None:
+            self.meta[metadata_fn]['hashes'] = hashes
+
+
+class Targets(Signed):
+    def __init__(
+            self, targets: JsonDict = None, delegations: JsonDict = None,
+            **kwargs) -> None:
+        # TODO: How much init magic do we want?
+        # TODO: Is there merit in creating classes for dict fields?
+        super().__init__(**kwargs)
+        self.targets = targets
+        self.delegations = delegations
+
+    def as_dict(self) -> JsonDict:
+        json_dict = super().as_dict()
+        json_dict.update({
+            'targets': self.targets,
+            'delegations': self.delegations,
+        })
+        return json_dict
+
+    # Add or update metadata about the target.
+    def update(self, filename: str, fileinfo: JsonDict) -> None:
+        self.targets[filename] = fileinfo
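+
+
+# One possible shape for the generic loader mentioned in the module docstring
+# TODO, dispatching on the json '_type' field. This is a sketch, not part of
+# the API yet; the module-level function name is an assumption.
+def read_from_json_generic(
+        filename: str,
+        storage_backend: Optional[StorageBackendInterface] = None) -> Metadata:
+    # NOTE: This reads the file twice (once here, once in read_from_json),
+    # which is fine for a sketch but should be avoided in a real
+    # implementation.
+    signable = load_json_file(filename, storage_backend)
+
+    # Let an unknown '_type' fail loudly with a KeyError.
+    inner_cls = {
+        'timestamp': Timestamp,
+        'snapshot': Snapshot,
+        'targets': Targets,
+    }[signable['signed']['_type']]
+
+    return inner_cls.read_from_json(filename, storage_backend)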