Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Purge shell_tools functions #5566

Merged
merged 4 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 1 addition & 191 deletions dev_tools/shell_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import subprocess
import sys
from typing import List, Optional, Tuple, Union, IO, Any, cast, NamedTuple

from collections.abc import AsyncIterable

CommandOutput = NamedTuple(
"CommandOutput", [('out', Optional[str]), ('err', Optional[str]), ('exit_code', int)]
)
from typing import List, Tuple, Union


BOLD = 1
Expand All @@ -45,69 +38,6 @@ def highlight(text: str, color_code: int, bold: bool = False) -> str:
return '{}\033[{}m{}\033[0m'.format('\033[1m' if bold else '', color_code, text)


class TeeCapture:
"""Marker class indicating desire to capture output written to a pipe.

If out_pipe is None, the caller just wants to capture output without
writing it to anything in particular.
"""

def __init__(self, out_pipe: Optional[IO[str]] = None) -> None:
self.out_pipe = out_pipe


async def _async_forward(
async_chunks: AsyncIterable, out: Optional[Union[TeeCapture, IO[str]]]
) -> Optional[str]:
"""Prints/captures output from the given asynchronous iterable.

Args:
async_chunks: An asynchronous source of bytes or str.
out: Where to put the chunks.

Returns:
The complete captured output, or else None if the out argument wasn't a
TeeCapture instance.
"""
capture = isinstance(out, TeeCapture)
out_pipe = out.out_pipe if isinstance(out, TeeCapture) else out

chunks: Optional[List[str]] = [] if capture else None
async for chunk in async_chunks:
if not isinstance(chunk, str):
chunk = chunk.decode()
if out_pipe:
print(chunk, file=out_pipe, end='')
if chunks is not None:
chunks.append(chunk)

return ''.join(chunks) if chunks is not None else None


async def _async_wait_for_process(
future_process: Any,
out: Optional[Union[TeeCapture, IO[str]]] = sys.stdout,
err: Optional[Union[TeeCapture, IO[str]]] = sys.stderr,
) -> CommandOutput:
"""Awaits the creation and completion of an asynchronous process.

Args:
future_process: The eventually created process.
out: Where to write stuff emitted by the process' stdout.
err: Where to write stuff emitted by the process' stderr.

Returns:
A (captured output, captured error output, return code) triplet.
"""
process = await future_process
future_output = _async_forward(process.stdout, out)
future_err_output = _async_forward(process.stderr, err)
output, err_output = await asyncio.gather(future_output, future_err_output)
await process.wait()

return CommandOutput(output, err_output, process.returncode)


def abbreviate_command_arguments_after_switches(cmd: Tuple[str, ...]) -> Tuple[str, ...]:
result = [cmd[0]]
for i in range(1, len(cmd)):
Expand Down Expand Up @@ -165,126 +95,6 @@ def run(
return subprocess.run(args, **subprocess_run_kwargs)


def run_cmd(
*cmd: Optional[str],
out: Optional[Union[TeeCapture, IO[str]]] = sys.stdout,
err: Optional[Union[TeeCapture, IO[str]]] = sys.stderr,
raise_on_fail: bool = True,
log_run_to_stderr: bool = True,
abbreviate_non_option_arguments: bool = False,
**kwargs,
) -> CommandOutput:
"""Invokes a subprocess and waits for it to finish.

Args:
*cmd: Components of the command to execute, e.g. ["echo", "dog"].
out: Where to write the process' stdout. Defaults to sys.stdout. Can be
anything accepted by print's 'file' parameter, or None if the
output should be dropped, or a TeeCapture instance. If a TeeCapture
instance is given, the first element of the returned tuple will be
the captured output.
err: Where to write the process' stderr. Defaults to sys.stderr. Can be
anything accepted by print's 'file' parameter, or None if the
output should be dropped, or a TeeCapture instance. If a TeeCapture
instance is given, the second element of the returned tuple will be
the captured error output.
raise_on_fail: If the process returns a non-zero error code
and this flag is set, a CalledProcessError will be raised.
Otherwise the return code is the third element of the returned
tuple.
log_run_to_stderr: Determines whether the fact that this shell command
was executed is logged to sys.stderr or not.
abbreviate_non_option_arguments: When logging to stderr, this cuts off
the potentially-huge tail of the command listing off e.g. hundreds
of file paths. No effect if log_run_to_stderr is not set.
**kwargs: Extra arguments for asyncio.create_subprocess_shell, such as
a cwd (current working directory) argument.

Returns:
A (captured output, captured error output, return code) triplet. The
captured outputs will be None if the out or err parameters were not set
to an instance of TeeCapture.

Raises:
subprocess.CalledProcessError: The process returned a non-zero error
code and raise_on_fail was set.
"""
kept_cmd = tuple(cast(str, e) for e in cmd if e is not None)
if log_run_to_stderr:
cmd_desc = kept_cmd
if abbreviate_non_option_arguments:
cmd_desc = abbreviate_command_arguments_after_switches(cmd_desc)
print('run:', cmd_desc, file=sys.stderr)
result = asyncio.get_event_loop().run_until_complete(
_async_wait_for_process(
asyncio.create_subprocess_exec(
*kept_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, **kwargs
),
out,
err,
)
)
if raise_on_fail and result[2]:
raise subprocess.CalledProcessError(result[2], kept_cmd)
return result


def run_shell(
cmd: str,
out: Optional[Union[TeeCapture, IO[str]]] = sys.stdout,
err: Optional[Union[TeeCapture, IO[str]]] = sys.stderr,
raise_on_fail: bool = True,
log_run_to_stderr: bool = True,
**kwargs,
) -> CommandOutput:
"""Invokes a shell command and waits for it to finish.

Args:
cmd: The command line string to execute, e.g. "echo dog | cat > file".
out: Where to write the process' stdout. Defaults to sys.stdout. Can be
anything accepted by print's 'file' parameter, or None if the
output should be dropped, or a TeeCapture instance. If a TeeCapture
instance is given, the first element of the returned tuple will be
the captured output.
err: Where to write the process' stderr. Defaults to sys.stderr. Can be
anything accepted by print's 'file' parameter, or None if the
output should be dropped, or a TeeCapture instance. If a TeeCapture
instance is given, the second element of the returned tuple will be
the captured error output.
raise_on_fail: If the process returns a non-zero error code
and this flag is set, a CalledProcessError will be raised.
Otherwise the return code is the third element of the returned
tuple.
log_run_to_stderr: Determines whether the fact that this shell command
was executed is logged to sys.stderr or not.
**kwargs: Extra arguments for asyncio.create_subprocess_shell, such as
a cwd (current working directory) argument.

Returns:
A (captured output, captured error output, return code) triplet. The
captured outputs will be None if the out or err parameters were not set
to an instance of TeeCapture.

Raises:
subprocess.CalledProcessError: The process returned a non-zero error
code and raise_on_fail was set.
"""
if log_run_to_stderr:
print('shell:', cmd, file=sys.stderr)
result = asyncio.get_event_loop().run_until_complete(
_async_wait_for_process(
asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, **kwargs
),
out,
err,
)
)
if raise_on_fail and result[2]:
raise subprocess.CalledProcessError(result[2], cmd)
return result


def output_of(args: Union[str, List[str]], **kwargs) -> str:
"""Invokes a subprocess and returns its output as a string.

Expand Down
54 changes: 0 additions & 54 deletions dev_tools/shell_tools_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ def run(*args, **kwargs):
return shell_tools.run(*args, log_run_to_stderr=False, **kwargs)


def run_cmd(*args, **kwargs):
return shell_tools.run_cmd(*args, log_run_to_stderr=False, **kwargs)


def run_shell(*args, **kwargs):
return shell_tools.run_shell(*args, log_run_to_stderr=False, **kwargs)


@only_on_posix
def test_run_raises_on_failure():
assert run('true').returncode == 0
Expand All @@ -59,52 +51,6 @@ def test_run_with_command_logging():
assert catch_stderr.getvalue() == "run: ('echo', '-n', '[...]')\n"


@only_on_posix
def test_run_cmd_raise_on_fail():
assert run_cmd('true') == (None, None, 0)
assert run_cmd('true', raise_on_fail=False) == (None, None, 0)

with pytest.raises(subprocess.CalledProcessError):
run_cmd('false')
assert run_cmd('false', raise_on_fail=False) == (None, None, 1)


@only_on_posix
def test_run_shell_raise_on_fail():
assert run_shell('true') == (None, None, 0)
assert run_shell('true', raise_on_fail=False) == (None, None, 0)

with pytest.raises(subprocess.CalledProcessError):
run_shell('false')
assert run_shell('false', raise_on_fail=False) == (None, None, 1)


@only_on_posix
def test_run_cmd_capture():
assert run_cmd('echo', 'test', out=None) == (None, None, 0)
assert run_cmd('echo', 'test', out=shell_tools.TeeCapture()) == ('test\n', None, 0)
assert run_cmd('echo', 'test', out=None, err=shell_tools.TeeCapture()) == (None, '', 0)


@only_on_posix
def test_run_shell_capture():
assert run_shell('echo test 1>&2', err=None) == (None, None, 0)
assert run_shell('echo test 1>&2', err=shell_tools.TeeCapture()) == (None, 'test\n', 0)
assert run_shell('echo test 1>&2', err=None, out=shell_tools.TeeCapture()) == ('', None, 0)


@only_on_posix
def test_run_shell_does_not_deadlock_on_large_outputs():
assert run_shell(
r"""python3 -c "import sys;"""
r"""print((('o' * 99) + '\n') * 10000);"""
r"""print((('e' * 99) + '\n') * 10000, file=sys.stderr)"""
'"',
out=None,
err=None,
) == (None, None, 0)


@only_on_posix
def test_output_of():
assert shell_tools.output_of('true') == ''
Expand Down