Skip to content

Commit

Permalink
Added call of synchronous functions using ThreadPoolExecutor in creat…
Browse files Browse the repository at this point in the history
…e_snapshot to avoid blocking the loop
  • Loading branch information
c0t0ber committed Mar 4, 2024
1 parent 884fc15 commit 95d625b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
11 changes: 8 additions & 3 deletions src/evidently/collector/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from evidently.ui.security.service import SecurityService
from evidently.ui.security.token import TokenSecurity
from evidently.ui.security.token import TokenSecurityConfig
from evidently.utils.executors import run_in_thread_poll_executor

COLLECTOR_INTERFACE = "collector"

Expand Down Expand Up @@ -117,14 +118,16 @@ async def check_snapshots_factory(service: CollectorServiceConfig, storage: Coll

async def create_snapshot(collector: CollectorConfig, storage: CollectorStorage) -> None:
async with storage.lock(collector.id):
current = storage.get_and_flush(collector.id)
current = await run_in_thread_poll_executor(storage.get_and_flush, collector.id) # FIXME: sync function
if current is None:
return
current.index = current.index.astype(int)
report_conf = collector.report_config
report = report_conf.to_report_base()
try:
report.run(reference_data=collector.reference, current_data=current, column_mapping=ColumnMapping())
await run_in_thread_poll_executor(
report.run, reference_data=collector.reference, current_data=current, column_mapping=ColumnMapping()
) # FIXME: sync function
report._inner_suite.raise_for_error()
except Exception as e:
logger.exception(f"Error running report: {e}")
Expand All @@ -133,7 +136,9 @@ async def create_snapshot(collector: CollectorConfig, storage: CollectorStorage)
)
return
try:
collector.workspace.add_snapshot(collector.project_id, report.to_snapshot())
await run_in_thread_poll_executor(
collector.workspace.add_snapshot, collector.project_id, report.to_snapshot()
) # FIXME: sync function
except Exception as e:
logger.exception(f"Error saving snapshot: {e}")
storage.log(
Expand Down
14 changes: 14 additions & 0 deletions src/evidently/utils/executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import asyncio
import contextvars
import functools
from typing import Callable
from typing import TypeVar

R = TypeVar("R")


async def run_in_thread_poll_executor(f: Callable[..., R], *args, **kwargs) -> R:
loop = asyncio.get_running_loop()
ctx = contextvars.copy_context()
f_with_args = functools.partial(f, *args, **kwargs)
return await loop.run_in_executor(None, lambda: ctx.run(f_with_args))

0 comments on commit 95d625b

Please sign in to comment.