Skip to content

Commit

Permalink
Don't call extend_report_with_coverage_gains in apply_async callback.
Browse files Browse the repository at this point in the history
Per
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.apply_async,
callbacks should return immediately or they will otherwise block the
entire Pool from making progress.

For large experiments, this is likely causing our
throughput to decrease as the experiment runs.

From debugging with GDB on
#692, it looks like a large
number of worker processes are stuck waiting to report results:

```
(gdb) py-bt
Traceback (most recent call first):
  File "/usr/lib/python3.11/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/usr/lib/python3.11/multiprocessing/queues.py", line 376, in put
    with self._wlock:
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 131, in worker
    put((job, i, result))
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
```

This partially reverts #566.
We instead just create a new sub-process to periodically call this in the
background to avoid blocking anything.
  • Loading branch information
oliverchang committed Nov 8, 2024
1 parent 6c04059 commit c9c98ea
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions run_all_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import time
import traceback
from datetime import timedelta
from multiprocessing import Pool
from multiprocessing import Pool, Process
from typing import Any

import run_one_experiment
Expand Down Expand Up @@ -60,8 +60,6 @@
LOG_FMT = ('%(asctime)s.%(msecs)03d %(levelname)s '
'%(module)s - %(funcName)s: %(message)s')

EXPERIMENT_RESULTS = []


class Result:
benchmark: benchmarklib.Benchmark
Expand Down Expand Up @@ -335,15 +333,23 @@ def extend_report_with_coverage_gains() -> None:
comparative_cov_gains)


def _print_and_dump_experiment_result(result: Result):
def extend_report_with_coverage_gains_process():
  """Periodically refreshes coverage gains in the report.

  Intended to run in a background `multiprocessing.Process` so that the
  (potentially slow) aggregation never blocks the worker pool's result
  callbacks. Loops forever; the parent terminates it with `kill()` once the
  experiments finish.
  """
  while True:
    time.sleep(120)  # 2 minutes.
    try:
      extend_report_with_coverage_gains()
    except Exception:
      # Best-effort background refresh: log (with traceback) and keep looping
      # rather than letting the background process die.
      logger.exception('Failed to extend report with coverage gains')


def _print_experiment_result(result: Result):
  """Logs the outcome of a single finished experiment |result|."""
  benchmark = result.benchmark
  logger.info('\n**** Finished benchmark %s, %s ****\n%s', benchmark.project,
              benchmark.function_signature, result.result)

EXPERIMENT_RESULTS.append(result)
extend_report_with_coverage_gains()


def _print_experiment_results(results: list[Result],
cov_gain: dict[str, dict[str, Any]]):
Expand Down Expand Up @@ -503,7 +509,7 @@ def _process_total_coverage_gain() -> dict[str, dict[str, Any]]:


def main():
global WORK_DIR, EXPERIMENT_RESULTS
global WORK_DIR

args = parse_args()
_setup_logging(args.log_level)
Expand All @@ -528,27 +534,38 @@ def main():
len(experiment_targets), str(NUM_EXP))

# Set global variables that are updated throughout experiment runs.
EXPERIMENT_RESULTS = []
WORK_DIR = args.work_dir

coverage_gains_process = Process(
target=extend_report_with_coverage_gains_process)
coverage_gains_process.start()

experiment_results = []
if NUM_EXP == 1:
for target_benchmark in experiment_targets:
result = run_experiments(target_benchmark, args)
_print_and_dump_experiment_result(result)
_print_experiment_result(result)
experiment_results.append(result)
else:
experiment_tasks = []
with Pool(NUM_EXP, maxtasksperchild=1) as p:
for target_benchmark in experiment_targets:
experiment_task = p.apply_async(
run_experiments, (target_benchmark, args),
callback=_print_and_dump_experiment_result)
callback=_print_experiment_result)
experiment_tasks.append(experiment_task)
time.sleep(args.delay)

experiment_results = [task.get() for task in experiment_tasks]

      # Signal that no more work will be submitted to the pool.
p.close()

# Wait for all workers to complete.
p.join()

# Do a final coverage aggregation.
coverage_gains_process.kill()
extend_report_with_coverage_gains()

# Capture time at end
Expand All @@ -559,7 +576,7 @@ def main():
str(timedelta(seconds=end - start)))

coverage_gain_dict = _process_total_coverage_gain()
_print_experiment_results(EXPERIMENT_RESULTS, coverage_gain_dict)
_print_experiment_results(experiment_results, coverage_gain_dict)


if __name__ == '__main__':
Expand Down

0 comments on commit c9c98ea

Please sign in to comment.