-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move DSSE Implementation to securesystemslib #487
Changes from 21 commits
6a5efd2
0807b01
26de11e
58c7124
bcaca59
34fd48b
ca2b1ad
1ab712a
d1f9602
b8225f7
7ec955d
ba23d4c
cece2e9
7536f87
12c2690
1b40254
cd2381b
aade3f2
4c6be46
99f13cf
1ecb06b
c7602b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
"""Internal utilities""" | ||
|
||
import base64 | ||
import binascii | ||
|
||
|
||
def b64enc(data: bytes) -> str: | ||
"""To encode byte sequence into base64 string | ||
|
||
Arguments: | ||
data: Byte sequence to encode | ||
|
||
Exceptions: | ||
TypeError: If "data" is not byte sequence | ||
|
||
Returns: | ||
base64 string | ||
""" | ||
|
||
return base64.standard_b64encode(data).decode("utf-8") | ||
|
||
|
||
def b64dec(string: str) -> bytes: | ||
"""To decode byte sequence from base64 string | ||
|
||
Arguments: | ||
string: base64 string to decode | ||
|
||
Raises: | ||
binascii.Error: If invalid base64-encoded string | ||
|
||
Returns: | ||
A byte sequence | ||
""" | ||
|
||
data = string.encode("utf-8") | ||
try: | ||
return base64.b64decode(data, validate=True) | ||
except binascii.Error: | ||
# altchars for urlsafe encoded base64 - instead of + and _ instead of / | ||
return base64.b64decode(data, altchars=b"-_", validate=True) |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,159 @@ | ||||||
"""Dead Simple Signing Envelope | ||||||
""" | ||||||
|
||||||
import logging | ||||||
from typing import Any, Dict, List | ||||||
|
||||||
from securesystemslib import exceptions | ||||||
from securesystemslib._internal.utils import b64enc, b64dec | ||||||
from securesystemslib.signer import Key, Signature, Signer | ||||||
|
||||||
logger = logging.getLogger(__name__) | ||||||
|
||||||
|
||||||
class Envelope: | ||||||
"""DSSE Envelope to provide interface for signing arbitrary data. | ||||||
|
||||||
Attributes: | ||||||
payload: Arbitrary byte sequence of serialized body. | ||||||
payload_type: string that identifies how to interpret payload. | ||||||
signatures: list of Signature. | ||||||
|
||||||
""" | ||||||
|
||||||
def __init__( | ||||||
self, payload: bytes, payload_type: str, signatures: List[Signature] | ||||||
): | ||||||
self.payload = payload | ||||||
self.payload_type = payload_type | ||||||
self.signatures = signatures | ||||||
|
||||||
def __eq__(self, other: Any) -> bool: | ||||||
if not isinstance(other, Envelope): | ||||||
return False | ||||||
|
||||||
return ( | ||||||
self.payload == other.payload | ||||||
and self.payload_type == other.payload_type | ||||||
and self.signatures == other.signatures | ||||||
) | ||||||
|
||||||
@classmethod | ||||||
def from_dict(cls, data: dict) -> "Envelope": | ||||||
"""Creates a DSSE Envelope from its JSON/dict representation. | ||||||
|
||||||
Arguments: | ||||||
data: A dict containing a valid payload, payloadType and signatures | ||||||
|
||||||
Raises: | ||||||
KeyError: If any of the "payload", "payloadType" and "signatures" | ||||||
fields are missing from the "data". | ||||||
|
||||||
FormatError: If signature in "signatures" is incorrect. | ||||||
|
||||||
Returns: | ||||||
A "Envelope" instance. | ||||||
""" | ||||||
|
||||||
payload = b64dec(data["payload"]) | ||||||
payload_type = data["payloadType"] | ||||||
|
||||||
signatures = [ | ||||||
Signature.from_dict(signature) for signature in data["signatures"] | ||||||
] | ||||||
|
||||||
return cls(payload, payload_type, signatures) | ||||||
|
||||||
def to_dict(self) -> dict: | ||||||
"""Returns the JSON-serializable dictionary representation of self.""" | ||||||
|
||||||
return { | ||||||
"payload": b64enc(self.payload), | ||||||
"payloadType": self.payload_type, | ||||||
"signatures": [ | ||||||
signature.to_dict() for signature in self.signatures | ||||||
], | ||||||
} | ||||||
|
||||||
def pae(self) -> bytes: | ||||||
"""Pre-Auth-Encoding byte sequence of self.""" | ||||||
|
||||||
return b"DSSEv1 %d %b %d %b" % ( | ||||||
len(self.payload_type), | ||||||
self.payload_type.encode("utf-8"), | ||||||
len(self.payload), | ||||||
self.payload, | ||||||
) | ||||||
|
||||||
def sign(self, signer: Signer) -> Signature: | ||||||
"""Sign the payload and create the signature. | ||||||
|
||||||
Arguments: | ||||||
signer: A "Signer" class instance. | ||||||
|
||||||
Returns: | ||||||
A "Signature" instance. | ||||||
""" | ||||||
|
||||||
signature = signer.sign(self.pae()) | ||||||
self.signatures.append(signature) | ||||||
|
||||||
return signature | ||||||
|
||||||
def verify(self, keys: List[Key], threshold: int) -> Dict[str, Key]: | ||||||
"""Verify the payload with the provided Keys. | ||||||
|
||||||
Arguments: | ||||||
keys: A list of public keys to verify the signatures. | ||||||
threshold: Number of signatures needed to pass the verification. | ||||||
|
||||||
Raises: | ||||||
ValueError: If "threshold" is not valid. | ||||||
VerificationError: If the enclosed signatures do not pass the | ||||||
verification. | ||||||
|
||||||
Note: | ||||||
Mandating keyid in signatures and matching them with keyid of Key | ||||||
in order to consider them for verification, is not DSSE spec | ||||||
compliant (Issue #416). | ||||||
|
||||||
Returns: | ||||||
accepted_keys: A dict of unique public keys. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor nit: Sphinx renders There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, @jku made a good point about the return value being too vaguely defined, and not part of the dsse spec ( see #487 (comment)) I suggest we either return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's just add the docs here and move on. What do you think about...
Suggested change
? |
||||||
""" | ||||||
|
||||||
accepted_keys = {} | ||||||
pae = self.pae() | ||||||
|
||||||
# checks for threshold value. | ||||||
if threshold <= 0: | ||||||
raise ValueError("Threshold must be greater than 0") | ||||||
|
||||||
if len(keys) < threshold: | ||||||
raise ValueError("Number of keys can't be less than threshold") | ||||||
|
||||||
for signature in self.signatures: | ||||||
for key in keys: | ||||||
# If Signature keyid doesn't match with Key, skip. | ||||||
if not key.keyid == signature.keyid: | ||||||
continue | ||||||
|
||||||
# If a key verifies the signature, we exit and use the result. | ||||||
try: | ||||||
key.verify_signature(signature, pae) | ||||||
accepted_keys[key.keyid] = key | ||||||
break | ||||||
except exceptions.UnverifiedSignatureError: | ||||||
# TODO: Log, Raise or continue with error? | ||||||
continue | ||||||
|
||||||
# Break, if amount of recognized_signer are more than threshold. | ||||||
if len(accepted_keys) >= threshold: | ||||||
break | ||||||
|
||||||
if threshold > len(accepted_keys): | ||||||
raise exceptions.VerificationError( | ||||||
"Accepted signatures do not match threshold," | ||||||
f" Found: {len(accepted_keys)}, Expected {threshold}" | ||||||
) | ||||||
|
||||||
return accepted_keys |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
#!/usr/bin/env python | ||
|
||
"""Test cases for "metadata.py". """ | ||
|
||
import copy | ||
import unittest | ||
|
||
import securesystemslib.keys as KEYS | ||
from securesystemslib.exceptions import ( | ||
FormatError, | ||
UnsupportedAlgorithmError, | ||
VerificationError, | ||
) | ||
from securesystemslib.dsse import Envelope | ||
from securesystemslib.signer import Signature, SSlibKey, SSlibSigner | ||
|
||
|
||
class TestEnvelope(unittest.TestCase): | ||
"""Test metadata interface provided by DSSE envelope.""" | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
cls.key_dicts = [ | ||
KEYS.generate_rsa_key(), | ||
KEYS.generate_ed25519_key(), | ||
KEYS.generate_ecdsa_key(), | ||
] | ||
|
||
cls.signature_dict = { | ||
"keyid": "11fa391a0ed7a447", | ||
"sig": "30460221009342e4566528fcecf6a7a5", | ||
} | ||
cls.envelope_dict = { | ||
"payload": "aGVsbG8gd29ybGQ=", | ||
"payloadType": "http://example.com/HelloWorld", | ||
"signatures": [cls.signature_dict], | ||
} | ||
cls.pae = b"DSSEv1 29 http://example.com/HelloWorld 11 hello world" | ||
|
||
def test_envelope_from_to_dict(self): | ||
"""Test envelope to_dict and from_dict methods.""" | ||
|
||
envelope_dict = copy.deepcopy(self.envelope_dict) | ||
|
||
# create envelope object from its dict. | ||
envelope_obj = Envelope.from_dict(envelope_dict) | ||
for signature in envelope_obj.signatures: | ||
self.assertIsInstance(signature, Signature) | ||
|
||
# Assert envelope dict created by to_dict will be equal. | ||
self.assertDictEqual(self.envelope_dict, envelope_obj.to_dict()) | ||
|
||
def test_envelope_eq_(self): | ||
"""Test envelope equality.""" | ||
|
||
envelope_obj = Envelope.from_dict(copy.deepcopy(self.envelope_dict)) | ||
|
||
# Assert that object and None will not be equal. | ||
self.assertNotEqual(None, envelope_obj) | ||
|
||
# Assert a copy of envelope_obj will be equal to envelope_obj. | ||
envelope_obj_2 = copy.deepcopy(envelope_obj) | ||
self.assertEqual(envelope_obj, envelope_obj_2) | ||
|
||
# Assert that changing the "payload" will make the objects not equal. | ||
envelope_obj_2.payload = b"wrong_payload" | ||
self.assertNotEqual(envelope_obj, envelope_obj_2) | ||
envelope_obj_2.payload = envelope_obj.payload | ||
|
||
# Assert that changing the "payload_type" will make the objects not equal. | ||
envelope_obj_2.payload_type = "wrong_payload_type" | ||
self.assertNotEqual(envelope_obj, envelope_obj_2) | ||
envelope_obj_2.payload = envelope_obj.payload | ||
|
||
# Assert that changing the "signatures" will make the objects not equal. | ||
sig_obg = Signature("", self.signature_dict["sig"]) | ||
envelope_obj_2.signatures = [sig_obg] | ||
self.assertNotEqual(envelope_obj, envelope_obj_2) | ||
|
||
def test_preauthencoding(self): | ||
"""Test envelope Pre-Auth-Encoding.""" | ||
|
||
envelope_obj = Envelope.from_dict(copy.deepcopy(self.envelope_dict)) | ||
|
||
# Checking for Pre-Auth-Encoding generated is correct. | ||
self.assertEqual(self.pae, envelope_obj.pae()) | ||
|
||
def test_sign_and_verify(self): | ||
"""Test for creating and verifying DSSE signatures.""" | ||
|
||
# Create an Envelope with no signatures. | ||
envelope_dict = copy.deepcopy(self.envelope_dict) | ||
envelope_dict["signatures"] = [] | ||
envelope_obj = Envelope.from_dict(envelope_dict) | ||
|
||
key_list = [] | ||
for key_dict in self.key_dicts: | ||
# Test for invalid scheme. | ||
valid_scheme = key_dict["scheme"] | ||
key_dict["scheme"] = "invalid_scheme" | ||
signer = SSlibSigner(key_dict) | ||
with self.assertRaises((FormatError, UnsupportedAlgorithmError)): | ||
envelope_obj.sign(signer) | ||
|
||
# Sign the payload. | ||
key_dict["scheme"] = valid_scheme | ||
signer = SSlibSigner(key_dict) | ||
envelope_obj.sign(signer) | ||
|
||
# Create a List of "Key" from key_dict. | ||
key_list.append(SSlibKey.from_securesystemslib_key(key_dict)) | ||
|
||
# Check for signatures of Envelope. | ||
self.assertEqual(len(self.key_dicts), len(envelope_obj.signatures)) | ||
for signature in envelope_obj.signatures: | ||
self.assertIsInstance(signature, Signature) | ||
|
||
# Test for invalid threshold value for keys_list. | ||
# threshold is 0. | ||
with self.assertRaises(ValueError): | ||
envelope_obj.verify(key_list, 0) | ||
|
||
# threshold is greater than no of keys. | ||
with self.assertRaises(ValueError): | ||
envelope_obj.verify(key_list, 4) | ||
|
||
# Test with valid keylist and threshold. | ||
verified_keys = envelope_obj.verify(key_list, len(key_list)) | ||
self.assertEqual(len(verified_keys), len(key_list)) | ||
|
||
# Test for unknown keys and threshold of 1. | ||
new_key_dicts = [ | ||
KEYS.generate_rsa_key(), | ||
KEYS.generate_ed25519_key(), | ||
KEYS.generate_ecdsa_key(), | ||
] | ||
new_key_list = [] | ||
for key_dict in new_key_dicts: | ||
new_key_list.append(SSlibKey.from_securesystemslib_key(key_dict)) | ||
|
||
with self.assertRaises(VerificationError): | ||
envelope_obj.verify(new_key_list, 1) | ||
|
||
all_keys = key_list + new_key_list | ||
envelope_obj.verify(all_keys, 3) | ||
|
||
# Test with duplicate keys. | ||
duplicate_keys = key_list + key_list | ||
with self.assertRaises(VerificationError): | ||
envelope_obj.verify( | ||
duplicate_keys, 4 | ||
) # 3 unique keys, threshold 4. | ||
|
||
|
||
# Run the unit tests. | ||
if __name__ == "__main__": | ||
unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at how we use this function in in-toto --
verify(keys=[key], threshold=1)
, and building in-toto specific threshold check verification around it -- @jku's argument in secure-systems-lab/dsse#55 seems more convincing now.That said, we can work with what we have and I'd really like to move forward. Let's just add comment that makes it very clear that this API isn't final yet. E.g...