
Commit

feat(traces): document retrieval metrics based on document evaluation scores (#1826)

* feat: span document retrieval metrics
RogerHYang authored Dec 4, 2023
1 parent f066e10 commit 3dfb7bd
Showing 7 changed files with 348 additions and 4 deletions.
34 changes: 34 additions & 0 deletions app/schema.graphql
@@ -202,6 +202,33 @@ type DocumentEvaluation implements Evaluation {
documentPosition: Int!
}

"""
A collection of retrieval metrics computed on a list of document evaluation scores: NDCG@K, Precision@K, Reciprocal Rank, etc.
"""
type DocumentRetrievalMetrics {
evaluationName: String!

"""
Normalized Discounted Cumulative Gain (NDCG) at `k` with log base 2 discounting. If `k` is None, it's set to the length of the scores. If `k` < 1, return 0.0.
"""
ndcg(k: Int): Float

"""
Precision at `k`, defined as the fraction of truthy scores among first `k` positions (1-based index). If `k` is None, then it's set to the length of the scores. If `k` < 1, return 0.0.
"""
precision(k: Int): Float

"""
Return `1/R` where `R` is the rank of the first hit, i.e. the 1-based index position of first truthy score, e.g. score=1. If a non-finite value (e.g. `NaN`) is encountered before the first (finite) truthy score, then return `NaN`, otherwise if no truthy score is found (or if the count of scores is zero), return 0.0.
"""
reciprocalRank: Float

"""
Return 1.0 if any score is truthy (i.e. is a hit), e.g. score=1. Otherwise, return `NaN` if any score is non-finite (e.g. `NaN`), or return 0.0 if all scores are falsy, e.g. all scores are 0.
"""
hit: Float
}

type DriftTimeSeries implements TimeSeries {
data: [TimeSeriesDataPoint!]!
}
@@ -516,6 +543,9 @@ type Query {
Names of all available evaluations for spans. (The list contains no duplicates.)
"""
spanEvaluationNames: [String!]!

"""Names of available document evaluations."""
documentEvaluationNames(spanId: ID): [String!]!
traceDatasetInfo: TraceDatasetInfo
validateSpanFilterCondition(condition: String!): ValidationResult!
}
@@ -561,6 +591,7 @@ type Span {

"""Span attributes as a JSON string"""
attributes: String!
numDocuments: Int
tokenCountTotal: Int
tokenCountPrompt: Int
tokenCountCompletion: Int
@@ -598,6 +629,9 @@ type Span {
"""
documentEvaluations: [DocumentEvaluation!]!

"""Retrieval metrics: NDCG@K, Precision@K, Reciprocal Rank, etc."""
documentRetrievalMetrics(evaluationName: String): [DocumentRetrievalMetrics!]!

"""All descendant spans (children, grandchildren, etc.)"""
descendants: [Span!]!
}
36 changes: 34 additions & 2 deletions src/phoenix/core/evals.py
@@ -6,6 +6,7 @@
from types import MethodType
from typing import DefaultDict, Dict, List, Optional

import numpy as np
from google.protobuf.json_format import MessageToDict
from typing_extensions import TypeAlias, assert_never

@@ -26,7 +27,6 @@ def __init__(self) -> None:
self._queue: "SimpleQueue[Optional[pb.Evaluation]]" = SimpleQueue()
weakref.finalize(self, self._queue.put, END_OF_QUEUE)
self._lock = RLock()
self._start_consumer()
self._trace_evaluations_by_name: DefaultDict[
EvaluationName, Dict[TraceID, pb.Evaluation]
] = defaultdict(dict)
@@ -42,6 +42,10 @@ def __init__(self) -> None:
self._document_evaluations_by_span_id: DefaultDict[
SpanID, DefaultDict[EvaluationName, Dict[DocumentPosition, pb.Evaluation]]
] = defaultdict(lambda: defaultdict(dict))
self._document_evaluations_by_name: DefaultDict[
EvaluationName, DefaultDict[SpanID, Dict[DocumentPosition, pb.Evaluation]]
] = defaultdict(lambda: defaultdict(dict))
self._start_consumer()

def put(self, evaluation: pb.Evaluation) -> None:
self._queue.put(evaluation)
@@ -69,6 +73,7 @@ def _process_evaluation(self, evaluation: pb.Evaluation) -> None:
span_id = SpanID(document_retrieval_id.span_id)
document_position = document_retrieval_id.document_position
self._document_evaluations_by_span_id[span_id][name][document_position] = evaluation
self._document_evaluations_by_name[name][span_id][document_position] = evaluation
elif subject_id_kind == "span_id":
span_id = SpanID(subject_id.span_id)
self._evaluations_by_span_id[span_id][name] = evaluation
@@ -90,7 +95,16 @@ def get_span_evaluation(self, span_id: SpanID, name: str) -> Optional[pb.Evaluat

def get_span_evaluation_names(self) -> List[EvaluationName]:
with self._lock:
return list(self._span_evaluations_by_name.keys())
return list(self._span_evaluations_by_name)

def get_document_evaluation_names(
self,
span_id: Optional[SpanID] = None,
) -> List[EvaluationName]:
with self._lock:
if span_id is None:
return list(self._document_evaluations_by_name)
return list(self._document_evaluations_by_span_id[span_id])

def get_evaluations_by_span_id(self, span_id: SpanID) -> List[pb.Evaluation]:
with self._lock:
@@ -102,3 +116,21 @@ def get_document_evaluations_by_span_id(self, span_id: SpanID) -> List[pb.Evalua
for evaluations in self._document_evaluations_by_span_id[span_id].values():
all_evaluations.extend(evaluations.values())
return all_evaluations

def get_document_evaluation_scores(
self,
span_id: SpanID,
evaluation_name: str,
num_documents: int,
) -> List[float]:
# num_documents is needed as argument because the document position values
# are not checked during ingestion: e.g. if there exists a position value
# of one trillion, we would not want to create a result that large.
scores: List[float] = [np.nan] * num_documents
with self._lock:
evaluations = self._document_evaluations_by_span_id[span_id][evaluation_name]
for document_position, evaluation in evaluations.items():
result = evaluation.result
if result.HasField("score") and document_position < num_documents:
scores[document_position] = result.score.value
return scores
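
A minimal sketch of what this method produces, assuming the behavior defined above; the `ingested` mapping and its values are illustrative, not taken from the diff. Positions with no ingested evaluation stay `NaN`, and positions at or beyond `num_documents` are dropped.

import numpy as np

# Hypothetical example: a span retrieved 4 documents, and evaluations were
# ingested for positions 0 and 2 only, plus one bogus position of one trillion.
ingested = {0: 1.0, 2: 0.0, 1_000_000_000_000: 1.0}
num_documents = 4

scores = [np.nan] * num_documents      # default: no score recorded
for position, score in ingested.items():
    if position < num_documents:       # out-of-range positions are ignored
        scores[position] = score

print(scores)  # [1.0, nan, 0.0, nan]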
105 changes: 105 additions & 0 deletions src/phoenix/metrics/retrieval_metrics.py
@@ -0,0 +1,105 @@
from dataclasses import dataclass, field
from typing import Iterable, Optional, cast

import numpy as np
import pandas as pd
from sklearn.metrics import ndcg_score


@dataclass(frozen=True)
class RetrievalMetrics:
"""
Ranking metrics computed on a list of evaluation scores sorted from high to
low by their ranking scores (prior to evaluation). For example, if the items
are search results and the evaluation scores are their relevance scores (e.g.
1 if relevant and 0 if not relevant), then the evaluation scores should be
sorted by the original order of the displayed results, i.e. the first search
result should go first. For more info on these metrics,
see https://cran.r-project.org/web/packages/recometrics/vignettes/Evaluating_recommender_systems.html
""" # noqa: E501

eval_scores: "pd.Series[float]"
length: int = field(init=False)
has_nan: bool = field(init=False)

def __init__(self, eval_scores: Iterable[float]) -> None:
_eval_scores = np.fromiter(eval_scores, dtype=float)
object.__setattr__(self, "length", len(_eval_scores))
object.__setattr__(self, "has_nan", not np.all(np.isfinite(_eval_scores)))
if self.length < 2:
# len < 2 won't work for sklearn.metrics.ndcg_score, so we pad it
# with zeros (but still keep track of the original length)
_scores = _eval_scores
_eval_scores = np.zeros(2)
_eval_scores[: len(_scores)] = _scores
# For ranking metrics, the actual scores used for ranking are only
# needed for sorting the items. Since we assume the items are already
# sorted from high to low by their ranking scores, we can assign ranking
# scores to be the reverse of the indices of eval_scores, just so that
# it goes from high to low.
ranking_scores = reversed(range(len(_eval_scores)))
object.__setattr__(
self,
"eval_scores",
pd.Series(_eval_scores, dtype=float, index=ranking_scores), # type: ignore
)

def ndcg(self, k: Optional[int] = None) -> float:
"""
Normalized Discounted Cumulative Gain (NDCG) at `k` with log base 2
discounting. If `k` is None, it's set to the length of the scores. If
`k` < 1, return 0.0.
"""
if self.has_nan:
return np.nan
if k is None:
k = self.length
if k < 1:
return 0.0
y_true = [self.eval_scores]
y_score = [self.eval_scores.index]
# Note that ndcg_score calculates differently depending on whether ties
# are involved, but this is not an issue for us because our setup has no
# ties in y_score, so we can set ignore_ties=True.
return cast(float, ndcg_score(y_true=y_true, y_score=y_score, k=k, ignore_ties=True))

def precision(self, k: Optional[int] = None) -> float:
"""
Precision at `k`, defined as the fraction of truthy scores among first
`k` positions (1-based index). If `k` is None, then it's set to the
length of the scores. If `k` < 1, return 0.0.
"""
if self.has_nan:
return np.nan
if k is None:
k = self.length
if k < 1:
return 0.0
return self.eval_scores[:k].astype(bool).sum() / k

def reciprocal_rank(self) -> float:
"""
Return `1/R` where `R` is the rank of the first hit, i.e. the 1-based
index position of first truthy score, e.g. score=1. If a non-finite
value (e.g. `NaN`) is encountered before the first (finite) truthy
score, then return `NaN`, otherwise if no truthy score is found (or if
the count of scores is zero), return 0.0.
"""
for i, score in enumerate(self.eval_scores):
if not np.isfinite(score):
return np.nan
if score:
return 1 / (i + 1)
return 0.0

def hit(self) -> float:
"""
Return 1.0 if any score is truthy (i.e. is a hit), e.g. score=1.
Otherwise, return `NaN` if any score is non-finite (e.g. `NaN`), or
return 0.0 if all scores are falsy, e.g. all scores are 0.
"""
if self.eval_scores.any():
return 1.0
if self.has_nan:
return np.nan
return 0.0
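
A hedged usage sketch of `RetrievalMetrics`, assuming the class behaves exactly as defined above; the printed values in the comments are approximate.

import numpy as np
from phoenix.metrics.retrieval_metrics import RetrievalMetrics

# Scores listed in retrieval order: position 0 is relevant, 1 is not, 2 is.
metrics = RetrievalMetrics([1.0, 0.0, 1.0])

print(metrics.hit())              # 1.0 -- at least one score is truthy
print(metrics.reciprocal_rank())  # 1.0 -- the first hit is at rank 1
print(metrics.precision(k=2))     # 0.5 -- one of the first two scores is truthy
print(metrics.ndcg())             # ~0.92 with log base 2 discounting

# A NaN score makes the rank-sensitive metrics NaN; hit() still reports the hit.
print(RetrievalMetrics([np.nan, 1.0]).ndcg())  # nan
print(RetrievalMetrics([np.nan, 1.0]).hit())   # 1.0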
15 changes: 15 additions & 0 deletions src/phoenix/server/api/schema.py
@@ -22,6 +22,7 @@
from phoenix.server.api.types.Cluster import Cluster, to_gql_clusters

from ...trace.filter import SpanFilter
from ...trace.schemas import SpanID
from .context import Context
from .input_types.TimeRange import TimeRange
from .types.DatasetInfo import TraceDatasetInfo
@@ -251,6 +252,20 @@ def span_evaluation_names(
return []
return evals.get_span_evaluation_names()

@strawberry.field(
description="Names of available document evaluations.",
) # type: ignore
def document_evaluation_names(
self,
info: Info[Context, None],
span_id: Optional[ID] = UNSET,
) -> List[str]:
if (evals := info.context.evals) is None:
return []
return evals.get_document_evaluation_names(
None if span_id is UNSET else SpanID(span_id),
)

@strawberry.field
def trace_dataset_info(
self,
47 changes: 47 additions & 0 deletions src/phoenix/server/api/types/DocumentRetrievalMetrics.py
@@ -0,0 +1,47 @@
import math
import re
from typing import Optional

import strawberry
from strawberry import UNSET, Private

from phoenix.metrics.retrieval_metrics import RetrievalMetrics


def _clean_docstring(docstring: Optional[str]) -> Optional[str]:
return re.sub(r"\s*\n+\s*", " ", docstring).strip() if docstring else None


_ndcg_docstring = _clean_docstring(RetrievalMetrics.ndcg.__doc__)
_precision_docstring = _clean_docstring(RetrievalMetrics.precision.__doc__)
_reciprocal_rank_docstring = _clean_docstring(RetrievalMetrics.reciprocal_rank.__doc__)
_hit_docstring = _clean_docstring(RetrievalMetrics.hit.__doc__)


@strawberry.type(
description="A collection of retrieval metrics computed on a list of document "
"evaluation scores: NDCG@K, Precision@K, Reciprocal Rank, etc."
)
class DocumentRetrievalMetrics:
evaluation_name: str
metrics: Private[RetrievalMetrics]

@strawberry.field(description=_ndcg_docstring) # type: ignore
def ndcg(self, k: Optional[int] = UNSET) -> Optional[float]:
value = self.metrics.ndcg(None if k is UNSET else k)
return value if math.isfinite(value) else None

@strawberry.field(description=_precision_docstring) # type: ignore
def precision(self, k: Optional[int] = UNSET) -> Optional[float]:
value = self.metrics.precision(None if k is UNSET else k)
return value if math.isfinite(value) else None

@strawberry.field(description=_reciprocal_rank_docstring) # type: ignore
def reciprocal_rank(self) -> Optional[float]:
value = self.metrics.reciprocal_rank()
return value if math.isfinite(value) else None

@strawberry.field(description=_hit_docstring) # type: ignore
def hit(self) -> Optional[float]:
value = self.metrics.hit()
return value if math.isfinite(value) else None
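
To make the null handling concrete, here is a small sketch of the conversion these resolvers apply, using only `RetrievalMetrics` and no GraphQL machinery; the scores below are illustrative.

import math
from phoenix.metrics.retrieval_metrics import RetrievalMetrics

metrics = RetrievalMetrics([float("nan"), 1.0, 0.0])

# Same pattern as the resolvers above: non-finite metric values (NaN)
# are mapped to None, which GraphQL serializes as null.
value = metrics.reciprocal_rank()
print(value if math.isfinite(value) else None)  # None -- NaN precedes the first hit

value = metrics.hit()
print(value if math.isfinite(value) else None)  # 1.0 -- position 1 is a hit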
