From 44e56aa59535a8c773141e28f23f5ccaa9a85d52 Mon Sep 17 00:00:00 2001 From: azawlocki Date: Mon, 23 Nov 2020 10:11:31 +0100 Subject: [PATCH 1/2] Gather exceptions from all tasks on exit; simplify finally in submit() --- yapapi/executor/__init__.py | 91 +++++++++++++++++++------------------ 1 file changed, 46 insertions(+), 45 deletions(-) diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 0f1aa98ed..cad0eaee1 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -1,6 +1,7 @@ """ An implementation of the new Golem's task executor. """ +import asyncio from asyncio import CancelledError from datetime import datetime, timedelta, timezone from decimal import Decimal @@ -134,7 +135,29 @@ async def submit( :param data: an iterator of Task objects to be computed on providers :return: yields computation progress events """ - import asyncio + + services: Set[asyncio.Task] = set() + workers: Set[asyncio.Task] = set() + + try: + async for result in self._submit(worker, data, services, workers): + yield result + + finally: + # Cancel and gather all tasks to make sure all exceptions are retrieved. + all_tasks = workers.union(services) + for task in all_tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*all_tasks, return_exceptions=True) + + async def _submit( + self, + worker: Callable[[WorkContext, AsyncIterator[Task[D, R]]], AsyncGenerator[Work, None]], + data: Iterable[Task[D, R]], + services: Set[asyncio.Task], + workers: Set[asyncio.Task], + ) -> AsyncIterator[Task[D, R]]: import contextlib import random @@ -177,7 +200,6 @@ def input_tasks() -> Iterable[Task[D, R]]: work_queue = SmartQueue(input_tasks()) - workers: Set[asyncio.Task[None]] = set() last_wid = 0 agreements_to_pay: Set[str] = set() @@ -401,21 +423,14 @@ async def worker_starter() -> None: get_offers_deadline = datetime.now(timezone.utc) + self._conf.get_offers_timeout get_done_task: Optional[asyncio.Task] = None - initial_services = { - find_offers_task, - loop.create_task(worker_starter()), - process_invoices_job, - wait_until_done, - } - services = initial_services.copy() - - async def _cancel_tasks_silently() -> None: - for task in initial_services.union(services).union(workers): - try: - task.cancel() - await asyncio.wait_for(task, timeout=0.1) - except (Exception, CancelledError, KeyboardInterrupt) as e: - pass + services.update( + { + find_offers_task, + loop.create_task(worker_starter()), + process_invoices_job, + wait_until_done, + } + ) try: while wait_until_done in services or not done_queue.empty(): @@ -462,43 +477,29 @@ async def _cancel_tasks_silently() -> None: emit(events.ComputationFinished()) except (Exception, CancelledError, KeyboardInterrupt) as e: - emit(events.ComputationFinished(exc_info=sys.exc_info())) # type: ignore - if not isinstance(e, (CancelledError, KeyboardInterrupt)): - raise finally: - try: - payment_closing = True - find_offers_task.cancel() - try: - if workers: - for worker_task in workers: - worker_task.cancel() - await asyncio.wait(workers, timeout=15, return_when=asyncio.ALL_COMPLETED) - except Exception: - if self._conf.traceback: - traceback.print_exc() - - for worker_task in workers: - worker_task.cancel() - await asyncio.wait( - workers.union({find_offers_task, process_invoices_job}), - timeout=5, - return_when=asyncio.ALL_COMPLETED, - ) - finally: - await _cancel_tasks_silently() - - try: payment_closing = True + find_offers_task.cancel() + try: + if workers: + for worker_task in workers: + worker_task.cancel() + await asyncio.wait(workers, timeout=15, return_when=asyncio.ALL_COMPLETED) + except Exception: + if self._conf.traceback: + traceback.print_exc() + await asyncio.wait( + workers.union({find_offers_task, process_invoices_job}), + timeout=5, + return_when=asyncio.ALL_COMPLETED, + ) if agreements_to_pay: await asyncio.wait( {process_invoices_job}, timeout=15, return_when=asyncio.ALL_COMPLETED ) - finally: - await _cancel_tasks_silently() async def _create_allocations(self) -> rest.payment.MarketDecoration: if not self._budget_allocations: From ce63a4921b40ebda2be387ad3cfd09a5fcf0acec Mon Sep 17 00:00:00 2001 From: azawlocki Date: Mon, 23 Nov 2020 10:22:47 +0100 Subject: [PATCH 2/2] Catch KeyboardInterrupt in blender/yacat for graceful shutdown --- examples/blender/blender.py | 19 ++++++++++++++++--- examples/yacat/yacat.py | 19 ++++++++++++++++--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/examples/blender/blender.py b/examples/blender/blender.py index 0fda1ad03..504f6c343 100755 --- a/examples/blender/blender.py +++ b/examples/blender/blender.py @@ -119,7 +119,20 @@ async def worker(ctx: WorkContext, tasks): task = loop.create_task(main(subnet_tag=args.subnet_tag)) try: loop.run_until_complete(task) - except (Exception, KeyboardInterrupt) as e: - print(e) + except KeyboardInterrupt: + print( + f"{utils.TEXT_COLOR_YELLOW}" + "Shutting down gracefully, please wait a few seconds " + "or press Ctrl+C to exit immediately..." + f"{utils.TEXT_COLOR_DEFAULT}" + ) task.cancel() - loop.run_until_complete(task) + try: + loop.run_until_complete(task) + print( + f"{utils.TEXT_COLOR_YELLOW}" + "Shutdown completed, thank you for waiting!" + f"{utils.TEXT_COLOR_DEFAULT}" + ) + except KeyboardInterrupt: + pass diff --git a/examples/yacat/yacat.py b/examples/yacat/yacat.py index 845022b86..63fdd1d66 100644 --- a/examples/yacat/yacat.py +++ b/examples/yacat/yacat.py @@ -155,7 +155,20 @@ async def worker_find_password(ctx: WorkContext, tasks): try: loop.run_until_complete(task) - except (Exception, KeyboardInterrupt) as e: - print(e) + except KeyboardInterrupt: + print( + f"{utils.TEXT_COLOR_YELLOW}" + "Shutting down gracefully, please wait a few seconds " + "or press Ctrl+C to exit immediately..." + f"{utils.TEXT_COLOR_DEFAULT}" + ) task.cancel() - loop.run_until_complete(task) + try: + loop.run_until_complete(task) + print( + f"{utils.TEXT_COLOR_YELLOW}" + "Shutdown completed, thank you for waiting!" + f"{utils.TEXT_COLOR_DEFAULT}" + ) + except KeyboardInterrupt: + pass