-
Notifications
You must be signed in to change notification settings - Fork 2
/
command_comparer.py
459 lines (327 loc) · 14.1 KB
/
command_comparer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
import copy
import csv
import os
import random
import string
import subprocess
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Mapping
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import timedelta
from pathlib import Path
from timeit import timeit
from typing import Tuple, Sequence, Callable, Optional, Iterable, Union
# TODO: remove this
def _lazy_repr(self):
return self.__str__()
DISPLAY_WIDTH = 120
class TerminalStyleCode:
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
END_CODE = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def print_error(s):
print(f"{TerminalStyleCode.FAIL}{s}{TerminalStyleCode.END_CODE}")
def print_warning(s):
print(f"{TerminalStyleCode.WARNING}{s}{TerminalStyleCode.END_CODE}")
class RepoSpec:
def __init__(self, name: str, *sub_directories: str):
self.name = name
self.sub_directories = tuple(Path(sub_directory)
for sub_directory in sub_directories)
def with_base_root(self, base_root: Path):
return RootedRepo(base_root, self)
def __str__(self):
return f'{self.name}, {self.sub_directories}'
def __repr__(self):
return _lazy_repr(self)
class RootedRepo:
def __init__(self, root: Path, repo: RepoSpec):
self.repo_spec = repo
self.root = root.joinpath(repo.name).resolve(strict=True)
self.sub_directories = tuple(
self.root / relative_sub_directory for relative_sub_directory in repo.sub_directories)
def __str__(self):
return f'{self.root.parent}, {self.root.name}, {[sub_dir.relative_to(self.root) for sub_dir in self.sub_directories]}'
def __repr__(self):
return _lazy_repr(self)
class CommandValidator(ABC):
@abstractmethod
def validate(self, command_stdout: str) -> bool:
...
class Func(CommandValidator):
"""
Should be stateless. Can be invoked multiple times for different commands.
"""
def __init__(self, func: Callable[[str], bool], description: str):
self.func = func
self.description = description
def validate(self, command_stdout) -> bool:
return self.func(command_stdout)
def __str__(self):
return self.description
class Include(CommandValidator):
def __init__(self, include_string: str):
self.include_string = include_string
def validate(self, command_stdout: str) -> bool:
return self.include_string in command_stdout
def __str__(self):
return f"Include({self.include_string})"
class Exclude(CommandValidator):
def __init__(self, exclude_string: str):
self.exclude_string = exclude_string
def validate(self, command_stdout: str) -> bool:
return self.exclude_string not in command_stdout
def __str__(self):
return f"Exclude({self.exclude_string})"
class ValidationException(Exception):
...
class Command(ABC):
def __init__(self, validation_checks: Optional[Sequence[CommandValidator]] = None):
self.working_directory = Path.cwd()
self.validation_checks = validation_checks if validation_checks else []
# gets set after command is run
self.captured_output = None
def run(self):
command_representation = str(self)
print(command_representation)
saved_cwd = Path.cwd()
try:
os.chdir(self.working_directory)
self.captured_output = self._invoke()
except subprocess.CalledProcessError as e:
print("\n[FAILED COMMAND]\n" + command_representation)
raise e
finally:
os.chdir(saved_cwd)
def validate(self):
assert self.captured_output is not None, "Command must be run before it can be validated"
output_str = self.captured_output if type(self.captured_output) is str else self.captured_output.decode('utf-8')
for validator in self.validation_checks:
if not validator.validate(output_str):
# TODO: this is too early to decide how to handle failed validations. It should be handled higher up.
print(output_str)
# trigger an exception to stop the tests
raise ValidationException(f"Validation failed: {str(validator)}")
@abstractmethod
def _invoke(self) -> Union[str, bytes]:
"""
Executes the command.
:returns: The captured output from the command. Can be empty string if no output gets produced.
"""
...
def with_working_directory(self, working_directory: Path) -> 'Command':
clone = copy.deepcopy(self)
clone.working_directory = working_directory
return clone
def add_validation_checks(self, validation_checks: Sequence[CommandValidator]) -> 'Command':
clone = copy.deepcopy(self)
clone.validation_checks = [*self.validation_checks, *list(validation_checks)]
return clone
def __str__(self):
return f'{self.working_directory} > '
class NullCommand(Command):
def __init__(self):
super().__init__()
def _invoke(self):
return ""
def __str__(self):
return super(NullCommand, self).__str__() + "NullCommand"
class Commands(Command):
def __init__(self, *args: Command):
super().__init__()
self.commands = copy.deepcopy(args)
def _invoke(self):
for command in self.commands:
command.run()
return ""
def validate(self):
for command in self.commands:
command.validate()
def with_working_directory(self, working_directory: Path):
self.commands = tuple(command.with_working_directory(working_directory)
for command in self.commands)
return self
def add_validation_checks(self, validation_checks: Sequence[CommandValidator]) -> 'Command':
self.commands = tuple(command.add_validation_checks(validation_checks)
for command in self.commands)
return self
def __str__(self):
return f"Composite({len(self.commands)})"
class ProcessCommand(Command):
def __init__(self, *args: str, validation_checks: Optional[Sequence[CommandValidator]] = None):
super().__init__(validation_checks)
self.args = copy.copy(args)
def _invoke(self):
completed_process = subprocess.run(
self.args,
check=True,
capture_output=True
)
assert completed_process.returncode == 0, f"Non-zero return code for: {str(self)}"
return completed_process.stdout
def __str__(self):
return super(ProcessCommand, self).__str__() + " ".join(self.args)
class PowershellCommand(ProcessCommand):
def __init__(self, *args: str, validation_checks: Optional[Sequence[CommandValidator]] = None):
super().__init__("powershell", "-nologo", "-noprofile", "-noninteractive", "-c",
*args,
validation_checks=validation_checks)
@dataclass(frozen=True)
class TestResult:
name: str
time_delta: timedelta
# todo: don't make it optional and implement all scenarios that pass None
command: Optional[Command]
@dataclass(frozen=True)
class Test:
name: str
test_command: Command
repo_root_setup_command: Command = field(default=NullCommand())
setup_command: Command = field(default=NullCommand())
def run(self, repo_root: Optional[Path] = None, working_directory: Optional[Path] = None) -> TestResult:
print(self.name.center(DISPLAY_WIDTH, "_"))
repo_root = repo_root or Path.cwd()
working_directory = working_directory or Path.cwd()
try:
root_setup_command = self.repo_root_setup_command.with_working_directory(repo_root)
root_setup_command.run()
root_setup_command.validate()
setup_command = self.setup_command.with_working_directory(working_directory)
setup_command.run()
setup_command.validate()
test_command = self.test_command.with_working_directory(
working_directory)
runtime_in_seconds = timeit(lambda: test_command.run(), number=1)
test_command.validate()
return TestResult(self.name, (timedelta(seconds=runtime_in_seconds)), test_command)
except Exception:
print(f"\n[Failed test] {self.name}")
raise
@dataclass(frozen=True)
class TestSuiteResult:
name: str
test_results: Tuple[TestResult, ...]
@dataclass(frozen=True)
class TestSuite:
name: str
tests: Sequence[Test]
environment_variables: Mapping[str, str] = os.environ
def run(self, repo_root: Optional[Path] = None, working_directory: Optional[Path] = None) -> TestSuiteResult:
print()
print(self.name.center(DISPLAY_WIDTH, "="))
with environment_variables(**self.environment_variables):
try:
test_results = [test.run(repo_root, working_directory)
for test in self.tests]
return TestSuiteResult(self.name, tuple(test_results))
except Exception:
print(f"\n[Failed TestSuite] {self.name}")
print()
raise
@dataclass(frozen=True)
class RepoResults:
name: str
test_suite_results: Tuple[TestSuiteResult, ...]
@contextmanager
def environment_variables(**kwargs):
original_environment = os.environ.copy()
try:
os.environ |= kwargs
yield
finally:
os.environ.clear()
os.environ |= original_environment
def test_suite_repeater(test_suite_runner: Callable[[], TestSuiteResult], repetitions: int) -> TestSuiteResult:
"""
Run the test suite multiple times and merge the multiple TestSuiteResults back into a single TestSuiteResult
"""
def mergeTestResults(test_results: tuple[str, Sequence[TestResult]]) -> TestResult:
name, tests = test_results
assert all(name == test.name for test in tests)
average_time = sum((test.time_delta for test in tests),
timedelta(0)) / repetitions
return TestResult(name, average_time, None)
test_results_per_name = defaultdict(list)
test_suite_name = None
for repetition in range(repetitions):
try:
print()
print(f"Repetition {repetition}".center(DISPLAY_WIDTH, "+"))
test_suite_result: TestSuiteResult = test_suite_runner()
assert test_suite_name is None or test_suite_name == test_suite_result.name
test_suite_name = test_suite_result.name
for test_result in test_suite_result.test_results:
test_results_per_name[test_result.name].append(test_result)
except Exception:
print(f"\n[Failed Repetition] {repetition}")
print()
raise
assert test_suite_name is not None
assert all(len(test_results) ==
repetitions for test_results in test_results_per_name.values())
return TestSuiteResult(test_suite_name,
tuple(mergeTestResults(test_results) for test_results in test_results_per_name.items()))
def run_tests(repos: Sequence[RepoSpec], repos_root: Path, test_suites: Sequence[TestSuite], repetitions: int = 1) -> \
Sequence[RepoResults]:
assert repos_root.exists() and repos_root.is_dir()
assert len(repos) > 0
assert len(test_suites) > 0
rooted_repos = [repo.with_base_root(repos_root) for repo in repos]
repo_results = []
for repo in rooted_repos:
for repo_subdir in repo.sub_directories:
sub_dir_pretty_name = str(repo_subdir.relative_to(repos_root))
print("".center(DISPLAY_WIDTH, "▇"))
print(sub_dir_pretty_name.center(DISPLAY_WIDTH, "▇"))
print("".center(DISPLAY_WIDTH, "▇"))
test_suite_results = []
for test_suite in test_suites:
result = test_suite_repeater(lambda: test_suite.run(
repo.root, repo_subdir), repetitions)
test_suite_results.append(result)
repo_results.append(RepoResults(
sub_dir_pretty_name, tuple(test_suite_results)))
assert len(repo_results) > 0
return repo_results
def random_string(size) -> str:
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=size))
def write_results_to_csv(repo_results: Sequence[RepoResults], results_file: os.PathLike):
"""
Prints multiple RepoResults to csv file.
Each line contains the test results for one repo subdirectory.
Each column represents the test results of a single test across all repo subdirectories.
The layout is as follows:
repo | <test suite name>_<test name> | ...
<repo name>_<subdir> | time in seconds | ...
"""
def write_to_file(file_path, header, rows):
with open(file_path, 'w', newline="") as f:
writer = csv.writer(f)
writer.writerow(header)
writer.writerows(rows)
header = ["repo"] + [f"{test_suite_result.name}_{test_result.name}"
for test_suite_result in repo_results[0].test_suite_results
for test_result in test_suite_result.test_results]
# each line has the results for one repo
rows = [[repo_result.name] + [str(test_result.time_delta.total_seconds())
for test_suite_result in repo_result.test_suite_results
for test_result in test_suite_result.test_results]
for repo_result in repo_results]
results_file_path = Path(results_file)
results_file_path.resolve()
try:
write_to_file(results_file_path, header, rows)
except Exception as e:
print_error(e)
fallback_file_name = f"Results_{random_string(16)}.csv"
print_warning(f"Couldn't write to file. Trying to write to {fallback_file_name} as a fallback.")
write_to_file(Path(fallback_file_name).resolve(), header, rows)
print(f"Wrote results to: {results_file_path}")