diff --git a/securesystemslib/dsse.py b/securesystemslib/dsse.py index 36b454c6..f38213ce 100644 --- a/securesystemslib/dsse.py +++ b/securesystemslib/dsse.py @@ -5,45 +5,13 @@ from typing import Any, Dict, List from securesystemslib import exceptions -from securesystemslib.serialization import ( - BaseDeserializer, - BaseSerializer, - JSONDeserializer, - JSONSerializable, - JSONSerializer, - SerializationMixin, -) from securesystemslib._internal.utils import b64enc, b64dec from securesystemslib.signer import Key, Signature, Signer logger = logging.getLogger(__name__) -class EnvelopeJSONDeserializer(JSONDeserializer): - """Deserializes raw bytes and creates an Envelope object using JSON - Deserialization.""" - - def deserialize(self, raw_data: bytes) -> "Envelope": - """Deserialize utf-8 encoded JSON bytes into an instance of Envelope. - - Arguments: - raw_data: A utf-8 encoded bytes string. - - Raises: - DeserializationError: If fails to deserialize raw_data. - - Returns: - dict. - """ - try: - return Envelope.from_dict(super().deserialize(raw_data)) - except Exception as e: - raise exceptions.DeserializationError( - "Failed to create Envelope" - ) from e - - -class Envelope(SerializationMixin, JSONSerializable): +class Envelope: """DSSE Envelope to provide interface for signing arbitrary data. Attributes: @@ -70,14 +38,6 @@ def __eq__(self, other: Any) -> bool: and self.signatures == other.signatures ) - @staticmethod - def _default_deserializer() -> BaseDeserializer: - return EnvelopeJSONDeserializer() - - @staticmethod - def _default_serializer() -> BaseSerializer: - return JSONSerializer() - @classmethod def from_dict(cls, data: dict) -> "Envelope": """Creates a DSSE Envelope from its JSON/dict representation. @@ -197,21 +157,3 @@ def verify(self, keys: List[Key], threshold: int) -> Dict[str, Key]: ) return accepted_keys - - def get_payload( - self, - deserializer: BaseDeserializer, - ) -> Any: - """Parse DSSE payload. - - Arguments: - deserializer: ``BaseDeserializer`` implementation to use. - - Raises: - DeserializationError: The payload cannot be deserialized. - - Returns: - The deserialized object of payload. - """ - - return deserializer.deserialize(self.payload) diff --git a/securesystemslib/serialization.py b/securesystemslib/serialization.py deleted file mode 100644 index 33f8aed4..00000000 --- a/securesystemslib/serialization.py +++ /dev/null @@ -1,249 +0,0 @@ -"""Serialization module provides abstract base classes and concrete -implementations to serialize and deserialize objects. -""" - -import abc -import json -import tempfile -from typing import Any, Dict, Optional - -from securesystemslib.exceptions import DeserializationError, SerializationError -from securesystemslib.storage import FilesystemBackend, StorageBackendInterface -from securesystemslib.util import persist_temp_file - - -# TODO: Use typing.Protocol post python 3.7 -class BaseDeserializer(metaclass=abc.ABCMeta): - """Abstract base class for deserialization of objects.""" - - @abc.abstractmethod - def deserialize(self, raw_data: bytes) -> Any: - """Deserialize bytes.""" - - raise NotImplementedError # pragma: no cover - - -class JSONDeserializer(BaseDeserializer): - """Provides raw to JSON deserialize method.""" - - def deserialize(self, raw_data: bytes) -> Dict: - """Deserialize utf-8 encoded JSON bytes into a dict. - - Arguments: - raw_data: A utf-8 encoded bytes string. - - Raises: - securesystemslib.exceptions.DeserializationError: If fails to - decode raw_data into json. - - Returns: - dict. - """ - - try: - return json.loads(raw_data.decode("utf-8")) - - except Exception as e: - raise DeserializationError("Failed to deserialize bytes") from e - - -class BaseSerializer(metaclass=abc.ABCMeta): - """Abstract base class for serialization of objects.""" - - @abc.abstractmethod - def serialize(self, obj: Any) -> bytes: - """Serialize an object to bytes.""" - - raise NotImplementedError # pragma: no cover - - -class JSONSerializer(BaseSerializer): - """Provide an object to bytes serialize method. - - Attributes: - compact: A boolean indicating if the JSON bytes generated in - 'serialize' should be compact by excluding whitespace. - """ - - def __init__(self, compact: bool = False): - self.indent = 1 - self.separators = (",", ": ") - if compact: - self.indent = None - self.separators = (",", ":") - - def serialize(self, obj: "JSONSerializable") -> bytes: - """Serialize an object into utf-8 encoded JSON bytes. - - Arguments: - obj: An instance of - ``securesystemslib.serialization.JSONSerializable`` subclass. - - Raises: - securesystemslib.exceptions.SerializationError: If fails to encode - into json bytes. - - Returns: - UTF-8 encoded JSON bytes of the object. - """ - - try: - json_bytes = json.dumps( - obj.to_dict(), - indent=self.indent, - separators=self.separators, - sort_keys=True, - ).encode("utf-8") - - except Exception as e: - raise SerializationError("Failed to serialize JSON") from e - - return json_bytes - - -class SerializationMixin(metaclass=abc.ABCMeta): - """Instance of class with ``SerializationMixin`` are to be serialized and - deserialized using `to_bytes`, `from_bytes`, `to_file` and `from_file` - methods. - """ - - @staticmethod - @abc.abstractmethod - def _default_deserializer() -> BaseDeserializer: - """Default Deserializer to be used for deserialization.""" - - raise NotImplementedError # pragma: no cover - - @staticmethod - @abc.abstractmethod - def _default_serializer() -> BaseSerializer: - """Default Serializer to be used for serialization.""" - - raise NotImplementedError # pragma: no cover - - @classmethod - def from_bytes( - cls, - data: bytes, - deserializer: Optional[BaseDeserializer] = None, - ) -> Any: - """Loads the object from raw data. - - Arguments: - data: bytes content. - deserializer: ``securesystemslib.serialization.BaseDeserializer`` - implementation to use. - Raises: - securesystemslib.exceptions.DeserializationError: The file cannot - be deserialized. - Returns: - Deserialized object. - """ - - if deserializer is None: - deserializer = cls._default_deserializer() - - return deserializer.deserialize(data) - - @classmethod - def from_file( - cls, - filename: str, - deserializer: Optional[BaseDeserializer] = None, - storage_backend: Optional[StorageBackendInterface] = None, - ) -> Any: - """Loads object from file storage. - - Arguments: - filename: Path to read the file from. - deserializer: ``securesystemslib.serialization.BaseDeserializer`` - subclass instance that implements the desired wireline - format deserialization. - storage_backend: Object that implements - ``securesystemslib.storage.StorageBackendInterface``. - Default is ``securesystemslib.storage.FilesystemBackend`` - (i.e. a local file). - Raises: - securesystemslib.exceptions.StorageError: The file cannot be read. - securesystemslib.exceptions.DeserializationError: The file cannot - be deserialized. - Returns: - Deserialized object. - """ - - if storage_backend is None: - storage_backend = FilesystemBackend() - - with storage_backend.get(filename) as file_obj: - return cls.from_bytes(file_obj.read(), deserializer) - - def to_bytes(self, serializer: Optional[BaseSerializer] = None) -> bytes: - """Return the serialized file format as bytes. - - Note that if bytes are first deserialized and then serialized with - ``to_file()``, the two files are not required to be identical (in case - of Metadata the signatures are guaranteed to stay valid). If - byte-for-byte equivalence is required (which is the case when content - hashes are used in other metadata), the original content should be used - instead of re-serializing. - - Arguments: - serializer: ``securesystemslib.serialization.BaseSerializer`` - instance that implements the desired serialization format. - Raises: - securesystemslib.exceptions.SerializationError: If object cannot be - serialized. - """ - - if serializer is None: - serializer = self._default_serializer() - - return serializer.serialize(self) - - def to_file( - self, - filename: str, - serializer: Optional[BaseSerializer] = None, - storage_backend: Optional[StorageBackendInterface] = None, - ): - """Writes object to file storage. - - Note that if a file is first deserialized and then serialized with - ``to_file()``, the two files are not required to be identical (in case - of Metadata the signatures are guaranteed to stay valid). If - byte-for-byte equivalence is required (which is the case when file - hashes are used in other metadata), the original file should be used - instead of re-serializing. - - Arguments: - filename: Path to write the file to. - serializer: ``securesystemslib.serialization.BaseSerializer`` - instance that implements the desired serialization format. - storage_backend: Object that implements - ``securesystemslib.storage.StorageBackendInterface``. - Default is ``securesystemslib.storage.FilesystemBackend`` - (i.e. a local file). - Raises: - securesystemslib.exceptions.SerializationError: If object cannot - be serialized. - securesystemslib.exceptions.StorageError: The file cannot be - written. - """ - - bytes_data = self.to_bytes(serializer) - - with tempfile.TemporaryFile() as temp_file: - temp_file.write(bytes_data) - persist_temp_file(temp_file, filename, storage_backend) - - -class JSONSerializable(metaclass=abc.ABCMeta): - """Objects serialized with ``securesystemslib.serialization.JSONSerializer`` - must inherit from this class and implement its ``to_dict`` method. - """ - - @abc.abstractmethod - def to_dict(self) -> dict: - """Returns the JSON-serializable dictionary representation of self.""" - - raise NotImplementedError # pragma: no cover diff --git a/tests/test_serialization.py b/tests/test_serialization.py deleted file mode 100644 index f32d598e..00000000 --- a/tests/test_serialization.py +++ /dev/null @@ -1,160 +0,0 @@ -#!/usr/bin/env python - -"""Test cases for "serialization.py". """ - -import json -import os -import shutil -import tempfile -import unittest - -from securesystemslib.exceptions import DeserializationError, SerializationError -from securesystemslib.dsse import Envelope, EnvelopeJSONDeserializer -from securesystemslib.serialization import JSONDeserializer, JSONSerializer -from securesystemslib.storage import FilesystemBackend - - -class TestJSONSerialization(unittest.TestCase): - """Serialization Test Case.""" - - @classmethod - def setUpClass(cls): - cls.test_obj = Envelope( - payload=b"hello world", - payload_type="http://example.com/HelloWorld", - signatures=[], - ) - cls.test_bytes = b'{"payload":"aGVsbG8gd29ybGQ=","payloadType":"http://example.com/HelloWorld","signatures":[]}' - - def test_serializer(self): - """Test JSONSerializer with DSSE Envelope.""" - - serializer = JSONSerializer() - - # Assert SerializationError on serializing a invalid object. - with self.assertRaises(SerializationError): - serializer.serialize("not a valid obj") - - # Serialize compact and non compact envelope object into bytes. - json_bytes = serializer.serialize(self.test_obj) - - serializer = JSONSerializer(compact=True) - compact_json_bytes = serializer.serialize(self.test_obj) - - # Assert inequality between compact and non compact json bytes. - self.assertNotEqual(json_bytes, compact_json_bytes) - - # Assert equality with the test bytes. - self.assertEqual(compact_json_bytes, self.test_bytes) - - # Assert equality between compact and non compact json dict. - self.assertEqual(json.loads(json_bytes), json.loads(compact_json_bytes)) - - def test_deserializer(self): - """Test JSONDeserializer with DSSE Envelope.""" - - deserializer = JSONDeserializer() - - # Assert DeserializationError on invalid json and class. - with self.assertRaises(DeserializationError): - deserializer.deserialize(b"not a valid json") - - # Assert Equality between deserialized envelope and test object. - envelope_dict = deserializer.deserialize(self.test_bytes) - envelope_obj = Envelope.from_dict(envelope_dict) - - self.assertEqual(envelope_obj, self.test_obj) - - def test_serialization(self): - """Test JSONDeserializer and JSONSerializer.""" - - serializer = JSONSerializer() - json_bytes = serializer.serialize(self.test_obj) - - deserializer = JSONDeserializer() - envelope_dict = deserializer.deserialize(json_bytes) - envelope_obj = Envelope.from_dict(envelope_dict) - - # Assert Equality between original object and deserialized object. - self.assertEqual(envelope_obj, self.test_obj) - - -class TestSerializationMixin(unittest.TestCase): - """SerializationMixin Test Case.""" - - def setUp(self): - self.storage_backend = FilesystemBackend() - self.temp_dir = tempfile.mkdtemp(dir=os.getcwd()) - self.filepath = os.path.join(self.temp_dir, "testfile") - - def tearDown(self): - shutil.rmtree(self.temp_dir) - - @classmethod - def setUpClass(cls): - cls.test_obj = Envelope( - payload=b"hello world", - payload_type="http://example.com/HelloWorld", - signatures=[], - ) - - def test_to_and_from_file(self): - """Test to_file and from_file method of Serializable.""" - - # Save test_obj to a file. - self.test_obj.to_file(self.filepath) - - # Load object from the saved file. - envelope_obj = Envelope.from_file(self.filepath) - - # Test for equality. - self.assertEqual(envelope_obj, self.test_obj) - - def test_to_and_from_file_with_storage_backend(self): - """Test to_file and from_file method of Serializable with storage - backend.""" - - # Save test_obj to a file. - self.test_obj.to_file( - self.filepath, storage_backend=self.storage_backend - ) - - # Load object from the saved file. - envelope_obj = Envelope.from_file( - self.filepath, storage_backend=self.storage_backend - ) - - # Test for equality. - self.assertEqual(envelope_obj, self.test_obj) - - def test_to_and_from_bytes(self): - """Test to_bytes and from_bytes method of Serializable.""" - - # Serializer object into bytes. - json_bytes = self.test_obj.to_bytes() - - # Deserialize object from bytes. - envelope_obj = Envelope.from_bytes(json_bytes) - - # Test for equality. - self.assertEqual(envelope_obj, self.test_obj) - - def test_to_and_from_bytes_with_serializer(self): - """Test to_bytes and from_bytes method of Serializable with JSON - serializer and deserializer.""" - - # Serializer object into bytes. - serializer = JSONSerializer(compact=True) - json_bytes = self.test_obj.to_bytes(serializer) - - # Deserialize object from bytes. - deserializer = EnvelopeJSONDeserializer() - envelope_obj = Envelope.from_bytes(json_bytes, deserializer) - - # Test for equality. - self.assertEqual(envelope_obj, self.test_obj) - - -# Run the unit tests. -if __name__ == "__main__": - unittest.main()