Skip to content

Commit

Permalink
add option to acquire a process monitor for a command running on a ho…
Browse files Browse the repository at this point in the history
…st (#571)

* add option to acquire a process monitor for a command running on a host

* + test, fix

* Update goth/runner/process.py

Co-authored-by: Kuba Mazurek <[email protected]>

* update with @zakaprov's remarks

* pytest fixture

Co-authored-by: Kuba Mazurek <[email protected]>
  • Loading branch information
shadeofblue and kmazurek authored Jan 19, 2022
1 parent 0209dca commit 8fe315e
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 25 deletions.
9 changes: 7 additions & 2 deletions goth/runner/probe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ async def run_command_on_host(
command: str,
env: Optional[Mapping[str, str]] = None,
command_timeout: float = 300,
) -> AsyncIterator[Tuple[asyncio.Task, PatternMatchingEventMonitor]]:
) -> AsyncIterator[
Tuple[asyncio.Task, PatternMatchingEventMonitor, process.ProcessMonitor],
]:
"""Run `command` on host in given `env` and with optional `timeout`.
The command is run in the environment extending `env` with variables needed
Expand All @@ -375,6 +377,8 @@ async def run_command_on_host(
)
cmd_monitor.start()

process_monitor = process.ProcessMonitor()

try:
with monitored_logger(
f"goth.{self.name}.command_output", cmd_monitor
Expand All @@ -387,9 +391,10 @@ async def run_command_on_host(
log_level=logging.INFO,
cmd_logger=cmd_logger,
timeout=command_timeout,
process_monitor=process_monitor,
)
)
yield cmd_task, cmd_monitor
yield cmd_task, cmd_monitor, process_monitor

await cmd_task
logger.debug("Command task has finished")
Expand Down
19 changes: 18 additions & 1 deletion goth/runner/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,26 @@
RUN_COMMAND_DEFAULT_TIMEOUT = 900 # seconds


class ProcessMonitor:
"""Monitor enabling acquisition of the process object of a running command."""

_process: Optional[asyncio.subprocess.Process] = None

async def get_process(self) -> asyncio.subprocess.Process:
"""Wait for and return the `Process` object."""
while not self._process:
await asyncio.sleep(0.1)
return self._process


async def run_command(
args: Sequence[str],
env: Optional[dict] = None,
log_level: Optional[int] = logging.DEBUG,
cmd_logger: Optional[logging.Logger] = None,
log_prefix: Optional[str] = None,
timeout: float = RUN_COMMAND_DEFAULT_TIMEOUT,
process_monitor: Optional[ProcessMonitor] = None,
) -> None:
"""Run a command in a subprocess with timeout and logging.
Expand All @@ -34,6 +47,8 @@ async def run_command(
:param log_prefix: prefix for log lines with command output; ignored if `cmd_logger`
is specified. Default: name of the command
:param timeout: timeout for the command, in seconds. Default: 15 minutes
:param process_monitor: an optional `ProcessMonitor` to which the spawned process
will be reported, so that it can be communicated with from the calling code
"""
logger.info("Running local command: %s", " ".join(args))

Expand All @@ -45,11 +60,13 @@ async def run_command(
log_prefix = f"[{args[0]}] "

async def _run_command():

proc = await asyncio.subprocess.create_subprocess_exec(
*args, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)

if process_monitor:
process_monitor._process = proc

while not proc.stdout.at_eof():
line = await proc.stdout.readline()
cmd_logger.log(log_level, "%s%s", log_prefix, line.decode("utf-8").rstrip())
Expand Down
58 changes: 36 additions & 22 deletions test/unit/runner/probe/test_run_command_on_host.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Tests for the method Probe.run_command_on_host."""
import asyncio
import os
import pytest
from unittest.mock import MagicMock
Expand All @@ -7,16 +8,29 @@
import goth.runner.container.yagna
from goth.runner.probe import RequestorProbe

CONTAINER_REST_PORT = 6006

@pytest.mark.asyncio
async def test_run_command_on_host(monkeypatch):
"""Test if the method `run_command_on_host` works as expected."""

async def env_lines_to_dict(lines):
"""Convert the lines received from the `env` command into a dictionary."""
# The monitor should guarantee that we don't skip any events
assert len(lines.past_events) == 0, lines.past_events
env = {}
async for line in lines:
tokens = line.split("=", 1)
if len(tokens) == 2:
env[tokens[0]] = tokens[1]
return env


@pytest.fixture
def mock_probe(monkeypatch):
"""Get a mocked `RequestorProbe`."""

runner = MagicMock()
docker_client = MagicMock()
container_config = MagicMock(use_proxy=False)
log_config = MagicMock()
container_rest_port = 6006

monkeypatch.setattr(goth.runner.probe, "YagnaContainer", MagicMock(spec="ports"))
monkeypatch.setattr(goth.runner.probe, "Cli", MagicMock(spec="yagna"))
Expand All @@ -28,37 +42,37 @@ async def test_run_command_on_host(monkeypatch):
config=container_config,
log_config=log_config,
)
probe.container.ports = {YAGNA_REST_PORT: container_rest_port}

async def func(lines):
# The monitor should guarantee that we don't skip any events
assert len(lines.past_events) == 0, lines.past_events
env = {}
async for line in lines:
tokens = line.split("=", 1)
if len(tokens) == 2:
env[tokens[0]] = tokens[1]
return env

async with probe.run_command_on_host(
probe.container.ports = {YAGNA_REST_PORT: CONTAINER_REST_PORT}
return probe


@pytest.mark.asyncio
async def test_run_command_on_host(mock_probe):
"""Test if the method `run_command_on_host` works as expected."""

async with mock_probe.run_command_on_host(
"/usr/bin/env",
env=os.environ,
) as (_task, monitor):
assertion = monitor.add_assertion(func)
) as (_task, monitor, process_monitor):
assertion = monitor.add_assertion(env_lines_to_dict)
proc: asyncio.subprocess.Process = await process_monitor.get_process()

assert await proc.wait() == 0

result = await assertion.wait_for_result(timeout=1)

assert result["YAGNA_APPKEY"] == probe.app_key
assert result["YAGNA_APPKEY"] == mock_probe.app_key
assert result["YAGNA_API_URL"] == YAGNA_REST_URL.substitute(
host="127.0.0.1", port=container_rest_port
host="127.0.0.1", port=CONTAINER_REST_PORT
)
assert result["GSB_URL"] == YAGNA_BUS_URL.substitute(host=None)

# Let's make sure that another command can be run without problems
# (see https://github.com/golemfactory/goth/issues/484).
async with probe.run_command_on_host("/bin/echo eChO", env=os.environ) as (
async with mock_probe.run_command_on_host("/bin/echo eChO", env=os.environ) as (
_task,
monitor,
_process_monitor,
):

await monitor.wait_for_pattern(".*eChO", timeout=10)

0 comments on commit 8fe315e

Please sign in to comment.