From 3f0bce2f5f4658bfa1d9cd6ddb2f6d7e520897e8 Mon Sep 17 00:00:00 2001 From: Ned Batchelder Date: Mon, 2 Jan 2023 11:43:18 -0500 Subject: [PATCH] mypy: partial debug.py and pytracer.py --- coverage/control.py | 2 +- coverage/debug.py | 88 ++++++++++++++++++++++++++++---------------- coverage/pytracer.py | 22 ++++++----- 3 files changed, 70 insertions(+), 42 deletions(-) diff --git a/coverage/control.py b/coverage/control.py index e5cabd5bb..8ac6781ee 100644 --- a/coverage/control.py +++ b/coverage/control.py @@ -225,7 +225,7 @@ def __init__( # pylint: disable=too-many-arguments data_file = None # This is injectable by tests. - self._debug_file = None + self._debug_file: Optional[IO[str]] = None self._auto_load = self._auto_save = auto_data self._data_suffix_specified = data_suffix diff --git a/coverage/debug.py b/coverage/debug.py index 7ed8937ce..82de3c298 100644 --- a/coverage/debug.py +++ b/coverage/debug.py @@ -3,6 +3,8 @@ """Control of and utilities for debugging.""" +from __future__ import annotations + import contextlib import functools import inspect @@ -15,7 +17,10 @@ import types import _thread -from typing import Any, Callable, Iterable, Iterator, Tuple +from typing import ( + Any, Callable, Generator, IO, Iterable, Iterator, Optional, List, Tuple, + cast, +) from coverage.misc import isolate_module @@ -25,7 +30,7 @@ # When debugging, it can be helpful to force some options, especially when # debugging the configuration mechanisms you usually use to control debugging! # This is a list of forced debugging options. -FORCED_DEBUG = [] +FORCED_DEBUG: List[str] = [] FORCED_DEBUG_FILE = None @@ -34,7 +39,7 @@ class DebugControl: show_repr_attr = False # For AutoReprMixin - def __init__(self, options, output): + def __init__(self, options: Iterable[str], output: Optional[IO[str]]) -> None: """Configure the options and output file for debugging.""" self.options = list(options) + FORCED_DEBUG self.suppress_callers = False @@ -49,17 +54,17 @@ def __init__(self, options, output): ) self.raw_output = self.output.outfile - def __repr__(self): + def __repr__(self) -> str: return f"" - def should(self, option): + def should(self, option: str) -> bool: """Decide whether to output debug information in category `option`.""" if option == "callers" and self.suppress_callers: return False return (option in self.options) @contextlib.contextmanager - def without_callers(self): + def without_callers(self) -> Generator[None, None, None]: """A context manager to prevent call stacks from being logged.""" old = self.suppress_callers self.suppress_callers = True @@ -68,7 +73,7 @@ def without_callers(self): finally: self.suppress_callers = old - def write(self, msg): + def write(self, msg: str) -> None: """Write a line of debug output. `msg` is the line to write. A newline will be appended. @@ -86,26 +91,26 @@ def write(self, msg): class DebugControlString(DebugControl): """A `DebugControl` that writes to a StringIO, for testing.""" - def __init__(self, options): + def __init__(self, options: Iterable[str]) -> None: super().__init__(options, io.StringIO()) - def get_output(self): + def get_output(self) -> str: """Get the output text from the `DebugControl`.""" - return self.raw_output.getvalue() + return cast(str, self.raw_output.getvalue()) class NoDebugging: """A replacement for DebugControl that will never try to do anything.""" - def should(self, option): # pylint: disable=unused-argument + def should(self, option: str) -> bool: # pylint: disable=unused-argument """Should we write debug messages? Never.""" return False - def write(self, msg): + def write(self, msg: str) -> None: """This will never be called.""" raise AssertionError("NoDebugging.write should never be called.") -def info_header(label): +def info_header(label: str) -> str: """Make a nice header string.""" return "--{:-<60s}".format(" "+label+" ") @@ -155,7 +160,7 @@ def write_formatted_info( write(f" {line}") -def short_stack(limit=None, skip=0): +def short_stack(limit: Optional[int]=None, skip: int=0) -> str: """Return a string summarizing the call stack. The string is multi-line, with one line per stack frame. Each line shows @@ -177,21 +182,25 @@ def short_stack(limit=None, skip=0): return "\n".join("%30s : %s:%d" % (t[3], t[1], t[2]) for t in stack) -def dump_stack_frames(limit=None, out=None, skip=0): +def dump_stack_frames( + limit: Optional[int]=None, + out: Optional[IO[str]]=None, + skip: int=0 +) -> None: """Print a summary of the stack to stdout, or someplace else.""" out = out or sys.stdout out.write(short_stack(limit=limit, skip=skip+1)) out.write("\n") -def clipped_repr(text, numchars=50): +def clipped_repr(text: str, numchars: int=50) -> str: """`repr(text)`, but limited to `numchars`.""" r = reprlib.Repr() r.maxstring = numchars return r.repr(text) -def short_id(id64): +def short_id(id64: int) -> int: """Given a 64-bit id, make a shorter 16-bit one.""" id16 = 0 for offset in range(0, 64, 16): @@ -199,7 +208,7 @@ def short_id(id64): return id16 & 0xFFFF -def add_pid_and_tid(text): +def add_pid_and_tid(text: str) -> str: """A filter to add pid and tid to debug messages.""" # Thread ids are useful, but too long. Make a shorter one. tid = f"{short_id(_thread.get_ident()):04x}" @@ -211,7 +220,7 @@ class AutoReprMixin: """A mixin implementing an automatic __repr__ for debugging.""" auto_repr_ignore = ['auto_repr_ignore', '$coverage.object_id'] - def __repr__(self): + def __repr__(self) -> str: show_attrs = ( (k, v) for k, v in self.__dict__.items() if getattr(v, "show_repr_attr", True) @@ -225,7 +234,7 @@ def __repr__(self): ) -def simplify(v): # pragma: debugging +def simplify(v: Any) -> Any: # pragma: debugging """Turn things which are nearly dict/list/etc into dict/list/etc.""" if isinstance(v, dict): return {k:simplify(vv) for k, vv in v.items()} @@ -237,13 +246,13 @@ def simplify(v): # pragma: debugging return v -def pp(v): # pragma: debugging +def pp(v: Any) -> None: # pragma: debugging """Debug helper to pretty-print data, including SimpleNamespace objects.""" # Might not be needed in 3.9+ pprint.pprint(simplify(v)) -def filter_text(text, filters): +def filter_text(text: str, filters: Iterable[Callable[[str], str]]) -> str: """Run `text` through a series of filters. `filters` is a list of functions. Each takes a string and returns a @@ -266,10 +275,10 @@ def filter_text(text, filters): class CwdTracker: # pragma: debugging """A class to add cwd info to debug messages.""" - def __init__(self): - self.cwd = None + def __init__(self) -> None: + self.cwd: Optional[str] = None - def filter(self, text): + def filter(self, text: str) -> str: """Add a cwd message for each new cwd.""" cwd = os.getcwd() if cwd != self.cwd: @@ -280,7 +289,12 @@ def filter(self, text): class DebugOutputFile: # pragma: debugging """A file-like object that includes pid and cwd information.""" - def __init__(self, outfile, show_process, filters): + def __init__( + self, + outfile: Optional[IO[str]], + show_process: bool, + filters: Iterable[Callable[[str], str]], + ): self.outfile = outfile self.show_process = show_process self.filters = list(filters) @@ -296,7 +310,13 @@ def __init__(self, outfile, show_process, filters): SINGLETON_ATTR = 'the_one_and_is_interim' @classmethod - def get_one(cls, fileobj=None, show_process=True, filters=(), interim=False): + def get_one( + cls, + fileobj: Optional[IO[str]]=None, + show_process: bool=True, + filters: Iterable[Callable[[str], str]]=(), + interim: bool=False, + ) -> DebugOutputFile: """Get a DebugOutputFile. If `fileobj` is provided, then a new DebugOutputFile is made with it. @@ -339,13 +359,15 @@ def get_one(cls, fileobj=None, show_process=True, filters=(), interim=False): sys.modules[cls.SYS_MOD_NAME] = singleton_module return the_one - def write(self, text): + def write(self, text: str) -> None: """Just like file.write, but filter through all our filters.""" + assert self.outfile is not None self.outfile.write(filter_text(text, self.filters)) self.outfile.flush() - def flush(self): + def flush(self) -> None: """Flush our file.""" + assert self.outfile is not None self.outfile.flush() @@ -388,7 +410,11 @@ def _wrapper(*args, **kwargs): CALLS = itertools.count() OBJ_ID_ATTR = "$coverage.object_id" -def show_calls(show_args=True, show_stack=False, show_return=False): # pragma: debugging +def show_calls( + show_args: bool=True, + show_stack: bool=False, + show_return: bool=False, +) -> Callable[..., Any]: # pragma: debugging """A method decorator to debug-log each call to the function.""" def _decorator(func): @functools.wraps(func) @@ -422,7 +448,7 @@ def _wrapper(self, *args, **kwargs): return _decorator -def _clean_stack_line(s): # pragma: debugging +def _clean_stack_line(s: str) -> str: # pragma: debugging """Simplify some paths in a stack trace, for compactness.""" s = s.strip() s = s.replace(os.path.dirname(__file__) + '/', '') diff --git a/coverage/pytracer.py b/coverage/pytracer.py index c50c9c198..027e8e7e0 100644 --- a/coverage/pytracer.py +++ b/coverage/pytracer.py @@ -8,7 +8,7 @@ import sys from types import FrameType -from typing import Any, Callable, Dict, Mapping, Optional +from typing import Any, Callable, Dict, Optional from coverage import env from coverage.types import TFileDisposition, TTraceData, TTraceFn, TTracer, TWarnFn @@ -51,7 +51,7 @@ def __init__(self) -> None: self.data: TTraceData self.trace_arcs = False self.should_trace: Callable[[str, FrameType], TFileDisposition] - self.should_trace_cache: Mapping[str, Optional[TFileDisposition]] + self.should_trace_cache: Dict[str, Optional[TFileDisposition]] self.should_start_context: Optional[Callable[[FrameType], Optional[str]]] = None self.switch_context: Optional[Callable[[Optional[str]], None]] = None self.warn: TWarnFn @@ -61,8 +61,8 @@ def __init__(self) -> None: self.cur_file_data = None self.last_line = 0 # int, but uninitialized. - self.cur_file_name = None - self.context = None + self.cur_file_name: Optional[str] = None + self.context: Optional[str] = None self.started_context = False self.data_stack = [] @@ -84,7 +84,7 @@ def __repr__(self) -> str: files = len(self.data) return f"" - def log(self, marker, *args) -> None: + def log(self, marker: str, *args: Any) -> None: """For hard-core logging of what this tracer is doing.""" with open("/tmp/debug_trace.txt", "a") as f: f.write("{} {}[{}]".format( @@ -93,13 +93,13 @@ def log(self, marker, *args) -> None: len(self.data_stack), )) if 0: # if you want thread ids.. - f.write(".{:x}.{:x}".format( + f.write(".{:x}.{:x}".format( # type: ignore[unreachable] self.thread.ident, self.threading.current_thread().ident, )) f.write(" {}".format(" ".join(map(str, args)))) if 0: # if you want callers.. - f.write(" | ") + f.write(" | ") # type: ignore[unreachable] stack = " / ".join( (fname or "???").rpartition("/")[-1] for _, fname, _, _ in self.data_stack @@ -107,7 +107,7 @@ def log(self, marker, *args) -> None: f.write(stack) f.write("\n") - def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn: + def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> Optional[TTraceFn]: """The trace function passed to sys.settrace.""" if THIS_FILE in frame.f_code.co_filename: @@ -119,8 +119,8 @@ def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn: # The PyTrace.stop() method has been called, possibly by another # thread, let's deactivate ourselves now. if 0: - self.log("---\nX", frame.f_code.co_filename, frame.f_lineno) - f = frame + f = frame # type: ignore[unreachable] + self.log("---\nX", f.f_code.co_filename, f.f_lineno) while f: self.log(">", f.f_code.co_filename, f.f_lineno, f.f_code.co_name, f.f_trace) f = f.f_back @@ -140,6 +140,7 @@ def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn: if context_maybe is not None: self.context = context_maybe started_context = True + assert self.switch_context is not None self.switch_context(self.context) else: started_context = False @@ -175,6 +176,7 @@ def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn: self.cur_file_data = None if disp.trace: tracename = disp.source_filename + assert tracename is not None if tracename not in self.data: self.data[tracename] = set() self.cur_file_data = self.data[tracename]