From c97b0d71197f1adfa704e2dd5572823d5be3c465 Mon Sep 17 00:00:00 2001 From: Demi Marie Obenour Date: Mon, 3 Apr 2023 15:40:16 -0400 Subject: [PATCH] Fix some mypy errors No functional change intended. (cherry picked from commit f84f5785b609573a3c102287ff9fbce13df6236e) --- qrexec/policy/parser.py | 57 ++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/qrexec/policy/parser.py b/qrexec/policy/parser.py index 96684654..1cc85454 100644 --- a/qrexec/policy/parser.py +++ b/qrexec/policy/parser.py @@ -22,6 +22,7 @@ """Qrexec policy parser and evaluator""" +from __future__ import annotations import abc import collections import collections.abc @@ -40,6 +41,9 @@ Tuple, Dict, Optional, + Set, + Union, + Type, ) from .. import QREXEC_CLIENT, POLICYPATH, RPCNAME_ALLOWED_CHARSET, POLICYSUFFIX @@ -89,7 +93,7 @@ def filter_filepaths(filepaths: Iterable[pathlib.Path]) -> List[pathlib.Path]: return filepaths -def parse_service_and_argument(rpcname, *, no_arg="+"): +def parse_service_and_argument(rpcname: Union[str, pathlib.PurePath], *, no_arg: str ="+"): """Parse service and argument string. Parse ``SERVICE+ARGUMENT``. Argument may be empty (single ``+`` at the end) @@ -184,8 +188,8 @@ def validate_service_and_argument(service, argument, *, filepath, lineno): class VMTokenMeta(abc.ABCMeta): # pylint: disable=missing-docstring - exacts = collections.OrderedDict() - prefixes = collections.OrderedDict() + exacts: collections.OrderedDict[str, Type[str]] = collections.OrderedDict() + prefixes: collections.OrderedDict[str, Type[str]] = collections.OrderedDict() def __init__(cls, name, bases, dict_): super().__init__(name, bases, dict_) @@ -562,7 +566,7 @@ def __init__(self, rule, request, *, user): self.notify = rule.action.notify @abc.abstractmethod - async def execute(self, caller_ident): + async def execute(self, caller_ident: str) -> str: """ Execute the action. For allow, this runs the qrexec. For ask, it asks user and then (depending on verdict) runs the call. @@ -752,7 +756,7 @@ def handle_invalid_response(self): # pylint: disable=no-self-use raise AccessDenied("invalid response") - async def execute(self, caller_ident): + async def execute(self, caller_ident: str) -> str: """Ask the user for permission. This method should be overloaded in children classes. This @@ -1566,7 +1570,7 @@ def collect_targets_for_ask(self, request): Word 'targets' is used intentionally instead of 'domains', because it can also contains @dispvm like keywords. """ - targets = set() + targets: Set[str] = set() # iterate over rules in reversed order to easier handle 'deny' # actions - simply remove matching domains from allowed set @@ -1629,22 +1633,23 @@ def resolve_filepath( Raises: qrexec.exc.PolicySyntaxError: when the path does not point to a file """ - included_path = self.resolve_path(included_path) - if not included_path.is_file(): + resolved_included_path: pathlib.Path = self.resolve_path(included_path) + if not resolved_included_path.is_file(): raise exc.PolicySyntaxError( - filepath, lineno, "not a file: {}".format(included_path) + filepath, lineno, "not a file: {}".format(resolved_included_path) ) # pylint: disable=consider-using-with - return open(str(included_path), encoding='utf-8'), included_path + return (open(str(resolved_included_path), encoding='utf-8'), + pathlib.PurePath(resolved_included_path)) def handle_include( self, included_path: pathlib.PurePosixPath, *, filepath, lineno ): - file, included_path = self.resolve_filepath( + file, resolved_included_path = self.resolve_filepath( included_path, filepath=filepath, lineno=lineno ) with file: - self.load_policy_file(file, pathlib.PurePosixPath(included_path)) + self.load_policy_file(file, resolved_included_path) def handle_include_service( self, @@ -1658,12 +1663,12 @@ def handle_include_service( service, argument = validate_service_and_argument( service, argument, filepath=filepath, lineno=lineno ) - file, included_path = self.resolve_filepath( + file, resolved_included_path = self.resolve_filepath( included_path, filepath=filepath, lineno=lineno ) with file: self.load_policy_file_service( - service, argument, file, pathlib.PurePosixPath(included_path) + service, argument, file, resolved_included_path ) @@ -1681,20 +1686,20 @@ def resolve_dirpath( qrexec.exc.PolicySyntaxError: when the path does not point to a directory """ - included_path = self.resolve_path(included_path) - if not included_path.is_dir(): + resolved_included_path = self.resolve_path(included_path) + if not resolved_included_path.is_dir(): raise exc.PolicySyntaxError( - filepath, lineno, "not a directory: {}".format(included_path) + filepath, lineno, "not a directory: {}".format(resolved_included_path) ) - return included_path + return resolved_included_path def handle_include_dir( self, included_path: pathlib.PurePosixPath, *, filepath, lineno ): - included_path = self.resolve_dirpath( + resolved_included_path = self.resolve_dirpath( included_path, filepath=filepath, lineno=lineno ) - self.load_policy_dir(included_path) + self.load_policy_dir(resolved_included_path) def load_policy_dir(self, dirpath): """Load all files in the directory (``!include-dir``) @@ -1902,7 +1907,7 @@ def handle_include( filepath, ) self.save_included_path(included_path, filepath=filepath, lineno=lineno) - super().handle_include(included_path, filepath=filepath, lineno=lineno) + super().handle_include(included_path, filepath=filepath, lineno=lineno) # type: ignore def handle_include_service( self, @@ -1920,7 +1925,7 @@ def handle_include_service( filepath, ) self.save_included_path(included_path, filepath=filepath, lineno=lineno) - super().handle_include_service( + super().handle_include_service( # type: ignore service, argument, included_path, filepath=filepath, lineno=lineno ) @@ -1945,7 +1950,13 @@ def __init__(self, *args, policy, **kwds): super().__init__(*args, **kwds) self.policy = policy - def resolve_filepath(self, included_path, *, filepath, lineno): + def resolve_filepath( + self, + included_path, + *, + filepath, + lineno, + ) -> Tuple[TextIO, pathlib.PurePath]: """ Raises: qrexec.exc.PolicySyntaxError: when wrong path is included