Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add new snapshot trigger and fix collector startup #1014

Merged
merged 10 commits into from
Mar 15, 2024
2 changes: 1 addition & 1 deletion requirements.dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ isort==5.10.1
pyspark

# service dependencies
litestar
litestar>=2.6.3
dynaconf
uvicorn
pyarrow
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
"requests>=2.21.0",
"PyYAML>=5.1",
"pydantic>=1.10.14",
"litestar>=2.5.1",
"litestar>=2.6.3",
"typing-inspect>=0.9.0",
"uvicorn>=0.22.0",
"watchdog>=3",
Expand Down
94 changes: 55 additions & 39 deletions src/evidently/collector/app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import asyncio
from asyncio import ensure_future
import contextlib
import logging
from typing import AsyncGenerator
from typing import Dict
from typing import List
from typing import Optional

import pandas as pd
import uvicorn
from litestar import Litestar
from litestar import Request
from litestar import get
from litestar import post
from litestar.concurrency import sync_to_thread
from litestar.connection import ASGIConnection
from litestar.di import Provide
from litestar.exceptions import HTTPException
Expand All @@ -29,22 +34,19 @@
from evidently.telemetry import DO_NOT_TRACK_ENV
from evidently.telemetry import event_logger
from evidently.ui.config import NoSecurityConfig
from evidently.ui.errors import EvidentlyServiceError
from evidently.ui.security.no_security import NoSecurityService
from evidently.ui.security.service import SecurityService
from evidently.ui.security.token import TokenSecurity
from evidently.ui.security.token import TokenSecurityConfig

COLLECTOR_INTERFACE = "collector"


async def entity_not_found_exception_handler(request: Request, exc: EvidentlyServiceError):
return exc.to_response()
logger = logging.getLogger(__name__)


@post("/{id:str}")
async def create_collector(
id: Annotated[str, Parameter(query="id", description="Collector ID")],
id: Annotated[str, Parameter(description="Collector ID")],
data: CollectorConfig,
service: CollectorServiceConfig,
storage: CollectorStorage,
Expand All @@ -58,7 +60,7 @@ async def create_collector(

@get("/{id:str}")
async def get_collector(
id: Annotated[str, Parameter(query="id", description="Collector ID")],
id: Annotated[str, Parameter(description="Collector ID")],
service: CollectorServiceConfig,
) -> CollectorConfig:
if id not in service.collectors:
Expand All @@ -68,10 +70,10 @@ async def get_collector(

@post("/{id:str}/reference")
async def reference(
id: Annotated[str, Parameter(query="id", description="Collector ID")],
id: Annotated[str, Parameter(description="Collector ID")],
request: Request,
service: CollectorServiceConfig,
):
) -> Dict[str, str]:
if id not in service.collectors:
raise HTTPException(status_code=404, detail=f"Collector config with id '{id}' not found")
collector = service.collectors[id]
Expand All @@ -85,11 +87,11 @@ async def reference(

@post("/{id:str}/data")
async def data(
id: Annotated[str, Parameter(query="id", description="Collector ID")],
id: Annotated[str, Parameter(description="Collector ID")],
request: Request,
service: CollectorServiceConfig,
storage: CollectorStorage,
):
) -> Dict[str, str]:
if id not in service.collectors:
raise HTTPException(status_code=404, detail=f"Collector config with id '{id}' not found")
async with storage.lock(id):
Expand All @@ -99,7 +101,7 @@ async def data(

@get("/{id:str}/logs")
async def get_logs(
id: Annotated[str, Parameter(query="id", description="Collector ID")],
id: Annotated[str, Parameter(description="Collector ID")],
service: CollectorServiceConfig,
storage: CollectorStorage,
) -> List[LogEvent]:
Expand All @@ -108,50 +110,46 @@ async def get_logs(
return storage.get_logs(id)


def check_snapshots_factory(service: CollectorServiceConfig, storage: CollectorStorage):
async def check_snapshots():
for _, collector in service.collectors.items():
if not collector.trigger.is_ready(collector, storage):
continue
# todo: call async
await create_snapshot(collector, storage)

return check_snapshots
async def check_snapshots_factory(service: CollectorServiceConfig, storage: CollectorStorage) -> None:
for _, collector in service.collectors.items():
if not collector.trigger.is_ready(collector, storage):
continue
await create_snapshot(collector, storage)


async def create_snapshot(collector: CollectorConfig, storage: CollectorStorage):
async def create_snapshot(collector: CollectorConfig, storage: CollectorStorage) -> None:
async with storage.lock(collector.id):
current = storage.get_and_flush(collector.id)
current = await sync_to_thread(storage.get_and_flush, collector.id) # FIXME: sync function
if current is None:
return
current.index = current.index.astype(int)
report_conf = collector.report_config
report = report_conf.to_report_base()
try:
report.run(reference_data=collector.reference, current_data=current, column_mapping=ColumnMapping())
await sync_to_thread(
report.run, reference_data=collector.reference, current_data=current, column_mapping=ColumnMapping()
) # FIXME: sync function
report._inner_suite.raise_for_error()
except Exception as e:
logger.exception(f"Error running report: {e}")
storage.log(
collector.id, LogEvent(ok=False, error=f"Error running report: {e.__class__.__name__}: {e.args}")
)
return
try:
collector.workspace.add_snapshot(collector.project_id, report.to_snapshot())
await sync_to_thread(
collector.workspace.add_snapshot, collector.project_id, report.to_snapshot()
) # FIXME: sync function
except Exception as e:
logger.exception(f"Error saving snapshot: {e}")
storage.log(
collector.id, LogEvent(ok=False, error=f"Error saving snapshot: {e.__class__.__name__}: {e.args}")
)
return
storage.log(collector.id, LogEvent(ok=True))


async def loop(seconds: int, func):
while True:
await func()
await asyncio.sleep(seconds)


def run(host: str = "0.0.0.0", port: int = 8001, config_path: str = CONFIG_PATH, secret: str = None):
def create_app(config_path: str = CONFIG_PATH, secret: Optional[str] = None) -> Litestar:
service = CollectorServiceConfig.load_or_default(config_path)
service.storage.init_all(service)

Expand All @@ -161,7 +159,6 @@ def run(host: str = "0.0.0.0", port: int = 8001, config_path: str = CONFIG_PATH,
print("Anonimous usage reporting is disabled")
event_logger.send_event(COLLECTOR_INTERFACE, "startup")

ensure_future(loop(seconds=service.check_interval, func=check_snapshots_factory(service, service.storage)))
security: SecurityService
if secret is None:
security = NoSecurityService(NoSecurityConfig())
Expand Down Expand Up @@ -190,7 +187,26 @@ def is_authenticated(connection: ASGIConnection, _: BaseRouteHandler) -> None:
if not connection.scope["auth"]["authenticated"]:
raise NotAuthorizedException()

app = Litestar(
@contextlib.asynccontextmanager
async def check_snapshots_factory_lifespan(app: Litestar) -> AsyncGenerator[None, None]:
stop_event = asyncio.Event()

async def check_service_snapshots_periodically():
while not stop_event.is_set():
try:
await check_snapshots_factory(service, service.storage)
except Exception as e:
logger.exception(f"Check snapshots factory error: {e}")
await asyncio.sleep(service.check_interval)

task = asyncio.create_task(check_service_snapshots_periodically())
try:
yield
finally:
stop_event.set()
await task

return Litestar(
route_handlers=[
create_collector,
get_collector,
Expand All @@ -205,14 +221,14 @@ def is_authenticated(connection: ASGIConnection, _: BaseRouteHandler) -> None:
},
middleware=[auth_middleware_factory],
guards=[is_authenticated],
lifespan=[check_snapshots_factory_lifespan],
)

uvicorn.run(app, host=host, port=port)


def main():
run()
def run(host: str = "0.0.0.0", port: int = 8001, config_path: str = CONFIG_PATH, secret: Optional[str] = None):
app = create_app(config_path, secret)
uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
main()
run()
15 changes: 9 additions & 6 deletions src/evidently/collector/client.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from typing import Any
from typing import Dict

import pandas as pd

from evidently.collector.config import CollectorConfig
from evidently.ui.utils import RemoteClientBase


class CollectorClient(RemoteClientBase):
def create_collector(self, id: str, collector: CollectorConfig):
self._request(f"/{id}", "POST", body=collector.dict())
def create_collector(self, id: str, collector: CollectorConfig) -> Dict[str, Any]:
return self._request(f"/{id}", "POST", body=collector.dict()).json()

def send_data(self, id: str, data: pd.DataFrame):
self._request(f"/{id}/data", "POST", body=data.to_dict())
def send_data(self, id: str, data: pd.DataFrame) -> Dict[str, Any]:
return self._request(f"/{id}/data", "POST", body=data.to_dict()).json()

def set_reference(self, id: str, reference: pd.DataFrame):
self._request(f"/{id}/reference", "POST", body=reference.to_dict())
def set_reference(self, id: str, reference: pd.DataFrame) -> Dict[str, Any]:
return self._request(f"/{id}/reference", "POST", body=reference.to_dict()).json()
19 changes: 14 additions & 5 deletions src/evidently/collector/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pandas as pd

from evidently._pydantic_compat import BaseModel
from evidently._pydantic_compat import Field
from evidently._pydantic_compat import parse_obj_as
from evidently.base_metric import Metric
from evidently.collector.storage import CollectorStorage
Expand Down Expand Up @@ -44,25 +45,33 @@ def is_ready(self, config: "CollectorConfig", storage: "CollectorStorage") -> bo


class IntervalTrigger(CollectorTrigger):
interval: float
interval: float = Field(gt=0)
last_triggered: float = 0

def is_ready(self, config: "CollectorConfig", storage: "CollectorStorage") -> bool:
now = time.time()
if now - self.last_triggered > self.interval:
is_ready = (now - self.last_triggered) > self.interval
if is_ready:
self.last_triggered = now
return True
return False
return is_ready


class RowsCountTrigger(CollectorTrigger):
rows_count: int = 1
rows_count: int = Field(default=1, gt=0)

def is_ready(self, config: "CollectorConfig", storage: "CollectorStorage") -> bool:
buffer_size = storage.get_buffer_size(config.id)
return buffer_size > 0 and buffer_size >= self.rows_count


class RowsCountOrIntervalTrigger(CollectorTrigger):
rows_count_trigger: RowsCountTrigger
interval_trigger: IntervalTrigger

def is_ready(self, config: "CollectorConfig", storage: "CollectorStorage") -> bool:
return self.interval_trigger.is_ready(config, storage) or self.rows_count_trigger.is_ready(config, storage)


class ReportConfig(Config):
metrics: List[Metric]
tests: List[Test]
Expand Down
9 changes: 7 additions & 2 deletions src/evidently/ui/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
import urllib.parse
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union

import requests

Expand All @@ -9,6 +12,8 @@
from evidently.ui.storage.common import SECRET_HEADER_NAME
from evidently.utils import NumpyEncoder

T = TypeVar("T", bound=BaseModel)


class RemoteClientBase:
def __init__(self, base_url: str, secret: str = None):
Expand All @@ -21,8 +26,8 @@ def _request(
method: str,
query_params: Optional[dict] = None,
body: Optional[dict] = None,
response_model=None,
):
response_model: Optional[Type[T]] = None,
) -> Union[T, requests.Response]:
# todo: better encoding
headers = {SECRET_HEADER_NAME: self.secret}
data = None
Expand Down
Empty file added tests/collector/__init__.py
Empty file.
Loading
Loading