diff --git a/src/ert/async_utils.py b/src/ert/async_utils.py index 6b82a8c8b1f..7b4e353ddf0 100644 --- a/src/ert/async_utils.py +++ b/src/ert/async_utils.py @@ -1,9 +1,44 @@ +from __future__ import annotations + import asyncio +import sys +from traceback import print_exception +from typing import Any, Coroutine, Generator, TypeVar, Union + +_T = TypeVar("_T") +_T_co = TypeVar("_T_co", covariant=True) + + +def new_event_loop() -> asyncio.AbstractEventLoop: + loop = asyncio.new_event_loop() + loop.set_task_factory(_create_task) + return loop def get_event_loop() -> asyncio.AbstractEventLoop: try: return asyncio.get_event_loop() except RuntimeError: - asyncio.set_event_loop(asyncio.new_event_loop()) + asyncio.set_event_loop(new_event_loop()) return asyncio.get_event_loop() + + +def _create_task( + loop: asyncio.AbstractEventLoop, + coro: Union[Coroutine[Any, Any, _T], Generator[Any, None, _T]], +) -> asyncio.Task[_T]: + task = asyncio.Task(coro, loop=loop) + task.add_done_callback(_done_callback) + return task + + +def _done_callback(task: asyncio.Task[_T_co]) -> None: + assert task.done() + try: + if (exc := task.exception()) is None: + return + + print(f"Exception during {task.get_name()}", file=sys.stderr) + print_exception(exc, file=sys.stderr) + except asyncio.CancelledError: + pass diff --git a/src/ert/ensemble_evaluator/_builder/_legacy.py b/src/ert/ensemble_evaluator/_builder/_legacy.py index af97d34d753..d8b3ff662d2 100644 --- a/src/ert/ensemble_evaluator/_builder/_legacy.py +++ b/src/ert/ensemble_evaluator/_builder/_legacy.py @@ -9,7 +9,7 @@ from cloudevents.http.event import CloudEvent -from ert.async_utils import get_event_loop +from ert.async_utils import get_event_loop, new_event_loop from ert.config.parsing.queue_system import QueueSystem from ert.ensemble_evaluator import identifiers from ert.job_queue import JobQueue @@ -117,7 +117,7 @@ def _evaluate(self) -> None: a coroutine """ # Get a fresh eventloop - asyncio.set_event_loop(asyncio.new_event_loop()) + asyncio.set_event_loop(new_event_loop()) if self._config is None: raise ValueError("no config")