From b374559df6b35b0f43975ee65a9b93f3f1c68b7e Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Mon, 11 Sep 2023 09:23:11 +0200 Subject: [PATCH] gh-109162: libregrtest: move code around * Move Regrtest.display_header() to utils.py. * Move cleanup_temp_dir() to utils.py. * Move list_cases() to findtests.py. --- Lib/test/libregrtest/findtests.py | 42 ++++++++++++- Lib/test/libregrtest/main.py | 101 +++--------------------------- Lib/test/libregrtest/utils.py | 55 ++++++++++++++++ 3 files changed, 105 insertions(+), 93 deletions(-) diff --git a/Lib/test/libregrtest/findtests.py b/Lib/test/libregrtest/findtests.py index 0d38b8e60eb7223..f4a8b9ae26ae658 100644 --- a/Lib/test/libregrtest/findtests.py +++ b/Lib/test/libregrtest/findtests.py @@ -1,6 +1,12 @@ import os +import sys +import unittest -from .utils import StrPath, TestName, TestList +from test import support + +from .utils import ( + StrPath, TestName, TestTuple, TestList, FilterTuple, + abs_module_name, count, printlist) # If these test directories are encountered recurse into them and treat each @@ -56,3 +62,37 @@ def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(), else: splitted.append(name) return splitted + + +def _list_cases(suite): + for test in suite: + if isinstance(test, unittest.loader._FailedTest): + continue + if isinstance(test, unittest.TestSuite): + _list_cases(test) + elif isinstance(test, unittest.TestCase): + if support.match_test(test): + print(test.id()) + +def list_cases(tests: TestTuple, *, + match_tests: FilterTuple | None = None, + ignore_tests: FilterTuple | None = None, + test_dir: StrPath | None = None): + support.verbose = False + support.set_match_tests(match_tests, ignore_tests) + + skipped = [] + for test_name in tests: + module_name = abs_module_name(test_name, test_dir) + try: + suite = unittest.defaultTestLoader.loadTestsFromName(module_name) + _list_cases(suite) + except unittest.SkipTest: + skipped.append(test_name) + + if skipped: + sys.stdout.flush() + stderr = sys.stderr + print(file=stderr) + print(count(len(skipped), "test"), "skipped:", file=stderr) + printlist(skipped, file=stderr) diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index 0f289c06325bd00..2c0a6c204373cce 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -1,17 +1,14 @@ -import locale import os -import platform import random import re import sys import time -import unittest from test import support from test.support import os_helper from .cmdline import _parse_args, Namespace -from .findtests import findtests, split_test_packages +from .findtests import findtests, split_test_packages, list_cases from .logger import Logger from .result import State from .runtests import RunTests, HuntRefleak @@ -22,8 +19,8 @@ from .utils import ( StrPath, StrJSON, TestName, TestList, TestTuple, FilterTuple, strip_py_suffix, count, format_duration, - printlist, get_build_info, get_temp_dir, get_work_dir, exit_timeout, - abs_module_name) + printlist, get_temp_dir, get_work_dir, exit_timeout, + display_header, cleanup_temp_dir) class Regrtest: @@ -214,36 +211,6 @@ def list_tests(tests: TestTuple): for name in tests: print(name) - def _list_cases(self, suite): - for test in suite: - if isinstance(test, unittest.loader._FailedTest): - continue - if isinstance(test, unittest.TestSuite): - self._list_cases(test) - elif isinstance(test, unittest.TestCase): - if support.match_test(test): - print(test.id()) - - def list_cases(self, tests: TestTuple): - support.verbose = False - support.set_match_tests(self.match_tests, self.ignore_tests) - - skipped = [] - for test_name in tests: - module_name = abs_module_name(test_name, self.test_dir) - try: - suite = unittest.defaultTestLoader.loadTestsFromName(module_name) - self._list_cases(suite) - except unittest.SkipTest: - skipped.append(test_name) - - if skipped: - sys.stdout.flush() - stderr = sys.stderr - print(file=stderr) - print(count(len(skipped), "test"), "skipped:", file=stderr) - printlist(skipped, file=stderr) - def _rerun_failed_tests(self, runtests: RunTests): # Configure the runner to re-run tests if self.num_workers == 0: @@ -363,45 +330,6 @@ def run_tests_sequentially(self, runtests): return tracer - @staticmethod - def display_header(): - # Print basic platform information - print("==", platform.python_implementation(), *sys.version.split()) - print("==", platform.platform(aliased=True), - "%s-endian" % sys.byteorder) - print("== Python build:", ' '.join(get_build_info())) - print("== cwd:", os.getcwd()) - cpu_count = os.cpu_count() - if cpu_count: - print("== CPU count:", cpu_count) - print("== encodings: locale=%s, FS=%s" - % (locale.getencoding(), sys.getfilesystemencoding())) - - # This makes it easier to remember what to set in your local - # environment when trying to reproduce a sanitizer failure. - asan = support.check_sanitizer(address=True) - msan = support.check_sanitizer(memory=True) - ubsan = support.check_sanitizer(ub=True) - sanitizers = [] - if asan: - sanitizers.append("address") - if msan: - sanitizers.append("memory") - if ubsan: - sanitizers.append("undefined behavior") - if not sanitizers: - return - - print(f"== sanitizers: {', '.join(sanitizers)}") - for sanitizer, env_var in ( - (asan, "ASAN_OPTIONS"), - (msan, "MSAN_OPTIONS"), - (ubsan, "UBSAN_OPTIONS"), - ): - options= os.environ.get(env_var) - if sanitizer and options is not None: - print(f"== {env_var}={options!r}") - def get_state(self): state = self.results.get_state(self.fail_env_changed) if self.first_state: @@ -445,20 +373,6 @@ def display_summary(self): state = self.get_state() print(f"Result: {state}") - @staticmethod - def cleanup_temp_dir(tmp_dir: StrPath): - import glob - - path = os.path.join(glob.escape(tmp_dir), 'test_python_*') - print("Cleanup %s directory" % tmp_dir) - for name in glob.glob(path): - if os.path.isdir(name): - print("Remove directory: %s" % name) - os_helper.rmtree(name) - else: - print("Remove file: %s" % name) - os_helper.unlink(name) - def create_run_tests(self, tests: TestTuple): return RunTests( tests, @@ -496,7 +410,7 @@ def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int: if (self.want_header or not(self.pgo or self.quiet or self.single_test_run or tests or self.cmdline_args)): - self.display_header() + display_header() if self.randomize: print("Using random seed", self.random_seed) @@ -554,7 +468,7 @@ def main(self, tests: TestList | None = None): self.tmp_dir = get_temp_dir(self.tmp_dir) if self.want_cleanup: - self.cleanup_temp_dir(self.tmp_dir) + cleanup_temp_dir(self.tmp_dir) sys.exit(0) if self.want_wait: @@ -567,7 +481,10 @@ def main(self, tests: TestList | None = None): if self.want_list_tests: self.list_tests(selected) elif self.want_list_cases: - self.list_cases(selected) + list_cases(selected, + match_tests=self.match_tests, + ignore_tests=self.ignore_tests, + test_dir=self.test_dir) else: exitcode = self.run_tests(selected, tests) diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py index f97e3fd4bb7106c..b46cec6f0eec700 100644 --- a/Lib/test/libregrtest/utils.py +++ b/Lib/test/libregrtest/utils.py @@ -1,8 +1,10 @@ import atexit import contextlib import faulthandler +import locale import math import os.path +import platform import random import sys import sysconfig @@ -524,3 +526,56 @@ def adjust_rlimit_nofile(): except (ValueError, OSError) as err: print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to " f"{new_fd_limit}: {err}.") + + +def display_header(): + # Print basic platform information + print("==", platform.python_implementation(), *sys.version.split()) + print("==", platform.platform(aliased=True), + "%s-endian" % sys.byteorder) + print("== Python build:", ' '.join(get_build_info())) + print("== cwd:", os.getcwd()) + cpu_count = os.cpu_count() + if cpu_count: + print("== CPU count:", cpu_count) + print("== encodings: locale=%s, FS=%s" + % (locale.getencoding(), sys.getfilesystemencoding())) + + # This makes it easier to remember what to set in your local + # environment when trying to reproduce a sanitizer failure. + asan = support.check_sanitizer(address=True) + msan = support.check_sanitizer(memory=True) + ubsan = support.check_sanitizer(ub=True) + sanitizers = [] + if asan: + sanitizers.append("address") + if msan: + sanitizers.append("memory") + if ubsan: + sanitizers.append("undefined behavior") + if not sanitizers: + return + + print(f"== sanitizers: {', '.join(sanitizers)}") + for sanitizer, env_var in ( + (asan, "ASAN_OPTIONS"), + (msan, "MSAN_OPTIONS"), + (ubsan, "UBSAN_OPTIONS"), + ): + options= os.environ.get(env_var) + if sanitizer and options is not None: + print(f"== {env_var}={options!r}") + + +def cleanup_temp_dir(tmp_dir: StrPath): + import glob + + path = os.path.join(glob.escape(tmp_dir), 'test_python_*') + print("Cleanup %s directory" % tmp_dir) + for name in glob.glob(path): + if os.path.isdir(name): + print("Remove directory: %s" % name) + os_helper.rmtree(name) + else: + print("Remove file: %s" % name) + os_helper.unlink(name)