diff --git a/tests/test_api.py b/tests/test_api.py index dcc6f0f2ee..e13c704242 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -38,8 +38,6 @@ CanonicalJSONSerializer ) -from tuf.api.serialization.util import metadata_to_dict - from securesystemslib.interface import ( import_ed25519_publickey_from_file, import_ed25519_privatekey_from_file @@ -49,7 +47,6 @@ format_keyval_to_metadata ) - logger = logging.getLogger(__name__) @@ -114,9 +111,9 @@ def test_generic_read(self): self.assertTrue( isinstance(metadata_obj2.signed, inner_metadata_cls)) - # ... and are equal (compared by dict representation) - self.assertDictEqual(metadata_to_dict(metadata_obj), - metadata_to_dict(metadata_obj2)) + # ... and return the same object (compared by dict representation) + self.assertDictEqual( + metadata_obj.to_dict(), metadata_obj2.to_dict()) # Assert that it chokes correctly on an unknown metadata type @@ -148,8 +145,9 @@ def test_read_write_read_compare(self): metadata_obj.to_file(path_2) metadata_obj_2 = Metadata.from_file(path_2) - self.assertDictEqual(metadata_to_dict(metadata_obj), - metadata_to_dict(metadata_obj_2)) + self.assertDictEqual( + metadata_obj.to_dict(), + metadata_obj_2.to_dict()) os.remove(path_2) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index eca2bf337b..cdbe0ad9eb 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -23,6 +23,7 @@ import tuf.exceptions + # Types JsonDict = Dict[str, Any] @@ -55,6 +56,49 @@ def __init__(self, signed: 'Signed', signatures: list) -> None: self.signatures = signatures + # Deserialization (factories). + @classmethod + def from_dict(cls, metadata: JsonDict) -> 'Metadata': + """Creates Metadata object from its JSON/dict representation. + + Calls 'from_dict' for any complex metadata attribute represented by a + class also that has a 'from_dict' factory method. (Currently this is + only the signed attribute.) + + Arguments: + metadata: TUF metadata in JSON/dict representation, as e.g. + returned by 'json.loads'. + + Raises: + KeyError: The metadata dict format is invalid. + ValueError: The metadata has an unrecognized signed._type field. + + Returns: + A TUF Metadata object. + + """ + # Dispatch to contained metadata class on metadata _type field. + _type = metadata['signed']['_type'] + + if _type == 'targets': + inner_cls = Targets + elif _type == 'snapshot': + inner_cls = Snapshot + elif _type == 'timestamp': + inner_cls = Timestamp + elif _type == 'root': + inner_cls = Root + else: + raise ValueError(f'unrecognized metadata type "{_type}"') + + # NOTE: If Signature becomes a class, we should iterate over + # metadata['signatures'], call Signature.from_dict for each item, and + # pass a list of Signature objects to the Metadata constructor intead. + return cls( + signed=inner_cls.from_dict(metadata['signed']), + signatures=metadata['signatures']) + + @classmethod def from_file( cls, filename: str, deserializer: MetadataDeserializer = None, @@ -95,6 +139,14 @@ def from_file( return deserializer.deserialize(raw_data) + # Serialization. + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + return { + 'signatures': self.signatures, + 'signed': self.signed.to_dict() + } + def to_file(self, filename: str, serializer: MetadataSerializer = None, storage_backend: StorageBackendInterface = None) -> None: """Writes TUF metadata to file storage. @@ -250,6 +302,39 @@ def __init__( self.version = version + # Deserialization (factories). + @classmethod + def from_dict(cls, signed_dict: JsonDict) -> 'Signed': + """Creates Signed object from its JSON/dict representation. """ + + # Convert 'expires' TUF metadata string to a datetime object, which is + # what the constructor expects and what we store. The inverse operation + # is implemented in 'to_dict'. + signed_dict['expires'] = tuf.formats.expiry_string_to_datetime( + signed_dict['expires']) + # NOTE: We write the converted 'expires' back into 'signed_dict' above + # so that we can pass it to the constructor as '**signed_dict' below, + # along with other fields that belong to Signed subclasses. + # Any 'from_dict'(-like) conversions of fields that correspond to a + # subclass should be performed in the 'from_dict' method of that + # subclass and also be written back into 'signed_dict' before calling + # super().from_dict. + + # NOTE: cls might be a subclass of Signed, if 'from_dict' was called on + # that subclass (see e.g. Metadata.from_dict). + return cls(**signed_dict) + + + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + return { + '_type': self._type, + 'version': self.version, + 'spec_version': self.spec_version, + 'expires': self.expires.isoformat() + 'Z' + } + + # Modification. def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None: """Increments the expires attribute by the passed timedelta. """ @@ -261,7 +346,6 @@ def bump_version(self) -> None: self.version += 1 - class Root(Signed): """A container for the signed part of root metadata. @@ -311,6 +395,18 @@ def __init__( self.roles = roles + # Serialization. + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + json_dict = super().to_dict() + json_dict.update({ + 'consistent_snapshot': self.consistent_snapshot, + 'keys': self.keys, + 'roles': self.roles + }) + return json_dict + + # Update key for a role. def add_key(self, role: str, keyid: str, key_metadata: JsonDict) -> None: """Adds new key for 'role' and updates the key store. """ @@ -332,6 +428,7 @@ def remove_key(self, role: str, keyid: str) -> None: + class Timestamp(Signed): """A container for the signed part of timestamp metadata. @@ -359,6 +456,16 @@ def __init__( self.meta = meta + # Serialization. + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + json_dict = super().to_dict() + json_dict.update({ + 'meta': self.meta + }) + return json_dict + + # Modification. def update(self, version: int, length: int, hashes: JsonDict) -> None: """Assigns passed info about snapshot metadata to meta dict. """ @@ -369,7 +476,6 @@ def update(self, version: int, length: int, hashes: JsonDict) -> None: } - class Snapshot(Signed): """A container for the signed part of snapshot metadata. @@ -403,6 +509,15 @@ def __init__( # TODO: Add class for meta self.meta = meta + # Serialization. + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + json_dict = super().to_dict() + json_dict.update({ + 'meta': self.meta + }) + return json_dict + # Modification. def update( @@ -419,7 +534,6 @@ def update( self.meta[metadata_fn]['hashes'] = hashes - class Targets(Signed): """A container for the signed part of targets metadata. @@ -487,6 +601,16 @@ def __init__( self.delegations = delegations + # Serialization. + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + json_dict = super().to_dict() + json_dict.update({ + 'targets': self.targets, + 'delegations': self.delegations, + }) + return json_dict + # Modification. def update(self, filename: str, fileinfo: JsonDict) -> None: """Assigns passed target file info to meta dict. """ diff --git a/tuf/api/pylintrc b/tuf/api/pylintrc index 38562e62ea..badef7613d 100644 --- a/tuf/api/pylintrc +++ b/tuf/api/pylintrc @@ -8,8 +8,5 @@ good-names=e indent-string=" " max-line-length=79 -[CLASSES] -exclude-protected:_type - [DESIGN] min-public-methods=0 diff --git a/tuf/api/serialization/json.py b/tuf/api/serialization/json.py index fd73511455..602fbf59c0 100644 --- a/tuf/api/serialization/json.py +++ b/tuf/api/serialization/json.py @@ -19,8 +19,7 @@ MetadataDeserializer, SignedSerializer, SerializationError, - DeserializationError, - util) + DeserializationError) class JSONDeserializer(MetadataDeserializer): @@ -30,7 +29,7 @@ def deserialize(self, raw_data: bytes) -> Metadata: """Deserialize utf-8 encoded JSON bytes into Metadata object. """ try: _dict = json.loads(raw_data.decode("utf-8")) - return util.metadata_from_dict(_dict) + return Metadata.from_dict(_dict) except Exception as e: # pylint: disable=broad-except six.raise_from(DeserializationError, e) @@ -52,7 +51,7 @@ def serialize(self, metadata_obj: Metadata) -> bytes: try: indent = (None if self.compact else 1) separators=((',', ':') if self.compact else (',', ': ')) - return json.dumps(util.metadata_to_dict(metadata_obj), + return json.dumps(metadata_obj.to_dict(), indent=indent, separators=separators, sort_keys=True).encode("utf-8") @@ -67,7 +66,7 @@ class CanonicalJSONSerializer(SignedSerializer): def serialize(self, signed_obj: Signed) -> bytes: """Serialize Signed object into utf-8 encoded Canonical JSON bytes. """ try: - signed_dict = util.signed_to_dict(signed_obj) + signed_dict = signed_obj.to_dict() return encode_canonical(signed_dict).encode("utf-8") except Exception as e: # pylint: disable=broad-except diff --git a/tuf/api/serialization/util.py b/tuf/api/serialization/util.py deleted file mode 100644 index db06e92530..0000000000 --- a/tuf/api/serialization/util.py +++ /dev/null @@ -1,149 +0,0 @@ -"""Utility functions to facilitate TUF metadata de/serialization. - -Currently, this module contains functions to convert between the TUF metadata -class model and a corresponding dictionary representation. - -""" - - -from typing import Any, Dict, Mapping - -from tuf import formats -from tuf.api.metadata import (Signed, Metadata, Root, Timestamp, Snapshot, - Targets) - - -def _get_signed_common_args_from_dict(_dict: Mapping[str, Any]) -> list: - """Returns ordered positional arguments for 'Signed' subclass constructors. - - See '{root, timestamp, snapshot, targets}_from_dict' functions for usage. - - """ - _type = _dict.pop("_type") - version = _dict.pop("version") - spec_version = _dict.pop("spec_version") - expires_str = _dict.pop("expires") - expires = formats.expiry_string_to_datetime(expires_str) - return [_type, version, spec_version, expires] - - -def root_from_dict(_dict: Mapping[str, Any]) -> Root: - """Returns 'Root' object based on its dict representation. """ - common_args = _get_signed_common_args_from_dict(_dict) - consistent_snapshot = _dict.pop("consistent_snapshot") - keys = _dict.pop("keys") - roles = _dict.pop("roles") - return Root(*common_args, consistent_snapshot, keys, roles) - - -def timestamp_from_dict(_dict: Mapping[str, Any]) -> Timestamp: - """Returns 'Timestamp' object based on its dict representation. """ - common_args = _get_signed_common_args_from_dict(_dict) - meta = _dict.pop("meta") - return Timestamp(*common_args, meta) - - -def snapshot_from_dict(_dict: Mapping[str, Any]) -> Snapshot: - """Returns 'Snapshot' object based on its dict representation. """ - common_args = _get_signed_common_args_from_dict(_dict) - meta = _dict.pop("meta") - return Snapshot(*common_args, meta) - - -def targets_from_dict(_dict: Mapping[str, Any]) -> Targets: - """Returns 'Targets' object based on its dict representation. """ - common_args = _get_signed_common_args_from_dict(_dict) - targets = _dict.pop("targets") - delegations = _dict.pop("delegations") - return Targets(*common_args, targets, delegations) - -def signed_from_dict(_dict) -> Signed: - """Returns 'Signed'-subclass object based on its dict representation. """ - # Dispatch to '*_from_dict'-function based on '_type' field. - # TODO: Use if/else cascade, if easier to read! - # TODO: Use constants for types! (e.g. Root._type, Targets._type, etc.) - return { - "root": root_from_dict, - "timestamp": timestamp_from_dict, - "snapshot": snapshot_from_dict, - "targets": targets_from_dict - }[_dict["_type"]](_dict) - - -def metadata_from_dict(_dict: Mapping[str, Any]) -> Metadata: - """Returns 'Metadata' object based on its dict representation. """ - signed_dict = _dict.pop("signed") - signatures = _dict.pop("signatures") - return Metadata(signatures=signatures, - signed=signed_from_dict(signed_dict)) - - -def _get_signed_common_fields_as_dict(obj: Signed) -> Dict[str, Any]: - """Returns dict representation of 'Signed' object. - - See '{root, timestamp, snapshot, targets}_to_dict' functions for usage. - - """ - return { - "_type": obj._type, - "version": obj.version, - "spec_version": obj.spec_version, - "expires": obj.expires.isoformat() + "Z" - } - - -def root_to_dict(obj: Root) -> Dict[str, Any]: - """Returns dict representation of 'Root' object. """ - _dict = _get_signed_common_fields_as_dict(obj) - _dict.update({ - "consistent_snapshot": obj.consistent_snapshot, - "keys": obj.keys, - "roles": obj.roles - }) - return _dict - - -def timestamp_to_dict(obj: Timestamp) -> Dict[str, Any]: - """Returns dict representation of 'Timestamp' object. """ - _dict = _get_signed_common_fields_as_dict(obj) - _dict.update({ - "meta": obj.meta - }) - return _dict - - -def snapshot_to_dict(obj: Snapshot) -> Dict[str, Any]: - """Returns dict representation of 'Snapshot' object. """ - _dict = _get_signed_common_fields_as_dict(obj) - _dict.update({ - "meta": obj.meta - }) - return _dict - - -def targets_to_dict(obj: Targets) -> Dict[str, Any]: - """Returns dict representation of 'Targets' object. """ - _dict = _get_signed_common_fields_as_dict(obj) - _dict.update({ - "targets": obj.targets, - "delegations": obj.delegations, - }) - return _dict - -def signed_to_dict(obj: Signed) -> Dict[str, Any]: - """Returns dict representation of 'Signed'-subclass object. """ - # Dispatch to '*_to_dict'-function based on 'Signed' subclass type. - # TODO: Use if/else cascade, if easier to read! - return { - Root: root_to_dict, - Timestamp: timestamp_to_dict, - Snapshot: snapshot_to_dict, - Targets: targets_to_dict - }[obj.__class__](obj) - -def metadata_to_dict(obj: Metadata) -> Dict[str, Any]: - """Returns dict representation of 'Metadata' object. """ - return { - "signatures": obj.signatures, - "signed": signed_to_dict(obj.signed) - }