diff --git a/CHANGELOG.md b/CHANGELOG.md index 91c878e508..e6d4a9635c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ * [ENHANCEMENT] Updated Prometheus to latest. Includes changes from prometheus#9239, adding 15 new functions. Multiple TSDB bugfixes prometheus#9438 & prometheus#9381. #4524 * [ENHANCEMENT] Query Frontend: Add setting `-frontend.forward-headers-list` in frontend to configure the set of headers from the requests to be forwarded to downstream requests. #4486 * [BUGFIX] AlertManager: remove stale template files. #4495 +* [BUGFIX] Distributor: fix bug in query-exemplar where some results would get dropped. #4582 ## 1.11.0 2021-11-25 diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 436c91b1f2..da22f35187 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -251,7 +251,10 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe return nil, err } - // Merge results from replication set. + return mergeExemplarQueryResponses(results), nil +} + +func mergeExemplarQueryResponses(results []interface{}) *ingester_client.ExemplarQueryResponse { var keys []string exemplarResults := make(map[string]cortexpb.TimeSeries) for _, result := range results { @@ -262,9 +265,11 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe if !ok { exemplarResults[lbls] = ts keys = append(keys, lbls) + } else { + // Merge in any missing values from another ingesters exemplars for this series. + e.Exemplars = mergeExemplarSets(e.Exemplars, ts.Exemplars) + exemplarResults[lbls] = e } - // Merge in any missing values from another ingesters exemplars for this series. - e.Exemplars = mergeExemplarSets(e.Exemplars, ts.Exemplars) } } @@ -276,7 +281,7 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe result[i] = exemplarResults[k] } - return &ingester_client.ExemplarQueryResponse{Timeseries: result}, nil + return &ingester_client.ExemplarQueryResponse{Timeseries: result} } // queryIngesterStream queries the ingesters using the new streaming API. diff --git a/pkg/distributor/query_test.go b/pkg/distributor/query_test.go index 09414bdcf2..eced05a2c8 100644 --- a/pkg/distributor/query_test.go +++ b/pkg/distributor/query_test.go @@ -1,6 +1,7 @@ package distributor import ( + "fmt" "testing" "time" @@ -9,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/cortexpb" + ingester_client "github.com/cortexproject/cortex/pkg/ingester/client" ) func TestMergeSamplesIntoFirstDuplicates(t *testing.T) { @@ -110,51 +112,75 @@ func TestMergeSamplesIntoFirstNilB(t *testing.T) { require.Equal(t, b, a) } -func TestMergeExemplarSets(t *testing.T) { +func TestMergeExemplars(t *testing.T) { now := timestamp.FromTime(time.Now()) exemplar1 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-1")), TimestampMs: now, Value: 1} exemplar2 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-2")), TimestampMs: now + 1, Value: 2} exemplar3 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-3")), TimestampMs: now + 4, Value: 3} exemplar4 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-4")), TimestampMs: now + 8, Value: 7} exemplar5 := cortexpb.Exemplar{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "trace-4")), TimestampMs: now, Value: 7} - - for _, c := range []struct { - exemplarsA []cortexpb.Exemplar - exemplarsB []cortexpb.Exemplar - expected []cortexpb.Exemplar + labels1 := []cortexpb.LabelAdapter{{Name: "label1", Value: "foo1"}} + labels2 := []cortexpb.LabelAdapter{{Name: "label1", Value: "foo2"}} + + for i, c := range []struct { + seriesA []cortexpb.TimeSeries + seriesB []cortexpb.TimeSeries + expected []cortexpb.TimeSeries + nonReversible bool }{ { - exemplarsA: []cortexpb.Exemplar{}, - exemplarsB: []cortexpb.Exemplar{}, - expected: []cortexpb.Exemplar{}, - }, - { - exemplarsA: []cortexpb.Exemplar{exemplar1}, - exemplarsB: []cortexpb.Exemplar{}, - expected: []cortexpb.Exemplar{exemplar1}, + seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}}, + seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}}, + expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}}, }, { - exemplarsA: []cortexpb.Exemplar{}, - exemplarsB: []cortexpb.Exemplar{exemplar1}, - expected: []cortexpb.Exemplar{exemplar1}, + seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}}, + seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{}}}, + expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}}, }, { - exemplarsA: []cortexpb.Exemplar{exemplar1}, - exemplarsB: []cortexpb.Exemplar{exemplar1}, - expected: []cortexpb.Exemplar{exemplar1}, + seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}}, + seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}}, + expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1}}}, }, { - exemplarsA: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3}, - exemplarsB: []cortexpb.Exemplar{exemplar1, exemplar3, exemplar4}, - expected: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}, + seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3}}}, + seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar3, exemplar4}}}, + expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}}}, }, { // Ensure that when there are exemplars with duplicate timestamps, the first one wins. - exemplarsA: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3}, - exemplarsB: []cortexpb.Exemplar{exemplar5, exemplar3, exemplar4}, - expected: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}, + seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3}}}, + seriesB: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar5, exemplar3, exemplar4}}}, + expected: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}}}, + nonReversible: true, + }, + { // Disjoint exemplars on two different series. + seriesA: []cortexpb.TimeSeries{{Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}}}, + seriesB: []cortexpb.TimeSeries{{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3, exemplar4}}}, + expected: []cortexpb.TimeSeries{ + {Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}}, + {Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3, exemplar4}}}, + }, + { // Second input adds to first on one series. + seriesA: []cortexpb.TimeSeries{ + {Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}}, + {Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3}}}, + seriesB: []cortexpb.TimeSeries{{Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar4}}}, + expected: []cortexpb.TimeSeries{ + {Labels: labels1, Exemplars: []cortexpb.Exemplar{exemplar1, exemplar2}}, + {Labels: labels2, Exemplars: []cortexpb.Exemplar{exemplar3, exemplar4}}}, }, } { - e := mergeExemplarSets(c.exemplarsA, c.exemplarsB) - require.Equal(t, c.expected, e) + t.Run(fmt.Sprint("test", i), func(t *testing.T) { + rA := &ingester_client.ExemplarQueryResponse{Timeseries: c.seriesA} + rB := &ingester_client.ExemplarQueryResponse{Timeseries: c.seriesB} + e := mergeExemplarQueryResponses([]interface{}{rA, rB}) + require.Equal(t, c.expected, e.Timeseries) + if !c.nonReversible { + // Check the other way round too + e = mergeExemplarQueryResponses([]interface{}{rB, rA}) + require.Equal(t, c.expected, e.Timeseries) + } + }) } }