Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move DSSE Implementation to securesystemslib #487

Merged
merged 22 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6a5efd2
Add DSSE Envelope
PradyumnaKrishna Jun 14, 2022
0807b01
Move b64enc and b64dec to util.py
PradyumnaKrishna Jun 14, 2022
26de11e
Code formatting and update documentation
PradyumnaKrishna Jun 15, 2022
58c7124
Add PreAuthEncoding Method
PradyumnaKrishna Jun 15, 2022
bcaca59
Add tests for metadata.py
PradyumnaKrishna Jun 17, 2022
34fd48b
Update docs and minor changes
PradyumnaKrishna Jun 20, 2022
ca2b1ad
Correct exception in from_dict
PradyumnaKrishna Jun 20, 2022
1ab712a
Envelope: Remove GPGSignature case handling
PradyumnaKrishna Dec 14, 2022
d1f9602
Changes related to DSSE envelope
PradyumnaKrishna Jul 21, 2022
b8225f7
Add sign and verify methods to DSSE envelope
PradyumnaKrishna Jul 22, 2022
7ec955d
Add tests for sign and verify methods of Envelope
PradyumnaKrishna Jul 22, 2022
ba23d4c
Add Serialization module with Abstract methods
PradyumnaKrishna Aug 9, 2022
cece2e9
Creation of JSON Serializer and Deserializer
PradyumnaKrishna Aug 10, 2022
7536f87
Add payload deserialization in DSSE Envelope
PradyumnaKrishna Aug 10, 2022
12c2690
Add test case for serialization.py
PradyumnaKrishna Aug 10, 2022
1b40254
Add JSONSerializable and default option
PradyumnaKrishna Aug 11, 2022
cd2381b
Add Todo and fix Type hints for SerializationMixin
PradyumnaKrishna Aug 16, 2022
aade3f2
Changes in serialization API
PradyumnaKrishna Sep 5, 2022
4c6be46
Fix failing tests
PradyumnaKrishna Dec 17, 2022
99f13cf
Rename metadata subpackage to dsse
PradyumnaKrishna Feb 13, 2023
1ecb06b
Remove serialization module
PradyumnaKrishna Feb 17, 2023
c7602b5
Update documentation and lint fix
PradyumnaKrishna Mar 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions securesystemslib/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""Dead Simple Signing Envelope
"""

import logging
from typing import Any, Dict, List

from securesystemslib import exceptions, formats
from securesystemslib.signer import Key, Signature, Signer
from securesystemslib.util import b64dec, b64enc

logger = logging.getLogger(__name__)


class Envelope:
"""DSSE Envelope to provide interface for signing arbitrary data.

Attributes:
payload: Arbitrary byte sequence of serialized body.
payload_type: string that identifies how to interpret payload.
signatures: list of Signature.

"""

def __init__(
self, payload: bytes, payload_type: str, signatures: List[Signature]
):
self.payload = payload
self.payload_type = payload_type
self.signatures = signatures
lukpueh marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the decision to internally store signatures as a dict in Metadata was a good idea. Is there a reason it's a list here (other than simpler de/serialize)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we kept it a list for simplicity, and because that's what the DSSE spec describes. I'm fine with using a dict internally, if there's a clear usage advantage. Though I don't know if the same requirements apply as for TUF Metadata, since I expect Envelope objects to usually have a much shorter lifetime (also see theupdateframework/python-tuf#2246 (review)).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any opinions about this, @PradyumnaKrishna?


def __eq__(self, other: Any) -> bool:
if not isinstance(other, Envelope):
return False

return (
self.payload == other.payload
and self.payload_type == other.payload_type
and self.signatures == other.signatures
)

@classmethod
def from_dict(cls, data: dict) -> "Envelope":
"""Creates a Signature object from its JSON/dict representation.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy-pasta?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Creates a Signature object from its JSON/dict representation.
"""Creates a DSSE Envelope from its JSON/dict representation.


Arguments:
data: A dict containing a valid payload, payloadType and signatures

Raises:
KeyError: If any of the "payload", "payloadType" and "signatures"
fields are missing from the "data".

FormatError: If signature in "signatures" is incorrect.

Returns:
A "Envelope" instance.
"""

payload = b64dec(data["payload"])
payload_type = data["payloadType"]

formats.SIGNATURES_SCHEMA.check_match(data["signatures"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not use schema/formats in any new code. The from_dict parser hierarchy should be format validation enough.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping

signatures = [
Signature.from_dict(signature) for signature in data["signatures"]
]

return cls(payload, payload_type, signatures)

def to_dict(self) -> dict:
"""Returns the JSON-serializable dictionary representation of self."""

return {
"payload": b64enc(self.payload),
"payloadType": self.payload_type,
"signatures": [
signature.to_dict() for signature in self.signatures
],
}

def pae(self) -> bytes:
"""Pre-Auth-Encoding byte sequence of self."""

return b"DSSEv1 %d %b %d %b" % (
len(self.payload_type),
self.payload_type.encode("utf-8"),
len(self.payload),
self.payload,
)

def sign(self, signer: Signer) -> Signature:
"""Sign the payload and create the signature.

Arguments:
signer: A "Signer" class instance.

Returns:
A "Signature" instance.
"""

signature = signer.sign(self.pae())
self.signatures.append(signature)
lukpueh marked this conversation as resolved.
Show resolved Hide resolved

return signature

def verify(self, keys: List[Key], threshold: int) -> Dict[str, Key]:
lukpueh marked this conversation as resolved.
Show resolved Hide resolved
"""Verify the payload with the provided Keys.

Arguments:
keys: A list of public keys to verify the signatures.
threshold: Number of signatures needed to pass the verification.

Raises:
ValueError: If "threshold" is not valid.
SignatureVerificationError: If the enclosed signatures do not pass
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SignatureVerificationError or VerificationError?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping

the verification.

Note:
Mandating keyid in signatures and matching them with keyid of Key
in order to consider them for verification, is not a DSSE spec
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
in order to consider them for verification, is not a DSSE spec
in order to consider them for verification, is not DSSE spec

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping

compliant (Issue #416).

Returns:
accepted_keys: A dict of unique public keys.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't object to it being a dict if that's useful for other purposes but looking at the code it looks more like a set

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I resolved an earlier comment related to this... but I've decided I'm not happy about it: we should not return "accepted keys" that is not actually a complete list but just some of the valid keys.

This return value is also not part of the protocol spec.

To be clear: I don't object to the method returning a set of keys, but I do object to it returning a vaguely defined set of keys: returning None would be better than that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's return None then for the time being so that we provide a spec compliant implementation. Also, AFAICS, we don't currently need the return value in in-toto.

"""

accepted_keys = {}
pae = self.pae()

# checks for threshold value.
if threshold <= 0:
raise ValueError("Threshold must be greater than 0")

if len(keys) < threshold:
raise ValueError("Number of keys can't be less than threshold")
jku marked this conversation as resolved.
Show resolved Hide resolved

for signature in self.signatures:
for key in keys:
# If Signature keyid doesn't match with Key, skip.
if not key.keyid == signature.keyid:
continue

# If a key verifies the signature, we exit and use the result.
try:
key.verify_signature(signature, pae)
accepted_keys[key.keyid] = key
break
except exceptions.UnverifiedSignatureError:
# TODO: Log, Raise or continue with error?
continue

# Break, if amount of recognized_signer are more than threshold.
if len(accepted_keys) >= threshold:
break
jku marked this conversation as resolved.
Show resolved Hide resolved

if threshold > len(accepted_keys):
raise exceptions.VerificationError(
"Accepted signatures do not match threshold,"
f" Found: {len(accepted_keys)}, Expected {threshold}"
)

return accepted_keys
39 changes: 39 additions & 0 deletions securesystemslib/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
that tries to import a working json module, load_json_* functions, etc.
"""

import base64
import binascii
import json
import logging
import os
Expand Down Expand Up @@ -457,3 +459,40 @@ def digests_are_equal(digest1: str, digest2: str) -> bool:
are_equal = False

return are_equal


def b64enc(data: bytes) -> str:
Copy link
Collaborator

@jku jku Jan 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With regards to expanding existing modules like utils... Should we bite the bullet and make a _utils.py or _internal/utils.py so that we can:

  • add new internal helpers there without expanding the assumed API surface
  • start moving older code from utils.py to the internal version if we think there are methods that were never supposed to be public?

This is not that specific to this PR so feel free to disregard.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, especially since we already have base64 utility functions in the public (?) formats module. Let's start a _utils.py or _internal/utils.py just with the b64enc and b64dec from this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping

"""To encode byte sequence into base64 string

Arguments:
data: Byte sequence to encode

Exceptions:
TypeError: If "data" is not byte sequence

Returns:
base64 string
"""

return base64.standard_b64encode(data).decode("utf-8")


def b64dec(string: str) -> bytes:
"""To decode byte sequence from base64 string

Arguments:
string: base64 string to decode

Raises:
binascii.Error: If invalid base64-encoded string

Returns:
A byte sequence
"""

data = string.encode("utf-8")
try:
return base64.b64decode(data, validate=True)
except binascii.Error:
# altchars for urlsafe encoded base64 - instead of + and _ instead of /
return base64.b64decode(data, altchars=b"-_", validate=True)
162 changes: 162 additions & 0 deletions tests/test_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#!/usr/bin/env python

"""Test cases for "metadata.py". """

import copy
import unittest

import securesystemslib.keys as KEYS
from securesystemslib.exceptions import (
FormatError,
UnsupportedAlgorithmError,
VerificationError,
)
from securesystemslib.metadata import Envelope
from securesystemslib.signer import Signature, SSlibKey, SSlibSigner


class TestEnvelope(unittest.TestCase):
"""Test metadata interface provided by DSSE envelope."""

@classmethod
def setUpClass(cls):
cls.key_dicts = [
KEYS.generate_rsa_key(),
KEYS.generate_ed25519_key(),
KEYS.generate_ecdsa_key(),
]

cls.signature_dict = {
"keyid": "11fa391a0ed7a447",
"sig": "30460221009342e4566528fcecf6a7a5",
}
cls.envelope_dict = {
"payload": "aGVsbG8gd29ybGQ=",
"payloadType": "http://example.com/HelloWorld",
"signatures": [cls.signature_dict],
}
cls.pae = b"DSSEv1 29 http://example.com/HelloWorld 11 hello world"

def test_envelope_from_to_dict(self):
"""Test envelope to_dict and from_dict methods."""

envelope_dict = copy.deepcopy(self.envelope_dict)

# create envelope object from its dict.
envelope_obj = Envelope.from_dict(envelope_dict)
for signature in envelope_obj.signatures:
self.assertIsInstance(signature, Signature)

# Assert envelope dict created by to_dict will be equal.
self.assertDictEqual(self.envelope_dict, envelope_obj.to_dict())

# Assert TypeError on invalid signature.
envelope_dict["signatures"] = [""]
with self.assertRaises(FormatError):
Envelope.from_dict(envelope_dict)

def test_envelope_eq_(self):
"""Test envelope equality."""

envelope_obj = Envelope.from_dict(copy.deepcopy(self.envelope_dict))

# Assert that object and None will not be equal.
self.assertNotEqual(None, envelope_obj)

# Assert a copy of envelope_obj will be equal to envelope_obj.
envelope_obj_2 = copy.deepcopy(envelope_obj)
self.assertEqual(envelope_obj, envelope_obj_2)

# Assert that changing the "payload" will make the objects not equal.
envelope_obj_2.payload = b"wrong_payload"
self.assertNotEqual(envelope_obj, envelope_obj_2)
envelope_obj_2.payload = envelope_obj.payload

# Assert that changing the "payload_type" will make the objects not equal.
envelope_obj_2.payload_type = "wrong_payload_type"
self.assertNotEqual(envelope_obj, envelope_obj_2)
envelope_obj_2.payload = envelope_obj.payload

# Assert that changing the "signatures" will make the objects not equal.
sig_obg = Signature("", self.signature_dict["sig"])
envelope_obj_2.signatures = [sig_obg]
self.assertNotEqual(envelope_obj, envelope_obj_2)

def test_preauthencoding(self):
"""Test envelope Pre-Auth-Encoding."""

envelope_obj = Envelope.from_dict(copy.deepcopy(self.envelope_dict))

# Checking for Pre-Auth-Encoding generated is correct.
self.assertEqual(self.pae, envelope_obj.pae())

def test_sign_and_verify(self):
"""Test for creating and verifying DSSE signatures."""

# Create an Envelope with no signatures.
envelope_dict = copy.deepcopy(self.envelope_dict)
envelope_dict["signatures"] = []
envelope_obj = Envelope.from_dict(envelope_dict)

key_list = []
for key_dict in self.key_dicts:
# Test for invalid scheme.
valid_scheme = key_dict["scheme"]
key_dict["scheme"] = "invalid_scheme"
signer = SSlibSigner(key_dict)
with self.assertRaises((FormatError, UnsupportedAlgorithmError)):
jku marked this conversation as resolved.
Show resolved Hide resolved
envelope_obj.sign(signer)

# Sign the payload.
key_dict["scheme"] = valid_scheme
signer = SSlibSigner(key_dict)
envelope_obj.sign(signer)

# Create a List of "Key" from key_dict.
key_list.append(SSlibKey.from_securesystemslib_key(key_dict))

# Check for signatures of Envelope.
self.assertEqual(len(self.key_dicts), len(envelope_obj.signatures))
for signature in envelope_obj.signatures:
self.assertIsInstance(signature, Signature)

# Test for invalid threshold value for keys_list.
# threshold is 0.
with self.assertRaises(ValueError):
envelope_obj.verify(key_list, 0)

# threshold is greater than no of keys.
with self.assertRaises(ValueError):
envelope_obj.verify(key_list, 4)

# Test with valid keylist and threshold.
verified_keys = envelope_obj.verify(key_list, len(key_list))
self.assertEqual(len(verified_keys), len(key_list))

# Test for unknown keys and threshold of 1.
new_key_dicts = [
KEYS.generate_rsa_key(),
KEYS.generate_ed25519_key(),
KEYS.generate_ecdsa_key(),
]
new_key_list = []
for key_dict in new_key_dicts:
new_key_list.append(SSlibKey.from_securesystemslib_key(key_dict))

with self.assertRaises(VerificationError):
envelope_obj.verify(new_key_list, 1)

all_keys = key_list + new_key_list
envelope_obj.verify(all_keys, 3)

# Test with duplicate keys.
duplicate_keys = key_list + key_list
with self.assertRaises(VerificationError):
envelope_obj.verify(
duplicate_keys, 4
) # 3 unique keys, threshold 4.


# Run the unit tests.
if __name__ == "__main__":
unittest.main()