Skip to content

Commit

Permalink
Log popen stdout/err when subprocess times out (#6567)
Browse files Browse the repository at this point in the history
  • Loading branch information
gjoseph92 authored Jun 15, 2022
1 parent 085f300 commit bc90846
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 14 deletions.
12 changes: 4 additions & 8 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
inc,
map_varying,
nodebug,
popen,
pristine_loop,
randominc,
save_sys_modules,
Expand Down Expand Up @@ -7508,7 +7509,7 @@ async def test_wait_for_workers_updates_info(c, s):
client_script = """
from dask.distributed import Client
if __name__ == "__main__":
client = Client(processes=%s, n_workers=1)
client = Client(processes=%s, n_workers=1, scheduler_port=0, dashboard_address=":0")
"""


Expand All @@ -7517,13 +7518,8 @@ def test_quiet_close_process(processes, tmp_path):
with open(tmp_path / "script.py", mode="w") as f:
f.write(client_script % processes)

proc = subprocess.Popen(
[sys.executable, tmp_path / "script.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

out, err = proc.communicate(timeout=10)
with popen([sys.executable, tmp_path / "script.py"], capture_output=True) as proc:
out, err = proc.communicate(timeout=10)

assert not out
assert not err
53 changes: 53 additions & 0 deletions distributed/tests/test_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pathlib
import signal
import socket
import subprocess
import sys
import textwrap
import threading
Expand Down Expand Up @@ -822,6 +823,58 @@ def cb(signum, frame):
# `subprocess.TimeoutExpired` if this test breaks.


def test_popen_timeout(capsys: pytest.CaptureFixture):
    """Check ``popen`` when the child does not exit on the interrupt signal.

    The child script catches ``KeyboardInterrupt`` and keeps looping, so it
    never shuts down within ``terminate_timeout``. The context manager is
    expected to raise ``subprocess.TimeoutExpired`` and to print the output
    captured from the child.
    """
    # The embedded script must keep its block indentation: `textwrap.dedent`
    # strips only the common leading whitespace, so the bodies of the
    # `if`/`while`/`try`/`except` statements have to stay nested for the child
    # to be valid Python.
    script = textwrap.dedent(
        """
        import signal
        import sys
        import time
        if sys.platform == "win32":
            signal.signal(signal.SIGBREAK, signal.default_int_handler)
            # ^ Cause `CTRL_BREAK_EVENT` on Windows to raise `KeyboardInterrupt`
        print('ready', flush=True)
        while True:
            try:
                time.sleep(0.1)
                print("slept", flush=True)
            except KeyboardInterrupt:
                print("interrupted", flush=True)
        """
    )

    with pytest.raises(subprocess.TimeoutExpired):
        with popen(
            [sys.executable, "-c", script],
            capture_output=True,
            terminate_timeout=1,
        ) as proc:
            assert proc.stdout
            # Wait until the child has started (and, on Windows, installed its
            # signal handler) before leaving the context manager.
            assert proc.stdout.readline().strip() == b"ready"
            # Exiting contextmanager sends SIGINT, waits 1s for shutdown.
            # Our script ignores SIGINT, so after 1s it sends SIGKILL.
            # The contextmanager raises `TimeoutExpired` once the process is killed,
            # because it failed the 1s timeout

    # The captured child output must have been printed despite the timeout.
    captured = capsys.readouterr()
    assert "stdout: returncode" in captured.out
    assert "interrupted" in captured.out
    assert "slept" in captured.out


def test_popen_always_prints_output(capsys: pytest.CaptureFixture):
    """Check that ``popen`` echoes captured child output even on a clean exit.

    The output is printed unconditionally so that it is already in the test
    log if some later assertion in a test fails.
    """
    command = [sys.executable, "-c", "print('foo')"]
    with popen(command, capture_output=True) as child:
        child.communicate(timeout=5)

    echoed = capsys.readouterr().out
    assert "stdout: returncode 0" in echoed
    assert "foo" in echoed


@gen_test()
async def test_freeze_batched_send():
async with EchoServer() as e:
Expand Down
38 changes: 32 additions & 6 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1286,14 +1286,14 @@ def raises(func, exc=Exception):
return True


def _terminate_process(proc: subprocess.Popen) -> None:
def _terminate_process(proc: subprocess.Popen, terminate_timeout: float) -> None:
if proc.poll() is None:
if sys.platform.startswith("win"):
proc.send_signal(signal.CTRL_BREAK_EVENT)
else:
proc.send_signal(signal.SIGINT)
try:
proc.communicate(timeout=30)
proc.communicate(timeout=terminate_timeout)
finally:
# Make sure we don't leave the process lingering around
with suppress(OSError):
Expand All @@ -1302,7 +1302,11 @@ def _terminate_process(proc: subprocess.Popen) -> None:

@contextmanager
def popen(
args: list[str], capture_output: bool = False, **kwargs
args: list[str],
capture_output: bool = False,
terminate_timeout: float = 30,
kill_timeout: float = 10,
**kwargs,
) -> Iterator[subprocess.Popen[bytes]]:
"""Start a shell command in a subprocess.
Yields a subprocess.Popen object.
Expand All @@ -1328,6 +1332,21 @@ def popen(
Note that ``proc.communicate`` is called automatically when the
contextmanager exits. Calling code must not call ``proc.communicate``
in a separate thread, since it's not thread-safe.
When captured, the stdout/stderr of the process is always printed
when the process exits for easier test debugging.
terminate_timeout: optional, default 30
When the contextmanager exits, SIGINT is sent to the subprocess.
``terminate_timeout`` sets how many seconds to wait for the subprocess
to terminate after that. If the timeout expires, SIGKILL is sent to
the subprocess (which cannot be blocked); see ``kill_timeout``.
If this timeout expires, `subprocess.TimeoutExpired` is raised.
kill_timeout: optional, default 10
When the contextmanager exits, if the subprocess does not shut down
after ``terminate_timeout`` seconds in response to SIGINT, SIGKILL
is sent to the subprocess (which cannot be blocked). ``kill_timeout``
controls how long to wait after SIGKILL to join the process.
If this timeout expires, `subprocess.TimeoutExpired` is raised.
kwargs: optional
optional arguments to subprocess.Popen
"""
Expand All @@ -1349,9 +1368,16 @@ def popen(
try:
yield proc
finally:
_terminate_process(proc)
out, err = proc.communicate()
assert not err
try:
_terminate_process(proc, terminate_timeout)
finally:
out, err = proc.communicate(timeout=kill_timeout)
if out:
print(f"------ stdout: returncode {proc.returncode}, {args} ------")
print(out.decode() if isinstance(out, bytes) else out)
if err:
print(f"------ stderr: returncode {proc.returncode}, {args} ------")
print(err.decode() if isinstance(err, bytes) else err)


def wait_for(predicate, timeout, fail_func=None, period=0.05):
Expand Down

0 comments on commit bc90846

Please sign in to comment.