Skip to content

Commit

Permalink
Merge pull request #2193 from tjstum/noleak
Browse files Browse the repository at this point in the history
  • Loading branch information
Fuyukai authored Jan 7, 2022
2 parents 7f44dcf + d03fd15 commit 7b0001d
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 45 deletions.
3 changes: 3 additions & 0 deletions newsfragments/2193.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Trio now deterministically cleans up file descriptors that were opened before
subprocess creation fails. Previously, they would remain open until the next run of
the garbage collector.
35 changes: 35 additions & 0 deletions newsfragments/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
This directory collects "newsfragments": short files that each contain
a snippet of ReST-formatted text that will be added to the next
release notes. This should be a description of aspects of the change
(if any) that are relevant to users. (This contrasts with your commit
message and PR description, which are a description of the change as
relevant to people working on the code itself.)

Each file should be named like ``<ISSUE>.<TYPE>.rst``, where
``<ISSUE>`` is an issue number, and ``<TYPE>`` is one of:

* ``headline``: a major new feature we want to highlight for users
* ``breaking``: any breaking changes that happen without a proper
deprecation period (note: deprecations, and removal of previously
deprecated features after an appropriate time, go in the
``deprecated`` category instead)
* ``feature``: any new feature that doesn't qualify for ``headline``
* ``bugfix``
* ``doc``
* ``deprecated``
* ``misc``

So for example: ``123.headline.rst``, ``456.bugfix.rst``,
``789.deprecated.rst``

If your PR fixes an issue, use that number here. If there is no issue,
then after you submit the PR and get the PR number you can add a
newsfragment using that instead.

Your text can use all the same markup that we use in our Sphinx docs.
For example, you can use double-backticks to mark code snippets, or
single-backticks to link to a function/class/module.

To check how your formatting looks, the easiest way is to make the PR,
and then after the CI checks run, click on the "Read the Docs build"
details link, and navigate to the release history page.
66 changes: 35 additions & 31 deletions trio/_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import subprocess
import sys
from contextlib import ExitStack
from typing import Optional
from functools import partial
import warnings
Expand All @@ -28,6 +29,7 @@
def pidfd_open(fd: int, flags: int) -> int:
...

from ._subprocess_platform import ClosableReceiveStream, ClosableSendStream

else:
can_try_pidfd_open = True
Expand Down Expand Up @@ -359,28 +361,38 @@ async def open_process(
"on UNIX systems"
)

trio_stdin = None # type: Optional[SendStream]
trio_stdout = None # type: Optional[ReceiveStream]
trio_stderr = None # type: Optional[ReceiveStream]

if stdin == subprocess.PIPE:
trio_stdin, stdin = create_pipe_to_child_stdin()
if stdout == subprocess.PIPE:
trio_stdout, stdout = create_pipe_from_child_output()
if stderr == subprocess.STDOUT:
# If we created a pipe for stdout, pass the same pipe for
# stderr. If stdout was some non-pipe thing (DEVNULL or a
# given FD), pass the same thing. If stdout was passed as
# None, keep stderr as STDOUT to allow subprocess to dup
# our stdout. Regardless of which of these is applicable,
# don't create a new Trio stream for stderr -- if stdout
# is piped, stderr will be intermixed on the stdout stream.
if stdout is not None:
stderr = stdout
elif stderr == subprocess.PIPE:
trio_stderr, stderr = create_pipe_from_child_output()
trio_stdin = None # type: Optional[ClosableSendStream]
trio_stdout = None # type: Optional[ClosableReceiveStream]
trio_stderr = None # type: Optional[ClosableReceiveStream]
# Close the parent's handle for each child side of a pipe; we want the child to
# have the only copy, so that when it exits we can read EOF on our side. The
# trio ends of pipes will be transferred to the Process object, which will be
# responsible for their lifetime. If process spawning fails, though, we still
# want to close them before letting the failure bubble out
with ExitStack() as always_cleanup, ExitStack() as cleanup_on_fail:
if stdin == subprocess.PIPE:
trio_stdin, stdin = create_pipe_to_child_stdin()
always_cleanup.callback(os.close, stdin)
cleanup_on_fail.callback(trio_stdin.close)
if stdout == subprocess.PIPE:
trio_stdout, stdout = create_pipe_from_child_output()
always_cleanup.callback(os.close, stdout)
cleanup_on_fail.callback(trio_stdout.close)
if stderr == subprocess.STDOUT:
# If we created a pipe for stdout, pass the same pipe for
# stderr. If stdout was some non-pipe thing (DEVNULL or a
# given FD), pass the same thing. If stdout was passed as
# None, keep stderr as STDOUT to allow subprocess to dup
# our stdout. Regardless of which of these is applicable,
# don't create a new Trio stream for stderr -- if stdout
# is piped, stderr will be intermixed on the stdout stream.
if stdout is not None:
stderr = stdout
elif stderr == subprocess.PIPE:
trio_stderr, stderr = create_pipe_from_child_output()
always_cleanup.callback(os.close, stderr)
cleanup_on_fail.callback(trio_stderr.close)

try:
popen = await trio.to_thread.run_sync(
partial(
subprocess.Popen,
Expand All @@ -391,16 +403,8 @@ async def open_process(
**options,
)
)
finally:
# Close the parent's handle for each child side of a pipe;
# we want the child to have the only copy, so that when
# it exits we can read EOF on our side.
if trio_stdin is not None:
os.close(stdin)
if trio_stdout is not None:
os.close(stdout)
if trio_stderr is not None:
os.close(stderr)
# We did not fail, so dismiss the stack for the trio ends
cleanup_on_fail.pop_all()

return Process._create(popen, trio_stdin, trio_stdout, trio_stderr)

Expand Down
15 changes: 13 additions & 2 deletions trio/_subprocess_platform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@
_create_child_pipe_error: Optional[ImportError] = None


if TYPE_CHECKING:
# internal types for the pipe representations used in type checking only
class ClosableSendStream(SendStream):
def close(self) -> None:
...

class ClosableReceiveStream(ReceiveStream):
def close(self) -> None:
...


# Fallback versions of the functions provided -- implementations
# per OS are imported atop these at the bottom of the module.
async def wait_child_exiting(process: "_subprocess.Process") -> None:
Expand All @@ -30,7 +41,7 @@ async def wait_child_exiting(process: "_subprocess.Process") -> None:
raise NotImplementedError from _wait_child_exiting_error # pragma: no cover


def create_pipe_to_child_stdin() -> Tuple[SendStream, int]:
def create_pipe_to_child_stdin() -> Tuple["ClosableSendStream", int]:
"""Create a new pipe suitable for sending data from this
process to the standard input of a child we're about to spawn.
Expand All @@ -43,7 +54,7 @@ def create_pipe_to_child_stdin() -> Tuple[SendStream, int]:
raise NotImplementedError from _create_child_pipe_error # pragma: no cover


def create_pipe_from_child_output() -> Tuple[ReceiveStream, int]:
def create_pipe_from_child_output() -> Tuple["ClosableReceiveStream", int]:
"""Create a new pipe suitable for receiving data into this
process from the standard output or error stream of a child
we're about to spawn.
Expand Down
9 changes: 6 additions & 3 deletions trio/_unix_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ def _raw_close(self):
def __del__(self):
self._raw_close()

async def aclose(self):
def close(self):
if not self.closed:
trio.lowlevel.notify_closing(self.fd)
self._raw_close()
await trio.lowlevel.checkpoint()


class FdStream(Stream, metaclass=Final):
Expand Down Expand Up @@ -180,8 +179,12 @@ async def receive_some(self, max_bytes=None) -> bytes:

return data

def close(self):
self._fd_holder.close()

async def aclose(self):
await self._fd_holder.aclose()
self.close()
await trio.lowlevel.checkpoint()

def fileno(self):
return self._fd_holder.fd
20 changes: 12 additions & 8 deletions trio/_windows_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,16 @@ def __init__(self, handle: int) -> None:
def closed(self):
return self.handle == -1

def _close(self):
def close(self):
if self.closed:
return
handle = self.handle
self.handle = -1
if not kernel32.CloseHandle(_handle(handle)):
raise_winerror()

async def aclose(self):
self._close()
await _core.checkpoint()

def __del__(self):
self._close()
self.close()


class PipeSendStream(SendStream, metaclass=Final):
Expand Down Expand Up @@ -78,8 +74,12 @@ async def wait_send_all_might_not_block(self) -> None:
# not implemented yet, and probably not needed
await _core.checkpoint()

def close(self):
self._handle_holder.close()

async def aclose(self):
await self._handle_holder.aclose()
self.close()
await _core.checkpoint()


class PipeReceiveStream(ReceiveStream, metaclass=Final):
Expand Down Expand Up @@ -130,5 +130,9 @@ async def receive_some(self, max_bytes=None) -> bytes:
del buffer[size:]
return buffer

def close(self):
self._handle_holder.close()

async def aclose(self):
await self._handle_holder.aclose()
self.close()
await _core.checkpoint()
22 changes: 21 additions & 1 deletion trio/tests/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import signal
import subprocess
import sys
from pathlib import Path as SyncPath

import pytest
import random
from functools import partial
Expand Down Expand Up @@ -529,7 +531,7 @@ def broken_terminate(self):
nursery.cancel_scope.cancel()


@pytest.mark.skipif(os.name != "posix", reason="posix only")
@pytest.mark.skipif(not posix, reason="posix only")
async def test_warn_on_cancel_SIGKILL_escalation(autojump_clock, monkeypatch):
monkeypatch.setattr(Process, "terminate", lambda *args: None)

Expand All @@ -547,3 +549,21 @@ async def test_run_process_background_fail():
async with _core.open_nursery() as nursery:
proc = await nursery.start(run_process, EXIT_FALSE)
assert proc.returncode == 1


@pytest.mark.skipif(
not SyncPath("/dev/fd").exists(),
reason="requires a way to iterate through open files",
)
async def test_for_leaking_fds():
starting_fds = set(SyncPath("/dev/fd").iterdir())
await run_process(EXIT_TRUE)
assert set(SyncPath("/dev/fd").iterdir()) == starting_fds

with pytest.raises(subprocess.CalledProcessError):
await run_process(EXIT_FALSE)
assert set(SyncPath("/dev/fd").iterdir()) == starting_fds

with pytest.raises(PermissionError):
await run_process(["/dev/fd/0"])
assert set(SyncPath("/dev/fd").iterdir()) == starting_fds

0 comments on commit 7b0001d

Please sign in to comment.