Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't spawn another instance if one instance stops due to a signal #547

Merged
merged 2 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 70 additions & 13 deletions tests/goth_tests/test_instance_restart/requestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""
import asyncio
from datetime import datetime
import sys

from yapapi import Golem
from yapapi.services import Service
Expand All @@ -17,6 +18,12 @@

instances_started = 0
instances_running = 0
instances_stopped = 0


def log(*args):
# Like `print` but outputs to stderr to avoid buffering
print(*args, file=sys.stderr)


class FirstInstanceFailsToStart(Service):
Expand All @@ -31,40 +38,90 @@ async def get_payload():
async def start(self):

global instances_started
instances_started += 1

await asyncio.sleep(1)
self._ctx.run("/bin/echo", "STARTING", str(instances_started + 1))
future_results = yield self._ctx.commit()
results = await future_results
log(f"{results[-1].stdout.strip()}")
instances_started += 1

if instances_started == 1:
self._ctx.run("/no/such/command")
else:
self._ctx.run("/bin/echo", "STARTING")
future_results = yield self._ctx.commit()
results = await future_results
print(f"{results[-1].stdout.strip()}")
future_results = yield self._ctx.commit()
await future_results

if instances_started > 2:
# Wait for the stop signal here
await asyncio.sleep(30)

async def run(self):

global instances_running
instances_running += 1

await asyncio.sleep(1)
self._ctx.run("/bin/echo", "RUNNING")
self._ctx.run("/bin/echo", "RUNNING", str(instances_started))
future_results = yield self._ctx.commit()
results = await future_results
print(f"{results[-1].stdout.strip()}")
log(f"{results[-1].stdout.strip()}")

self._ctx.run("/no/such/command")
future_results = yield self._ctx.commit()
await future_results

async def shutdown(self):

global instances_stopped

log("STOPPING", instances_started)
if False:
yield
instances_stopped += 1


async def main():

async with Golem(budget=1.0, subnet_tag="goth") as golem:

print("Starting cluster...")
# Start a cluster with a single service.
# The first instance will fail before reaching the `running` state
# due to an error. Another instance should be spawned in its place.

log("Starting cluster...")
await golem.run_service(FirstInstanceFailsToStart)

# This another instance should get to `running` state.

while not instances_running:
print("Waiting for an instance...")
await asyncio.sleep(5)
log("Waiting for an instance...")
await asyncio.sleep(2)

# And then stop.

while not instances_stopped:
log("Waiting until the instance stops...")
await asyncio.sleep(2)

# Then we start another cluster with a single instance.
# This time the instance is stopped by a signal before it reaches the `running` state,
# but in that case the cluster should not spawn another instance.

log("Starting another cluster...")
cluster = await golem.run_service(FirstInstanceFailsToStart)

while instances_started < 3:
log("Waiting for another instance...")
await asyncio.sleep(2)

assert [i for i in cluster.instances if i.is_available]

log("Closing the second cluster...")
cluster.stop()

while [i for i in cluster.instances if i.is_available]:
log("Waiting for the cluster to stop...")
await asyncio.sleep(2)

log("Cluster stopped")


if __name__ == "__main__":
Expand Down
49 changes: 39 additions & 10 deletions tests/goth_tests/test_instance_restart/test_instance_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,25 @@
logger = logging.getLogger("goth.test.async_task_generation")


instances_started = set()
instances_running = set()


async def count_instances(events):
global instances_started, instances_running

async for line in events:
line = line.strip()
try:
word, num = line.split()
if word == "STARTING":
instances_started.add(int(num))
elif word == "RUNNING":
instances_running.add(int(num))
except ValueError:
pass


@pytest.mark.asyncio
async def test_instance_restart(
log_dir: Path,
Expand Down Expand Up @@ -43,17 +62,27 @@ async def test_instance_restart(
str(Path(__file__).parent / "requestor.py"), env=os.environ
) as (_cmd_task, cmd_monitor):

cmd_monitor.add_assertion(count_instances)

# The first attempt to create an instance should fail
await cmd_monitor.wait_for_pattern(
r".*INFO yapapi\.services\] .* commissioned$", timeout=60
)
await cmd_monitor.wait_for_pattern("STARTING 1$", timeout=60)
await cmd_monitor.wait_for_pattern(".*CommandExecutionError", timeout=20)

# The second one should successfully start and fail in `running` state
await cmd_monitor.wait_for_pattern("STARTING 2$", timeout=20)
await cmd_monitor.wait_for_pattern("RUNNING 2$", timeout=20)
await cmd_monitor.wait_for_pattern(".*CommandExecutionError", timeout=20)
await cmd_monitor.wait_for_pattern(
r".*INFO yapapi\.services\] .* decommissioned$", timeout=20
await cmd_monitor.wait_for_pattern("STOPPING 2$", timeout=20)

# The third instance should be started, but not running
await cmd_monitor.wait_for_pattern("STARTING 3$", timeout=20)
await cmd_monitor.wait_for_pattern("Cluster stopped$", timeout=60)

assert instances_started == {1, 2, 3}, (
"Expected to see instances 1, 2, 3 starting, saw instances "
f"{', '.join(str(n) for n in instances_started)} instead"
)
# The second one should succeed
await cmd_monitor.wait_for_pattern(
r".*INFO yapapi\.services\] .* commissioned$", timeout=30
assert instances_running == {2}, (
"Expected to see only instance 2 running, saw instances "
f"{', '.join(str(n) for n in instances_running)} instead"
)
await cmd_monitor.wait_for_pattern("STARTING$", timeout=20)
await cmd_monitor.wait_for_pattern("RUNNING$", timeout=20)
5 changes: 5 additions & 0 deletions yapapi/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,13 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->
try:
await task
if (
# if the instance was created ...
instance
# but failed to start ...
and not instance.started_successfully
# due to an error (and not a `STOP` signal) ...
and instance.service.exc_info() != (None, None, None)
# and re-spawning instances is enabled for this cluster
and self._respawn_unstarted_instances
):
logger.warning("Instance failed when starting, trying to create another one...")
Expand Down