Skip to content

Commit

Permalink
Merge pull request #485 from golemfactory/az/fix-monitored-logger
Browse files Browse the repository at this point in the history
Fixes for monitored logger
  • Loading branch information
azawlocki authored Apr 14, 2021
2 parents 4c638e5 + a81f864 commit f3e2798
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 10 deletions.
2 changes: 1 addition & 1 deletion goth/runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ async def __call__(
if self._test_failure_callback:
self._test_failure_callback(err)
else:
logger.info("Runner stopped due to test failure")
raise

async def _enter(self) -> None:
self._exit_stack.enter_context(configure_logging_for_test(self.log_dir))
Expand Down
8 changes: 4 additions & 4 deletions goth/runner/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ def monitored_logger(name: str, monitor: EventMonitor[str]) -> Iterator[logging.
from the logger.
"""

logger_ = logging.getLogger(name)
logger_to_monitor = logging.getLogger(name)
filter = MonitoringFilter(monitor, "cyan")
logger_.filters.insert(0, filter)
logger_to_monitor.filters.insert(0, filter)
try:
yield logger_
yield logger_to_monitor
finally:
logger.removeFilter(filter)
logger_to_monitor.removeFilter(filter)
2 changes: 1 addition & 1 deletion goth/runner/log_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ def start(self, in_stream: Iterator[bytes]):

async def stop(self) -> None:
"""Stop the monitor."""
await super().stop()
if self._buffer_task:
self._buffer_task.stop(StopThreadException)
await super().stop()

def update_stream(self, in_stream: Iterator[bytes]):
"""Update the stream when restarting a container."""
Expand Down
10 changes: 7 additions & 3 deletions goth/runner/probe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
YagnaContainerConfig,
PAYMENT_MOUNT_PATH,
)
from goth.runner.exceptions import KeyAlreadyExistsError
from goth.runner.exceptions import KeyAlreadyExistsError, TemporalAssertionError
from goth.runner.log import LogConfig, monitored_logger
from goth.runner.log_monitor import PatternMatchingEventMonitor
from goth.runner.probe.agent import AgentComponent, ProviderAgentComponent
Expand Down Expand Up @@ -313,6 +313,7 @@ async def run_command_on_host(
with monitored_logger(
f"goth.{self.name}.command_output", cmd_monitor
) as cmd_logger:

cmd_task = asyncio.create_task(
process.run_command(
command.split(),
Expand All @@ -324,6 +325,9 @@ async def run_command_on_host(
)
yield cmd_task, cmd_monitor

await cmd_task
logger.debug("Command task has finished")

except Exception as e:
logger.warning(f"Cancelling command on error: {e!r}")
if cmd_task and not cmd_task.done():
Expand All @@ -332,8 +336,8 @@ async def run_command_on_host(

finally:
await cmd_monitor.stop()
logger.debug("Waiting for the command to finish")
await asyncio.gather(cmd_task, return_exceptions=True)
for assertion in cmd_monitor.failed:
raise TemporalAssertionError(assertion.name)


@contextlib.contextmanager
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ exclude= '/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|venv|\.svn|_build|

[tool.poetry]
name = "goth"
version = "0.2.0"
version = "0.2.1"
description = "Golem Test Harness - integration testing framework"
authors = ["Golem Factory <[email protected]>"]
license = "GPL-3.0"
Expand Down
63 changes: 63 additions & 0 deletions test/goth/runner/probe/test_run_command_on_host.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Tests for the method Probe.run_command_on_host."""
import os
import pytest
from unittest.mock import MagicMock

from goth.address import YAGNA_BUS_URL, YAGNA_REST_URL
import goth.runner.container.yagna
from goth.runner.probe import RequestorProbe


@pytest.mark.asyncio
async def test_run_command_on_host(monkeypatch):
"""Test if the method `run_command_on_host` works as expected."""

runner = MagicMock()
docker_client = MagicMock()
container_config = MagicMock()
log_config = MagicMock()

monkeypatch.setattr(goth.runner.probe, "YagnaContainer", MagicMock(spec="ports"))
monkeypatch.setattr(goth.runner.probe, "Cli", MagicMock(spec="yagna"))
monkeypatch.setattr(
goth.runner.probe, "get_container_address", lambda *_args: "1.2.3.4"
)
monkeypatch.setattr(RequestorProbe, "app_key", "0xcafebabe")

probe = RequestorProbe(
runner=runner,
client=docker_client,
config=container_config,
log_config=log_config,
)

async def func(lines):
# The monitor should guarantee that we don't skip any events
assert len(lines.past_events) == 0, lines.past_events
env = {}
async for line in lines:
tokens = line.split("=", 1)
if len(tokens) == 2:
env[tokens[0]] = tokens[1]
return env

async with probe.run_command_on_host(
"/usr/bin/env",
env=os.environ,
) as (_task, monitor):
assertion = monitor.add_assertion(func)

result = await assertion.wait_for_result(timeout=1)

assert result["YAGNA_APPKEY"] == probe.app_key
assert result["YAGNA_API_URL"] == YAGNA_REST_URL.substitute(host=None)
assert result["GSB_URL"] == YAGNA_BUS_URL.substitute(host=None)

# Let's make sure that another command can be run without problems
# (see https://github.com/golemfactory/goth/issues/484).
async with probe.run_command_on_host("/bin/echo eChO", env=os.environ) as (
_task,
monitor,
):

monitor.wait_for_pattern(".*eChO", timeout=10)

0 comments on commit f3e2798

Please sign in to comment.