Skip to content

Commit

Permalink
Add missing type annotations to updater
Browse files Browse the repository at this point in the history
Add missing annotations and resolve partially the mypy
errors in updater.py and trusted_metadata_set.py

Signed-off-by: Teodora Sechkova <[email protected]>
  • Loading branch information
sechkova committed Jul 9, 2021
1 parent 4e3d6e4 commit e4109a0
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 38 deletions.
22 changes: 11 additions & 11 deletions tuf/ngclient/_internal/trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@
import logging
from collections import abc
from datetime import datetime
from typing import Dict, Iterator, Optional
from typing import Dict, Iterator, Mapping, Optional

from tuf import exceptions
from tuf.api.metadata import Metadata, Root, Targets
from tuf.api.metadata import Key, Metadata, Root, Targets
from tuf.api.serialization import DeserializationError

logger = logging.getLogger(__name__)
Expand All @@ -83,7 +83,7 @@ def verify_with_threshold(
) -> bool:
"""Verify 'unverified' with keys and threshold defined in delegator"""
role = None
keys = {}
keys: Mapping[str, Key] = {}
if isinstance(delegator.signed, Root):
keys = delegator.signed.keys
role = delegator.signed.roles.get(role_name)
Expand Down Expand Up @@ -133,7 +133,7 @@ def __init__(self, root_data: bytes):
RepositoryError: Metadata failed to load or verify. The actual
error type and content will contain more details.
"""
self._trusted_set = {} # type: Dict[str: Metadata]
self._trusted_set = {} # type: Dict[str, Metadata]
self.reference_time = datetime.utcnow()
self._root_update_finished = False

Expand All @@ -152,7 +152,7 @@ def __len__(self) -> int:

def __iter__(self) -> Iterator[Metadata]:
"""Returns iterator over all Metadata objects in TrustedMetadataSet"""
return iter(self._trusted_set)
return iter(self._trusted_set.values())

# Helper properties for top level metadata
@property
Expand All @@ -176,7 +176,7 @@ def targets(self) -> Optional[Metadata]:
return self._trusted_set.get("targets")

# Methods for updating metadata
def update_root(self, data: bytes):
def update_root(self, data: bytes) -> None:
"""Verifies and loads 'data' as new root metadata.
Note that an expired intermediate root is considered valid: expiry is
Expand Down Expand Up @@ -225,7 +225,7 @@ def update_root(self, data: bytes):
self._trusted_set["root"] = new_root
logger.debug("Updated root")

def root_update_finished(self):
def root_update_finished(self) -> None:
"""Marks root metadata as final and verifies it is not expired
Raises:
Expand All @@ -245,7 +245,7 @@ def root_update_finished(self):
self._root_update_finished = True
logger.debug("Verified final root.json")

def update_timestamp(self, data: bytes):
def update_timestamp(self, data: bytes) -> None:
"""Verifies and loads 'data' as new timestamp metadata.
Args:
Expand Down Expand Up @@ -302,7 +302,7 @@ def update_timestamp(self, data: bytes):
self._trusted_set["timestamp"] = new_timestamp
logger.debug("Updated timestamp")

def update_snapshot(self, data: bytes):
def update_snapshot(self, data: bytes) -> None:
"""Verifies and loads 'data' as new snapshot metadata.
Args:
Expand Down Expand Up @@ -379,7 +379,7 @@ def update_snapshot(self, data: bytes):
self._trusted_set["snapshot"] = new_snapshot
logger.debug("Updated snapshot")

def update_targets(self, data: bytes):
def update_targets(self, data: bytes) -> None:
"""Verifies and loads 'data' as new top-level targets metadata.
Args:
Expand All @@ -393,7 +393,7 @@ def update_targets(self, data: bytes):

def update_delegated_targets(
self, data: bytes, role_name: str, delegator_name: str
):
) -> None:
"""Verifies and loads 'data' as new metadata for target 'role_name'.
Args:
Expand Down
57 changes: 30 additions & 27 deletions tuf/ngclient/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from securesystemslib import util as sslib_util

from tuf import exceptions
from tuf.api.metadata import DelegatedRole, TargetFile, Targets
from tuf.ngclient._internal import (
download,
requests_fetcher,
Expand Down Expand Up @@ -63,12 +64,7 @@ def __init__(
# Read trusted local root metadata
data = self._load_local_metadata("root")
self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data)

if fetcher is None:
self._fetcher = requests_fetcher.RequestsFetcher()
else:
self._fetcher = fetcher

self._fetcher = fetcher or requests_fetcher.RequestsFetcher()
self.config = config or UpdaterConfig()

def refresh(self) -> None:
Expand All @@ -95,7 +91,7 @@ def refresh(self) -> None:
self._load_snapshot()
self._load_targets("targets", "root")

def get_one_valid_targetinfo(self, target_path: str) -> Dict:
def get_one_valid_targetinfo(self, target_path: str) -> Dict[str, Any]:
"""
Returns the target information for a target identified by target_path.
Expand Down Expand Up @@ -139,7 +135,7 @@ def updated_targets(
# 'destination_directory' if 'filepath' contains a leading path
# separator (i.e., is treated as an absolute path).
filepath = target["filepath"]
target_fileinfo: "TargetFile" = target["fileinfo"]
target_fileinfo: TargetFile = target["fileinfo"]

target_filepath = os.path.join(destination_directory, filepath)

Expand All @@ -159,10 +155,10 @@ def updated_targets(

def download_target(
self,
targetinfo: Dict,
targetinfo: Dict[str, Any],
destination_directory: str,
target_base_url: Optional[str] = None,
):
) -> None:
"""
Download target specified by 'targetinfo' into 'destination_directory'.
Expand All @@ -189,8 +185,8 @@ def download_target(
else:
target_base_url = _ensure_trailing_slash(target_base_url)

target_filepath = targetinfo["filepath"]
target_fileinfo: "TargetFile" = targetinfo["fileinfo"]
target_filepath: str = targetinfo["filepath"]
target_fileinfo: TargetFile = targetinfo["fileinfo"]
full_url = parse.urljoin(target_base_url, target_filepath)

with download.download_file(
Expand Down Expand Up @@ -226,7 +222,7 @@ def _load_local_metadata(self, rolename: str) -> bytes:
with open(os.path.join(self._dir, f"{rolename}.json"), "rb") as f:
return f.read()

def _persist_metadata(self, rolename: str, data: bytes):
def _persist_metadata(self, rolename: str, data: bytes) -> None:
with open(os.path.join(self._dir, f"{rolename}.json"), "wb") as f:
f.write(data)

Expand Down Expand Up @@ -314,7 +310,9 @@ def _load_targets(self, role: str, parent_role: str) -> None:
self._trusted_set.update_delegated_targets(data, role, parent_role)
self._persist_metadata(role, data)

def _preorder_depth_first_walk(self, target_filepath) -> Dict:
def _preorder_depth_first_walk(
self, target_filepath: str
) -> Dict[str, Any]:
"""
Interrogates the tree of target delegations in order of appearance
(which implicitly order trustworthiness), and returns the matching
Expand Down Expand Up @@ -343,7 +341,7 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict:
# The metadata for 'role_name' must be downloaded/updated before
# its targets, delegations, and child roles can be inspected.

role_metadata = self._trusted_set[role_name].signed
role_metadata: Targets = self._trusted_set[role_name].signed
target = role_metadata.targets.get(target_filepath)

# After preorder check, add current role to set of visited roles.
Expand All @@ -366,19 +364,19 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict:
)

if child_role.terminating and child_role_name is not None:
msg = (
f"Adding child role {child_role_name}.\n",
"Not backtracking to other roles.",
logger.debug(
"Adding child role %s. Not backtracking to other \
roles.",
child_role_name,
)
logger.debug(msg)
role_names = []
child_roles_to_visit.append(
(child_role_name, role_name)
)
break

if child_role_name is None:
msg = f"Skipping child role {child_role_name}"
msg = f"Skipping child role {child_role.name}"
logger.debug(msg)

else:
Expand All @@ -403,16 +401,19 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict:
and number_of_delegations == 0
and len(role_names) > 0
):
msg = (
f"{len(role_names)} roles left to visit, but allowed to ",
f"visit at most {self.config.max_delegations} delegations.",
logger.debug(
"%s roles left to visit, but allowed to visit at most \
%d delegations.",
role_names,
self.config.max_delegations,
)
logger.debug(msg)

return {"filepath": target_filepath, "fileinfo": target}


def _visit_child_role(child_role: Dict, target_filepath: str) -> str:
def _visit_child_role(
child_role: DelegatedRole, target_filepath: str
) -> Optional[str]:
"""
<Purpose>
Non-public method that determines whether the given 'target_filepath'
Expand Down Expand Up @@ -509,7 +510,9 @@ def _visit_child_role(child_role: Dict, target_filepath: str) -> str:
return None


def _get_filepath_hash(target_filepath, hash_function="sha256"):
def _get_filepath_hash(
target_filepath: str, hash_function: str = "sha256"
) -> str:
"""
Calculate the hash of the filepath to determine which bin to find the
target.
Expand All @@ -524,6 +527,6 @@ def _get_filepath_hash(target_filepath, hash_function="sha256"):
return target_filepath_hash


def _ensure_trailing_slash(url: str):
def _ensure_trailing_slash(url: str) -> str:
"""Return url guaranteed to end in a slash"""
return url if url.endswith("/") else f"{url}/"

0 comments on commit e4109a0

Please sign in to comment.