Skip to content

Commit

Permalink
Merge responses without sort for topk and bottomk
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
  • Loading branch information
harry671003 authored and Nathaniel Graham committed Apr 17, 2023
1 parent 704ef5d commit 425aa57
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 194 deletions.
104 changes: 67 additions & 37 deletions pkg/queryfrontend/queryinstant_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (

"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"

"github.com/prometheus/prometheus/promql/parser"
promqlparser "github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/internal/cortex/cortexpb"
"github.com/thanos-io/thanos/internal/cortex/querier/queryrange"
Expand Down Expand Up @@ -262,7 +262,8 @@ func (c queryInstantCodec) DecodeResponse(ctx context.Context, r *http.Response,

func vectorMerge(req queryrange.Request, resps []*queryrange.PrometheusInstantQueryResponse) (*queryrange.Vector, error) {
output := map[string]*queryrange.Sample{}
sortAsc, sortDesc, err := parseQueryForSort(req.GetQuery())
metrics := []string{} // Used to preserve the order for topk and bottomk.
sortPlan, err := sortPlanForQuery(req.GetQuery())
if err != nil {
return nil, err
}
Expand All @@ -283,17 +284,27 @@ func vectorMerge(req queryrange.Request, resps []*queryrange.PrometheusInstantQu
metric := cortexpb.FromLabelAdaptersToLabels(sample.Labels).String()
if existingSample, ok := output[metric]; !ok {
output[metric] = s
metrics = append(metrics, metric) // Preserve the order of metric.
} else if existingSample.GetSample().TimestampMs < s.GetSample().TimestampMs {
// Choose the latest sample if we see overlap.
output[metric] = s
}
}
}

result := &queryrange.Vector{
Samples: make([]*queryrange.Sample, 0, len(output)),
}

if len(output) == 0 {
return &queryrange.Vector{
Samples: make([]*queryrange.Sample, 0),
}, nil
return result, nil
}

if sortPlan == mergeOnly {
for _, k := range metrics {
result.Samples = append(result.Samples, output[k])
}
return result, nil
}

type pair struct {
Expand All @@ -310,60 +321,79 @@ func vectorMerge(req queryrange.Request, resps []*queryrange.PrometheusInstantQu
}

sort.Slice(samples, func(i, j int) bool {
// Order is determined by the sortFn in the query.
if sortAsc {
// Order is determined by vector
switch sortPlan {
case sortByValuesAsc:
return samples[i].s.Sample.Value < samples[j].s.Sample.Value
} else if sortDesc {
case sortByValuesDesc:
return samples[i].s.Sample.Value > samples[j].s.Sample.Value
} else {
// Fallback on sorting by labels.
return samples[i].metric < samples[j].metric
}
return samples[i].metric < samples[j].metric
})
result := &queryrange.Vector{
Samples: make([]*queryrange.Sample, 0, len(output)),
}

for _, p := range samples {
result.Samples = append(result.Samples, p.s)
}
return result, nil
}

func parseQueryForSort(q string) (bool, bool, error) {
type sortPlan int

const (
mergeOnly sortPlan = 0
sortByValuesAsc sortPlan = 1
sortByValuesDesc sortPlan = 2
sortByLabels sortPlan = 3
)

func sortPlanForQuery(q string) (sortPlan, error) {
expr, err := promqlparser.ParseExpr(q)
if err != nil {
return false, false, err
}
var sortAsc bool = false
var sortDesc bool = false
done := errors.New("done")
promqlparser.Inspect(expr, func(n promqlparser.Node, _ []promqlparser.Node) error {
if n, ok := n.(*promqlparser.AggregateExpr); ok {
if n.Op == promqlparser.TOPK {
sortDesc = true
return done
}
if n.Op == promqlparser.BOTTOMK {
sortAsc = true
return done
}
return nil
return 0, err
}
// Check if the root expression is topk or bottomk
if aggr, ok := expr.(*parser.AggregateExpr); ok {
if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK {
return mergeOnly, nil
}
if n, ok := n.(*promqlparser.Call); ok {
}
checkForSort := func(expr promqlparser.Expr) (sortAsc, sortDesc bool) {
if n, ok := expr.(*promqlparser.Call); ok {
if n.Func != nil {
if n.Func.Name == "sort" {
sortAsc = true
return done
}
if n.Func.Name == "sort_desc" {
sortDesc = true
return done
}
}
}
return nil
})
return sortAsc, sortDesc, nil
return sortAsc, sortDesc
}
// Check the root expression for sort
if sortAsc, sortDesc := checkForSort(expr); sortAsc || sortDesc {
if sortAsc {
return sortByValuesAsc, nil
}
return sortByValuesDesc, nil
}

// If the root expression is a binary expression, check the LHS and RHS for sort
if bin, ok := expr.(*parser.BinaryExpr); ok {
if sortAsc, sortDesc := checkForSort(bin.LHS); sortAsc || sortDesc {
if sortAsc {
return sortByValuesAsc, nil
}
return sortByValuesDesc, nil
}
if sortAsc, sortDesc := checkForSort(bin.RHS); sortAsc || sortDesc {
if sortAsc {
return sortByValuesAsc, nil
}
return sortByValuesDesc, nil
}
}
return sortByLabels, nil
}

func matrixMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange.Matrix {
Expand Down
Loading

0 comments on commit 425aa57

Please sign in to comment.