Skip to content

Commit

Permalink
Distributor: fix bug in merging exemplars (cortexproject#4583)
Browse files Browse the repository at this point in the history
We need to add the merged value back to the map.

Extract merging as a separate function so it can be tested.
Adapt the existing test to cover multiple series.

Signed-off-by: Bryan Boreham <[email protected]>
Signed-off-by: Alvin Lin <[email protected]>
  • Loading branch information
bboreham authored and alvinlin123 committed Jan 14, 2022
1 parent f12fe5f commit 88ea8b7
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* [ENHANCEMENT] Updated Prometheus to latest. Includes changes from prometheus#9239, adding 15 new functions. Multiple TSDB bugfixes prometheus#9438 & prometheus#9381. #4524
* [ENHANCEMENT] Query Frontend: Add setting `-frontend.forward-headers-list` in frontend to configure the set of headers from the requests to be forwarded to downstream requests. #4486
* [BUGFIX] AlertManager: remove stale template files. #4495
* [BUGFIX] Distributor: fix bug in query-exemplar where some results would get dropped. #4582

## 1.11.0 2021-11-25

Expand Down
13 changes: 9 additions & 4 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,10 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
return nil, err
}

// Merge results from replication set.
return mergeExemplarQueryResponses(results), nil
}

func mergeExemplarQueryResponses(results []interface{}) *ingester_client.ExemplarQueryResponse {
var keys []string
exemplarResults := make(map[string]cortexpb.TimeSeries)
for _, result := range results {
Expand All @@ -262,9 +265,11 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
if !ok {
exemplarResults[lbls] = ts
keys = append(keys, lbls)
} else {
// Merge in any missing values from another ingesters exemplars for this series.
e.Exemplars = mergeExemplarSets(e.Exemplars, ts.Exemplars)
exemplarResults[lbls] = e
}
// Merge in any missing values from another ingesters exemplars for this series.
e.Exemplars = mergeExemplarSets(e.Exemplars, ts.Exemplars)
}
}

Expand All @@ -276,7 +281,7 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
result[i] = exemplarResults[k]
}

return &ingester_client.ExemplarQueryResponse{Timeseries: result}, nil
return &ingester_client.ExemplarQueryResponse{Timeseries: result}
}

// queryIngesterStream queries the ingesters using the new streaming API.
Expand Down
82 changes: 54 additions & 28 deletions pkg/distributor/query_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package distributor

import (
"fmt"
"testing"
"time"

Expand All @@ -9,6 +10,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/cortexpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
)

func TestMergeSamplesIntoFirstDuplicates(t *testing.T) {
Expand Down Expand Up @@ -110,51 +112,75 @@ func TestMergeSamplesIntoFirstNilB(t *testing.T) {
require.Equal(t, b, a)
}

func TestMergeExemplarSets(t *testing.T) {
func TestMergeExemplars(t *testing.T) {
now := timestamp.FromTime(time.Now())
exemplar1 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-1")), TimestampMs: now, Value: 1}
exemplar2 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-2")), TimestampMs: now + 1, Value: 2}
exemplar3 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-3")), TimestampMs: now + 4, Value: 3}
exemplar4 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-4")), TimestampMs: now + 8, Value: 7}
exemplar5 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-4")), TimestampMs: now, Value: 7}

for _, c := range []struct {
exemplarsA []cortexpb.Exemplar
exemplarsB []cortexpb.Exemplar
expected []cortexpb.Exemplar
labels1 := []cortexpb.LabelAdapter{{Name: "label1", Value: "foo1"}}
labels2 := []cortexpb.LabelAdapter{{Name: "label1", Value: "foo2"}}

for i, c := range []struct {
seriesA []cortexpb.TimeSeries
seriesB []cortexpb.TimeSeries
expected []cortexpb.TimeSeries
nonReversible bool
}{
{
exemplarsA: []cortexpb.Exemplar{},
exemplarsB: []cortexpb.Exemplar{},
expected: []cortexpb.Exemplar{},
},
{
exemplarsA: []cortexpb.Exemplar{exemplar1},
exemplarsB: []cortexpb.Exemplar{},
expected: []cortexpb.Exemplar{exemplar1},
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}},
seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}},
expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}},
},
{
exemplarsA: []cortexpb.Exemplar{},
exemplarsB: []cortexpb.Exemplar{exemplar1},
expected: []cortexpb.Exemplar{exemplar1},
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}},
seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}},
expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}},
},
{
exemplarsA: []cortexpb.Exemplar{exemplar1},
exemplarsB: []cortexpb.Exemplar{exemplar1},
expected: []cortexpb.Exemplar{exemplar1},
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}},
seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}},
expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}},
},
{
exemplarsA: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3},
exemplarsB: []cortexpb.Exemplar{exemplar1, exemplar3, exemplar4},
expected: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4},
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3}}},
seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar3, exemplar4}}},
expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}}},
},
{ // Ensure that when there are exemplars with duplicate timestamps, the first one wins.
exemplarsA: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3},
exemplarsB: []cortexpb.Exemplar{exemplar5, exemplar3, exemplar4},
expected: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4},
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3}}},
seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar5, exemplar3, exemplar4}}},
expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}}},
nonReversible: true,
},
{ // Disjoint exemplars on two different series.
seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}}},
seriesB: []cortexpb.TimeSeries{{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3, exemplar4}}},
expected: []cortexpb.TimeSeries{
{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}},
{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3, exemplar4}}},
},
{ // Second input adds to first on one series.
seriesA: []cortexpb.TimeSeries{
{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}},
{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3}}},
seriesB: []cortexpb.TimeSeries{{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar4}}},
expected: []cortexpb.TimeSeries{
{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}},
{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3, exemplar4}}},
},
} {
e := mergeExemplarSets(c.exemplarsA, c.exemplarsB)
require.Equal(t, c.expected, e)
t.Run(fmt.Sprint("test", i), func(t *testing.T) {
rA := &ingester_client.ExemplarQueryResponse{Timeseries: c.seriesA}
rB := &ingester_client.ExemplarQueryResponse{Timeseries: c.seriesB}
e := mergeExemplarQueryResponses([]interface{}{rA, rB})
require.Equal(t, c.expected, e.Timeseries)
if !c.nonReversible {
// Check the other way round too
e = mergeExemplarQueryResponses([]interface{}{rB, rA})
require.Equal(t, c.expected, e.Timeseries)
}
})
}
}

0 comments on commit 88ea8b7

Please sign in to comment.