Skip to content

Commit

Permalink
Add support for test cases with more than 2 incremental runs (python#…
Browse files Browse the repository at this point in the history
…3347)

Use .2, .3 etc. as the suffixes for files in the second and later runs (instead of .next).
  • Loading branch information
JukkaL authored and gvanrossum committed May 22, 2017
1 parent 05521e4 commit 03f9521
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 199 deletions.
76 changes: 46 additions & 30 deletions mypy/test/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import shutil

import pytest # type: ignore # no pytest in typeshed
from typing import Callable, List, Tuple, Set, Optional, Iterator, Any
from typing import Callable, List, Tuple, Set, Optional, Iterator, Any, Dict

from mypy.myunit import TestCase, SkipTestCaseException

Expand Down Expand Up @@ -53,9 +53,9 @@ def parse_test_cases(
files = [] # type: List[Tuple[str, str]] # path and contents
output_files = [] # type: List[Tuple[str, str]] # path and contents for output files
tcout = [] # type: List[str] # Regular output errors
tcout2 = [] # type: List[str] # Output errors for incremental, second run
stale_modules = None # type: Optional[Set[str]] # module names
rechecked_modules = None # type: Optional[Set[str]] # module names
tcout2 = {} # type: Dict[int, List[str]] # Output errors for incremental, runs 2+
stale_modules = {} # type: Dict[int, Set[str]] # from run number to module names
rechecked_modules = {} # type: Dict[ int, Set[str]] # from run number module names
while i < len(p) and p[i].id != 'case':
if p[i].id == 'file' or p[i].id == 'outfile':
# Record an extra file needed for the test case.
Expand All @@ -78,43 +78,57 @@ def parse_test_cases(
fnam = '__builtin__.pyi'
with open(mpath) as f:
files.append((join(base_path, fnam), f.read()))
elif p[i].id == 'stale':
elif re.match(r'stale[0-9]*$', p[i].id):
if p[i].id == 'stale':
passnum = 1
else:
passnum = int(p[i].id[len('stale'):])
assert passnum > 0
arg = p[i].arg
if arg is None:
stale_modules = set()
stale_modules[passnum] = set()
else:
stale_modules[passnum] = {item.strip() for item in arg.split(',')}
elif re.match(r'rechecked[0-9]*$', p[i].id):
if p[i].id == 'rechecked':
passnum = 1
else:
stale_modules = {item.strip() for item in arg.split(',')}
elif p[i].id == 'rechecked':
passnum = int(p[i].id[len('rechecked'):])
arg = p[i].arg
if arg is None:
rechecked_modules = set()
rechecked_modules[passnum] = set()
else:
rechecked_modules = {item.strip() for item in arg.split(',')}
rechecked_modules[passnum] = {item.strip() for item in arg.split(',')}
elif p[i].id == 'out' or p[i].id == 'out1':
tcout = p[i].data
if native_sep and os.path.sep == '\\':
tcout = [fix_win_path(line) for line in tcout]
ok = True
elif p[i].id == 'out2':
tcout2 = p[i].data
elif re.match(r'out[0-9]*$', p[i].id):
passnum = int(p[i].id[3:])
assert passnum > 1
output = p[i].data
if native_sep and os.path.sep == '\\':
tcout2 = [fix_win_path(line) for line in tcout2]
output = [fix_win_path(line) for line in output]
tcout2[passnum] = output
ok = True
else:
raise ValueError(
'Invalid section header {} in {} at line {}'.format(
p[i].id, path, p[i].line))
i += 1

if rechecked_modules is None:
# If the set of rechecked modules isn't specified, make it the same as the set of
# modules with a stale public interface.
rechecked_modules = stale_modules
if (stale_modules is not None
and rechecked_modules is not None
and not stale_modules.issubset(rechecked_modules)):
raise ValueError(
'Stale modules must be a subset of rechecked modules ({})'.format(path))
for passnum in stale_modules.keys():
if passnum not in rechecked_modules:
# If the set of rechecked modules isn't specified, make it the same as the set
# of modules with a stale public interface.
rechecked_modules[passnum] = stale_modules[passnum]
if (passnum in stale_modules
and passnum in rechecked_modules
and not stale_modules[passnum].issubset(rechecked_modules[passnum])):
raise ValueError(
('Stale modules after pass {} must be a subset of rechecked '
'modules ({}:{})').format(passnum, path, p[i0].line))

if optional_out:
ok = True
Expand All @@ -140,30 +154,32 @@ def parse_test_cases(

class DataDrivenTestCase(TestCase):
input = None # type: List[str]
output = None # type: List[str]
output = None # type: List[str] # Output for the first pass
output2 = None # type: Dict[int, List[str]] # Output for runs 2+, indexed by run number

file = ''
line = 0

# (file path, file content) tuples
files = None # type: List[Tuple[str, str]]
expected_stale_modules = None # type: Optional[Set[str]]
expected_stale_modules = None # type: Dict[int, Set[str]]
expected_rechecked_modules = None # type: Dict[int, Set[str]]

clean_up = None # type: List[Tuple[bool, str]]

def __init__(self,
name: str,
input: List[str],
output: List[str],
output2: List[str],
output2: Dict[int, List[str]],
file: str,
line: int,
lastline: int,
perform: Callable[['DataDrivenTestCase'], None],
files: List[Tuple[str, str]],
output_files: List[Tuple[str, str]],
expected_stale_modules: Optional[Set[str]],
expected_rechecked_modules: Optional[Set[str]],
expected_stale_modules: Dict[int, Set[str]],
expected_rechecked_modules: Dict[int, Set[str]],
native_sep: bool = False,
) -> None:
super().__init__(name)
Expand Down Expand Up @@ -192,9 +208,9 @@ def set_up(self) -> None:
f.write(content)
self.clean_up.append((False, path))
encountered_files.add(path)
if path.endswith(".next"):
# Make sure new files introduced in the second run are accounted for
renamed_path = path[:-5]
if re.search(r'\.[2-9]$', path):
# Make sure new files introduced in the second and later runs are accounted for
renamed_path = path[:-2]
if renamed_path not in encountered_files:
encountered_files.add(renamed_path)
self.clean_up.append((False, renamed_path))
Expand Down
103 changes: 64 additions & 39 deletions mypy/test/testcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,17 @@ def run_case(self, testcase: DataDrivenTestCase) -> None:
# Expect success on first run, errors from testcase.output (if any) on second run.
# We briefly sleep to make sure file timestamps are distinct.
self.clear_cache()
self.run_case_once(testcase, 1)
self.run_case_once(testcase, 2)
num_steps = max([2] + list(testcase.output2.keys()))
# Check that there are no file changes beyond the last run (they would be ignored).
for dn, dirs, files in os.walk(os.curdir):
for file in files:
m = re.search(r'\.([2-9])$', file)
if m and int(m.group(1)) > num_steps:
raise ValueError(
'Output file {} exists though test case only has {} runs'.format(
file, num_steps))
for step in range(1, num_steps + 1):
self.run_case_once(testcase, step)
elif optional:
experiments.STRICT_OPTIONAL = True
self.run_case_once(testcase)
Expand All @@ -118,26 +127,26 @@ def clear_cache(self) -> None:
if os.path.exists(dn):
shutil.rmtree(dn)

def run_case_once(self, testcase: DataDrivenTestCase, incremental: int = 0) -> None:
def run_case_once(self, testcase: DataDrivenTestCase, incremental_step: int = 0) -> None:
find_module_clear_caches()
original_program_text = '\n'.join(testcase.input)
module_data = self.parse_module(original_program_text, incremental)
module_data = self.parse_module(original_program_text, incremental_step)

if incremental:
if incremental == 1:
if incremental_step:
if incremental_step == 1:
# In run 1, copy program text to program file.
for module_name, program_path, program_text in module_data:
if module_name == '__main__':
with open(program_path, 'w') as f:
f.write(program_text)
break
elif incremental == 2:
# In run 2, copy *.next files to * files.
elif incremental_step > 1:
# In runs 2+, copy *.[num] files to * files.
for dn, dirs, files in os.walk(os.curdir):
for file in files:
if file.endswith('.next'):
if file.endswith('.' + str(incremental_step)):
full = os.path.join(dn, file)
target = full[:-5]
target = full[:-2]
shutil.copy(full, target)

# In some systems, mtime has a resolution of 1 second which can cause
Expand All @@ -147,12 +156,12 @@ def run_case_once(self, testcase: DataDrivenTestCase, incremental: int = 0) -> N
os.utime(target, times=(new_time, new_time))

# Parse options after moving files (in case mypy.ini is being moved).
options = self.parse_options(original_program_text, testcase, incremental)
options = self.parse_options(original_program_text, testcase, incremental_step)
options.use_builtins_fixtures = True
options.show_traceback = True
if 'optional' in testcase.file:
options.strict_optional = True
if incremental:
if incremental_step:
options.incremental = True
else:
options.cache_dir = os.devnull # Dont waste time writing cache
Expand All @@ -161,7 +170,7 @@ def run_case_once(self, testcase: DataDrivenTestCase, incremental: int = 0) -> N
for module_name, program_path, program_text in module_data:
# Always set to none so we're forced to reread the module in incremental mode
sources.append(BuildSource(program_path, module_name,
None if incremental else program_text))
None if incremental_step else program_text))
res = None
try:
res = build.build(sources=sources,
Expand All @@ -173,42 +182,51 @@ def run_case_once(self, testcase: DataDrivenTestCase, incremental: int = 0) -> N
a = normalize_error_messages(a)

# Make sure error messages match
if incremental == 0:
msg = 'Invalid type checker output ({}, line {})'
if incremental_step == 0:
# Not incremental
msg = 'Unexpected type checker output ({}, line {})'
output = testcase.output
elif incremental == 1:
msg = 'Invalid type checker output in incremental, run 1 ({}, line {})'
elif incremental_step == 1:
msg = 'Unexpected type checker output in incremental, run 1 ({}, line {})'
output = testcase.output
elif incremental == 2:
msg = 'Invalid type checker output in incremental, run 2 ({}, line {})'
output = testcase.output2
elif incremental_step > 1:
msg = ('Unexpected type checker output in incremental, run {}'.format(
incremental_step) + ' ({}, line {})')
output = testcase.output2.get(incremental_step, [])
else:
raise AssertionError()

if output != a and self.update_data:
update_testcase_output(testcase, a)
assert_string_arrays_equal(output, a, msg.format(testcase.file, testcase.line))

if incremental and res:
if incremental_step and res:
if options.follow_imports == 'normal' and testcase.output is None:
self.verify_cache(module_data, a, res.manager)
if incremental == 2:
if incremental_step > 1:
suffix = '' if incremental_step == 2 else str(incremental_step - 1)
self.check_module_equivalence(
'rechecked',
testcase.expected_rechecked_modules,
'rechecked' + suffix,
testcase.expected_rechecked_modules.get(incremental_step - 1),
res.manager.rechecked_modules)
self.check_module_equivalence(
'stale',
testcase.expected_stale_modules,
'stale' + suffix,
testcase.expected_stale_modules.get(incremental_step - 1),
res.manager.stale_modules)

def check_module_equivalence(self, name: str,
expected: Optional[Set[str]], actual: Set[str]) -> None:
if expected is not None:
expected_normalized = sorted(expected)
actual_normalized = sorted(actual.difference({"__main__"}))
assert_string_arrays_equal(
list(sorted(expected)),
list(sorted(actual.difference({"__main__"}))),
'Set of {} modules does not match expected set'.format(name))
expected_normalized,
actual_normalized,
('Actual modules ({}) do not match expected modules ({}) '
'for "[{} ...]"').format(
', '.join(actual_normalized),
', '.join(expected_normalized),
name))

def verify_cache(self, module_data: List[Tuple[str, str, str]], a: List[str],
manager: build.BuildManager) -> None:
Expand Down Expand Up @@ -268,7 +286,9 @@ def find_missing_cache_files(self, modules: Dict[str, str],
missing[id] = path
return set(missing.values())

def parse_module(self, program_text: str, incremental: int = 0) -> List[Tuple[str, str, str]]:
def parse_module(self,
program_text: str,
incremental_step: int = 0) -> List[Tuple[str, str, str]]:
"""Return the module and program names for a test case.
Normally, the unit tests will parse the default ('__main__')
Expand All @@ -278,15 +298,19 @@ def parse_module(self, program_text: str, incremental: int = 0) -> List[Tuple[st
# cmd: mypy -m foo.bar foo.baz
You can also use `# cmdN:` to have a different cmd for incremental
step N (2, 3, ...).
Return a list of tuples (module name, file name, program text).
"""
m = re.search('# cmd: mypy -m ([a-zA-Z0-9_. ]+)$', program_text, flags=re.MULTILINE)
m2 = re.search('# cmd2: mypy -m ([a-zA-Z0-9_. ]+)$', program_text, flags=re.MULTILINE)
if m2 is not None and incremental == 2:
# Optionally return a different command if in the second
# stage of incremental mode, otherwise default to reusing
# the original cmd.
m = m2
regex = '# cmd{}: mypy -m ([a-zA-Z0-9_. ]+)$'.format(incremental_step)
alt_m = re.search(regex, program_text, flags=re.MULTILINE)
if alt_m is not None and incremental_step > 1:
# Optionally return a different command if in a later step
# of incremental mode, otherwise default to reusing the
# original cmd.
m = alt_m

if m:
# The test case wants to use a non-default main
Expand All @@ -304,11 +328,12 @@ def parse_module(self, program_text: str, incremental: int = 0) -> List[Tuple[st
return [('__main__', 'main', program_text)]

def parse_options(self, program_text: str, testcase: DataDrivenTestCase,
incremental: int) -> Options:
incremental_step: int) -> Options:
options = Options()
flags = re.search('# flags: (.*)$', program_text, flags=re.MULTILINE)
if incremental == 2:
flags2 = re.search('# flags2: (.*)$', program_text, flags=re.MULTILINE)
if incremental_step > 1:
flags2 = re.search('# flags{}: (.*)$'.format(incremental_step), program_text,
flags=re.MULTILINE)
if flags2:
flags = flags2

Expand Down
Loading

0 comments on commit 03f9521

Please sign in to comment.