diff --git a/tests/test_api.py b/tests/test_api.py index f5134cd905..700228cfd5 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -41,6 +41,7 @@ def setUpModule(): from tuf.api.serialization import ( DeserializationError ) + from tuf.api.serialization.util import metadata_to_dict from tuf.api.serialization.json import ( JSONSerializer, JSONDeserializer, @@ -120,9 +121,9 @@ def test_generic_read(self): self.assertTrue( isinstance(metadata_obj2.signed, inner_metadata_cls)) - # ... and return the same object (compared by dict representation) - self.assertDictEqual( - metadata_obj.to_dict(), metadata_obj2.to_dict()) + # ... and are equal (compared by dict representation) + self.assertDictEqual(metadata_to_dict(metadata_obj), + metadata_to_dict(metadata_obj2)) # Assert that it chokes correctly on an unknown metadata type @@ -154,9 +155,8 @@ def test_read_write_read_compare(self): metadata_obj.to_file(path_2) metadata_obj_2 = Metadata.from_file(path_2) - self.assertDictEqual( - metadata_obj.to_dict(), - metadata_obj_2.to_dict()) + self.assertDictEqual(metadata_to_dict(metadata_obj), + metadata_to_dict(metadata_obj_2)) os.remove(path_2) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index fa4151f6de..eca2bf337b 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -9,7 +9,6 @@ from datetime import datetime, timedelta from typing import Any, Dict, Optional -import json import tempfile from securesystemslib.util import persist_temp_file @@ -24,7 +23,6 @@ import tuf.exceptions - # Types JsonDict = Dict[str, Any] @@ -57,49 +55,6 @@ def __init__(self, signed: 'Signed', signatures: list) -> None: self.signatures = signatures - # Deserialization (factories). - @classmethod - def from_dict(cls, metadata: JsonDict) -> 'Metadata': - """Creates Metadata object from its JSON/dict representation. - - Calls 'from_dict' for any complex metadata attribute represented by a - class also that has a 'from_dict' factory method. (Currently this is - only the signed attribute.) - - Arguments: - metadata: TUF metadata in JSON/dict representation, as e.g. - returned by 'json.loads'. - - Raises: - KeyError: The metadata dict format is invalid. - ValueError: The metadata has an unrecognized signed._type field. - - Returns: - A TUF Metadata object. - - """ - # Dispatch to contained metadata class on metadata _type field. - _type = metadata['signed']['_type'] - - if _type == 'targets': - inner_cls = Targets - elif _type == 'snapshot': - inner_cls = Snapshot - elif _type == 'timestamp': - inner_cls = Timestamp - elif _type == 'root': - inner_cls = Root - else: - raise ValueError(f'unrecognized metadata type "{_type}"') - - # NOTE: If Signature becomes a class, we should iterate over - # metadata['signatures'], call Signature.from_dict for each item, and - # pass a list of Signature objects to the Metadata constructor intead. - return cls( - signed=inner_cls.from_dict(metadata['signed']), - signatures=metadata['signatures']) - - @classmethod def from_file( cls, filename: str, deserializer: MetadataDeserializer = None, @@ -140,24 +95,6 @@ def from_file( return deserializer.deserialize(raw_data) - # Serialization. - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ - return { - 'signatures': self.signatures, - 'signed': self.signed.to_dict() - } - - - def to_json(self, compact: bool = False) -> None: - """Returns the optionally compacted JSON representation of self. """ - return json.dumps( - self.to_dict(), - indent=(None if compact else 1), - separators=((',', ':') if compact else (',', ': ')), - sort_keys=True) - - def to_file(self, filename: str, serializer: MetadataSerializer = None, storage_backend: StorageBackendInterface = None) -> None: """Writes TUF metadata to file storage. @@ -313,39 +250,6 @@ def __init__( self.version = version - # Deserialization (factories). - @classmethod - def from_dict(cls, signed_dict: JsonDict) -> 'Signed': - """Creates Signed object from its JSON/dict representation. """ - - # Convert 'expires' TUF metadata string to a datetime object, which is - # what the constructor expects and what we store. The inverse operation - # is implemented in 'to_dict'. - signed_dict['expires'] = tuf.formats.expiry_string_to_datetime( - signed_dict['expires']) - # NOTE: We write the converted 'expires' back into 'signed_dict' above - # so that we can pass it to the constructor as '**signed_dict' below, - # along with other fields that belong to Signed subclasses. - # Any 'from_dict'(-like) conversions of fields that correspond to a - # subclass should be performed in the 'from_dict' method of that - # subclass and also be written back into 'signed_dict' before calling - # super().from_dict. - - # NOTE: cls might be a subclass of Signed, if 'from_dict' was called on - # that subclass (see e.g. Metadata.from_dict). - return cls(**signed_dict) - - - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ - return { - '_type': self._type, - 'version': self.version, - 'spec_version': self.spec_version, - 'expires': self.expires.isoformat() + 'Z' - } - - # Modification. def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None: """Increments the expires attribute by the passed timedelta. """ @@ -357,6 +261,7 @@ def bump_version(self) -> None: self.version += 1 + class Root(Signed): """A container for the signed part of root metadata. @@ -406,18 +311,6 @@ def __init__( self.roles = roles - # Serialization. - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ - json_dict = super().to_dict() - json_dict.update({ - 'consistent_snapshot': self.consistent_snapshot, - 'keys': self.keys, - 'roles': self.roles - }) - return json_dict - - # Update key for a role. def add_key(self, role: str, keyid: str, key_metadata: JsonDict) -> None: """Adds new key for 'role' and updates the key store. """ @@ -439,7 +332,6 @@ def remove_key(self, role: str, keyid: str) -> None: - class Timestamp(Signed): """A container for the signed part of timestamp metadata. @@ -467,16 +359,6 @@ def __init__( self.meta = meta - # Serialization. - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ - json_dict = super().to_dict() - json_dict.update({ - 'meta': self.meta - }) - return json_dict - - # Modification. def update(self, version: int, length: int, hashes: JsonDict) -> None: """Assigns passed info about snapshot metadata to meta dict. """ @@ -487,6 +369,7 @@ def update(self, version: int, length: int, hashes: JsonDict) -> None: } + class Snapshot(Signed): """A container for the signed part of snapshot metadata. @@ -520,15 +403,6 @@ def __init__( # TODO: Add class for meta self.meta = meta - # Serialization. - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ - json_dict = super().to_dict() - json_dict.update({ - 'meta': self.meta - }) - return json_dict - # Modification. def update( @@ -545,6 +419,7 @@ def update( self.meta[metadata_fn]['hashes'] = hashes + class Targets(Signed): """A container for the signed part of targets metadata. @@ -612,16 +487,6 @@ def __init__( self.delegations = delegations - # Serialization. - def to_dict(self) -> JsonDict: - """Returns the JSON-serializable dictionary representation of self. """ - json_dict = super().to_dict() - json_dict.update({ - 'targets': self.targets, - 'delegations': self.delegations, - }) - return json_dict - # Modification. def update(self, filename: str, fileinfo: JsonDict) -> None: """Assigns passed target file info to meta dict. """ diff --git a/tuf/api/pylintrc b/tuf/api/pylintrc index badef7613d..38562e62ea 100644 --- a/tuf/api/pylintrc +++ b/tuf/api/pylintrc @@ -8,5 +8,8 @@ good-names=e indent-string=" " max-line-length=79 +[CLASSES] +exclude-protected:_type + [DESIGN] min-public-methods=0 diff --git a/tuf/api/serialization/json.py b/tuf/api/serialization/json.py index 602fbf59c0..fd73511455 100644 --- a/tuf/api/serialization/json.py +++ b/tuf/api/serialization/json.py @@ -19,7 +19,8 @@ MetadataDeserializer, SignedSerializer, SerializationError, - DeserializationError) + DeserializationError, + util) class JSONDeserializer(MetadataDeserializer): @@ -29,7 +30,7 @@ def deserialize(self, raw_data: bytes) -> Metadata: """Deserialize utf-8 encoded JSON bytes into Metadata object. """ try: _dict = json.loads(raw_data.decode("utf-8")) - return Metadata.from_dict(_dict) + return util.metadata_from_dict(_dict) except Exception as e: # pylint: disable=broad-except six.raise_from(DeserializationError, e) @@ -51,7 +52,7 @@ def serialize(self, metadata_obj: Metadata) -> bytes: try: indent = (None if self.compact else 1) separators=((',', ':') if self.compact else (',', ': ')) - return json.dumps(metadata_obj.to_dict(), + return json.dumps(util.metadata_to_dict(metadata_obj), indent=indent, separators=separators, sort_keys=True).encode("utf-8") @@ -66,7 +67,7 @@ class CanonicalJSONSerializer(SignedSerializer): def serialize(self, signed_obj: Signed) -> bytes: """Serialize Signed object into utf-8 encoded Canonical JSON bytes. """ try: - signed_dict = signed_obj.to_dict() + signed_dict = util.signed_to_dict(signed_obj) return encode_canonical(signed_dict).encode("utf-8") except Exception as e: # pylint: disable=broad-except diff --git a/tuf/api/serialization/util.py b/tuf/api/serialization/util.py new file mode 100644 index 0000000000..db06e92530 --- /dev/null +++ b/tuf/api/serialization/util.py @@ -0,0 +1,149 @@ +"""Utility functions to facilitate TUF metadata de/serialization. + +Currently, this module contains functions to convert between the TUF metadata +class model and a corresponding dictionary representation. + +""" + + +from typing import Any, Dict, Mapping + +from tuf import formats +from tuf.api.metadata import (Signed, Metadata, Root, Timestamp, Snapshot, + Targets) + + +def _get_signed_common_args_from_dict(_dict: Mapping[str, Any]) -> list: + """Returns ordered positional arguments for 'Signed' subclass constructors. + + See '{root, timestamp, snapshot, targets}_from_dict' functions for usage. + + """ + _type = _dict.pop("_type") + version = _dict.pop("version") + spec_version = _dict.pop("spec_version") + expires_str = _dict.pop("expires") + expires = formats.expiry_string_to_datetime(expires_str) + return [_type, version, spec_version, expires] + + +def root_from_dict(_dict: Mapping[str, Any]) -> Root: + """Returns 'Root' object based on its dict representation. """ + common_args = _get_signed_common_args_from_dict(_dict) + consistent_snapshot = _dict.pop("consistent_snapshot") + keys = _dict.pop("keys") + roles = _dict.pop("roles") + return Root(*common_args, consistent_snapshot, keys, roles) + + +def timestamp_from_dict(_dict: Mapping[str, Any]) -> Timestamp: + """Returns 'Timestamp' object based on its dict representation. """ + common_args = _get_signed_common_args_from_dict(_dict) + meta = _dict.pop("meta") + return Timestamp(*common_args, meta) + + +def snapshot_from_dict(_dict: Mapping[str, Any]) -> Snapshot: + """Returns 'Snapshot' object based on its dict representation. """ + common_args = _get_signed_common_args_from_dict(_dict) + meta = _dict.pop("meta") + return Snapshot(*common_args, meta) + + +def targets_from_dict(_dict: Mapping[str, Any]) -> Targets: + """Returns 'Targets' object based on its dict representation. """ + common_args = _get_signed_common_args_from_dict(_dict) + targets = _dict.pop("targets") + delegations = _dict.pop("delegations") + return Targets(*common_args, targets, delegations) + +def signed_from_dict(_dict) -> Signed: + """Returns 'Signed'-subclass object based on its dict representation. """ + # Dispatch to '*_from_dict'-function based on '_type' field. + # TODO: Use if/else cascade, if easier to read! + # TODO: Use constants for types! (e.g. Root._type, Targets._type, etc.) + return { + "root": root_from_dict, + "timestamp": timestamp_from_dict, + "snapshot": snapshot_from_dict, + "targets": targets_from_dict + }[_dict["_type"]](_dict) + + +def metadata_from_dict(_dict: Mapping[str, Any]) -> Metadata: + """Returns 'Metadata' object based on its dict representation. """ + signed_dict = _dict.pop("signed") + signatures = _dict.pop("signatures") + return Metadata(signatures=signatures, + signed=signed_from_dict(signed_dict)) + + +def _get_signed_common_fields_as_dict(obj: Signed) -> Dict[str, Any]: + """Returns dict representation of 'Signed' object. + + See '{root, timestamp, snapshot, targets}_to_dict' functions for usage. + + """ + return { + "_type": obj._type, + "version": obj.version, + "spec_version": obj.spec_version, + "expires": obj.expires.isoformat() + "Z" + } + + +def root_to_dict(obj: Root) -> Dict[str, Any]: + """Returns dict representation of 'Root' object. """ + _dict = _get_signed_common_fields_as_dict(obj) + _dict.update({ + "consistent_snapshot": obj.consistent_snapshot, + "keys": obj.keys, + "roles": obj.roles + }) + return _dict + + +def timestamp_to_dict(obj: Timestamp) -> Dict[str, Any]: + """Returns dict representation of 'Timestamp' object. """ + _dict = _get_signed_common_fields_as_dict(obj) + _dict.update({ + "meta": obj.meta + }) + return _dict + + +def snapshot_to_dict(obj: Snapshot) -> Dict[str, Any]: + """Returns dict representation of 'Snapshot' object. """ + _dict = _get_signed_common_fields_as_dict(obj) + _dict.update({ + "meta": obj.meta + }) + return _dict + + +def targets_to_dict(obj: Targets) -> Dict[str, Any]: + """Returns dict representation of 'Targets' object. """ + _dict = _get_signed_common_fields_as_dict(obj) + _dict.update({ + "targets": obj.targets, + "delegations": obj.delegations, + }) + return _dict + +def signed_to_dict(obj: Signed) -> Dict[str, Any]: + """Returns dict representation of 'Signed'-subclass object. """ + # Dispatch to '*_to_dict'-function based on 'Signed' subclass type. + # TODO: Use if/else cascade, if easier to read! + return { + Root: root_to_dict, + Timestamp: timestamp_to_dict, + Snapshot: snapshot_to_dict, + Targets: targets_to_dict + }[obj.__class__](obj) + +def metadata_to_dict(obj: Metadata) -> Dict[str, Any]: + """Returns dict representation of 'Metadata' object. """ + return { + "signatures": obj.signatures, + "signed": signed_to_dict(obj.signed) + }