diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index d1b63cebb..0f1aa98ed 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -1,6 +1,7 @@ """ An implementation of the new Golem's task executor. """ +from asyncio import CancelledError from datetime import datetime, timedelta, timezone from decimal import Decimal import os @@ -397,17 +398,26 @@ async def worker_starter() -> None: process_invoices_job = loop.create_task(process_invoices()) wait_until_done = loop.create_task(work_queue.wait_until_done()) # Py38: find_offers_task.set_name('find_offers_task') + get_offers_deadline = datetime.now(timezone.utc) + self._conf.get_offers_timeout + get_done_task: Optional[asyncio.Task] = None + initial_services = { + find_offers_task, + loop.create_task(worker_starter()), + process_invoices_job, + wait_until_done, + } + services = initial_services.copy() - try: - get_done_task: Optional[asyncio.Task] = None - services = { - find_offers_task, - loop.create_task(worker_starter()), - process_invoices_job, - wait_until_done, - } + async def _cancel_tasks_silently() -> None: + for task in initial_services.union(services).union(workers): + try: + task.cancel() + await asyncio.wait_for(task, timeout=0.1) + except (Exception, CancelledError, KeyboardInterrupt) as e: + pass + try: while wait_until_done in services or not done_queue.empty(): now = datetime.now(timezone.utc) @@ -451,38 +461,44 @@ async def worker_starter() -> None: emit(events.ComputationFinished()) - except Exception as e: - if ( - not isinstance(e, (KeyboardInterrupt, asyncio.CancelledError)) - and self._conf.traceback - ): - traceback.print_exc() + except (Exception, CancelledError, KeyboardInterrupt) as e: + emit(events.ComputationFinished(exc_info=sys.exc_info())) # type: ignore + if not isinstance(e, (CancelledError, KeyboardInterrupt)): + raise finally: - payment_closing = True - find_offers_task.cancel() + try: - if workers: - for worker_task in workers: - worker_task.cancel() - await asyncio.wait(workers, timeout=15, return_when=asyncio.ALL_COMPLETED) - except Exception: - if self._conf.traceback: - traceback.print_exc() - - for worker_task in workers: - worker_task.cancel() - await asyncio.wait( - workers.union({find_offers_task, process_invoices_job}), - timeout=5, - return_when=asyncio.ALL_COMPLETED, - ) - payment_closing = True - if agreements_to_pay: - await asyncio.wait( - {process_invoices_job}, timeout=15, return_when=asyncio.ALL_COMPLETED - ) + payment_closing = True + find_offers_task.cancel() + try: + if workers: + for worker_task in workers: + worker_task.cancel() + await asyncio.wait(workers, timeout=15, return_when=asyncio.ALL_COMPLETED) + except Exception: + if self._conf.traceback: + traceback.print_exc() + + for worker_task in workers: + worker_task.cancel() + await asyncio.wait( + workers.union({find_offers_task, process_invoices_job}), + timeout=5, + return_when=asyncio.ALL_COMPLETED, + ) + finally: + await _cancel_tasks_silently() + + try: + payment_closing = True + if agreements_to_pay: + await asyncio.wait( + {process_invoices_job}, timeout=15, return_when=asyncio.ALL_COMPLETED + ) + finally: + await _cancel_tasks_silently() async def _create_allocations(self) -> rest.payment.MarketDecoration: if not self._budget_allocations: diff --git a/yapapi/rest/activity.py b/yapapi/rest/activity.py index 5a3e0b549..e266330b9 100644 --- a/yapapi/rest/activity.py +++ b/yapapi/rest/activity.py @@ -83,7 +83,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: # w/o for some buggy providers which do not kill exe-unit # on destroy_activity event. if exc_type: - _log.info( + _log.debug( "activity %s CLOSE for [%s] %s", self._id, exc_type.__name__, @@ -112,7 +112,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: with contextlib.suppress(yexc.ApiException): await self._api.destroy_activity(self._id) if exc_type: - _log.info("activity %s CLOSE done", self._id) + _log.debug("activity %s CLOSE done", self._id) @dataclass