Skip to content

Commit

Permalink
mypy: partial debug.py and pytracer.py
Browse files Browse the repository at this point in the history
  • Loading branch information
nedbat committed Jan 2, 2023
1 parent ffc701a commit 3f0bce2
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 42 deletions.
2 changes: 1 addition & 1 deletion coverage/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def __init__( # pylint: disable=too-many-arguments
data_file = None

# This is injectable by tests.
self._debug_file = None
self._debug_file: Optional[IO[str]] = None

self._auto_load = self._auto_save = auto_data
self._data_suffix_specified = data_suffix
Expand Down
88 changes: 57 additions & 31 deletions coverage/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

"""Control of and utilities for debugging."""

from __future__ import annotations

import contextlib
import functools
import inspect
Expand All @@ -15,7 +17,10 @@
import types
import _thread

from typing import Any, Callable, Iterable, Iterator, Tuple
from typing import (
Any, Callable, Generator, IO, Iterable, Iterator, Optional, List, Tuple,
cast,
)

from coverage.misc import isolate_module

Expand All @@ -25,7 +30,7 @@
# When debugging, it can be helpful to force some options, especially when
# debugging the configuration mechanisms you usually use to control debugging!
# This is a list of forced debugging options.
FORCED_DEBUG = []
FORCED_DEBUG: List[str] = []
FORCED_DEBUG_FILE = None


Expand All @@ -34,7 +39,7 @@ class DebugControl:

show_repr_attr = False # For AutoReprMixin

def __init__(self, options, output):
def __init__(self, options: Iterable[str], output: Optional[IO[str]]) -> None:
"""Configure the options and output file for debugging."""
self.options = list(options) + FORCED_DEBUG
self.suppress_callers = False
Expand All @@ -49,17 +54,17 @@ def __init__(self, options, output):
)
self.raw_output = self.output.outfile

def __repr__(self):
def __repr__(self) -> str:
return f"<DebugControl options={self.options!r} raw_output={self.raw_output!r}>"

def should(self, option):
def should(self, option: str) -> bool:
"""Decide whether to output debug information in category `option`."""
if option == "callers" and self.suppress_callers:
return False
return (option in self.options)

@contextlib.contextmanager
def without_callers(self):
def without_callers(self) -> Generator[None, None, None]:
"""A context manager to prevent call stacks from being logged."""
old = self.suppress_callers
self.suppress_callers = True
Expand All @@ -68,7 +73,7 @@ def without_callers(self):
finally:
self.suppress_callers = old

def write(self, msg):
def write(self, msg: str) -> None:
"""Write a line of debug output.
`msg` is the line to write. A newline will be appended.
Expand All @@ -86,26 +91,26 @@ def write(self, msg):

class DebugControlString(DebugControl):
"""A `DebugControl` that writes to a StringIO, for testing."""
def __init__(self, options):
def __init__(self, options: Iterable[str]) -> None:
super().__init__(options, io.StringIO())

def get_output(self):
def get_output(self) -> str:
"""Get the output text from the `DebugControl`."""
return self.raw_output.getvalue()
return cast(str, self.raw_output.getvalue())


class NoDebugging:
"""A replacement for DebugControl that will never try to do anything."""
def should(self, option): # pylint: disable=unused-argument
def should(self, option: str) -> bool: # pylint: disable=unused-argument
"""Should we write debug messages? Never."""
return False

def write(self, msg):
def write(self, msg: str) -> None:
"""This will never be called."""
raise AssertionError("NoDebugging.write should never be called.")


def info_header(label):
def info_header(label: str) -> str:
"""Make a nice header string."""
return "--{:-<60s}".format(" "+label+" ")

Expand Down Expand Up @@ -155,7 +160,7 @@ def write_formatted_info(
write(f" {line}")


def short_stack(limit=None, skip=0):
def short_stack(limit: Optional[int]=None, skip: int=0) -> str:
"""Return a string summarizing the call stack.
The string is multi-line, with one line per stack frame. Each line shows
Expand All @@ -177,29 +182,33 @@ def short_stack(limit=None, skip=0):
return "\n".join("%30s : %s:%d" % (t[3], t[1], t[2]) for t in stack)


def dump_stack_frames(limit=None, out=None, skip=0):
def dump_stack_frames(
limit: Optional[int]=None,
out: Optional[IO[str]]=None,
skip: int=0
) -> None:
"""Print a summary of the stack to stdout, or someplace else."""
out = out or sys.stdout
out.write(short_stack(limit=limit, skip=skip+1))
out.write("\n")


def clipped_repr(text, numchars=50):
def clipped_repr(text: str, numchars: int=50) -> str:
"""`repr(text)`, but limited to `numchars`."""
r = reprlib.Repr()
r.maxstring = numchars
return r.repr(text)


def short_id(id64):
def short_id(id64: int) -> int:
"""Given a 64-bit id, make a shorter 16-bit one."""
id16 = 0
for offset in range(0, 64, 16):
id16 ^= id64 >> offset
return id16 & 0xFFFF


def add_pid_and_tid(text):
def add_pid_and_tid(text: str) -> str:
"""A filter to add pid and tid to debug messages."""
# Thread ids are useful, but too long. Make a shorter one.
tid = f"{short_id(_thread.get_ident()):04x}"
Expand All @@ -211,7 +220,7 @@ class AutoReprMixin:
"""A mixin implementing an automatic __repr__ for debugging."""
auto_repr_ignore = ['auto_repr_ignore', '$coverage.object_id']

def __repr__(self):
def __repr__(self) -> str:
show_attrs = (
(k, v) for k, v in self.__dict__.items()
if getattr(v, "show_repr_attr", True)
Expand All @@ -225,7 +234,7 @@ def __repr__(self):
)


def simplify(v): # pragma: debugging
def simplify(v: Any) -> Any: # pragma: debugging
"""Turn things which are nearly dict/list/etc into dict/list/etc."""
if isinstance(v, dict):
return {k:simplify(vv) for k, vv in v.items()}
Expand All @@ -237,13 +246,13 @@ def simplify(v): # pragma: debugging
return v


def pp(v): # pragma: debugging
def pp(v: Any) -> None: # pragma: debugging
"""Debug helper to pretty-print data, including SimpleNamespace objects."""
# Might not be needed in 3.9+
pprint.pprint(simplify(v))


def filter_text(text, filters):
def filter_text(text: str, filters: Iterable[Callable[[str], str]]) -> str:
"""Run `text` through a series of filters.
`filters` is a list of functions. Each takes a string and returns a
Expand All @@ -266,10 +275,10 @@ def filter_text(text, filters):

class CwdTracker: # pragma: debugging
"""A class to add cwd info to debug messages."""
def __init__(self):
self.cwd = None
def __init__(self) -> None:
self.cwd: Optional[str] = None

def filter(self, text):
def filter(self, text: str) -> str:
"""Add a cwd message for each new cwd."""
cwd = os.getcwd()
if cwd != self.cwd:
Expand All @@ -280,7 +289,12 @@ def filter(self, text):

class DebugOutputFile: # pragma: debugging
"""A file-like object that includes pid and cwd information."""
def __init__(self, outfile, show_process, filters):
def __init__(
self,
outfile: Optional[IO[str]],
show_process: bool,
filters: Iterable[Callable[[str], str]],
):
self.outfile = outfile
self.show_process = show_process
self.filters = list(filters)
Expand All @@ -296,7 +310,13 @@ def __init__(self, outfile, show_process, filters):
SINGLETON_ATTR = 'the_one_and_is_interim'

@classmethod
def get_one(cls, fileobj=None, show_process=True, filters=(), interim=False):
def get_one(
cls,
fileobj: Optional[IO[str]]=None,
show_process: bool=True,
filters: Iterable[Callable[[str], str]]=(),
interim: bool=False,
) -> DebugOutputFile:
"""Get a DebugOutputFile.
If `fileobj` is provided, then a new DebugOutputFile is made with it.
Expand Down Expand Up @@ -339,13 +359,15 @@ def get_one(cls, fileobj=None, show_process=True, filters=(), interim=False):
sys.modules[cls.SYS_MOD_NAME] = singleton_module
return the_one

def write(self, text):
def write(self, text: str) -> None:
"""Just like file.write, but filter through all our filters."""
assert self.outfile is not None
self.outfile.write(filter_text(text, self.filters))
self.outfile.flush()

def flush(self):
def flush(self) -> None:
"""Flush our file."""
assert self.outfile is not None
self.outfile.flush()


Expand Down Expand Up @@ -388,7 +410,11 @@ def _wrapper(*args, **kwargs):
CALLS = itertools.count()
OBJ_ID_ATTR = "$coverage.object_id"

def show_calls(show_args=True, show_stack=False, show_return=False): # pragma: debugging
def show_calls(
show_args: bool=True,
show_stack: bool=False,
show_return: bool=False,
) -> Callable[..., Any]: # pragma: debugging
"""A method decorator to debug-log each call to the function."""
def _decorator(func):
@functools.wraps(func)
Expand Down Expand Up @@ -422,7 +448,7 @@ def _wrapper(self, *args, **kwargs):
return _decorator


def _clean_stack_line(s): # pragma: debugging
def _clean_stack_line(s: str) -> str: # pragma: debugging
"""Simplify some paths in a stack trace, for compactness."""
s = s.strip()
s = s.replace(os.path.dirname(__file__) + '/', '')
Expand Down
22 changes: 12 additions & 10 deletions coverage/pytracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import sys

from types import FrameType
from typing import Any, Callable, Dict, Mapping, Optional
from typing import Any, Callable, Dict, Optional

from coverage import env
from coverage.types import TFileDisposition, TTraceData, TTraceFn, TTracer, TWarnFn
Expand Down Expand Up @@ -51,7 +51,7 @@ def __init__(self) -> None:
self.data: TTraceData
self.trace_arcs = False
self.should_trace: Callable[[str, FrameType], TFileDisposition]
self.should_trace_cache: Mapping[str, Optional[TFileDisposition]]
self.should_trace_cache: Dict[str, Optional[TFileDisposition]]
self.should_start_context: Optional[Callable[[FrameType], Optional[str]]] = None
self.switch_context: Optional[Callable[[Optional[str]], None]] = None
self.warn: TWarnFn
Expand All @@ -61,8 +61,8 @@ def __init__(self) -> None:

self.cur_file_data = None
self.last_line = 0 # int, but uninitialized.
self.cur_file_name = None
self.context = None
self.cur_file_name: Optional[str] = None
self.context: Optional[str] = None
self.started_context = False

self.data_stack = []
Expand All @@ -84,7 +84,7 @@ def __repr__(self) -> str:
files = len(self.data)
return f"<PyTracer at 0x{me:x}: {points} data points in {files} files>"

def log(self, marker, *args) -> None:
def log(self, marker: str, *args: Any) -> None:
"""For hard-core logging of what this tracer is doing."""
with open("/tmp/debug_trace.txt", "a") as f:
f.write("{} {}[{}]".format(
Expand All @@ -93,21 +93,21 @@ def log(self, marker, *args) -> None:
len(self.data_stack),
))
if 0: # if you want thread ids..
f.write(".{:x}.{:x}".format(
f.write(".{:x}.{:x}".format( # type: ignore[unreachable]
self.thread.ident,
self.threading.current_thread().ident,
))
f.write(" {}".format(" ".join(map(str, args))))
if 0: # if you want callers..
f.write(" | ")
f.write(" | ") # type: ignore[unreachable]
stack = " / ".join(
(fname or "???").rpartition("/")[-1]
for _, fname, _, _ in self.data_stack
)
f.write(stack)
f.write("\n")

def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn:
def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> Optional[TTraceFn]:
"""The trace function passed to sys.settrace."""

if THIS_FILE in frame.f_code.co_filename:
Expand All @@ -119,8 +119,8 @@ def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn:
# The PyTrace.stop() method has been called, possibly by another
# thread, let's deactivate ourselves now.
if 0:
self.log("---\nX", frame.f_code.co_filename, frame.f_lineno)
f = frame
f = frame # type: ignore[unreachable]
self.log("---\nX", f.f_code.co_filename, f.f_lineno)
while f:
self.log(">", f.f_code.co_filename, f.f_lineno, f.f_code.co_name, f.f_trace)
f = f.f_back
Expand All @@ -140,6 +140,7 @@ def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn:
if context_maybe is not None:
self.context = context_maybe
started_context = True
assert self.switch_context is not None
self.switch_context(self.context)
else:
started_context = False
Expand Down Expand Up @@ -175,6 +176,7 @@ def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn:
self.cur_file_data = None
if disp.trace:
tracename = disp.source_filename
assert tracename is not None
if tracename not in self.data:
self.data[tracename] = set()
self.cur_file_data = self.data[tracename]
Expand Down

0 comments on commit 3f0bce2

Please sign in to comment.