diff --git a/dev_tools/shell_tools.py b/dev_tools/shell_tools.py index db932b7fb4f..68de9fead55 100644 --- a/dev_tools/shell_tools.py +++ b/dev_tools/shell_tools.py @@ -12,16 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import asyncio import subprocess import sys -from typing import List, Optional, Tuple, Union, IO, Any, cast, NamedTuple - -from collections.abc import AsyncIterable - -CommandOutput = NamedTuple( - "CommandOutput", [('out', Optional[str]), ('err', Optional[str]), ('exit_code', int)] -) +from typing import List, Tuple, Union BOLD = 1 @@ -45,69 +38,6 @@ def highlight(text: str, color_code: int, bold: bool = False) -> str: return '{}\033[{}m{}\033[0m'.format('\033[1m' if bold else '', color_code, text) -class TeeCapture: - """Marker class indicating desire to capture output written to a pipe. - - If out_pipe is None, the caller just wants to capture output without - writing it to anything in particular. - """ - - def __init__(self, out_pipe: Optional[IO[str]] = None) -> None: - self.out_pipe = out_pipe - - -async def _async_forward( - async_chunks: AsyncIterable, out: Optional[Union[TeeCapture, IO[str]]] -) -> Optional[str]: - """Prints/captures output from the given asynchronous iterable. - - Args: - async_chunks: An asynchronous source of bytes or str. - out: Where to put the chunks. - - Returns: - The complete captured output, or else None if the out argument wasn't a - TeeCapture instance. - """ - capture = isinstance(out, TeeCapture) - out_pipe = out.out_pipe if isinstance(out, TeeCapture) else out - - chunks: Optional[List[str]] = [] if capture else None - async for chunk in async_chunks: - if not isinstance(chunk, str): - chunk = chunk.decode() - if out_pipe: - print(chunk, file=out_pipe, end='') - if chunks is not None: - chunks.append(chunk) - - return ''.join(chunks) if chunks is not None else None - - -async def _async_wait_for_process( - future_process: Any, - out: Optional[Union[TeeCapture, IO[str]]] = sys.stdout, - err: Optional[Union[TeeCapture, IO[str]]] = sys.stderr, -) -> CommandOutput: - """Awaits the creation and completion of an asynchronous process. - - Args: - future_process: The eventually created process. - out: Where to write stuff emitted by the process' stdout. - err: Where to write stuff emitted by the process' stderr. - - Returns: - A (captured output, captured error output, return code) triplet. - """ - process = await future_process - future_output = _async_forward(process.stdout, out) - future_err_output = _async_forward(process.stderr, err) - output, err_output = await asyncio.gather(future_output, future_err_output) - await process.wait() - - return CommandOutput(output, err_output, process.returncode) - - def abbreviate_command_arguments_after_switches(cmd: Tuple[str, ...]) -> Tuple[str, ...]: result = [cmd[0]] for i in range(1, len(cmd)): @@ -165,126 +95,6 @@ def run( return subprocess.run(args, **subprocess_run_kwargs) -def run_cmd( - *cmd: Optional[str], - out: Optional[Union[TeeCapture, IO[str]]] = sys.stdout, - err: Optional[Union[TeeCapture, IO[str]]] = sys.stderr, - raise_on_fail: bool = True, - log_run_to_stderr: bool = True, - abbreviate_non_option_arguments: bool = False, - **kwargs, -) -> CommandOutput: - """Invokes a subprocess and waits for it to finish. - - Args: - *cmd: Components of the command to execute, e.g. ["echo", "dog"]. - out: Where to write the process' stdout. Defaults to sys.stdout. Can be - anything accepted by print's 'file' parameter, or None if the - output should be dropped, or a TeeCapture instance. If a TeeCapture - instance is given, the first element of the returned tuple will be - the captured output. - err: Where to write the process' stderr. Defaults to sys.stderr. Can be - anything accepted by print's 'file' parameter, or None if the - output should be dropped, or a TeeCapture instance. If a TeeCapture - instance is given, the second element of the returned tuple will be - the captured error output. - raise_on_fail: If the process returns a non-zero error code - and this flag is set, a CalledProcessError will be raised. - Otherwise the return code is the third element of the returned - tuple. - log_run_to_stderr: Determines whether the fact that this shell command - was executed is logged to sys.stderr or not. - abbreviate_non_option_arguments: When logging to stderr, this cuts off - the potentially-huge tail of the command listing off e.g. hundreds - of file paths. No effect if log_run_to_stderr is not set. - **kwargs: Extra arguments for asyncio.create_subprocess_shell, such as - a cwd (current working directory) argument. - - Returns: - A (captured output, captured error output, return code) triplet. The - captured outputs will be None if the out or err parameters were not set - to an instance of TeeCapture. - - Raises: - subprocess.CalledProcessError: The process returned a non-zero error - code and raise_on_fail was set. - """ - kept_cmd = tuple(cast(str, e) for e in cmd if e is not None) - if log_run_to_stderr: - cmd_desc = kept_cmd - if abbreviate_non_option_arguments: - cmd_desc = abbreviate_command_arguments_after_switches(cmd_desc) - print('run:', cmd_desc, file=sys.stderr) - result = asyncio.get_event_loop().run_until_complete( - _async_wait_for_process( - asyncio.create_subprocess_exec( - *kept_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, **kwargs - ), - out, - err, - ) - ) - if raise_on_fail and result[2]: - raise subprocess.CalledProcessError(result[2], kept_cmd) - return result - - -def run_shell( - cmd: str, - out: Optional[Union[TeeCapture, IO[str]]] = sys.stdout, - err: Optional[Union[TeeCapture, IO[str]]] = sys.stderr, - raise_on_fail: bool = True, - log_run_to_stderr: bool = True, - **kwargs, -) -> CommandOutput: - """Invokes a shell command and waits for it to finish. - - Args: - cmd: The command line string to execute, e.g. "echo dog | cat > file". - out: Where to write the process' stdout. Defaults to sys.stdout. Can be - anything accepted by print's 'file' parameter, or None if the - output should be dropped, or a TeeCapture instance. If a TeeCapture - instance is given, the first element of the returned tuple will be - the captured output. - err: Where to write the process' stderr. Defaults to sys.stderr. Can be - anything accepted by print's 'file' parameter, or None if the - output should be dropped, or a TeeCapture instance. If a TeeCapture - instance is given, the second element of the returned tuple will be - the captured error output. - raise_on_fail: If the process returns a non-zero error code - and this flag is set, a CalledProcessError will be raised. - Otherwise the return code is the third element of the returned - tuple. - log_run_to_stderr: Determines whether the fact that this shell command - was executed is logged to sys.stderr or not. - **kwargs: Extra arguments for asyncio.create_subprocess_shell, such as - a cwd (current working directory) argument. - - Returns: - A (captured output, captured error output, return code) triplet. The - captured outputs will be None if the out or err parameters were not set - to an instance of TeeCapture. - - Raises: - subprocess.CalledProcessError: The process returned a non-zero error - code and raise_on_fail was set. - """ - if log_run_to_stderr: - print('shell:', cmd, file=sys.stderr) - result = asyncio.get_event_loop().run_until_complete( - _async_wait_for_process( - asyncio.create_subprocess_shell( - cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, **kwargs - ), - out, - err, - ) - ) - if raise_on_fail and result[2]: - raise subprocess.CalledProcessError(result[2], cmd) - return result - - def output_of(args: Union[str, List[str]], **kwargs) -> str: """Invokes a subprocess and returns its output as a string. diff --git a/dev_tools/shell_tools_test.py b/dev_tools/shell_tools_test.py index 6c6f20ae470..a259b040c6e 100644 --- a/dev_tools/shell_tools_test.py +++ b/dev_tools/shell_tools_test.py @@ -26,14 +26,6 @@ def run(*args, **kwargs): return shell_tools.run(*args, log_run_to_stderr=False, **kwargs) -def run_cmd(*args, **kwargs): - return shell_tools.run_cmd(*args, log_run_to_stderr=False, **kwargs) - - -def run_shell(*args, **kwargs): - return shell_tools.run_shell(*args, log_run_to_stderr=False, **kwargs) - - @only_on_posix def test_run_raises_on_failure(): assert run('true').returncode == 0 @@ -59,52 +51,6 @@ def test_run_with_command_logging(): assert catch_stderr.getvalue() == "run: ('echo', '-n', '[...]')\n" -@only_on_posix -def test_run_cmd_raise_on_fail(): - assert run_cmd('true') == (None, None, 0) - assert run_cmd('true', raise_on_fail=False) == (None, None, 0) - - with pytest.raises(subprocess.CalledProcessError): - run_cmd('false') - assert run_cmd('false', raise_on_fail=False) == (None, None, 1) - - -@only_on_posix -def test_run_shell_raise_on_fail(): - assert run_shell('true') == (None, None, 0) - assert run_shell('true', raise_on_fail=False) == (None, None, 0) - - with pytest.raises(subprocess.CalledProcessError): - run_shell('false') - assert run_shell('false', raise_on_fail=False) == (None, None, 1) - - -@only_on_posix -def test_run_cmd_capture(): - assert run_cmd('echo', 'test', out=None) == (None, None, 0) - assert run_cmd('echo', 'test', out=shell_tools.TeeCapture()) == ('test\n', None, 0) - assert run_cmd('echo', 'test', out=None, err=shell_tools.TeeCapture()) == (None, '', 0) - - -@only_on_posix -def test_run_shell_capture(): - assert run_shell('echo test 1>&2', err=None) == (None, None, 0) - assert run_shell('echo test 1>&2', err=shell_tools.TeeCapture()) == (None, 'test\n', 0) - assert run_shell('echo test 1>&2', err=None, out=shell_tools.TeeCapture()) == ('', None, 0) - - -@only_on_posix -def test_run_shell_does_not_deadlock_on_large_outputs(): - assert run_shell( - r"""python3 -c "import sys;""" - r"""print((('o' * 99) + '\n') * 10000);""" - r"""print((('e' * 99) + '\n') * 10000, file=sys.stderr)""" - '"', - out=None, - err=None, - ) == (None, None, 0) - - @only_on_posix def test_output_of(): assert shell_tools.output_of('true') == ''