Skip to content

Commit

Permalink
Merge pull request #547 from golemfactory/az/dont-respawn-after-stop
Browse files Browse the repository at this point in the history
Don't spawn another instance if one instance stops due to a signal
  • Loading branch information
azawlocki authored Jul 15, 2021
2 parents d5731a6 + dac37d0 commit 56cc368
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 23 deletions.
83 changes: 70 additions & 13 deletions tests/goth_tests/test_instance_restart/requestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""
import asyncio
from datetime import datetime
import sys

from yapapi import Golem
from yapapi.services import Service
Expand All @@ -17,6 +18,12 @@

instances_started = 0
instances_running = 0
instances_stopped = 0


def log(*args):
    """Write ``args`` to standard error, formatted exactly as ``print`` would.

    The script reports progress through stderr rather than stdout so that
    its output is not delayed by buffering.
    """
    stream = sys.stderr
    print(*args, file=stream)


class FirstInstanceFailsToStart(Service):
Expand All @@ -31,40 +38,90 @@ async def get_payload():
async def start(self):

global instances_started
instances_started += 1

await asyncio.sleep(1)
self._ctx.run("/bin/echo", "STARTING", str(instances_started + 1))
future_results = yield self._ctx.commit()
results = await future_results
log(f"{results[-1].stdout.strip()}")
instances_started += 1

if instances_started == 1:
self._ctx.run("/no/such/command")
else:
self._ctx.run("/bin/echo", "STARTING")
future_results = yield self._ctx.commit()
results = await future_results
print(f"{results[-1].stdout.strip()}")
future_results = yield self._ctx.commit()
await future_results

if instances_started > 2:
# Wait for the stop signal here
await asyncio.sleep(30)

async def run(self):

global instances_running
instances_running += 1

await asyncio.sleep(1)
self._ctx.run("/bin/echo", "RUNNING")
self._ctx.run("/bin/echo", "RUNNING", str(instances_started))
future_results = yield self._ctx.commit()
results = await future_results
print(f"{results[-1].stdout.strip()}")
log(f"{results[-1].stdout.strip()}")

self._ctx.run("/no/such/command")
future_results = yield self._ctx.commit()
await future_results

async def shutdown(self):
    """Report that the instance is stopping and count the stop.

    Emits ``STOPPING <instances_started>`` through ``log`` and then
    increments the module-level ``instances_stopped`` counter.
    """
    global instances_stopped

    log("STOPPING", instances_started)
    instances_stopped += 1

    # Never executed: the mere presence of `yield` turns this coroutine into
    # an async generator (presumably the shape the caller expects, as with
    # `start` and `run` -- confirm against the Service base class).
    if False:
        yield


async def main():

async with Golem(budget=1.0, subnet_tag="goth") as golem:

print("Starting cluster...")
# Start a cluster with a single service.
# The first instance will fail before reaching the `running` state
# due to an error. Another instance should be spawned in its place.

log("Starting cluster...")
await golem.run_service(FirstInstanceFailsToStart)

# This second instance should reach the `running` state.

while not instances_running:
print("Waiting for an instance...")
await asyncio.sleep(5)
log("Waiting for an instance...")
await asyncio.sleep(2)

# And then stop.

while not instances_stopped:
log("Waiting until the instance stops...")
await asyncio.sleep(2)

# Then we start another cluster with a single instance.
# This time the instance is stopped by a signal before it reaches the `running` state,
# but in that case the cluster should not spawn another instance.

log("Starting another cluster...")
cluster = await golem.run_service(FirstInstanceFailsToStart)

while instances_started < 3:
log("Waiting for another instance...")
await asyncio.sleep(2)

assert [i for i in cluster.instances if i.is_available]

log("Closing the second cluster...")
cluster.stop()

while [i for i in cluster.instances if i.is_available]:
log("Waiting for the cluster to stop...")
await asyncio.sleep(2)

log("Cluster stopped")


if __name__ == "__main__":
Expand Down
49 changes: 39 additions & 10 deletions tests/goth_tests/test_instance_restart/test_instance_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,25 @@
logger = logging.getLogger("goth.test.async_task_generation")


instances_started = set()
instances_running = set()


async def count_instances(events):
    """Record which instance numbers were reported as starting or running.

    Consumes ``events``, an async iterable of text lines. A line made of
    exactly two whitespace-separated tokens, ``STARTING <n>`` or
    ``RUNNING <n>`` with ``<n>`` an integer, adds ``n`` to the module-level
    set ``instances_started`` or ``instances_running`` respectively.
    Every other line is ignored.
    """
    async for raw_line in events:
        tokens = raw_line.strip().split()
        if len(tokens) != 2:
            # Not a "<WORD> <NUMBER>" line.
            continue

        keyword, index = tokens
        if keyword == "STARTING":
            bucket = instances_started
        elif keyword == "RUNNING":
            bucket = instances_running
        else:
            continue

        try:
            bucket.add(int(index))
        except ValueError:
            # Second token is not an integer; skip the line.
            continue


@pytest.mark.asyncio
async def test_instance_restart(
log_dir: Path,
Expand Down Expand Up @@ -43,17 +62,27 @@ async def test_instance_restart(
str(Path(__file__).parent / "requestor.py"), env=os.environ
) as (_cmd_task, cmd_monitor):

cmd_monitor.add_assertion(count_instances)

# The first attempt to create an instance should fail
await cmd_monitor.wait_for_pattern(
r".*INFO yapapi\.services\] .* commissioned$", timeout=60
)
await cmd_monitor.wait_for_pattern("STARTING 1$", timeout=60)
await cmd_monitor.wait_for_pattern(".*CommandExecutionError", timeout=20)

# The second one should successfully start and fail in `running` state
await cmd_monitor.wait_for_pattern("STARTING 2$", timeout=20)
await cmd_monitor.wait_for_pattern("RUNNING 2$", timeout=20)
await cmd_monitor.wait_for_pattern(".*CommandExecutionError", timeout=20)
await cmd_monitor.wait_for_pattern(
r".*INFO yapapi\.services\] .* decommissioned$", timeout=20
await cmd_monitor.wait_for_pattern("STOPPING 2$", timeout=20)

# The third instance should be started, but not running
await cmd_monitor.wait_for_pattern("STARTING 3$", timeout=20)
await cmd_monitor.wait_for_pattern("Cluster stopped$", timeout=60)

assert instances_started == {1, 2, 3}, (
"Expected to see instances 1, 2, 3 starting, saw instances "
f"{', '.join(str(n) for n in instances_started)} instead"
)
# The second one should succeed
await cmd_monitor.wait_for_pattern(
r".*INFO yapapi\.services\] .* commissioned$", timeout=30
assert instances_running == {2}, (
"Expected to see only instance 2 running, saw instances "
f"{', '.join(str(n) for n in instances_running)} instead"
)
await cmd_monitor.wait_for_pattern("STARTING$", timeout=20)
await cmd_monitor.wait_for_pattern("RUNNING$", timeout=20)
5 changes: 5 additions & 0 deletions yapapi/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,13 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->
try:
await task
if (
# if the instance was created ...
instance
# but failed to start ...
and not instance.started_successfully
# due to an error (and not a `STOP` signal) ...
and instance.service.exc_info() != (None, None, None)
# and re-spawning instances is enabled for this cluster
and self._respawn_unstarted_instances
):
logger.warning("Instance failed when starting, trying to create another one...")
Expand Down

0 comments on commit 56cc368

Please sign in to comment.