diff --git a/goth/api_monitor/monitor_addon.py b/goth/api_monitor/monitor_addon.py index 095e81fd5..e80bb5461 100644 --- a/goth/api_monitor/monitor_addon.py +++ b/goth/api_monitor/monitor_addon.py @@ -37,7 +37,7 @@ def _register_event(self, event: APIEvent) -> None: """Log an API event and add it to the monitor.""" self._logger.debug("%s", event) - self._monitor.add_event(event) + self._monitor.add_event_sync(event) def request(self, flow: HTTPFlow) -> None: """Register a request.""" diff --git a/goth/assertions/assertions.py b/goth/assertions/assertions.py index 1181bd7a1..f95e224b3 100644 --- a/goth/assertions/assertions.py +++ b/goth/assertions/assertions.py @@ -44,9 +44,8 @@ class EventStream(Protocol, AsyncIterable[E]): class Assertion(AsyncIterable[E]): """A class for executing assertion coroutines. - An instance of this class wraps a coroutine function (called the - "assertion coroutine") and provides an asynchronous generator of - events that the assertion coroutine processes. + An instance of this class wraps a coroutine (called the "assertion coroutine") + and provides an asynchronous generator of events that this coroutine processes. After creating an instance of this class, its client should await the `update_events()` method each time a new event is appended to the list of events (the list is passed as an argument to `Assertion()`). @@ -98,13 +97,20 @@ def start(self) -> asyncio.Task: if self.started: raise RuntimeError("Assertion already started") - def on_done(*args) -> None: - """Notify the tasks waiting until this assertion updates.""" - self._notify_update_events() + async def func_wrapper(): + """Ensure `_notify_update_events` is called after processing each event. + + See also comments in `_create_generator()`. + """ + try: + return await self._func(self) + except asyncio.CancelledError: + raise AssertionError("Assertion cancelled") + finally: + self._notify_update_events() assert self._func is not None - self._task = asyncio.create_task(self._func(self)) - self._task.add_done_callback(on_done) + self._task = asyncio.create_task(func_wrapper()) self._ready = asyncio.Event() self._processed = asyncio.Event() return self._task @@ -126,27 +132,54 @@ def done(self) -> bool: @property def accepted(self) -> bool: """Return `True` iff this assertion finished execution successfuly.""" - return self.started and self._task.done() and self._task.exception() is None + return ( + self._task is not None + and self._task.done() + and self._task.exception() is None + ) @property def failed(self) -> bool: """Return `True` iff this assertion finished execution by failing.""" - return self.started and self._task.done() and self._task.exception() is not None + return ( + self._task is not None + and self._task.done() + and self._task.exception() is not None + ) - async def result(self) -> Any: + def result(self) -> Any: """Return the result of this assertion. - If the assertion succeeded its result will be returned. - If it failed, the exception will be raised. - If the assertion hasn't finished yet, `None` will be returned. - If it hasn't been started, `asyncio.InvalidStateError` will be raised. + The semantics is similar to that of the `result()` method of `asyncio.Task` + (https://docs.python.org/3/library/asyncio-task.html#task-object): + If the assertion is done, the result of the assertion coroutine is returned + or the exception raised by the coroutine is re-raised. + If the assertion is not done (in particular, if hasn't been started) then + this method raises `asyncio.InvalidStateError`. """ - if not self.started: + if not self._task: raise asyncio.InvalidStateError("Assertion not started") - if self._task.done(): + return self._task.result() + + async def wait_for_result(self, timeout: Optional[float] = None) -> Any: + """Wait until this assertion's result becomes available and return it. + + Optional `timeout` is in seconds, `None` means to wait indefinitely + (this is the default). + """ + if not self._task: + raise asyncio.InvalidStateError("Assertion not started") + + if timeout is None: return await self._task - return None + + try: + return await asyncio.wait_for(self._task, timeout=timeout) + finally: + # This is to retrieve exception from `self._task` so no unretrieved + # exceptions are reported when the event loop closes. + _ = self._task.exception() async def update_events(self, events_ended: bool = False) -> None: """Notify the assertion that a new event has been added.""" @@ -183,6 +216,8 @@ def __aiter__(self) -> AsyncIterator[E]: def _notify_update_events(self) -> None: """Notify tasks waiting in `update_events()` that the update is processed.""" + if not self._ready or not self._processed: + raise asyncio.InvalidStateError("Assertion not started") self._ready.clear() self._processed.set() @@ -209,5 +244,5 @@ async def _create_generator(self) -> AsyncIterator[E]: # # In case the control does not return (since the events end or # the assertion coroutine raises an exception), `_notify_update_events()` - # is called in the done callback for the coroutine task. + # must be called after returning from the assertion coroutine. self._notify_update_events() diff --git a/goth/assertions/monitor.py b/goth/assertions/monitor.py index 2c6c51475..5de6022c7 100644 --- a/goth/assertions/monitor.py +++ b/goth/assertions/monitor.py @@ -8,7 +8,7 @@ import importlib import logging import sys -from typing import Generic, List, Optional, Sequence +from typing import Callable, Generic, List, Optional, Sequence, Union from goth.assertions import Assertion, AssertionFunction, E @@ -55,13 +55,23 @@ class EventMonitor(Generic[E]): name: Optional[str] """The name of this monitor, for use in logging.""" + _event_loop: asyncio.AbstractEventLoop + """The event loop in which this monitor has been started.""" + _events: List[E] """List of events registered so far.""" - _incoming: "Optional[asyncio.Queue[Optional[E]]]" + _incoming: "asyncio.Queue[Optional[E]]" """A queue used to pass the events to the worker task.""" - _logger: logging.Logger + _last_checked_event: int + """The index of the last event examined by `wait_for_event()` method. + + Subsequent calls to `wait_for_event` will only look at events that occurred + after this event. + """ + + _logger: Union[logging.Logger, MonitorLoggerAdapter] """A logger instance for this monitor.""" _worker_task: Optional[asyncio.Task] @@ -75,32 +85,33 @@ def __init__( ) -> None: self.assertions = OrderedDict() self.name = name + + self._event_loop = asyncio.get_event_loop() self._events = [] - # Delay creating the queue to make sure it's created in the event loop - # used by the worker task. - self._incoming = None - self._worker_task = None + self._incoming = asyncio.Queue() + self._last_checked_event = -1 self._logger = logger or logging.getLogger(__name__) if self.name: self._logger = MonitorLoggerAdapter( self._logger, {MonitorLoggerAdapter.EXTRA_MONITOR_NAME: self.name} ) self._stop_callback = on_stop + self._worker_task = None def add_assertion( self, assertion_func: AssertionFunction[E], log_level: LogLevel = logging.INFO ) -> Assertion: """Add an assertion function to this monitor.""" - result = Assertion(self._events, assertion_func) - self.assertions[result] = log_level - return result + assertion = Assertion(self._events, assertion_func) + assertion.start() + self._logger.debug("Assertion '%s' started", assertion.name) + self.assertions[assertion] = log_level + return assertion def add_assertions(self, assertion_funcs: List[AssertionFunction[E]]) -> None: """Add a list of assertion functions to this monitor.""" - # Create assertions here but don't start them yet, to make sure - # they're started in the same event loop in which they'll be running. for func in assertion_funcs: self.add_assertion(func) @@ -123,29 +134,46 @@ def load_assertions(self, module_name: str) -> None: mod_logger.addHandler(handler) def start(self) -> None: - """Start tracing events.""" + """Start tracing events. + + Starting a monitor is decoupled from its initialisation. This allows the + user to add assertions to the monitor before starting to register events. + Such assertions are thus guaranteed not to "miss" any event registered + by the monitor. + """ if self.is_running(): self._logger.warning("Monitor already started") return - self._incoming = asyncio.Queue() - # Don't use `asyncio.create_task()` here, as it'll fail if the current - # event loop is not running yet. `asyncio.ensure_future()` will create - # a task which will be run when the loop is started. - future = asyncio.ensure_future(self._run_worker()) - assert isinstance(future, asyncio.Task) - self._worker_task = future + self._worker_task = self._event_loop.create_task(self._run_worker()) self._logger.debug("Monitor started") - def add_event(self, event: E) -> None: + async def add_event(self, event: E) -> None: """Register a new event.""" + # Note: this method is `async` even though it does not perform any `await`. + # This is to ensure that it's directly callable only from code running in + # an event loop, which in turn guarantees that tasks waiting for input from + # `self._incoming` will be notified. + if not self.is_running(): raise RuntimeError(f"Monitor {self.name or ''} is not running") self._incoming.put_nowait(event) + def add_event_sync(self, event: E) -> None: + """Schedule registering a new event. + + This function can be called from a thread different from the one + that started this monitor. + """ + + if not self.is_running(): + raise RuntimeError(f"Monitor {self.name or ''} is not running") + + self._event_loop.call_soon_threadsafe(self._incoming.put_nowait, event) + async def stop(self) -> None: """Stop tracing events.""" @@ -162,6 +190,7 @@ async def stop(self) -> None: # will return `False` to prevent adding more events. worker = self._worker_task self._worker_task = None + assert worker await worker self._logger.debug("Monitor stopped") @@ -183,8 +212,9 @@ def is_running(self) -> bool: return False if self._worker_task.done(): - if self._worker_task.exception(): - raise self._worker_task.exception() + exc = self._worker_task.exception() + if exc: + raise exc return False return True @@ -200,7 +230,6 @@ async def _run_worker(self) -> None: events_ended = False while not events_ended: - event = await self._incoming.get() if event is not None: self._events.append(event) @@ -220,40 +249,33 @@ async def _check_assertions(self, events_ended: bool) -> None: for a, level in self.assertions.items(): - # As new assertions may be added on the fly, we need to make sure - # that this one has been started already. - if not a.started: - a.start() - self._logger.debug("Assertion '%s' started", a.name) - if a.done: continue await a.update_events(events_ended=events_ended) if a.accepted: - result = await a.result() + result = a.result() msg = "Assertion '%s' succeeded after event: %s; result: %s" self._logger.log(level, msg, a.name, event_descr, result) elif a.failed: await self._report_failure(a, event_descr) - # Ensure other tasks can also run between assertions - await asyncio.sleep(0) - async def _report_failure(self, a: Assertion, event_descr: str) -> None: try: - await a.result() - except Exception: - exc_type, exc, tb = sys.exc_info() - # Drop top 2 frames from the traceback: the current one - # and the the one for `a.result()`, so that only the frames - # of the assertion functions are left. - tb = tb.tb_next.tb_next + a.result() + except Exception as exc: + _exc_type, _exc, tb = sys.exc_info() + # Drop top 3 frames from the traceback: the current one, + # the one for `a.result()` and the one for the `func_wrapper` + # used in `__init__()`, so that only the frames of the assertion + # functions are left. + for _ in (1, 2, 3): + tb = tb.tb_next if tb else tb msg = "Assertion '%s' failed after event: %s; cause: %s" self._logger.error( - msg, a.name, event_descr, exc, exc_info=(exc_type, exc, tb) + msg, a.name, event_descr, exc, exc_info=(type(exc), exc, tb) ) @property @@ -279,3 +301,38 @@ def finished(self) -> bool: """Return True iif all assertions are done.""" return all(a.done for a in self.assertions) + + async def wait_for_event( + self, predicate: Callable[[E], bool], timeout: Optional[float] = None + ) -> E: + """Wait for an event that satisfies given `predicate`. + + The first call to this method will examine all events gathered since + this monitor was started and then, if needed, will wait for up to `timeout` + seconds for a matching event. + + Subsequent calls will examine all events gathered since the previous call + returned and then wait for up to `timeout` seconds. + + When `timeout` elapses, `asyncio.TimeourError` will be raised. + """ + + # First examine log lines already seen + while self._last_checked_event + 1 < len(self._events): + self._last_checked_event += 1 + event = self._events[self._last_checked_event] + if predicate(event): + return event + + # Otherwise create an assertion that waits for a matching event... + async def wait_for_match(stream) -> E: + async for e in stream: + self._last_checked_event = len(stream.past_events) - 1 + if predicate(e): + return e + raise AssertionError("No matching event occurred") + + assertion = self.add_assertion(wait_for_match, logging.DEBUG) + + # ... and wait until the assertion completes + return await assertion.wait_for_result(timeout=timeout) diff --git a/goth/default-assets/docker/docker-compose.yml b/goth/default-assets/docker/docker-compose.yml index 9c8ef5fa4..d8378fcc0 100644 --- a/goth/default-assets/docker/docker-compose.yml +++ b/goth/default-assets/docker/docker-compose.yml @@ -21,7 +21,7 @@ services: - "8545:8545" zksync: - image: docker.pkg.github.com/golemfactory/yagna-zksync/yagna-zksync-mock:7a25ed913d4a + image: docker.pkg.github.com/golemfactory/yagna-zksync/yagna-zksync-mock:f6d0cf02f6bc ports: - "3030:3030" environment: diff --git a/goth/node.py b/goth/node.py index 67638a2a3..7f8a81a02 100644 --- a/goth/node.py +++ b/goth/node.py @@ -29,7 +29,7 @@ def node_environment( "RUST_LOG": "debug,tokio_core=info,tokio_reactor=info,hyper=info", "YA_PAYMENT_NETWORK": "rinkeby", "YAGNA_API_URL": YAGNA_REST_URL.substitute(host="0.0.0.0"), - "ZKSYNC_FAUCET_ADDR": "http://zksync:3030/donate", + "ZKSYNC_FAUCET_ADDR": "http://zksync:3030/zk/donatex", "ZKSYNC_RINKEBY_RPC_ADDRESS": "http://zksync:3030", # left for compatibility with yagna prior to commit 800efe13 "ZKSYNC_RPC_ADDRESS": "http://zksync:3030", diff --git a/goth/runner/__init__.py b/goth/runner/__init__.py index a82eada05..a7e110ee1 100644 --- a/goth/runner/__init__.py +++ b/goth/runner/__init__.py @@ -200,7 +200,7 @@ async def _start_nodes(self): ports=ports, assertions_module=self.api_assertions_module, ) - self._exit_stack.enter_context(run_proxy(self.proxy)) + await self._exit_stack.enter_async_context(run_proxy(self.proxy)) # Collect all agent enabled probes and start them in parallel awaitables = [] diff --git a/goth/runner/log.py b/goth/runner/log.py index 7861a7205..0a1f2dc2f 100644 --- a/goth/runner/log.py +++ b/goth/runner/log.py @@ -7,10 +7,11 @@ from pathlib import Path import tempfile import time -from typing import Optional, Union +from typing import Iterator, Optional, Union import goth import goth.api_monitor +from goth.assertions.monitor import EventMonitor DEFAULT_LOG_DIR = Path(tempfile.gettempdir()) / "goth-tests" @@ -145,3 +146,34 @@ def configure_logging_for_test(test_log_dir: Path) -> None: goth_logger.handlers.remove(runner_handler) if proxy_handler in api_monitor_logger.handlers: api_monitor_logger.handlers.remove(proxy_handler) + + +class MonitorHandler(logging.Handler): + """A logging handler that passes messages from log records to an event monitor.""" + + def __init__(self, monitor: EventMonitor[str]): + self._monitor = monitor + super().__init__() + + def handle(self, record: logging.LogRecord) -> None: + """Add the `record`'s message to the associated event monitor.""" + self._monitor.add_event_sync(record.getMessage()) + + +@contextlib.contextmanager +def monitored_logger(name: str, monitor: EventMonitor[str]) -> Iterator[logging.Logger]: + """Get logger identified by `name` and attach the given event monitor to it. + + The monitor will receive all messages emitted by the logger as events. + Upon exiting from this context manager, the monitor will be detached + from the logger. + """ + + logger_ = logging.getLogger(name) + handler = MonitorHandler(monitor) + try: + logger_.addHandler(handler) + yield logger_ + finally: + if handler in logger_.handlers: + logger_.removeHandler(handler) diff --git a/goth/runner/log_monitor.py b/goth/runner/log_monitor.py index f9038cc13..1d2309ff4 100644 --- a/goth/runner/log_monitor.py +++ b/goth/runner/log_monitor.py @@ -10,8 +10,7 @@ from func_timeout.StoppableThread import StoppableThread -from goth.assertions.monitor import EventMonitor -from goth.assertions.operators import eventually +from goth.assertions.monitor import E, EventMonitor from goth.runner.exceptions import StopThreadException from goth.runner.log import LogConfig @@ -108,37 +107,63 @@ def _create_file_logger(config: LogConfig) -> logging.Logger: delay=True, ) handler.setFormatter(config.formatter) - logger_ = logging.getLogger(str(config.file_name)) + logger_name = f"{config.base_dir}.{config.file_name}" + logger_ = logging.getLogger(logger_name) logger_.setLevel(config.level) logger_.addHandler(handler) logger_.propagate = False return logger_ -class LogEventMonitor(EventMonitor[LogEvent]): +class PatternMatchingEventMonitor(EventMonitor[E]): + """An `EventMonitor` that can wait for events that match regex patterns.""" + + def event_str(self, event: E) -> str: + """Return the string associated with `event` on which to perform matching.""" + return str(event) + + async def wait_for_pattern( + self, pattern: str, timeout: Optional[float] = None + ) -> E: + """Wait for an event with string representation matching `pattern`. + + The semantics for this method is as for + `EventMonitor.wait_for_event(predicate, timeout)`, with `predicate(e)` + being true iff `event_str(e)` matches `pattern`, for any event `e`. + """ + + regex = re.compile(pattern) + event = await self.wait_for_event( + lambda e: regex.match(self.event_str(e)) is not None, timeout + ) + return event + + +class LogEventMonitor(PatternMatchingEventMonitor[LogEvent]): """Log buffer supporting logging to a file and waiting for a line pattern match. `log_config` parameter holds the configuration of the file logger. Consecutive values are interpreted as lines by splitting them on the new line character. - Internally it uses an asyncio task to read the stream and add lines to the buffer. + Internally it uses a thread to read the stream and add lines to the buffer. """ _buffer_task: Optional[StoppableThread] _file_logger: logging.Logger _in_stream: Iterator[bytes] - _last_checked_line: int - """The index of the last line examined while waiting for log messages. - Subsequent calls to `wait_for_agent_log()` will only look at lines that - were logged after this line. - """ - - def __init__(self, name: str, log_config: LogConfig): + def __init__(self, name: str, log_config: Optional[LogConfig] = None): super().__init__(name) - self._file_logger = _create_file_logger(log_config) + if log_config: + self._file_logger = _create_file_logger(log_config) + else: + self._file_logger = logging.getLogger(name) self._buffer_task = None - self._last_checked_line = -1 + self._loop = asyncio.get_event_loop() + + def event_str(self, event: LogEvent) -> str: + """Return the string associated with `event` on which to perform matching.""" + return event.message @property def events(self) -> Sequence[LogEvent]: @@ -173,56 +198,28 @@ def _buffer_input(self): self._file_logger.info(line) event = LogEvent(line) - self.add_event(event) + self.add_event_sync(event) + except StopThreadException: return - async def wait_for_entry(self, pattern: str, timeout: float = 1000) -> LogEvent: + async def wait_for_entry( + self, pattern: str, timeout: Optional[float] = None + ) -> LogEvent: """Search log for a log entry with the message matching `pattern`. The first call to this method will examine all log entries gathered since this monitor was started and then, if needed, will wait for - up to `timeout` seconds for a matching entry. + up to `timeout` seconds (or indefinitely, if `timeout` is `None`) + for a matching entry. Subsequent calls will examine all log entries gathered since the previous call returned and then wait for up to `timeout` seconds. """ - regex = re.compile(pattern) - - def predicate(log_event) -> bool: - return regex.match(log_event.message) is not None - - # First examine log lines already seen - while self._last_checked_line + 1 < len(self.events): - self._last_checked_line += 1 - event = self.events[self._last_checked_line] - if predicate(event): - logger.debug( - "Found match in past log lines. pattern=%s, match=%s", - pattern, - event.message, - ) - return event - - # Otherwise create an assertion that waits for a matching line... - async def wait_for_matching_line(stream) -> LogEvent: - try: - log_event = await eventually(stream, predicate, timeout=timeout) - return log_event - finally: - self._last_checked_line = len(stream.past_events) - 1 - - assertion = self.add_assertion(wait_for_matching_line, logging.DEBUG) - - # ... and wait until the assertion completes - while not assertion.done: - await asyncio.sleep(0.1) - - result: LogEvent = await assertion.result() - if result: - logger.debug( - "Log assertion completed with a match. pattern=%s, match=%s", - pattern, - result.message, - ) - return result + event = await self.wait_for_pattern(pattern, timeout) + logger.debug( + "Log assertion completed with a match. pattern=%s, match=%s", + pattern, + event.message, + ) + return event diff --git a/goth/runner/probe.py b/goth/runner/probe.py index 5e38e3740..33006457d 100644 --- a/goth/runner/probe.py +++ b/goth/runner/probe.py @@ -6,7 +6,14 @@ import copy import logging from pathlib import Path -from typing import AsyncIterator, Dict, Iterator, Optional, TYPE_CHECKING +from typing import ( + AsyncIterator, + Dict, + Iterator, + Optional, + Tuple, + TYPE_CHECKING, +) from docker import DockerClient @@ -28,8 +35,10 @@ PAYMENT_MOUNT_PATH, ) from goth.runner.exceptions import KeyAlreadyExistsError -from goth.runner.log import LogConfig -from goth.runner.process import run_command +from goth.runner.log import LogConfig, monitored_logger +from goth.runner.log_monitor import PatternMatchingEventMonitor +from goth.runner import process + if TYPE_CHECKING: from goth.runner import Runner @@ -244,30 +253,59 @@ def set_agent_env_vars(self, env: Dict[str, str]) -> None: } ) - def run_command_on_host( + @contextlib.asynccontextmanager + async def run_command_on_host( self, command: str, env: Optional[Dict[str, str]] = None, - timeout: int = 300, - ) -> asyncio.Task: + command_timeout: float = 300, + ) -> Iterator[Tuple[asyncio.Task, PatternMatchingEventMonitor]]: """Run `command` on host in given `env` and with optional `timeout`. - The command is run the environment extending `env` with variables needed + The command is run in the environment extending `env` with variables needed to communicate with the daemon running in this probe's container. - Returns the `asyncio` task that logs output from the command. The task - can be awaited in order to wait until the command completes. - """ + Internally, this method uses `process.run_command()` to run `command`. + The argument `command_timeout` is passed as the `timeout` parameter to + `process.run_command()`. + Returns the `asyncio` task that logs output from the command, and an event + monitor that observes lines of output produced by the command. + + The task can be awaited in order to wait until the command completes. + The monitor can be used for asserting properties of the command's output. + """ cmd_env = {**env} if env is not None else {} self.set_agent_env_vars(cmd_env) - task = asyncio.create_task( - run_command( - command.split(), cmd_env, log_level=logging.INFO, timeout=timeout - ) - ) - return task + cmd_monitor = PatternMatchingEventMonitor(name="command output") + cmd_monitor.start() + + try: + with monitored_logger( + f"goth.{self.name}.command_output", cmd_monitor + ) as cmd_logger: + cmd_task = asyncio.create_task( + process.run_command( + command.split(), + cmd_env, + log_level=logging.INFO, + cmd_logger=cmd_logger, + timeout=command_timeout, + ) + ) + yield cmd_task, cmd_monitor + + except Exception as e: + logger.warning(f"Cancelling command on error: {e!r}") + if cmd_task and not cmd_task.done(): + cmd_task.cancel() + raise + + finally: + await cmd_monitor.stop() + logger.debug("Waiting for the command to finish") + await asyncio.gather(cmd_task, return_exceptions=True) @contextlib.contextmanager diff --git a/goth/runner/process.py b/goth/runner/process.py index 2acf7d0db..7256c6921 100644 --- a/goth/runner/process.py +++ b/goth/runner/process.py @@ -17,8 +17,9 @@ async def run_command( args: Sequence[str], env: Optional[dict] = None, log_level: Optional[int] = logging.DEBUG, + cmd_logger: Optional[logging.Logger] = None, log_prefix: Optional[str] = None, - timeout: int = RUN_COMMAND_DEFAULT_TIMEOUT, + timeout: float = RUN_COMMAND_DEFAULT_TIMEOUT, ) -> None: """Run a command in a subprocess with timeout and logging. @@ -27,13 +28,21 @@ async def run_command( :param args: sequence consisting of the program to run along with its arguments :param env: dict with environment for the command - :param log_prefix: prefix for log lines emitted. Default: name of the command + :param log_level: logging level at which command output will be logged + :param cmd_logger: optional logger instance used to log output from the command; + if not set the default module logger will be used + :param log_prefix: prefix for log lines with command output; ignored if `cmd_logger` + is specified. Default: name of the command :param timeout: timeout for the command, in seconds. Default: 15 minutes """ logger.info("Running local command: %s", " ".join(args)) - if log_prefix is None: - log_prefix = f"[{args[0]}] " + if cmd_logger: + log_prefix = "" + else: + cmd_logger = logger + if log_prefix is None: + log_prefix = f"[{args[0]}] " async def _run_command(): @@ -43,7 +52,7 @@ async def _run_command(): while not proc.stdout.at_eof(): line = await proc.stdout.readline() - logger.log(log_level, "%s%s", log_prefix, line.decode("utf-8").rstrip()) + cmd_logger.log(log_level, "%s%s", log_prefix, line.decode("utf-8").rstrip()) return_code = await proc.wait() if return_code: diff --git a/goth/runner/proxy.py b/goth/runner/proxy.py index a4d140d94..a857d8350 100644 --- a/goth/runner/proxy.py +++ b/goth/runner/proxy.py @@ -3,7 +3,7 @@ import contextlib import logging import threading -from typing import Mapping, Optional +from typing import AsyncIterator, Mapping, Optional from mitmproxy import options import mitmproxy.utils.debug @@ -61,14 +61,15 @@ def _stop_callback(): def start(self): """Start the proxy thread.""" + self.monitor.start() self._proxy_thread.start() self._server_ready.wait() - def stop(self): + async def stop(self): """Start the proxy monitor and thread.""" if not self._loop: raise RuntimeError("Event loop is not set") - asyncio.run_coroutine_threadsafe(self.monitor.stop(), self._loop) + await self.monitor.stop() self._proxy_thread.join() def _run_mitmproxy(self): @@ -81,8 +82,6 @@ def _run_mitmproxy(self): self._loop.add_signal_handler = lambda *args_: None asyncio.set_event_loop(self._loop) - self.monitor.start() - self._logger.info("Starting embedded mitmproxy...") # This class is nested since it needs to refer to the `monitor` attribute @@ -103,12 +102,12 @@ def start(inner_self): self._logger.info("Embedded mitmproxy exited") -@contextlib.contextmanager -def run_proxy(proxy: Proxy) -> Proxy: - """Implement ContextManager protocol for stating and stopping a Proxy.""" +@contextlib.asynccontextmanager +async def run_proxy(proxy: Proxy) -> AsyncIterator[Proxy]: + """Implement AsyncContextManager protocol for stating and stopping a Proxy.""" try: proxy.start() yield finally: - proxy.stop() + await proxy.stop() diff --git a/test/goth/assertions/test_assertions.py b/test/goth/assertions/test_assertions.py index ca8d02b92..2c2d736ff 100644 --- a/test/goth/assertions/test_assertions.py +++ b/test/goth/assertions/test_assertions.py @@ -40,7 +40,7 @@ async def func(_stream): await task assert assertion.done assert assertion.accepted - assert await assertion.result() == 43 + assert assertion.result() == 43 @pytest.mark.asyncio @@ -58,7 +58,7 @@ async def func(_stream): await task assert assertion.done assert assertion.accepted - assert await assertion.result() == 43 + assert assertion.result() == 43 @pytest.mark.asyncio @@ -95,7 +95,7 @@ async def func(stream): events.append(2) await assertion.update_events() assert assertion.accepted - assert await assertion.result() == 2 + assert assertion.result() == 2 @pytest.mark.asyncio @@ -113,7 +113,7 @@ async def func(stream): await assertion.update_events() assert assertion.failed with pytest.raises(AssertionError): - await assertion.result() + _ = assertion.result() @pytest.mark.asyncio @@ -144,7 +144,7 @@ async def func(stream): events.append(3) await assertion.update_events() assert assertion.accepted - assert await assertion.result() == 6 + assert assertion.result() == 6 @pytest.mark.asyncio @@ -164,7 +164,7 @@ async def func(stream): await assertion.update_events(events_ended=True) assert assertion.accepted - assert await assertion.result() == 44 + assert assertion.result() == 44 @pytest.mark.asyncio @@ -185,7 +185,7 @@ async def func(stream): await assertion.update_events(events_ended=True) assert assertion.failed with pytest.raises(AssertionError): - await assertion.result() + assertion.result() @pytest.mark.asyncio @@ -209,7 +209,7 @@ async def func(stream): await assertion.update_events(events_ended=True) assert assertion.done - assert await assertion.result() == events + assert assertion.result() == events @pytest.mark.asyncio @@ -282,7 +282,7 @@ async def func(stream): await assertion.update_events(events_ended=True) assert assertion.accepted - assert await assertion.result() == [3, 2, 1] + assert assertion.result() == [3, 2, 1] @pytest.mark.parametrize( @@ -319,7 +319,7 @@ async def func(stream): assert assertion.accepted is accept try: - result = await assertion.result() + result = assertion.result() except Exception as error: result = error assert result_predicate(result) @@ -393,7 +393,7 @@ async def assert_111(stream): await asyncio.sleep(0.1) await assertion.update_events(events_ended=True) - assert await assertion.result() == (True, False, False) + assert assertion.result() == (True, False, False) @pytest.mark.asyncio @@ -401,10 +401,131 @@ async def test_start_twice_raises(): """Test whether starting an already started assertion raises a `RuntimeError`.""" events = [] - async def func(stream): + async def func(_stream): return assertion = Assertion(events, func) assertion.start() with pytest.raises(RuntimeError): assertion.start() + + +@pytest.mark.asyncio +async def test_result_raises_invalidstateerror(): + """Test that `result()` raises an exception when the result is not ready.""" + + async def func(stream): + async for e in stream: + if e == 1: + return + + assertion = Assertion([], func) + with pytest.raises(asyncio.InvalidStateError): + assertion.result() + + assertion.start() + with pytest.raises(asyncio.InvalidStateError): + assertion.result() + + # In order not to leave assertion's task pending + await assertion.update_events(events_ended=True) + + +@pytest.mark.asyncio +async def test_wait_for_result_already_done(): + """Test that `wait_for_result()` returns the result of a done assertion.""" + + async def func(_stream): + return 6 + + assertion = Assertion([], func) + assertion.start() + + result = await assertion.wait_for_result() + assert result == 6 + + +@pytest.mark.asyncio +async def test_wait_for_result_success(): + """Test that `wait_for_result()` waits for the assertion's result.""" + + async def func(_stream): + await asyncio.sleep(0.1) + return 7 + + assertion = Assertion([], func) + assertion.start() + + assert not assertion.done + result = await assertion.wait_for_result() + assert result == 7 + + +@pytest.mark.asyncio() +async def test_wait_for_result_failure(): + """Test that `wait_for_result()` waits for the assertion's exception.""" + + async def func(stream): + await asyncio.sleep(0.1) + raise AssertionError() + + assertion = Assertion([], func) + assertion.start() + + assert not assertion.done + with pytest.raises(AssertionError): + await assertion.wait_for_result() + + +@pytest.mark.asyncio +async def test_wait_for_result_timeout_success(): + """Test that `wait_for_result` with timeout returns the assertion's result.""" + + async def func(_stream): + await asyncio.sleep(0.1) + return 8 + + assertion = Assertion([], func) + assertion.start() + + assert not assertion.done + result = await assertion.wait_for_result(timeout=1.0) + assert result == 8 + + +@pytest.mark.asyncio +async def test_wait_for_result_timeout_failure(): + """Test that `wait_for_result` with timeout raises the assertion's exception.""" + + async def func(stream): + await asyncio.sleep(0.1) + raise AssertionError() + + assertion = Assertion([], func) + assertion.start() + + assert not assertion.done + with pytest.raises(AssertionError): + await assertion.wait_for_result(timeout=1.0) + + +@pytest.mark.asyncio +async def test_wait_for_result_timeout_raises(): + """Test that `wait_for_result` with timeout raises TimeoutError.""" + + async def func(stream): + await asyncio.sleep(1.0) + return 9 + + assertion = Assertion([], func) + assertion.start() + + assert not assertion.done + with pytest.raises(asyncio.TimeoutError): + await assertion.wait_for_result(timeout=0.1) + + # Assertion is cancelled and should fail with AssertionError("Assertion cancelled") + assert assertion.failed + with pytest.raises(AssertionError) as error: + _ = assertion.result() + assert "Assertion cancelled" in str(error) diff --git a/test/goth/assertions/test_monitor.py b/test/goth/assertions/test_monitor.py index d22bc504e..9e47a5aec 100644 --- a/test/goth/assertions/test_monitor.py +++ b/test/goth/assertions/test_monitor.py @@ -74,7 +74,7 @@ async def assert_fancy_property(stream: Events) -> int: @pytest.mark.asyncio async def test_assertions(): - """Test a dummy set of assertions agains a list of int's.""" + """Test a dummy set of assertions against a list of int's.""" monitor: EventMonitor[int] = EventMonitor() monitor.add_assertions( @@ -88,9 +88,10 @@ async def test_assertions(): monitor.start() for n in [1, 3, 4, 6, 3, 8, 9, 10]: - monitor.add_event(n) - # Need this sleep to make sure the assertions consume the events - await asyncio.sleep(0.2) + await monitor.add_event(n) + + # Need this sleep to make sure the assertions consume the events + await asyncio.sleep(0.1) failed = {a.name.rsplit(".", 1)[-1] for a in monitor.failed} assert failed == {"assert_increasing"} @@ -112,20 +113,81 @@ async def test_assertions(): async def test_not_started_raises_on_add_event(): """Test whether `add_event()` invoked before starting the monitor raises error.""" - monitor: EventMonitor[int] = EventMonitor() + monitor = EventMonitor() with pytest.raises(RuntimeError): - monitor.add_event(1) + await monitor.add_event(1) @pytest.mark.asyncio async def test_stopped_raises_on_add_event(): """Test whether `add_event()` invoked after stopping the monitor raises error.""" - monitor: EventMonitor[int] = EventMonitor() + monitor = EventMonitor() monitor.start() await monitor.stop() with pytest.raises(RuntimeError): - monitor.add_event(1) + await monitor.add_event(1) + + +@pytest.mark.asyncio +async def test_waitable_monitor(): + """Test if `WaitableMonitor.wait_for_event()` respects event ordering.""" + + monitor = EventMonitor() + monitor.start() + + events = [] + + async def wait_for_events(): + events.append(await monitor.wait_for_event(lambda e: e == 1)) + events.append(await monitor.wait_for_event(lambda e: e == 2)) + events.append(await monitor.wait_for_event(lambda e: e == 3)) + + await monitor.add_event(0) + await monitor.add_event(1) + await monitor.add_event(2) + await monitor.add_event(0) + + task = asyncio.create_task(wait_for_events()) + await asyncio.sleep(0.1) + assert events == [1, 2] + + await monitor.add_event(3) + await asyncio.sleep(0.1) + assert events == [1, 2, 3] + + assert task.done() + await monitor.stop() + + +@pytest.mark.asyncio +async def test_waitable_monitor_timeout_error(): + """Test if `WaitableMonitor.wait_for_event()` raises `TimeoutError` on timeout.""" + + monitor = EventMonitor() + monitor.start() + + with pytest.raises(asyncio.TimeoutError): + await monitor.wait_for_event(lambda e: e == 1, timeout=0.1) + + await monitor.stop() + + +@pytest.mark.asyncio +async def test_waitable_monitor_timeout_success(): + """Test if `WaitableMonitor.wait_for_event()` return success before timeout.""" + + monitor = EventMonitor() + monitor.start() + + async def worker_task(): + await asyncio.sleep(0.1) + await monitor.add_event(1) + + asyncio.create_task(worker_task()) + + await monitor.wait_for_event(lambda e: e == 1, timeout=1.0) + await monitor.stop() diff --git a/test/yagna/conftest.py b/test/yagna/conftest.py index ca554e651..f4e71a924 100644 --- a/test/yagna/conftest.py +++ b/test/yagna/conftest.py @@ -1,7 +1,6 @@ """Code common for all pytest modules in this package.""" from datetime import datetime, timezone -import json from pathlib import Path from typing import Callable, Optional @@ -173,17 +172,6 @@ def assets_path(request) -> Path: return path.resolve() -@pytest.fixture(scope="module") -def exe_script(assets_path: Path) -> list: - """Fixture which parses the exe_script.json file from `assets_path` dir.""" - - exe_script_path = assets_path / "requestor" / "exe_script.json" - with exe_script_path.open() as fd: - loaded = json.load(fd) - assert isinstance(loaded, list) - return loaded - - @pytest.fixture(scope="module") def task_package_template() -> str: """Fixture which provides the Demand's `golem.srv.comp.task_package` property. diff --git a/test/yagna/e2e/assets/docker/docker-compose.yml b/test/yagna/e2e/assets/docker/docker-compose.yml index d6c7bf874..d8378fcc0 100644 --- a/test/yagna/e2e/assets/docker/docker-compose.yml +++ b/test/yagna/e2e/assets/docker/docker-compose.yml @@ -21,7 +21,7 @@ services: - "8545:8545" zksync: - image: docker.pkg.github.com/golemfactory/yagna-zksync/yagna-zksync-mock:515b66be9f96 + image: docker.pkg.github.com/golemfactory/yagna-zksync/yagna-zksync-mock:f6d0cf02f6bc ports: - "3030:3030" environment: diff --git a/test/yagna/e2e/assets/requestor/exe_script.json b/test/yagna/e2e/assets/requestor/exe_script.json deleted file mode 100644 index 8c3844beb..000000000 --- a/test/yagna/e2e/assets/requestor/exe_script.json +++ /dev/null @@ -1,23 +0,0 @@ -[ - {"deploy": { - }}, - {"start": { - "args": [] - }}, - { - "transfer": { - "from": "http://3.249.139.167:8000/LICENSE", - "to": "container:/input/file_in" - } - }, - {"run": { - "entry_point": "rust-wasi-tutorial", - "args": ["/input/file_in", "/output/file_cp"] - }}, - { - "transfer": { - "from": "container:/output/file_cp", - "to": "http://3.249.139.167:8000/upload/file_up" - } - } -] diff --git a/test/yagna/e2e/test_e2e_vm.py b/test/yagna/e2e/test_e2e_vm.py index 7ea694d62..16fdc7fc1 100644 --- a/test/yagna/e2e/test_e2e_vm.py +++ b/test/yagna/e2e/test_e2e_vm.py @@ -18,6 +18,7 @@ from goth.runner.container.yagna import YagnaContainerConfig from goth.runner.provider import ProviderProbeWithLogSteps from goth.runner.requestor import RequestorProbeWithApiSteps +from test.yagna.helpers.activity import vm_exe_script logger = logging.getLogger(__name__) @@ -49,7 +50,6 @@ def _topology( YagnaContainerConfig( name="requestor", probe_type=RequestorProbeWithApiSteps, - volumes={assets_path / "requestor": "/asset"}, environment=requestor_env, payment_id=payment_id_pool.get_id(), ), @@ -70,39 +70,6 @@ def _topology( ] -def _exe_script(runner: Runner, output_file: str): - - output_path = Path(runner.web_root_path) / output_file - if output_path.exists(): - os.remove(output_path) - - web_server_addr = f"http://{runner.host_address}:{runner.web_server_port}" - - return [ - {"deploy": {}}, - {"start": {}}, - { - "transfer": { - "from": f"{web_server_addr}/scene.blend", - "to": "container:/golem/resource/scene.blend", - } - }, - { - "transfer": { - "from": f"{web_server_addr}/params.json", - "to": "container:/golem/work/params.json", - } - }, - {"run": {"entry_point": "/golem/entrypoints/run-blender.sh", "args": []}}, - { - "transfer": { - "from": f"container:/golem/output/{output_file}", - "to": f"{web_server_addr}/upload/{output_file}", - } - }, - ] - - @pytest.mark.asyncio async def test_e2e_vm_success( assets_path: Path, @@ -126,7 +93,7 @@ async def test_e2e_vm_success( if output_path.exists(): os.remove(output_path) - exe_script = _exe_script(runner, output_file) + exe_script = vm_exe_script(runner, output_file) requestor = runner.get_probes(probe_type=RequestorProbeWithApiSteps)[0] providers = runner.get_probes(probe_type=ProviderProbeWithLogSteps) diff --git a/test/yagna/e2e/test_e2e_wasm.py b/test/yagna/e2e/test_e2e_wasm.py index 3878410c5..bb07c9ce8 100644 --- a/test/yagna/e2e/test_e2e_wasm.py +++ b/test/yagna/e2e/test_e2e_wasm.py @@ -17,6 +17,7 @@ from goth.runner.container.yagna import YagnaContainerConfig from goth.runner.provider import ProviderProbeWithLogSteps from goth.runner.requestor import RequestorProbeWithApiSteps +from test.yagna.helpers.activity import wasi_exe_script logger = logging.getLogger(__name__) @@ -42,7 +43,6 @@ def _topology( YagnaContainerConfig( name="requestor", probe_type=RequestorProbeWithApiSteps, - volumes={assets_path / "requestor": "/asset"}, environment=requestor_env, payment_id=payment_id_pool.get_id(), ), @@ -69,7 +69,6 @@ def _topology( async def test_e2e_wasm_success( assets_path: Path, demand_constraints: str, - exe_script: dict, payment_id_pool: PaymentIdPool, runner: Runner, task_package_template: str, @@ -132,6 +131,7 @@ async def test_e2e_wasm_success( # Activity + exe_script = wasi_exe_script(runner) num_commands = len(exe_script) for agreement_id, provider in agreement_providers: diff --git a/test/yagna/helpers/activity.py b/test/yagna/helpers/activity.py index 750a12091..f73cfb23a 100644 --- a/test/yagna/helpers/activity.py +++ b/test/yagna/helpers/activity.py @@ -1,7 +1,10 @@ """Helper functions for running Activity.""" import json +import os +from pathlib import Path +from goth.runner import Runner from goth.runner.provider import ProviderProbeWithLogSteps from goth.runner.requestor import RequestorProbeWithApiSteps @@ -22,3 +25,72 @@ async def run_activity( await requestor.destroy_activity(activity_id) await provider.wait_for_exeunit_finished() + + +def vm_exe_script(runner: Runner, output_file: str = "output.png"): + """VM exe script builder.""" + """Create a VM exe script for running a Blender task.""" + + output_path = Path(runner.web_root_path) / output_file + if output_path.exists(): + os.remove(output_path) + + web_server_addr = f"http://{runner.host_address}:{runner.web_server_port}" + + return [ + {"deploy": {}}, + {"start": {}}, + { + "transfer": { + "from": f"{web_server_addr}/scene.blend", + "to": "container:/golem/resource/scene.blend", + } + }, + { + "transfer": { + "from": f"{web_server_addr}/params.json", + "to": "container:/golem/work/params.json", + } + }, + {"run": {"entry_point": "/golem/entrypoints/run-blender.sh", "args": []}}, + { + "transfer": { + "from": f"container:/golem/output/{output_file}", + "to": f"{web_server_addr}/upload/{output_file}", + } + }, + ] + + +def wasi_exe_script(runner: Runner, output_file: str = "upload_file"): + """WASI exe script builder.""" + """Create a WASI exe script for running a WASI tutorial task.""" + + output_path = Path(runner.web_root_path) / output_file + if output_path.exists(): + os.remove(output_path) + + web_server_addr = f"http://{runner.host_address}:{runner.web_server_port}" + + return [ + {"deploy": {}}, + {"start": {"args": []}}, + { + "transfer": { + "from": f"{web_server_addr}/params.json", + "to": "container:/input/file_in", + } + }, + { + "run": { + "entry_point": "rust-wasi-tutorial", + "args": ["/input/file_in", "/output/file_cp"], + } + }, + { + "transfer": { + "from": "container:/output/file_cp", + "to": f"{web_server_addr}/upload/{output_file}", + } + }, + ] diff --git a/test/yagna/interactive/test_interactive_vm.py b/test/yagna/interactive/test_interactive_vm.py index 84a85a770..7dbae6c66 100644 --- a/test/yagna/interactive/test_interactive_vm.py +++ b/test/yagna/interactive/test_interactive_vm.py @@ -64,7 +64,6 @@ def _topology( YagnaContainerConfig( name="requestor", probe_type=RequestorProbe, - volumes={assets_path / "requestor": "/asset"}, environment=requestor_env, payment_id=payment_id_pool.get_id(), ), diff --git a/test/yagna/module/payments/test_zero_amount_txs.py b/test/yagna/module/payments/test_zero_amount_txs.py index 9e1bfb6ab..2d80cfe32 100644 --- a/test/yagna/module/payments/test_zero_amount_txs.py +++ b/test/yagna/module/payments/test_zero_amount_txs.py @@ -43,7 +43,6 @@ def _topology( YagnaContainerConfig( name="requestor", probe_type=RequestorProbeWithApiSteps, - volumes={assets_path / "requestor": "/asset"}, environment=requestor_env, payment_id=payment_id_pool.get_id(), ), @@ -62,7 +61,6 @@ def _topology( async def test_zero_amount_invoice_is_settled( assets_path: Path, demand_constraints: str, - exe_script: dict, payment_id_pool: PaymentIdPool, runner: Runner, task_package_template: str, @@ -102,9 +100,11 @@ async def test_zero_amount_invoice_is_settled( counterproposal_id = await requestor.counter_proposal( subscription_id, demand, proposal ) + logger.info("Counter proposal %s", counterproposal_id) await provider.wait_for_proposal_accepted() new_proposals = await requestor.wait_for_proposals(subscription_id, (provider,)) + logger.info(new_proposals) new_proposal = new_proposals[0] assert new_proposal.prev_proposal_id == counterproposal_id @@ -116,7 +116,8 @@ async def test_zero_amount_invoice_is_settled( await requestor.unsubscribe_demand(subscription_id) logger.info("Got agreement") - # Zero-amount invoice is issued when agreement is terminated without activity + # Zero-amount invoice is issued when agreement is terminated + # without activity await requestor.wait_for_approval(agreement_id) await requestor.terminate_agreement(agreement_id, None) diff --git a/test/yagna/module/ya-provider/test_provider_multi_activity.py b/test/yagna/module/ya-provider/test_provider_multi_activity.py index 4c63d270c..56902be66 100644 --- a/test/yagna/module/ya-provider/test_provider_multi_activity.py +++ b/test/yagna/module/ya-provider/test_provider_multi_activity.py @@ -19,6 +19,7 @@ from goth.runner.container.yagna import YagnaContainerConfig from goth.runner.provider import ProviderProbeWithLogSteps from goth.runner.requestor import RequestorProbeWithApiSteps +from test.yagna.helpers.activity import wasi_exe_script from test.yagna.helpers.negotiation import negotiate_agreements, DemandBuilder from test.yagna.helpers.payment import pay_all @@ -49,7 +50,6 @@ def _topology( YagnaContainerConfig( name="requestor", probe_type=RequestorProbeWithApiSteps, - volumes={assets_path / "requestor": "/asset"}, environment=requestor_env, payment_id=payment_id_pool.get_id(), ), @@ -70,7 +70,6 @@ def _topology( async def test_provider_multi_activity( assets_path: Path, demand_constraints: str, - exe_script: dict, payment_id_pool: PaymentIdPool, runner: Runner, task_package_template: str, @@ -105,6 +104,8 @@ async def test_provider_multi_activity( ) # Activity + exe_script = wasi_exe_script(runner) + for agreement_id, provider in agreement_providers: for i in range(0, 3): logger.info("Running activity %n-th time on %s", i, provider.name)