From 8fe315eee42829b4fe7dc8cad17346e7a9fd7d9c Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 19 Jan 2022 10:34:45 +0100 Subject: [PATCH] add option to acquire a process monitor for a command running on a host (#571) * add option to acquire a process monitor for a command running on a host * + test, fix * Update goth/runner/process.py Co-authored-by: Kuba Mazurek * update with @zakaprov's remarks * pytest fixture Co-authored-by: Kuba Mazurek --- goth/runner/probe/__init__.py | 9 ++- goth/runner/process.py | 19 +++++- .../runner/probe/test_run_command_on_host.py | 58 ++++++++++++------- 3 files changed, 61 insertions(+), 25 deletions(-) diff --git a/goth/runner/probe/__init__.py b/goth/runner/probe/__init__.py index e754eb2d..5fda9740 100644 --- a/goth/runner/probe/__init__.py +++ b/goth/runner/probe/__init__.py @@ -351,7 +351,9 @@ async def run_command_on_host( command: str, env: Optional[Mapping[str, str]] = None, command_timeout: float = 300, - ) -> AsyncIterator[Tuple[asyncio.Task, PatternMatchingEventMonitor]]: + ) -> AsyncIterator[ + Tuple[asyncio.Task, PatternMatchingEventMonitor, process.ProcessMonitor], + ]: """Run `command` on host in given `env` and with optional `timeout`. The command is run in the environment extending `env` with variables needed @@ -375,6 +377,8 @@ async def run_command_on_host( ) cmd_monitor.start() + process_monitor = process.ProcessMonitor() + try: with monitored_logger( f"goth.{self.name}.command_output", cmd_monitor @@ -387,9 +391,10 @@ async def run_command_on_host( log_level=logging.INFO, cmd_logger=cmd_logger, timeout=command_timeout, + process_monitor=process_monitor, ) ) - yield cmd_task, cmd_monitor + yield cmd_task, cmd_monitor, process_monitor await cmd_task logger.debug("Command task has finished") diff --git a/goth/runner/process.py b/goth/runner/process.py index 7256c692..e5b4a484 100644 --- a/goth/runner/process.py +++ b/goth/runner/process.py @@ -13,6 +13,18 @@ RUN_COMMAND_DEFAULT_TIMEOUT = 900 # seconds +class ProcessMonitor: + """Monitor enabling acquisition of the process object of a running command.""" + + _process: Optional[asyncio.subprocess.Process] = None + + async def get_process(self) -> asyncio.subprocess.Process: + """Wait for and return the `Process` object.""" + while not self._process: + await asyncio.sleep(0.1) + return self._process + + async def run_command( args: Sequence[str], env: Optional[dict] = None, @@ -20,6 +32,7 @@ async def run_command( cmd_logger: Optional[logging.Logger] = None, log_prefix: Optional[str] = None, timeout: float = RUN_COMMAND_DEFAULT_TIMEOUT, + process_monitor: Optional[ProcessMonitor] = None, ) -> None: """Run a command in a subprocess with timeout and logging. @@ -34,6 +47,8 @@ async def run_command( :param log_prefix: prefix for log lines with command output; ignored if `cmd_logger` is specified. Default: name of the command :param timeout: timeout for the command, in seconds. Default: 15 minutes + :param process_monitor: an optional `ProcessMonitor` to which the spawned process + will be reported, so that it can be communicated with from the calling code """ logger.info("Running local command: %s", " ".join(args)) @@ -45,11 +60,13 @@ async def run_command( log_prefix = f"[{args[0]}] " async def _run_command(): - proc = await asyncio.subprocess.create_subprocess_exec( *args, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) + if process_monitor: + process_monitor._process = proc + while not proc.stdout.at_eof(): line = await proc.stdout.readline() cmd_logger.log(log_level, "%s%s", log_prefix, line.decode("utf-8").rstrip()) diff --git a/test/unit/runner/probe/test_run_command_on_host.py b/test/unit/runner/probe/test_run_command_on_host.py index 5485d2aa..558477ce 100644 --- a/test/unit/runner/probe/test_run_command_on_host.py +++ b/test/unit/runner/probe/test_run_command_on_host.py @@ -1,4 +1,5 @@ """Tests for the method Probe.run_command_on_host.""" +import asyncio import os import pytest from unittest.mock import MagicMock @@ -7,16 +8,29 @@ import goth.runner.container.yagna from goth.runner.probe import RequestorProbe +CONTAINER_REST_PORT = 6006 -@pytest.mark.asyncio -async def test_run_command_on_host(monkeypatch): - """Test if the method `run_command_on_host` works as expected.""" + +async def env_lines_to_dict(lines): + """Convert the lines received from the `env` command into a dictionary.""" + # The monitor should guarantee that we don't skip any events + assert len(lines.past_events) == 0, lines.past_events + env = {} + async for line in lines: + tokens = line.split("=", 1) + if len(tokens) == 2: + env[tokens[0]] = tokens[1] + return env + + +@pytest.fixture +def mock_probe(monkeypatch): + """Get a mocked `RequestorProbe`.""" runner = MagicMock() docker_client = MagicMock() container_config = MagicMock(use_proxy=False) log_config = MagicMock() - container_rest_port = 6006 monkeypatch.setattr(goth.runner.probe, "YagnaContainer", MagicMock(spec="ports")) monkeypatch.setattr(goth.runner.probe, "Cli", MagicMock(spec="yagna")) @@ -28,37 +42,37 @@ async def test_run_command_on_host(monkeypatch): config=container_config, log_config=log_config, ) - probe.container.ports = {YAGNA_REST_PORT: container_rest_port} - - async def func(lines): - # The monitor should guarantee that we don't skip any events - assert len(lines.past_events) == 0, lines.past_events - env = {} - async for line in lines: - tokens = line.split("=", 1) - if len(tokens) == 2: - env[tokens[0]] = tokens[1] - return env - - async with probe.run_command_on_host( + probe.container.ports = {YAGNA_REST_PORT: CONTAINER_REST_PORT} + return probe + + +@pytest.mark.asyncio +async def test_run_command_on_host(mock_probe): + """Test if the method `run_command_on_host` works as expected.""" + + async with mock_probe.run_command_on_host( "/usr/bin/env", env=os.environ, - ) as (_task, monitor): - assertion = monitor.add_assertion(func) + ) as (_task, monitor, process_monitor): + assertion = monitor.add_assertion(env_lines_to_dict) + proc: asyncio.subprocess.Process = await process_monitor.get_process() + + assert await proc.wait() == 0 result = await assertion.wait_for_result(timeout=1) - assert result["YAGNA_APPKEY"] == probe.app_key + assert result["YAGNA_APPKEY"] == mock_probe.app_key assert result["YAGNA_API_URL"] == YAGNA_REST_URL.substitute( - host="127.0.0.1", port=container_rest_port + host="127.0.0.1", port=CONTAINER_REST_PORT ) assert result["GSB_URL"] == YAGNA_BUS_URL.substitute(host=None) # Let's make sure that another command can be run without problems # (see https://github.com/golemfactory/goth/issues/484). - async with probe.run_command_on_host("/bin/echo eChO", env=os.environ) as ( + async with mock_probe.run_command_on_host("/bin/echo eChO", env=os.environ) as ( _task, monitor, + _process_monitor, ): await monitor.wait_for_pattern(".*eChO", timeout=10)