diff --git a/go.mod b/go.mod index 2e350c3586..d5809c9a1e 100644 --- a/go.mod +++ b/go.mod @@ -116,7 +116,6 @@ require ( ) require ( - dario.cat/mergo v1.0.0 github.com/mitchellh/go-ps v1.0.0 github.com/onsi/gomega v1.27.10 github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 diff --git a/go.sum b/go.sum index 74afed96eb..d6225b7eb0 100644 --- a/go.sum +++ b/go.sum @@ -603,8 +603,6 @@ cloud.google.com/go/workflows v1.7.0/go.mod h1:JhSrZuVZWuiDfKEFxU0/F1PQjmpnpcoIS cloud.google.com/go/workflows v1.8.0/go.mod h1:ysGhmEajwZxGn1OhGOGKsTXc5PyxOc0vfKf5Af+to4M= cloud.google.com/go/workflows v1.9.0/go.mod h1:ZGkj1aFIOd9c8Gerkjjq7OW7I5+l6cSvT3ujaO/WwSA= cloud.google.com/go/workflows v1.10.0/go.mod h1:fZ8LmRmZQWacon9UCX1r/g/DfAXx5VcPALq2CxzdePw= -dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= -dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc= diff --git a/internal/cortex/querier/queryrange/query_range.go b/internal/cortex/querier/queryrange/query_range.go index 97375f3670..e81991de91 100644 --- a/internal/cortex/querier/queryrange/query_range.go +++ b/internal/cortex/querier/queryrange/query_range.go @@ -12,14 +12,12 @@ import ( "math" "net/http" "net/url" - "reflect" "sort" "strconv" "strings" "time" "unsafe" - "dario.cat/mergo" "github.com/gogo/protobuf/proto" github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" "github.com/gogo/status" @@ -211,22 +209,38 @@ func NewEmptyPrometheusInstantQueryResponse() *PrometheusInstantQueryResponse { } } -type TimeDurationTransformer struct{} +func traverseAnalysis(a *Analysis, results *[]*Analysis) { + if a == nil { + return + } -func (t TimeDurationTransformer) Transformer(typ reflect.Type) func(dst, src reflect.Value) error { - if typ == reflect.TypeOf(Duration(0)) { - return func(dst, src reflect.Value) error { - if dst.CanSet() { - d := dst.Interface().(Duration) - s := src.Interface().(Duration) + *results = append(*results, a) - merged := d + s - dst.Set(reflect.ValueOf(merged)) - } - return nil + for _, ch := range a.Children { + traverseAnalysis(ch, results) + } +} + +func AnalyzesMerge(analysis ...*Analysis) *Analysis { + if len(analysis) == 0 { + return &Analysis{} + } + + root := analysis[0] + + var rootElements []*Analysis + traverseAnalysis(root, &rootElements) + + for _, a := range analysis[1:] { + var elements []*Analysis + traverseAnalysis(a, &elements) + + for i := 0; i < len(elements) && i < len(rootElements); i++ { + rootElements[i].ExecutionTime += analysis[i].ExecutionTime } } - return nil + + return root } func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response, error) { @@ -246,18 +260,13 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response // Merge the responses. sort.Sort(byFirstTime(promResponses)) - var analysis Analysis + analyzes := make([]*Analysis, 0, len(responses)) for i := range promResponses { if promResponses[i].Data.GetAnalysis() == nil { continue } - if err := mergo.Merge(&analysis, - promResponses[i].Data.GetAnalysis(), - mergo.WithTransformers(TimeDurationTransformer{}), - ); err != nil { - return nil, err - } + analyzes = append(analyzes, promResponses[i].Data.GetAnalysis()) } response := PrometheusResponse{ @@ -266,7 +275,7 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response ResultType: model.ValMatrix.String(), Result: matrixMerge(promResponses), Stats: StatsMerge(responses), - Analysis: &analysis, + Analysis: AnalyzesMerge(analyzes...), }, } diff --git a/internal/cortex/querier/queryrange/query_range_test.go b/internal/cortex/querier/queryrange/query_range_test.go index 978672e0db..121459de5e 100644 --- a/internal/cortex/querier/queryrange/query_range_test.go +++ b/internal/cortex/querier/queryrange/query_range_test.go @@ -344,6 +344,72 @@ func TestMergeAPIResponses(t *testing.T) { }, }, + { + name: "Basic merging of two responses with nested analysis trees.", + input: []Response{ + &PrometheusResponse{ + Data: PrometheusData{ + ResultType: matrix, + Analysis: &Analysis{ + Name: "foo", + Children: []*Analysis{{Name: "bar", ExecutionTime: Duration(1 * time.Second)}}, + ExecutionTime: Duration(1 * time.Second), + }, + Result: []SampleStream{ + { + Labels: []cortexpb.LabelAdapter{}, + Samples: []cortexpb.Sample{ + {Value: 0, TimestampMs: 0}, + {Value: 1, TimestampMs: 1}, + }, + }, + }, + }, + }, + &PrometheusResponse{ + Data: PrometheusData{ + ResultType: matrix, + Analysis: &Analysis{ + Name: "foo", + Children: []*Analysis{{Name: "bar", ExecutionTime: Duration(1 * time.Second)}}, + ExecutionTime: Duration(1 * time.Second), + }, + Result: []SampleStream{ + { + Labels: []cortexpb.LabelAdapter{}, + Samples: []cortexpb.Sample{ + {Value: 2, TimestampMs: 2}, + {Value: 3, TimestampMs: 3}, + }, + }, + }, + }, + }, + }, + expected: &PrometheusResponse{ + Status: StatusSuccess, + Data: PrometheusData{ + ResultType: matrix, + Analysis: &Analysis{ + Name: "foo", + Children: []*Analysis{{Name: "bar", ExecutionTime: Duration(2 * time.Second)}}, + ExecutionTime: Duration(2 * time.Second), + }, + Result: []SampleStream{ + { + Labels: []cortexpb.LabelAdapter{}, + Samples: []cortexpb.Sample{ + {Value: 0, TimestampMs: 0}, + {Value: 1, TimestampMs: 1}, + {Value: 2, TimestampMs: 2}, + {Value: 3, TimestampMs: 3}, + }, + }, + }, + }, + }, + }, + { name: "Merging of responses when labels are in different order.", input: []Response{ diff --git a/pkg/queryfrontend/queryinstant_codec.go b/pkg/queryfrontend/queryinstant_codec.go index e367784ab7..edd412a4a9 100644 --- a/pkg/queryfrontend/queryinstant_codec.go +++ b/pkg/queryfrontend/queryinstant_codec.go @@ -14,7 +14,6 @@ import ( "strconv" "strings" - "dario.cat/mergo" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" @@ -55,18 +54,13 @@ func (c queryInstantCodec) MergeResponse(req queryrange.Request, responses ...qu promResponses = append(promResponses, resp.(*queryrange.PrometheusInstantQueryResponse)) } - var analysis queryrange.Analysis + var analyzes []*queryrange.Analysis for i := range promResponses { if promResponses[i].Data.GetAnalysis() == nil { continue } - if err := mergo.Merge(&analysis, - promResponses[i].Data.GetAnalysis(), - mergo.WithTransformers(queryrange.TimeDurationTransformer{}), - ); err != nil { - return nil, err - } + analyzes = append(analyzes, promResponses[i].Data.GetAnalysis()) } var res queryrange.Response @@ -81,7 +75,7 @@ func (c queryInstantCodec) MergeResponse(req queryrange.Request, responses ...qu Matrix: matrixMerge(promResponses), }, }, - Analysis: &analysis, + Analysis: queryrange.AnalyzesMerge(analyzes...), Stats: queryrange.StatsMerge(responses), }, } @@ -99,7 +93,7 @@ func (c queryInstantCodec) MergeResponse(req queryrange.Request, responses ...qu Vector: v, }, }, - Analysis: &analysis, + Analysis: queryrange.AnalyzesMerge(analyzes...), Stats: queryrange.StatsMerge(responses), }, }