Skip to content

Commit

Permalink
add subprocess_util module to replace sh (#40)
Browse files Browse the repository at this point in the history
* add subprocess_util module to replace sh

This adds code to run subprocesses asynchronously and integrates with
the logger. stdout and stderr are logged in real time and always
captured. We return a CompletedProcess object like regular subprocess
does.

Relates: #34

* subprocess_util: also support stdlib Logger

Both Logger classes have `.debug()`, `.info()`, etc. methods, so let's
explicitly declare support for both.

* subprocess_util: order arguments more logically

* add unit tests for subprocess_util
  • Loading branch information
gotmax23 authored Apr 5, 2023
1 parent 2c8493f commit 0388c0f
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 0 deletions.
5 changes: 5 additions & 0 deletions changelogs/fragments/40-subprocess.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
minor_changes:
- Add a new ``antsibull_core.subprocess_util`` module to help run
subprocesses output and log their output
(https://github.com/ansible-community/antsibull-core/pull/40).
117 changes: 117 additions & 0 deletions src/antsibull_core/subprocess_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Copyright (C) 2023 Maxwell G <[email protected]>
# SPDX-License-Identifier: GPL-3.0-or-later
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or
# https://www.gnu.org/licenses/gpl-3.0.txt)

"""
Utilities for dealing with subprocesses
"""

from __future__ import annotations

import asyncio
import subprocess
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Any, cast

from antsibull_core.logging import log

if TYPE_CHECKING:
from logging import Logger as StdLogger
from _typeshed import StrOrBytesPath
from twiggy.logger import Logger as TwiggyLogger # type: ignore[import]

mlog = log.fields(mod=__name__)


async def _stream_log(
name: str, callback: Callable[[str], Any] | None, stream: asyncio.StreamReader
) -> str:
line = await stream.readline()
lines = []
while True:
if not line:
break
text = line.decode('utf-8')
if callback:
callback(f'{name}: {text.strip()}')
lines.append(text)
line = await stream.readline()
return ''.join(lines)


async def async_log_run(
args: Sequence[StrOrBytesPath],
logger: TwiggyLogger | StdLogger | None = None,
stdout_loglevel: str | None = None,
stderr_loglevel: str | None = 'debug',
check: bool = True,
**kwargs,
) -> subprocess.CompletedProcess:
"""
Asynchronously run a command in a subprocess and log its output.
The command's stdout and stderr are always captured.
For some usecases, you may still need to call
asyncio.create_subprocess_exec() directly to have more control.
:param args: Command to run
:param logger:
Logger in which to log the command. Can be a `twiggy.logger.Logger` or
a stdlib `logger.Logger`.
:param stdout_loglevel: Which level to use to log stdout. `None` disables logging.
:param stderr_loglevel: Which level to use to log stderr. `None` disables logging.
:param check:
Whether to raise a `subprocess.CalledProcessError` when the
command returns a non-zero exit code
"""
logger = logger or mlog
stdout_logfunc: Callable[[str], Any] | None = None
if stdout_loglevel:
stdout_logfunc = getattr(logger, stdout_loglevel)
stderr_logfunc: Callable[[str], Any] | None = None
if stderr_loglevel:
stderr_logfunc = getattr(logger, stderr_loglevel)

logger.debug(f'Running subprocess: {args!r}')
kwargs['stdout'] = asyncio.subprocess.PIPE
kwargs['stderr'] = asyncio.subprocess.PIPE
proc = await asyncio.create_subprocess_exec(*args, **kwargs)
stdout, stderr = await asyncio.gather(
# proc.stdout and proc.stderr won't be None with PIPE, hence the cast()
asyncio.create_task(
_stream_log(
'stdout', stdout_logfunc, cast(asyncio.StreamReader, proc.stdout)
)
),
asyncio.create_task(
_stream_log(
'stderr', stderr_logfunc, cast(asyncio.StreamReader, proc.stderr)
)
),
)
returncode = await proc.wait()

completed = subprocess.CompletedProcess(
args=args, returncode=returncode, stdout=stdout, stderr=stderr
)
if check:
completed.check_returncode()
return completed


def log_run(
args: Sequence[StrOrBytesPath],
logger: TwiggyLogger | StdLogger | None = None,
stdout_loglevel: str | None = None,
stderr_loglevel: str | None = 'debug',
check: bool = True,
**kwargs,
) -> subprocess.CompletedProcess:
"""
Synchronous wrapper for the async_log_run function.
This function runs a command in a subprocess and captures and logs its
output.
"""
return asyncio.run(
async_log_run(args, logger, stdout_loglevel, stderr_loglevel, check, **kwargs)
)
50 changes: 50 additions & 0 deletions tests/units/test_subprocess_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright (C) 2023 Maxwell G <[email protected]>
# SPDX-License-Identifier: GPL-3.0-or-later
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or
# https://www.gnu.org/licenses/gpl-3.0.txt)

from unittest.mock import MagicMock, call

import antsibull_core.subprocess_util


def test_log_run() -> None:
logger = MagicMock()
args = ('bash', '-ec', 'echo 123 && echo 456 >&2')
proc = antsibull_core.subprocess_util.log_run(args, logger)
assert proc.args == args
assert proc.returncode == 0
assert proc.stdout == '123\n'
assert proc.stderr == '456\n'
calls = [call(f'Running subprocess: {args}'), call('stderr: 456')]
assert logger.debug.call_args_list == calls


def test_log_run_multi() -> None:
logger = MagicMock()
command = """
for i in {1..15}; do
echo "$i: Hello, stdout"
echo "$i: Hello, stderr" >&2
done
"""
args = ('bash', '-ec', command)
proc = antsibull_core.subprocess_util.log_run(args, logger, 'info', 'warn')
assert proc.args == args
assert proc.returncode == 0

assert logger.debug.called_once_with(f'Running subprocess: {args}')
assert logger.warn.call_count == 15
assert logger.info.call_count == 15
expected_out: list[str] = []
expected_err: list[str] = []
for index, inp in enumerate(logger.info.call_args_list):
msg = f'{index+1}: Hello, stdout'
expected_out.append(msg)
assert inp == call('stdout: ' + msg)
for index, inp in enumerate(logger.warn.call_args_list):
msg = f'{index+1}: Hello, stderr'
expected_err.append(msg)
assert inp == call('stderr: ' + msg)
assert proc.stdout == '\n'.join(expected_out) + '\n'
assert proc.stderr == '\n'.join(expected_err) + '\n'

0 comments on commit 0388c0f

Please sign in to comment.