Skip to content

Commit

Permalink
Merge branch 'master' of github.com:golemfactory/yagna-integration in…
Browse files Browse the repository at this point in the history
…to provider/test-breaking-agreement
  • Loading branch information
nieznanysprawiciel committed Mar 26, 2021
2 parents 1737719 + 79b4147 commit 47f2c3b
Show file tree
Hide file tree
Showing 23 changed files with 610 additions and 258 deletions.
2 changes: 1 addition & 1 deletion goth/api_monitor/monitor_addon.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def _register_event(self, event: APIEvent) -> None:
"""Log an API event and add it to the monitor."""

self._logger.debug("%s", event)
self._monitor.add_event(event)
self._monitor.add_event_sync(event)

def request(self, flow: HTTPFlow) -> None:
"""Register a request."""
Expand Down
73 changes: 54 additions & 19 deletions goth/assertions/assertions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ class EventStream(Protocol, AsyncIterable[E]):
class Assertion(AsyncIterable[E]):
"""A class for executing assertion coroutines.
An instance of this class wraps a coroutine function (called the
"assertion coroutine") and provides an asynchronous generator of
events that the assertion coroutine processes.
An instance of this class wraps a coroutine (called the "assertion coroutine")
and provides an asynchronous generator of events that this coroutine processes.
After creating an instance of this class, its client should await
the `update_events()` method each time a new event is appended to the list
of events (the list is passed as an argument to `Assertion()`).
Expand Down Expand Up @@ -98,13 +97,20 @@ def start(self) -> asyncio.Task:
if self.started:
raise RuntimeError("Assertion already started")

def on_done(*args) -> None:
"""Notify the tasks waiting until this assertion updates."""
self._notify_update_events()
async def func_wrapper():
"""Ensure `_notify_update_events` is called after processing each event.
See also comments in `_create_generator()`.
"""
try:
return await self._func(self)
except asyncio.CancelledError:
raise AssertionError("Assertion cancelled")
finally:
self._notify_update_events()

assert self._func is not None
self._task = asyncio.create_task(self._func(self))
self._task.add_done_callback(on_done)
self._task = asyncio.create_task(func_wrapper())
self._ready = asyncio.Event()
self._processed = asyncio.Event()
return self._task
Expand All @@ -126,27 +132,54 @@ def done(self) -> bool:
@property
def accepted(self) -> bool:
"""Return `True` iff this assertion finished execution successfuly."""
return self.started and self._task.done() and self._task.exception() is None
return (
self._task is not None
and self._task.done()
and self._task.exception() is None
)

@property
def failed(self) -> bool:
"""Return `True` iff this assertion finished execution by failing."""
return self.started and self._task.done() and self._task.exception() is not None
return (
self._task is not None
and self._task.done()
and self._task.exception() is not None
)

async def result(self) -> Any:
def result(self) -> Any:
"""Return the result of this assertion.
If the assertion succeeded its result will be returned.
If it failed, the exception will be raised.
If the assertion hasn't finished yet, `None` will be returned.
If it hasn't been started, `asyncio.InvalidStateError` will be raised.
The semantics is similar to that of the `result()` method of `asyncio.Task`
(https://docs.python.org/3/library/asyncio-task.html#task-object):
If the assertion is done, the result of the assertion coroutine is returned
or the exception raised by the coroutine is re-raised.
If the assertion is not done (in particular, if hasn't been started) then
this method raises `asyncio.InvalidStateError`.
"""
if not self.started:
if not self._task:
raise asyncio.InvalidStateError("Assertion not started")

if self._task.done():
return self._task.result()

async def wait_for_result(self, timeout: Optional[float] = None) -> Any:
"""Wait until this assertion's result becomes available and return it.
Optional `timeout` is in seconds, `None` means to wait indefinitely
(this is the default).
"""
if not self._task:
raise asyncio.InvalidStateError("Assertion not started")

if timeout is None:
return await self._task
return None

try:
return await asyncio.wait_for(self._task, timeout=timeout)
finally:
# This is to retrieve exception from `self._task` so no unretrieved
# exceptions are reported when the event loop closes.
_ = self._task.exception()

async def update_events(self, events_ended: bool = False) -> None:
"""Notify the assertion that a new event has been added."""
Expand Down Expand Up @@ -183,6 +216,8 @@ def __aiter__(self) -> AsyncIterator[E]:

def _notify_update_events(self) -> None:
"""Notify tasks waiting in `update_events()` that the update is processed."""
if not self._ready or not self._processed:
raise asyncio.InvalidStateError("Assertion not started")
self._ready.clear()
self._processed.set()

Expand All @@ -209,5 +244,5 @@ async def _create_generator(self) -> AsyncIterator[E]:
#
# In case the control does not return (since the events end or
# the assertion coroutine raises an exception), `_notify_update_events()`
# is called in the done callback for the coroutine task.
# must be called after returning from the assertion coroutine.
self._notify_update_events()
141 changes: 99 additions & 42 deletions goth/assertions/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import importlib
import logging
import sys
from typing import Generic, List, Optional, Sequence
from typing import Callable, Generic, List, Optional, Sequence, Union

from goth.assertions import Assertion, AssertionFunction, E

Expand Down Expand Up @@ -55,13 +55,23 @@ class EventMonitor(Generic[E]):
name: Optional[str]
"""The name of this monitor, for use in logging."""

_event_loop: asyncio.AbstractEventLoop
"""The event loop in which this monitor has been started."""

_events: List[E]
"""List of events registered so far."""

_incoming: "Optional[asyncio.Queue[Optional[E]]]"
_incoming: "asyncio.Queue[Optional[E]]"
"""A queue used to pass the events to the worker task."""

_logger: logging.Logger
_last_checked_event: int
"""The index of the last event examined by `wait_for_event()` method.
Subsequent calls to `wait_for_event` will only look at events that occurred
after this event.
"""

_logger: Union[logging.Logger, MonitorLoggerAdapter]
"""A logger instance for this monitor."""

_worker_task: Optional[asyncio.Task]
Expand All @@ -75,32 +85,33 @@ def __init__(
) -> None:
self.assertions = OrderedDict()
self.name = name

self._event_loop = asyncio.get_event_loop()
self._events = []
# Delay creating the queue to make sure it's created in the event loop
# used by the worker task.
self._incoming = None
self._worker_task = None
self._incoming = asyncio.Queue()
self._last_checked_event = -1
self._logger = logger or logging.getLogger(__name__)
if self.name:
self._logger = MonitorLoggerAdapter(
self._logger, {MonitorLoggerAdapter.EXTRA_MONITOR_NAME: self.name}
)
self._stop_callback = on_stop
self._worker_task = None

def add_assertion(
self, assertion_func: AssertionFunction[E], log_level: LogLevel = logging.INFO
) -> Assertion:
"""Add an assertion function to this monitor."""

result = Assertion(self._events, assertion_func)
self.assertions[result] = log_level
return result
assertion = Assertion(self._events, assertion_func)
assertion.start()
self._logger.debug("Assertion '%s' started", assertion.name)
self.assertions[assertion] = log_level
return assertion

def add_assertions(self, assertion_funcs: List[AssertionFunction[E]]) -> None:
"""Add a list of assertion functions to this monitor."""

# Create assertions here but don't start them yet, to make sure
# they're started in the same event loop in which they'll be running.
for func in assertion_funcs:
self.add_assertion(func)

Expand All @@ -123,29 +134,46 @@ def load_assertions(self, module_name: str) -> None:
mod_logger.addHandler(handler)

def start(self) -> None:
"""Start tracing events."""
"""Start tracing events.
Starting a monitor is decoupled from its initialisation. This allows the
user to add assertions to the monitor before starting to register events.
Such assertions are thus guaranteed not to "miss" any event registered
by the monitor.
"""

if self.is_running():
self._logger.warning("Monitor already started")
return

self._incoming = asyncio.Queue()
# Don't use `asyncio.create_task()` here, as it'll fail if the current
# event loop is not running yet. `asyncio.ensure_future()` will create
# a task which will be run when the loop is started.
future = asyncio.ensure_future(self._run_worker())
assert isinstance(future, asyncio.Task)
self._worker_task = future
self._worker_task = self._event_loop.create_task(self._run_worker())
self._logger.debug("Monitor started")

def add_event(self, event: E) -> None:
async def add_event(self, event: E) -> None:
"""Register a new event."""

# Note: this method is `async` even though it does not perform any `await`.
# This is to ensure that it's directly callable only from code running in
# an event loop, which in turn guarantees that tasks waiting for input from
# `self._incoming` will be notified.

if not self.is_running():
raise RuntimeError(f"Monitor {self.name or ''} is not running")

self._incoming.put_nowait(event)

def add_event_sync(self, event: E) -> None:
"""Schedule registering a new event.
This function can be called from a thread different from the one
that started this monitor.
"""

if not self.is_running():
raise RuntimeError(f"Monitor {self.name or ''} is not running")

self._event_loop.call_soon_threadsafe(self._incoming.put_nowait, event)

async def stop(self) -> None:
"""Stop tracing events."""

Expand All @@ -162,6 +190,7 @@ async def stop(self) -> None:
# will return `False` to prevent adding more events.
worker = self._worker_task
self._worker_task = None
assert worker
await worker
self._logger.debug("Monitor stopped")

Expand All @@ -183,8 +212,9 @@ def is_running(self) -> bool:
return False

if self._worker_task.done():
if self._worker_task.exception():
raise self._worker_task.exception()
exc = self._worker_task.exception()
if exc:
raise exc
return False

return True
Expand All @@ -200,7 +230,6 @@ async def _run_worker(self) -> None:
events_ended = False

while not events_ended:

event = await self._incoming.get()
if event is not None:
self._events.append(event)
Expand All @@ -220,40 +249,33 @@ async def _check_assertions(self, events_ended: bool) -> None:

for a, level in self.assertions.items():

# As new assertions may be added on the fly, we need to make sure
# that this one has been started already.
if not a.started:
a.start()
self._logger.debug("Assertion '%s' started", a.name)

if a.done:
continue

await a.update_events(events_ended=events_ended)

if a.accepted:
result = await a.result()
result = a.result()
msg = "Assertion '%s' succeeded after event: %s; result: %s"
self._logger.log(level, msg, a.name, event_descr, result)

elif a.failed:
await self._report_failure(a, event_descr)

# Ensure other tasks can also run between assertions
await asyncio.sleep(0)

async def _report_failure(self, a: Assertion, event_descr: str) -> None:
try:
await a.result()
except Exception:
exc_type, exc, tb = sys.exc_info()
# Drop top 2 frames from the traceback: the current one
# and the the one for `a.result()`, so that only the frames
# of the assertion functions are left.
tb = tb.tb_next.tb_next
a.result()
except Exception as exc:
_exc_type, _exc, tb = sys.exc_info()
# Drop top 3 frames from the traceback: the current one,
# the one for `a.result()` and the one for the `func_wrapper`
# used in `__init__()`, so that only the frames of the assertion
# functions are left.
for _ in (1, 2, 3):
tb = tb.tb_next if tb else tb
msg = "Assertion '%s' failed after event: %s; cause: %s"
self._logger.error(
msg, a.name, event_descr, exc, exc_info=(exc_type, exc, tb)
msg, a.name, event_descr, exc, exc_info=(type(exc), exc, tb)
)

@property
Expand All @@ -279,3 +301,38 @@ def finished(self) -> bool:
"""Return True iif all assertions are done."""

return all(a.done for a in self.assertions)

async def wait_for_event(
self, predicate: Callable[[E], bool], timeout: Optional[float] = None
) -> E:
"""Wait for an event that satisfies given `predicate`.
The first call to this method will examine all events gathered since
this monitor was started and then, if needed, will wait for up to `timeout`
seconds for a matching event.
Subsequent calls will examine all events gathered since the previous call
returned and then wait for up to `timeout` seconds.
When `timeout` elapses, `asyncio.TimeourError` will be raised.
"""

# First examine log lines already seen
while self._last_checked_event + 1 < len(self._events):
self._last_checked_event += 1
event = self._events[self._last_checked_event]
if predicate(event):
return event

# Otherwise create an assertion that waits for a matching event...
async def wait_for_match(stream) -> E:
async for e in stream:
self._last_checked_event = len(stream.past_events) - 1
if predicate(e):
return e
raise AssertionError("No matching event occurred")

assertion = self.add_assertion(wait_for_match, logging.DEBUG)

# ... and wait until the assertion completes
return await assertion.wait_for_result(timeout=timeout)
2 changes: 1 addition & 1 deletion goth/default-assets/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ services:
- "8545:8545"

zksync:
image: docker.pkg.github.com/golemfactory/yagna-zksync/yagna-zksync-mock:7a25ed913d4a
image: docker.pkg.github.com/golemfactory/yagna-zksync/yagna-zksync-mock:f6d0cf02f6bc
ports:
- "3030:3030"
environment:
Expand Down
2 changes: 1 addition & 1 deletion goth/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def node_environment(
"RUST_LOG": "debug,tokio_core=info,tokio_reactor=info,hyper=info",
"YA_PAYMENT_NETWORK": "rinkeby",
"YAGNA_API_URL": YAGNA_REST_URL.substitute(host="0.0.0.0"),
"ZKSYNC_FAUCET_ADDR": "http://zksync:3030/donate",
"ZKSYNC_FAUCET_ADDR": "http://zksync:3030/zk/donatex",
"ZKSYNC_RINKEBY_RPC_ADDRESS": "http://zksync:3030",
# left for compatibility with yagna prior to commit 800efe13
"ZKSYNC_RPC_ADDRESS": "http://zksync:3030",
Expand Down
Loading

0 comments on commit 47f2c3b

Please sign in to comment.