From 6b1dc58a7a4c8d1e4c241d23580e8e809732899b Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 23 Apr 2024 08:29:04 +0200 Subject: [PATCH] Show warnings in query frontend (#7289) * Show warnings in query frontend QFE currently does not parse warnings from downstream queriers. This commit fixes that by adding the field to proto messages and modifies the merge function to take warnings into account. Signed-off-by: Filip Petkovski * Add CHANGELOG entry Signed-off-by: Filip Petkovski * Omit empty warnings Signed-off-by: Filip Petkovski --------- Signed-off-by: Filip Petkovski --- CHANGELOG.md | 1 + .../cortex/querier/queryrange/query_range.go | 15 ++- .../querier/queryrange/queryrange.pb.go | 110 ++++++++++++++++++ .../querier/queryrange/queryrange.proto | 2 + .../querier/queryrange/results_cache.go | 7 +- .../querier/queryrange/results_cache_test.go | 3 +- 6 files changed, 130 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4fd823cff..8564bfc778 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7244](https://github.com/thanos-io/thanos/pull/7244) Query: Fix Internal Server Error unknown targetHealth: "unknown" when trying to open the targets page. - [#7248](https://github.com/thanos-io/thanos/pull/7248) Receive: Fix RemoteWriteAsync was sequentially executed causing high latency in the ingestion path. - [#7271](https://github.com/thanos-io/thanos/pull/7271) Query: fixing dedup iterator when working on mixed sample types. +- [#7289](https://github.com/thanos-io/thanos/pull/7289) Query Frontend: show warnings from downstream queries. ### Added diff --git a/internal/cortex/querier/queryrange/query_range.go b/internal/cortex/querier/queryrange/query_range.go index 5084029577..afc1597f31 100644 --- a/internal/cortex/querier/queryrange/query_range.go +++ b/internal/cortex/querier/queryrange/query_range.go @@ -271,13 +271,17 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response // Merge the responses. sort.Sort(byFirstTime(promResponses)) - analyzes := make([]*Analysis, 0, len(responses)) + var ( + analyzes = make([]*Analysis, 0, len(responses)) + warnings []string = nil + ) for i := range promResponses { - if promResponses[i].Data.GetAnalysis() == nil { - continue + if promResponses[i].Data.GetAnalysis() != nil { + analyzes = append(analyzes, promResponses[i].Data.GetAnalysis()) + } + if len(promResponses[i].Warnings) > 0 { + warnings = append(warnings, promResponses[i].Warnings...) } - - analyzes = append(analyzes, promResponses[i].Data.GetAnalysis()) } seriesStatsCounters := make([]*SeriesStatsCounter, 0, len(responses)) @@ -297,6 +301,7 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response Stats: StatsMerge(responses), Analysis: AnalyzesMerge(analyzes...), }, + Warnings: warnings, } if len(seriesStatsCounters) > 0 { response.Data.SeriesStatsCounter = SeriesStatsCounterMerge(seriesStatsCounters...) diff --git a/internal/cortex/querier/queryrange/queryrange.pb.go b/internal/cortex/querier/queryrange/queryrange.pb.go index 098b38fe20..993cf366ae 100644 --- a/internal/cortex/querier/queryrange/queryrange.pb.go +++ b/internal/cortex/querier/queryrange/queryrange.pb.go @@ -259,6 +259,7 @@ type PrometheusResponse struct { ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"` Error string `protobuf:"bytes,4,opt,name=Error,proto3" json:"error,omitempty"` Headers []*PrometheusResponseHeader `protobuf:"bytes,5,rep,name=Headers,proto3" json:"-"` + Warnings []string `protobuf:"bytes,6,rep,name=Warnings,proto3" json:"warnings,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -332,6 +333,13 @@ func (m *PrometheusResponse) GetHeaders() []*PrometheusResponseHeader { return nil } +func (m *PrometheusResponse) GetWarnings() []string { + if m != nil { + return m.Warnings + } + return nil +} + type PrometheusData struct { ResultType string `protobuf:"bytes,1,opt,name=ResultType,proto3" json:"resultType"` Result []SampleStream `protobuf:"bytes,2,rep,name=Result,proto3" json:"result"` @@ -417,6 +425,7 @@ type PrometheusInstantQueryResponse struct { ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"` Error string `protobuf:"bytes,4,opt,name=Error,proto3" json:"error,omitempty"` Headers []*PrometheusResponseHeader `protobuf:"bytes,5,rep,name=Headers,proto3" json:"-"` + Warnings []string `protobuf:"bytes,6,rep,name=Warnings,proto3" json:"warnings,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -490,6 +499,13 @@ func (m *PrometheusInstantQueryResponse) GetHeaders() []*PrometheusResponseHeade return nil } +func (m *PrometheusInstantQueryResponse) GetWarnings() []string { + if m != nil { + return m.Warnings + } + return nil +} + type PrometheusInstantQueryData struct { ResultType string `protobuf:"bytes,1,opt,name=ResultType,proto3" json:"resultType"` Result PrometheusInstantQueryResult `protobuf:"bytes,2,opt,name=Result,proto3" json:"result"` @@ -1999,6 +2015,15 @@ func (m *PrometheusResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if len(m.Warnings) > 0 { + for iNdEx := len(m.Warnings) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Warnings[iNdEx]) + copy(dAtA[i:], m.Warnings[iNdEx]) + i = encodeVarintQueryrange(dAtA, i, uint64(len(m.Warnings[iNdEx]))) + i-- + dAtA[i] = 0x32 + } + } if len(m.Headers) > 0 { for iNdEx := len(m.Headers) - 1; iNdEx >= 0; iNdEx-- { { @@ -2155,6 +2180,15 @@ func (m *PrometheusInstantQueryResponse) MarshalToSizedBuffer(dAtA []byte) (int, i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if len(m.Warnings) > 0 { + for iNdEx := len(m.Warnings) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Warnings[iNdEx]) + copy(dAtA[i:], m.Warnings[iNdEx]) + i = encodeVarintQueryrange(dAtA, i, uint64(len(m.Warnings[iNdEx]))) + i-- + dAtA[i] = 0x32 + } + } if len(m.Headers) > 0 { for iNdEx := len(m.Headers) - 1; iNdEx >= 0; iNdEx-- { { @@ -3342,6 +3376,12 @@ func (m *PrometheusResponse) Size() (n int) { n += 1 + l + sovQueryrange(uint64(l)) } } + if len(m.Warnings) > 0 { + for _, s := range m.Warnings { + l = len(s) + n += 1 + l + sovQueryrange(uint64(l)) + } + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -3408,6 +3448,12 @@ func (m *PrometheusInstantQueryResponse) Size() (n int) { n += 1 + l + sovQueryrange(uint64(l)) } } + if len(m.Warnings) > 0 { + for _, s := range m.Warnings { + l = len(s) + n += 1 + l + sovQueryrange(uint64(l)) + } + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -4605,6 +4651,38 @@ func (m *PrometheusResponse) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Warnings", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Warnings = append(m.Warnings, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipQueryrange(dAtA[iNdEx:]) @@ -5044,6 +5122,38 @@ func (m *PrometheusInstantQueryResponse) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Warnings", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Warnings = append(m.Warnings, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipQueryrange(dAtA[iNdEx:]) diff --git a/internal/cortex/querier/queryrange/queryrange.proto b/internal/cortex/querier/queryrange/queryrange.proto index c6a05f3f52..7d31f91a61 100644 --- a/internal/cortex/querier/queryrange/queryrange.proto +++ b/internal/cortex/querier/queryrange/queryrange.proto @@ -42,6 +42,7 @@ message PrometheusResponse { string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"]; string Error = 4 [(gogoproto.jsontag) = "error,omitempty"]; repeated PrometheusResponseHeader Headers = 5 [(gogoproto.jsontag) = "-"]; + repeated string Warnings = 6 [(gogoproto.jsontag) = "warnings,omitempty"]; } message PrometheusData { @@ -58,6 +59,7 @@ message PrometheusInstantQueryResponse { string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"]; string Error = 4 [(gogoproto.jsontag) = "error,omitempty"]; repeated PrometheusResponseHeader Headers = 5 [(gogoproto.jsontag) = "-"]; + repeated string Warnings = 6 [(gogoproto.jsontag) = "warnings,omitempty"]; } message PrometheusInstantQueryData { diff --git a/internal/cortex/querier/queryrange/results_cache.go b/internal/cortex/querier/queryrange/results_cache.go index 0a0ece3fec..553e0aa214 100644 --- a/internal/cortex/querier/queryrange/results_cache.go +++ b/internal/cortex/querier/queryrange/results_cache.go @@ -99,7 +99,8 @@ func (PrometheusResponseExtractor) Extract(start, end int64, from Response) Resp Stats: extractStats(start, end, promRes.Data.Stats), Analysis: promRes.Data.Analysis, }, - Headers: promRes.Headers, + Headers: promRes.Headers, + Warnings: promRes.Warnings, } } @@ -115,6 +116,7 @@ func (PrometheusResponseExtractor) ResponseWithoutHeaders(resp Response) Respons Stats: promRes.Data.Stats, Analysis: promRes.Data.Analysis, }, + Warnings: promRes.Warnings, } } @@ -129,7 +131,8 @@ func (PrometheusResponseExtractor) ResponseWithoutStats(resp Response) Response Analysis: promRes.Data.Analysis, SeriesStatsCounter: promRes.Data.SeriesStatsCounter, }, - Headers: promRes.Headers, + Headers: promRes.Headers, + Warnings: promRes.Warnings, } } diff --git a/internal/cortex/querier/queryrange/results_cache_test.go b/internal/cortex/querier/queryrange/results_cache_test.go index 346c9e4a25..7527b6ad11 100644 --- a/internal/cortex/querier/queryrange/results_cache_test.go +++ b/internal/cortex/querier/queryrange/results_cache_test.go @@ -24,7 +24,7 @@ import ( const ( query = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120" - responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}],"analysis":null}}` + responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}],"analysis":null},"warnings":["test-warn"]}` histogramResponseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"fake":"histogram"},"histograms":[[1536673680,{"count":"5","sum":"18.4","buckets":[[3,"-0.001","0.001","2"],[0,"0.7071067811865475","1","1"],[0,"1","1.414213562373095","2"],[0,"2","2.82842712474619","1"],[0,"2.82842712474619","4","1"]]}]]}],"analysis":null}}` ) @@ -91,6 +91,7 @@ var ( }, }, }, + Warnings: []string{"test-warn"}, } parsedHistogramResponse = &PrometheusResponse{ Status: "success",