Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic DSSE equivalent for Metadata API and configurable DSSE support in ngclient #2436

Merged
merged 16 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion examples/uploader/_localrepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ def open(self, role: str) -> Metadata:
# if there is a metadata version fetched from remote, use that
# HACK: access Updater internals
if role in self.updater._trusted_set:
return copy.deepcopy(self.updater._trusted_set[role])
# NOTE: The original signature wrapper (Metadata) was verified and
# discarded upon inclusion in the trusted set. It is safe to use
# a fresh wrapper. `close` will override existing signatures anyway.
return Metadata(copy.deepcopy(self.updater._trusted_set[role]))

# otherwise we're creating metadata from scratch
md = Metadata(Targets())
Expand Down
91 changes: 91 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import unittest
from copy import copy, deepcopy
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, ClassVar, Dict, Optional

from securesystemslib import exceptions as sslib_exceptions
Expand All @@ -33,6 +34,7 @@

from tests import utils
from tuf.api import exceptions
from tuf.api.dsse import SimpleEnvelope
from tuf.api.metadata import (
TOP_LEVEL_ROLE_NAMES,
DelegatedRole,
Expand Down Expand Up @@ -1144,6 +1146,95 @@ def test_delegations_get_delegated_role(self) -> None:
)


class TestSimpleEnvelope(unittest.TestCase):
"""Tests for public API in 'tuf/api/dsse.py'."""

@classmethod
def setUpClass(cls) -> None:
repo_data_dir = Path(utils.TESTS_DIR) / "repository_data"
cls.metadata_dir = repo_data_dir / "repository" / "metadata"
cls.signer_store = {}
for role in [Snapshot, Targets, Timestamp]:
key_path = repo_data_dir / "keystore" / f"{role.type}_key"
key = import_ed25519_privatekey_from_file(
str(key_path),
password="password",
)
cls.signer_store[role.type] = SSlibSigner(key)

def test_serialization(self) -> None:
"""Basic de/serialization test.

1. Load test metadata for each role
2. Wrap metadata payloads in envelope serializing the payload
3. Serialize envelope
4. De-serialize envelope
5. De-serialize payload

"""
for role in [Root, Timestamp, Snapshot, Targets]:
metadata_path = self.metadata_dir / f"{role.type}.json"
metadata = Metadata.from_file(str(metadata_path))
self.assertIsInstance(metadata.signed, role)

envelope = SimpleEnvelope.from_signed(metadata.signed)
envelope_bytes = envelope.to_bytes()

envelope2 = SimpleEnvelope.from_bytes(envelope_bytes)
payload = envelope2.get_signed()
self.assertEqual(metadata.signed, payload)

def test_fail_envelope_serialization(self) -> None:
envelope = SimpleEnvelope(b"foo", "bar", ["baz"])
with self.assertRaises(SerializationError):
envelope.to_bytes()

def test_fail_envelope_deserialization(self) -> None:
with self.assertRaises(DeserializationError):
SimpleEnvelope.from_bytes(b"[")

def test_fail_payload_serialization(self) -> None:
with self.assertRaises(SerializationError):
SimpleEnvelope.from_signed("foo") # type: ignore

def test_fail_payload_deserialization(self) -> None:
payloads = [b"[", b'{"_type": "foo"}']
for payload in payloads:
envelope = SimpleEnvelope(payload, "bar", [])
with self.assertRaises(DeserializationError):
envelope.get_signed()

def test_verify_delegate(self) -> None:
"""Basic verification test.

1. Load test metadata for each role
2. Wrap non-root payloads in envelope serializing the payload
3. Sign with correct delegated key
4. Verify delegate with root

"""
root_path = self.metadata_dir / "root.json"
root = Metadata[Root].from_file(str(root_path)).signed

for role in [Timestamp, Snapshot, Targets]:
metadata_path = self.metadata_dir / f"{role.type}.json"
metadata = Metadata.from_file(str(metadata_path))
self.assertIsInstance(metadata.signed, role)

signer = self.signer_store[role.type]
self.assertIn(
signer.key_dict["keyid"], root.roles[role.type].keyids
)

envelope = SimpleEnvelope.from_signed(metadata.signed)
envelope.sign(signer)
self.assertTrue(len(envelope.signatures) == 1)

root.verify_delegate(
role.type, envelope.pae(), envelope.signatures_dict
)


# Run unit test.
if __name__ == "__main__":
utils.configure_test_logging(sys.argv)
Expand Down
117 changes: 93 additions & 24 deletions tests/test_trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,22 @@

from tests import utils
from tuf.api import exceptions
from tuf.api.dsse import SimpleEnvelope
from tuf.api.metadata import (
Metadata,
MetaFile,
Root,
Signed,
Snapshot,
Targets,
Timestamp,
)
from tuf.api.serialization.json import JSONSerializer
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
from tuf.ngclient._internal.trusted_metadata_set import (
TrustedMetadataSet,
_load_from_simple_envelope,
)
from tuf.ngclient.config import EnvelopeType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -93,7 +99,9 @@ def hashes_length_modifier(timestamp: Timestamp) -> None:
)

def setUp(self) -> None:
self.trusted_set = TrustedMetadataSet(self.metadata[Root.type])
self.trusted_set = TrustedMetadataSet(
self.metadata[Root.type], EnvelopeType.METADATA
)

def _update_all_besides_targets(
self,
Expand Down Expand Up @@ -132,7 +140,7 @@ def test_update(self) -> None:

count = 0
for md in self.trusted_set:
self.assertIsInstance(md, Metadata)
self.assertIsInstance(md, Signed)
count += 1

self.assertTrue(count, 6)
Expand All @@ -149,11 +157,11 @@ def test_update_metadata_output(self) -> None:
delegeted_targets_2 = self.trusted_set.update_delegated_targets(
self.metadata["role2"], "role2", "role1"
)
self.assertIsInstance(timestamp.signed, Timestamp)
self.assertIsInstance(snapshot.signed, Snapshot)
self.assertIsInstance(targets.signed, Targets)
self.assertIsInstance(delegeted_targets_1.signed, Targets)
self.assertIsInstance(delegeted_targets_2.signed, Targets)
self.assertIsInstance(timestamp, Timestamp)
self.assertIsInstance(snapshot, Snapshot)
self.assertIsInstance(targets, Targets)
self.assertIsInstance(delegeted_targets_1, Targets)
self.assertIsInstance(delegeted_targets_2, Targets)

def test_out_of_order_ops(self) -> None:
# Update snapshot before timestamp
Expand Down Expand Up @@ -192,25 +200,40 @@ def test_out_of_order_ops(self) -> None:
self.metadata["role1"], "role1", Targets.type
)

def test_root_with_invalid_json(self) -> None:
# Test loading initial root and root update
for test_func in [TrustedMetadataSet, self.trusted_set.update_root]:
# root is not json
with self.assertRaises(exceptions.RepositoryError):
test_func(b"")
def test_bad_initial_root(self) -> None:
# root is not json
with self.assertRaises(exceptions.RepositoryError):
TrustedMetadataSet(b"", EnvelopeType.METADATA)

# root is invalid
root = Metadata.from_bytes(self.metadata[Root.type])
root.signed.version += 1
with self.assertRaises(exceptions.UnsignedMetadataError):
test_func(root.to_bytes())
# root is invalid
root = Metadata.from_bytes(self.metadata[Root.type])
root.signed.version += 1
with self.assertRaises(exceptions.UnsignedMetadataError):
TrustedMetadataSet(root.to_bytes(), EnvelopeType.METADATA)

# metadata is of wrong type
with self.assertRaises(exceptions.RepositoryError):
test_func(self.metadata[Snapshot.type])
# metadata is of wrong type
with self.assertRaises(exceptions.RepositoryError):
TrustedMetadataSet(
self.metadata[Snapshot.type], EnvelopeType.METADATA
)

def test_bad_root_update(self) -> None:
# root is not json
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_root(b"")

# root is invalid
root = Metadata.from_bytes(self.metadata[Root.type])
root.signed.version += 1
with self.assertRaises(exceptions.UnsignedMetadataError):
self.trusted_set.update_root(root.to_bytes())

# metadata is of wrong type
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_root(self.metadata[Snapshot.type])

def test_top_level_md_with_invalid_json(self) -> None:
top_level_md: List[Tuple[bytes, Callable[[bytes], Metadata]]] = [
top_level_md: List[Tuple[bytes, Callable[[bytes], Signed]]] = [
(self.metadata[Timestamp.type], self.trusted_set.update_timestamp),
(self.metadata[Snapshot.type], self.trusted_set.update_snapshot),
(self.metadata[Targets.type], self.trusted_set.update_targets),
Expand Down Expand Up @@ -260,7 +283,7 @@ def root_expired_modifier(root: Root) -> None:

# intermediate root can be expired
root = self.modify_metadata(Root.type, root_expired_modifier)
tmp_trusted_set = TrustedMetadataSet(root)
tmp_trusted_set = TrustedMetadataSet(root, EnvelopeType.METADATA)
# update timestamp to trigger final root expiry check
with self.assertRaises(exceptions.ExpiredMetadataError):
tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type])
Expand Down Expand Up @@ -471,6 +494,52 @@ def target_expired_modifier(target: Targets) -> None:

# TODO test updating over initial metadata (new keys, newer timestamp, etc)

def test_load_from_simple_envelope(self) -> None:
"""Basic unit test for ``_load_from_simple_envelope`` helper.

TODO: Test via trusted metadata set tests like for traditional metadata
lukpueh marked this conversation as resolved.
Show resolved Hide resolved
"""
metadata = Metadata.from_bytes(self.metadata[Root.type])
root = metadata.signed
envelope = SimpleEnvelope.from_signed(root)

# Unwrap unsigned envelope without verification
envelope_bytes = envelope.to_bytes()
payload_obj, signed_bytes, signatures = _load_from_simple_envelope(
Root, envelope_bytes
)

self.assertEqual(payload_obj, root)
self.assertEqual(signed_bytes, envelope.pae())
self.assertDictEqual(signatures, {})

# Unwrap correctly signed envelope (use default role name)
sig = envelope.sign(self.keystore[Root.type])
envelope_bytes = envelope.to_bytes()
_, _, signatures = _load_from_simple_envelope(
Root, envelope_bytes, root
)
self.assertDictEqual(signatures, {sig.keyid: sig})

# Load correctly signed envelope (with explicit role name)
_, _, signatures = _load_from_simple_envelope(
Root, envelope.to_bytes(), root, Root.type
)
self.assertDictEqual(signatures, {sig.keyid: sig})

# Fail load envelope with unexpected 'payload_type'
envelope_bad_type = SimpleEnvelope.from_signed(root)
envelope_bad_type.payload_type = "foo"
envelope_bad_type_bytes = envelope_bad_type.to_bytes()
with self.assertRaises(exceptions.RepositoryError):
_load_from_simple_envelope(Root, envelope_bad_type_bytes)

# Fail load envelope with unexpected payload type
envelope_bad_signed = SimpleEnvelope.from_signed(root)
envelope_bad_signed_bytes = envelope_bad_signed.to_bytes()
with self.assertRaises(exceptions.RepositoryError):
_load_from_simple_envelope(Targets, envelope_bad_signed_bytes)


if __name__ == "__main__":
utils.configure_test_logging(sys.argv)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_updater_ng.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def test_updating_root(self) -> None:
# Bump root version, resign and refresh
self._modify_repository_root(lambda root: None, bump_version=True)
self.updater.refresh()
self.assertEqual(self.updater._trusted_set.root.signed.version, 2)
self.assertEqual(self.updater._trusted_set.root.version, 2)

def test_missing_targetinfo(self) -> None:
self.updater.refresh()
Expand Down
Loading
Loading