diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 73463d9c..2eb72bc7 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -10,6 +10,12 @@ This library adheres to `Semantic Versioning 2.0 `_.
- Added support for the ``from_uri()``, ``full_match()``, ``parser`` methods/properties
in ``anyio.Path``, newly added in Python 3.13
(`#737 `_)
+- Added support for more keyword arguments for ``run_process()`` and ``open_process()``:
+ ``startupinfo``, ``creationflags``, ``pass_fds``, ``user``, ``group``,
+ ``extra_groups`` and ``umask``
+ (`#742 `_)
+- Improved the type annotations and support for ``PathLike`` in ``run_process()`` and
+ ``open_process()`` to allow for path-like arguments, just like ``subprocess.Popen``
- Changed the ``ResourceWarning`` from an unclosed memory object stream to include its
address for easier identification
- Changed ``start_blocking_portal()`` to always use daemonic threads, to accommodate the
diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py
index b531ad49..b67f8e22 100644
--- a/src/anyio/_backends/_asyncio.py
+++ b/src/anyio/_backends/_asyncio.py
@@ -4,6 +4,7 @@
import asyncio
import concurrent.futures
import math
+import os
import socket
import sys
import threading
@@ -47,7 +48,6 @@
Collection,
ContextManager,
Coroutine,
- Mapping,
Optional,
Sequence,
Tuple,
@@ -81,6 +81,7 @@
UDPPacketType,
UNIXDatagramPacketType,
)
+from ..abc._eventloop import StrOrBytesPath
from ..lowlevel import RunVar
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
@@ -2239,26 +2240,24 @@ def create_blocking_portal(cls) -> abc.BlockingPortal:
@classmethod
async def open_process(
cls,
- command: str | bytes | Sequence[str | bytes],
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
- shell: bool,
stdin: int | IO[Any] | None,
stdout: int | IO[Any] | None,
stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
+ **kwargs: Any,
) -> Process:
await cls.checkpoint()
- if shell:
+ if isinstance(command, PathLike):
+ command = os.fspath(command)
+
+ if isinstance(command, (str, bytes)):
process = await asyncio.create_subprocess_shell(
- cast("str | bytes", command),
+ command,
stdin=stdin,
stdout=stdout,
stderr=stderr,
- cwd=cwd,
- env=env,
- start_new_session=start_new_session,
+ **kwargs,
)
else:
process = await asyncio.create_subprocess_exec(
@@ -2266,9 +2265,7 @@ async def open_process(
stdin=stdin,
stdout=stdout,
stderr=stderr,
- cwd=cwd,
- env=env,
- start_new_session=start_new_session,
+ **kwargs,
)
stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
diff --git a/src/anyio/_backends/_trio.py b/src/anyio/_backends/_trio.py
index cf6f3db7..61205009 100644
--- a/src/anyio/_backends/_trio.py
+++ b/src/anyio/_backends/_trio.py
@@ -2,6 +2,7 @@
import array
import math
+import os
import socket
import sys
import types
@@ -25,7 +26,6 @@
ContextManager,
Coroutine,
Generic,
- Mapping,
NoReturn,
Sequence,
TypeVar,
@@ -60,7 +60,7 @@
from .._core._synchronization import ResourceGuard
from .._core._tasks import CancelScope as BaseCancelScope
from ..abc import IPSockAddrType, UDPPacketType, UNIXDatagramPacketType
-from ..abc._eventloop import AsyncBackend
+from ..abc._eventloop import AsyncBackend, StrOrBytesPath
from ..streams.memory import MemoryObjectSendStream
if sys.version_info >= (3, 10):
@@ -967,26 +967,39 @@ def create_blocking_portal(cls) -> abc.BlockingPortal:
@classmethod
async def open_process(
cls,
- command: str | bytes | Sequence[str | bytes],
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
- shell: bool,
stdin: int | IO[Any] | None,
stdout: int | IO[Any] | None,
stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
+ **kwargs: Any,
) -> Process:
- process = await trio.lowlevel.open_process( # type: ignore[misc]
- command, # type: ignore[arg-type]
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- shell=shell,
- cwd=cwd,
- env=env,
- start_new_session=start_new_session,
- )
+ def convert_item(item: StrOrBytesPath) -> str:
+ str_or_bytes = os.fspath(item)
+ if isinstance(str_or_bytes, str):
+ return str_or_bytes
+ else:
+ return os.fsdecode(str_or_bytes)
+
+ if isinstance(command, (str, bytes, PathLike)):
+ process = await trio.lowlevel.open_process(
+ convert_item(command),
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ shell=True,
+ **kwargs,
+ )
+ else:
+ process = await trio.lowlevel.open_process(
+ [convert_item(item) for item in command],
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ shell=False,
+ **kwargs,
+ )
+
stdin_stream = SendStreamWrapper(process.stdin) if process.stdin else None
stdout_stream = ReceiveStreamWrapper(process.stdout) if process.stdout else None
stderr_stream = ReceiveStreamWrapper(process.stderr) if process.stderr else None
diff --git a/src/anyio/_core/_subprocesses.py b/src/anyio/_core/_subprocesses.py
index 5d5d7b76..1ac2d549 100644
--- a/src/anyio/_core/_subprocesses.py
+++ b/src/anyio/_core/_subprocesses.py
@@ -1,26 +1,41 @@
from __future__ import annotations
-from collections.abc import AsyncIterable, Mapping, Sequence
+import sys
+from collections.abc import AsyncIterable, Iterable, Mapping, Sequence
from io import BytesIO
from os import PathLike
from subprocess import DEVNULL, PIPE, CalledProcessError, CompletedProcess
-from typing import IO, Any, cast
+from typing import IO, Any, Union, cast
from ..abc import Process
from ._eventloop import get_async_backend
from ._tasks import create_task_group
+if sys.version_info >= (3, 10):
+ from typing import TypeAlias
+else:
+ from typing_extensions import TypeAlias
+
+StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
+
async def run_process(
- command: str | bytes | Sequence[str | bytes],
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
input: bytes | None = None,
stdout: int | IO[Any] | None = PIPE,
stderr: int | IO[Any] | None = PIPE,
check: bool = True,
- cwd: str | bytes | PathLike[str] | None = None,
+ cwd: StrOrBytesPath | None = None,
env: Mapping[str, str] | None = None,
+ startupinfo: Any = None,
+ creationflags: int = 0,
start_new_session: bool = False,
+ pass_fds: Sequence[int] = (),
+ user: str | int | None = None,
+ group: str | int | None = None,
+ extra_groups: Iterable[str | int] | None = None,
+ umask: int = -1,
) -> CompletedProcess[bytes]:
"""
Run an external command in a subprocess and wait until it completes.
@@ -40,8 +55,20 @@ async def run_process(
command
:param env: if not ``None``, this mapping replaces the inherited environment
variables from the parent process
+ :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
+ to specify process startup parameters (Windows only)
+ :param creationflags: flags that can be used to control the creation of the
+ subprocess (see :class:`subprocess.Popen` for the specifics)
:param start_new_session: if ``true`` the setsid() system call will be made in the
child process prior to the execution of the subprocess. (POSIX only)
+ :param pass_fds: sequence of file descriptors to keep open between the parent and
+ child processes. (POSIX only)
+ :param user: effective user to run the process as (Python >= 3.9, POSIX only)
+ :param group: effective group to run the process as (Python >= 3.9, POSIX only)
+ :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9,
+ POSIX only)
+ :param umask: if not negative, this umask is applied in the child process before
+ running the given command (Python >= 3.9, POSIX only)
:return: an object representing the completed process
:raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process
exits with a nonzero return code
@@ -62,7 +89,14 @@ async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:
stderr=stderr,
cwd=cwd,
env=env,
+ startupinfo=startupinfo,
+ creationflags=creationflags,
start_new_session=start_new_session,
+ pass_fds=pass_fds,
+ user=user,
+ group=group,
+ extra_groups=extra_groups,
+ umask=umask,
) as process:
stream_contents: list[bytes | None] = [None, None]
async with create_task_group() as tg:
@@ -86,14 +120,21 @@ async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:
async def open_process(
- command: str | bytes | Sequence[str | bytes],
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
stdin: int | IO[Any] | None = PIPE,
stdout: int | IO[Any] | None = PIPE,
stderr: int | IO[Any] | None = PIPE,
- cwd: str | bytes | PathLike[str] | None = None,
+ cwd: StrOrBytesPath | None = None,
env: Mapping[str, str] | None = None,
+ startupinfo: Any = None,
+ creationflags: int = 0,
start_new_session: bool = False,
+ pass_fds: Sequence[int] = (),
+ user: str | int | None = None,
+ group: str | int | None = None,
+ extra_groups: Iterable[str | int] | None = None,
+ umask: int = -1,
) -> Process:
"""
Start an external command in a subprocess.
@@ -111,30 +152,58 @@ async def open_process(
:param cwd: If not ``None``, the working directory is changed before executing
:param env: If env is not ``None``, it must be a mapping that defines the
environment variables for the new process
+ :param creationflags: flags that can be used to control the creation of the
+ subprocess (see :class:`subprocess.Popen` for the specifics)
+ :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
+ to specify process startup parameters (Windows only)
:param start_new_session: if ``true`` the setsid() system call will be made in the
child process prior to the execution of the subprocess. (POSIX only)
+ :param pass_fds: sequence of file descriptors to keep open between the parent and
+ child processes. (POSIX only)
+ :param user: effective user to run the process as (Python >= 3.9; POSIX only)
+ :param group: effective group to run the process as (Python >= 3.9; POSIX only)
+ :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9;
+ POSIX only)
+ :param umask: if not negative, this umask is applied in the child process before
+ running the given command (Python >= 3.9; POSIX only)
:return: an asynchronous process object
"""
- if isinstance(command, (str, bytes)):
- return await get_async_backend().open_process(
- command,
- shell=True,
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- cwd=cwd,
- env=env,
- start_new_session=start_new_session,
- )
- else:
- return await get_async_backend().open_process(
- command,
- shell=False,
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- cwd=cwd,
- env=env,
- start_new_session=start_new_session,
- )
+ kwargs: dict[str, Any] = {}
+ if user is not None:
+ if sys.version_info < (3, 9):
+ raise TypeError("the 'user' argument requires Python 3.9 or later")
+
+ kwargs["user"] = user
+
+ if group is not None:
+ if sys.version_info < (3, 9):
+ raise TypeError("the 'group' argument requires Python 3.9 or later")
+
+ kwargs["group"] = group
+
+ if extra_groups is not None:
+ if sys.version_info < (3, 9):
+ raise TypeError("the 'extra_groups' argument requires Python 3.9 or later")
+
+ kwargs["extra_groups"] = group
+
+ if umask >= 0:
+ if sys.version_info < (3, 9):
+ raise TypeError("the 'umask' argument requires Python 3.9 or later")
+
+ kwargs["umask"] = umask
+
+ return await get_async_backend().open_process(
+ command,
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ cwd=cwd,
+ env=env,
+ startupinfo=startupinfo,
+ creationflags=creationflags,
+ start_new_session=start_new_session,
+ pass_fds=pass_fds,
+ **kwargs,
+ )
diff --git a/src/anyio/abc/_eventloop.py b/src/anyio/abc/_eventloop.py
index a50afefa..258d2e1d 100644
--- a/src/anyio/abc/_eventloop.py
+++ b/src/anyio/abc/_eventloop.py
@@ -3,7 +3,7 @@
import math
import sys
from abc import ABCMeta, abstractmethod
-from collections.abc import AsyncIterator, Awaitable, Mapping
+from collections.abc import AsyncIterator, Awaitable
from os import PathLike
from signal import Signals
from socket import AddressFamily, SocketKind, socket
@@ -15,6 +15,7 @@
ContextManager,
Sequence,
TypeVar,
+ Union,
overload,
)
@@ -23,9 +24,12 @@
else:
from typing_extensions import TypeVarTuple, Unpack
-if TYPE_CHECKING:
- from typing import Literal
+if sys.version_info >= (3, 10):
+ from typing import TypeAlias
+else:
+ from typing_extensions import TypeAlias
+if TYPE_CHECKING:
from .._core._synchronization import CapacityLimiter, Event
from .._core._tasks import CancelScope
from .._core._testing import TaskInfo
@@ -46,6 +50,7 @@
T_Retval = TypeVar("T_Retval")
PosArgsT = TypeVarTuple("PosArgsT")
+StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
class AsyncBackend(metaclass=ABCMeta):
@@ -213,51 +218,16 @@ def run_sync_from_thread(
def create_blocking_portal(cls) -> BlockingPortal:
pass
- @classmethod
- @overload
- async def open_process(
- cls,
- command: str | bytes,
- *,
- shell: Literal[True],
- stdin: int | IO[Any] | None,
- stdout: int | IO[Any] | None,
- stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike[str] | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
- ) -> Process:
- pass
-
- @classmethod
- @overload
- async def open_process(
- cls,
- command: Sequence[str | bytes],
- *,
- shell: Literal[False],
- stdin: int | IO[Any] | None,
- stdout: int | IO[Any] | None,
- stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike[str] | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
- ) -> Process:
- pass
-
@classmethod
@abstractmethod
async def open_process(
cls,
- command: str | bytes | Sequence[str | bytes],
+ command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
- shell: bool,
stdin: int | IO[Any] | None,
stdout: int | IO[Any] | None,
stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike[str] | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
+ **kwargs: Any,
) -> Process:
pass
diff --git a/tests/test_subprocesses.py b/tests/test_subprocesses.py
index 22bf882e..84c4b4dc 100644
--- a/tests/test_subprocesses.py
+++ b/tests/test_subprocesses.py
@@ -3,12 +3,15 @@
import os
import platform
import sys
+from collections.abc import Callable
+from contextlib import ExitStack
from pathlib import Path
from subprocess import CalledProcessError
from textwrap import dedent
+from typing import Any
import pytest
-from _pytest.fixtures import FixtureRequest
+from pytest import FixtureRequest
from anyio import CancelScope, ClosedResourceError, open_process, run_process
from anyio.streams.buffered import BufferedByteReceiveStream
@@ -225,3 +228,64 @@ async def test_process_aexit_cancellation_closes_standard_streams(
with pytest.raises(ClosedResourceError):
await process.stderr.receive(1)
+
+
+@pytest.mark.parametrize(
+ "argname, argvalue_factory",
+ [
+ pytest.param(
+ "user",
+ lambda: os.getuid(),
+ id="user",
+ marks=[
+ pytest.mark.skipif(
+ platform.system() == "Windows",
+ reason="os.getuid() is not available on Windows",
+ )
+ ],
+ ),
+ pytest.param(
+ "group",
+ lambda: os.getgid(),
+ id="user",
+ marks=[
+ pytest.mark.skipif(
+ platform.system() == "Windows",
+ reason="os.getgid() is not available on Windows",
+ )
+ ],
+ ),
+ pytest.param("extra_groups", list, id="extra_groups"),
+ pytest.param("umask", lambda: 0, id="umask"),
+ ],
+)
+async def test_py39_arguments(
+ argname: str,
+ argvalue_factory: Callable[[], Any],
+ anyio_backend_name: str,
+ anyio_backend_options: dict[str, Any],
+) -> None:
+ with ExitStack() as stack:
+ if sys.version_info < (3, 9):
+ stack.enter_context(
+ pytest.raises(
+ TypeError,
+ match=rf"the {argname!r} argument requires Python 3.9 or later",
+ )
+ )
+
+ try:
+ await run_process(
+ [sys.executable, "-c", "print('hello')"],
+ **{argname: argvalue_factory()},
+ )
+ except TypeError as exc:
+ if (
+ "unexpected keyword argument" in str(exc)
+ and anyio_backend_name == "asyncio"
+ and anyio_backend_options["loop_factory"]
+ and anyio_backend_options["loop_factory"].__module__ == "uvloop"
+ ):
+ pytest.skip(f"the {argname!r} argument is not supported by uvloop yet")
+
+ raise