Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration test for a Simple Service example #516

Merged
merged 9 commits into from
Jul 6, 2021
28 changes: 22 additions & 6 deletions examples/simple-service-poc/simple_service.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async def shutdown(self):
yield self._ctx.commit()


async def main(subnet_tag, driver=None, network=None):
async def main(subnet_tag, running_time, driver=None, network=None):
async with Golem(
budget=1.0,
subnet_tag=subnet_tag,
Expand All @@ -102,7 +102,9 @@ async def main(subnet_tag, driver=None, network=None):
commissioning_time = datetime.now()

print(
f"{TEXT_COLOR_YELLOW}starting {pluralize(NUM_INSTANCES, 'instance')}{TEXT_COLOR_DEFAULT}"
f"{TEXT_COLOR_YELLOW}"
f"Starting {pluralize(NUM_INSTANCES, 'instance')}..."
f"{TEXT_COLOR_DEFAULT}"
)

# start the service
Expand Down Expand Up @@ -135,18 +137,18 @@ def still_starting():
if still_starting():
raise Exception(f"Failed to start instances before {STARTING_TIMEOUT} elapsed :( ...")

print("All instances started :)")
print(f"{TEXT_COLOR_YELLOW}All instances started :){TEXT_COLOR_DEFAULT}")

# allow the service to run for a short while
# (and allowing its requestor-end handlers to interact with it)

start_time = datetime.now()

while datetime.now() < start_time + timedelta(minutes=2):
while datetime.now() < start_time + timedelta(seconds=running_time):
print(f"instances: {instances()}")
await asyncio.sleep(5)

print(f"{TEXT_COLOR_YELLOW}stopping instances{TEXT_COLOR_DEFAULT}")
print(f"{TEXT_COLOR_YELLOW}Stopping instances...{TEXT_COLOR_DEFAULT}")
cluster.stop()

# wait for instances to stop
Expand All @@ -163,6 +165,15 @@ def still_starting():
parser = build_parser(
"A very simple / POC example of a service running on Golem, utilizing the VM runtime"
)
parser.add_argument(
"--running-time",
default=120,
type=int,
help=(
"How long should the instance run before the cluster is stopped "
"(in seconds, default: %(default)s)"
),
)
now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
parser.set_defaults(log_file=f"simple-service-yapapi-{now}.log")
args = parser.parse_args()
Expand All @@ -179,7 +190,12 @@ def still_starting():

loop = asyncio.get_event_loop()
task = loop.create_task(
main(subnet_tag=args.subnet_tag, driver=args.driver, network=args.network)
main(
subnet_tag=args.subnet_tag,
running_time=args.running_time,
driver=args.driver,
network=args.network,
)
)

try:
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ ya-client-payment = "0.1.0"
ya-market = "0.1.0"

[tool.poe.tasks]
test = "pytest --cov=yapapi --ignore tests/goth"
goth-assets = "python -m goth create-assets tests/goth/assets"
goth-tests = "pytest -svx tests/goth"
test = "pytest --cov=yapapi --ignore tests/goth_tests"
goth-assets = "python -m goth create-assets tests/goth_tests/assets"
goth-tests = "pytest -svx tests/goth_tests"
typecheck = "mypy ."
codestyle = "black --check --diff ."
_liccheck_export = "poetry export -E cli -f requirements.txt -o .requirements.txt"
Expand Down
Empty file added tests/goth_tests/__init__.py
Empty file.
59 changes: 59 additions & 0 deletions tests/goth_tests/assertions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Temporal assertions used in `goth` integration tests.

The assertions express properties of lines of output printed by requestor
scripts (e.g. `blender.py`) to their stdout or stderr. For example, one such
property would be that if the requestor script prints a line containing
the string "Agreement confirmed by provider P", for any name P, then
eventually it will also print a line containing "Accepted invoice from P".
"""
import logging
import re
from typing import Set

from goth.assertions import EventStream


logger = logging.getLogger("goth.test.assertions")


async def assert_no_errors(output_lines: EventStream[str]):
"""Assert that no output line contains the substring `ERROR`."""
async for line in output_lines:
if "ERROR" in line:
raise AssertionError("Command reported ERROR")


async def assert_all_invoices_accepted(output_lines: EventStream[str]):
"""Assert that an invoice is accepted for every provider that confirmed an agreement."""
unpaid_agreement_providers = set()

async for line in output_lines:
m = re.search("Agreement confirmed by provider '([^']*)'", line)
if m:
prov_name = m.group(1)
logger.debug("assert_all_invoices_accepted: adding provider '%s'", prov_name)
unpaid_agreement_providers.add(prov_name)
m = re.search("Accepted invoice from '([^']*)'", line)
if m:
prov_name = m.group(1)
logger.debug("assert_all_invoices_accepted: adding invoice for '%s'", prov_name)
unpaid_agreement_providers.remove(prov_name)

if unpaid_agreement_providers:
raise AssertionError(f"Unpaid agreements for: {','.join(unpaid_agreement_providers)}")


async def assert_tasks_processed(tasks: Set[str], status: str, output_lines: EventStream[str]):
"""Assert that for every task in `tasks` a line with `Task {status}` will appear."""
remaining_tasks = tasks.copy()

async for line in output_lines:
m = re.search(rf".*Task {status} .* task data: (.+)$", line)
if m:
task_data = m.group(1)
logger.debug("assert_tasks_processed: Task %s: %s", status, task_data)
remaining_tasks.discard(task_data)
if not remaining_tasks:
return

raise AssertionError(f"Tasks not {status}: {remaining_tasks}")
4 changes: 2 additions & 2 deletions tests/goth/conftest.py → tests/goth_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from datetime import datetime, timezone
from pathlib import Path
from typing import cast, List, Tuple
from typing import cast, List

import pytest

from goth.configuration import Override
from yapapi.package import Package, vm
from yapapi.package import vm


def pytest_addoption(parser):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import os
from pathlib import Path
import re

import pytest

Expand All @@ -11,67 +10,26 @@
from goth.runner import Runner
from goth.runner.probe import RequestorProbe

from .assertions import assert_no_errors, assert_all_invoices_accepted, assert_tasks_processed


logger = logging.getLogger("goth.test.run_blender")

ALL_TASKS = {0, 10, 20, 30, 40, 50}
ALL_TASKS = {"0", "10", "20", "30", "40", "50"}


# Temporal assertions expressing properties of sequences of "events". In this case, each "event"
# is just a line of output from `blender.py`.


async def assert_no_errors(output_lines: EventStream[str]):
"""Assert that no output line contains the substring `ERROR`."""
async for line in output_lines:
if "ERROR" in line:
raise AssertionError("Command reported ERROR")


async def assert_all_tasks_processed(status: str, output_lines: EventStream[str]):
"""Assert that for every task in `ALL_TASKS` a line with `Task {status}` will appear."""
remaining_tasks = ALL_TASKS.copy()

async for line in output_lines:
m = re.search(rf".*Task {status} .* task data: ([0-9]+)", line)
if m:
task_data = int(m.group(1))
logger.debug("assert_all_tasks_processed: Task %s: %d", status, task_data)
remaining_tasks.discard(task_data)
if not remaining_tasks:
return

raise AssertionError(f"Tasks not {status}: {remaining_tasks}")


async def assert_all_tasks_started(output_lines: EventStream[str]):
"""Assert that for every task a line with `Task started on provider` will appear."""
await assert_all_tasks_processed("started on provider", output_lines)
await assert_tasks_processed(ALL_TASKS, "started on provider", output_lines)


async def assert_all_tasks_computed(output_lines: EventStream[str]):
"""Assert that for every task a line with `Task computed by provider` will appear."""
await assert_all_tasks_processed("finished by provider", output_lines)


async def assert_all_invoices_accepted(output_lines: EventStream[str]):
"""Assert that an invoice is accepted for every provider that confirmed an agreement."""
unpaid_agreement_providers = set()

async for line in output_lines:
m = re.search("Agreement confirmed by provider '([^']*)'", line)
if m:
prov_name = m.group(1)
logger.debug("assert_all_invoices_accepted: adding provider '%s'", prov_name)
unpaid_agreement_providers.add(prov_name)
m = re.search("Accepted invoice from '([^']*)'", line)
if m:
prov_name = m.group(1)
logger.debug("assert_all_invoices_accepted: adding invoice for '%s'", prov_name)
unpaid_agreement_providers.remove(prov_name)

if unpaid_agreement_providers:
raise AssertionError(f"Unpaid agreements for: {','.join(unpaid_agreement_providers)}")
await assert_tasks_processed(ALL_TASKS, "finished by provider", output_lines)


@pytest.mark.asyncio
Expand Down
71 changes: 71 additions & 0 deletions tests/goth_tests/test_run_simple_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""An integration test scenario that runs the Simple Service example requestor app."""
import logging
import os
from pathlib import Path
import time

import pytest

from goth.configuration import load_yaml
from goth.runner.log import configure_logging
from goth.runner import Runner
from goth.runner.probe import RequestorProbe

from .assertions import assert_no_errors, assert_all_invoices_accepted


logger = logging.getLogger("goth.test.run_simple_service")


@pytest.mark.asyncio
async def test_run_simple_service(log_dir: Path, project_dir: Path, config_overrides) -> None:

# This is the default configuration with 2 wasm/VM providers
goth_config = load_yaml(
Path(__file__).parent / "assets" / "goth-config.yml", overrides=config_overrides
)

requestor_path = project_dir / "examples" / "simple-service-poc" / "simple_service.py"

configure_logging(log_dir)

runner = Runner(
base_log_dir=log_dir,
compose_config=goth_config.compose_config,
)

async with runner(goth_config.containers):

requestor = runner.get_probes(probe_type=RequestorProbe)[0]

async with requestor.run_command_on_host(
f"{requestor_path} --running-time 40 --subnet-tag goth",
env=os.environ,
) as (_cmd_task, cmd_monitor):

start_time = time.time()

def elapsed_time():
return f"time: {(time.time() - start_time):.1f}"

# Add assertions to the command output monitor `cmd_monitor`:
cmd_monitor.add_assertion(assert_no_errors)
cmd_monitor.add_assertion(assert_all_invoices_accepted)

await cmd_monitor.wait_for_pattern("Starting 1 instance", timeout=20)
await cmd_monitor.wait_for_pattern("instances:.*'starting'", timeout=20)
logger.info(f"The instance is starting ({elapsed_time()})")

# A longer timeout to account for downloading a VM image
await cmd_monitor.wait_for_pattern("All instances started", timeout=120)
logger.info(f"The instance was started successfully ({elapsed_time()})")

for _ in range(3):
await cmd_monitor.wait_for_pattern("instances:.*'running'", timeout=20)
logger.info("The instance is running")

await cmd_monitor.wait_for_pattern("Stopping instances", timeout=60)
logger.info(f"The instance is stopping ({elapsed_time()})")

await cmd_monitor.wait_for_pattern(".*All jobs have finished", timeout=20)
logger.info(f"Requestor script finished ({elapsed_time()}")
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import math
import os
from pathlib import Path
import re

import pytest

Expand All @@ -12,6 +11,8 @@
from goth.runner import Runner
from goth.runner.probe import RequestorProbe

from .assertions import assert_no_errors, assert_all_invoices_accepted, assert_tasks_processed


logger = logging.getLogger("goth.test.run_yacat")

Expand All @@ -25,57 +26,14 @@
# is just a line of output from `yacat.py`.


async def assert_no_errors(output_lines: EventStream[str]):
"""Assert that no output line contains the substring `ERROR`."""
async for line in output_lines:
if "ERROR" in line:
raise AssertionError("Command reported ERROR")


async def assert_all_tasks_processed(status: str, output_lines: EventStream[str]):
"""Assert that for every task in `ALL_TASKS` a line with `Task {status}` will appear."""
remaining_tasks = ALL_TASKS.copy()

async for line in output_lines:
m = re.search(rf".*Task {status} .* task data: (.+)$", line)
if m:
task_data = m.group(1)
logger.debug("assert_all_tasks_processed: Task %s: %s", status, task_data)
remaining_tasks.discard(task_data)
if not remaining_tasks:
return

raise AssertionError(f"Tasks not {status}: {remaining_tasks}")


async def assert_all_tasks_started(output_lines: EventStream[str]):
"""Assert that for every task a line with `Task started on provider` will appear."""
await assert_all_tasks_processed("started on provider", output_lines)
await assert_tasks_processed(ALL_TASKS, "started on provider", output_lines)


async def assert_all_tasks_computed(output_lines: EventStream[str]):
"""Assert that for every task a line with `Task computed by provider` will appear."""
await assert_all_tasks_processed("finished by provider", output_lines)


async def assert_all_invoices_accepted(output_lines: EventStream[str]):
"""Assert that an invoice is accepted for every provider that confirmed an agreement."""
unpaid_agreement_providers = set()

async for line in output_lines:
m = re.search("Agreement confirmed by provider '([^']*)'", line)
if m:
prov_name = m.group(1)
logger.debug("assert_all_invoices_accepted: adding provider '%s'", prov_name)
unpaid_agreement_providers.add(prov_name)
m = re.search("Accepted invoice from '([^']*)'", line)
if m:
prov_name = m.group(1)
logger.debug("assert_all_invoices_accepted: adding invoice for '%s'", prov_name)
unpaid_agreement_providers.remove(prov_name)

if unpaid_agreement_providers:
raise AssertionError(f"Unpaid agreements for: {','.join(unpaid_agreement_providers)}")
await assert_tasks_processed(ALL_TASKS, "finished by provider", output_lines)


@pytest.mark.asyncio
Expand Down