Skip to content

Commit

Permalink
Integration test for a Simple Service example (#516)
Browse files Browse the repository at this point in the history
* Move assertions used in blender and yacat tests to a separate module
* Add cmdline param specifying instance running time to simple_service.py
* Add integration test that runs the simple service example with goth
* Fix imports from tests.goth.assertions module
* Rename tests/goth to tests/goth_tests to avoid conflicts with goth pkg
  • Loading branch information
azawlocki authored Jul 6, 2021
1 parent 314db23 commit 5ee0706
Show file tree
Hide file tree
Showing 17 changed files with 221 additions and 148 deletions.
28 changes: 22 additions & 6 deletions examples/simple-service-poc/simple_service.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async def shutdown(self):
yield self._ctx.commit()


async def main(subnet_tag, driver=None, network=None):
async def main(subnet_tag, running_time, driver=None, network=None):
async with Golem(
budget=1.0,
subnet_tag=subnet_tag,
Expand All @@ -102,7 +102,9 @@ async def main(subnet_tag, driver=None, network=None):
commissioning_time = datetime.now()

print(
f"{TEXT_COLOR_YELLOW}starting {pluralize(NUM_INSTANCES, 'instance')}{TEXT_COLOR_DEFAULT}"
f"{TEXT_COLOR_YELLOW}"
f"Starting {pluralize(NUM_INSTANCES, 'instance')}..."
f"{TEXT_COLOR_DEFAULT}"
)

# start the service
Expand Down Expand Up @@ -135,18 +137,18 @@ def still_starting():
if still_starting():
raise Exception(f"Failed to start instances before {STARTING_TIMEOUT} elapsed :( ...")

print("All instances started :)")
print(f"{TEXT_COLOR_YELLOW}All instances started :){TEXT_COLOR_DEFAULT}")

# allow the service to run for a short while
# (and allowing its requestor-end handlers to interact with it)

start_time = datetime.now()

while datetime.now() < start_time + timedelta(minutes=2):
while datetime.now() < start_time + timedelta(seconds=running_time):
print(f"instances: {instances()}")
await asyncio.sleep(5)

print(f"{TEXT_COLOR_YELLOW}stopping instances{TEXT_COLOR_DEFAULT}")
print(f"{TEXT_COLOR_YELLOW}Stopping instances...{TEXT_COLOR_DEFAULT}")
cluster.stop()

# wait for instances to stop
Expand All @@ -163,6 +165,15 @@ def still_starting():
parser = build_parser(
"A very simple / POC example of a service running on Golem, utilizing the VM runtime"
)
parser.add_argument(
"--running-time",
default=120,
type=int,
help=(
"How long should the instance run before the cluster is stopped "
"(in seconds, default: %(default)s)"
),
)
now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
parser.set_defaults(log_file=f"simple-service-yapapi-{now}.log")
args = parser.parse_args()
Expand All @@ -179,7 +190,12 @@ def still_starting():

loop = asyncio.get_event_loop()
task = loop.create_task(
main(subnet_tag=args.subnet_tag, driver=args.driver, network=args.network)
main(
subnet_tag=args.subnet_tag,
running_time=args.running_time,
driver=args.driver,
network=args.network,
)
)

try:
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ ya-client-payment = "0.1.0"
ya-market = "0.1.0"

[tool.poe.tasks]
test = "pytest --cov=yapapi --ignore tests/goth"
goth-assets = "python -m goth create-assets tests/goth/assets"
goth-tests = "pytest -svx tests/goth"
test = "pytest --cov=yapapi --ignore tests/goth_tests"
goth-assets = "python -m goth create-assets tests/goth_tests/assets"
goth-tests = "pytest -svx tests/goth_tests"
typecheck = "mypy ."
codestyle = "black --check --diff ."
_liccheck_export = "poetry export -E cli -f requirements.txt -o .requirements.txt"
Expand Down
Empty file added tests/goth_tests/__init__.py
Empty file.
59 changes: 59 additions & 0 deletions tests/goth_tests/assertions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Temporal assertions used in `goth` integration tests.
The assertions express properties of lines of output printed by requestor
scripts (e.g. `blender.py`) to their stdout or stderr. For example, one such
property would be that if the requestor script prints a line containing
the string "Agreement confirmed by provider P", for any name P, then
eventually it will also print a line containing "Accepted invoice from P".
"""
import logging
import re
from typing import Set

from goth.assertions import EventStream


logger = logging.getLogger("goth.test.assertions")


async def assert_no_errors(output_lines: EventStream[str]):
"""Assert that no output line contains the substring `ERROR`."""
async for line in output_lines:
if "ERROR" in line:
raise AssertionError("Command reported ERROR")


async def assert_all_invoices_accepted(output_lines: EventStream[str]):
"""Assert that an invoice is accepted for every provider that confirmed an agreement."""
unpaid_agreement_providers = set()

async for line in output_lines:
m = re.search("Agreement confirmed by provider '([^']*)'", line)
if m:
prov_name = m.group(1)
logger.debug("assert_all_invoices_accepted: adding provider '%s'", prov_name)
unpaid_agreement_providers.add(prov_name)
m = re.search("Accepted invoice from '([^']*)'", line)
if m:
prov_name = m.group(1)
logger.debug("assert_all_invoices_accepted: adding invoice for '%s'", prov_name)
unpaid_agreement_providers.remove(prov_name)

if unpaid_agreement_providers:
raise AssertionError(f"Unpaid agreements for: {','.join(unpaid_agreement_providers)}")


async def assert_tasks_processed(tasks: Set[str], status: str, output_lines: EventStream[str]):
"""Assert that for every task in `tasks` a line with `Task {status}` will appear."""
remaining_tasks = tasks.copy()

async for line in output_lines:
m = re.search(rf".*Task {status} .* task data: (.+)$", line)
if m:
task_data = m.group(1)
logger.debug("assert_tasks_processed: Task %s: %s", status, task_data)
remaining_tasks.discard(task_data)
if not remaining_tasks:
return

raise AssertionError(f"Tasks not {status}: {remaining_tasks}")
9 changes: 7 additions & 2 deletions tests/goth/conftest.py → tests/goth_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from datetime import datetime, timezone
from pathlib import Path
from typing import cast, List, Tuple
from typing import cast, List

import pytest

from goth.configuration import Override
from yapapi.package import Package, vm
from yapapi.package import vm


def pytest_addoption(parser):
Expand Down Expand Up @@ -49,6 +49,11 @@ def log_dir() -> Path:
return log_dir


@pytest.fixture(scope="session")
def goth_config_path(project_dir) -> Path:
return project_dir / "tests" / "goth_tests" / "assets" / "goth-config.yml"


@pytest.fixture()
def blender_vm_package():
async def coro():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import os
from pathlib import Path
import re
from typing import List

import pytest

from goth.configuration import load_yaml
from goth.configuration import load_yaml, Override
from goth.runner.log import configure_logging
from goth.runner import Runner
from goth.runner.probe import RequestorProbe
Expand Down Expand Up @@ -59,16 +60,13 @@ async def assert_all_tasks_computed(stream):

@pytest.mark.asyncio
async def test_agreement_termination(
project_dir: Path,
log_dir: Path,
config_overrides,
goth_config_path: Path,
config_overrides: List[Override],
) -> None:

# This is the default configuration with 2 wasm/VM providers
goth_config = load_yaml(
project_dir / "tests" / "goth" / "assets" / "goth-config.yml",
config_overrides,
)
goth_config = load_yaml(goth_config_path, config_overrides)
test_script_path = str(Path(__file__).parent / "requestor.py")

configure_logging(log_dir)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
from pathlib import Path
from typing import List

import pytest

Expand All @@ -14,7 +15,11 @@


@pytest.mark.asyncio
async def test_async_task_generation(project_dir: Path, log_dir: Path, config_overrides) -> None:
async def test_async_task_generation(
log_dir: Path,
goth_config_path: Path,
config_overrides: List[goth.configuration.Override],
) -> None:
"""Run the `requestor.py` and make sure that it's standard output is as expected."""

configure_logging(log_dir)
Expand All @@ -25,10 +30,7 @@ async def test_async_task_generation(project_dir: Path, log_dir: Path, config_ov
{"name": "provider-1", "type": "VM-Wasm-Provider", "use-proxy": True},
]
config_overrides.append(("nodes", nodes))
goth_config = goth.configuration.load_yaml(
project_dir / "tests" / "goth" / "assets" / "goth-config.yml",
config_overrides,
)
goth_config = goth.configuration.load_yaml(goth_config_path, config_overrides)

runner = Runner(base_log_dir=log_dir, compose_config=goth_config.compose_config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,9 @@
termination before going to the `running` state.
"""
import asyncio
from datetime import datetime, timedelta
import pathlib
import sys
from types import TracebackType
from typing import Optional, Type
from datetime import datetime

from yapapi import Golem
from yapapi.rest.activity import CommandExecutionError, BatchTimeoutError
from yapapi.services import Service

from yapapi.log import enable_default_logger, log_summary, log_event_repr, pluralize # noqa
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
from pathlib import Path
from typing import List

import pytest

Expand All @@ -15,7 +16,11 @@


@pytest.mark.asyncio
async def test_instance_restart(project_dir: Path, log_dir: Path, config_overrides) -> None:
async def test_instance_restart(
log_dir: Path,
goth_config_path: Path,
config_overrides: List[goth.configuration.Override],
) -> None:
"""Run the `requestor.py` and make sure that it's standard output is as expected."""

configure_logging(log_dir)
Expand All @@ -26,10 +31,7 @@ async def test_instance_restart(project_dir: Path, log_dir: Path, config_overrid
{"name": "provider-1", "type": "VM-Wasm-Provider", "use-proxy": True},
]
config_overrides.append(("nodes", nodes))
goth_config = goth.configuration.load_yaml(
project_dir / "tests" / "goth" / "assets" / "goth-config.yml",
config_overrides,
)
goth_config = goth.configuration.load_yaml(goth_config_path, config_overrides)

runner = Runner(base_log_dir=log_dir, compose_config=goth_config.compose_config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
from pathlib import Path
import re
from typing import List

import pytest

Expand Down Expand Up @@ -49,7 +50,11 @@ async def assert_multiple_workers_run(agr_id, events):


@pytest.mark.asyncio
async def test_multiactivity_agreement(project_dir: Path, log_dir: Path, config_overrides) -> None:
async def test_multiactivity_agreement(
log_dir: Path,
goth_config_path: Path,
config_overrides: List[goth.configuration.Override],
) -> None:

configure_logging(log_dir)

Expand All @@ -59,10 +64,7 @@ async def test_multiactivity_agreement(project_dir: Path, log_dir: Path, config_
{"name": "provider-1", "type": "VM-Wasm-Provider", "use-proxy": True},
]
config_overrides.append(("nodes", nodes))
goth_config = goth.configuration.load_yaml(
project_dir / "tests" / "goth" / "assets" / "goth-config.yml",
config_overrides,
)
goth_config = goth.configuration.load_yaml(goth_config_path, config_overrides)

runner = Runner(base_log_dir=log_dir, compose_config=goth_config.compose_config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async def wait_for_event(event_type: Type[Event], timeout: float):


@pytest.mark.asyncio
async def test_demand_resubscription(log_dir: Path, monkeypatch) -> None:
async def test_demand_resubscription(log_dir: Path, goth_config_path: Path, monkeypatch) -> None:
"""Test that checks that a demand is re-submitted after its previous submission expires."""

configure_logging(log_dir)
Expand All @@ -138,9 +138,7 @@ async def test_demand_resubscription(log_dir: Path, monkeypatch) -> None:
{"name": "requestor", "type": "Requestor"},
{"name": "provider-1", "type": "VM-Wasm-Provider", "use-proxy": True},
]
goth_config = load_yaml(
Path(__file__).parent / "assets" / "goth-config.yml", [("nodes", nodes)]
)
goth_config = load_yaml(goth_config_path, [("nodes", nodes)])

vm_package = await vm.repo(
image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
Expand Down
Loading

0 comments on commit 5ee0706

Please sign in to comment.