Skip to content

Commit

Permalink
Merge pull request #1074 from sechkova/delegations-update
Browse files Browse the repository at this point in the history
Update targets delegations in generate_targets_metadata
  • Loading branch information
joshuagl authored Aug 26, 2020
2 parents 28f0b67 + b6307dd commit 7b7a14a
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 106 deletions.
94 changes: 56 additions & 38 deletions tests/test_repository_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import json
import shutil
import unittest
import copy

import tuf
import tuf.formats
Expand Down Expand Up @@ -310,11 +311,6 @@ def test_generate_root_metadata(self):
# securesystemslib.exceptions.Error exception is raised for duplicate keyids.
tuf.keydb._keydb_dict['default'][root_keyids[0]]['keytype'] = 'rsa'

# Add duplicate keyid to root's roleinfo.
tuf.roledb._roledb_dict['default']['root']['keyids'].append(root_keyids[0])
self.assertRaises(securesystemslib.exceptions.Error, repo_lib.generate_root_metadata, 1,
expires, consistent_snapshot=False)

# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_root_metadata,
'3', expires, False)
Expand Down Expand Up @@ -349,48 +345,55 @@ def test_generate_targets_metadata(self):
file_permissions = oct(os.stat(file1_path).st_mode)[4:]
target_files = {'file.txt': {'custom': {'file_permission': file_permissions}}}

delegations = {"keys": {
"a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf": {
"keytype": "ed25519",
"keyval": {
"public": "3eb81026ded5af2c61fb3d4b272ac53cd1049a810ee88f4df1fc35cdaf918157"
}
}
},
"roles": [
{
"keyids": [
"a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf"
],
"name": "targets/warehouse",
"paths": [
"/file1.txt", "/README.txt", '/warehouse/'
],
"threshold": 1
}
]
}
# Delegations data must be loaded into roledb since
# generate_targets_metadata tries to update delegations keyids
# and threshold
repository_path = os.path.join('repository_data', 'repository')
repository = repo_tool.load_repository(repository_path)
roleinfo = tuf.roledb.get_roleinfo('targets')
delegations = roleinfo['delegations']

targets_metadata = \
repo_lib.generate_targets_metadata(targets_directory, target_files,
version, expiration_date, delegations,
False)
targets_metadata = repo_lib.generate_targets_metadata(targets_directory,
target_files, version, expiration_date, delegations, False)
self.assertTrue(tuf.formats.TARGETS_SCHEMA.matches(targets_metadata))

# Valid arguments with 'delegations' set to None.
targets_metadata = \
repo_lib.generate_targets_metadata(targets_directory, target_files,
version, expiration_date, None,
False)
targets_metadata = repo_lib.generate_targets_metadata(targets_directory,
target_files, version, expiration_date, None, False)
self.assertTrue(tuf.formats.TARGETS_SCHEMA.matches(targets_metadata))

# Test update in targets' delegations
keystore_path = os.path.join('repository_data', 'keystore')
targets_public_keypath = os.path.join(keystore_path, 'targets_key.pub')
targets_public_key = securesystemslib.interface.\
import_ed25519_publickey_from_file(targets_public_keypath)

# Add new key and threshold to delegated role
repository.targets('role1').add_verification_key(targets_public_key)
repository.targets('role1').threshold = 2
role1_keyids = tuf.roledb.get_role_keyids('role1')
role1_threshold = tuf.roledb.get_role_threshold('role1')
roleinfo = tuf.roledb.get_roleinfo('targets')
delegations = roleinfo['delegations']
old_delegations = copy.deepcopy(delegations)

targets_metadata = repo_lib.generate_targets_metadata(targets_directory,
target_files, version, expiration_date, delegations, False)
self.assertNotEqual(old_delegations, delegations)
self.assertEqual(role1_keyids,
targets_metadata['delegations']['roles'][0]['keyids'])
self.assertEqual(role1_threshold,
targets_metadata['delegations']['roles'][0]['threshold'])
for keyid in role1_keyids:
self.assertIn(keyid, targets_metadata['delegations']['keys'])


# Verify that 'digest.filename' file is saved to 'targets_directory' if
# the 'write_consistent_targets' argument is True.
list_targets_directory = os.listdir(targets_directory)
targets_metadata = \
repo_lib.generate_targets_metadata(targets_directory, target_files,
version, expiration_date, delegations,
write_consistent_targets=True)
targets_metadata = repo_lib.generate_targets_metadata(targets_directory,
target_files, version, expiration_date, delegations,
write_consistent_targets=True)
new_list_targets_directory = os.listdir(targets_directory)

# Verify that 'targets_directory' contains only one extra item.
Expand Down Expand Up @@ -958,6 +961,21 @@ def test__load_top_level_metadata(self):
repository = repo_tool.create_new_repository(repository_directory, repository_name)
repo_lib._load_top_level_metadata(repository, filenames, repository_name)

# Manually add targets delegations to roledb since
# repository.write('targets') will try to update its delegations
targets_filepath = os.path.join('repository_data', 'repository',
'metadata', 'targets.json')
targets_signable = securesystemslib.util.load_json_file(targets_filepath)
delegations = targets_signable['signed']['delegations']

roleinfo = {}
roleinfo['name'] = delegations['roles'][0]['name']
roleinfo['keyids'] = delegations['roles'][0]['keyids']
roleinfo['threshold'] = delegations['roles'][0]['threshold']
roleinfo['version'] = 1
tuf.roledb.add_role('role1', roleinfo, repository_name)


# Partially write all top-level roles (we increase the threshold of each
# top-level role so that they are flagged as partially written.
repository.root.threshold = repository.root.threshold + 1
Expand Down
111 changes: 72 additions & 39 deletions tuf/repository_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,13 @@ def _generate_and_write_metadata(rolename, metadata_filename,
metadata = generate_targets_metadata(targets_directory,
roleinfo['paths'], roleinfo['version'], roleinfo['expires'],
roleinfo['delegations'], consistent_targets, use_existing_fileinfo,
storage_backend)
storage_backend, repository_name)

# Update roledb with the latest delegations info collected during
# generate_targets_metadata()
tuf.roledb.update_roleinfo(rolename, roleinfo,
repository_name=repository_name)


# Before writing 'rolename' to disk, automatically increment its version
# number (if 'increment_version_number' is True) so that the caller does not
Expand Down Expand Up @@ -1228,6 +1234,7 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
# Conformant to 'ROLEDICT_SCHEMA' and 'KEYDICT_SCHEMA', respectively.
roledict = {}
keydict = {}
keylist = []

# Extract the role, threshold, and keyid information of the top-level roles,
# which Root stores in its metadata. The necessary role metadata is generated
Expand All @@ -1239,43 +1246,11 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
raise securesystemslib.exceptions.Error(repr(rolename) + ' not in'
' "tuf.roledb".')

# Keep track of the keys loaded to avoid duplicates.
keyids = []

# Generate keys for the keyids listed by the role being processed.
for keyid in tuf.roledb.get_role_keyids(rolename, repository_name):
# Collect keys from all roles in a list
keyids = tuf.roledb.get_role_keyids(rolename, repository_name)
for keyid in keyids:
key = tuf.keydb.get_key(keyid, repository_name=repository_name)

# If 'key' is an RSA key, it would conform to
# 'securesystemslib.formats.RSAKEY_SCHEMA', and have the form:
# {'keytype': 'rsa',
# 'keyid': keyid,
# 'keyval': {'public': '-----BEGIN RSA PUBLIC KEY----- ...',
# 'private': '-----BEGIN RSA PRIVATE KEY----- ...'}}
keyid = key['keyid']
if keyid not in keydict:

# This appears to be a new keyid. Generate the key for it.
if key['keytype'] in ['rsa', 'ed25519', 'ecdsa-sha2-nistp256']:
keytype = key['keytype']
keyval = key['keyval']
scheme = key['scheme']
keydict[keyid] = \
securesystemslib.keys.format_keyval_to_metadata(keytype,
scheme, keyval, private=False)

# This is not a recognized key. Raise an exception.
else:
raise securesystemslib.exceptions.Error('Unsupported keytype:'
' ' + key['keytype'])

# Do we have a duplicate?
if keyid in keyids:
raise securesystemslib.exceptions.Error('Same keyid listed twice:'
' ' + keyid)

# Add the loaded keyid for the role being processed.
keyids.append(keyid)
keylist.append(key)

# Generate the authentication information Root establishes for each
# top-level role.
Expand All @@ -1286,6 +1261,9 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
threshold=role_threshold)
roledict[rolename] = role_metadata

# Create the root metadata 'keys' dictionary
_, keydict = keys_to_keydict(keylist)

# Use generalized build_dict_conforming_to_schema func to produce a dict that
# contains all the appropriate information for this type of metadata,
# checking that the result conforms to the appropriate schema.
Expand All @@ -1308,7 +1286,8 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,

def generate_targets_metadata(targets_directory, target_files, version,
expiration_date, delegations=None, write_consistent_targets=False,
use_existing_fileinfo=False, storage_backend=None):
use_existing_fileinfo=False, storage_backend=None,
repository_name='default'):
"""
<Purpose>
Generate the targets metadata object. The targets in 'target_files' must
Expand Down Expand Up @@ -1361,6 +1340,10 @@ def generate_targets_metadata(targets_directory, target_files, version,
An object which implements
securesystemslib.storage.StorageBackendInterface.
repository_name:
The name of the repository. If not supplied, 'default' repository
is used.
<Exceptions>
securesystemslib.exceptions.FormatError, if an error occurred trying to
generate the targets metadata object.
Expand Down Expand Up @@ -1405,6 +1388,23 @@ def generate_targets_metadata(targets_directory, target_files, version,

if delegations is not None:
tuf.formats.DELEGATIONS_SCHEMA.check_match(delegations)
# If targets role has delegations, collect the up-to-date 'keyids' and
# 'threshold' for each role. Update the delegations keys dictionary.
delegations_keys = []
# Update 'keyids' and 'threshold' for each delegated role
for role in delegations['roles']:
role['keyids'] = tuf.roledb.get_role_keyids(role['name'],
repository_name)
role['threshold'] = tuf.roledb.get_role_threshold(role['name'],
repository_name)

# Collect all delegations keys for generating the delegations keydict
for keyid in role['keyids']:
key = tuf.keydb.get_key(keyid, repository_name=repository_name)
delegations_keys.append(key)

_, delegations['keys'] = keys_to_keydict(delegations_keys)


# Store the file attributes of targets in 'target_files'. 'filedict',
# conformant to 'tuf.formats.FILEDICT_SCHEMA', is added to the
Expand Down Expand Up @@ -2254,9 +2254,42 @@ def disable_console_log_messages():



def keys_to_keydict(keys):
"""
<Purpose>
Iterate over a list of keys and return a list of keyids and a dict mapping
keyid to key metadata
<Arguments>
keys:
A list of key objects conforming to
securesystemslib.formats.ANYKEYLIST_SCHEMA.
<Returns>
keyids:
A list of keyids conforming to securesystemslib.formats.KEYID_SCHEMA
keydict:
A dictionary conforming to securesystemslib.formats.KEYDICT_SCHEMA
"""
keyids = []
keydict = {}

for key in keys:
keyid = key['keyid']
key_metadata_format = securesystemslib.keys.format_keyval_to_metadata(
key['keytype'], key['scheme'], key['keyval'])

new_keydict = {keyid: key_metadata_format}
keydict.update(new_keydict)
keyids.append(keyid)
return keyids, keydict




if __name__ == '__main__':
# The interactive sessions of the documentation strings can
# be tested by running repository_tool.py as a standalone module:
# be tested by running repository_lib.py as a standalone module:
# $ python repository_lib.py.
import doctest
doctest.testmod()
38 changes: 9 additions & 29 deletions tuf/repository_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2377,7 +2377,7 @@ def delegate(self, rolename, public_keys, paths, threshold=1,

# Keep track of the valid keyids (added to the new Targets object) and
# their keydicts (added to this Targets delegations).
keyids, keydict = _keys_to_keydict(public_keys)
keyids, keydict = repo_lib.keys_to_keydict(public_keys)

# Ensure the paths of 'list_of_targets' are located in the repository's
# targets directory.
Expand Down Expand Up @@ -2612,7 +2612,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins,
hash_prefix = repo_lib.get_target_hash(target_path)[:prefix_length]
ordered_roles[int(hash_prefix, 16) // bin_size]["target_paths"].append(target_path)

keyids, keydict = _keys_to_keydict(keys_of_hashed_bins)
keyids, keydict = repo_lib.keys_to_keydict(keys_of_hashed_bins)

# A queue of roleinfo's that need to be updated in the roledb
delegated_roleinfos = []
Expand Down Expand Up @@ -2857,30 +2857,6 @@ def _check_path(self, pathname):




def _keys_to_keydict(keys):
"""
Iterate over a list of keys and return a list of keyids and a dict mapping
keyid to key metadata
"""
keyids = []
keydict = {}

for key in keys:
keyid = key['keyid']
key_metadata_format = securesystemslib.keys.format_keyval_to_metadata(
key['keytype'], key['scheme'], key['keyval'])

new_keydict = {keyid: key_metadata_format}
keydict.update(new_keydict)
keyids.append(keyid)

return keyids, keydict





def create_new_repository(repository_directory, repository_name='default',
storage_backend=None, use_timestamp_length=True, use_timestamp_hashes=True,
use_snapshot_length=False, use_snapshot_hashes=False):
Expand Down Expand Up @@ -3111,12 +3087,14 @@ def load_repository(repository_directory, repository_name='default',
# [('role1', 'targets'), ('role2', 'targets'), ... ]
roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
for role in roleinfo['delegations']['roles']:
delegations.append((role['name'], 'targets'))
delegations.append((role, 'targets'))

# Traverse the graph by appending the next delegation to the deque and
# 'pop'-ing and loading the left-most element.
while delegations:
rolename, delegating_role = delegations.popleft()
delegation_info, delegating_role = delegations.popleft()

rolename = delegation_info['name']
if (rolename, delegating_role) in loaded_delegations:
logger.warning('Detected cycle in the delegation graph: ' +
repr(delegating_role) + ' -> ' +
Expand Down Expand Up @@ -3156,6 +3134,8 @@ def load_repository(repository_directory, repository_name='default',
roleinfo['expires'] = metadata_object['expires']
roleinfo['paths'] = metadata_object['targets']
roleinfo['delegations'] = metadata_object['delegations']
roleinfo['threshold'] = delegation_info['threshold']
roleinfo['keyids'] = delegation_info['keyids']

# Generate the Targets object of the delegated role,
# add it to the top-level 'targets' object and to its
Expand All @@ -3173,7 +3153,7 @@ def load_repository(repository_directory, repository_name='default',
# Append the next level delegations to the deque:
# the 'delegated' role becomes the 'delegating'
for delegation in metadata_object['delegations']['roles']:
delegations.append((delegation['name'], rolename))
delegations.append((delegation, rolename))

# Extract the keys specified in the delegations field of the Targets
# role. Add 'key_object' to the list of recognized keys. Keys may be
Expand Down

0 comments on commit 7b7a14a

Please sign in to comment.