Skip to content

Commit

Permalink
Fix some mypy errors
Browse files Browse the repository at this point in the history
No functional change intended.

(cherry picked from commit f84f578)
  • Loading branch information
DemiMarie authored and marmarek committed May 12, 2023
1 parent c6dad2f commit c97b0d7
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions qrexec/policy/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

"""Qrexec policy parser and evaluator"""

from __future__ import annotations
import abc
import collections
import collections.abc
Expand All @@ -40,6 +41,9 @@
Tuple,
Dict,
Optional,
Set,
Union,
Type,
)

from .. import QREXEC_CLIENT, POLICYPATH, RPCNAME_ALLOWED_CHARSET, POLICYSUFFIX
Expand Down Expand Up @@ -89,7 +93,7 @@ def filter_filepaths(filepaths: Iterable[pathlib.Path]) -> List[pathlib.Path]:
return filepaths


def parse_service_and_argument(rpcname, *, no_arg="+"):
def parse_service_and_argument(rpcname: Union[str, pathlib.PurePath], *, no_arg: str ="+"):
"""Parse service and argument string.
Parse ``SERVICE+ARGUMENT``. Argument may be empty (single ``+`` at the end)
Expand Down Expand Up @@ -184,8 +188,8 @@ def validate_service_and_argument(service, argument, *, filepath, lineno):

class VMTokenMeta(abc.ABCMeta):
# pylint: disable=missing-docstring
exacts = collections.OrderedDict()
prefixes = collections.OrderedDict()
exacts: collections.OrderedDict[str, Type[str]] = collections.OrderedDict()
prefixes: collections.OrderedDict[str, Type[str]] = collections.OrderedDict()

def __init__(cls, name, bases, dict_):
super().__init__(name, bases, dict_)
Expand Down Expand Up @@ -562,7 +566,7 @@ def __init__(self, rule, request, *, user):
self.notify = rule.action.notify

@abc.abstractmethod
async def execute(self, caller_ident):
async def execute(self, caller_ident: str) -> str:
"""
Execute the action. For allow, this runs the qrexec. For ask, it asks
user and then (depending on verdict) runs the call.
Expand Down Expand Up @@ -752,7 +756,7 @@ def handle_invalid_response(self):
# pylint: disable=no-self-use
raise AccessDenied("invalid response")

async def execute(self, caller_ident):
async def execute(self, caller_ident: str) -> str:
"""Ask the user for permission.
This method should be overloaded in children classes. This
Expand Down Expand Up @@ -1566,7 +1570,7 @@ def collect_targets_for_ask(self, request):
Word 'targets' is used intentionally instead of 'domains', because it
can also contains @dispvm like keywords.
"""
targets = set()
targets: Set[str] = set()

# iterate over rules in reversed order to easier handle 'deny'
# actions - simply remove matching domains from allowed set
Expand Down Expand Up @@ -1629,22 +1633,23 @@ def resolve_filepath(
Raises:
qrexec.exc.PolicySyntaxError: when the path does not point to a file
"""
included_path = self.resolve_path(included_path)
if not included_path.is_file():
resolved_included_path: pathlib.Path = self.resolve_path(included_path)
if not resolved_included_path.is_file():
raise exc.PolicySyntaxError(
filepath, lineno, "not a file: {}".format(included_path)
filepath, lineno, "not a file: {}".format(resolved_included_path)
)
# pylint: disable=consider-using-with
return open(str(included_path), encoding='utf-8'), included_path
return (open(str(resolved_included_path), encoding='utf-8'),
pathlib.PurePath(resolved_included_path))

def handle_include(
self, included_path: pathlib.PurePosixPath, *, filepath, lineno
):
file, included_path = self.resolve_filepath(
file, resolved_included_path = self.resolve_filepath(
included_path, filepath=filepath, lineno=lineno
)
with file:
self.load_policy_file(file, pathlib.PurePosixPath(included_path))
self.load_policy_file(file, resolved_included_path)

def handle_include_service(
self,
Expand All @@ -1658,12 +1663,12 @@ def handle_include_service(
service, argument = validate_service_and_argument(
service, argument, filepath=filepath, lineno=lineno
)
file, included_path = self.resolve_filepath(
file, resolved_included_path = self.resolve_filepath(
included_path, filepath=filepath, lineno=lineno
)
with file:
self.load_policy_file_service(
service, argument, file, pathlib.PurePosixPath(included_path)
service, argument, file, resolved_included_path
)


Expand All @@ -1681,20 +1686,20 @@ def resolve_dirpath(
qrexec.exc.PolicySyntaxError: when the path does not point to
a directory
"""
included_path = self.resolve_path(included_path)
if not included_path.is_dir():
resolved_included_path = self.resolve_path(included_path)
if not resolved_included_path.is_dir():
raise exc.PolicySyntaxError(
filepath, lineno, "not a directory: {}".format(included_path)
filepath, lineno, "not a directory: {}".format(resolved_included_path)
)
return included_path
return resolved_included_path

def handle_include_dir(
self, included_path: pathlib.PurePosixPath, *, filepath, lineno
):
included_path = self.resolve_dirpath(
resolved_included_path = self.resolve_dirpath(
included_path, filepath=filepath, lineno=lineno
)
self.load_policy_dir(included_path)
self.load_policy_dir(resolved_included_path)

def load_policy_dir(self, dirpath):
"""Load all files in the directory (``!include-dir``)
Expand Down Expand Up @@ -1902,7 +1907,7 @@ def handle_include(
filepath,
)
self.save_included_path(included_path, filepath=filepath, lineno=lineno)
super().handle_include(included_path, filepath=filepath, lineno=lineno)
super().handle_include(included_path, filepath=filepath, lineno=lineno) # type: ignore

def handle_include_service(
self,
Expand All @@ -1920,7 +1925,7 @@ def handle_include_service(
filepath,
)
self.save_included_path(included_path, filepath=filepath, lineno=lineno)
super().handle_include_service(
super().handle_include_service( # type: ignore
service, argument, included_path, filepath=filepath, lineno=lineno
)

Expand All @@ -1945,7 +1950,13 @@ def __init__(self, *args, policy, **kwds):
super().__init__(*args, **kwds)
self.policy = policy

def resolve_filepath(self, included_path, *, filepath, lineno):
def resolve_filepath(
self,
included_path,
*,
filepath,
lineno,
) -> Tuple[TextIO, pathlib.PurePath]:
"""
Raises:
qrexec.exc.PolicySyntaxError: when wrong path is included
Expand Down

0 comments on commit c97b0d7

Please sign in to comment.