Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metadata API: add get_verification_result method #2481

Merged
merged 1 commit into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
TargetFile,
Targets,
Timestamp,
VerificationResult,
)
from tuf.api.serialization import DeserializationError, SerializationError
from tuf.api.serialization.json import JSONSerializer
Expand Down Expand Up @@ -470,6 +471,96 @@ def test_signed_verify_delegate(self) -> None:
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
)

def test_signed_get_verification_result(self) -> None:
# Setup: Load test metadata and keys
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path)
initial_root_keyids = root.signed.roles[Root.type].keyids
self.assertEqual(len(initial_root_keyids), 1)
key1_id = initial_root_keyids[0]
key2 = self.keystore[Timestamp.type]
key2_id = key2["keyid"]
key3_id = "123456789abcdefg"
key4 = self.keystore[Snapshot.type]
key4_id = key4["keyid"]

# Test: 1 authorized key, 1 valid signature
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertTrue(result.verified)
self.assertEqual(result.signed, {key1_id})
self.assertEqual(result.unsigned, set())

# Test: 2 authorized keys, 1 invalid signature
# Adding a key, i.e. metadata change, invalidates existing signature
root.signed.add_key(
SSlibKey.from_securesystemslib_key(key2),
Root.type,
)
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertFalse(result.verified)
self.assertEqual(result.signed, set())
self.assertEqual(result.unsigned, {key1_id, key2_id})

# Test: 3 authorized keys, 1 invalid signature, 1 key missing key data
# Adding a keyid w/o key, fails verification the same as no signature
# or an invalid signature for that key
root.signed.roles[Root.type].keyids.append(key3_id)
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertFalse(result.verified)
self.assertEqual(result.signed, set())
self.assertEqual(result.unsigned, {key1_id, key2_id, key3_id})

# Test: 3 authorized keys, 1 valid signature, 1 invalid signature, 1
# key missing key data
root.sign(SSlibSigner(key2), append=True)
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertTrue(result.verified)
self.assertEqual(result.signed, {key2_id})
self.assertEqual(result.unsigned, {key1_id, key3_id})

# Test: 3 authorized keys, 1 valid signature, 1 invalid signature, 1
# key missing key data, 1 ignored unrelated signature
root.sign(SSlibSigner(key4), append=True)
self.assertEqual(
set(root.signatures.keys()), {key1_id, key2_id, key4_id}
)
self.assertTrue(result.verified)
self.assertEqual(result.signed, {key2_id})
self.assertEqual(result.unsigned, {key1_id, key3_id})

# See test_signed_verify_delegate for more related tests ...

def test_signed_verification_result_union(self) -> None:
# Test all possible "unions" (AND) of "verified" field
data = [
(True, True, True),
(True, False, False),
(False, True, False),
(False, False, False),
]

for a_part, b_part, ab_part in data:
self.assertEqual(
VerificationResult(a_part, set(), set()).union(
VerificationResult(b_part, set(), set())
),
VerificationResult(ab_part, set(), set()),
)

# Test exemplary union (|) of "signed" and "unsigned" fields
a = VerificationResult(True, {"1"}, {"2"})
b = VerificationResult(True, {"3"}, {"4"})
ab = VerificationResult(True, {"1", "3"}, {"2", "4"})
self.assertEqual(a.union(b), ab)

def test_key_class(self) -> None:
# Test if from_securesystemslib_key removes the private key from keyval
# of a securesystemslib key dictionary.
Expand Down
92 changes: 78 additions & 14 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io
import logging
import tempfile
from dataclasses import dataclass
from datetime import datetime
from typing import (
IO,
Expand All @@ -43,6 +44,7 @@
Iterator,
List,
Optional,
Set,
Tuple,
Type,
TypeVar,
Expand Down Expand Up @@ -646,6 +648,37 @@ def to_dict(self) -> Dict[str, Any]:
}


@dataclass
class VerificationResult:
"""Signature verification result for delegated role metadata.

Attributes:
verified: True, if threshold of signatures is met.
signed: Set of delegated keyids, which validly signed.
unsigned: Set of delegated keyids, which did not validly sign.

"""

verified: bool
signed: Set[str]
unsigned: Set[str]

def __bool__(self) -> bool:
return self.verified

def union(self, other: "VerificationResult") -> "VerificationResult":
"""Combine two verification results.

Can be used to verify, if root metadata is signed by the threshold of
keys of previous root and the threshold of keys of itself.
"""
return VerificationResult(
self.verified and other.verified,
self.signed | other.signed,
self.unsigned | other.unsigned,
)


class _DelegatorMixin(metaclass=abc.ABCMeta):
"""Class that implements verify_delegate() for Root and Targets"""

Expand All @@ -665,54 +698,85 @@ def get_key(self, keyid: str) -> Key:
"""
raise NotImplementedError

def verify_delegate(
def get_verification_result(
self,
delegated_role: str,
payload: bytes,
signatures: Dict[str, Signature],
) -> None:
"""Verify signature threshold for delegated role.
) -> VerificationResult:
"""Return signature threshold verification result for delegated role.

Verify that there are enough valid ``signatures`` over ``payload``, to
meet the threshold of keys for ``delegated_role``, as defined by the
delegator (``self``).
NOTE: Unlike `verify_delegate()` this method does not raise, if the
role metadata is not fully verified.

Args:
delegated_role: Name of the delegated role to verify
payload: Signed payload bytes for the delegated role
signatures: Signatures over payload bytes

Raises:
UnsignedMetadataError: ``delegated_role`` was not signed with
required threshold of keys for ``role_name``.
ValueError: no delegation was found for ``delegated_role``.
"""
role = self.get_delegated_role(delegated_role)

# verify that delegated_metadata is signed by threshold of unique keys
signing_keys = set()
signed = set()
unsigned = set()

for keyid in role.keyids:
try:
key = self.get_key(keyid)
except ValueError:
unsigned.add(keyid)
logger.info("No key for keyid %s", keyid)
continue

if keyid not in signatures:
unsigned.add(keyid)
logger.info("No signature for keyid %s", keyid)
continue

sig = signatures[keyid]
try:
key.verify_signature(sig, payload)
signing_keys.add(keyid)
signed.add(keyid)
except sslib_exceptions.UnverifiedSignatureError:
unsigned.add(keyid)
logger.info("Key %s failed to verify %s", keyid, delegated_role)

if len(signing_keys) < role.threshold:
return VerificationResult(
len(signed) >= role.threshold, signed, unsigned
)

def verify_delegate(
self,
delegated_role: str,
payload: bytes,
signatures: Dict[str, Signature],
) -> None:
"""Verify signature threshold for delegated role.

Verify that there are enough valid ``signatures`` over ``payload``, to
meet the threshold of keys for ``delegated_role``, as defined by the
delegator (``self``).

Args:
delegated_role: Name of the delegated role to verify
payload: Signed payload bytes for the delegated role
signatures: Signatures over payload bytes

Raises:
UnsignedMetadataError: ``delegated_role`` was not signed with
required threshold of keys for ``role_name``.
ValueError: no delegation was found for ``delegated_role``.
"""
result = self.get_verification_result(
delegated_role, payload, signatures
)
if not result:
role = self.get_delegated_role(delegated_role)
raise UnsignedMetadataError(
f"{delegated_role} was signed by {len(signing_keys)}/"
f"{role.threshold} keys",
f"{delegated_role} was signed by {len(result.signed)}/"
f"{role.threshold} keys"
)


Expand Down
Loading