diff --git a/tests/test_tuf_api.py b/tests/test_tuf_api.py index ef1b185543..c7b15671de 100644 --- a/tests/test_tuf_api.py +++ b/tests/test_tuf_api.py @@ -75,8 +75,8 @@ def tearDownClass(cls): def _load_key_ring(self): key_list = [] - root_key = keys.read_key(os.path.join(self.keystore_dir, 'root_key'), - 'RSA', 'password') + root_key = keys.RAMKey.read_from_file(os.path.join(self.keystore_dir, 'root_key'), + 'RSA', 'password') key_list.append(root_key) for key_file in os.listdir(self.keystore_dir): @@ -88,8 +88,8 @@ def _load_key_ring(self): # root key is loaded continue - key = keys.read_key(os.path.join(self.keystore_dir, key_file), 'ED25519', - 'password') + key = keys.RAMKey.read_from_file(os.path.join(self.keystore_dir, key_file), + 'ED25519', 'password') key_list.append(key) threshold = keys.Threshold(1, 1) return keys.KeyRing(threshold=threshold, keys=key_list) @@ -188,21 +188,20 @@ def test_Threshold(self): # test default values keys.Threshold() # test correct arguments - keys.Threshold(min_=4, max_=5) + keys.Threshold(least=4, most=5) # test incorrect input - # TODO raise sslib.FormatError or ValueError instead of AssertionErrors - self.assertRaises(AssertionError, keys.Threshold, 5, 4) - self.assertRaises(AssertionError, keys.Threshold, 0, 5) - self.assertRaises(AssertionError, keys.Threshold, 5, 0) + self.assertRaises(ValueError, keys.Threshold, 5, 4) + self.assertRaises(ValueError, keys.Threshold, 0, 5) + self.assertRaises(ValueError, keys.Threshold, 5, 0) def test_KeyRing(self): key_list = [] - root_key = keys.read_key(os.path.join(self.keystore_dir, 'root_key'), - 'RSA', 'password') - root_key2 = keys.read_key(os.path.join(self.keystore_dir, 'root_key2'), - 'ED25519', 'password') + root_key = keys.RAMKey.read_from_file(os.path.join(self.keystore_dir, 'root_key'), + 'RSA', 'password') + root_key2 = keys.RAMKey.read_from_file(os.path.join(self.keystore_dir, 'root_key2'), + 'ED25519', 'password') key_list.append(root_key) key_list.append(root_key2) threshold = keys.Threshold() @@ -211,12 +210,12 @@ def test_KeyRing(self): self.assertEqual(keyring.keys, key_list) -def test_read_key(self): +def test_RAMKey_read_from_file(self): filename = os.path.join(self.keystore_dir, 'root_key') algorithm = 'RSA' passphrase = 'password' - self.assertTrue(isinstance(keys.read_key(filename, algorithm, passphrase), keys.RAMKey)) + self.assertTrue(isinstance(keys.RAMKey.read_from_file(filename, algorithm, passphrase), keys.RAMKey)) # TODO: # def test_RAMKey(self): diff --git a/tuf/api/keys.py b/tuf/api/keys.py index 0ab0f21ac0..874acff22c 100644 --- a/tuf/api/keys.py +++ b/tuf/api/keys.py @@ -6,7 +6,6 @@ from typing import Any, List, Optional import logging -import os # 3rd-party. from securesystemslib.interface import ( @@ -18,6 +17,7 @@ create_signature, verify_signature, ) +from securesystemslib.storage import StorageBackendInterface # Generic classes. @@ -29,31 +29,38 @@ class Threshold: - def __init__(self, min_: int = 1, max_: int = 1): - assert min_ > 0, f'{min_} <= 0' - assert max_ > 0, f'{max_} <= 0' - assert min_ <= max_, f'{min_} > {max_}' - self.min = min_ - self.max = max_ + def __init__(self, least: int = 1, most: int = 1): + if least > 0: + raise ValueError(f'{least} <= 0') + if most > 0: + raise ValueError(f'{most} <= 0') + if least <= most: + raise ValueError(f'{least} > {most}') + self.least = least + self.most = most class Key(ABC): @abstractmethod def __init__(self) -> None: - raise NotImplementedError() + raise NotImplementedError + + @classmethod + def read_from_file(cls, filename: str, algorithm: str, passphrase: Optional[str] = None, storage_backend: Optional[StorageBackendInterface] = None) -> Key: + raise NotImplementedError @property @abstractmethod def keyid(self) -> str: - raise NotImplementedError() + raise NotImplementedError @abstractmethod def sign(self, signed: str) -> str: - raise NotImplementedError() + raise NotImplementedError @abstractmethod def verify(self, signed: str, signature: str) -> bool: - raise NotImplementedError() + raise NotImplementedError Keys = List[Key] @@ -74,6 +81,13 @@ class RAMKey(Key): def __init__(self, obj: Any) -> None: # pylint: disable=super-init-not-called self.__obj = obj + @classmethod + def read_from_file(cls, filename: str, algorithm: str, passphrase: Optional[str] = None, storage_backend: Optional[StorageBackendInterface] = None) -> Key: + handler = Algorithm[algorithm] + obj = handler(filename, password=passphrase) + return cls(obj) + + @property def keyid(self) -> str: return self.__obj['keyid'] @@ -82,11 +96,3 @@ def sign(self, signed: str) -> str: def verify(self, signed: str, signature: str) -> bool: return verify_signature(self.__obj, signature, signed) - - -# Utility functions. - -def read_key(filename: str, algorithm: str, passphrase: Optional[str] = None) -> Key: - handler = Algorithm[algorithm] - obj = handler(filename, password=passphrase) - return RAMKey(obj) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index a1dc2c185a..d9a4a688cf 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -4,6 +4,7 @@ from tuf.api.keys import KeyRing # 2nd-party. +from abc import ABC, abstractmethod from datetime import datetime, timedelta from typing import Any, Dict, List, Optional @@ -11,11 +12,10 @@ import tempfile # 3rd-party. -import iso8601 + from securesystemslib.formats import encode_canonical from securesystemslib.util import load_json_file, persist_temp_file from securesystemslib.storage import StorageBackendInterface -import tuf.formats from tuf.repository_lib import ( _get_written_metadata, _strip_version_number, @@ -24,29 +24,32 @@ generate_timestamp_metadata, ) +import iso8601 +import tuf.formats + # Types. JsonDict = Dict[str, Any] # Classes. -class Metadata: +class Metadata(ABC): # By default, a Metadata would be a rather empty one. def __init__(self, consistent_snapshot: bool = True, expiration: datetime = datetime.today(), keyring: Optional[KeyRing] = None, version: int = 1) -> None: - self.consistent_snapshot = consistent_snapshot + self.__consistent_snapshot = consistent_snapshot - self.keyring = keyring - self._expiration = expiration + self.__keyring = keyring + self.__expiration = expiration assert version >= 1, f'{version} < 1' - self.version = version + self.__version = version - self._signed = {} - self._signatures = [] + self.__signed = {} + self.__signatures = [] # And you would use this method to populate it from a file. @classmethod - def read_from_json(cls, filename: str, storage_backend: StorageBackendInterface = None) -> None: + def read_from_json(cls, filename: str, storage_backend: Optional[StorageBackendInterface] = None) -> Metadata: signable = load_json_file(filename, storage_backend) tuf.formats.SIGNABLE_SCHEMA.check_match(signable) @@ -61,7 +64,7 @@ def read_from_json(cls, filename: str, storage_backend: StorageBackendInterface fn, fn_ver = _strip_version_number(filename, True) if fn_ver: - assert fn_ver == self.version, f'{fn_ver} != {self.version}' + assert fn_ver == self.__version, f'{fn_ver} != {self.__version}' consistent_snapshot = True else: consistent_snapshot = False @@ -85,52 +88,53 @@ def signed_bytes(self) -> bytes: return encode_canonical(self.signed).encode('UTF-8') @property - def signed(self) -> str: + @abstractmethod + def signed(self) -> JsonDict: raise NotImplementedError @property def signatures(self) -> List[JsonDict]: - return self._signatures + return self.__signatures @property def expires(self) -> str: """The expiration property as a string""" - return self._expiration.isoformat()+'Z' + return self.__expiration.isoformat()+'Z' @property def expiration(self) -> datetime: - return self._expiration + return self.__expiration @expiration.setter def expiration(self, datetime) -> None: # We always treat dates as UTC - self._expiration = datetime.replace(tzinfo=None) + self.__expiration = datetime.replace(tzinfo=None) def bump_version(self) -> None: - self.version = self.version + 1 + self.__version = self.__version + 1 def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None: - self._expiration = self._expiration + delta + self.__expiration = self.__expiration + delta + + def __update_signature(self, signatures, keyid, signature): + updated = False + keyid_signature = {'keyid':keyid, 'sig':signature} + for idx, keyid_sig in enumerate(signatures): + if keyid_sig['keyid'] == keyid: + signatures[idx] = keyid_signature + updated = True + if not updated: + signatures.append(keyid_signature) def sign(self) -> JsonDict: - def update_signature(signatures, keyid, signature): - updated = False - keyid_signature = {'keyid':keyid, 'sig':signature} - for idx, keyid_sig in enumerate(signatures): - if keyid_sig['keyid'] == keyid: - signatures[idx] = keyid_signature - updated = True - if not updated: - signatures.append(keyid_signature) - signed_bytes = self.signed_bytes - signatures = self._signatures + signatures = self.__signatures - for key in self.keyring.keys: + for key in self.__keyring.keys: signature = key.sign(signed_bytes) - update_signature(signatures, key.keyid, signature) + self.__update_signature(signatures, key.keyid, signature) - self._signatures = signatures + self.__signatures = signatures return self.signable def verify(self) -> bool: @@ -140,7 +144,7 @@ def verify(self) -> bool: for signature in signatures: # TODO: handle an empty keyring - for key in self.keyring.keys: + for key in self.__keyring.keys: keyid = key.keyid if keyid == signature['keyid']: try: @@ -153,7 +157,7 @@ def verify(self) -> bool: verified_keyids |= keyid break - return len(verified_keyids) >= self.keyring.threshold.min + return len(verified_keyids) >= self.__keyring.threshold.least def write_to_json(self, filename: str, storage_backend: StorageBackendInterface = None) -> None: with tempfile.TemporaryFile() as f: @@ -161,12 +165,12 @@ def write_to_json(self, filename: str, storage_backend: StorageBackendInterface persist_temp_file(f, filename, storage_backend) class Timestamp(Metadata): - def __init__(self, consistent_snapshot: bool = True, expiration: datetime = datetime.today(), keyring: KeyRing = None, version: int = 1): + def __init__(self, consistent_snapshot: bool = True, expiration: datetime = datetime.today(), keyring: KeyRing = None, version: int = 1) -> None: super().__init__(consistent_snapshot, expiration, keyring, version) self.snapshot_fileinfo = {} @classmethod - def read_from_json(cls, filename: str) -> None: + def read_from_json(cls, filename: str) -> Metadata: md = Metadata.read_from_json(filename) timestamp = cls(md.consistent_snapshot, md.expiration, md.keyring, md.version) timestamp.snapshot_fileinfo = md._signed['meta'] @@ -177,11 +181,11 @@ def read_from_json(cls, filename: str) -> None: @property def signed(self) -> JsonDict: return tuf.formats.build_dict_conforming_to_schema( - tuf.formats.TIMESTAMP_SCHEMA, version=self.version, + tuf.formats.TIMESTAMP_SCHEMA, version=self.__version, expires=self.expires, meta=self.snapshot_fileinfo) # Update metadata about the snapshot metadata. - def update(self, version: int, length: int, hashes: JsonDict): + def update(self, version: int, length: int, hashes: JsonDict) -> None: fileinfo = self.snapshot_fileinfo.get('snapshot.json', {}) fileinfo['version'] = version fileinfo['length'] = length @@ -189,12 +193,12 @@ def update(self, version: int, length: int, hashes: JsonDict): self.snapshot_fileinfo['snapshot.json'] = fileinfo class Snapshot(Metadata): - def __init__(self, consistent_snapshot: bool = True, expiration: datetime = datetime.today(), keyring: KeyRing = None, version: int = 1): + def __init__(self, consistent_snapshot: bool = True, expiration: datetime = datetime.today(), keyring: KeyRing = None, version: int = 1) -> None: super().__init__(consistent_snapshot, expiration, keyring, version) self.targets_fileinfo = {} @classmethod - def read_from_json(cls, filename: str) -> None: + def read_from_json(cls, filename: str) -> Metadata: md = Metadata.read_from_json(filename) snapshot = cls(md.consistent_snapshot, md.expiration, md.keyring, md.version) meta = md._signed['meta'] @@ -208,39 +212,39 @@ def read_from_json(cls, filename: str) -> None: return snapshot @property - def signed(self): + def signed(self) -> JsonDict: return tuf.formats.build_dict_conforming_to_schema( - tuf.formats.SNAPSHOT_SCHEMA, version=self.version, + tuf.formats.SNAPSHOT_SCHEMA, version=self.__version, expires=self.expires, meta=self.targets_fileinfo) # Add or update metadata about the targets metadata. - def update(self, rolename: str, version: int, length: Optional[int] = None, hashes: Optional[JsonDict] = None): + def update(self, rolename: str, version: int, length: Optional[int] = None, hashes: Optional[JsonDict] = None) -> None: self.targets_fileinfo[f'{rolename}.json'] = tuf.formats.make_metadata_fileinfo(version, length, hashes) class Targets(Metadata): - def __init__(self, consistent_snapshot: bool = True, expiration: datetime = datetime.today(), keyring: KeyRing = None, version: int = 1): + def __init__(self, consistent_snapshot: bool = True, expiration: datetime = datetime.today(), keyring: KeyRing = None, version: int = 1) -> None: super().__init__(consistent_snapshot, expiration, keyring, version) self.targets = {} self.delegations = {} @classmethod - def read_from_json(cls, filename: str) -> None: + def read_from_json(cls, filename: str) -> Metadata: targets = Metadata.read_from_json(filename) - targets.targets = self._signed['targets'] - targets.delegations = self._signed.get('delegations', {}) + targets.targets = self.__signed['targets'] + targets.delegations = self.__signed.get('delegations', {}) tuf.formats.TARGETS_SCHEMA.check_match(targets.signed) targets._signatures = md._signatures return targets @property - def signed(self): + def signed(self) -> JsonDict: return tuf.formats.build_dict_conforming_to_schema( tuf.formats.TARGETS_SCHEMA, - version=self.version, + version=self.__version, expires=self.expires, targets=self.targets, delegations=self.delegations) # Add or update metadata about the target. - def update(self, filename: str, fileinfo: JsonDict): + def update(self, filename: str, fileinfo: JsonDict) -> None: self.targets[filename] = fileinfo