Il 388 submit individual traces #895

Merged 2 commits on Jun 6, 2024
10 changes: 8 additions & 2 deletions CHANGELOG.md
@@ -3,9 +3,15 @@
## Unreleased

### Breaking Changes
...
* Removed the `Trace` class, as it was no longer used.
* Renamed `example_trace` to `example_tracer` and changed its return type to `Optional[Tracer]`.
* Renamed `example_tracer` to `create_tracer_for_example`.

### New Features
...
* `Lineages` now contain a `Tracer` for each individual `Output`.
* `convert_to_pandas_data_frame` now also creates a column containing the `Tracer`s.
* `run_dataset` now has a `trace_examples_individually` flag to create a `Tracer` for each example. Defaults to `True`.

### Fixes
...
### Deprecations
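A minimal migration sketch for the renamed repository methods above; it assumes `InMemoryRunRepository()` can be constructed without arguments and uses placeholder run/example ids:

```python
from typing import Optional

from intelligence_layer.core import Tracer
from intelligence_layer.evaluation import InMemoryRunRepository

run_repository = InMemoryRunRepository()  # assumed to take no constructor arguments
run_id, example_id = "some-run", "some-example"  # placeholder ids, for illustration only

# Formerly example_trace(...): now returns the recorded tracer itself, or None.
tracer: Optional[Tracer] = run_repository.example_tracer(run_id, example_id)
if tracer is not None:
    tracer.submit_to_trace_viewer()

# The old example_tracer(...) behaviour now lives under create_tracer_for_example(...).
new_tracer = run_repository.create_tracer_for_example(run_id, example_id)
```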
32 changes: 25 additions & 7 deletions src/documentation/evaluation.ipynb
@@ -13,7 +13,7 @@
"from dotenv import load_dotenv\n",
"\n",
"from intelligence_layer.connectors import LimitedConcurrencyClient\n",
"from intelligence_layer.core import NoOpTracer, TextChunk\n",
"from intelligence_layer.core import TextChunk\n",
"from intelligence_layer.evaluation import (\n",
" Aggregator,\n",
" Evaluator,\n",
@@ -129,7 +129,7 @@
" dataset_name=\"ClassifyDataset\",\n",
")\n",
"\n",
"run_overview = runner.run_dataset(single_example_dataset.id, NoOpTracer())\n",
"run_overview = runner.run_dataset(single_example_dataset.id)\n",
"evaluation_overview = evaluator.evaluate_runs(run_overview.id)\n",
"aggregation_overview = aggregator.aggregate_evaluation(evaluation_overview.id)\n",
"\n",
@@ -220,7 +220,7 @@
"source": [
"Ok, let's run this!\n",
"\n",
"Note that this may take a while as we parallelise the tasks in a way that accommodates the inference API."
"Note that this may take a while as we parallelize the tasks in a way that accommodates the inference API."
]
},
{
@@ -268,11 +268,29 @@
"lineages = navigator.evaluation_lineages(\n",
" next(iter(aggregation_overview.evaluation_overviews)).id,\n",
" input_type=ClassifyInput,\n",
" expected_output_type=SingleLabelClassifyOutput,\n",
" output_type=Sequence[str],\n",
" expected_output_type=str,\n",
" output_type=SingleLabelClassifyOutput,\n",
" evaluation_type=SingleLabelClassifyEvaluation,\n",
")\n",
"evaluation_lineages_to_pandas(lineages).head(2)"
"\n",
"df = evaluation_lineages_to_pandas(lineages).head(10)\n",
"df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To inspect the trace of a specific example, we can simply select the `tracer` column of the desired row. If the trace viewer is running in the background, the trace will be automatically send to the trace viewer. If the trace viewer is not running, the trace will be visualized directly in the notebook instead, which will, for this example, lead to a *very* large output."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df[\"tracer\"].iloc[0]"
]
},
{
@@ -431,7 +449,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.2"
"version": "3.11.8"
}
},
"nbformat": 4,
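As a follow-up to the notebook change above, each entry of the new `tracer` column is a `Tracer` (or `None`), so a trace can also be submitted explicitly rather than relying on the notebook's rich display; `df` refers to the data frame built in the cell above:

```python
# `df` comes from evaluation_lineages_to_pandas(lineages) in the notebook above.
tracer = df["tracer"].iloc[0]

# submit_to_trace_viewer() returns False when no trace viewer is reachable.
if tracer is not None and not tracer.submit_to_trace_viewer():
    print("Trace viewer not reachable; display `tracer` in the notebook instead.")
```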
7 changes: 0 additions & 7 deletions src/documentation/how_tos/how_to_log_and_debug_a_task.ipynb
@@ -83,13 +83,6 @@
"\n",
"pass"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
11 changes: 2 additions & 9 deletions src/documentation/quickstart_task.ipynb
@@ -495,13 +495,6 @@
"print(aggregation_overview)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
@@ -523,10 +516,10 @@
")\n",
"print(examples[1].input.text)\n",
"examples.sort(key=lambda x: x.input.text)\n",
"last_example_result = run_repository.example_trace(\n",
"last_example_result = run_repository.example_tracer(\n",
" next(iter(aggregation_overview.run_overviews())).id, examples[1].id\n",
")\n",
"last_example_result.trace"
"last_example_result"
]
},
{
62 changes: 36 additions & 26 deletions src/intelligence_layer/core/tracer/tracer.py
@@ -99,6 +99,41 @@ class ExportedSpan(BaseModel):
ExportedSpanList = RootModel[Sequence[ExportedSpan]]


def submit_to_trace_viewer(exported_spans: Sequence[ExportedSpan]) -> bool:
"""Submits the trace to the UI for visualization

Args:
exported_spans: The exported spans to submit to the trace viewer.

Returns:
Boolean indicating whether the trace was submitted successfully.

"""
trace_viewer_url = os.getenv("TRACE_VIEWER_URL", "http://localhost:3000")
trace_viewer_trace_upload = f"{trace_viewer_url}/trace"
try:
res = requests.post(
trace_viewer_trace_upload,
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
json=ExportedSpanList(exported_spans).model_dump(mode="json"),
)
print(res)
if res.status_code != 200:
raise requests.HTTPError(res.status_code)
rich.print(
f"Open the [link={trace_viewer_url}]Trace Viewer[/link] to view the trace."
)
return True
except requests.ConnectionError:
print(
f"Trace viewer not found under {trace_viewer_url}.\nConsider running it for a better viewing experience.\nIf it is, set `TRACE_VIEWER_URL` in the environment."
)
return False


class Tracer(ABC):
"""Provides a consistent way to instrument a :class:`Task` with logging for each step of the
workflow.
@@ -174,32 +209,7 @@ def export_for_viewing(self) -> Sequence[ExportedSpan]:
...

def submit_to_trace_viewer(self) -> bool:
"""Submits the trace to the UI for visualization"""
trace_viewer_url = os.getenv("TRACE_VIEWER_URL", "http://localhost:3000")
trace_viewer_trace_upload = f"{trace_viewer_url}/trace"
try:
res = requests.post(
trace_viewer_trace_upload,
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
json=ExportedSpanList(self.export_for_viewing()).model_dump(
mode="json"
),
)
print(res)
if res.status_code != 200:
raise requests.HTTPError(res.status_code)
rich.print(
f"Open the [link={trace_viewer_url}]Trace Viewer[/link] to view the trace."
)
return True
except requests.ConnectionError:
print(
f"Trace viewer not found under {trace_viewer_url}.\nConsider running it for a better viewing experience.\nIf it is, set `TRACE_VIEWER_URL` in the environment."
)
return False
return submit_to_trace_viewer(self.export_for_viewing())


class ErrorValue(BaseModel):
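A short usage sketch for the new module-level helper; it assumes the function stays importable from `intelligence_layer.core.tracer.tracer` and that a trace viewer is listening on the default `TRACE_VIEWER_URL`:

```python
import os

from intelligence_layer.core import InMemoryTracer
from intelligence_layer.core.tracer.tracer import submit_to_trace_viewer

os.environ.setdefault("TRACE_VIEWER_URL", "http://localhost:3000")  # default used by the helper

tracer = InMemoryTracer()
# ... run any task with `tracer` here so that it records spans ...

# Unlike Tracer.submit_to_trace_viewer(), the helper accepts already-exported spans.
submitted = submit_to_trace_viewer(tracer.export_for_viewing())
print("submitted:", submitted)
```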
4 changes: 0 additions & 4 deletions src/intelligence_layer/evaluation/__init__.py
@@ -124,9 +124,5 @@
from .run.in_memory_run_repository import InMemoryRunRepository as InMemoryRunRepository
from .run.run_repository import RunRepository as RunRepository
from .run.runner import Runner as Runner
from .run.trace import ExampleTrace as ExampleTrace
from .run.trace import LogTrace as LogTrace
from .run.trace import SpanTrace as SpanTrace
from .run.trace import TaskSpanTrace as TaskSpanTrace

__all__ = [symbol for symbol in dir()]
@@ -1,11 +1,11 @@
import itertools
from typing import Generic, Iterable, Sequence
from typing import Generic, Iterable, Optional, Sequence

import pandas as pd
import rich
from pydantic import BaseModel
from rich.tree import Tree

from intelligence_layer.core import Tracer
from intelligence_layer.core.task import Input, Output
from intelligence_layer.evaluation.aggregation.domain import (
AggregatedEvaluation,
@@ -24,9 +24,20 @@
from intelligence_layer.evaluation.run.run_repository import RunRepository


class RunLineage(BaseModel, Generic[Input, ExpectedOutput, Output]):
class RunLineage(Generic[Input, ExpectedOutput, Output]):
example: Example[Input, ExpectedOutput]
output: ExampleOutput[Output]
tracer: Optional[Tracer]

def __init__(
self,
example: Example[Input, ExpectedOutput],
output: ExampleOutput[Output],
tracer: Optional[Tracer] = None,
) -> None:
self.example = example
self.output = output
self.tracer = tracer

def _rich_render(self) -> Tree:
tree = Tree("Run Lineage")
@@ -62,10 +73,23 @@ def run_lineages_to_pandas(
return df


class EvaluationLineage(BaseModel, Generic[Input, ExpectedOutput, Output, Evaluation]):
class EvaluationLineage(Generic[Input, ExpectedOutput, Output, Evaluation]):
example: Example[Input, ExpectedOutput]
outputs: Sequence[ExampleOutput[Output]]
evaluation: ExampleEvaluation[Evaluation]
tracers: Sequence[Optional[Tracer]]

def __init__(
self,
example: Example[Input, ExpectedOutput],
outputs: Sequence[ExampleOutput[Output]],
evaluation: ExampleEvaluation[Evaluation],
tracers: Sequence[Optional[Tracer]],
) -> None:
self.example = example
self.outputs = outputs
self.evaluation = evaluation
self.tracers = tracers

def _rich_render(self) -> Tree:
tree = Tree("Run Lineage")
@@ -102,9 +126,10 @@ def evaluation_lineages_to_pandas(
vars(lineage.example)
| vars(output)
| vars(lineage.evaluation)
| {"tracer": lineage.tracers[index]}
| {"lineage": lineage}
for lineage in evaluation_lineages
for output in lineage.outputs
for index, output in enumerate(lineage.outputs)
]
)
df = df.drop(columns="id")
@@ -198,7 +223,13 @@ def run_lineages(
# join
for example, example_output in itertools.product(examples, example_outputs):
if example.id == example_output.example_id:
yield RunLineage(example=example, output=example_output)
yield RunLineage(
example=example,
output=example_output,
tracer=self._run_repository.example_tracer(
run_id=run_id, example_id=example.id
),
)

def evaluation_lineages(
self,
@@ -245,17 +276,26 @@ def evaluation_lineages(
for evaluation in evaluations:
example = None
outputs = []
tracers = []
for run_lineage in run_lineages:
if run_lineage.example.id == evaluation.example_id:
if example is None:
# the evaluation has only one example
# and all relevant run lineages contain the same example
example = run_lineage.example
outputs.append(run_lineage.output)
tracers.append(
self._run_repository.example_tracer(
run_lineage.output.run_id, run_lineage.output.example_id
)
)

if example is not None:
yield EvaluationLineage(
example=example, outputs=outputs, evaluation=evaluation
example=example,
outputs=outputs,
evaluation=evaluation,
tracers=tracers,
)

def run_lineage(
@@ -295,7 +335,11 @@ def run_lineage(
if example_output is None:
return None

return RunLineage(example=example, output=example_output)
return RunLineage(
example=example,
output=example_output,
tracer=self._run_repository.example_tracer(run_id, example_id),
)

def evaluation_lineage(
self,
@@ -352,4 +396,5 @@ def evaluation_lineage(
example=existing_run_lineages[0].example,
outputs=[lineage.output for lineage in existing_run_lineages],
evaluation=example_evaluation,
tracers=[lineage.tracer for lineage in existing_run_lineages],
)
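A sketch of how the enriched lineages surface tracers, assuming the `navigator`, evaluation overview, and `Classify*` types are set up exactly as in the evaluation notebook earlier in this diff:

```python
lineages = navigator.evaluation_lineages(
    next(iter(aggregation_overview.evaluation_overviews)).id,
    input_type=ClassifyInput,
    expected_output_type=str,
    output_type=SingleLabelClassifyOutput,
    evaluation_type=SingleLabelClassifyEvaluation,
)

for lineage in lineages:
    # outputs and tracers are aligned by index; a tracer may be None.
    for output, tracer in zip(lineage.outputs, lineage.tracers):
        if tracer is not None:
            tracer.submit_to_trace_viewer()
```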
23 changes: 6 additions & 17 deletions src/intelligence_layer/evaluation/run/file_run_repository.py
@@ -1,22 +1,15 @@
from pathlib import Path
from typing import Iterable, Optional, Sequence, cast
from typing import Iterable, Optional, Sequence

from fsspec.implementations.local import LocalFileSystem # type: ignore

from intelligence_layer.core import (
FileTracer,
InMemoryTaskSpan,
InMemoryTracer,
JsonSerializer,
Output,
Tracer,
)
from intelligence_layer.core import FileTracer, InMemoryTracer, JsonSerializer, Output
from intelligence_layer.core.tracer.tracer import Tracer
from intelligence_layer.evaluation.infrastructure.file_system_based_repository import (
FileSystemBasedRepository,
)
from intelligence_layer.evaluation.run.domain import ExampleOutput, RunOverview
from intelligence_layer.evaluation.run.run_repository import RunRepository
from intelligence_layer.evaluation.run.trace import ExampleTrace, TaskSpanTrace


class FileSystemRunRepository(RunRepository, FileSystemBasedRepository):
@@ -62,17 +55,13 @@ def example_output(
json_data=content
)

def example_trace(self, run_id: str, example_id: str) -> Optional[ExampleTrace]:
def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]:
file_path = self._example_trace_path(run_id, example_id)
if not self.exists(file_path):
return None
in_memory_tracer = self._parse_log(file_path)
trace = TaskSpanTrace.from_task_span(
cast(InMemoryTaskSpan, in_memory_tracer.entries[0])
)
return ExampleTrace(run_id=run_id, example_id=example_id, trace=trace)
return self._parse_log(file_path)

def example_tracer(self, run_id: str, example_id: str) -> Tracer:
def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer:
file_path = self._example_trace_path(run_id, example_id)
return FileTracer(file_path)

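Finally, a round-trip sketch for the file-backed repository; the `FileRunRepository` name and its root-directory constructor are assumptions, as the concrete subclass is not shown in this diff:

```python
from pathlib import Path

from intelligence_layer.evaluation import FileRunRepository  # assumed concrete subclass

repository = FileRunRepository(Path("./runs"))  # assumed root-directory constructor
run_id, example_id = "my-run", "my-example"  # placeholder ids

# create_tracer_for_example returns a FileTracer writing to the example's trace file ...
file_tracer = repository.create_tracer_for_example(run_id, example_id)

# ... and example_tracer parses that file back into a tracer, or None if nothing was written.
restored = repository.example_tracer(run_id, example_id)
print(restored)
```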