Skip to content

Commit

Permalink
fix: trace dataset to disc (#1798)
Browse files Browse the repository at this point in the history
* fix: trace dataset to disc

* feat(traces): evaluation annotations on traces for associating spans with eval metrics (#1693)

* feat: initial associations of evaluations to traces

* add some documentation

* wip: add dataframe utils

* Switch to a single evaluation per dataframe

* make copy the default

* fix doc string

* fix name

* fix notebook

* Add immutability

* remove value from being required

* fix tutorials formatting

* make type a string to see if it fixes tests

* fix test to handle un-parsable

* Update src/phoenix/trace/trace_eval_dataset.py

Co-authored-by: Xander Song <[email protected]>

* Update src/phoenix/trace/trace_eval_dataset.py

Co-authored-by: Xander Song <[email protected]>

* change to trace_evaluations

* cleanup

* Fix formatting

* pr comments

* cleanup notebook

* make sure columns are dropped

* remove unused test

---------

Co-authored-by: Xander Song <[email protected]>

* delete the metadata

* optimize removal of metadata

* shallow copy of dataframe

---------

Co-authored-by: Xander Song <[email protected]>
  • Loading branch information
mikeldking and axiomofjoy authored Dec 4, 2023
1 parent 3dfb7bd commit 278d344
Showing 1 changed file with 51 additions and 3 deletions.
54 changes: 51 additions & 3 deletions src/phoenix/trace/trace_dataset.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import uuid
from datetime import datetime
from typing import Iterable, Iterator, List, Optional, cast
from typing import Any, Iterable, Iterator, List, Optional, cast

import pandas as pd
from pandas import DataFrame, read_parquet
Expand All @@ -10,6 +10,12 @@

from ..config import DATASET_DIR, GENERATED_DATASET_NAME_PREFIX
from .schemas import ATTRIBUTE_PREFIX, CONTEXT_PREFIX, Span
from .semantic_conventions import (
DOCUMENT_METADATA,
RERANKER_INPUT_DOCUMENTS,
RERANKER_OUTPUT_DOCUMENTS,
RETRIEVAL_DOCUMENTS,
)
from .span_evaluations import EVALUATIONS_INDEX_NAME, SpanEvaluations
from .span_json_decoder import json_to_span
from .span_json_encoder import span_to_json
Expand All @@ -27,6 +33,16 @@
"context.trace_id",
]

# Fully-qualified dataframe column names for the document-bearing span
# attributes, built by prefixing the semantic-convention keys with the
# attribute prefix used for span-attribute columns.
RETRIEVAL_DOCUMENTS_COLUMN_NAME = f"{ATTRIBUTE_PREFIX}{RETRIEVAL_DOCUMENTS}"
RERANKER_INPUT_DOCUMENTS_COLUMN_NAME = f"{ATTRIBUTE_PREFIX}{RERANKER_INPUT_DOCUMENTS}"
RERANKER_OUTPUT_DOCUMENTS_COLUMN_NAME = f"{ATTRIBUTE_PREFIX}{RERANKER_OUTPUT_DOCUMENTS}"

# Columns whose cells hold lists of document dicts; these are the columns
# scrubbed by get_serializable_spans_dataframe before writing to parquet.
DOCUMENT_COLUMNS = [
    RETRIEVAL_DOCUMENTS_COLUMN_NAME,
    RERANKER_INPUT_DOCUMENTS_COLUMN_NAME,
    RERANKER_OUTPUT_DOCUMENTS_COLUMN_NAME,
]


def normalize_dataframe(dataframe: DataFrame) -> "DataFrame":
"""Makes the dataframe have appropriate data types"""
Expand All @@ -37,6 +53,38 @@ def normalize_dataframe(dataframe: DataFrame) -> "DataFrame":
return dataframe


def _delete_empty_document_metadata(documents: Any) -> Any:
    """
    Removes ambiguous, empty metadata dicts from a list of documents so that
    the object is serializable to parquet.

    Parameters
    ----------
    documents : Any
        The cell value from a document column. Only lists are processed;
        any other value is returned unchanged.

    Returns
    -------
    Any
        A new list of shallow-copied document dicts with empty metadata
        entries removed, or the original value if it was not a list.
    """
    if isinstance(documents, list):
        # Shallow-copy each document so the caller's dataframe is not mutated
        documents = list(map(dict, documents))
        for document in documents:
            metadata = document.get(DOCUMENT_METADATA)
            if isinstance(metadata, dict) and not metadata:
                # Empty dicts have no inferable value type and are not
                # serializable to parquet, so drop the key entirely
                del document[DOCUMENT_METADATA]
    return documents


def get_serializable_spans_dataframe(dataframe: DataFrame) -> DataFrame:
    """
    Returns a copy of the dataframe that is safe to serialize to parquet.

    Any document column present in the dataframe has its unserializable
    contents (e.g. empty metadata dicts) stripped out; all other columns
    are passed through untouched.

    Parameters
    ----------
    dataframe : DataFrame
        The spans dataframe to sanitize.

    Returns
    -------
    DataFrame
        A shallow copy of the input with document columns sanitized.
    """
    # Shallow copy so the caller's dataframe is never mutated
    sanitized = dataframe.copy(deep=False)
    present_document_columns = [
        column_name for column_name in sanitized.columns if column_name in DOCUMENT_COLUMNS
    ]
    for column_name in present_document_columns:
        sanitized[column_name] = sanitized[column_name].apply(_delete_empty_document_metadata)
    return sanitized


class TraceDataset:
"""
A TraceDataset is a wrapper around a dataframe which is a flattened representation
Expand Down Expand Up @@ -145,7 +193,7 @@ def to_disc(self) -> None:
"""writes the data to disc"""
directory = DATASET_DIR / self.name
directory.mkdir(parents=True, exist_ok=True)
self.dataframe.to_parquet(
get_serializable_spans_dataframe(self.dataframe).to_parquet(
directory / self._data_file_name,
allow_truncated_timestamps=True,
coerce_timestamps="ms",
Expand Down Expand Up @@ -175,7 +223,7 @@ def get_spans_dataframe(self, include_evaluations: bool = True) -> DataFrame:
include_evaluations: bool
if True, the evaluations are merged into the dataframe
"""
if not include_evaluations:
if not include_evaluations or not self.evaluations:
return self.dataframe.copy()
evals_df = self.get_evals_dataframe()
# Make sure the index is set to the span_id
Expand Down

0 comments on commit 278d344

Please sign in to comment.