From 425aa573b033e3f22fd8a91fdb8f0729156db8af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Wed, 22 Feb 2023 10:06:16 -0800 Subject: [PATCH] Merge responses without sort for topk and bottomk MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- pkg/queryfrontend/queryinstant_codec.go | 104 ++++++--- pkg/queryfrontend/queryinstant_codec_test.go | 224 ++++++------------- 2 files changed, 134 insertions(+), 194 deletions(-) diff --git a/pkg/queryfrontend/queryinstant_codec.go b/pkg/queryfrontend/queryinstant_codec.go index b5adedebbdc..27d01beffb7 100644 --- a/pkg/queryfrontend/queryinstant_codec.go +++ b/pkg/queryfrontend/queryinstant_codec.go @@ -16,10 +16,10 @@ import ( "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" - "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/weaveworks/common/httpgrpc" + "github.com/prometheus/prometheus/promql/parser" promqlparser "github.com/prometheus/prometheus/promql/parser" "github.com/thanos-io/thanos/internal/cortex/cortexpb" "github.com/thanos-io/thanos/internal/cortex/querier/queryrange" @@ -262,7 +262,8 @@ func (c queryInstantCodec) DecodeResponse(ctx context.Context, r *http.Response, func vectorMerge(req queryrange.Request, resps []*queryrange.PrometheusInstantQueryResponse) (*queryrange.Vector, error) { output := map[string]*queryrange.Sample{} - sortAsc, sortDesc, err := parseQueryForSort(req.GetQuery()) + metrics := []string{} // Used to preserve the order for topk and bottomk. + sortPlan, err := sortPlanForQuery(req.GetQuery()) if err != nil { return nil, err } @@ -283,6 +284,7 @@ func vectorMerge(req queryrange.Request, resps []*queryrange.PrometheusInstantQu metric := cortexpb.FromLabelAdaptersToLabels(sample.Labels).String() if existingSample, ok := output[metric]; !ok { output[metric] = s + metrics = append(metrics, metric) // Preserve the order of metric. } else if existingSample.GetSample().TimestampMs < s.GetSample().TimestampMs { // Choose the latest sample if we see overlap. output[metric] = s @@ -290,10 +292,19 @@ func vectorMerge(req queryrange.Request, resps []*queryrange.PrometheusInstantQu } } + result := &queryrange.Vector{ + Samples: make([]*queryrange.Sample, 0, len(output)), + } + if len(output) == 0 { - return &queryrange.Vector{ - Samples: make([]*queryrange.Sample, 0), - }, nil + return result, nil + } + + if sortPlan == mergeOnly { + for _, k := range metrics { + result.Samples = append(result.Samples, output[k]) + } + return result, nil } type pair struct { @@ -310,60 +321,79 @@ func vectorMerge(req queryrange.Request, resps []*queryrange.PrometheusInstantQu } sort.Slice(samples, func(i, j int) bool { - // Order is determined by the sortFn in the query. - if sortAsc { + // Order is determined by vector + switch sortPlan { + case sortByValuesAsc: return samples[i].s.Sample.Value < samples[j].s.Sample.Value - } else if sortDesc { + case sortByValuesDesc: return samples[i].s.Sample.Value > samples[j].s.Sample.Value - } else { - // Fallback on sorting by labels. - return samples[i].metric < samples[j].metric } + return samples[i].metric < samples[j].metric }) - result := &queryrange.Vector{ - Samples: make([]*queryrange.Sample, 0, len(output)), - } + for _, p := range samples { result.Samples = append(result.Samples, p.s) } return result, nil } -func parseQueryForSort(q string) (bool, bool, error) { +type sortPlan int + +const ( + mergeOnly sortPlan = 0 + sortByValuesAsc sortPlan = 1 + sortByValuesDesc sortPlan = 2 + sortByLabels sortPlan = 3 +) + +func sortPlanForQuery(q string) (sortPlan, error) { expr, err := promqlparser.ParseExpr(q) if err != nil { - return false, false, err - } - var sortAsc bool = false - var sortDesc bool = false - done := errors.New("done") - promqlparser.Inspect(expr, func(n promqlparser.Node, _ []promqlparser.Node) error { - if n, ok := n.(*promqlparser.AggregateExpr); ok { - if n.Op == promqlparser.TOPK { - sortDesc = true - return done - } - if n.Op == promqlparser.BOTTOMK { - sortAsc = true - return done - } - return nil + return 0, err + } + // Check if the root expression is topk or bottomk + if aggr, ok := expr.(*parser.AggregateExpr); ok { + if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK { + return mergeOnly, nil } - if n, ok := n.(*promqlparser.Call); ok { + } + checkForSort := func(expr promqlparser.Expr) (sortAsc, sortDesc bool) { + if n, ok := expr.(*promqlparser.Call); ok { if n.Func != nil { if n.Func.Name == "sort" { sortAsc = true - return done } if n.Func.Name == "sort_desc" { sortDesc = true - return done } } } - return nil - }) - return sortAsc, sortDesc, nil + return sortAsc, sortDesc + } + // Check the root expression for sort + if sortAsc, sortDesc := checkForSort(expr); sortAsc || sortDesc { + if sortAsc { + return sortByValuesAsc, nil + } + return sortByValuesDesc, nil + } + + // If the root expression is a binary expression, check the LHS and RHS for sort + if bin, ok := expr.(*parser.BinaryExpr); ok { + if sortAsc, sortDesc := checkForSort(bin.LHS); sortAsc || sortDesc { + if sortAsc { + return sortByValuesAsc, nil + } + return sortByValuesDesc, nil + } + if sortAsc, sortDesc := checkForSort(bin.RHS); sortAsc || sortDesc { + if sortAsc { + return sortByValuesAsc, nil + } + return sortByValuesDesc, nil + } + } + return sortByLabels, nil } func matrixMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange.Matrix { diff --git a/pkg/queryfrontend/queryinstant_codec_test.go b/pkg/queryfrontend/queryinstant_codec_test.go index ac638a6da3e..8f67928fcab 100644 --- a/pkg/queryfrontend/queryinstant_codec_test.go +++ b/pkg/queryfrontend/queryinstant_codec_test.go @@ -409,166 +409,10 @@ func TestMergeResponse(t *testing.T) { }, }, }, - { - name: "merge two responses with sort_desc", - req: &queryrange.PrometheusRequest{ - Query: "1 + sort_desc(bottomk(1, up))", - }, - resps: []queryrange.Response{ - &queryrange.PrometheusInstantQueryResponse{ - Status: queryrange.StatusSuccess, - Data: queryrange.PrometheusInstantQueryData{ - ResultType: model.ValVector.String(), - Result: queryrange.PrometheusInstantQueryResult{ - Result: &queryrange.PrometheusInstantQueryResult_Vector{ - Vector: &queryrange.Vector{ - Samples: []*queryrange.Sample{ - { - Sample: cortexpb.Sample{TimestampMs: 0, Value: 3}, - Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ - "__name__": "up", - "job": "foo", - })), - }, - }, - }, - }, - }, - }, - }, - &queryrange.PrometheusInstantQueryResponse{ - Status: queryrange.StatusSuccess, - Data: queryrange.PrometheusInstantQueryData{ - ResultType: model.ValVector.String(), - Result: queryrange.PrometheusInstantQueryResult{ - Result: &queryrange.PrometheusInstantQueryResult_Vector{ - Vector: &queryrange.Vector{ - Samples: []*queryrange.Sample{ - { - Sample: cortexpb.Sample{TimestampMs: 0, Value: 10}, - Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ - "__name__": "up", - "job": "bar", - })), - }, - }, - }, - }, - }, - }, - }, - }, - expectedResp: &queryrange.PrometheusInstantQueryResponse{ - Status: queryrange.StatusSuccess, - Data: queryrange.PrometheusInstantQueryData{ - ResultType: model.ValVector.String(), - Result: queryrange.PrometheusInstantQueryResult{ - Result: &queryrange.PrometheusInstantQueryResult_Vector{ - Vector: &queryrange.Vector{ - Samples: []*queryrange.Sample{ - { - Sample: cortexpb.Sample{TimestampMs: 0, Value: 10}, - Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ - "__name__": "up", - "job": "bar", - })), - }, - { - Sample: cortexpb.Sample{TimestampMs: 0, Value: 3}, - Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ - "__name__": "up", - "job": "foo", - })), - }, - }, - }, - }, - }, - }, - }, - }, { name: "merge two responses with topk", req: &queryrange.PrometheusRequest{ - Query: "1 + topk(10, sort(up))", - }, - resps: []queryrange.Response{ - &queryrange.PrometheusInstantQueryResponse{ - Status: queryrange.StatusSuccess, - Data: queryrange.PrometheusInstantQueryData{ - ResultType: model.ValVector.String(), - Result: queryrange.PrometheusInstantQueryResult{ - Result: &queryrange.PrometheusInstantQueryResult_Vector{ - Vector: &queryrange.Vector{ - Samples: []*queryrange.Sample{ - { - Sample: cortexpb.Sample{TimestampMs: 0, Value: 1}, - Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ - "__name__": "up", - "job": "foo", - })), - }, - }, - }, - }, - }, - }, - }, - &queryrange.PrometheusInstantQueryResponse{ - Status: queryrange.StatusSuccess, - Data: queryrange.PrometheusInstantQueryData{ - ResultType: model.ValVector.String(), - Result: queryrange.PrometheusInstantQueryResult{ - Result: &queryrange.PrometheusInstantQueryResult_Vector{ - Vector: &queryrange.Vector{ - Samples: []*queryrange.Sample{ - { - Sample: cortexpb.Sample{TimestampMs: 0, Value: 2}, - Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ - "__name__": "up", - "job": "bar", - })), - }, - }, - }, - }, - }, - }, - }, - }, - expectedResp: &queryrange.PrometheusInstantQueryResponse{ - Status: queryrange.StatusSuccess, - Data: queryrange.PrometheusInstantQueryData{ - ResultType: model.ValVector.String(), - Result: queryrange.PrometheusInstantQueryResult{ - Result: &queryrange.PrometheusInstantQueryResult_Vector{ - Vector: &queryrange.Vector{ - Samples: []*queryrange.Sample{ - { - Sample: cortexpb.Sample{TimestampMs: 0, Value: 2}, - Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ - "__name__": "up", - "job": "bar", - })), - }, - { - Sample: cortexpb.Sample{TimestampMs: 0, Value: 1}, - Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{ - "__name__": "up", - "job": "foo", - })), - }, - }, - }, - }, - }, - }, - }, - }, - { - name: "merge two responses with bottomk", - req: &queryrange.PrometheusRequest{ - Query: "1 + bottomk(10, sort(up))", + Query: "topk(10, sort(up)) by (job)", }, resps: []queryrange.Response{ &queryrange.PrometheusInstantQueryResponse{ @@ -1367,3 +1211,69 @@ func TestDecodeResponse(t *testing.T) { testutil.Equals(t, tc.expectedResponse, gotResponse) } } + +func Test_sortPlanForQuery(t *testing.T) { + tc := []struct { + query string + expectedPlan sortPlan + err bool + }{ + { + query: "invalid(10, up)", + expectedPlan: mergeOnly, + err: true, + }, + { + query: "topk(10, up)", + expectedPlan: mergeOnly, + err: false, + }, + { + query: "bottomk(10, up)", + expectedPlan: mergeOnly, + err: false, + }, + { + query: "1 + topk(10, up)", + expectedPlan: sortByLabels, + err: false, + }, + { + query: "1 + sort_desc(sum by (job) (up) )", + expectedPlan: sortByValuesDesc, + err: false, + }, + { + query: "sort(topk by (job) (10, up))", + expectedPlan: sortByValuesAsc, + err: false, + }, + { + query: "topk(5, up) by (job) + sort_desc(up)", + expectedPlan: sortByValuesDesc, + err: false, + }, + { + query: "sort(up) + topk(5, up) by (job)", + expectedPlan: sortByValuesAsc, + err: false, + }, + { + query: "sum(up) by (job)", + expectedPlan: sortByLabels, + err: false, + }, + } + + for _, tc := range tc { + t.Run(tc.query, func(t *testing.T) { + p, err := sortPlanForQuery(tc.query) + if tc.err { + testutil.NotOk(t, err) + } else { + testutil.Ok(t, err) + testutil.Equals(t, tc.expectedPlan, p) + } + }) + } +}