diff --git a/securesystemslib/_internal/utils.py b/securesystemslib/_internal/utils.py new file mode 100644 index 00000000..70bfba93 --- /dev/null +++ b/securesystemslib/_internal/utils.py @@ -0,0 +1,41 @@ +"""Internal utilities""" + +import base64 +import binascii + + +def b64enc(data: bytes) -> str: + """To encode byte sequence into base64 string + + Arguments: + data: Byte sequence to encode + + Exceptions: + TypeError: If "data" is not byte sequence + + Returns: + base64 string + """ + + return base64.standard_b64encode(data).decode("utf-8") + + +def b64dec(string: str) -> bytes: + """To decode byte sequence from base64 string + + Arguments: + string: base64 string to decode + + Raises: + binascii.Error: If invalid base64-encoded string + + Returns: + A byte sequence + """ + + data = string.encode("utf-8") + try: + return base64.b64decode(data, validate=True) + except binascii.Error: + # altchars for urlsafe encoded base64 - instead of + and _ instead of / + return base64.b64decode(data, altchars=b"-_", validate=True) diff --git a/securesystemslib/dsse.py b/securesystemslib/dsse.py new file mode 100644 index 00000000..c1e41a28 --- /dev/null +++ b/securesystemslib/dsse.py @@ -0,0 +1,162 @@ +"""Dead Simple Signing Envelope +""" + +import logging +from typing import Any, Dict, List + +from securesystemslib import exceptions +from securesystemslib._internal.utils import b64dec, b64enc +from securesystemslib.signer import Key, Signature, Signer + +logger = logging.getLogger(__name__) + + +class Envelope: + """DSSE Envelope to provide interface for signing arbitrary data. + + Attributes: + payload: Arbitrary byte sequence of serialized body. + payload_type: string that identifies how to interpret payload. + signatures: list of Signature. + + """ + + def __init__( + self, payload: bytes, payload_type: str, signatures: List[Signature] + ): + self.payload = payload + self.payload_type = payload_type + self.signatures = signatures + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Envelope): + return False + + return ( + self.payload == other.payload + and self.payload_type == other.payload_type + and self.signatures == other.signatures + ) + + @classmethod + def from_dict(cls, data: dict) -> "Envelope": + """Creates a DSSE Envelope from its JSON/dict representation. + + Arguments: + data: A dict containing a valid payload, payloadType and signatures + + Raises: + KeyError: If any of the "payload", "payloadType" and "signatures" + fields are missing from the "data". + + FormatError: If signature in "signatures" is incorrect. + + Returns: + A "Envelope" instance. + """ + + payload = b64dec(data["payload"]) + payload_type = data["payloadType"] + + signatures = [ + Signature.from_dict(signature) for signature in data["signatures"] + ] + + return cls(payload, payload_type, signatures) + + def to_dict(self) -> dict: + """Returns the JSON-serializable dictionary representation of self.""" + + return { + "payload": b64enc(self.payload), + "payloadType": self.payload_type, + "signatures": [ + signature.to_dict() for signature in self.signatures + ], + } + + def pae(self) -> bytes: + """Pre-Auth-Encoding byte sequence of self.""" + + return b"DSSEv1 %d %b %d %b" % ( + len(self.payload_type), + self.payload_type.encode("utf-8"), + len(self.payload), + self.payload, + ) + + def sign(self, signer: Signer) -> Signature: + """Sign the payload and create the signature. + + Arguments: + signer: A "Signer" class instance. + + Returns: + A "Signature" instance. + """ + + signature = signer.sign(self.pae()) + self.signatures.append(signature) + + return signature + + def verify(self, keys: List[Key], threshold: int) -> Dict[str, Key]: + """Verify the payload with the provided Keys. + + NOTE: This API is experimental and might change (see + secure-systems-lab/dsse#55) + + Arguments: + keys: A list of public keys to verify the signatures. + threshold: Number of signatures needed to pass the verification. + + Raises: + ValueError: If "threshold" is not valid. + VerificationError: If the enclosed signatures do not pass the + verification. + + Note: + Mandating keyid in signatures and matching them with keyid of Key + in order to consider them for verification, is not DSSE spec + compliant (Issue #416). + + Returns: + A dict of the threshold of unique public keys that verified a + signature. + """ + + accepted_keys = {} + pae = self.pae() + + # checks for threshold value. + if threshold <= 0: + raise ValueError("Threshold must be greater than 0") + + if len(keys) < threshold: + raise ValueError("Number of keys can't be less than threshold") + + for signature in self.signatures: + for key in keys: + # If Signature keyid doesn't match with Key, skip. + if not key.keyid == signature.keyid: + continue + + # If a key verifies the signature, we exit and use the result. + try: + key.verify_signature(signature, pae) + accepted_keys[key.keyid] = key + break + except exceptions.UnverifiedSignatureError: + continue + + # Break, if amount of accepted_keys are more than threshold. + if len(accepted_keys) >= threshold: + break + + if threshold > len(accepted_keys): + raise exceptions.VerificationError( + "Accepted signatures do not match threshold," + f" Found: {len(accepted_keys)}, Expected {threshold}" + ) + + return accepted_keys diff --git a/securesystemslib/exceptions.py b/securesystemslib/exceptions.py index 5f25fbe0..3e8122a1 100755 --- a/securesystemslib/exceptions.py +++ b/securesystemslib/exceptions.py @@ -147,3 +147,11 @@ class UnverifiedSignatureError(Error): class VerificationError(UnverifiedSignatureError): """Signature could not be verified because something failed in the process""" + + +class SerializationError(Error): + """Error during serialization.""" + + +class DeserializationError(Error): + """Error during deserialization.""" diff --git a/tests/test_dsse.py b/tests/test_dsse.py new file mode 100644 index 00000000..a91893b4 --- /dev/null +++ b/tests/test_dsse.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python + +"""Test cases for "metadata.py". """ + +import copy +import unittest + +import securesystemslib.keys as KEYS +from securesystemslib.dsse import Envelope +from securesystemslib.exceptions import ( + FormatError, + UnsupportedAlgorithmError, + VerificationError, +) +from securesystemslib.signer import Signature, SSlibKey, SSlibSigner + + +class TestEnvelope(unittest.TestCase): + """Test metadata interface provided by DSSE envelope.""" + + @classmethod + def setUpClass(cls): + cls.key_dicts = [ + KEYS.generate_rsa_key(), + KEYS.generate_ed25519_key(), + KEYS.generate_ecdsa_key(), + ] + + cls.signature_dict = { + "keyid": "11fa391a0ed7a447", + "sig": "30460221009342e4566528fcecf6a7a5", + } + cls.envelope_dict = { + "payload": "aGVsbG8gd29ybGQ=", + "payloadType": "http://example.com/HelloWorld", + "signatures": [cls.signature_dict], + } + cls.pae = b"DSSEv1 29 http://example.com/HelloWorld 11 hello world" + + def test_envelope_from_to_dict(self): + """Test envelope to_dict and from_dict methods.""" + + envelope_dict = copy.deepcopy(self.envelope_dict) + + # create envelope object from its dict. + envelope_obj = Envelope.from_dict(envelope_dict) + for signature in envelope_obj.signatures: + self.assertIsInstance(signature, Signature) + + # Assert envelope dict created by to_dict will be equal. + self.assertDictEqual(self.envelope_dict, envelope_obj.to_dict()) + + def test_envelope_eq_(self): + """Test envelope equality.""" + + envelope_obj = Envelope.from_dict(copy.deepcopy(self.envelope_dict)) + + # Assert that object and None will not be equal. + self.assertNotEqual(None, envelope_obj) + + # Assert a copy of envelope_obj will be equal to envelope_obj. + envelope_obj_2 = copy.deepcopy(envelope_obj) + self.assertEqual(envelope_obj, envelope_obj_2) + + # Assert that changing the "payload" will make the objects not equal. + envelope_obj_2.payload = b"wrong_payload" + self.assertNotEqual(envelope_obj, envelope_obj_2) + envelope_obj_2.payload = envelope_obj.payload + + # Assert that changing the "payload_type" will make the objects not equal. + envelope_obj_2.payload_type = "wrong_payload_type" + self.assertNotEqual(envelope_obj, envelope_obj_2) + envelope_obj_2.payload = envelope_obj.payload + + # Assert that changing the "signatures" will make the objects not equal. + sig_obg = Signature("", self.signature_dict["sig"]) + envelope_obj_2.signatures = [sig_obg] + self.assertNotEqual(envelope_obj, envelope_obj_2) + + def test_preauthencoding(self): + """Test envelope Pre-Auth-Encoding.""" + + envelope_obj = Envelope.from_dict(copy.deepcopy(self.envelope_dict)) + + # Checking for Pre-Auth-Encoding generated is correct. + self.assertEqual(self.pae, envelope_obj.pae()) + + def test_sign_and_verify(self): + """Test for creating and verifying DSSE signatures.""" + + # Create an Envelope with no signatures. + envelope_dict = copy.deepcopy(self.envelope_dict) + envelope_dict["signatures"] = [] + envelope_obj = Envelope.from_dict(envelope_dict) + + key_list = [] + for key_dict in self.key_dicts: + # Test for invalid scheme. + valid_scheme = key_dict["scheme"] + key_dict["scheme"] = "invalid_scheme" + signer = SSlibSigner(key_dict) + with self.assertRaises((FormatError, UnsupportedAlgorithmError)): + envelope_obj.sign(signer) + + # Sign the payload. + key_dict["scheme"] = valid_scheme + signer = SSlibSigner(key_dict) + envelope_obj.sign(signer) + + # Create a List of "Key" from key_dict. + key_list.append(SSlibKey.from_securesystemslib_key(key_dict)) + + # Check for signatures of Envelope. + self.assertEqual(len(self.key_dicts), len(envelope_obj.signatures)) + for signature in envelope_obj.signatures: + self.assertIsInstance(signature, Signature) + + # Test for invalid threshold value for keys_list. + # threshold is 0. + with self.assertRaises(ValueError): + envelope_obj.verify(key_list, 0) + + # threshold is greater than no of keys. + with self.assertRaises(ValueError): + envelope_obj.verify(key_list, 4) + + # Test with valid keylist and threshold. + verified_keys = envelope_obj.verify(key_list, len(key_list)) + self.assertEqual(len(verified_keys), len(key_list)) + + # Test for unknown keys and threshold of 1. + new_key_dicts = [ + KEYS.generate_rsa_key(), + KEYS.generate_ed25519_key(), + KEYS.generate_ecdsa_key(), + ] + new_key_list = [] + for key_dict in new_key_dicts: + new_key_list.append(SSlibKey.from_securesystemslib_key(key_dict)) + + with self.assertRaises(VerificationError): + envelope_obj.verify(new_key_list, 1) + + all_keys = key_list + new_key_list + envelope_obj.verify(all_keys, 3) + + # Test with duplicate keys. + duplicate_keys = key_list + key_list + with self.assertRaises(VerificationError): + envelope_obj.verify( + duplicate_keys, 4 + ) # 3 unique keys, threshold 4. + + +# Run the unit tests. +if __name__ == "__main__": + unittest.main()