From 2138b6e34d5ba2e7c98c910062c2b117bc6b4c04 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Wed, 14 Jul 2021 21:59:39 +0200 Subject: [PATCH 1/2] Disable spawning another instance if one instance was stopped by signal --- yapapi/services.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/yapapi/services.py b/yapapi/services.py index 59afcbffd..10e32e25c 100644 --- a/yapapi/services.py +++ b/yapapi/services.py @@ -593,8 +593,13 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> try: await task if ( + # if the instance was created ... instance + # but failed to start ... and not instance.started_successfully + # due to an error (and not a `STOP` signal) ... + and instance.service.exc_info() != (None, None, None) + # and re-spawning instances is enabled for this cluster and self._respawn_unstarted_instances ): logger.warning("Instance failed when starting, trying to create another one...") From dac37d0413e4cf19f8b76840054bfd357144bbde Mon Sep 17 00:00:00 2001 From: azawlocki Date: Wed, 14 Jul 2021 22:00:47 +0200 Subject: [PATCH 2/2] Extend the integration test for re-spawning instances --- .../test_instance_restart/requestor.py | 83 ++++++++++++++++--- .../test_instance_restart.py | 49 ++++++++--- 2 files changed, 109 insertions(+), 23 deletions(-) diff --git a/tests/goth_tests/test_instance_restart/requestor.py b/tests/goth_tests/test_instance_restart/requestor.py index abc3b12a8..4895042e2 100755 --- a/tests/goth_tests/test_instance_restart/requestor.py +++ b/tests/goth_tests/test_instance_restart/requestor.py @@ -7,6 +7,7 @@ """ import asyncio from datetime import datetime +import sys from yapapi import Golem from yapapi.services import Service @@ -17,6 +18,12 @@ instances_started = 0 instances_running = 0 +instances_stopped = 0 + + +def log(*args): + # Like `print` but outputs to stderr to avoid buffering + print(*args, file=sys.stderr) class FirstInstanceFailsToStart(Service): @@ -31,40 +38,90 @@ async def get_payload(): async def start(self): global instances_started - instances_started += 1 - await asyncio.sleep(1) + self._ctx.run("/bin/echo", "STARTING", str(instances_started + 1)) + future_results = yield self._ctx.commit() + results = await future_results + log(f"{results[-1].stdout.strip()}") + instances_started += 1 if instances_started == 1: self._ctx.run("/no/such/command") - else: - self._ctx.run("/bin/echo", "STARTING") - future_results = yield self._ctx.commit() - results = await future_results - print(f"{results[-1].stdout.strip()}") + future_results = yield self._ctx.commit() + await future_results + + if instances_started > 2: + # Wait for the stop signal here + await asyncio.sleep(30) async def run(self): global instances_running instances_running += 1 - await asyncio.sleep(1) - self._ctx.run("/bin/echo", "RUNNING") + self._ctx.run("/bin/echo", "RUNNING", str(instances_started)) future_results = yield self._ctx.commit() results = await future_results - print(f"{results[-1].stdout.strip()}") + log(f"{results[-1].stdout.strip()}") + + self._ctx.run("/no/such/command") + future_results = yield self._ctx.commit() + await future_results + + async def shutdown(self): + + global instances_stopped + + log("STOPPING", instances_started) + if False: + yield + instances_stopped += 1 async def main(): async with Golem(budget=1.0, subnet_tag="goth") as golem: - print("Starting cluster...") + # Start a cluster with a single service. + # The first instance will fail before reaching the `running` state + # due to an error. Another instance should be spawned in its place. + + log("Starting cluster...") await golem.run_service(FirstInstanceFailsToStart) + # This another instance should get to `running` state. + while not instances_running: - print("Waiting for an instance...") - await asyncio.sleep(5) + log("Waiting for an instance...") + await asyncio.sleep(2) + + # And then stop. + + while not instances_stopped: + log("Waiting until the instance stops...") + await asyncio.sleep(2) + + # Then we start another cluster with a single instance. + # This time the instance is stopped by a signal before it reaches the `running` state, + # but in that case the cluster should not spawn another instance. + + log("Starting another cluster...") + cluster = await golem.run_service(FirstInstanceFailsToStart) + + while instances_started < 3: + log("Waiting for another instance...") + await asyncio.sleep(2) + + assert [i for i in cluster.instances if i.is_available] + + log("Closing the second cluster...") + cluster.stop() + + while [i for i in cluster.instances if i.is_available]: + log("Waiting for the cluster to stop...") + await asyncio.sleep(2) + + log("Cluster stopped") if __name__ == "__main__": diff --git a/tests/goth_tests/test_instance_restart/test_instance_restart.py b/tests/goth_tests/test_instance_restart/test_instance_restart.py index 1dc5ef5db..39c3d4d36 100644 --- a/tests/goth_tests/test_instance_restart/test_instance_restart.py +++ b/tests/goth_tests/test_instance_restart/test_instance_restart.py @@ -15,6 +15,25 @@ logger = logging.getLogger("goth.test.async_task_generation") +instances_started = set() +instances_running = set() + + +async def count_instances(events): + global instances_started, instances_running + + async for line in events: + line = line.strip() + try: + word, num = line.split() + if word == "STARTING": + instances_started.add(int(num)) + elif word == "RUNNING": + instances_running.add(int(num)) + except ValueError: + pass + + @pytest.mark.asyncio async def test_instance_restart( log_dir: Path, @@ -43,17 +62,27 @@ async def test_instance_restart( str(Path(__file__).parent / "requestor.py"), env=os.environ ) as (_cmd_task, cmd_monitor): + cmd_monitor.add_assertion(count_instances) + # The first attempt to create an instance should fail - await cmd_monitor.wait_for_pattern( - r".*INFO yapapi\.services\] .* commissioned$", timeout=60 - ) + await cmd_monitor.wait_for_pattern("STARTING 1$", timeout=60) + await cmd_monitor.wait_for_pattern(".*CommandExecutionError", timeout=20) + + # The second one should successfully start and fail in `running` state + await cmd_monitor.wait_for_pattern("STARTING 2$", timeout=20) + await cmd_monitor.wait_for_pattern("RUNNING 2$", timeout=20) await cmd_monitor.wait_for_pattern(".*CommandExecutionError", timeout=20) - await cmd_monitor.wait_for_pattern( - r".*INFO yapapi\.services\] .* decommissioned$", timeout=20 + await cmd_monitor.wait_for_pattern("STOPPING 2$", timeout=20) + + # The third instance should be started, but not running + await cmd_monitor.wait_for_pattern("STARTING 3$", timeout=20) + await cmd_monitor.wait_for_pattern("Cluster stopped$", timeout=60) + + assert instances_started == {1, 2, 3}, ( + "Expected to see instances 1, 2, 3 starting, saw instances " + f"{', '.join(str(n) for n in instances_started)} instead" ) - # The second one should succeed - await cmd_monitor.wait_for_pattern( - r".*INFO yapapi\.services\] .* commissioned$", timeout=30 + assert instances_running == {2}, ( + "Expected to see only instance 2 running, saw instances " + f"{', '.join(str(n) for n in instances_running)} instead" ) - await cmd_monitor.wait_for_pattern("STARTING$", timeout=20) - await cmd_monitor.wait_for_pattern("RUNNING$", timeout=20)