
Commit

feat: Upload benchmark lineages from SDK to studio (#1164)
* feat: Add skeleton for Benchmark Lineage Upload

* feat: Adding tests

* refactor: Type Handling of connector function

* feat: Include batch upload of benchmark lineages

* docs: Add Changelog entry

* feat: Add get_benchmark_lineage method for better testing

TASK: PHS-885
MerlinKallenbornAA authored Dec 2, 2024
1 parent 170acbe commit eab6ed1
Showing 5 changed files with 370 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
- Introduce `BenchmarkRepository` and `StudioBenchmarkRepository`
- Add `create_project` bool to `StudioClient.__init__()` to enable users to automatically create their Studio projects
- Add a progress bar to the `Runner` to track the `Run`
- Add `StudioClient.submit_benchmark_lineages` function and include it in `StudioClient.submit_benchmark_execution`

### Fixes
...
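To illustrate the new `StudioClient.submit_benchmark_lineages` entry, here is a minimal usage sketch. The client construction, IDs, and field values are placeholders and not taken from this commit; only the method and model names come from the diff below.

```python
from intelligence_layer.connectors.studio.studio import BenchmarkLineage, StudioClient

# Hypothetical client setup; the exact StudioClient constructor arguments are not part of this diff.
client = StudioClient(project="my-studio-project")

# A single lineage with placeholder values; `trace_id` would normally come from client.submit_trace(...).
lineage = BenchmarkLineage(
    trace_id="trace-123",
    input="What is the capital of France?",
    expected_output="Paris",
    output="Paris",
    evaluation={"correct": True},
    run_latency=0,
    run_tokens=0,
)

# Hypothetical benchmark and execution IDs.
lineage_ids = client.submit_benchmark_lineages(
    benchmark_lineages=[lineage],
    benchmark_id="benchmark-id",
    execution_id="execution-id",
)
```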
133 changes: 130 additions & 3 deletions src/intelligence_layer/connectors/studio/studio.py
@@ -1,3 +1,4 @@
import gzip
import json
import os
from collections import defaultdict, deque
@@ -8,7 +9,7 @@
from uuid import uuid4

import requests
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, RootModel
from requests.exceptions import ConnectionError, MissingSchema

from intelligence_layer.connectors import JsonSerializable
@@ -24,6 +25,8 @@

Input = TypeVar("Input", bound=PydanticSerializable)
ExpectedOutput = TypeVar("ExpectedOutput", bound=PydanticSerializable)
Output = TypeVar("Output", bound=PydanticSerializable)
Evaluation = TypeVar("Evaluation", bound=BaseModel, covariant=True)


class StudioProject(BaseModel):
@@ -140,6 +143,38 @@ class GetDatasetExamplesResponse(BaseModel, Generic[Input, ExpectedOutput]):
items: Sequence[StudioExample[Input, ExpectedOutput]]


class BenchmarkLineage(BaseModel, Generic[Input, Output, ExpectedOutput, Evaluation]):
trace_id: str
input: Input
expected_output: ExpectedOutput
output: Output
example_metadata: Optional[dict[str, Any]] = None
evaluation: Any
run_latency: int
run_tokens: int


class PostBenchmarkLineagesRequest(RootModel[Sequence[BenchmarkLineage]]):
pass


class PostBenchmarkLineagesResponse(RootModel[Sequence[str]]):
pass


class GetBenchmarkLineageResponse(BaseModel):
id: str
trace_id: str
benchmark_execution_id: str
input: Any
expected_output: Any
example_metadata: Optional[dict[str, Any]] = None
output: Any
evaluation: Any
run_latency: int
run_tokens: int


class StudioClient:
"""Client for communicating with Studio.
@@ -403,7 +438,7 @@ def get_dataset_examples(
if page is None:
break

def create_benchmark(
def submit_benchmark(
self,
dataset_id: str,
eval_logic: EvaluationLogicIdentifier,
@@ -449,7 +484,7 @@ def get_benchmark(
return None
return GetBenchmarkResponse.model_validate(response_text)

def create_benchmark_execution(
def submit_benchmark_execution(
self, benchmark_id: str, data: PostBenchmarkExecution
) -> str:
url = urljoin(
@@ -464,6 +499,98 @@ def create_benchmark_execution(
self._raise_for_status(response)
return str(response.json())

def submit_benchmark_lineages(
self,
benchmark_lineages: Sequence[BenchmarkLineage],
benchmark_id: str,
execution_id: str,
max_payload_size: int = 50
* 1024
* 1024, # Maximum request size handled by Studio
) -> PostBenchmarkLineagesResponse:
"""Submit benchmark lineages in batches to avoid exceeding the maximum payload size.
Args:
benchmark_lineages: List of :class: `BenchmarkLineages` to submit.
benchmark_id: ID of the benchmark.
execution_id: ID of the execution.
max_payload_size: Maximum size of the payload in bytes. Defaults to 50MB.
Returns:
Response containing the results of the submissions.
"""
all_responses = []
remaining_lineages = list(benchmark_lineages)
lineage_sizes = [
len(lineage.model_dump_json().encode("utf-8"))
for lineage in benchmark_lineages
]

while remaining_lineages:
batch = []
current_size = 0
# Build batch while checking size
for lineage, size in zip(remaining_lineages, lineage_sizes, strict=True):
if current_size + size <= max_payload_size:
batch.append(lineage)
current_size += size
else:
break

if batch:
# Send batch
response = self._send_compressed_batch(
batch, benchmark_id, execution_id
)
all_responses.extend(response)

else: # Only reached if a lineage is too big for the request
print("Lineage exceeds maximum of upload size", lineage)
batch.append(lineage)
remaining_lineages = remaining_lineages[len(batch) :]
lineage_sizes = lineage_sizes[len(batch) :]

return PostBenchmarkLineagesResponse(all_responses)

def get_benchmark_lineage(
self, benchmark_id: str, execution_id: str, lineage_id: str
) -> GetBenchmarkLineageResponse | None:
url = urljoin(
self.url,
f"/api/projects/{self.project_id}/evaluation/benchmarks/{benchmark_id}/executions/{execution_id}/lineages/{lineage_id}",
)
response = requests.get(
url,
headers=self._headers,
)
self._raise_for_status(response)
response_text = response.json()
if response_text is None:
return None
return GetBenchmarkLineageResponse.model_validate(response_text)

def _send_compressed_batch(
self, batch: list[BenchmarkLineage], benchmark_id: str, execution_id: str
) -> list[str]:
url = urljoin(
self.url,
f"/api/projects/{self.project_id}/evaluation/benchmarks/{benchmark_id}/executions/{execution_id}/lineages",
)

json_data = PostBenchmarkLineagesRequest(root=batch).model_dump_json()
compressed_data = gzip.compress(json_data.encode("utf-8"))

headers = {**self._headers, "Content-Encoding": "gzip"}

response = requests.post(
url,
headers=headers,
data=compressed_data,
)

self._raise_for_status(response)
return response.json()

def _raise_for_status(self, response: requests.Response) -> None:
try:
response.raise_for_status()
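As a rough illustration of the new client methods added above: `submit_benchmark_lineages` splits the lineages into gzip-compressed batches whose serialized size stays below `max_payload_size`, and `get_benchmark_lineage` fetches a single uploaded lineage, returning `None` if it does not exist. The IDs below are placeholders; `client` and `lineages` are assumed to exist as in the earlier sketch.

```python
# Placeholder IDs; `client` and `lineages` are assumed from the sketch after the CHANGELOG section.
lineage_ids = client.submit_benchmark_lineages(
    benchmark_lineages=lineages,
    benchmark_id="benchmark-id",
    execution_id="execution-id",
    max_payload_size=10 * 1024 * 1024,  # a smaller cap produces more, smaller compressed batches
)

# PostBenchmarkLineagesResponse is a RootModel, so the returned IDs live on `.root`.
first_id = lineage_ids.root[0]

retrieved = client.get_benchmark_lineage(
    benchmark_id="benchmark-id",
    execution_id="execution-id",
    lineage_id=first_id,
)
if retrieved is not None:
    print(retrieved.trace_id, retrieved.run_latency, retrieved.run_tokens)
```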
56 changes: 52 additions & 4 deletions src/intelligence_layer/evaluation/benchmark/studio_benchmark.py
@@ -1,4 +1,5 @@
import inspect
from collections.abc import Sequence
from datetime import datetime
from http import HTTPStatus
from typing import Any, Optional
@@ -9,6 +10,7 @@

from intelligence_layer.connectors.studio.studio import (
AggregationLogicIdentifier,
BenchmarkLineage,
EvaluationLogicIdentifier,
PostBenchmarkExecution,
StudioClient,
@@ -39,6 +41,9 @@
from intelligence_layer.evaluation.evaluation.in_memory_evaluation_repository import (
InMemoryEvaluationRepository,
)
from intelligence_layer.evaluation.infrastructure.repository_navigator import (
EvaluationLineage,
)
from intelligence_layer.evaluation.run.in_memory_run_repository import (
InMemoryRunRepository,
)
@@ -135,17 +140,60 @@ def execute(
statistics=aggregation_overview.statistics.model_dump_json(),
)

benchmark_execution_id = self.client.create_benchmark_execution(
benchmark_execution_id = self.client.submit_benchmark_execution(
benchmark_id=self.id, data=data
)

evaluation_lineages = self.evaluator.evaluation_lineages(evaluation_overview.id)
evaluation_lineages = list(
self.evaluator.evaluation_lineages(evaluation_overview.id)
)
trace_ids = []
for lineage in tqdm(evaluation_lineages, desc="Submitting traces to Studio"):
trace = lineage.tracers[0]
assert trace
self.client.submit_trace(trace.export_for_viewing())
trace_id = self.client.submit_trace(trace.export_for_viewing())
trace_ids.append(trace_id)

benchmark_lineages = self._create_benchmark_lineages(
eval_lineages=evaluation_lineages,
trace_ids=trace_ids,
)
self.client.submit_benchmark_lineages(
benchmark_lineages=benchmark_lineages,
execution_id=benchmark_execution_id,
benchmark_id=self.id,
)

return benchmark_execution_id

def _create_benchmark_lineages(
self,
eval_lineages: list[
EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]
],
trace_ids: list[str],
) -> Sequence[BenchmarkLineage[Input, Output, ExpectedOutput, Evaluation]]:
return [
self._create_benchmark_lineage(eval_lineage, trace_id)
for eval_lineage, trace_id in zip(eval_lineages, trace_ids, strict=True)
]

def _create_benchmark_lineage(
self,
eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation],
trace_id: str,
) -> BenchmarkLineage:
return BenchmarkLineage(
trace_id=trace_id,
input=eval_lineage.example.input,
expected_output=eval_lineage.example.expected_output,
example_metadata=eval_lineage.example.metadata,
output=eval_lineage.outputs[0].output,
evaluation=eval_lineage.evaluation.result,
run_latency=0, # TODO: Implement this
run_tokens=0, # TODO: Implement this
)


class StudioBenchmarkRepository(BenchmarkRepository):
def __init__(self, studio_client: StudioClient):
@@ -161,7 +209,7 @@ def create_benchmark(
description: Optional[str] = None,
) -> StudioBenchmark:
try:
benchmark_id = self.client.create_benchmark(
benchmark_id = self.client.submit_benchmark(
dataset_id,
create_evaluation_logic_identifier(eval_logic),
create_aggregation_logic_identifier(aggregation_logic),
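Putting it together, the typical flow now goes through `StudioBenchmarkRepository`: `create_benchmark` registers the benchmark via `submit_benchmark`, and `execute` submits traces, the execution, and finally the lineages via `submit_benchmark_lineages`. The sketch below is illustrative only: `client`, `eval_logic`, `aggregation_logic`, and `task` are assumed to exist, and the `execute` arguments are assumptions rather than part of this diff.

```python
from intelligence_layer.evaluation.benchmark.studio_benchmark import StudioBenchmarkRepository

# The keyword names below mirror the diff, but the full signatures are not shown in it.
repository = StudioBenchmarkRepository(studio_client=client)
benchmark = repository.create_benchmark(
    dataset_id="dataset-id",  # hypothetical ID
    eval_logic=eval_logic,
    aggregation_logic=aggregation_logic,
    name="my-benchmark",
)

# Runs the task against the dataset, then uploads traces, the execution, and the benchmark lineages.
execution_id = benchmark.execute(task, name="nightly-run")  # `execute` arguments are assumed
```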
