From 95d625bc57a463e3154ca0ef8bed1a2c66c39892 Mon Sep 17 00:00:00 2001 From: Danil Ustinov Date: Mon, 4 Mar 2024 23:49:04 +0800 Subject: [PATCH] Added call of synchronous functions using ThreadPoolExecutor in create_snapshot to avoid blocking the loop --- src/evidently/collector/app.py | 11 ++++++++--- src/evidently/utils/executors.py | 14 ++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) create mode 100644 src/evidently/utils/executors.py diff --git a/src/evidently/collector/app.py b/src/evidently/collector/app.py index 615c1defbc..125a046e28 100644 --- a/src/evidently/collector/app.py +++ b/src/evidently/collector/app.py @@ -36,6 +36,7 @@ from evidently.ui.security.service import SecurityService from evidently.ui.security.token import TokenSecurity from evidently.ui.security.token import TokenSecurityConfig +from evidently.utils.executors import run_in_thread_poll_executor COLLECTOR_INTERFACE = "collector" @@ -117,14 +118,16 @@ async def check_snapshots_factory(service: CollectorServiceConfig, storage: Coll async def create_snapshot(collector: CollectorConfig, storage: CollectorStorage) -> None: async with storage.lock(collector.id): - current = storage.get_and_flush(collector.id) + current = await run_in_thread_poll_executor(storage.get_and_flush, collector.id) # FIXME: sync function if current is None: return current.index = current.index.astype(int) report_conf = collector.report_config report = report_conf.to_report_base() try: - report.run(reference_data=collector.reference, current_data=current, column_mapping=ColumnMapping()) + await run_in_thread_poll_executor( + report.run, reference_data=collector.reference, current_data=current, column_mapping=ColumnMapping() + ) # FIXME: sync function report._inner_suite.raise_for_error() except Exception as e: logger.exception(f"Error running report: {e}") @@ -133,7 +136,9 @@ async def create_snapshot(collector: CollectorConfig, storage: CollectorStorage) ) return try: - collector.workspace.add_snapshot(collector.project_id, report.to_snapshot()) + await run_in_thread_poll_executor( + collector.workspace.add_snapshot, collector.project_id, report.to_snapshot() + ) # FIXME: sync function except Exception as e: logger.exception(f"Error saving snapshot: {e}") storage.log( diff --git a/src/evidently/utils/executors.py b/src/evidently/utils/executors.py new file mode 100644 index 0000000000..234138e803 --- /dev/null +++ b/src/evidently/utils/executors.py @@ -0,0 +1,14 @@ +import asyncio +import contextvars +import functools +from typing import Callable +from typing import TypeVar + +R = TypeVar("R") + + +async def run_in_thread_poll_executor(f: Callable[..., R], *args, **kwargs) -> R: + loop = asyncio.get_running_loop() + ctx = contextvars.copy_context() + f_with_args = functools.partial(f, *args, **kwargs) + return await loop.run_in_executor(None, lambda: ctx.run(f_with_args))