Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subprocess_util: support callbacks for stdout and stderr #113

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelogs/fragments/113-subprocess_util_loopback.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
minor_changes:
- "``subprocess_util.async_log_run()``, ``subprocess_util.log_run()``, and
the corresponding functions in ``venv`` now support passing generic
callback functions for ``stdout_loglevel`` and ``stderr_loglevel``
(https://github.com/ansible-community/antsibull-core/pull/113)."
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ typing = [
"pyre-check >= 0.9.17",
"types-aiofiles",
"types-PyYAML",
"typing-extensions",
]
dev = [
# Used by nox sessions
Expand Down
76 changes: 60 additions & 16 deletions src/antsibull_core/subprocess_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@

import asyncio
import subprocess
import sys
from asyncio.exceptions import IncompleteReadError, LimitOverrunError
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Any, cast
from collections.abc import Awaitable, Callable, Sequence
from functools import partial
from inspect import isawaitable
from typing import TYPE_CHECKING, Any, TypeVar, cast

from antsibull_core.logging import log

Expand All @@ -22,15 +25,36 @@

from _typeshed import StrOrBytesPath
from twiggy.logger import Logger as TwiggyLogger # type: ignore[import]
from typing_extensions import ParamSpec, TypeAlias

_T = TypeVar("_T")
_P = ParamSpec("_P")

mlog = log.fields(mod=__name__)

CalledProcessError = subprocess.CalledProcessError

OutputCallbackType: TypeAlias = "Callable[[str], Any] | Callable[[str], Awaitable[Any]]"

stdout_callback = print
gotmax23 marked this conversation as resolved.
Show resolved Hide resolved
stderr_callback = partial(print, file=sys.stderr)


async def _sync_or_async(
func: Callable[_P, Awaitable[_T]] | Callable[_P, _T],
/,
*args: _P.args,
**kwargs: _P.kwargs,
) -> _T:
out = func(*args, **kwargs)
if isawaitable(out):
return await out
return cast("_T", out)


async def _stream_log(
name: str,
callback: Callable[[str], Any] | None,
callback: OutputCallbackType | None,
stream: asyncio.StreamReader,
errors: str,
) -> str:
Expand Down Expand Up @@ -58,16 +82,16 @@ async def _stream_log(
break
text = line.decode("utf-8", errors=errors)
if callback:
callback(f"{name}: {text.strip()}")
await _sync_or_async(callback, f"{name}{text.strip()}")
lines.append(text)
return "".join(lines)


async def async_log_run(
args: Sequence[StrOrBytesPath],
logger: TwiggyLogger | StdLogger | None = None,
stdout_loglevel: str | None = None,
stderr_loglevel: str | None = "debug",
stdout_loglevel: str | OutputCallbackType | None = None,
stderr_loglevel: str | OutputCallbackType | None = "debug",
check: bool = True,
*,
errors: str = "strict",
Expand All @@ -83,8 +107,12 @@ async def async_log_run(
:param logger:
Logger in which to log the command. Can be a `twiggy.logger.Logger` or
a stdlib `logger.Logger`.
:param stdout_loglevel: Which level to use to log stdout. `None` disables logging.
:param stderr_loglevel: Which level to use to log stderr. `None` disables logging.
:param stdout_loglevel:
Which level to use to log stdout or a generic callback function.
`None` disables logging.
:param stderr_loglevel:
Which level to use to log stdout or a generic callback function.
`None` disables logging.
:param check:
Whether to raise a `subprocess.CalledProcessError` when the
command returns a non-zero exit code
Expand All @@ -93,12 +121,21 @@ async def async_log_run(
"""
logger = logger or mlog
stdout_logfunc: Callable[[str], Any] | None = None
stdout_log_prefix = "stdout: "
if stdout_loglevel:
stdout_logfunc = getattr(logger, stdout_loglevel)
if callable(stdout_loglevel):
stdout_logfunc = stdout_loglevel
stdout_log_prefix = ""
else:
stdout_logfunc = getattr(logger, stdout_loglevel)
stderr_logfunc: Callable[[str], Any] | None = None
stderr_log_prefix = "stderr: "
if stderr_loglevel:
stderr_logfunc = getattr(logger, stderr_loglevel)

if callable(stderr_loglevel):
stderr_logfunc = stderr_loglevel
stderr_log_prefix = ""
else:
stderr_logfunc = getattr(logger, stderr_loglevel)
logger.debug(f"Running subprocess: {args!r}")
kwargs["stdout"] = asyncio.subprocess.PIPE
kwargs["stderr"] = asyncio.subprocess.PIPE
Expand All @@ -108,15 +145,15 @@ async def async_log_run(
# proc.stdout and proc.stderr won't be None with PIPE, hence the cast()
asyncio.create_task(
_stream_log(
"stdout",
stdout_log_prefix,
stdout_logfunc,
cast(asyncio.StreamReader, proc.stdout),
errors,
)
),
asyncio.create_task(
_stream_log(
"stderr",
stderr_log_prefix,
stderr_logfunc,
cast(asyncio.StreamReader, proc.stderr),
errors,
Expand All @@ -136,8 +173,8 @@ async def async_log_run(
def log_run(
args: Sequence[StrOrBytesPath],
logger: TwiggyLogger | StdLogger | None = None,
stdout_loglevel: str | None = None,
stderr_loglevel: str | None = "debug",
stdout_loglevel: str | OutputCallbackType | None = None,
stderr_loglevel: str | OutputCallbackType | None = "debug",
check: bool = True,
**kwargs,
) -> subprocess.CompletedProcess[str]:
Expand All @@ -151,4 +188,11 @@ def log_run(
)


__all__ = ("async_log_run", "log_run", "CalledProcessError")
__all__ = (
"async_log_run",
"log_run",
"CalledProcessError",
"stdout_callback",
"stderr_callback",
"OutputCallbackType",
)
8 changes: 4 additions & 4 deletions src/antsibull_core/venv.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ async def async_log_run(
self,
args: Sequence[StrPath],
logger: TwiggyLogger | StdLogger | None = None,
stdout_loglevel: str | None = None,
stderr_loglevel: str | None = "debug",
stdout_loglevel: str | subprocess_util.OutputCallbackType | None = None,
stderr_loglevel: str | subprocess_util.OutputCallbackType | None = "debug",
check: bool = True,
*,
errors: str = "strict",
Expand Down Expand Up @@ -133,8 +133,8 @@ def log_run(
self,
args: Sequence[StrPath],
logger: TwiggyLogger | StdLogger | None = None,
stdout_loglevel: str | None = None,
stderr_loglevel: str | None = "debug",
stdout_loglevel: str | subprocess_util.OutputCallbackType | None = None,
stderr_loglevel: str | subprocess_util.OutputCallbackType | None = "debug",
check: bool = True,
*,
errors: str = "strict",
Expand Down
17 changes: 17 additions & 0 deletions tests/units/test_subprocess_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,20 @@ def test_log_run_long_line(count: int) -> None:
assert proc.args == args
assert proc.returncode == 0
assert proc.stdout == ("\u0000" * count) + "\nfoo\n"


def test_log_run_callback() -> None:
stdout_lines: list[str] = []
stderr_lines: list[str] = []

async def add_to_stderr(string: str, /) -> None:
stderr_lines.append(string)

antsibull_core.subprocess_util.log_run(
["sh", "-c", "echo Never; echo gonna >&2; echo give"],
None,
stdout_lines.append,
add_to_stderr,
)
assert stdout_lines == ["Never", "give"]
assert stderr_lines == ["gonna"]