Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown on Ctrl+C in blender/yacat #134

Merged
merged 2 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions examples/blender/blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,20 @@ async def worker(ctx: WorkContext, tasks):
task = loop.create_task(main(subnet_tag=args.subnet_tag))
try:
loop.run_until_complete(task)
except (Exception, KeyboardInterrupt) as e:
print(e)
except KeyboardInterrupt:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd put this into some utility function... context manager in examples/utils.py maybe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, but I would create an issue for this instead of including it in the current PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the example to fit in one file and be as short as possible. I discussed this with @mfranciszkiewicz last week - Marek, can you share your opinion here?

Copy link
Contributor

@mfranciszkiewicz mfranciszkiewicz Nov 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO utils.py is not the way to go

  • the argument parser is trivial enough to be copied between examples; we're copying the sys.path expansion code instead
  • if there's a need for creating common worker launcher code / logging facility, maybe it's worth considering making those a part of the library

I'm worried that the initial purpose of examples will be obscured and they'll be better off being separate apps instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mfranciszkiewicz thanks for your remarks. Including the code for building the parser in blender.py/yacat.py was also my idea at the beginning, but then a reviewer convinced me that this code should be shared between the example scripts. That's how we ended up with a shared examples/utils.py module.

I'd consider adding some boilerplate code for creating an asyncio event loop and running the main task + handling graceful termination to yapapi itself (and maybe pulling the code for creating the command-line parser back to blender.py/yacat.py) but that's not in the scope of the current PR.

print(
f"{utils.TEXT_COLOR_YELLOW}"
"Shutting down gracefully, please wait a few seconds "
"or press Ctrl+C to exit immediately..."
f"{utils.TEXT_COLOR_DEFAULT}"
)
task.cancel()
loop.run_until_complete(task)
try:
loop.run_until_complete(task)
print(
f"{utils.TEXT_COLOR_YELLOW}"
"Shutdown completed, thank you for waiting!"
f"{utils.TEXT_COLOR_DEFAULT}"
)
except KeyboardInterrupt:
pass
19 changes: 16 additions & 3 deletions examples/yacat/yacat.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,20 @@ async def worker_find_password(ctx: WorkContext, tasks):

try:
loop.run_until_complete(task)
except (Exception, KeyboardInterrupt) as e:
print(e)
except KeyboardInterrupt:
print(
f"{utils.TEXT_COLOR_YELLOW}"
"Shutting down gracefully, please wait a few seconds "
"or press Ctrl+C to exit immediately..."
f"{utils.TEXT_COLOR_DEFAULT}"
)
task.cancel()
loop.run_until_complete(task)
try:
loop.run_until_complete(task)
print(
f"{utils.TEXT_COLOR_YELLOW}"
"Shutdown completed, thank you for waiting!"
f"{utils.TEXT_COLOR_DEFAULT}"
)
except KeyboardInterrupt:
pass
91 changes: 46 additions & 45 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
An implementation of the new Golem's task executor.
"""
import asyncio
from asyncio import CancelledError
from datetime import datetime, timedelta, timezone
from decimal import Decimal
Expand Down Expand Up @@ -134,7 +135,29 @@ async def submit(
:param data: an iterator of Task objects to be computed on providers
:return: yields computation progress events
"""
import asyncio

services: Set[asyncio.Task] = set()
workers: Set[asyncio.Task] = set()

try:
async for result in self._submit(worker, data, services, workers):
yield result

finally:
# Cancel and gather all tasks to make sure all exceptions are retrieved.
all_tasks = workers.union(services)
for task in all_tasks:
if not task.done():
task.cancel()
await asyncio.gather(*all_tasks, return_exceptions=True)

async def _submit(
self,
worker: Callable[[WorkContext, AsyncIterator[Task[D, R]]], AsyncGenerator[Work, None]],
data: Iterable[Task[D, R]],
services: Set[asyncio.Task],
workers: Set[asyncio.Task],
) -> AsyncIterator[Task[D, R]]:
import contextlib
import random

Expand Down Expand Up @@ -177,7 +200,6 @@ def input_tasks() -> Iterable[Task[D, R]]:

work_queue = SmartQueue(input_tasks())

workers: Set[asyncio.Task[None]] = set()
last_wid = 0

agreements_to_pay: Set[str] = set()
Expand Down Expand Up @@ -401,21 +423,14 @@ async def worker_starter() -> None:

get_offers_deadline = datetime.now(timezone.utc) + self._conf.get_offers_timeout
get_done_task: Optional[asyncio.Task] = None
initial_services = {
find_offers_task,
loop.create_task(worker_starter()),
process_invoices_job,
wait_until_done,
}
services = initial_services.copy()

async def _cancel_tasks_silently() -> None:
for task in initial_services.union(services).union(workers):
try:
task.cancel()
await asyncio.wait_for(task, timeout=0.1)
except (Exception, CancelledError, KeyboardInterrupt) as e:
pass
services.update(
{
find_offers_task,
loop.create_task(worker_starter()),
process_invoices_job,
wait_until_done,
}
)

try:
while wait_until_done in services or not done_queue.empty():
Expand Down Expand Up @@ -462,43 +477,29 @@ async def _cancel_tasks_silently() -> None:
emit(events.ComputationFinished())

except (Exception, CancelledError, KeyboardInterrupt) as e:

emit(events.ComputationFinished(exc_info=sys.exc_info())) # type: ignore
if not isinstance(e, (CancelledError, KeyboardInterrupt)):
raise

finally:

try:
payment_closing = True
find_offers_task.cancel()
try:
if workers:
for worker_task in workers:
worker_task.cancel()
await asyncio.wait(workers, timeout=15, return_when=asyncio.ALL_COMPLETED)
except Exception:
if self._conf.traceback:
traceback.print_exc()

for worker_task in workers:
worker_task.cancel()
await asyncio.wait(
workers.union({find_offers_task, process_invoices_job}),
timeout=5,
return_when=asyncio.ALL_COMPLETED,
)
finally:
await _cancel_tasks_silently()

try:
payment_closing = True
find_offers_task.cancel()
try:
if workers:
for worker_task in workers:
worker_task.cancel()
await asyncio.wait(workers, timeout=15, return_when=asyncio.ALL_COMPLETED)
except Exception:
if self._conf.traceback:
traceback.print_exc()
await asyncio.wait(
workers.union({find_offers_task, process_invoices_job}),
timeout=5,
return_when=asyncio.ALL_COMPLETED,
)
if agreements_to_pay:
await asyncio.wait(
{process_invoices_job}, timeout=15, return_when=asyncio.ALL_COMPLETED
)
finally:
await _cancel_tasks_silently()

async def _create_allocations(self) -> rest.payment.MarketDecoration:
if not self._budget_allocations:
Expand Down