From 278d344434d43d5d05cc66abfcb9646b0ac2fb6d Mon Sep 17 00:00:00 2001 From: Mikyo King Date: Mon, 4 Dec 2023 11:18:36 -0700 Subject: [PATCH] fix: trace dataset to disc (#1798) * fix: trace dataset to disc * feat(traces): evaluation annotations on traces for associating spans with eval metrics (#1693) * feat: initial associations of evaluations to traces * add some documentation * wip: add dataframe utils * Switch to a single evaluation per dataframe * make copy the default * fix doc string * fix name * fix notebook * Add immutability * remove value from being required * fix tutorials formatting * make type a string to see if it fixes tests * fix test to handle un-parsable * Update src/phoenix/trace/trace_eval_dataset.py Co-authored-by: Xander Song * Update src/phoenix/trace/trace_eval_dataset.py Co-authored-by: Xander Song * change to trace_evaluations * cleanup * Fix formatting * pr comments * cleanup notebook * make sure columns are dropped * remove unused test --------- Co-authored-by: Xander Song * delete the metadata * optimize removal of metadata * shallow copy of dataframe --------- Co-authored-by: Xander Song --- src/phoenix/trace/trace_dataset.py | 54 ++++++++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/src/phoenix/trace/trace_dataset.py b/src/phoenix/trace/trace_dataset.py index 1efbd91d9f..b14d584de6 100644 --- a/src/phoenix/trace/trace_dataset.py +++ b/src/phoenix/trace/trace_dataset.py @@ -1,7 +1,7 @@ import json import uuid from datetime import datetime -from typing import Iterable, Iterator, List, Optional, cast +from typing import Any, Iterable, Iterator, List, Optional, cast import pandas as pd from pandas import DataFrame, read_parquet @@ -10,6 +10,12 @@ from ..config import DATASET_DIR, GENERATED_DATASET_NAME_PREFIX from .schemas import ATTRIBUTE_PREFIX, CONTEXT_PREFIX, Span +from .semantic_conventions import ( + DOCUMENT_METADATA, + RERANKER_INPUT_DOCUMENTS, + RERANKER_OUTPUT_DOCUMENTS, + 
def _delete_empty_document_metadata(documents: Any) -> Any:
    """
    Removes empty metadata dicts from a list of documents so that the list is
    serializable to parquet.

    Args:
        documents: the value of a single cell of a document column. Only
            ``list`` values are processed; anything else is returned as-is.

    Returns:
        A new list in which every dict-convertible document has been
        shallow-copied and stripped of its ``DOCUMENT_METADATA`` entry when
        that entry is an empty dict. Elements that cannot be converted to a
        dict (e.g. ``None``) are passed through untouched instead of raising.
        The input list and its documents are never mutated.
    """
    if not isinstance(documents, list):
        return documents
    cleaned = []
    for document in documents:
        try:
            # Shallow copy of the document so the caller's dict is not mutated
            document = dict(document)
        except (TypeError, ValueError):
            # Not dict-like: there is no metadata to strip, and failing here
            # would abort serialization of the entire dataframe
            cleaned.append(document)
            continue
        metadata = document.get(DOCUMENT_METADATA)
        if isinstance(metadata, dict) and not metadata:
            # Delete the metadata object since empty dicts are not serializable
            del document[DOCUMENT_METADATA]
        cleaned.append(document)
    return cleaned


def get_serializable_spans_dataframe(dataframe: DataFrame) -> DataFrame:
    """
    Returns a dataframe that can be serialized to parquet.

    Every column listed in ``DOCUMENT_COLUMNS`` that is present in the
    dataframe has empty document metadata removed from its cells. All other
    columns are left unchanged.

    Args:
        dataframe: the spans dataframe to prepare for serialization

    Returns:
        A shallow copy of ``dataframe`` with the document columns replaced by
        their cleaned versions.
    """
    dataframe = dataframe.copy(deep=False)  # copy, don't mutate
    # Check if the dataframe has any document columns
    is_documents_column = dataframe.columns.isin(DOCUMENT_COLUMNS)
    if not is_documents_column.any():
        return dataframe
    for name, column in dataframe.loc[:, is_documents_column].items():  # type: ignore
        dataframe[name] = column.apply(_delete_empty_document_metadata)
    return dataframe