Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RowCountTrigger to Collector Service #859

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/book/monitoring/collector_service.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ report_config = ReportConfig.from_test_suite(test_suite)

## CollectorTrigger

Currently, there is one option available: `IntervalTrigger`. It triggers the snapshot calculation each interval seconds.
Currently, there are two options available:
* `IntervalTrigger`: triggers the snapshot calculation every `interval` seconds
* `RowsCountTrigger`: triggers the snapshot calculation every time the configured number of rows (`rows_count`) has been sent to the collector service

**Note**: we are also working on `CronTrigger` and `RowsCountTrigger`. Would you like to see additional scenarios? Please open a GitHub issue with your suggestions.
**Note**: we are also working on `CronTrigger` and other triggers. Would you like to see additional scenarios? Please open a GitHub issue with your suggestions.

## Setup via file

Expand Down
49 changes: 31 additions & 18 deletions examples/integrations/collector_service/example_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,35 @@
import time

import pandas as pd

from requests.exceptions import RequestException

from evidently.collector.client import CollectorClient
from evidently.collector.config import CollectorConfig, IntervalTrigger, ReportConfig
from evidently.collector.config import CollectorConfig
from evidently.collector.config import IntervalTrigger
from evidently.collector.config import ReportConfig
from evidently.collector.config import RowsCountTrigger
from evidently.metrics import ColumnValueRangeMetric
from evidently.report import Report
from evidently.test_suite import TestSuite
from evidently.tests import TestNumberOfOutRangeValues
from evidently.ui.dashboards import DashboardPanelPlot, PanelValue, PlotType, ReportFilter
from evidently.ui.dashboards import DashboardPanelPlot
from evidently.ui.dashboards import PanelValue
from evidently.ui.dashboards import PlotType
from evidently.ui.dashboards import ReportFilter
from evidently.ui.workspace import Workspace


COLLECTOR_ID = "default"
COLLECTOR_TEST_ID = "default_test"

PROJECT_NAME = "My Cool Project"

WORKSACE_PATH = "workspace"
WORKSPACE_PATH = "workspace"

client = CollectorClient("http://localhost:8001")


def get_data():
cur = ref = pd.DataFrame([{"values1": 5., "values2": 0.} for _ in range(10)])
cur = ref = pd.DataFrame([{"values1": 5.0, "values2": 0.0} for _ in range(10)])
return cur, ref


Expand All @@ -38,6 +42,7 @@ def setup_report():
report.run(reference_data=ref, current_data=cur)
return ReportConfig.from_report(report)


def setup_test_suite():
report = TestSuite(tests=[TestNumberOfOutRangeValues("values1", left=5)], tags=["quality"])

Expand All @@ -47,15 +52,17 @@ def setup_test_suite():


def setup_workspace():
ws = Workspace.create(WORKSACE_PATH)
ws = Workspace.create(WORKSPACE_PATH)
project = ws.create_project(PROJECT_NAME)
project.dashboard.add_panel(
DashboardPanelPlot(
title="sample_panel",
filter=ReportFilter(metadata_values={}, tag_values=["quality"]),
values=[
PanelValue(metric_id="ColumnValueRangeMetric", field_path="current.share_in_range", legend="current"),
PanelValue(metric_id="ColumnValueRangeMetric", field_path="reference.share_in_range", legend="reference"),
PanelValue(
metric_id="ColumnValueRangeMetric", field_path="reference.share_in_range", legend="reference"
),
],
plot_type=PlotType.LINE,
)
Expand All @@ -64,22 +71,28 @@ def setup_workspace():


def setup_config():
ws = Workspace.create(WORKSACE_PATH)
ws = Workspace.create(WORKSPACE_PATH)
project = ws.search_project(PROJECT_NAME)[0]
conf = CollectorConfig(trigger=IntervalTrigger(interval=5), report_config=setup_report(), project_id=str(project.id))
client.create_collector(COLLECTOR_ID, conf)
# conf = CollectorConfig(trigger=IntervalTrigger(interval=5), report_config=setup_report(), project_id=str(project.id))
conf = CollectorConfig(
trigger=RowsCountTrigger(rows_count=5), report_config=setup_report(), project_id=str(project.id)
)
client.create_collector(id=COLLECTOR_ID, collector=conf)

test_conf = CollectorConfig(trigger=IntervalTrigger(interval=5), report_config=setup_test_suite(), project_id=str(project.id))
client.create_collector(COLLECTOR_TEST_ID, test_conf)
# test_conf = CollectorConfig(trigger=IntervalTrigger(interval=5), report_config=setup_test_suite(), project_id=str(project.id))
test_conf = CollectorConfig(
trigger=RowsCountTrigger(rows_count=5), report_config=setup_test_suite(), project_id=str(project.id)
)
client.create_collector(id=COLLECTOR_TEST_ID, collector=test_conf)

_, ref = get_data()
client.set_reference(COLLECTOR_ID, ref)
client.set_reference(COLLECTOR_TEST_ID, ref)
client.set_reference(id=COLLECTOR_ID, reference=ref)
client.set_reference(id=COLLECTOR_TEST_ID, reference=ref)


def send_data():
size = 1
data = pd.DataFrame([{"values1": 3. + datetime.datetime.now().minute % 5, "values2": 0.} for _ in range(size)])
data = pd.DataFrame([{"values1": 3.0 + datetime.datetime.now().minute % 5, "values2": 0.0} for _ in range(size)])

client.send_data(COLLECTOR_ID, data)
client.send_data(COLLECTOR_TEST_ID, data)
Expand All @@ -97,13 +110,13 @@ def start_sending_data():


def main():
if not os.path.exists(WORKSACE_PATH) or len(Workspace.create(WORKSACE_PATH).search_project(PROJECT_NAME)) == 0:
if not os.path.exists(WORKSPACE_PATH) or len(Workspace.create(WORKSPACE_PATH).search_project(PROJECT_NAME)) == 0:
setup_workspace()

setup_config()

start_sending_data()


if __name__ == '__main__':
if __name__ == "__main__":
main()
8 changes: 8 additions & 0 deletions src/evidently/collector/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ def is_ready(self, config: "CollectorConfig", storage: "CollectorStorage") -> bo
return False


class RowsCountTrigger(CollectorTrigger):
    """Trigger that fires once the collector's buffer is large enough.

    The trigger is ready when the buffer size reported by the storage for
    the collector is positive and has reached ``rows_count``.
    """

    # Minimum buffer size (as reported by the storage) required to fire.
    rows_count: int = 1

    def is_ready(self, config: "CollectorConfig", storage: "CollectorStorage") -> bool:
        """Return True when the buffer for ``config.id`` has reached ``rows_count``."""
        buffered = storage.get_buffer_size(config.id)
        # An empty buffer never triggers, even if rows_count is zero or negative.
        if not buffered > 0:
            return False
        return buffered >= self.rows_count


class ReportConfig(Config):
metrics: List[Metric]
tests: List[Test]
Expand Down
7 changes: 7 additions & 0 deletions src/evidently/collector/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def init_all(self, config):
def append(self, id: str, data: Any):
raise NotImplementedError

@abc.abstractmethod
def get_buffer_size(self, id: str) -> int:
    """Return the current size of the data buffer for collector ``id``.

    Subclasses must override this method; the base implementation only
    raises ``NotImplementedError``.

    NOTE(review): the unit of "size" (rows vs. appended batches) is decided
    by the implementation and by what callers pass to ``append`` -- confirm
    it matches what ``RowsCountTrigger.rows_count`` expects.
    """
    raise NotImplementedError

@abc.abstractmethod
def get_and_flush(self, id: str):
raise NotImplementedError
Expand Down Expand Up @@ -62,6 +66,9 @@ def init(self, id: str):
def append(self, id: str, data: Any):
self._buffers[id].append(data)

def get_buffer_size(self, id: str) -> int:
    """Return how many items are currently buffered for collector ``id``.

    The size is the number of objects stored in the buffer for ``id``,
    i.e. one per ``append`` call, not necessarily one per data row.

    An unknown ``id`` is reported as an empty buffer (size 0) rather than
    raising ``KeyError``, matching how ``get_and_flush`` treats a missing
    buffer, so that polling a trigger for a collector whose buffer has not
    been initialised simply yields "not ready".
    """
    if id not in self._buffers:
        return 0
    return len(self._buffers[id])

def get_and_flush(self, id: str):
if id not in self._buffers or len(self._buffers[id]) == 0:
return None
Expand Down