forked from python/cpython
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
pythongh-109162: libregrtest: add single.py and result.py (python#109243
) * Add single.py and result.py files. * Rename runtest.py to runtests.py. * Move run_single_test() function and its helper functions to single.py. * Move remove_testfn(), abs_module_name() and normalize_test_name() to utils.py. * Move setup_support() to setup.py. * Move type hints like TestName to utils.py. * Rename runtest.py to runtests.py.
- Loading branch information
Showing
14 changed files
with
722 additions
and
697 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
import os | ||
|
||
from test.libregrtest.utils import StrPath, TestName, TestList | ||
|
||
|
||
# If these test directories are encountered recurse into them and treat each | ||
# "test_*.py" file or each sub-directory as a separate test module. This can | ||
# increase parallelism. | ||
# | ||
# Beware this can't generally be done for any directory with sub-tests as the | ||
# __init__.py may do things which alter what tests are to be run. | ||
SPLITTESTDIRS: set[TestName] = { | ||
"test_asyncio", | ||
"test_concurrent_futures", | ||
"test_multiprocessing_fork", | ||
"test_multiprocessing_forkserver", | ||
"test_multiprocessing_spawn", | ||
} | ||
|
||
|
||
def findtestdir(path=None): | ||
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir | ||
|
||
|
||
def findtests(*, testdir: StrPath | None = None, exclude=(), | ||
split_test_dirs: set[TestName] = SPLITTESTDIRS, | ||
base_mod: str = "") -> TestList: | ||
"""Return a list of all applicable test modules.""" | ||
testdir = findtestdir(testdir) | ||
tests = [] | ||
for name in os.listdir(testdir): | ||
mod, ext = os.path.splitext(name) | ||
if (not mod.startswith("test_")) or (mod in exclude): | ||
continue | ||
if mod in split_test_dirs: | ||
subdir = os.path.join(testdir, mod) | ||
mod = f"{base_mod or 'test'}.{mod}" | ||
tests.extend(findtests(testdir=subdir, exclude=exclude, | ||
split_test_dirs=split_test_dirs, | ||
base_mod=mod)) | ||
elif ext in (".py", ""): | ||
tests.append(f"{base_mod}.{mod}" if base_mod else mod) | ||
return sorted(tests) | ||
|
||
|
||
def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(), | ||
split_test_dirs=SPLITTESTDIRS): | ||
testdir = findtestdir(testdir) | ||
splitted = [] | ||
for name in tests: | ||
if name in split_test_dirs: | ||
subdir = os.path.join(testdir, name) | ||
splitted.extend(findtests(testdir=subdir, exclude=exclude, | ||
split_test_dirs=split_test_dirs, | ||
base_mod=name)) | ||
else: | ||
splitted.append(name) | ||
return splitted |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
import dataclasses | ||
import json | ||
from typing import Any | ||
|
||
from test.support import TestStats | ||
|
||
from test.libregrtest.utils import ( | ||
TestName, FilterTuple, | ||
format_duration, normalize_test_name, print_warning) | ||
|
||
|
||
# Avoid enum.Enum to reduce the number of imports when tests are run | ||
class State: | ||
PASSED = "PASSED" | ||
FAILED = "FAILED" | ||
SKIPPED = "SKIPPED" | ||
UNCAUGHT_EXC = "UNCAUGHT_EXC" | ||
REFLEAK = "REFLEAK" | ||
ENV_CHANGED = "ENV_CHANGED" | ||
RESOURCE_DENIED = "RESOURCE_DENIED" | ||
INTERRUPTED = "INTERRUPTED" | ||
MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR" | ||
DID_NOT_RUN = "DID_NOT_RUN" | ||
TIMEOUT = "TIMEOUT" | ||
|
||
@staticmethod | ||
def is_failed(state): | ||
return state in { | ||
State.FAILED, | ||
State.UNCAUGHT_EXC, | ||
State.REFLEAK, | ||
State.MULTIPROCESSING_ERROR, | ||
State.TIMEOUT} | ||
|
||
@staticmethod | ||
def has_meaningful_duration(state): | ||
# Consider that the duration is meaningless for these cases. | ||
# For example, if a whole test file is skipped, its duration | ||
# is unlikely to be the duration of executing its tests, | ||
# but just the duration to execute code which skips the test. | ||
return state not in { | ||
State.SKIPPED, | ||
State.RESOURCE_DENIED, | ||
State.INTERRUPTED, | ||
State.MULTIPROCESSING_ERROR, | ||
State.DID_NOT_RUN} | ||
|
||
@staticmethod | ||
def must_stop(state): | ||
return state in { | ||
State.INTERRUPTED, | ||
State.MULTIPROCESSING_ERROR} | ||
|
||
|
||
@dataclasses.dataclass(slots=True) | ||
class TestResult: | ||
test_name: TestName | ||
state: str | None = None | ||
# Test duration in seconds | ||
duration: float | None = None | ||
xml_data: list[str] | None = None | ||
stats: TestStats | None = None | ||
|
||
# errors and failures copied from support.TestFailedWithDetails | ||
errors: list[tuple[str, str]] | None = None | ||
failures: list[tuple[str, str]] | None = None | ||
|
||
def is_failed(self, fail_env_changed: bool) -> bool: | ||
if self.state == State.ENV_CHANGED: | ||
return fail_env_changed | ||
return State.is_failed(self.state) | ||
|
||
def _format_failed(self): | ||
if self.errors and self.failures: | ||
le = len(self.errors) | ||
lf = len(self.failures) | ||
error_s = "error" + ("s" if le > 1 else "") | ||
failure_s = "failure" + ("s" if lf > 1 else "") | ||
return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})" | ||
|
||
if self.errors: | ||
le = len(self.errors) | ||
error_s = "error" + ("s" if le > 1 else "") | ||
return f"{self.test_name} failed ({le} {error_s})" | ||
|
||
if self.failures: | ||
lf = len(self.failures) | ||
failure_s = "failure" + ("s" if lf > 1 else "") | ||
return f"{self.test_name} failed ({lf} {failure_s})" | ||
|
||
return f"{self.test_name} failed" | ||
|
||
def __str__(self) -> str: | ||
match self.state: | ||
case State.PASSED: | ||
return f"{self.test_name} passed" | ||
case State.FAILED: | ||
return self._format_failed() | ||
case State.SKIPPED: | ||
return f"{self.test_name} skipped" | ||
case State.UNCAUGHT_EXC: | ||
return f"{self.test_name} failed (uncaught exception)" | ||
case State.REFLEAK: | ||
return f"{self.test_name} failed (reference leak)" | ||
case State.ENV_CHANGED: | ||
return f"{self.test_name} failed (env changed)" | ||
case State.RESOURCE_DENIED: | ||
return f"{self.test_name} skipped (resource denied)" | ||
case State.INTERRUPTED: | ||
return f"{self.test_name} interrupted" | ||
case State.MULTIPROCESSING_ERROR: | ||
return f"{self.test_name} process crashed" | ||
case State.DID_NOT_RUN: | ||
return f"{self.test_name} ran no tests" | ||
case State.TIMEOUT: | ||
return f"{self.test_name} timed out ({format_duration(self.duration)})" | ||
case _: | ||
raise ValueError("unknown result state: {state!r}") | ||
|
||
def has_meaningful_duration(self): | ||
return State.has_meaningful_duration(self.state) | ||
|
||
def set_env_changed(self): | ||
if self.state is None or self.state == State.PASSED: | ||
self.state = State.ENV_CHANGED | ||
|
||
def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool: | ||
if State.must_stop(self.state): | ||
return True | ||
if fail_fast and self.is_failed(fail_env_changed): | ||
return True | ||
return False | ||
|
||
def get_rerun_match_tests(self) -> FilterTuple | None: | ||
match_tests = [] | ||
|
||
errors = self.errors or [] | ||
failures = self.failures or [] | ||
for error_list, is_error in ( | ||
(errors, True), | ||
(failures, False), | ||
): | ||
for full_name, *_ in error_list: | ||
match_name = normalize_test_name(full_name, is_error=is_error) | ||
if match_name is None: | ||
# 'setUpModule (test.test_sys)': don't filter tests | ||
return None | ||
if not match_name: | ||
error_type = "ERROR" if is_error else "FAIL" | ||
print_warning(f"rerun failed to parse {error_type} test name: " | ||
f"{full_name!r}: don't filter tests") | ||
return None | ||
match_tests.append(match_name) | ||
|
||
if not match_tests: | ||
return None | ||
return tuple(match_tests) | ||
|
||
def write_json(self, file) -> None: | ||
json.dump(self, file, cls=_EncodeTestResult) | ||
|
||
@staticmethod | ||
def from_json(worker_json) -> 'TestResult': | ||
return json.loads(worker_json, object_hook=_decode_test_result) | ||
|
||
|
||
class _EncodeTestResult(json.JSONEncoder): | ||
def default(self, o: Any) -> dict[str, Any]: | ||
if isinstance(o, TestResult): | ||
result = dataclasses.asdict(o) | ||
result["__test_result__"] = o.__class__.__name__ | ||
return result | ||
else: | ||
return super().default(o) | ||
|
||
|
||
def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]: | ||
if "__test_result__" in data: | ||
data.pop('__test_result__') | ||
if data['stats'] is not None: | ||
data['stats'] = TestStats(**data['stats']) | ||
return TestResult(**data) | ||
else: | ||
return data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.