Skip to content

Commit

Permalink
Optimize tests (parallel execution) (python#3019)
Browse files Browse the repository at this point in the history
I added a few features to runtests.py:

* -p argument to pass arguments to pytest
* move pytest to the front, run before anything else
* combine all pytest tests into one shell command
* schedule longer tasks first
  • Loading branch information
pkch authored and JukkaL committed Apr 3, 2017
1 parent 00359ad commit 5bf9d18
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 23 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ docs/build/
.mypy_cache/
.incremental_checker_cache.json
.cache
.runtest_log.json

# Packages
*.egg
Expand Down
108 changes: 99 additions & 9 deletions mypy/waiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
This is used for running mypy tests.
"""

from typing import Dict, List, Optional, Set, Tuple
from typing import Dict, List, Optional, Set, Tuple, Any, Iterable

import os
from multiprocessing import cpu_count
import pipes
import re
from subprocess import Popen, STDOUT
import sys
import tempfile
import time
import json
from collections import defaultdict


class WaiterError(Exception):
Expand All @@ -32,7 +35,7 @@ def __init__(self, name: str, args: List[str], *, cwd: str = None,

def start(self) -> None:
    """Spawn the subprocess, capturing its combined stdout/stderr in a temp file."""
    self.outfile = tempfile.TemporaryFile()
    # perf_counter is monotonic and high-resolution -- right for elapsed time.
    self.start_time = time.perf_counter()
    proc = Popen(self.args, cwd=self.cwd, env=self.env,
                 stdout=self.outfile, stderr=STDOUT)
    self.process = proc
    self.pid = proc.pid
Expand Down Expand Up @@ -107,7 +110,11 @@ class Waiter:
if not waiter.run():
print('error')
"""
def __init__(self, limit: int = 0, *, verbosity: int = 0, xfail: List[str] = []) -> None:
LOGSIZE = 50
FULL_LOG_FILENAME = '.runtest_log.json'

def __init__(self, limit: int = 0, *, verbosity: int = 0, xfail: List[str] = [],
lf: bool = False, ff: bool = False) -> None:
self.verbosity = verbosity
self.queue = [] # type: List[LazySubprocess]
# Index of next task to run in the queue.
Expand All @@ -117,21 +124,42 @@ def __init__(self, limit: int = 0, *, verbosity: int = 0, xfail: List[str] = [])
try:
sched_getaffinity = os.sched_getaffinity
except AttributeError:
limit = 2
# no support for affinity on OSX/Windows
limit = cpu_count()
else:
# Note: only count CPUs we are allowed to use. It is a
# major mistake to count *all* CPUs on the machine.
limit = len(sched_getaffinity(0))
self.limit = limit
self.lf = lf
self.ff = ff
assert limit > 0
self.xfail = set(xfail)
self._note = None # type: Noter
self.times1 = {} # type: Dict[str, float]
self.times2 = {} # type: Dict[str, float]

def add(self, cmd: LazySubprocess) -> int:
self.new_log = defaultdict(dict) # type: Dict[str, Dict[str, float]]
self.sequential_tasks = set() # type: Set[str]

def load_log_file(self) -> List[Dict[str, Dict[str, Any]]]:
    """Load the accumulated per-run test log from FULL_LOG_FILENAME.

    Returns a list of run entries (most recent first, as written by run()).
    A missing file yields an empty list; a corrupt file is reported on
    stderr and also yields an empty list, so callers never see an error.

    Note: the previous ``Optional[...]`` return annotation was wrong --
    every path assigns a list, so ``None`` is never returned.
    """
    try:
        # get the last log
        with open(self.FULL_LOG_FILENAME) as fp:
            test_log = json.load(fp)
    except FileNotFoundError:
        # First run (or log deleted): start with no history.
        test_log = []
    except json.JSONDecodeError:
        # Corrupt cache is non-fatal; warn and fall back to no history.
        print('corrupt test log file {}'.format(self.FULL_LOG_FILENAME), file=sys.stderr)
        test_log = []
    return test_log

def add(self, cmd: LazySubprocess, sequential: bool = False) -> int:
    """Append cmd to the task queue and return its queue index.

    Exits the program on a duplicate task name; sequential tasks are
    additionally recorded so the scheduler never runs them concurrently.
    """
    index = len(self.queue)
    known_names = {task.name for task in self.queue}
    if cmd.name in known_names:
        sys.exit('Duplicate test name: {}'.format(cmd.name))
    self.queue.append(cmd)
    if sequential:
        self.sequential_tasks.add(cmd.name)
    return index

def _start_next(self) -> None:
Expand Down Expand Up @@ -161,12 +189,14 @@ def _record_time(self, name: str, elapsed_time: float) -> None:

def _poll_current(self) -> Tuple[int, int]:
while True:
time.sleep(.05)
time.sleep(.01)
for pid in self.current:
cmd = self.current[pid][1]
code = cmd.process.poll()
if code is not None:
cmd.end_time = time.time()
cmd.end_time = time.perf_counter()
self.new_log['exit_code'][cmd.name] = code
self.new_log['runtime'][cmd.name] = cmd.end_time - cmd.start_time
return pid, code

def _wait_next(self) -> Tuple[List[str], int, int]:
Expand Down Expand Up @@ -239,22 +269,83 @@ def run(self) -> int:
if self.verbosity == 0:
self._note = Noter(len(self.queue))
print('SUMMARY %d tasks selected' % len(self.queue))

def avg(lst: Iterable[float]) -> float:
    # Mean of the non-None entries.  When nothing is known, return infinity
    # so an untimed task sorts to the front: a fast task scheduled early
    # hurts total wall time less than a slow task scheduled late.
    known = [t for t in lst if t is not None]
    if known:
        return sum(known) / len(known)
    return float('inf')

logs = self.load_log_file()
if logs:
times = {cmd.name: avg(log['runtime'].get(cmd.name, None) for log in logs)
for cmd in self.queue}

def sort_function(cmd: LazySubprocess) -> Tuple[Any, int, float]:
    """Sort key: (exit-code priority, sequential priority, -runtime).

    Longest tasks are scheduled first (runtime negated for ascending sort);
    with -ff, previously failed tasks come before passing ones.
    """
    neg_runtime = -times[cmd.name]
    # Sequential tasks sort before parallel ones by default.
    seq_key = -1 if cmd.name in self.sequential_tasks else 0
    if self.ff:
        # Failed tasks first with -ff.
        exit_code = -logs[-1]['exit_code'].get(cmd.name, 0)
        if not exit_code:
            # Avoid interleaving sequential tasks among parallel ones: order
            # becomes seq-failed, parallel-failed, parallel-passed, seq-passed
            # (the symmetric alternative would work equally well).  So among
            # passing tasks, sequential sorts after parallel.
            seq_key = -seq_key
    else:
        # Without -ff the previous exit code is ignored.
        exit_code = 0
    return exit_code, seq_key, neg_runtime
self.queue = sorted(self.queue, key=sort_function)
if self.lf:
self.queue = [cmd for cmd in self.queue
if logs[-1]['exit_code'].get(cmd.name, 0)]

sys.stdout.flush()
# Failed tasks.
all_failures = [] # type: List[str]
# Number of test cases. Some tasks can involve multiple test cases.
total_tests = 0
# Number of failed test cases.
total_failed_tests = 0
running_sequential_task = False
while self.current or self.next < len(self.queue):
while len(self.current) < self.limit and self.next < len(self.queue):
# only start next task if idle, or current and next tasks are both parallel
if running_sequential_task:
break
if self.queue[self.next].name in self.sequential_tasks:
if self.current:
break
else:
running_sequential_task = True
self._start_next()
fails, tests, test_fails = self._wait_next()
running_sequential_task = False
all_failures += fails
total_tests += tests
total_failed_tests += test_fails
if self.verbosity == 0:
self._note.clear()

if self.new_log: # don't append empty log, it will corrupt the cache file
# log only LOGSIZE most recent tests
test_log = (self.load_log_file() + [self.new_log])[:self.LOGSIZE]
try:
with open(self.FULL_LOG_FILENAME, 'w') as fp:
json.dump(test_log, fp, sort_keys=True, indent=4)
except Exception as e:
print('cannot save test log file:', e)

if all_failures:
summary = 'SUMMARY %d/%d tasks and %d/%d tests failed' % (
len(all_failures), len(self.queue), total_failed_tests, total_tests)
Expand All @@ -271,7 +362,6 @@ def run(self) -> int:
len(self.queue), total_tests))
print('*** OK ***')
sys.stdout.flush()

return 0


Expand Down
61 changes: 47 additions & 14 deletions runtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,25 @@ def get_versions(): # type: () -> typing.List[str]
import itertools
import os
import re
import json


# Ideally, all tests would be `discover`able so that they can be driven
# (and parallelized) by an external test driver.

class Driver:

def __init__(self, whitelist: List[str], blacklist: List[str],
arglist: List[str], verbosity: int, parallel_limit: int,
def __init__(self, *, whitelist: List[str], blacklist: List[str],
lf: bool, ff: bool,
arglist: List[str], pyt_arglist: List[str],
verbosity: int, parallel_limit: int,
xfail: List[str], coverage: bool) -> None:
self.whitelist = whitelist
self.blacklist = blacklist
self.arglist = arglist
self.pyt_arglist = pyt_arglist
self.verbosity = verbosity
self.waiter = Waiter(verbosity=verbosity, limit=parallel_limit, xfail=xfail)
self.waiter = Waiter(verbosity=verbosity, limit=parallel_limit, xfail=xfail, lf=lf, ff=ff)
self.versions = get_versions()
self.cwd = os.getcwd()
self.mypy = os.path.join(self.cwd, 'scripts', 'mypy')
Expand Down Expand Up @@ -107,7 +111,7 @@ def add_pytest(self, name: str, pytest_args: List[str], coverage: bool = False)
else:
args = [sys.executable, '-m', 'pytest'] + pytest_args

self.waiter.add(LazySubprocess(full_name, args, env=self.env))
self.waiter.add(LazySubprocess(full_name, args, env=self.env), sequential=True)

def add_python(self, name: str, *args: str, cwd: Optional[str] = None) -> None:
name = 'run %s' % name
Expand Down Expand Up @@ -155,7 +159,7 @@ def add_flake8(self, cwd: Optional[str] = None) -> None:
name = 'lint'
if not self.allow(name):
return
largs = ['flake8', '-j{}'.format(self.waiter.limit)]
largs = ['flake8', '-j0']
env = self.env
self.waiter.add(LazySubprocess(name, largs, cwd=cwd, env=env))

Expand All @@ -175,7 +179,8 @@ def add_basic(driver: Driver) -> None:


def add_selftypecheck(driver: Driver) -> None:
    # Self-type-check the mypy package under both optional-handling configs.
    # NOTE(review): the task named 'package mypy' is the one using the
    # strict-optional config -- confirm the naming is intentional.
    for task_name, config in [('package mypy nonstrict optional', 'mypy_self_check.ini'),
                              ('package mypy', 'mypy_strict_optional.ini')]:
        driver.add_mypy_package(task_name, 'mypy', '--config-file', config)


Expand Down Expand Up @@ -209,8 +214,7 @@ def add_imports(driver: Driver) -> None:


def add_pytest(driver: Driver) -> None:
    # Combine every pytest file into one pytest invocation (one task), so the
    # whole pytest run is scheduled as a single unit up front.
    combined_args = PYTEST_FILES + driver.arglist + driver.pyt_arglist
    driver.add_pytest('pytest', combined_args, True)


def add_myunit(driver: Driver) -> None:
Expand Down Expand Up @@ -297,7 +301,9 @@ def add_samples(driver: Driver) -> None:


def usage(status: int) -> None:
print('Usage: %s [-h | -v | -q | [-x] FILTER | -a ARG] ... [-- FILTER ...]' % sys.argv[0])
print('Usage: %s [-h | -v | -q | --lf | --ff | [-x] FILTER | -a ARG | -p ARG]'
'... [-- FILTER ...]'
% sys.argv[0])
print()
print('Run mypy tests. If given no arguments, run all tests.')
print()
Expand All @@ -309,9 +315,12 @@ def usage(status: int) -> None:
print('Options:')
print(' -h, --help show this help')
print(' -v, --verbose increase driver verbosity')
print(' --lf rerun only the tests that failed at the last run')
print(' --ff run all tests but run the last failures first')
print(' -q, --quiet decrease driver verbosity')
print(' -jN run N tasks at once (default: one per CPU)')
print(' -a, --argument ARG pass an argument to myunit tasks')
print(' -p, --pytest_arg ARG pass an argument to pytest tasks')
print(' (-v: verbose; glob pattern: filter by test name)')
print(' -l, --list list included tasks (after filtering) and exit')
print(' FILTER include tasks matching FILTER')
Expand All @@ -337,20 +346,25 @@ def sanity() -> None:


def main() -> None:
import time
t0 = time.perf_counter()
sanity()

verbosity = 0
parallel_limit = 0
whitelist = [] # type: List[str]
blacklist = [] # type: List[str]
arglist = [] # type: List[str]
pyt_arglist = [] # type: List[str]
lf = False
ff = False
list_only = False
coverage = False

allow_opts = True
curlist = whitelist
for a in sys.argv[1:]:
if curlist is not arglist and allow_opts and a.startswith('-'):
if not (curlist is arglist or curlist is pyt_arglist) and allow_opts and a.startswith('-'):
if curlist is not whitelist:
break
if a == '--':
Expand All @@ -368,6 +382,14 @@ def main() -> None:
curlist = blacklist
elif a == '-a' or a == '--argument':
curlist = arglist
elif a == '-p' or a == '--pytest_arg':
curlist = pyt_arglist
# will also pass this option to pytest
elif a == '--lf':
lf = True
# will also pass this option to pytest
elif a == '--ff':
ff = True
elif a == '-l' or a == '--list':
list_only = True
elif a == '-c' or a == '--coverage':
Expand All @@ -383,35 +405,46 @@ def main() -> None:
sys.exit('-x must be followed by a filter')
if curlist is arglist:
sys.exit('-a must be followed by an argument')
if curlist is pyt_arglist:
sys.exit('-p must be followed by an argument')
if lf and ff:
sys.exit('use either --lf or --ff, not both')
# empty string is a substring of all names
if not whitelist:
whitelist.append('')
if lf:
pyt_arglist.append('--lf')
if ff:
pyt_arglist.append('--ff')

driver = Driver(whitelist=whitelist, blacklist=blacklist, arglist=arglist,
verbosity=verbosity, parallel_limit=parallel_limit, xfail=[], coverage=coverage)
driver = Driver(whitelist=whitelist, blacklist=blacklist, lf=lf, ff=ff,
arglist=arglist, pyt_arglist=pyt_arglist, verbosity=verbosity,
parallel_limit=parallel_limit, xfail=[], coverage=coverage)

driver.prepend_path('PATH', [join(driver.cwd, 'scripts')])
driver.prepend_path('MYPYPATH', [driver.cwd])
driver.prepend_path('PYTHONPATH', [driver.cwd])
driver.prepend_path('PYTHONPATH', [join(driver.cwd, 'lib-typing', v) for v in driver.versions])

driver.add_flake8()
add_pytest(driver)
add_pythoneval(driver)
add_cmdline(driver)
add_basic(driver)
add_selftypecheck(driver)
add_pytest(driver)
add_myunit(driver)
add_imports(driver)
add_stubs(driver)
add_stdlibsamples(driver)
add_samples(driver)
driver.add_flake8()

if list_only:
driver.list_tasks()
return

exit_code = driver.waiter.run()
t1 = time.perf_counter()
print('total runtime:', t1 - t0, 'sec')

if verbosity >= 1:
times = driver.waiter.times2 if verbosity >= 2 else driver.waiter.times1
Expand Down

0 comments on commit 5bf9d18

Please sign in to comment.