-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move DSSE Implementation to securesystemslib #487
Merged
Merged
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
6a5efd2
Add DSSE Envelope
PradyumnaKrishna 0807b01
Move b64enc and b64dec to util.py
PradyumnaKrishna 26de11e
Code formatting and update documentation
PradyumnaKrishna 58c7124
Add PreAuthEncoding Method
PradyumnaKrishna bcaca59
Add tests for metadata.py
PradyumnaKrishna 34fd48b
Update docs and minor changes
PradyumnaKrishna ca2b1ad
Correct exception in from_dict
PradyumnaKrishna 1ab712a
Envelope: Remove GPGSignature case handling
PradyumnaKrishna d1f9602
Changes related to DSSE envelope
PradyumnaKrishna b8225f7
Add sign and verify methods to DSSE envelope
PradyumnaKrishna 7ec955d
Add tests for sign and verify methods of Envelope
PradyumnaKrishna ba23d4c
Add Serialization module with Abstract methods
PradyumnaKrishna cece2e9
Creation of JSON Serializer and Deserializer
PradyumnaKrishna 7536f87
Add payload deserialization in DSSE Envelope
PradyumnaKrishna 12c2690
Add test case for serialization.py
PradyumnaKrishna 1b40254
Add JSONSerializable and default option
PradyumnaKrishna cd2381b
Add Todo and fix Type hints for SerializationMixin
PradyumnaKrishna aade3f2
Changes in serialization API
PradyumnaKrishna 4c6be46
Fix failing tests
PradyumnaKrishna 99f13cf
Rename metadata subpackage to dsse
PradyumnaKrishna 1ecb06b
Remove serialization module
PradyumnaKrishna c7602b5
Update documentation and lint fix
PradyumnaKrishna File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
"""Internal utilities""" | ||
|
||
import base64 | ||
import binascii | ||
|
||
|
||
def b64enc(data: bytes) -> str: | ||
"""To encode byte sequence into base64 string | ||
|
||
Arguments: | ||
data: Byte sequence to encode | ||
|
||
Exceptions: | ||
TypeError: If "data" is not byte sequence | ||
|
||
Returns: | ||
base64 string | ||
""" | ||
|
||
return base64.standard_b64encode(data).decode("utf-8") | ||
|
||
|
||
def b64dec(string: str) -> bytes: | ||
"""To decode byte sequence from base64 string | ||
|
||
Arguments: | ||
string: base64 string to decode | ||
|
||
Raises: | ||
binascii.Error: If invalid base64-encoded string | ||
|
||
Returns: | ||
A byte sequence | ||
""" | ||
|
||
data = string.encode("utf-8") | ||
try: | ||
return base64.b64decode(data, validate=True) | ||
except binascii.Error: | ||
# altchars for urlsafe encoded base64 - instead of + and _ instead of / | ||
return base64.b64decode(data, altchars=b"-_", validate=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
"""Dead Simple Signing Envelope | ||
""" | ||
|
||
import logging | ||
from typing import Any, Dict, List | ||
|
||
from securesystemslib import exceptions | ||
from securesystemslib._internal.utils import b64dec, b64enc | ||
from securesystemslib.signer import Key, Signature, Signer | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class Envelope: | ||
"""DSSE Envelope to provide interface for signing arbitrary data. | ||
|
||
Attributes: | ||
payload: Arbitrary byte sequence of serialized body. | ||
payload_type: string that identifies how to interpret payload. | ||
signatures: list of Signature. | ||
|
||
""" | ||
|
||
def __init__( | ||
self, payload: bytes, payload_type: str, signatures: List[Signature] | ||
): | ||
self.payload = payload | ||
self.payload_type = payload_type | ||
self.signatures = signatures | ||
|
||
def __eq__(self, other: Any) -> bool: | ||
if not isinstance(other, Envelope): | ||
return False | ||
|
||
return ( | ||
self.payload == other.payload | ||
and self.payload_type == other.payload_type | ||
and self.signatures == other.signatures | ||
) | ||
|
||
@classmethod | ||
def from_dict(cls, data: dict) -> "Envelope": | ||
"""Creates a DSSE Envelope from its JSON/dict representation. | ||
|
||
Arguments: | ||
data: A dict containing a valid payload, payloadType and signatures | ||
|
||
Raises: | ||
KeyError: If any of the "payload", "payloadType" and "signatures" | ||
fields are missing from the "data". | ||
|
||
FormatError: If signature in "signatures" is incorrect. | ||
|
||
Returns: | ||
A "Envelope" instance. | ||
""" | ||
|
||
payload = b64dec(data["payload"]) | ||
payload_type = data["payloadType"] | ||
|
||
signatures = [ | ||
Signature.from_dict(signature) for signature in data["signatures"] | ||
] | ||
|
||
return cls(payload, payload_type, signatures) | ||
|
||
def to_dict(self) -> dict: | ||
"""Returns the JSON-serializable dictionary representation of self.""" | ||
|
||
return { | ||
"payload": b64enc(self.payload), | ||
"payloadType": self.payload_type, | ||
"signatures": [ | ||
signature.to_dict() for signature in self.signatures | ||
], | ||
} | ||
|
||
def pae(self) -> bytes: | ||
"""Pre-Auth-Encoding byte sequence of self.""" | ||
|
||
return b"DSSEv1 %d %b %d %b" % ( | ||
len(self.payload_type), | ||
self.payload_type.encode("utf-8"), | ||
len(self.payload), | ||
self.payload, | ||
) | ||
|
||
def sign(self, signer: Signer) -> Signature: | ||
"""Sign the payload and create the signature. | ||
|
||
Arguments: | ||
signer: A "Signer" class instance. | ||
|
||
Returns: | ||
A "Signature" instance. | ||
""" | ||
|
||
signature = signer.sign(self.pae()) | ||
self.signatures.append(signature) | ||
|
||
return signature | ||
|
||
def verify(self, keys: List[Key], threshold: int) -> Dict[str, Key]: | ||
"""Verify the payload with the provided Keys. | ||
|
||
NOTE: This API is experimental and might change (see | ||
secure-systems-lab/dsse#55) | ||
|
||
Arguments: | ||
keys: A list of public keys to verify the signatures. | ||
threshold: Number of signatures needed to pass the verification. | ||
|
||
Raises: | ||
ValueError: If "threshold" is not valid. | ||
VerificationError: If the enclosed signatures do not pass the | ||
verification. | ||
|
||
Note: | ||
Mandating keyid in signatures and matching them with keyid of Key | ||
in order to consider them for verification, is not DSSE spec | ||
compliant (Issue #416). | ||
|
||
Returns: | ||
A dict of the threshold of unique public keys that verified a | ||
signature. | ||
""" | ||
|
||
accepted_keys = {} | ||
pae = self.pae() | ||
|
||
# checks for threshold value. | ||
if threshold <= 0: | ||
raise ValueError("Threshold must be greater than 0") | ||
|
||
if len(keys) < threshold: | ||
raise ValueError("Number of keys can't be less than threshold") | ||
|
||
for signature in self.signatures: | ||
for key in keys: | ||
# If Signature keyid doesn't match with Key, skip. | ||
if not key.keyid == signature.keyid: | ||
continue | ||
|
||
# If a key verifies the signature, we exit and use the result. | ||
try: | ||
key.verify_signature(signature, pae) | ||
accepted_keys[key.keyid] = key | ||
break | ||
except exceptions.UnverifiedSignatureError: | ||
continue | ||
|
||
# Break, if amount of accepted_keys are more than threshold. | ||
if len(accepted_keys) >= threshold: | ||
break | ||
|
||
if threshold > len(accepted_keys): | ||
raise exceptions.VerificationError( | ||
"Accepted signatures do not match threshold," | ||
f" Found: {len(accepted_keys)}, Expected {threshold}" | ||
) | ||
|
||
return accepted_keys |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
#!/usr/bin/env python | ||
|
||
"""Test cases for "metadata.py". """ | ||
|
||
import copy | ||
import unittest | ||
|
||
import securesystemslib.keys as KEYS | ||
from securesystemslib.dsse import Envelope | ||
from securesystemslib.exceptions import ( | ||
FormatError, | ||
UnsupportedAlgorithmError, | ||
VerificationError, | ||
) | ||
from securesystemslib.signer import Signature, SSlibKey, SSlibSigner | ||
|
||
|
||
class TestEnvelope(unittest.TestCase): | ||
"""Test metadata interface provided by DSSE envelope.""" | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
cls.key_dicts = [ | ||
KEYS.generate_rsa_key(), | ||
KEYS.generate_ed25519_key(), | ||
KEYS.generate_ecdsa_key(), | ||
] | ||
|
||
cls.signature_dict = { | ||
"keyid": "11fa391a0ed7a447", | ||
"sig": "30460221009342e4566528fcecf6a7a5", | ||
} | ||
cls.envelope_dict = { | ||
"payload": "aGVsbG8gd29ybGQ=", | ||
"payloadType": "http://example.com/HelloWorld", | ||
"signatures": [cls.signature_dict], | ||
} | ||
cls.pae = b"DSSEv1 29 http://example.com/HelloWorld 11 hello world" | ||
|
||
def test_envelope_from_to_dict(self): | ||
"""Test envelope to_dict and from_dict methods.""" | ||
|
||
envelope_dict = copy.deepcopy(self.envelope_dict) | ||
|
||
# create envelope object from its dict. | ||
envelope_obj = Envelope.from_dict(envelope_dict) | ||
for signature in envelope_obj.signatures: | ||
self.assertIsInstance(signature, Signature) | ||
|
||
# Assert envelope dict created by to_dict will be equal. | ||
self.assertDictEqual(self.envelope_dict, envelope_obj.to_dict()) | ||
|
||
def test_envelope_eq_(self): | ||
"""Test envelope equality.""" | ||
|
||
envelope_obj = Envelope.from_dict(copy.deepcopy(self.envelope_dict)) | ||
|
||
# Assert that object and None will not be equal. | ||
self.assertNotEqual(None, envelope_obj) | ||
|
||
# Assert a copy of envelope_obj will be equal to envelope_obj. | ||
envelope_obj_2 = copy.deepcopy(envelope_obj) | ||
self.assertEqual(envelope_obj, envelope_obj_2) | ||
|
||
# Assert that changing the "payload" will make the objects not equal. | ||
envelope_obj_2.payload = b"wrong_payload" | ||
self.assertNotEqual(envelope_obj, envelope_obj_2) | ||
envelope_obj_2.payload = envelope_obj.payload | ||
|
||
# Assert that changing the "payload_type" will make the objects not equal. | ||
envelope_obj_2.payload_type = "wrong_payload_type" | ||
self.assertNotEqual(envelope_obj, envelope_obj_2) | ||
envelope_obj_2.payload = envelope_obj.payload | ||
|
||
# Assert that changing the "signatures" will make the objects not equal. | ||
sig_obg = Signature("", self.signature_dict["sig"]) | ||
envelope_obj_2.signatures = [sig_obg] | ||
self.assertNotEqual(envelope_obj, envelope_obj_2) | ||
|
||
def test_preauthencoding(self): | ||
"""Test envelope Pre-Auth-Encoding.""" | ||
|
||
envelope_obj = Envelope.from_dict(copy.deepcopy(self.envelope_dict)) | ||
|
||
# Checking for Pre-Auth-Encoding generated is correct. | ||
self.assertEqual(self.pae, envelope_obj.pae()) | ||
|
||
def test_sign_and_verify(self): | ||
"""Test for creating and verifying DSSE signatures.""" | ||
|
||
# Create an Envelope with no signatures. | ||
envelope_dict = copy.deepcopy(self.envelope_dict) | ||
envelope_dict["signatures"] = [] | ||
envelope_obj = Envelope.from_dict(envelope_dict) | ||
|
||
key_list = [] | ||
for key_dict in self.key_dicts: | ||
# Test for invalid scheme. | ||
valid_scheme = key_dict["scheme"] | ||
key_dict["scheme"] = "invalid_scheme" | ||
signer = SSlibSigner(key_dict) | ||
with self.assertRaises((FormatError, UnsupportedAlgorithmError)): | ||
envelope_obj.sign(signer) | ||
|
||
# Sign the payload. | ||
key_dict["scheme"] = valid_scheme | ||
signer = SSlibSigner(key_dict) | ||
envelope_obj.sign(signer) | ||
|
||
# Create a List of "Key" from key_dict. | ||
key_list.append(SSlibKey.from_securesystemslib_key(key_dict)) | ||
|
||
# Check for signatures of Envelope. | ||
self.assertEqual(len(self.key_dicts), len(envelope_obj.signatures)) | ||
for signature in envelope_obj.signatures: | ||
self.assertIsInstance(signature, Signature) | ||
|
||
# Test for invalid threshold value for keys_list. | ||
# threshold is 0. | ||
with self.assertRaises(ValueError): | ||
envelope_obj.verify(key_list, 0) | ||
|
||
# threshold is greater than no of keys. | ||
with self.assertRaises(ValueError): | ||
envelope_obj.verify(key_list, 4) | ||
|
||
# Test with valid keylist and threshold. | ||
verified_keys = envelope_obj.verify(key_list, len(key_list)) | ||
self.assertEqual(len(verified_keys), len(key_list)) | ||
|
||
# Test for unknown keys and threshold of 1. | ||
new_key_dicts = [ | ||
KEYS.generate_rsa_key(), | ||
KEYS.generate_ed25519_key(), | ||
KEYS.generate_ecdsa_key(), | ||
] | ||
new_key_list = [] | ||
for key_dict in new_key_dicts: | ||
new_key_list.append(SSlibKey.from_securesystemslib_key(key_dict)) | ||
|
||
with self.assertRaises(VerificationError): | ||
envelope_obj.verify(new_key_list, 1) | ||
|
||
all_keys = key_list + new_key_list | ||
envelope_obj.verify(all_keys, 3) | ||
|
||
# Test with duplicate keys. | ||
duplicate_keys = key_list + key_list | ||
with self.assertRaises(VerificationError): | ||
envelope_obj.verify( | ||
duplicate_keys, 4 | ||
) # 3 unique keys, threshold 4. | ||
|
||
|
||
# Run the unit tests. | ||
if __name__ == "__main__": | ||
unittest.main() |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at how we use this function in in-toto --
verify(keys=[key], threshold=1)
, and building in-toto specific threshold check verification around it -- @jku's argument in secure-systems-lab/dsse#55 seems more convincing now.That said, we can work with what we have and I'd really like to move forward. Let's just add comment that makes it very clear that this API isn't final yet. E.g...