Skip to content

Commit

Permalink
WIP: Add rolename to Metadata
Browse files Browse the repository at this point in the history
* Lot of changes in API
* Advantages are smaller than expected
* from_bytes() requires a rolename: this could be inferred
  for other roles but not for targets... but this seems hard to
  express in the API
* If Signed implementations had Metadata constructors (as in the
  Generics branch) the root/snapshot/timestamp implementations would
  automatically be able to infer the rolename but again... targets
  would still need it in the API so either the targets constructor
  would be different or there would be an optional rolename
  (that you would only need for delegated targets)

Signed-off-by: Jussi Kukkonen <[email protected]>
  • Loading branch information
Jussi Kukkonen committed Jul 12, 2021
1 parent bd5912b commit 20662aa
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 46 deletions.
39 changes: 18 additions & 21 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_generic_read(self):
path = os.path.join(self.repo_dir, 'metadata', metadata + '.json')
metadata_obj = Metadata.from_file(path)
with open(path, 'rb') as f:
metadata_obj2 = Metadata.from_bytes(f.read())
metadata_obj2 = Metadata.from_bytes(metadata, f.read())

# Assert that both methods instantiate the right inner class for
# each metadata type and ...
Expand All @@ -130,7 +130,7 @@ def test_generic_read(self):
with self.assertRaises(DeserializationError):
Metadata.from_file(bad_metadata_path)
with self.assertRaises(DeserializationError):
Metadata.from_bytes(bad_string)
Metadata.from_bytes(metadata, bad_string)

os.remove(bad_metadata_path)

Expand All @@ -150,7 +150,8 @@ def test_read_write_read_compare(self):

path_2 = path + '.tmp'
metadata_obj.to_file(path_2)
metadata_obj_2 = Metadata.from_file(path_2)
# unusual filename: specify rolename explicitly
metadata_obj_2 = Metadata.from_file(path_2, rolename=metadata)

self.assertDictEqual(
metadata_obj.to_dict(),
Expand Down Expand Up @@ -274,7 +275,7 @@ def test_metadata_base(self):
data = md.to_dict()
data["signatures"].append({"keyid": data["signatures"][0]["keyid"], "sig": "foo"})
with self.assertRaises(ValueError):
Metadata.from_dict(data)
Metadata.from_dict(md.rolename, data)


def test_metadata_snapshot(self):
Expand Down Expand Up @@ -351,53 +352,49 @@ def test_metadata_verify_delegate(self):
role2 = Metadata.from_file(role2_path)

# test the expected delegation tree
root.verify_delegate('root', root)
root.verify_delegate('snapshot', snapshot)
root.verify_delegate('targets', targets)
targets.verify_delegate('role1', role1)
role1.verify_delegate('role2', role2)
root.verify_delegate(root)
root.verify_delegate(snapshot)
root.verify_delegate(targets)
targets.verify_delegate(role1)
role1.verify_delegate(role2)

# only root and targets can verify delegates
with self.assertRaises(TypeError):
snapshot.verify_delegate('snapshot', snapshot)
snapshot.verify_delegate(snapshot)
# verify fails for roles that are not delegated by delegator
with self.assertRaises(ValueError):
root.verify_delegate('role1', role1)
root.verify_delegate(role1)
with self.assertRaises(ValueError):
targets.verify_delegate('targets', targets)
targets.verify_delegate(targets)
# verify fails when delegator has no delegations
with self.assertRaises(ValueError):
role2.verify_delegate('role1', role1)
role2.verify_delegate(role1)

# verify fails when delegate content is modified
expires = snapshot.signed.expires
snapshot.signed.bump_expiration()
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate('snapshot', snapshot)
root.verify_delegate(snapshot)
snapshot.signed.expires = expires

# verify fails if roles keys do not sign the metadata
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate('timestamp', snapshot)

# Add a key to snapshot role, make sure the new sig fails to verify
ts_keyid = next(iter(root.signed.roles["timestamp"].keyids))
root.signed.add_key("snapshot", root.signed.keys[ts_keyid])
snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff"*64)

# verify succeeds if threshold is reached even if some signatures
# fail to verify
root.verify_delegate('snapshot', snapshot)
root.verify_delegate(snapshot)

# verify fails if threshold of signatures is not reached
root.signed.roles['snapshot'].threshold = 2
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate('snapshot', snapshot)
root.verify_delegate(snapshot)

# verify succeeds when we correct the new signature and reach the
# threshold of 2 keys
snapshot.sign(SSlibSigner(self.keystore['timestamp']), append=True)
root.verify_delegate('snapshot', snapshot)
root.verify_delegate(snapshot)


def test_key_class(self):
Expand Down
14 changes: 7 additions & 7 deletions tests/test_trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_update_with_invalid_json(self):
with self.assertRaises(exceptions.RepositoryError):
TrustedMetadataSet(b"")
# root.json is invalid
root = Metadata.from_bytes(data["root"])
root = Metadata.from_bytes("root", data["root"])
root.signed.version += 1
with self.assertRaises(exceptions.RepositoryError):
TrustedMetadataSet(json.dumps(root.to_dict()).encode())
Expand All @@ -94,16 +94,16 @@ def test_update_with_invalid_json(self):
trusted_set.root_update_finished()

top_level_md = [
(data["timestamp"], trusted_set.update_timestamp),
(data["snapshot"], trusted_set.update_snapshot),
(data["targets"], trusted_set.update_targets),
("timestamp", trusted_set.update_timestamp),
("snapshot", trusted_set.update_snapshot),
("targets", trusted_set.update_targets),
]
for metadata, update_func in top_level_md:
for rolename, update_func in top_level_md:
# metadata is not json
with self.assertRaises(exceptions.RepositoryError):
update_func(b"")
# metadata is invalid
md = Metadata.from_bytes(metadata)
md = Metadata.from_bytes(rolename, data[rolename])
md.signed.version += 1
with self.assertRaises(exceptions.RepositoryError):
update_func(json.dumps(md.to_dict()).encode())
Expand All @@ -112,7 +112,7 @@ def test_update_with_invalid_json(self):
with self.assertRaises(exceptions.RepositoryError):
update_func(data["root"])

update_func(metadata)
update_func(data[rolename])


# TODO test updating over initial metadata (new keys, newer timestamp, etc)
Expand Down
38 changes: 27 additions & 11 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import abc
import io
import logging
import os.path
import tempfile
from collections import OrderedDict
from datetime import datetime, timedelta
Expand Down Expand Up @@ -68,19 +69,22 @@ class Metadata:
i.e. one of Targets, Snapshot, Timestamp or Root.
signatures: An ordered dictionary of keyids to Signature objects, each
signing the canonical serialized representation of 'signed'.
rolename: Name of the metadata role (e.g. "root")
"""

def __init__(
self, signed: "Signed", signatures: "OrderedDict[str, Signature]"
self, rolename: str, signed: "Signed", signatures: "OrderedDict[str, Signature]"
):
self.rolename = rolename
self.signed = signed
self.signatures = signatures

@classmethod
def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata":
def from_dict(cls, rolename: str, metadata: Dict[str, Any]) -> "Metadata":
"""Creates Metadata object from its dict representation.
Arguments:
rolename: Name of the metadata role
metadata: TUF metadata in dict representation.
Raises:
Expand All @@ -99,14 +103,16 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata":

if _type == "targets":
inner_cls: Type[Signed] = Targets
elif _type == "snapshot":
elif _type == "snapshot" and rolename == "snapshot":
inner_cls = Snapshot
elif _type == "timestamp":
elif _type == "timestamp" and rolename == "timestamp":
inner_cls = Timestamp
elif _type == "root":
elif _type == "root" and rolename == "root":
inner_cls = Root
else:
raise ValueError(f'unrecognized metadata type "{_type}"')
raise ValueError(
f'unrecognized metadata type "{_type}" for role "{rolename}"'
)

# Make sure signatures are unique
signatures: "OrderedDict[str, Signature]" = OrderedDict()
Expand All @@ -119,6 +125,7 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata":
signatures[sig.keyid] = sig

return cls(
rolename=rolename,
signed=inner_cls.from_dict(metadata.pop("signed")),
signatures=signatures,
)
Expand All @@ -129,17 +136,21 @@ def from_file(
filename: str,
deserializer: Optional[MetadataDeserializer] = None,
storage_backend: Optional[StorageBackendInterface] = None,
rolename: Optional[str] = None,
) -> "Metadata":
"""Loads TUF metadata from file storage.
Arguments:
filename: The path to read the file from.
filename: The path to read the file from. Filename is expected to be of type
<rolename>.<ext>: e.g. "targets.json"
deserializer: A MetadataDeserializer subclass instance that
implements the desired wireline format deserialization. Per
default a JSONDeserializer is used.
storage_backend: An object that implements
securesystemslib.storage.StorageBackendInterface. Per default
a (local) FilesystemBackend is used.
rolename: Name of the metadata role. By default rolename is the
basename of the filename
Raises:
securesystemslib.exceptions.StorageError: The file cannot be read.
Expand All @@ -153,17 +164,23 @@ def from_file(
if storage_backend is None:
storage_backend = FilesystemBackend()

if rolename is None:
basename = os.path.basename(filename)
rolename, ext = os.path.splitext(basename)

with storage_backend.get(filename) as file_obj:
return cls.from_bytes(file_obj.read(), deserializer)
return cls.from_bytes(rolename, file_obj.read(), deserializer)

@staticmethod
def from_bytes(
rolename: str,
data: bytes,
deserializer: Optional[MetadataDeserializer] = None,
) -> "Metadata":
"""Loads TUF metadata from raw data.
Arguments:
rolename: Name of the metadata role
data: metadata content as bytes.
deserializer: Optional; A MetadataDeserializer instance that
implements deserialization. Default is JSONDeserializer.
Expand All @@ -183,7 +200,7 @@ def from_bytes(

deserializer = JSONDeserializer()

return deserializer.deserialize(data)
return deserializer.deserialize(rolename, data)

def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self."""
Expand Down Expand Up @@ -271,15 +288,13 @@ def sign(

def verify_delegate(
self,
delegated_role: str,
delegated_metadata: "Metadata",
signed_serializer: Optional[SignedSerializer] = None,
) -> None:
"""Verifies that 'delegated_metadata' is signed with the required
threshold of keys for the delegated role 'delegated_role'.
Args:
delegated_role: Name of the delegated role to verify
delegated_metadata: The Metadata object for the delegated role
signed_serializer: Optional; serializer used for delegate
serialization. Default is CanonicalJSONSerializer.
Expand All @@ -291,6 +306,7 @@ def verify_delegate(

# Find the keys and role in delegator metadata
role = None
delegated_role = delegated_metadata.rolename
if isinstance(self.signed, Root):
keys = self.signed.keys
role = self.signed.roles.get(delegated_role)
Expand Down
2 changes: 1 addition & 1 deletion tuf/api/serialization/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class MetadataDeserializer(metaclass=abc.ABCMeta):
"""Abstract base class for deserialization of Metadata objects."""

@abc.abstractmethod
def deserialize(self, raw_data: bytes) -> "Metadata":
def deserialize(self, rolename: str, raw_data: bytes) -> "Metadata":
"""Deserialize passed bytes to Metadata object."""
raise NotImplementedError

Expand Down
4 changes: 2 additions & 2 deletions tuf/api/serialization/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
class JSONDeserializer(MetadataDeserializer):
"""Provides JSON to Metadata deserialize method."""

def deserialize(self, raw_data: bytes) -> Metadata:
def deserialize(self, rolename: str, raw_data: bytes) -> Metadata:
"""Deserialize utf-8 encoded JSON bytes into Metadata object."""
try:
json_dict = json.loads(raw_data.decode("utf-8"))
metadata_obj = Metadata.from_dict(json_dict)
metadata_obj = Metadata.from_dict(rolename, json_dict)

except Exception as e: # pylint: disable=broad-except
raise DeserializationError from e
Expand Down
8 changes: 4 additions & 4 deletions tuf/ngclient/_internal/trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def update_root(self, data: bytes):
logger.debug("Updating root")

try:
new_root = Metadata.from_bytes(data)
new_root = Metadata.from_bytes("root", data)
except DeserializationError as e:
raise exceptions.RepositoryError("Failed to load root") from e

Expand Down Expand Up @@ -261,7 +261,7 @@ def update_timestamp(self, data: bytes):
raise RuntimeError("Cannot update timestamp after snapshot")

try:
new_timestamp = Metadata.from_bytes(data)
new_timestamp = Metadata.from_bytes("timestamp", data)
except DeserializationError as e:
raise exceptions.RepositoryError("Failed to load timestamp") from e

Expand Down Expand Up @@ -330,7 +330,7 @@ def update_snapshot(self, data: bytes):
) from e

try:
new_snapshot = Metadata.from_bytes(data)
new_snapshot = Metadata.from_bytes("snapshot", data)
except DeserializationError as e:
raise exceptions.RepositoryError("Failed to load snapshot") from e

Expand Down Expand Up @@ -429,7 +429,7 @@ def update_delegated_targets(
) from e

try:
new_delegate = Metadata.from_bytes(data)
new_delegate = Metadata.from_bytes(role_name, data)
except DeserializationError as e:
raise exceptions.RepositoryError("Failed to load snapshot") from e

Expand Down

0 comments on commit 20662aa

Please sign in to comment.