Skip to content

Commit

Permalink
Merge pull request #134 from golemfactory/hotfix/graceful-shutdown
Browse files Browse the repository at this point in the history
Graceful shutdown on Ctrl+C in blender/yacat
  • Loading branch information
azawlocki authored Nov 23, 2020
2 parents e53fcca + ce63a49 commit 6fb69fe
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 51 deletions.
19 changes: 16 additions & 3 deletions examples/blender/blender.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,20 @@ async def worker(ctx: WorkContext, tasks):
task = loop.create_task(main(subnet_tag=args.subnet_tag))
try:
loop.run_until_complete(task)
except (Exception, KeyboardInterrupt) as e:
print(e)
except KeyboardInterrupt:
print(
f"{utils.TEXT_COLOR_YELLOW}"
"Shutting down gracefully, please wait a few seconds "
"or press Ctrl+C to exit immediately..."
f"{utils.TEXT_COLOR_DEFAULT}"
)
task.cancel()
loop.run_until_complete(task)
try:
loop.run_until_complete(task)
print(
f"{utils.TEXT_COLOR_YELLOW}"
"Shutdown completed, thank you for waiting!"
f"{utils.TEXT_COLOR_DEFAULT}"
)
except KeyboardInterrupt:
pass
19 changes: 16 additions & 3 deletions examples/yacat/yacat.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,20 @@ async def worker_find_password(ctx: WorkContext, tasks):

try:
loop.run_until_complete(task)
except (Exception, KeyboardInterrupt) as e:
print(e)
except KeyboardInterrupt:
print(
f"{utils.TEXT_COLOR_YELLOW}"
"Shutting down gracefully, please wait a few seconds "
"or press Ctrl+C to exit immediately..."
f"{utils.TEXT_COLOR_DEFAULT}"
)
task.cancel()
loop.run_until_complete(task)
try:
loop.run_until_complete(task)
print(
f"{utils.TEXT_COLOR_YELLOW}"
"Shutdown completed, thank you for waiting!"
f"{utils.TEXT_COLOR_DEFAULT}"
)
except KeyboardInterrupt:
pass
91 changes: 46 additions & 45 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
An implementation of the new Golem's task executor.
"""
import asyncio
from asyncio import CancelledError
from datetime import datetime, timedelta, timezone
from decimal import Decimal
Expand Down Expand Up @@ -134,7 +135,29 @@ async def submit(
:param data: an iterator of Task objects to be computed on providers
:return: yields computation progress events
"""
import asyncio

services: Set[asyncio.Task] = set()
workers: Set[asyncio.Task] = set()

try:
async for result in self._submit(worker, data, services, workers):
yield result

finally:
# Cancel and gather all tasks to make sure all exceptions are retrieved.
all_tasks = workers.union(services)
for task in all_tasks:
if not task.done():
task.cancel()
await asyncio.gather(*all_tasks, return_exceptions=True)

async def _submit(
self,
worker: Callable[[WorkContext, AsyncIterator[Task[D, R]]], AsyncGenerator[Work, None]],
data: Iterable[Task[D, R]],
services: Set[asyncio.Task],
workers: Set[asyncio.Task],
) -> AsyncIterator[Task[D, R]]:
import contextlib
import random

Expand Down Expand Up @@ -177,7 +200,6 @@ def input_tasks() -> Iterable[Task[D, R]]:

work_queue = SmartQueue(input_tasks())

workers: Set[asyncio.Task[None]] = set()
last_wid = 0

agreements_to_pay: Set[str] = set()
Expand Down Expand Up @@ -401,21 +423,14 @@ async def worker_starter() -> None:

get_offers_deadline = datetime.now(timezone.utc) + self._conf.get_offers_timeout
get_done_task: Optional[asyncio.Task] = None
initial_services = {
find_offers_task,
loop.create_task(worker_starter()),
process_invoices_job,
wait_until_done,
}
services = initial_services.copy()

async def _cancel_tasks_silently() -> None:
for task in initial_services.union(services).union(workers):
try:
task.cancel()
await asyncio.wait_for(task, timeout=0.1)
except (Exception, CancelledError, KeyboardInterrupt) as e:
pass
services.update(
{
find_offers_task,
loop.create_task(worker_starter()),
process_invoices_job,
wait_until_done,
}
)

try:
while wait_until_done in services or not done_queue.empty():
Expand Down Expand Up @@ -462,43 +477,29 @@ async def _cancel_tasks_silently() -> None:
emit(events.ComputationFinished())

except (Exception, CancelledError, KeyboardInterrupt) as e:

emit(events.ComputationFinished(exc_info=sys.exc_info())) # type: ignore
if not isinstance(e, (CancelledError, KeyboardInterrupt)):
raise

finally:

try:
payment_closing = True
find_offers_task.cancel()
try:
if workers:
for worker_task in workers:
worker_task.cancel()
await asyncio.wait(workers, timeout=15, return_when=asyncio.ALL_COMPLETED)
except Exception:
if self._conf.traceback:
traceback.print_exc()

for worker_task in workers:
worker_task.cancel()
await asyncio.wait(
workers.union({find_offers_task, process_invoices_job}),
timeout=5,
return_when=asyncio.ALL_COMPLETED,
)
finally:
await _cancel_tasks_silently()

try:
payment_closing = True
find_offers_task.cancel()
try:
if workers:
for worker_task in workers:
worker_task.cancel()
await asyncio.wait(workers, timeout=15, return_when=asyncio.ALL_COMPLETED)
except Exception:
if self._conf.traceback:
traceback.print_exc()
await asyncio.wait(
workers.union({find_offers_task, process_invoices_job}),
timeout=5,
return_when=asyncio.ALL_COMPLETED,
)
if agreements_to_pay:
await asyncio.wait(
{process_invoices_job}, timeout=15, return_when=asyncio.ALL_COMPLETED
)
finally:
await _cancel_tasks_silently()

async def _create_allocations(self) -> rest.payment.MarketDecoration:
if not self._budget_allocations:
Expand Down

0 comments on commit 6fb69fe

Please sign in to comment.