From bc90846539f73952013ac178e63c9c6dbdbd58da Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 15 Jun 2022 07:32:04 -0600 Subject: [PATCH] Log popen stdout/err when subprocess times out (#6567) --- distributed/tests/test_client.py | 12 +++---- distributed/tests/test_utils_test.py | 53 ++++++++++++++++++++++++++++ distributed/utils_test.py | 38 ++++++++++++++++---- 3 files changed, 89 insertions(+), 14 deletions(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index f1d5dba234..eed2a03a61 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -90,6 +90,7 @@ inc, map_varying, nodebug, + popen, pristine_loop, randominc, save_sys_modules, @@ -7508,7 +7509,7 @@ async def test_wait_for_workers_updates_info(c, s): client_script = """ from dask.distributed import Client if __name__ == "__main__": - client = Client(processes=%s, n_workers=1) + client = Client(processes=%s, n_workers=1, scheduler_port=0, dashboard_address=":0") """ @@ -7517,13 +7518,8 @@ def test_quiet_close_process(processes, tmp_path): with open(tmp_path / "script.py", mode="w") as f: f.write(client_script % processes) - proc = subprocess.Popen( - [sys.executable, tmp_path / "script.py"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - - out, err = proc.communicate(timeout=10) + with popen([sys.executable, tmp_path / "script.py"], capture_output=True) as proc: + out, err = proc.communicate(timeout=10) assert not out assert not err diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 78523561d9..3b5a21ed1f 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -3,6 +3,7 @@ import pathlib import signal import socket +import subprocess import sys import textwrap import threading @@ -822,6 +823,58 @@ def cb(signum, frame): # `subprocess.TimeoutExpired` if this test breaks. +def test_popen_timeout(capsys: pytest.CaptureFixture): + with pytest.raises(subprocess.TimeoutExpired): + with popen( + [ + sys.executable, + "-c", + textwrap.dedent( + """ + import signal + import sys + import time + + if sys.platform == "win32": + signal.signal(signal.SIGBREAK, signal.default_int_handler) + # ^ Cause `CTRL_BREAK_EVENT` on Windows to raise `KeyboardInterrupt` + + print('ready', flush=True) + while True: + try: + time.sleep(0.1) + print("slept", flush=True) + except KeyboardInterrupt: + print("interrupted", flush=True) + """ + ), + ], + capture_output=True, + terminate_timeout=1, + ) as proc: + assert proc.stdout + assert proc.stdout.readline().strip() == b"ready" + # Exiting contextmanager sends SIGINT, waits 1s for shutdown. + # Our script ignores SIGINT, so after 1s it sends SIGKILL. + # The contextmanager raises `TimeoutExpired` once the process is killed, + # because it failed the 1s timeout + captured = capsys.readouterr() + assert "stdout: returncode" in captured.out + assert "interrupted" in captured.out + assert "slept" in captured.out + + +def test_popen_always_prints_output(capsys: pytest.CaptureFixture): + # We always print stdout even if there was no error, in case some other assertion + # later in the test fails and the output would be useful. + with popen([sys.executable, "-c", "print('foo')"], capture_output=True) as proc: + proc.communicate(timeout=5) + + captured = capsys.readouterr() + assert "stdout: returncode 0" in captured.out + assert "foo" in captured.out + + @gen_test() async def test_freeze_batched_send(): async with EchoServer() as e: diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 42e90de0eb..c4d25455fd 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1286,14 +1286,14 @@ def raises(func, exc=Exception): return True -def _terminate_process(proc: subprocess.Popen) -> None: +def _terminate_process(proc: subprocess.Popen, terminate_timeout: float) -> None: if proc.poll() is None: if sys.platform.startswith("win"): proc.send_signal(signal.CTRL_BREAK_EVENT) else: proc.send_signal(signal.SIGINT) try: - proc.communicate(timeout=30) + proc.communicate(timeout=terminate_timeout) finally: # Make sure we don't leave the process lingering around with suppress(OSError): @@ -1302,7 +1302,11 @@ def _terminate_process(proc: subprocess.Popen) -> None: @contextmanager def popen( - args: list[str], capture_output: bool = False, **kwargs + args: list[str], + capture_output: bool = False, + terminate_timeout: float = 30, + kill_timeout: float = 10, + **kwargs, ) -> Iterator[subprocess.Popen[bytes]]: """Start a shell command in a subprocess. Yields a subprocess.Popen object. @@ -1328,6 +1332,21 @@ def popen( Note that ``proc.communicate`` is called automatically when the contextmanager exits. Calling code must not call ``proc.communicate`` in a separate thread, since it's not thread-safe. + + When captured, the stdout/stderr of the process is always printed + when the process exits for easier test debugging. + terminate_timeout: optional, default 30 + When the contextmanager exits, SIGINT is sent to the subprocess. + ``terminate_timeout`` sets how many seconds to wait for the subprocess + to terminate after that. If the timeout expires, SIGKILL is sent to + the subprocess (which cannot be blocked); see ``kill_timeout``. + If this timeout expires, `subprocess.TimeoutExpired` is raised. + kill_timeout: optional, default 10 + When the contextmanger exits, if the subprocess does not shut down + after ``terminate_timeout`` seconds in response to SIGINT, SIGKILL + is sent to the subprocess (which cannot be blocked). ``kill_timeout`` + controls how long to wait after SIGKILL to join the process. + If this timeout expires, `subprocess.TimeoutExpired` is raised. kwargs: optional optional arguments to subprocess.Popen """ @@ -1349,9 +1368,16 @@ def popen( try: yield proc finally: - _terminate_process(proc) - out, err = proc.communicate() - assert not err + try: + _terminate_process(proc, terminate_timeout) + finally: + out, err = proc.communicate(timeout=kill_timeout) + if out: + print(f"------ stdout: returncode {proc.returncode}, {args} ------") + print(out.decode() if isinstance(out, bytes) else out) + if err: + print(f"------ stderr: returncode {proc.returncode}, {args} ------") + print(err.decode() if isinstance(err, bytes) else err) def wait_for(predicate, timeout, fail_func=None, period=0.05):