Skip to content

Commit

Permalink
queryfrontend: fix analysis after API changes
Browse files Browse the repository at this point in the history
Fix the analysis functionality with query-frontend after the recent
changes. Added tests for this.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS authored and jnyi committed Apr 4, 2024
1 parent eb720bf commit 67abf60
Show file tree
Hide file tree
Showing 13 changed files with 744 additions and 221 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ require (
)

require (
dario.cat/mergo v1.0.0
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.27.10
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ cloud.google.com/go/workflows v1.7.0/go.mod h1:JhSrZuVZWuiDfKEFxU0/F1PQjmpnpcoIS
cloud.google.com/go/workflows v1.8.0/go.mod h1:ysGhmEajwZxGn1OhGOGKsTXc5PyxOc0vfKf5Af+to4M=
cloud.google.com/go/workflows v1.9.0/go.mod h1:ZGkj1aFIOd9c8Gerkjjq7OW7I5+l6cSvT3ujaO/WwSA=
cloud.google.com/go/workflows v1.10.0/go.mod h1:fZ8LmRmZQWacon9UCX1r/g/DfAXx5VcPALq2CxzdePw=
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc=
Expand Down
161 changes: 116 additions & 45 deletions internal/cortex/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import (
"math"
"net/http"
"net/url"
"reflect"
"sort"
"strconv"
"strings"
"time"
"unsafe"

"dario.cat/mergo"
"github.com/gogo/protobuf/proto"
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
"github.com/gogo/status"
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -208,6 +211,24 @@ func NewEmptyPrometheusInstantQueryResponse() *PrometheusInstantQueryResponse {
}
}

type TimeDurationTransformer struct{}

func (t TimeDurationTransformer) Transformer(typ reflect.Type) func(dst, src reflect.Value) error {
if typ == reflect.TypeOf(Duration(0)) {
return func(dst, src reflect.Value) error {
if dst.CanSet() {
d := dst.Interface().(Duration)
s := src.Interface().(Duration)

merged := d + s
dst.Set(reflect.ValueOf(merged))
}
return nil
}
}
return nil
}

func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response, error) {
if len(responses) == 0 {
return NewEmptyPrometheusResponse(), nil
Expand All @@ -225,21 +246,27 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response
// Merge the responses.
sort.Sort(byFirstTime(promResponses))

var explanation *Explanation
var analysis Analysis
for i := range promResponses {
if promResponses[i].Data.GetExplanation() != nil {
explanation = promResponses[i].Data.GetExplanation()
break
if promResponses[i].Data.GetAnalysis() == nil {
continue
}

if err := mergo.Merge(&analysis,
promResponses[i].Data.GetAnalysis(),
mergo.WithTransformers(TimeDurationTransformer{}),
); err != nil {
return nil, err
}
}

response := PrometheusResponse{
Status: StatusSuccess,
Data: PrometheusData{
ResultType: model.ValMatrix.String(),
Result: matrixMerge(promResponses),
Stats: StatsMerge(responses),
Explanation: explanation,
ResultType: model.ValMatrix.String(),
Result: matrixMerge(promResponses),
Stats: StatsMerge(responses),
Analysis: &analysis,
},
}

Expand Down Expand Up @@ -533,10 +560,10 @@ func (s *StringSample) UnmarshalJSON(b []byte) error {
// UnmarshalJSON implements json.Unmarshaler.
func (s *PrometheusInstantQueryData) UnmarshalJSON(data []byte) error {
var queryData struct {
ResultType string `json:"resultType"`
Result jsoniter.RawMessage `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
Explanation *Explanation `json:"explanation,omitempty"`
ResultType string `json:"resultType"`
Result jsoniter.RawMessage `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
Analysis *Analysis `json:"analysis,omitempty"`
}

if err := json.Unmarshal(data, &queryData); err != nil {
Expand All @@ -545,7 +572,7 @@ func (s *PrometheusInstantQueryData) UnmarshalJSON(data []byte) error {

s.ResultType = queryData.ResultType
s.Stats = queryData.Stats
s.Explanation = queryData.Explanation
s.Analysis = queryData.Analysis
switch s.ResultType {
case model.ValVector.String():
var result struct {
Expand Down Expand Up @@ -605,54 +632,54 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
switch s.ResultType {
case model.ValVector.String():
res := struct {
ResultType string `json:"resultType"`
Data []*Sample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
Explanation *Explanation `json:"explanation,omitempty"`
ResultType string `json:"resultType"`
Data []*Sample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
Analysis *Analysis `json:"analysis,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetVector().Samples,
Stats: s.Stats,
Explanation: s.Explanation,
ResultType: s.ResultType,
Data: s.Result.GetVector().Samples,
Stats: s.Stats,
Analysis: s.Analysis,
}
return json.Marshal(res)
case model.ValMatrix.String():
res := struct {
ResultType string `json:"resultType"`
Data []*SampleStream `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
Explanation *Explanation `json:"explanation,omitempty"`
ResultType string `json:"resultType"`
Data []*SampleStream `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
Analysis *Analysis `json:"analysis,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetMatrix().SampleStreams,
Stats: s.Stats,
Explanation: s.Explanation,
ResultType: s.ResultType,
Data: s.Result.GetMatrix().SampleStreams,
Stats: s.Stats,
Analysis: s.Analysis,
}
return json.Marshal(res)
case model.ValScalar.String():
res := struct {
ResultType string `json:"resultType"`
Data *cortexpb.Sample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
Explanation *Explanation `json:"explanation,omitempty"`
ResultType string `json:"resultType"`
Data *cortexpb.Sample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
Analysis *Analysis `json:"analysis,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetScalar(),
Stats: s.Stats,
Explanation: s.Explanation,
ResultType: s.ResultType,
Data: s.Result.GetScalar(),
Stats: s.Stats,
Analysis: s.Analysis,
}
return json.Marshal(res)
case model.ValString.String():
res := struct {
ResultType string `json:"resultType"`
Data *StringSample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
Explanation *Explanation `json:"explanation,omitempty"`
ResultType string `json:"resultType"`
Data *StringSample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
Analysis *Analysis `json:"analysis,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetStringSample(),
Stats: s.Stats,
Explanation: s.Explanation,
ResultType: s.ResultType,
Data: s.Result.GetStringSample(),
Stats: s.Stats,
Analysis: s.Analysis,
}
return json.Marshal(res)
default:
Expand Down Expand Up @@ -865,3 +892,47 @@ func init() {
jsoniter.RegisterTypeEncoderFunc("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode, func(unsafe.Pointer) bool { return false })
jsoniter.RegisterTypeDecoderFunc("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode)
}

type Duration time.Duration

func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(d).String())
}

func (d *Duration) UnmarshalJSON(b []byte) error {
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return err
}
switch value := v.(type) {
case float64:
*d = Duration(time.Duration(value))
return nil
case string:
tmp, err := time.ParseDuration(value)
if err != nil {
return err
}
*d = Duration(tmp)
return nil
default:
return errors.New("invalid duration")
}
}

func (d *Duration) Size() int {
return github_com_gogo_protobuf_types.SizeOfStdDuration(time.Duration(*d))
}

func (d *Duration) Unmarshal(b []byte) error {
var td time.Duration
if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(&td, b); err != nil {
return err
}
*d = Duration(td)
return nil
}

func (d *Duration) MarshalTo(b []byte) (int, error) {
return github_com_gogo_protobuf_types.StdDurationMarshalTo(time.Duration(*d), b)
}
Loading

0 comments on commit 67abf60

Please sign in to comment.