-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add integration test for agreement termination
- Loading branch information
Showing
2 changed files
with
191 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
#!/usr/bin/env python3 | ||
"""A requestor script used for testing agreement termination.""" | ||
import asyncio | ||
from datetime import timedelta | ||
import logging | ||
|
||
from yapapi import Executor, Task, WorkContext | ||
from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa | ||
from yapapi.package import vm | ||
|
||
|
||
async def main(): | ||
|
||
package = await vm.repo( | ||
image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae", | ||
min_mem_gib=0.5, | ||
min_storage_gib=2.0, | ||
) | ||
|
||
first_worker = True | ||
|
||
async def worker(ctx: WorkContext, tasks): | ||
"""A worker function for `Executor.submit()`. | ||
The first call to this function will produce a worker | ||
that sends an invalid `run` command to the provider. | ||
This should cause `yield ctx.commit()` to fail with | ||
`CommandExecutionError`. | ||
The remaining calls will just send `sleep 5` to the | ||
provider to simulate some work. | ||
""" | ||
|
||
nonlocal first_worker | ||
should_fail = first_worker | ||
first_worker = False | ||
|
||
async for task in tasks: | ||
|
||
if should_fail: | ||
# Send a command that will fail on the provider | ||
ctx.run("xyz") | ||
yield ctx.commit() | ||
else: | ||
# Simulate some work | ||
ctx.run("/bin/sleep", "5") | ||
yield ctx.commit() | ||
|
||
task.accept_result() | ||
|
||
async with Executor( | ||
package=package, | ||
max_workers=1, | ||
budget=10.0, | ||
timeout=timedelta(minutes=6), | ||
subnet_tag="goth", | ||
driver="zksync", | ||
network="rinkeby", | ||
event_consumer=log_event_repr, | ||
) as executor: | ||
|
||
tasks = [Task(data=n) for n in range(6)] | ||
async for task in executor.submit(worker, tasks): | ||
print(f"Task computed: {task}, time: {task.running_time}") | ||
|
||
print("All tasks computed") | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
enable_default_logger(log_file="test.log") | ||
|
||
console_handler = logging.StreamHandler() | ||
console_handler.setLevel(logging.DEBUG) | ||
logging.getLogger("yapapi.events").addHandler(console_handler) | ||
|
||
loop = asyncio.get_event_loop() | ||
task = loop.create_task(main()) | ||
|
||
try: | ||
loop.run_until_complete(task) | ||
except KeyboardInterrupt: | ||
print("Shutting down gracefully...") | ||
task.cancel() | ||
try: | ||
loop.run_until_complete(task) | ||
print("Shutdown completed") | ||
except (asyncio.CancelledError, KeyboardInterrupt): | ||
pass |
102 changes: 102 additions & 0 deletions
102
tests/goth/test_agreement_termination/test_agreement_termination.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
"""A goth test scenario for agreement termination.""" | ||
from functools import partial | ||
import logging | ||
import os | ||
from pathlib import Path | ||
import re | ||
|
||
import pytest | ||
|
||
from goth.configuration import load_yaml | ||
from goth.runner.log import configure_logging | ||
from goth.runner import Runner | ||
from goth.runner.probe import RequestorProbe | ||
|
||
|
||
logger = logging.getLogger("goth.test.agreement_termination") | ||
|
||
|
||
async def assert_command_error(stream): | ||
"""Assert that a worker failure due to `CommandExecutionError` is reported.""" | ||
|
||
async for line in stream: | ||
m = re.match(r"WorkerFinished\(agr_id='([^']+)'.*CommandExecutionError", line) | ||
if m: | ||
return m.group(1) | ||
raise AssertionError("Expected CommandExecutionError failure") | ||
|
||
|
||
async def assert_agreement_cancelled(agr_id, stream): | ||
"""Assert that the agreement with the given id is eventually terminated. | ||
Fails if a new task is started for the agreement. | ||
""" | ||
|
||
async for line in stream: | ||
if re.match(rf"TaskStarted\(.*agr_id='{agr_id}'", line): | ||
raise AssertionError(f"Task started for agreement {agr_id}") | ||
if re.match( | ||
rf"AgreementTerminated\(agr_id='{agr_id}'.*'golem.requestor.code': 'Cancelled'", line | ||
): | ||
return | ||
|
||
|
||
async def assert_all_tasks_computed(stream): | ||
"""Assert that for every task id, `TaskAccepted` with that id occurs.""" | ||
remaining_ids = {1, 2, 3, 4, 5, 6} | ||
|
||
async for line in stream: | ||
m = re.search(r"TaskAccepted\(task_id='([0-9]+)'", line) | ||
if m: | ||
task_id = int(m.group(1)) | ||
logger.debug("assert_all_tasks_computed: Task %d computed", task_id) | ||
remaining_ids.discard(task_id) | ||
if not remaining_ids: | ||
return | ||
|
||
raise AssertionError(f"Tasks not computed: {remaining_ids}") | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_agreement_termination( | ||
project_dir: Path, | ||
log_dir: Path, | ||
) -> None: | ||
|
||
# This is the default configuration with 2 wasm/VM providers | ||
goth_config = load_yaml(project_dir / "tests" / "goth" / "assets" / "goth-config.yml") | ||
test_script_path = str(Path(__file__).parent / "requestor.py") | ||
|
||
configure_logging(log_dir) | ||
|
||
runner = Runner( | ||
base_log_dir=log_dir, | ||
compose_config=goth_config.compose_config, | ||
) | ||
|
||
async with runner(goth_config.containers): | ||
|
||
requestor = runner.get_probes(probe_type=RequestorProbe)[0] | ||
|
||
async with requestor.run_command_on_host(test_script_path, env=os.environ) as ( | ||
_cmd_task, | ||
cmd_monitor, | ||
): | ||
|
||
cmd_monitor.add_assertion(assert_all_tasks_computed) | ||
|
||
# Wait for worker failure due to command error | ||
assertion = cmd_monitor.add_assertion(assert_command_error) | ||
agr_id = await assertion.wait_for_result(timeout=60) | ||
logger.info("Detected command error in activity for agreement %s", agr_id) | ||
|
||
# Make sure no new tasks are sent and the agreement is terminated | ||
assertion = cmd_monitor.add_assertion( | ||
partial(assert_agreement_cancelled, agr_id), | ||
name=f"assert_agreement_cancelled({agr_id})", | ||
) | ||
await assertion.wait_for_result(timeout=10) | ||
|
||
# Wait for executor shutdown | ||
await cmd_monitor.wait_for_pattern("ShutdownFinished", timeout=60) | ||
logger.info("Requestor script finished") |