From 5bf9d182abeef7e283c656fb1a1d3f94704b2b0e Mon Sep 17 00:00:00 2001 From: Max Moroz Date: Thu, 16 Mar 2017 19:39:22 -0700 Subject: [PATCH] Optimize tests (parallel execution) (#3019) I added a few features to runtests.py: * -p argument to pass arguments to pytest * move pytest to the front, run before anything else * combine all pytest tests into one shell command * schedule longer tasks first --- .gitignore | 1 + mypy/waiter.py | 108 ++++++++++++++++++++++++++++++++++++++++++++----- runtests.py | 61 +++++++++++++++++++++------- 3 files changed, 147 insertions(+), 23 deletions(-) diff --git a/.gitignore b/.gitignore index 731180e6e163..2d7c8d44982d 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ docs/build/ .mypy_cache/ .incremental_checker_cache.json .cache +.runtest_log.json # Packages *.egg diff --git a/mypy/waiter.py b/mypy/waiter.py index 10fa4028cbac..62a0555974c0 100644 --- a/mypy/waiter.py +++ b/mypy/waiter.py @@ -3,15 +3,18 @@ This is used for running mypy tests. """ -from typing import Dict, List, Optional, Set, Tuple +from typing import Dict, List, Optional, Set, Tuple, Any, Iterable import os +from multiprocessing import cpu_count import pipes import re from subprocess import Popen, STDOUT import sys import tempfile import time +import json +from collections import defaultdict class WaiterError(Exception): @@ -32,7 +35,7 @@ def __init__(self, name: str, args: List[str], *, cwd: str = None, def start(self) -> None: self.outfile = tempfile.TemporaryFile() - self.start_time = time.time() + self.start_time = time.perf_counter() self.process = Popen(self.args, cwd=self.cwd, env=self.env, stdout=self.outfile, stderr=STDOUT) self.pid = self.process.pid @@ -107,7 +110,11 @@ class Waiter: if not waiter.run(): print('error') """ - def __init__(self, limit: int = 0, *, verbosity: int = 0, xfail: List[str] = []) -> None: + LOGSIZE = 50 + FULL_LOG_FILENAME = '.runtest_log.json' + + def __init__(self, limit: int = 0, *, verbosity: int = 0, xfail: List[str] = [], + lf: bool = False, ff: bool = False) -> None: self.verbosity = verbosity self.queue = [] # type: List[LazySubprocess] # Index of next task to run in the queue. @@ -117,21 +124,42 @@ def __init__(self, limit: int = 0, *, verbosity: int = 0, xfail: List[str] = []) try: sched_getaffinity = os.sched_getaffinity except AttributeError: - limit = 2 + # no support for affinity on OSX/Windows + limit = cpu_count() else: # Note: only count CPUs we are allowed to use. It is a # major mistake to count *all* CPUs on the machine. limit = len(sched_getaffinity(0)) self.limit = limit + self.lf = lf + self.ff = ff assert limit > 0 self.xfail = set(xfail) self._note = None # type: Noter self.times1 = {} # type: Dict[str, float] self.times2 = {} # type: Dict[str, float] - - def add(self, cmd: LazySubprocess) -> int: + self.new_log = defaultdict(dict) # type: Dict[str, Dict[str, float]] + self.sequential_tasks = set() # type: Set[str] + + def load_log_file(self) -> Optional[List[Dict[str, Dict[str, Any]]]]: + try: + # get the last log + with open(self.FULL_LOG_FILENAME) as fp: + test_log = json.load(fp) + except FileNotFoundError: + test_log = [] + except json.JSONDecodeError: + print('corrupt test log file {}'.format(self.FULL_LOG_FILENAME), file=sys.stderr) + test_log = [] + return test_log + + def add(self, cmd: LazySubprocess, sequential: bool = False) -> int: rv = len(self.queue) + if cmd.name in (task.name for task in self.queue): + sys.exit('Duplicate test name: {}'.format(cmd.name)) self.queue.append(cmd) + if sequential: + self.sequential_tasks.add(cmd.name) return rv def _start_next(self) -> None: @@ -161,12 +189,14 @@ def _record_time(self, name: str, elapsed_time: float) -> None: def _poll_current(self) -> Tuple[int, int]: while True: - time.sleep(.05) + time.sleep(.01) for pid in self.current: cmd = self.current[pid][1] code = cmd.process.poll() if code is not None: - cmd.end_time = time.time() + cmd.end_time = time.perf_counter() + self.new_log['exit_code'][cmd.name] = code + self.new_log['runtime'][cmd.name] = cmd.end_time - cmd.start_time return pid, code def _wait_next(self) -> Tuple[List[str], int, int]: @@ -239,6 +269,47 @@ def run(self) -> int: if self.verbosity == 0: self._note = Noter(len(self.queue)) print('SUMMARY %d tasks selected' % len(self.queue)) + + def avg(lst: Iterable[float]) -> float: + valid_items = [item for item in lst if item is not None] + if not valid_items: + # we don't know how long a new task takes + # better err by putting it in front in case it is slow: + # a fast task in front hurts performance less than a slow task in the back + return float('inf') + else: + return sum(valid_items) / len(valid_items) + + logs = self.load_log_file() + if logs: + times = {cmd.name: avg(log['runtime'].get(cmd.name, None) for log in logs) + for cmd in self.queue} + + def sort_function(cmd: LazySubprocess) -> Tuple[Any, int, float]: + # longest tasks first + runtime = -times[cmd.name] + # sequential tasks go first by default + sequential = -(cmd.name in self.sequential_tasks) + if self.ff: + # failed tasks first with -ff + exit_code = -logs[-1]['exit_code'].get(cmd.name, 0) + if not exit_code: + # avoid interrupting parallel tasks with sequential in between + # so either: seq failed, parallel failed, parallel passed, seq passed + # or: parallel failed, seq failed, seq passed, parallel passed + # I picked the first one arbitrarily, since no obvious pros/cons + # in other words, among failed tasks, sequential should go before parallel, + # and among successful tasks, sequential should go after parallel + sequential = -sequential + else: + # ignore exit code without -ff + exit_code = 0 + return exit_code, sequential, runtime + self.queue = sorted(self.queue, key=sort_function) + if self.lf: + self.queue = [cmd for cmd in self.queue + if logs[-1]['exit_code'].get(cmd.name, 0)] + sys.stdout.flush() # Failed tasks. all_failures = [] # type: List[str] @@ -246,15 +317,35 @@ def run(self) -> int: total_tests = 0 # Number of failed test cases. total_failed_tests = 0 + running_sequential_task = False while self.current or self.next < len(self.queue): while len(self.current) < self.limit and self.next < len(self.queue): + # only start next task if idle, or current and next tasks are both parallel + if running_sequential_task: + break + if self.queue[self.next].name in self.sequential_tasks: + if self.current: + break + else: + running_sequential_task = True self._start_next() fails, tests, test_fails = self._wait_next() + running_sequential_task = False all_failures += fails total_tests += tests total_failed_tests += test_fails if self.verbosity == 0: self._note.clear() + + if self.new_log: # don't append empty log, it will corrupt the cache file + # log only LOGSIZE most recent tests + test_log = (self.load_log_file() + [self.new_log])[:self.LOGSIZE] + try: + with open(self.FULL_LOG_FILENAME, 'w') as fp: + json.dump(test_log, fp, sort_keys=True, indent=4) + except Exception as e: + print('cannot save test log file:', e) + if all_failures: summary = 'SUMMARY %d/%d tasks and %d/%d tests failed' % ( len(all_failures), len(self.queue), total_failed_tests, total_tests) @@ -271,7 +362,6 @@ def run(self) -> int: len(self.queue), total_tests)) print('*** OK ***') sys.stdout.flush() - return 0 diff --git a/runtests.py b/runtests.py index 2102c83c5053..4be285b19cf9 100755 --- a/runtests.py +++ b/runtests.py @@ -34,6 +34,7 @@ def get_versions(): # type: () -> typing.List[str] import itertools import os import re +import json # Ideally, all tests would be `discover`able so that they can be driven @@ -41,14 +42,17 @@ def get_versions(): # type: () -> typing.List[str] class Driver: - def __init__(self, whitelist: List[str], blacklist: List[str], - arglist: List[str], verbosity: int, parallel_limit: int, + def __init__(self, *, whitelist: List[str], blacklist: List[str], + lf: bool, ff: bool, + arglist: List[str], pyt_arglist: List[str], + verbosity: int, parallel_limit: int, xfail: List[str], coverage: bool) -> None: self.whitelist = whitelist self.blacklist = blacklist self.arglist = arglist + self.pyt_arglist = pyt_arglist self.verbosity = verbosity - self.waiter = Waiter(verbosity=verbosity, limit=parallel_limit, xfail=xfail) + self.waiter = Waiter(verbosity=verbosity, limit=parallel_limit, xfail=xfail, lf=lf, ff=ff) self.versions = get_versions() self.cwd = os.getcwd() self.mypy = os.path.join(self.cwd, 'scripts', 'mypy') @@ -107,7 +111,7 @@ def add_pytest(self, name: str, pytest_args: List[str], coverage: bool = False) else: args = [sys.executable, '-m', 'pytest'] + pytest_args - self.waiter.add(LazySubprocess(full_name, args, env=self.env)) + self.waiter.add(LazySubprocess(full_name, args, env=self.env), sequential=True) def add_python(self, name: str, *args: str, cwd: Optional[str] = None) -> None: name = 'run %s' % name @@ -155,7 +159,7 @@ def add_flake8(self, cwd: Optional[str] = None) -> None: name = 'lint' if not self.allow(name): return - largs = ['flake8', '-j{}'.format(self.waiter.limit)] + largs = ['flake8', '-j0'] env = self.env self.waiter.add(LazySubprocess(name, largs, cwd=cwd, env=env)) @@ -175,7 +179,8 @@ def add_basic(driver: Driver) -> None: def add_selftypecheck(driver: Driver) -> None: - driver.add_mypy_package('package mypy', 'mypy', '--config-file', 'mypy_self_check.ini') + driver.add_mypy_package('package mypy nonstrict optional', 'mypy', '--config-file', + 'mypy_self_check.ini') driver.add_mypy_package('package mypy', 'mypy', '--config-file', 'mypy_strict_optional.ini') @@ -209,8 +214,7 @@ def add_imports(driver: Driver) -> None: def add_pytest(driver: Driver) -> None: - for f in PYTEST_FILES: - driver.add_pytest(f, [f] + driver.arglist, True) + driver.add_pytest('pytest', PYTEST_FILES + driver.arglist + driver.pyt_arglist, True) def add_myunit(driver: Driver) -> None: @@ -297,7 +301,9 @@ def add_samples(driver: Driver) -> None: def usage(status: int) -> None: - print('Usage: %s [-h | -v | -q | [-x] FILTER | -a ARG] ... [-- FILTER ...]' % sys.argv[0]) + print('Usage: %s [-h | -v | -q | --lf | --ff | [-x] FILTER | -a ARG | -p ARG]' + '... [-- FILTER ...]' + % sys.argv[0]) print() print('Run mypy tests. If given no arguments, run all tests.') print() @@ -309,9 +315,12 @@ def usage(status: int) -> None: print('Options:') print(' -h, --help show this help') print(' -v, --verbose increase driver verbosity') + print(' --lf rerun only the tests that failed at the last run') + print(' --ff run all tests but run the last failures first') print(' -q, --quiet decrease driver verbosity') print(' -jN run N tasks at once (default: one per CPU)') print(' -a, --argument ARG pass an argument to myunit tasks') + print(' -p, --pytest_arg ARG pass an argument to pytest tasks') print(' (-v: verbose; glob pattern: filter by test name)') print(' -l, --list list included tasks (after filtering) and exit') print(' FILTER include tasks matching FILTER') @@ -337,6 +346,8 @@ def sanity() -> None: def main() -> None: + import time + t0 = time.perf_counter() sanity() verbosity = 0 @@ -344,13 +355,16 @@ def main() -> None: whitelist = [] # type: List[str] blacklist = [] # type: List[str] arglist = [] # type: List[str] + pyt_arglist = [] # type: List[str] + lf = False + ff = False list_only = False coverage = False allow_opts = True curlist = whitelist for a in sys.argv[1:]: - if curlist is not arglist and allow_opts and a.startswith('-'): + if not (curlist is arglist or curlist is pyt_arglist) and allow_opts and a.startswith('-'): if curlist is not whitelist: break if a == '--': @@ -368,6 +382,14 @@ def main() -> None: curlist = blacklist elif a == '-a' or a == '--argument': curlist = arglist + elif a == '-p' or a == '--pytest_arg': + curlist = pyt_arglist + # will also pass this option to pytest + elif a == '--lf': + lf = True + # will also pass this option to pytest + elif a == '--ff': + ff = True elif a == '-l' or a == '--list': list_only = True elif a == '-c' or a == '--coverage': @@ -383,35 +405,46 @@ def main() -> None: sys.exit('-x must be followed by a filter') if curlist is arglist: sys.exit('-a must be followed by an argument') + if curlist is pyt_arglist: + sys.exit('-p must be followed by an argument') + if lf and ff: + sys.exit('use either --lf or --ff, not both') # empty string is a substring of all names if not whitelist: whitelist.append('') + if lf: + pyt_arglist.append('--lf') + if ff: + pyt_arglist.append('--ff') - driver = Driver(whitelist=whitelist, blacklist=blacklist, arglist=arglist, - verbosity=verbosity, parallel_limit=parallel_limit, xfail=[], coverage=coverage) + driver = Driver(whitelist=whitelist, blacklist=blacklist, lf=lf, ff=ff, + arglist=arglist, pyt_arglist=pyt_arglist, verbosity=verbosity, + parallel_limit=parallel_limit, xfail=[], coverage=coverage) driver.prepend_path('PATH', [join(driver.cwd, 'scripts')]) driver.prepend_path('MYPYPATH', [driver.cwd]) driver.prepend_path('PYTHONPATH', [driver.cwd]) driver.prepend_path('PYTHONPATH', [join(driver.cwd, 'lib-typing', v) for v in driver.versions]) + driver.add_flake8() + add_pytest(driver) add_pythoneval(driver) add_cmdline(driver) add_basic(driver) add_selftypecheck(driver) - add_pytest(driver) add_myunit(driver) add_imports(driver) add_stubs(driver) add_stdlibsamples(driver) add_samples(driver) - driver.add_flake8() if list_only: driver.list_tasks() return exit_code = driver.waiter.run() + t1 = time.perf_counter() + print('total runtime:', t1 - t0, 'sec') if verbosity >= 1: times = driver.waiter.times2 if verbosity >= 2 else driver.waiter.times1