diff --git a/changelogs/fragments/40-subprocess.yaml b/changelogs/fragments/40-subprocess.yaml
new file mode 100644
index 0000000..9fac4b1
--- /dev/null
+++ b/changelogs/fragments/40-subprocess.yaml
@@ -0,0 +1,5 @@
+---
+minor_changes:
+  - Add a new ``antsibull_core.subprocess_util`` module to help run
+    subprocesses and log their output
+    (https://github.com/ansible-community/antsibull-core/pull/40).
diff --git a/src/antsibull_core/subprocess_util.py b/src/antsibull_core/subprocess_util.py
new file mode 100644
index 0000000..f31e3c4
--- /dev/null
+++ b/src/antsibull_core/subprocess_util.py
@@ -0,0 +1,117 @@
+# Copyright (C) 2023 Maxwell G
+# SPDX-License-Identifier: GPL-3.0-or-later
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or
+# https://www.gnu.org/licenses/gpl-3.0.txt)
+
+"""
+Utilities for dealing with subprocesses
+"""
+
+from __future__ import annotations
+
+import asyncio
+import subprocess
+from collections.abc import Callable, Sequence
+from typing import TYPE_CHECKING, Any, cast
+
+from antsibull_core.logging import log
+
+if TYPE_CHECKING:
+    from logging import Logger as StdLogger
+    from _typeshed import StrOrBytesPath
+    from twiggy.logger import Logger as TwiggyLogger  # type: ignore[import]
+
+mlog = log.fields(mod=__name__)
+
+
+async def _stream_log(
+    name: str, callback: Callable[[str], Any] | None, stream: asyncio.StreamReader
+) -> str:
+    line = await stream.readline()
+    lines = []
+    while True:
+        if not line:
+            break
+        text = line.decode('utf-8')
+        if callback:
+            callback(f'{name}: {text.strip()}')
+        lines.append(text)
+        line = await stream.readline()
+    return ''.join(lines)
+
+
+async def async_log_run(
+    args: Sequence[StrOrBytesPath],
+    logger: TwiggyLogger | StdLogger | None = None,
+    stdout_loglevel: str | None = None,
+    stderr_loglevel: str | None = 'debug',
+    check: bool = True,
+    **kwargs,
+) -> subprocess.CompletedProcess:
+    """
+    Asynchronously run a command in a subprocess and log its output.
+    The command's stdout and stderr are always captured.
+    For some use cases, you may still need to call
+    asyncio.create_subprocess_exec() directly to have more control.
+
+    :param args: Command to run
+    :param logger:
+        Logger in which to log the command. Can be a `twiggy.logger.Logger` or
+        a stdlib `logging.Logger`.
+    :param stdout_loglevel: Which level to use to log stdout. `None` disables logging.
+    :param stderr_loglevel: Which level to use to log stderr. `None` disables logging.
+    :param check:
+        Whether to raise a `subprocess.CalledProcessError` when the
+        command returns a non-zero exit code
+    """
+    logger = logger or mlog
+    stdout_logfunc: Callable[[str], Any] | None = None
+    if stdout_loglevel:
+        stdout_logfunc = getattr(logger, stdout_loglevel)
+    stderr_logfunc: Callable[[str], Any] | None = None
+    if stderr_loglevel:
+        stderr_logfunc = getattr(logger, stderr_loglevel)
+
+    logger.debug(f'Running subprocess: {args!r}')
+    kwargs['stdout'] = asyncio.subprocess.PIPE
+    kwargs['stderr'] = asyncio.subprocess.PIPE
+    proc = await asyncio.create_subprocess_exec(*args, **kwargs)
+    stdout, stderr = await asyncio.gather(
+        # proc.stdout and proc.stderr won't be None with PIPE, hence the cast()
+        asyncio.create_task(
+            _stream_log(
+                'stdout', stdout_logfunc, cast(asyncio.StreamReader, proc.stdout)
+            )
+        ),
+        asyncio.create_task(
+            _stream_log(
+                'stderr', stderr_logfunc, cast(asyncio.StreamReader, proc.stderr)
+            )
+        ),
+    )
+    returncode = await proc.wait()
+
+    completed = subprocess.CompletedProcess(
+        args=args, returncode=returncode, stdout=stdout, stderr=stderr
+    )
+    if check:
+        completed.check_returncode()
+    return completed
+
+
+def log_run(
+    args: Sequence[StrOrBytesPath],
+    logger: TwiggyLogger | StdLogger | None = None,
+    stdout_loglevel: str | None = None,
+    stderr_loglevel: str | None = 'debug',
+    check: bool = True,
+    **kwargs,
+) -> subprocess.CompletedProcess:
+    """
+    Synchronous wrapper for the async_log_run function.
+    This function runs a command in a subprocess and captures and logs its
+    output.
+    """
+    return asyncio.run(
+        async_log_run(args, logger, stdout_loglevel, stderr_loglevel, check, **kwargs)
+    )
diff --git a/tests/units/test_subprocess_util.py b/tests/units/test_subprocess_util.py
new file mode 100644
index 0000000..1fb459c
--- /dev/null
+++ b/tests/units/test_subprocess_util.py
@@ -0,0 +1,50 @@
+# Copyright (C) 2023 Maxwell G
+# SPDX-License-Identifier: GPL-3.0-or-later
+# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or
+# https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from unittest.mock import MagicMock, call
+
+import antsibull_core.subprocess_util
+
+
+def test_log_run() -> None:
+    logger = MagicMock()
+    args = ('bash', '-ec', 'echo 123 && echo 456 >&2')
+    proc = antsibull_core.subprocess_util.log_run(args, logger)
+    assert proc.args == args
+    assert proc.returncode == 0
+    assert proc.stdout == '123\n'
+    assert proc.stderr == '456\n'
+    calls = [call(f'Running subprocess: {args}'), call('stderr: 456')]
+    assert logger.debug.call_args_list == calls
+
+
+def test_log_run_multi() -> None:
+    logger = MagicMock()
+    command = """
+    for i in {1..15}; do
+        echo "$i: Hello, stdout"
+        echo "$i: Hello, stderr" >&2
+    done
+    """
+    args = ('bash', '-ec', command)
+    proc = antsibull_core.subprocess_util.log_run(args, logger, 'info', 'warn')
+    assert proc.args == args
+    assert proc.returncode == 0
+
+    logger.debug.assert_called_once_with(f'Running subprocess: {args}')
+    assert logger.warn.call_count == 15
+    assert logger.info.call_count == 15
+    expected_out: list[str] = []
+    expected_err: list[str] = []
+    for index, inp in enumerate(logger.info.call_args_list):
+        msg = f'{index+1}: Hello, stdout'
+        expected_out.append(msg)
+        assert inp == call('stdout: ' + msg)
+    for index, inp in enumerate(logger.warn.call_args_list):
+        msg = f'{index+1}: Hello, stderr'
+        expected_err.append(msg)
+        assert inp == call('stderr: ' + msg)
+    assert proc.stdout == '\n'.join(expected_out) + '\n'
+    assert proc.stderr == '\n'.join(expected_err) + '\n'
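A minimal usage sketch of the new helpers follows. It assumes antsibull-core is installed and that git is available on the system; the commands, the logger setup, and the chosen log levels are illustrative only and are not taken from the diff above.

# Usage sketch for antsibull_core.subprocess_util (illustrative commands).
import asyncio
import logging

from antsibull_core.subprocess_util import async_log_run, log_run

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Synchronous wrapper: runs the command, captures stdout/stderr as text,
# and logs each stderr line at the default 'debug' level.
proc = log_run(['git', '--version'], logger)
print(proc.stdout, end='')

# Asynchronous variant, logging stdout at 'info' and stderr at 'warning'
# and not raising on a non-zero exit code (check=False).
async def main() -> None:
    proc = await async_log_run(
        ['git', 'status', '--short'], logger, 'info', 'warning', check=False
    )
    print(f'exit code: {proc.returncode}')

asyncio.run(main())

Because log_run() itself calls asyncio.run(), the synchronous form is only suitable outside a running event loop; inside async code, await async_log_run() directly.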