diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f01a3c35b..606bcbdb7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * [CHANGE] Distributor: Apply `max_fetched_series_per_query` limit for `/series` API. #4683 * [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels. * [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction +* [FEATURE] Querier/Query-Frontend: Add `-querier.per-step-stats-enabled` and `-frontend.cache-queryable-samples-stats` configurations to enable query sample statistics ## 1.12.0 in progress diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 25442c6b99..3b519e385c 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -136,6 +136,10 @@ querier: # CLI flag: -querier.at-modifier-enabled [at_modifier_enabled: | default = false] + # Enable returning samples stats per steps in query response. + # CLI flag: -querier.per-step-stats-enabled + [per_step_stats_enabled: | default = false] + # The time after which a metric should be queried from storage and not just # ingesters. 0 means all queries are sent to store. When running the blocks # storage, if this option is enabled, the time range of the query sent to the diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index e6f3c5ea7a..21fc13b256 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -889,6 +889,10 @@ The `querier_config` configures the Cortex querier. # CLI flag: -querier.at-modifier-enabled [at_modifier_enabled: | default = false] +# Enable returning samples stats per steps in query response. +# CLI flag: -querier.per-step-stats-enabled +[per_step_stats_enabled: | default = false] + # The time after which a metric should be queried from storage and not just # ingesters. 0 means all queries are sent to store. When running the blocks # storage, if this option is enabled, the time range of the query sent to the @@ -1153,6 +1157,10 @@ results_cache: # CLI flag: -frontend.compression [compression: | default = ""] + # Cache Statistics queryable samples on results cache. + # CLI flag: -frontend.cache-queryable-samples-stats + [cache_queryable_samples_stats: | default = false] + # Cache query results. # CLI flag: -querier.cache-results [cache_results: | default = false] diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 355bde6128..842534c8d0 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -224,7 +224,7 @@ func (c *Config) Validate(log log.Logger) error { if err := c.Worker.Validate(log); err != nil { return errors.Wrap(err, "invalid frontend_worker config") } - if err := c.QueryRange.Validate(); err != nil { + if err := c.QueryRange.Validate(c.Querier); err != nil { return errors.Wrap(err, "invalid query_range config") } if err := c.TableManager.Validate(); err != nil { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 2568029087..0973f4b5bc 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -523,11 +523,12 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryrange.PrometheusResponseExtractor{}, t.Cfg.Schema, promql.EngineOpts{ - Logger: util_log.Logger, - Reg: prometheus.DefaultRegisterer, - MaxSamples: t.Cfg.Querier.MaxSamples, - Timeout: t.Cfg.Querier.Timeout, - EnableAtModifier: t.Cfg.Querier.AtModifierEnabled, + Logger: util_log.Logger, + Reg: prometheus.DefaultRegisterer, + MaxSamples: t.Cfg.Querier.MaxSamples, + Timeout: t.Cfg.Querier.Timeout, + EnableAtModifier: t.Cfg.Querier.AtModifierEnabled, + EnablePerStepStats: t.Cfg.Querier.EnablePerStepStats, NoStepSubqueryIntervalFn: func(int64) int64 { return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds() }, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 3d6952a72f..7d87219d50 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -45,6 +45,7 @@ type Config struct { QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"` QueryStoreForLabels bool `yaml:"query_store_for_labels_enabled"` AtModifierEnabled bool `yaml:"at_modifier_enabled"` + EnablePerStepStats bool `yaml:"per_step_stats_enabled"` // QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters. QueryStoreAfter time.Duration `yaml:"query_store_after"` @@ -92,6 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.") f.BoolVar(&cfg.QueryStoreForLabels, "querier.query-store-for-labels-enabled", false, "Query long-term store for series, label values and label names APIs. Works only with blocks engine.") f.BoolVar(&cfg.AtModifierEnabled, "querier.at-modifier-enabled", false, "Enable the @ modifier in PromQL.") + f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.") f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.") f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.") f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.") @@ -174,6 +176,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor MaxSamples: cfg.MaxSamples, Timeout: cfg.Timeout, LookbackDelta: cfg.LookbackDelta, + EnablePerStepStats: cfg.EnablePerStepStats, EnableAtModifier: cfg.AtModifierEnabled, NoStepSubqueryIntervalFn: func(int64) int64 { return cfg.DefaultEvaluationInterval.Milliseconds() diff --git a/pkg/querier/queryrange/query_range.go b/pkg/querier/queryrange/query_range.go index c0999231c5..ad8ef25982 100644 --- a/pkg/querier/queryrange/query_range.go +++ b/pkg/querier/queryrange/query_range.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "time" + "unsafe" "github.com/gogo/protobuf/proto" "github.com/gogo/status" @@ -88,6 +89,10 @@ type Request interface { proto.Message // LogToSpan writes information about this request to an OpenTracing span LogToSpan(opentracing.Span) + // GetStats returns the stats of the request. + GetStats() string + // WithStats clones the current `PrometheusRequest` with a new stats. + WithStats(stats string) Request } // Response represents a query range response. @@ -114,6 +119,13 @@ func (q *PrometheusRequest) WithQuery(query string) Request { return &new } +// WithStats clones the current `PrometheusRequest` with a new stats. +func (q *PrometheusRequest) WithStats(stats string) Request { + new := *q + new.Stats = stats + return &new +} + // LogToSpan logs the current `PrometheusRequest` parameters to the specified span. func (q *PrometheusRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( @@ -174,6 +186,7 @@ func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) { Data: PrometheusData{ ResultType: model.ValMatrix.String(), Result: matrixMerge(promResponses), + Stats: statsMerge(promResponses), }, } @@ -220,6 +233,7 @@ func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forward } result.Query = r.FormValue("query") + result.Stats = r.FormValue("stats") result.Path = r.URL.Path // Include the specified headers from http request in prometheusRequest. @@ -252,6 +266,7 @@ func (prometheusCodec) EncodeRequest(ctx context.Context, r Request) (*http.Requ "end": []string{encodeTime(promReq.End)}, "step": []string{encodeDurationMs(promReq.Step)}, "query": []string{promReq.Query}, + "stats": []string{promReq.Stats}, } u := &url.URL{ Path: promReq.Path, @@ -380,6 +395,46 @@ func (s *SampleStream) MarshalJSON() ([]byte, error) { return json.Marshal(stream) } +// statsMerge merge the stats from 2 responses +// this function is similar to matrixMerge +func statsMerge(resps []*PrometheusResponse) *PrometheusResponseStats { + output := map[int64]*PrometheusResponseQueryableSamplesStatsPerStep{} + hasStats := false + for _, resp := range resps { + if resp.Data.Stats == nil { + continue + } + + hasStats = true + if resp.Data.Stats.Samples == nil { + continue + } + + for _, s := range resp.Data.Stats.Samples.TotalQueryableSamplesPerStep { + output[s.GetTimestampMs()] = s + } + } + + if !hasStats { + return nil + } + + keys := make([]int64, 0, len(output)) + for key := range output { + keys = append(keys, key) + } + + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + + result := &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}} + for _, key := range keys { + result.Samples.TotalQueryableSamplesPerStep = append(result.Samples.TotalQueryableSamplesPerStep, output[key]) + result.Samples.TotalQueryableSamples += output[key].Value + } + + return result +} + func matrixMerge(resps []*PrometheusResponse) []SampleStream { output := map[string]*SampleStream{} for _, resp := range resps { @@ -473,3 +528,41 @@ func decorateWithParamName(err error, field string) error { } return fmt.Errorf(errTmpl, field, err) } + +func PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode(ptr unsafe.Pointer, iter *jsoniter.Iterator) { + if !iter.ReadArray() { + iter.ReportError("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", "expected [") + return + } + + t := model.Time(iter.ReadFloat64() * float64(time.Second/time.Millisecond)) + + if !iter.ReadArray() { + iter.ReportError("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", "expected ,") + return + } + v := iter.ReadInt64() + + if iter.ReadArray() { + iter.ReportError("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", "expected ]") + } + + *(*PrometheusResponseQueryableSamplesStatsPerStep)(ptr) = PrometheusResponseQueryableSamplesStatsPerStep{ + TimestampMs: int64(t), + Value: v, + } +} + +func PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode(ptr unsafe.Pointer, stream *jsoniter.Stream) { + stats := (*PrometheusResponseQueryableSamplesStatsPerStep)(ptr) + stream.WriteArrayStart() + stream.WriteFloat64(float64(stats.TimestampMs) / float64(time.Second/time.Millisecond)) + stream.WriteMore() + stream.WriteInt64(stats.Value) + stream.WriteArrayEnd() +} + +func init() { + jsoniter.RegisterTypeEncoderFunc("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode, func(unsafe.Pointer) bool { return false }) + jsoniter.RegisterTypeDecoderFunc("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode) +} diff --git a/pkg/querier/queryrange/query_range_test.go b/pkg/querier/queryrange/query_range_test.go index 65c8b0e8e7..60751f3965 100644 --- a/pkg/querier/queryrange/query_range_test.go +++ b/pkg/querier/queryrange/query_range_test.go @@ -8,6 +8,8 @@ import ( "strconv" "testing" + "github.com/prometheus/common/model" + jsoniter "github.com/json-iterator/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -22,7 +24,7 @@ func TestRequest(t *testing.T) { // The test below adds a Test-Header header to the request and expects it back once the encode/decode of request is done via PrometheusCodec parsedRequestWithHeaders := *parsedRequest parsedRequestWithHeaders.Headers = reqHeaders - for i, tc := range []struct { + for _, tc := range []struct { url string expected Request expectedErr error @@ -32,7 +34,7 @@ func TestRequest(t *testing.T) { expected: &parsedRequestWithHeaders, }, { - url: "api/v1/query_range?start=foo", + url: "api/v1/query_range?start=foo&stats=all", expectedErr: httpgrpc.Errorf(http.StatusBadRequest, "invalid parameter \"start\"; cannot parse \"foo\" to a valid timestamp"), }, { @@ -56,7 +58,7 @@ func TestRequest(t *testing.T) { expectedErr: errStepTooSmall, }, } { - t.Run(strconv.Itoa(i), func(t *testing.T) { + t.Run(tc.url, func(t *testing.T) { r, err := http.NewRequest("GET", tc.url, nil) require.NoError(t, err) r.Header.Add("Test-Header", "test") @@ -116,6 +118,66 @@ func TestResponse(t *testing.T) { } } +func TestResponseWithStats(t *testing.T) { + for i, tc := range []struct { + body string + expected *PrometheusResponse + }{ + { + body: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]]}}}}`, + expected: &PrometheusResponse{ + Status: "success", + Data: PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: []SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 137, TimestampMs: 1536673680000}, + {Value: 137, TimestampMs: 1536673780000}, + }, + }, + }, + Stats: &PrometheusResponseStats{ + Samples: &PrometheusResponseSamplesStats{ + TotalQueryableSamples: 10, + TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 5, TimestampMs: 1536673680000}, + {Value: 5, TimestampMs: 1536673780000}, + }, + }, + }, + }, + }, + }, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + tc.expected.Headers = respHeaders + response := &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: ioutil.NopCloser(bytes.NewBuffer([]byte(tc.body))), + } + resp, err := PrometheusCodec.DecodeResponse(context.Background(), response, nil) + require.NoError(t, err) + assert.Equal(t, tc.expected, resp) + + // Reset response, as the above call will have consumed the body reader. + response = &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: ioutil.NopCloser(bytes.NewBuffer([]byte(tc.body))), + ContentLength: int64(len(tc.body)), + } + resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp) + require.NoError(t, err) + assert.Equal(t, response, resp2) + }) + } +} + func TestMergeAPIResponses(t *testing.T) { for _, tc := range []struct { name string @@ -325,6 +387,267 @@ func TestMergeAPIResponses(t *testing.T) { }, }, }, + }, + { + name: "[stats] A single empty response shouldn't panic.", + input: []Response{ + &PrometheusResponse{ + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{}, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}}, + }, + }, + }, + expected: &PrometheusResponse{ + Status: StatusSuccess, + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{}, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}}, + }, + }, + }, + + { + name: "[stats] Multiple empty responses shouldn't panic.", + input: []Response{ + &PrometheusResponse{ + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{}, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}}, + }, + }, + &PrometheusResponse{ + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{}, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}}, + }, + }, + }, + expected: &PrometheusResponse{ + Status: StatusSuccess, + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{}, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}}, + }, + }, + }, + + { + name: "[stats] Basic merging of two responses.", + input: []Response{ + &PrometheusResponse{ + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{ + { + Labels: []cortexpb.LabelAdapter{}, + Samples: []cortexpb.Sample{ + {Value: 0, TimestampMs: 0}, + {Value: 1, TimestampMs: 1}, + }, + }, + }, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{ + TotalQueryableSamples: 20, + TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 5, TimestampMs: 0}, + {Value: 15, TimestampMs: 1}, + }, + }}, + }, + }, + &PrometheusResponse{ + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{ + { + Labels: []cortexpb.LabelAdapter{}, + Samples: []cortexpb.Sample{ + {Value: 2, TimestampMs: 2}, + {Value: 3, TimestampMs: 3}, + }, + }, + }, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{ + TotalQueryableSamples: 10, + TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 5, TimestampMs: 2}, + {Value: 5, TimestampMs: 3}, + }, + }}, + }, + }, + }, + expected: &PrometheusResponse{ + Status: StatusSuccess, + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{ + { + Labels: []cortexpb.LabelAdapter{}, + Samples: []cortexpb.Sample{ + {Value: 0, TimestampMs: 0}, + {Value: 1, TimestampMs: 1}, + {Value: 2, TimestampMs: 2}, + {Value: 3, TimestampMs: 3}, + }, + }, + }, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{ + TotalQueryableSamples: 30, + TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 5, TimestampMs: 0}, + {Value: 15, TimestampMs: 1}, + {Value: 5, TimestampMs: 2}, + {Value: 5, TimestampMs: 3}, + }, + }}, + }, + }, + }, + { + name: "[stats] Merging of samples where there is single overlap.", + input: []Response{ + mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"],[2,"2"]]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,5],[2,5]]}}}}`), + mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[2,"2"],[3,"3"]]}],"stats":{"samples":{"totalQueryableSamples":20,"totalQueryableSamplesPerStep":[[2,5],[3,15]]}}}}`), + }, + expected: &PrometheusResponse{ + Status: StatusSuccess, + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{ + { + Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + {Value: 2, TimestampMs: 2000}, + {Value: 3, TimestampMs: 3000}, + }, + }, + }, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{ + TotalQueryableSamples: 25, + TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 5, TimestampMs: 1000}, + {Value: 5, TimestampMs: 2000}, + {Value: 15, TimestampMs: 3000}, + }, + }}, + }, + }, + }, + { + name: "[stats] Merging of multiple responses with some overlap.", + input: []Response{ + mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[3,"3"],[4,"4"],[5,"5"]]}],"stats":{"samples":{"totalQueryableSamples":12,"totalQueryableSamplesPerStep":[[3,3],[4,4],[5,5]]}}}}`), + mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"],[2,"2"],[3,"3"],[4,"4"]]}],"stats":{"samples":{"totalQueryableSamples":6,"totalQueryableSamplesPerStep":[[1,1],[2,2],[3,3],[4,4]]}}}}`), + mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[5,"5"],[6,"6"],[7,"7"]]}],"stats":{"samples":{"totalQueryableSamples":18,"totalQueryableSamplesPerStep":[[5,5],[6,6],[7,7]]}}}}`), + }, + expected: &PrometheusResponse{ + Status: StatusSuccess, + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{ + { + Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + {Value: 2, TimestampMs: 2000}, + {Value: 3, TimestampMs: 3000}, + {Value: 4, TimestampMs: 4000}, + {Value: 5, TimestampMs: 5000}, + {Value: 6, TimestampMs: 6000}, + {Value: 7, TimestampMs: 7000}, + }, + }, + }, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{ + TotalQueryableSamples: 28, + TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 1, TimestampMs: 1000}, + {Value: 2, TimestampMs: 2000}, + {Value: 3, TimestampMs: 3000}, + {Value: 4, TimestampMs: 4000}, + {Value: 5, TimestampMs: 5000}, + {Value: 6, TimestampMs: 6000}, + {Value: 7, TimestampMs: 7000}, + }, + }}, + }, + }, + }, + { + name: "[stats] Merging of samples where there is multiple partial overlaps.", + input: []Response{ + mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"],[2,"2"],[3,"3"]]}],"stats":{"samples":{"totalQueryableSamples":6,"totalQueryableSamplesPerStep":[[1,1],[2,2],[3,3]]}}}}`), + mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"],[4,"4"],[5,"5"]]}],"stats":{"samples":{"totalQueryableSamples":20,"totalQueryableSamplesPerStep":[[2,2],[3,3],[4,4],[5,5]]}}}}`), + }, + expected: &PrometheusResponse{ + Status: StatusSuccess, + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{ + { + Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + {Value: 2, TimestampMs: 2000}, + {Value: 3, TimestampMs: 3000}, + {Value: 4, TimestampMs: 4000}, + {Value: 5, TimestampMs: 5000}, + }, + }, + }, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{ + TotalQueryableSamples: 15, + TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 1, TimestampMs: 1000}, + {Value: 2, TimestampMs: 2000}, + {Value: 3, TimestampMs: 3000}, + {Value: 4, TimestampMs: 4000}, + {Value: 5, TimestampMs: 5000}, + }, + }}, + }, + }, + }, + { + name: "[stats] Merging of samples where there is complete overlap.", + input: []Response{ + mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[2,"2"],[3,"3"]]}],"stats":{"samples":{"totalQueryableSamples":20,"totalQueryableSamplesPerStep":[[2,2],[3,3]]}}}}`), + mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"],[4,"4"],[5,"5"]]}],"stats":{"samples":{"totalQueryableSamples":20,"totalQueryableSamplesPerStep":[[2,2],[3,3],[4,4],[5,5]]}}}}`), + }, + expected: &PrometheusResponse{ + Status: StatusSuccess, + Data: PrometheusData{ + ResultType: matrix, + Result: []SampleStream{ + { + Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, + Samples: []cortexpb.Sample{ + {Value: 2, TimestampMs: 2000}, + {Value: 3, TimestampMs: 3000}, + {Value: 4, TimestampMs: 4000}, + {Value: 5, TimestampMs: 5000}, + }, + }, + }, + Stats: &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{ + TotalQueryableSamples: 14, + TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{ + {Value: 2, TimestampMs: 2000}, + {Value: 3, TimestampMs: 3000}, + {Value: 4, TimestampMs: 4000}, + {Value: 5, TimestampMs: 5000}, + }, + }}, + }, + }, }} { t.Run(tc.name, func(t *testing.T) { output, err := PrometheusCodec.MergeResponse(tc.input...) diff --git a/pkg/querier/queryrange/queryrange.pb.go b/pkg/querier/queryrange/queryrange.pb.go index ea8157140a..cc2db50230 100644 --- a/pkg/querier/queryrange/queryrange.pb.go +++ b/pkg/querier/queryrange/queryrange.pb.go @@ -92,6 +92,7 @@ type PrometheusRequest struct { Query string `protobuf:"bytes,6,opt,name=query,proto3" json:"query,omitempty"` CachingOptions CachingOptions `protobuf:"bytes,7,opt,name=cachingOptions,proto3" json:"cachingOptions"` Headers []*PrometheusRequestHeader `protobuf:"bytes,8,rep,name=Headers,proto3" json:"-"` + Stats string `protobuf:"bytes,9,opt,name=stats,proto3" json:"stats,omitempty"` } func (m *PrometheusRequest) Reset() { *m = PrometheusRequest{} } @@ -182,6 +183,13 @@ func (m *PrometheusRequest) GetHeaders() []*PrometheusRequestHeader { return nil } +func (m *PrometheusRequest) GetStats() string { + if m != nil { + return m.Stats + } + return "" +} + type PrometheusResponseHeader struct { Name string `protobuf:"bytes,1,opt,name=Name,proto3" json:"-"` Values []string `protobuf:"bytes,2,rep,name=Values,proto3" json:"-"` @@ -309,8 +317,9 @@ func (m *PrometheusResponse) GetHeaders() []*PrometheusResponseHeader { } type PrometheusData struct { - ResultType string `protobuf:"bytes,1,opt,name=ResultType,proto3" json:"resultType"` - Result []SampleStream `protobuf:"bytes,2,rep,name=Result,proto3" json:"result"` + ResultType string `protobuf:"bytes,1,opt,name=ResultType,proto3" json:"resultType"` + Result []SampleStream `protobuf:"bytes,2,rep,name=Result,proto3" json:"result"` + Stats *PrometheusResponseStats `protobuf:"bytes,3,opt,name=stats,proto3" json:"stats,omitempty"` } func (m *PrometheusData) Reset() { *m = PrometheusData{} } @@ -359,6 +368,160 @@ func (m *PrometheusData) GetResult() []SampleStream { return nil } +func (m *PrometheusData) GetStats() *PrometheusResponseStats { + if m != nil { + return m.Stats + } + return nil +} + +type PrometheusResponseStats struct { + Samples *PrometheusResponseSamplesStats `protobuf:"bytes,1,opt,name=samples,proto3" json:"samples"` +} + +func (m *PrometheusResponseStats) Reset() { *m = PrometheusResponseStats{} } +func (*PrometheusResponseStats) ProtoMessage() {} +func (*PrometheusResponseStats) Descriptor() ([]byte, []int) { + return fileDescriptor_79b02382e213d0b2, []int{5} +} +func (m *PrometheusResponseStats) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PrometheusResponseStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PrometheusResponseStats.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PrometheusResponseStats) XXX_Merge(src proto.Message) { + xxx_messageInfo_PrometheusResponseStats.Merge(m, src) +} +func (m *PrometheusResponseStats) XXX_Size() int { + return m.Size() +} +func (m *PrometheusResponseStats) XXX_DiscardUnknown() { + xxx_messageInfo_PrometheusResponseStats.DiscardUnknown(m) +} + +var xxx_messageInfo_PrometheusResponseStats proto.InternalMessageInfo + +func (m *PrometheusResponseStats) GetSamples() *PrometheusResponseSamplesStats { + if m != nil { + return m.Samples + } + return nil +} + +type PrometheusResponseSamplesStats struct { + TotalQueryableSamples int64 `protobuf:"varint,1,opt,name=totalQueryableSamples,proto3" json:"totalQueryableSamples"` + TotalQueryableSamplesPerStep []*PrometheusResponseQueryableSamplesStatsPerStep `protobuf:"bytes,2,rep,name=totalQueryableSamplesPerStep,proto3" json:"totalQueryableSamplesPerStep"` +} + +func (m *PrometheusResponseSamplesStats) Reset() { *m = PrometheusResponseSamplesStats{} } +func (*PrometheusResponseSamplesStats) ProtoMessage() {} +func (*PrometheusResponseSamplesStats) Descriptor() ([]byte, []int) { + return fileDescriptor_79b02382e213d0b2, []int{6} +} +func (m *PrometheusResponseSamplesStats) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PrometheusResponseSamplesStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PrometheusResponseSamplesStats.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PrometheusResponseSamplesStats) XXX_Merge(src proto.Message) { + xxx_messageInfo_PrometheusResponseSamplesStats.Merge(m, src) +} +func (m *PrometheusResponseSamplesStats) XXX_Size() int { + return m.Size() +} +func (m *PrometheusResponseSamplesStats) XXX_DiscardUnknown() { + xxx_messageInfo_PrometheusResponseSamplesStats.DiscardUnknown(m) +} + +var xxx_messageInfo_PrometheusResponseSamplesStats proto.InternalMessageInfo + +func (m *PrometheusResponseSamplesStats) GetTotalQueryableSamples() int64 { + if m != nil { + return m.TotalQueryableSamples + } + return 0 +} + +func (m *PrometheusResponseSamplesStats) GetTotalQueryableSamplesPerStep() []*PrometheusResponseQueryableSamplesStatsPerStep { + if m != nil { + return m.TotalQueryableSamplesPerStep + } + return nil +} + +type PrometheusResponseQueryableSamplesStatsPerStep struct { + Value int64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"` + TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs,proto3" json:"timestamp_ms,omitempty"` +} + +func (m *PrometheusResponseQueryableSamplesStatsPerStep) Reset() { + *m = PrometheusResponseQueryableSamplesStatsPerStep{} +} +func (*PrometheusResponseQueryableSamplesStatsPerStep) ProtoMessage() {} +func (*PrometheusResponseQueryableSamplesStatsPerStep) Descriptor() ([]byte, []int) { + return fileDescriptor_79b02382e213d0b2, []int{7} +} +func (m *PrometheusResponseQueryableSamplesStatsPerStep) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PrometheusResponseQueryableSamplesStatsPerStep) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PrometheusResponseQueryableSamplesStatsPerStep.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PrometheusResponseQueryableSamplesStatsPerStep) XXX_Merge(src proto.Message) { + xxx_messageInfo_PrometheusResponseQueryableSamplesStatsPerStep.Merge(m, src) +} +func (m *PrometheusResponseQueryableSamplesStatsPerStep) XXX_Size() int { + return m.Size() +} +func (m *PrometheusResponseQueryableSamplesStatsPerStep) XXX_DiscardUnknown() { + xxx_messageInfo_PrometheusResponseQueryableSamplesStatsPerStep.DiscardUnknown(m) +} + +var xxx_messageInfo_PrometheusResponseQueryableSamplesStatsPerStep proto.InternalMessageInfo + +func (m *PrometheusResponseQueryableSamplesStatsPerStep) GetValue() int64 { + if m != nil { + return m.Value + } + return 0 +} + +func (m *PrometheusResponseQueryableSamplesStatsPerStep) GetTimestampMs() int64 { + if m != nil { + return m.TimestampMs + } + return 0 +} + type SampleStream struct { Labels []github_com_cortexproject_cortex_pkg_cortexpb.LabelAdapter `protobuf:"bytes,1,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/cortexpb.LabelAdapter" json:"metric"` Samples []cortexpb.Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"values"` @@ -367,7 +530,7 @@ type SampleStream struct { func (m *SampleStream) Reset() { *m = SampleStream{} } func (*SampleStream) ProtoMessage() {} func (*SampleStream) Descriptor() ([]byte, []int) { - return fileDescriptor_79b02382e213d0b2, []int{5} + return fileDescriptor_79b02382e213d0b2, []int{8} } func (m *SampleStream) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -412,7 +575,7 @@ type CachedResponse struct { func (m *CachedResponse) Reset() { *m = CachedResponse{} } func (*CachedResponse) ProtoMessage() {} func (*CachedResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_79b02382e213d0b2, []int{6} + return fileDescriptor_79b02382e213d0b2, []int{9} } func (m *CachedResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -465,7 +628,7 @@ type Extent struct { func (m *Extent) Reset() { *m = Extent{} } func (*Extent) ProtoMessage() {} func (*Extent) Descriptor() ([]byte, []int) { - return fileDescriptor_79b02382e213d0b2, []int{7} + return fileDescriptor_79b02382e213d0b2, []int{10} } func (m *Extent) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -529,7 +692,7 @@ type CachingOptions struct { func (m *CachingOptions) Reset() { *m = CachingOptions{} } func (*CachingOptions) ProtoMessage() {} func (*CachingOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_79b02382e213d0b2, []int{8} + return fileDescriptor_79b02382e213d0b2, []int{11} } func (m *CachingOptions) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -571,6 +734,9 @@ func init() { proto.RegisterType((*PrometheusResponseHeader)(nil), "queryrange.PrometheusResponseHeader") proto.RegisterType((*PrometheusResponse)(nil), "queryrange.PrometheusResponse") proto.RegisterType((*PrometheusData)(nil), "queryrange.PrometheusData") + proto.RegisterType((*PrometheusResponseStats)(nil), "queryrange.PrometheusResponseStats") + proto.RegisterType((*PrometheusResponseSamplesStats)(nil), "queryrange.PrometheusResponseSamplesStats") + proto.RegisterType((*PrometheusResponseQueryableSamplesStatsPerStep)(nil), "queryrange.PrometheusResponseQueryableSamplesStatsPerStep") proto.RegisterType((*SampleStream)(nil), "queryrange.SampleStream") proto.RegisterType((*CachedResponse)(nil), "queryrange.CachedResponse") proto.RegisterType((*Extent)(nil), "queryrange.Extent") @@ -580,61 +746,70 @@ func init() { func init() { proto.RegisterFile("queryrange.proto", fileDescriptor_79b02382e213d0b2) } var fileDescriptor_79b02382e213d0b2 = []byte{ - // 849 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcd, 0x6e, 0x1b, 0x55, - 0x14, 0xf6, 0xc4, 0xf6, 0xd8, 0x3e, 0xa9, 0xdc, 0x70, 0x53, 0xd1, 0x71, 0x24, 0x66, 0x2c, 0xc3, - 0x22, 0x48, 0xed, 0x44, 0x0a, 0x62, 0x01, 0x12, 0xa8, 0x1d, 0x12, 0x54, 0x7e, 0x04, 0xd5, 0x4d, - 0xc5, 0x82, 0x0d, 0xba, 0xf6, 0x1c, 0x9c, 0x69, 0x3d, 0x3f, 0xbd, 0x73, 0x07, 0xc5, 0x3b, 0xc4, - 0x13, 0xb0, 0xe4, 0x11, 0x40, 0xe2, 0x31, 0x58, 0x64, 0x99, 0x65, 0xc5, 0x62, 0x20, 0xce, 0x06, - 0xcd, 0xaa, 0x8f, 0x80, 0xee, 0xcf, 0xd8, 0xd3, 0x44, 0x5d, 0x20, 0x36, 0xd6, 0x39, 0xe7, 0x7e, - 0xdf, 0x77, 0xcf, 0xcf, 0x9d, 0x63, 0xd8, 0x79, 0x5e, 0x20, 0x5f, 0x72, 0x96, 0xcc, 0xd1, 0xcf, - 0x78, 0x2a, 0x52, 0x02, 0x9b, 0xc8, 0xde, 0xfd, 0x79, 0x24, 0x4e, 0x8b, 0xa9, 0x3f, 0x4b, 0xe3, - 0x83, 0x79, 0x3a, 0x4f, 0x0f, 0x14, 0x64, 0x5a, 0x7c, 0xaf, 0x3c, 0xe5, 0x28, 0x4b, 0x53, 0xf7, - 0xdc, 0x79, 0x9a, 0xce, 0x17, 0xb8, 0x41, 0x85, 0x05, 0x67, 0x22, 0x4a, 0x13, 0x73, 0xfe, 0x41, - 0x43, 0x6e, 0x96, 0x72, 0x81, 0x67, 0x19, 0x4f, 0x9f, 0xe2, 0x4c, 0x18, 0xef, 0x20, 0x7b, 0x36, - 0xaf, 0x0f, 0xa6, 0xc6, 0x30, 0xd4, 0xd1, 0x75, 0x69, 0x96, 0x2c, 0xf5, 0xd1, 0xe4, 0x04, 0xee, - 0x3e, 0xe6, 0x69, 0x8c, 0xe2, 0x14, 0x8b, 0x9c, 0xe2, 0xf3, 0x02, 0x73, 0xf1, 0x08, 0x59, 0x88, - 0x9c, 0x8c, 0xa0, 0xf3, 0x15, 0x8b, 0xd1, 0xb1, 0xc6, 0xd6, 0xfe, 0x20, 0xe8, 0x56, 0xa5, 0x67, - 0xdd, 0xa7, 0x2a, 0x44, 0xde, 0x02, 0xfb, 0x1b, 0xb6, 0x28, 0x30, 0x77, 0xb6, 0xc6, 0xed, 0xcd, - 0xa1, 0x09, 0x4e, 0xce, 0xb7, 0xe0, 0x8d, 0x1b, 0xaa, 0x84, 0x40, 0x27, 0x63, 0xe2, 0x54, 0xeb, - 0x51, 0x65, 0x93, 0x3b, 0xd0, 0xcd, 0x05, 0xe3, 0xc2, 0xd9, 0x1a, 0x5b, 0xfb, 0x6d, 0xaa, 0x1d, - 0xb2, 0x03, 0x6d, 0x4c, 0x42, 0xa7, 0xad, 0x62, 0xd2, 0x94, 0xdc, 0x5c, 0x60, 0xe6, 0x74, 0x54, - 0x48, 0xd9, 0xe4, 0x23, 0xe8, 0x89, 0x28, 0xc6, 0xb4, 0x10, 0x4e, 0x77, 0x6c, 0xed, 0x6f, 0x1f, - 0x8e, 0x7c, 0x5d, 0xa7, 0x5f, 0xd7, 0xe9, 0x1f, 0x99, 0x16, 0x06, 0xfd, 0xf3, 0xd2, 0x6b, 0xfd, - 0xf2, 0x97, 0x67, 0xd1, 0x9a, 0x23, 0xaf, 0x56, 0xc3, 0x72, 0x6c, 0x95, 0x8f, 0x76, 0xc8, 0x23, - 0x18, 0xce, 0xd8, 0xec, 0x34, 0x4a, 0xe6, 0x5f, 0x67, 0x92, 0x99, 0x3b, 0x3d, 0xa5, 0xbd, 0xe7, - 0x37, 0x66, 0xfd, 0xc9, 0x2b, 0x88, 0xa0, 0x23, 0xc5, 0xe9, 0x35, 0x1e, 0x39, 0x82, 0x9e, 0x6e, - 0x64, 0xee, 0xf4, 0xc7, 0xed, 0xfd, 0xed, 0xc3, 0xb7, 0x9b, 0x12, 0xaf, 0x69, 0x7a, 0xdd, 0xc9, - 0x9a, 0x3a, 0x79, 0x02, 0x4e, 0x13, 0x9a, 0x67, 0x69, 0x92, 0xe3, 0xff, 0x1e, 0xd0, 0x6f, 0x5b, - 0x40, 0x6e, 0xca, 0x92, 0x09, 0xd8, 0x27, 0x82, 0x89, 0x22, 0x37, 0x92, 0x50, 0x95, 0x9e, 0x9d, - 0xab, 0x08, 0x35, 0x27, 0xe4, 0x53, 0xe8, 0x1c, 0x31, 0xc1, 0xd4, 0xc0, 0xae, 0xb5, 0x65, 0xa3, - 0x28, 0x11, 0xc1, 0x9b, 0xb2, 0x2d, 0x55, 0xe9, 0x0d, 0x43, 0x26, 0xd8, 0xbd, 0x34, 0x8e, 0x04, - 0xc6, 0x99, 0x58, 0x52, 0xc5, 0x27, 0xef, 0xc3, 0xe0, 0x98, 0xf3, 0x94, 0x3f, 0x59, 0x66, 0xa8, - 0x26, 0x3d, 0x08, 0xee, 0x56, 0xa5, 0xb7, 0x8b, 0x75, 0xb0, 0xc1, 0xd8, 0x20, 0xc9, 0xbb, 0xd0, - 0x55, 0x8e, 0x7a, 0x09, 0x83, 0x60, 0xb7, 0x2a, 0xbd, 0xdb, 0x8a, 0xd2, 0x80, 0x6b, 0x04, 0x39, - 0xde, 0x0c, 0xa0, 0xab, 0x06, 0xf0, 0xce, 0xeb, 0x06, 0xd0, 0xec, 0xea, 0x8d, 0x09, 0xfc, 0x64, - 0xc1, 0xf0, 0xd5, 0xca, 0x88, 0x0f, 0x40, 0x31, 0x2f, 0x16, 0x42, 0x25, 0xaf, 0x7b, 0x35, 0xac, - 0x4a, 0x0f, 0xf8, 0x3a, 0x4a, 0x1b, 0x08, 0xf2, 0x00, 0x6c, 0xed, 0xa9, 0x69, 0x6c, 0x1f, 0x3a, - 0xcd, 0x44, 0x4e, 0x58, 0x9c, 0x2d, 0xf0, 0x44, 0x70, 0x64, 0x71, 0x30, 0x34, 0x3d, 0xb3, 0xb5, - 0x12, 0x35, 0xbc, 0xc9, 0x1f, 0x16, 0xdc, 0x6a, 0x02, 0xc9, 0x19, 0xd8, 0x0b, 0x36, 0xc5, 0x85, - 0x1c, 0x95, 0x94, 0xdc, 0xf5, 0xeb, 0x4f, 0xdf, 0xff, 0x52, 0xc6, 0x1f, 0xb3, 0x88, 0x07, 0x5f, - 0x48, 0xb5, 0x3f, 0x4b, 0xef, 0x3f, 0xad, 0x0e, 0xcd, 0x7f, 0x18, 0xb2, 0x4c, 0x20, 0x97, 0xa9, - 0xc4, 0x28, 0x78, 0x34, 0xa3, 0xe6, 0x3e, 0xf2, 0x21, 0xf4, 0x72, 0x95, 0x49, 0x6e, 0xaa, 0xd9, - 0xd9, 0x5c, 0xad, 0x53, 0xdc, 0x54, 0xf1, 0x83, 0x7a, 0x6e, 0xb4, 0x26, 0x4c, 0x9e, 0xc2, 0x50, - 0x7e, 0x3b, 0x18, 0xae, 0x9f, 0xdc, 0x08, 0xda, 0xcf, 0x70, 0x69, 0x7a, 0xd8, 0xab, 0x4a, 0x4f, - 0xba, 0x54, 0xfe, 0xc8, 0xef, 0x1b, 0xcf, 0x04, 0x26, 0xa2, 0xbe, 0x88, 0x34, 0xdb, 0x76, 0xac, - 0x8e, 0x82, 0xdb, 0xe6, 0xaa, 0x1a, 0x4a, 0x6b, 0x63, 0xf2, 0xbb, 0x05, 0xb6, 0x06, 0x11, 0xaf, - 0xde, 0x32, 0xf2, 0x9a, 0x76, 0x30, 0xa8, 0x4a, 0x4f, 0x07, 0xea, 0x85, 0x33, 0xd2, 0x0b, 0x47, - 0x2d, 0x21, 0x9d, 0x05, 0x26, 0xa1, 0xde, 0x3c, 0x63, 0xe8, 0x0b, 0xce, 0x66, 0xf8, 0x5d, 0x14, - 0x9a, 0x37, 0x57, 0x3f, 0x10, 0x15, 0xfe, 0x2c, 0x24, 0x1f, 0x43, 0x9f, 0x9b, 0x72, 0xcc, 0x22, - 0xba, 0x73, 0x63, 0x11, 0x3d, 0x4c, 0x96, 0xc1, 0xad, 0xaa, 0xf4, 0xd6, 0x48, 0xba, 0xb6, 0x3e, - 0xef, 0xf4, 0xdb, 0x3b, 0x9d, 0xc9, 0x3d, 0xdd, 0x9a, 0xc6, 0x02, 0xd9, 0x83, 0x7e, 0x18, 0xe5, - 0x6c, 0xba, 0xc0, 0x50, 0x25, 0xde, 0xa7, 0x6b, 0x3f, 0x78, 0x70, 0x71, 0xe9, 0xb6, 0x5e, 0x5c, - 0xba, 0xad, 0x97, 0x97, 0xae, 0xf5, 0xe3, 0xca, 0xb5, 0x7e, 0x5d, 0xb9, 0xd6, 0xf9, 0xca, 0xb5, - 0x2e, 0x56, 0xae, 0xf5, 0xf7, 0xca, 0xb5, 0xfe, 0x59, 0xb9, 0xad, 0x97, 0x2b, 0xd7, 0xfa, 0xf9, - 0xca, 0x6d, 0x5d, 0x5c, 0xb9, 0xad, 0x17, 0x57, 0x6e, 0xeb, 0xdb, 0xc6, 0xbf, 0xd3, 0xd4, 0x56, - 0xb9, 0xbd, 0xf7, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xa5, 0x4d, 0x41, 0xea, 0xc4, 0x06, 0x00, - 0x00, + // 1007 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0x4f, 0x6f, 0x1b, 0x55, + 0x10, 0xf7, 0xfa, 0xbf, 0xc7, 0x91, 0x1b, 0x5e, 0x0a, 0x5d, 0x47, 0xb0, 0x6b, 0xb6, 0x1c, 0x02, + 0x6a, 0x1d, 0x29, 0x88, 0x03, 0x95, 0x40, 0xed, 0x92, 0xa0, 0xf2, 0xaf, 0x4d, 0x9f, 0x2b, 0x0e, + 0x5c, 0xaa, 0x67, 0xef, 0xc3, 0xd9, 0xd6, 0xeb, 0xdd, 0xbe, 0x7d, 0x8b, 0xe2, 0x1b, 0x1f, 0x81, + 0x03, 0x07, 0x8e, 0x1c, 0x41, 0xe2, 0x63, 0x70, 0xe8, 0x31, 0xe2, 0x54, 0x71, 0x58, 0x88, 0x73, + 0x41, 0x7b, 0xea, 0x47, 0x40, 0xef, 0xcf, 0xda, 0x1b, 0x27, 0x35, 0xaa, 0xb8, 0x58, 0x6f, 0xe6, + 0xcd, 0x6f, 0x66, 0xde, 0x6f, 0x66, 0x67, 0x0c, 0x9b, 0x4f, 0x13, 0xca, 0x66, 0x8c, 0x4c, 0xc7, + 0xb4, 0x1f, 0xb1, 0x90, 0x87, 0x08, 0x96, 0x9a, 0xed, 0x9b, 0x63, 0x9f, 0x1f, 0x25, 0xc3, 0xfe, + 0x28, 0x0c, 0x76, 0xc7, 0xe1, 0x38, 0xdc, 0x95, 0x26, 0xc3, 0xe4, 0x5b, 0x29, 0x49, 0x41, 0x9e, + 0x14, 0x74, 0xdb, 0x1a, 0x87, 0xe1, 0x78, 0x42, 0x97, 0x56, 0x5e, 0xc2, 0x08, 0xf7, 0xc3, 0xa9, + 0xbe, 0xff, 0xb0, 0xe0, 0x6e, 0x14, 0x32, 0x4e, 0x8f, 0x23, 0x16, 0x3e, 0xa6, 0x23, 0xae, 0xa5, + 0xdd, 0xe8, 0xc9, 0x38, 0xbf, 0x18, 0xea, 0x83, 0x86, 0x76, 0x57, 0x5d, 0x93, 0xe9, 0x4c, 0x5d, + 0x39, 0x03, 0xb8, 0x76, 0xc8, 0xc2, 0x80, 0xf2, 0x23, 0x9a, 0xc4, 0x98, 0x3e, 0x4d, 0x68, 0xcc, + 0xef, 0x52, 0xe2, 0x51, 0x86, 0xba, 0x50, 0xbd, 0x47, 0x02, 0x6a, 0x1a, 0x3d, 0x63, 0xa7, 0xe5, + 0xd6, 0xb2, 0xd4, 0x36, 0x6e, 0x62, 0xa9, 0x42, 0x6f, 0x41, 0xfd, 0x6b, 0x32, 0x49, 0x68, 0x6c, + 0x96, 0x7b, 0x95, 0xe5, 0xa5, 0x56, 0x3a, 0x69, 0x19, 0x5e, 0xbb, 0xe0, 0x15, 0x21, 0xa8, 0x46, + 0x84, 0x1f, 0x29, 0x7f, 0x58, 0x9e, 0xd1, 0x55, 0xa8, 0xc5, 0x9c, 0x30, 0x6e, 0x96, 0x7b, 0xc6, + 0x4e, 0x05, 0x2b, 0x01, 0x6d, 0x42, 0x85, 0x4e, 0x3d, 0xb3, 0x22, 0x75, 0xe2, 0x28, 0xb0, 0x31, + 0xa7, 0x91, 0x59, 0x95, 0x2a, 0x79, 0x46, 0x1f, 0x41, 0x83, 0xfb, 0x01, 0x0d, 0x13, 0x6e, 0xd6, + 0x7a, 0xc6, 0x4e, 0x7b, 0xaf, 0xdb, 0x57, 0xef, 0xec, 0xe7, 0xef, 0xec, 0xef, 0x6b, 0x0a, 0xdd, + 0xe6, 0xb3, 0xd4, 0x2e, 0xfd, 0xf4, 0x97, 0x6d, 0xe0, 0x1c, 0x23, 0x42, 0xcb, 0x62, 0x99, 0x75, + 0x99, 0x8f, 0x12, 0xd0, 0x5d, 0xe8, 0x8c, 0xc8, 0xe8, 0xc8, 0x9f, 0x8e, 0xef, 0x47, 0x02, 0x19, + 0x9b, 0x0d, 0xe9, 0x7b, 0xbb, 0x5f, 0xa8, 0xf5, 0x27, 0xe7, 0x2c, 0xdc, 0xaa, 0x70, 0x8e, 0x57, + 0x70, 0x68, 0x1f, 0x1a, 0x8a, 0xc8, 0xd8, 0x6c, 0xf6, 0x2a, 0x3b, 0xed, 0xbd, 0xeb, 0x45, 0x17, + 0x2f, 0x21, 0x3d, 0x67, 0x32, 0x87, 0x6a, 0x82, 0x78, 0x6c, 0xb6, 0x54, 0x96, 0x52, 0x70, 0x1e, + 0x82, 0x59, 0x74, 0x10, 0x47, 0xe1, 0x34, 0xa6, 0xff, 0xbb, 0x6c, 0xbf, 0x96, 0x01, 0x5d, 0x74, + 0x8b, 0x1c, 0xa8, 0x0f, 0x38, 0xe1, 0x49, 0xac, 0x5d, 0x42, 0x96, 0xda, 0xf5, 0x58, 0x6a, 0xb0, + 0xbe, 0x41, 0x9f, 0x42, 0x75, 0x9f, 0x70, 0x22, 0xcb, 0xb8, 0x42, 0xd6, 0xd2, 0xa3, 0xb0, 0x70, + 0xdf, 0x10, 0x64, 0x65, 0xa9, 0xdd, 0xf1, 0x08, 0x27, 0x37, 0xc2, 0xc0, 0xe7, 0x34, 0x88, 0xf8, + 0x0c, 0x4b, 0x3c, 0xfa, 0x00, 0x5a, 0x07, 0x8c, 0x85, 0xec, 0xe1, 0x2c, 0xa2, 0xb2, 0xfe, 0x2d, + 0xf7, 0x5a, 0x96, 0xda, 0x5b, 0x34, 0x57, 0x16, 0x10, 0x4b, 0x4b, 0xf4, 0x2e, 0xd4, 0xa4, 0x20, + 0xfb, 0xa3, 0xe5, 0x6e, 0x65, 0xa9, 0x7d, 0x45, 0x42, 0x0a, 0xe6, 0xca, 0x02, 0x1d, 0x2c, 0xcb, + 0x52, 0x93, 0x65, 0x79, 0xe7, 0x65, 0x65, 0x29, 0xb2, 0xba, 0x5a, 0x17, 0xe7, 0x0f, 0x03, 0x3a, + 0xe7, 0x5f, 0x86, 0xfa, 0x00, 0x98, 0xc6, 0xc9, 0x84, 0xcb, 0xe4, 0x15, 0x57, 0x9d, 0x2c, 0xb5, + 0x81, 0x2d, 0xb4, 0xb8, 0x60, 0x81, 0x6e, 0x43, 0x5d, 0x49, 0xb2, 0x1a, 0xed, 0x3d, 0xb3, 0x98, + 0xc8, 0x80, 0x04, 0xd1, 0x84, 0x0e, 0x38, 0xa3, 0x24, 0x70, 0x3b, 0x9a, 0xb3, 0xba, 0xf2, 0x84, + 0x35, 0x0e, 0xdd, 0xcb, 0x9b, 0xa3, 0x22, 0x69, 0xbf, 0xbe, 0xfe, 0x25, 0xa2, 0x54, 0xb1, 0xe2, + 0x46, 0xa2, 0x8a, 0xdc, 0xa8, 0xb6, 0x9a, 0x9c, 0x1f, 0x06, 0x05, 0x18, 0x7a, 0x00, 0x8d, 0x58, + 0xa6, 0xa4, 0xba, 0xa0, 0xbd, 0xf7, 0xde, 0x7f, 0x04, 0x53, 0xc6, 0x2a, 0x66, 0x3b, 0x4b, 0xed, + 0x1c, 0x8e, 0xf3, 0x83, 0xf3, 0x63, 0x19, 0xac, 0xf5, 0x40, 0x74, 0x1f, 0x5e, 0xe7, 0x21, 0x27, + 0x93, 0x07, 0x22, 0x14, 0x19, 0x4e, 0xf2, 0x5b, 0x99, 0x43, 0xc5, 0xed, 0x66, 0xa9, 0x7d, 0xb9, + 0x01, 0xbe, 0x5c, 0x8d, 0x7e, 0x36, 0xe0, 0xcd, 0x4b, 0x6f, 0x0e, 0x29, 0x1b, 0x88, 0x01, 0xa3, + 0x4a, 0x71, 0x6b, 0xfd, 0xe3, 0x56, 0xc1, 0x32, 0x59, 0xed, 0xc1, 0xed, 0x65, 0xa9, 0xbd, 0x36, + 0x06, 0x5e, 0x7b, 0xeb, 0xf8, 0xf0, 0x8a, 0x11, 0xc5, 0x8c, 0xf8, 0x4e, 0x7c, 0xc1, 0x8a, 0x15, + 0xac, 0x04, 0xf4, 0x36, 0x6c, 0x88, 0x51, 0x17, 0x73, 0x12, 0x44, 0x8f, 0x82, 0x58, 0x4f, 0xd8, + 0xf6, 0x42, 0xf7, 0x55, 0xec, 0xfc, 0x6e, 0xc0, 0x46, 0xb1, 0xd1, 0xd0, 0x31, 0xd4, 0x27, 0x64, + 0x48, 0x27, 0x82, 0x60, 0xc1, 0xc3, 0x56, 0x3f, 0x5f, 0x28, 0xfd, 0x2f, 0x85, 0xfe, 0x90, 0xf8, + 0xcc, 0xfd, 0x42, 0x74, 0xe3, 0x9f, 0xa9, 0xfd, 0x4a, 0x0b, 0x49, 0xe1, 0xef, 0x78, 0x24, 0xe2, + 0x94, 0x89, 0x56, 0x0e, 0x28, 0x67, 0xfe, 0x08, 0xeb, 0x78, 0xe8, 0xd6, 0xb2, 0xbf, 0x54, 0x09, + 0x36, 0x97, 0xa1, 0x55, 0x8a, 0xcb, 0xaf, 0x40, 0xbe, 0xaf, 0xd0, 0x48, 0x8f, 0xa1, 0x23, 0x26, + 0x32, 0xf5, 0x16, 0x23, 0xab, 0x0b, 0x95, 0x27, 0x74, 0xa6, 0xbf, 0xc1, 0x46, 0x96, 0xda, 0x42, + 0xc4, 0xe2, 0x47, 0x6c, 0x0d, 0x7a, 0xcc, 0xe9, 0x94, 0xe7, 0x81, 0x50, 0xb1, 0xd6, 0x07, 0xf2, + 0xca, 0xbd, 0xa2, 0x43, 0xe5, 0xa6, 0x38, 0x3f, 0x38, 0xbf, 0x19, 0x50, 0x57, 0x46, 0xc8, 0xce, + 0x77, 0x97, 0x6a, 0xc6, 0x56, 0x96, 0xda, 0x4a, 0x91, 0xaf, 0xb1, 0xae, 0x5a, 0x63, 0x92, 0x78, + 0x95, 0x05, 0x9d, 0x7a, 0x6a, 0x9f, 0xf5, 0xa0, 0xc9, 0x19, 0x19, 0xd1, 0x47, 0xbe, 0xa7, 0x67, + 0x56, 0x3e, 0x60, 0xa4, 0xfa, 0x33, 0x0f, 0x7d, 0x0c, 0x4d, 0xa6, 0x9f, 0xa3, 0xd7, 0xdb, 0xd5, + 0x0b, 0xeb, 0xed, 0xce, 0x74, 0xe6, 0x6e, 0x64, 0xa9, 0xbd, 0xb0, 0xc4, 0x8b, 0xd3, 0xe7, 0xd5, + 0x66, 0x65, 0xb3, 0xea, 0xdc, 0x50, 0xd4, 0x14, 0xd6, 0xd2, 0x36, 0x34, 0x3d, 0x3f, 0x16, 0xad, + 0xe4, 0xc9, 0xc4, 0x9b, 0x78, 0x21, 0xbb, 0xb7, 0x4f, 0x4e, 0xad, 0xd2, 0xf3, 0x53, 0xab, 0xf4, + 0xe2, 0xd4, 0x32, 0xbe, 0x9f, 0x5b, 0xc6, 0x2f, 0x73, 0xcb, 0x78, 0x36, 0xb7, 0x8c, 0x93, 0xb9, + 0x65, 0xfc, 0x3d, 0xb7, 0x8c, 0x7f, 0xe6, 0x56, 0xe9, 0xc5, 0xdc, 0x32, 0x7e, 0x38, 0xb3, 0x4a, + 0x27, 0x67, 0x56, 0xe9, 0xf9, 0x99, 0x55, 0xfa, 0xa6, 0xf0, 0x9f, 0x67, 0x58, 0x97, 0xb9, 0xbd, + 0xff, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe8, 0x24, 0x32, 0x4a, 0x1a, 0x09, 0x00, 0x00, } func (this *PrometheusRequestHeader) Equal(that interface{}) bool { @@ -717,6 +892,9 @@ func (this *PrometheusRequest) Equal(that interface{}) bool { return false } } + if this.Stats != that1.Stats { + return false + } return true } func (this *PrometheusResponseHeader) Equal(that interface{}) bool { @@ -822,6 +1000,92 @@ func (this *PrometheusData) Equal(that interface{}) bool { return false } } + if !this.Stats.Equal(that1.Stats) { + return false + } + return true +} +func (this *PrometheusResponseStats) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*PrometheusResponseStats) + if !ok { + that2, ok := that.(PrometheusResponseStats) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.Samples.Equal(that1.Samples) { + return false + } + return true +} +func (this *PrometheusResponseSamplesStats) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*PrometheusResponseSamplesStats) + if !ok { + that2, ok := that.(PrometheusResponseSamplesStats) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.TotalQueryableSamples != that1.TotalQueryableSamples { + return false + } + if len(this.TotalQueryableSamplesPerStep) != len(that1.TotalQueryableSamplesPerStep) { + return false + } + for i := range this.TotalQueryableSamplesPerStep { + if !this.TotalQueryableSamplesPerStep[i].Equal(that1.TotalQueryableSamplesPerStep[i]) { + return false + } + } + return true +} +func (this *PrometheusResponseQueryableSamplesStatsPerStep) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*PrometheusResponseQueryableSamplesStatsPerStep) + if !ok { + that2, ok := that.(PrometheusResponseQueryableSamplesStatsPerStep) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Value != that1.Value { + return false + } + if this.TimestampMs != that1.TimestampMs { + return false + } return true } func (this *SampleStream) Equal(that interface{}) bool { @@ -965,7 +1229,7 @@ func (this *PrometheusRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 12) + s := make([]string, 0, 13) s = append(s, "&queryrange.PrometheusRequest{") s = append(s, "Path: "+fmt.Sprintf("%#v", this.Path)+",\n") s = append(s, "Start: "+fmt.Sprintf("%#v", this.Start)+",\n") @@ -977,6 +1241,7 @@ func (this *PrometheusRequest) GoString() string { if this.Headers != nil { s = append(s, "Headers: "+fmt.Sprintf("%#v", this.Headers)+",\n") } + s = append(s, "Stats: "+fmt.Sprintf("%#v", this.Stats)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1011,7 +1276,7 @@ func (this *PrometheusData) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&queryrange.PrometheusData{") s = append(s, "ResultType: "+fmt.Sprintf("%#v", this.ResultType)+",\n") if this.Result != nil { @@ -1021,6 +1286,45 @@ func (this *PrometheusData) GoString() string { } s = append(s, "Result: "+fmt.Sprintf("%#v", vs)+",\n") } + if this.Stats != nil { + s = append(s, "Stats: "+fmt.Sprintf("%#v", this.Stats)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *PrometheusResponseStats) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&queryrange.PrometheusResponseStats{") + if this.Samples != nil { + s = append(s, "Samples: "+fmt.Sprintf("%#v", this.Samples)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *PrometheusResponseSamplesStats) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&queryrange.PrometheusResponseSamplesStats{") + s = append(s, "TotalQueryableSamples: "+fmt.Sprintf("%#v", this.TotalQueryableSamples)+",\n") + if this.TotalQueryableSamplesPerStep != nil { + s = append(s, "TotalQueryableSamplesPerStep: "+fmt.Sprintf("%#v", this.TotalQueryableSamplesPerStep)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *PrometheusResponseQueryableSamplesStatsPerStep) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&queryrange.PrometheusResponseQueryableSamplesStatsPerStep{") + s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") + s = append(s, "TimestampMs: "+fmt.Sprintf("%#v", this.TimestampMs)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1150,6 +1454,13 @@ func (m *PrometheusRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.Stats) > 0 { + i -= len(m.Stats) + copy(dAtA[i:], m.Stats) + i = encodeVarintQueryrange(dAtA, i, uint64(len(m.Stats))) + i-- + dAtA[i] = 0x4a + } if len(m.Headers) > 0 { for iNdEx := len(m.Headers) - 1; iNdEx >= 0; iNdEx-- { { @@ -1341,6 +1652,18 @@ func (m *PrometheusData) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Stats != nil { + { + size, err := m.Stats.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQueryrange(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } if len(m.Result) > 0 { for iNdEx := len(m.Result) - 1; iNdEx >= 0; iNdEx-- { { @@ -1365,7 +1688,7 @@ func (m *PrometheusData) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *SampleStream) Marshal() (dAtA []byte, err error) { +func (m *PrometheusResponseStats) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -1375,48 +1698,32 @@ func (m *SampleStream) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *SampleStream) MarshalTo(dAtA []byte) (int, error) { +func (m *PrometheusResponseStats) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *SampleStream) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *PrometheusResponseStats) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Samples) > 0 { - for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintQueryrange(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x12 - } - } - if len(m.Labels) > 0 { - for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { - { - size := m.Labels[iNdEx].Size() - i -= size - if _, err := m.Labels[iNdEx].MarshalTo(dAtA[i:]); err != nil { - return 0, err - } - i = encodeVarintQueryrange(dAtA, i, uint64(size)) + if m.Samples != nil { + { + size, err := m.Samples.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err } - i-- - dAtA[i] = 0xa + i -= size + i = encodeVarintQueryrange(dAtA, i, uint64(size)) } + i-- + dAtA[i] = 0xa } return len(dAtA) - i, nil } -func (m *CachedResponse) Marshal() (dAtA []byte, err error) { +func (m *PrometheusResponseSamplesStats) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -1426,17 +1733,143 @@ func (m *CachedResponse) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *CachedResponse) MarshalTo(dAtA []byte) (int, error) { +func (m *PrometheusResponseSamplesStats) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *CachedResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *PrometheusResponseSamplesStats) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Extents) > 0 { + if len(m.TotalQueryableSamplesPerStep) > 0 { + for iNdEx := len(m.TotalQueryableSamplesPerStep) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.TotalQueryableSamplesPerStep[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQueryrange(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if m.TotalQueryableSamples != 0 { + i = encodeVarintQueryrange(dAtA, i, uint64(m.TotalQueryableSamples)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *PrometheusResponseQueryableSamplesStatsPerStep) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PrometheusResponseQueryableSamplesStatsPerStep) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PrometheusResponseQueryableSamplesStatsPerStep) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.TimestampMs != 0 { + i = encodeVarintQueryrange(dAtA, i, uint64(m.TimestampMs)) + i-- + dAtA[i] = 0x10 + } + if m.Value != 0 { + i = encodeVarintQueryrange(dAtA, i, uint64(m.Value)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *SampleStream) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SampleStream) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SampleStream) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Samples) > 0 { + for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQueryrange(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if len(m.Labels) > 0 { + for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { + { + size := m.Labels[iNdEx].Size() + i -= size + if _, err := m.Labels[iNdEx].MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + i = encodeVarintQueryrange(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *CachedResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CachedResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *CachedResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Extents) > 0 { for iNdEx := len(m.Extents) - 1; iNdEx >= 0; iNdEx-- { { size, err := m.Extents[iNdEx].MarshalToSizedBuffer(dAtA[:i]) @@ -1608,6 +2041,10 @@ func (m *PrometheusRequest) Size() (n int) { n += 1 + l + sovQueryrange(uint64(l)) } } + l = len(m.Stats) + if l > 0 { + n += 1 + l + sovQueryrange(uint64(l)) + } return n } @@ -1675,6 +2112,56 @@ func (m *PrometheusData) Size() (n int) { n += 1 + l + sovQueryrange(uint64(l)) } } + if m.Stats != nil { + l = m.Stats.Size() + n += 1 + l + sovQueryrange(uint64(l)) + } + return n +} + +func (m *PrometheusResponseStats) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Samples != nil { + l = m.Samples.Size() + n += 1 + l + sovQueryrange(uint64(l)) + } + return n +} + +func (m *PrometheusResponseSamplesStats) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.TotalQueryableSamples != 0 { + n += 1 + sovQueryrange(uint64(m.TotalQueryableSamples)) + } + if len(m.TotalQueryableSamplesPerStep) > 0 { + for _, e := range m.TotalQueryableSamplesPerStep { + l = e.Size() + n += 1 + l + sovQueryrange(uint64(l)) + } + } + return n +} + +func (m *PrometheusResponseQueryableSamplesStatsPerStep) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Value != 0 { + n += 1 + sovQueryrange(uint64(m.Value)) + } + if m.TimestampMs != 0 { + n += 1 + sovQueryrange(uint64(m.TimestampMs)) + } return n } @@ -1788,6 +2275,7 @@ func (this *PrometheusRequest) String() string { `Query:` + fmt.Sprintf("%v", this.Query) + `,`, `CachingOptions:` + strings.Replace(strings.Replace(this.CachingOptions.String(), "CachingOptions", "CachingOptions", 1), `&`, ``, 1) + `,`, `Headers:` + repeatedStringForHeaders + `,`, + `Stats:` + fmt.Sprintf("%v", this.Stats) + `,`, `}`, }, "") return s @@ -1834,6 +2322,44 @@ func (this *PrometheusData) String() string { s := strings.Join([]string{`&PrometheusData{`, `ResultType:` + fmt.Sprintf("%v", this.ResultType) + `,`, `Result:` + repeatedStringForResult + `,`, + `Stats:` + strings.Replace(this.Stats.String(), "PrometheusResponseStats", "PrometheusResponseStats", 1) + `,`, + `}`, + }, "") + return s +} +func (this *PrometheusResponseStats) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PrometheusResponseStats{`, + `Samples:` + strings.Replace(this.Samples.String(), "PrometheusResponseSamplesStats", "PrometheusResponseSamplesStats", 1) + `,`, + `}`, + }, "") + return s +} +func (this *PrometheusResponseSamplesStats) String() string { + if this == nil { + return "nil" + } + repeatedStringForTotalQueryableSamplesPerStep := "[]*PrometheusResponseQueryableSamplesStatsPerStep{" + for _, f := range this.TotalQueryableSamplesPerStep { + repeatedStringForTotalQueryableSamplesPerStep += strings.Replace(f.String(), "PrometheusResponseQueryableSamplesStatsPerStep", "PrometheusResponseQueryableSamplesStatsPerStep", 1) + "," + } + repeatedStringForTotalQueryableSamplesPerStep += "}" + s := strings.Join([]string{`&PrometheusResponseSamplesStats{`, + `TotalQueryableSamples:` + fmt.Sprintf("%v", this.TotalQueryableSamples) + `,`, + `TotalQueryableSamplesPerStep:` + repeatedStringForTotalQueryableSamplesPerStep + `,`, + `}`, + }, "") + return s +} +func (this *PrometheusResponseQueryableSamplesStatsPerStep) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PrometheusResponseQueryableSamplesStatsPerStep{`, + `Value:` + fmt.Sprintf("%v", this.Value) + `,`, + `TimestampMs:` + fmt.Sprintf("%v", this.TimestampMs) + `,`, `}`, }, "") return s @@ -2268,6 +2794,38 @@ func (m *PrometheusRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 9: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Stats", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Stats = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipQueryrange(dAtA[iNdEx:]) @@ -2720,6 +3278,328 @@ func (m *PrometheusData) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Stats", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Stats == nil { + m.Stats = &PrometheusResponseStats{} + } + if err := m.Stats.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQueryrange(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueryrange + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueryrange + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PrometheusResponseStats) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PrometheusResponseStats: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PrometheusResponseStats: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Samples", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Samples == nil { + m.Samples = &PrometheusResponseSamplesStats{} + } + if err := m.Samples.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQueryrange(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueryrange + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueryrange + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PrometheusResponseSamplesStats) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PrometheusResponseSamplesStats: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PrometheusResponseSamplesStats: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TotalQueryableSamples", wireType) + } + m.TotalQueryableSamples = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TotalQueryableSamples |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TotalQueryableSamplesPerStep", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TotalQueryableSamplesPerStep = append(m.TotalQueryableSamplesPerStep, &PrometheusResponseQueryableSamplesStatsPerStep{}) + if err := m.TotalQueryableSamplesPerStep[len(m.TotalQueryableSamplesPerStep)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQueryrange(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueryrange + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueryrange + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PrometheusResponseQueryableSamplesStatsPerStep) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PrometheusResponseQueryableSamplesStatsPerStep: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PrometheusResponseQueryableSamplesStatsPerStep: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + m.Value = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Value |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TimestampMs", wireType) + } + m.TimestampMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TimestampMs |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipQueryrange(dAtA[iNdEx:]) diff --git a/pkg/querier/queryrange/queryrange.proto b/pkg/querier/queryrange/queryrange.proto index 1725bdc4ef..ec8b120128 100644 --- a/pkg/querier/queryrange/queryrange.proto +++ b/pkg/querier/queryrange/queryrange.proto @@ -25,6 +25,7 @@ message PrometheusRequest { string query = 6; CachingOptions cachingOptions = 7 [(gogoproto.nullable) = false]; repeated PrometheusRequestHeader Headers = 8 [(gogoproto.jsontag) = "-"]; + string stats = 9; } message PrometheusResponseHeader { @@ -43,6 +44,21 @@ message PrometheusResponse { message PrometheusData { string ResultType = 1 [(gogoproto.jsontag) = "resultType"]; repeated SampleStream Result = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "result"]; + PrometheusResponseStats stats = 3 [(gogoproto.jsontag) = "stats,omitempty"]; +} + +message PrometheusResponseStats { + PrometheusResponseSamplesStats samples = 1 [(gogoproto.jsontag) = "samples"]; +} + +message PrometheusResponseSamplesStats { + int64 totalQueryableSamples = 1 [(gogoproto.jsontag) = "totalQueryableSamples"]; + repeated PrometheusResponseQueryableSamplesStatsPerStep totalQueryableSamplesPerStep = 2 [(gogoproto.jsontag) = "totalQueryableSamplesPerStep"]; +} + +message PrometheusResponseQueryableSamplesStatsPerStep { + int64 value = 1; + int64 timestamp_ms = 2; } message SampleStream { diff --git a/pkg/querier/queryrange/results_cache.go b/pkg/querier/queryrange/results_cache.go index e00339428e..3096300bf7 100644 --- a/pkg/querier/queryrange/results_cache.go +++ b/pkg/querier/queryrange/results_cache.go @@ -26,6 +26,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -47,8 +48,9 @@ type CacheGenNumberLoader interface { // ResultsCacheConfig is the config for the results cache. type ResultsCacheConfig struct { - CacheConfig cache.Config `yaml:"cache"` - Compression string `yaml:"compression"` + CacheConfig cache.Config `yaml:"cache"` + Compression string `yaml:"compression"` + CacheQueryableSamplesStats bool `yaml:"cache_queryable_samples_stats"` } // RegisterFlags registers flags. @@ -56,11 +58,12 @@ func (cfg *ResultsCacheConfig) RegisterFlags(f *flag.FlagSet) { cfg.CacheConfig.RegisterFlagsWithPrefix("frontend.", "", f) f.StringVar(&cfg.Compression, "frontend.compression", "", "Use compression in results cache. Supported values are: 'snappy' and '' (disable compression).") + f.BoolVar(&cfg.CacheQueryableSamplesStats, "frontend.cache-queryable-samples-stats", false, "Cache Statistics queryable samples on results cache.") //lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods flagext.DeprecatedFlag(f, "frontend.cache-split-interval", "Deprecated: The maximum interval expected for each request, results will be cached per single interval. This behavior is now determined by querier.split-queries-by-interval.", util_log.Logger) } -func (cfg *ResultsCacheConfig) Validate() error { +func (cfg *ResultsCacheConfig) Validate(qCfg querier.Config) error { switch cfg.Compression { case "snappy", "": // valid @@ -68,6 +71,10 @@ func (cfg *ResultsCacheConfig) Validate() error { return errors.Errorf("unsupported compression type: %s", cfg.Compression) } + if cfg.CacheQueryableSamplesStats && !qCfg.EnablePerStepStats { + return errors.New("frontend.cache-queryable-samples-stats may only be enabled in conjunction with querier.per-step-stats-enabled. Please set the latter") + } + return cfg.CacheConfig.Validate() } @@ -76,6 +83,7 @@ type Extractor interface { // Extract extracts a subset of a response from the `start` and `end` timestamps in milliseconds in the `from` response. Extract(start, end int64, from Response) Response ResponseWithoutHeaders(resp Response) Response + ResponseWithoutStats(resp Response) Response } // PrometheusResponseExtractor helps extracting specific info from Query Response. @@ -89,6 +97,7 @@ func (PrometheusResponseExtractor) Extract(start, end int64, from Response) Resp Data: PrometheusData{ ResultType: promRes.Data.ResultType, Result: extractMatrix(start, end, promRes.Data.Result), + Stats: extractStats(start, end, promRes.Data.Stats), }, Headers: promRes.Headers, } @@ -97,6 +106,19 @@ func (PrometheusResponseExtractor) Extract(start, end int64, from Response) Resp // ResponseWithoutHeaders is useful in caching data without headers since // we anyways do not need headers for sending back the response so this saves some space by reducing size of the objects. func (PrometheusResponseExtractor) ResponseWithoutHeaders(resp Response) Response { + promRes := resp.(*PrometheusResponse) + return &PrometheusResponse{ + Status: StatusSuccess, + Data: PrometheusData{ + ResultType: promRes.Data.ResultType, + Result: promRes.Data.Result, + Stats: promRes.Data.Stats, + }, + } +} + +// ResponseWithoutStats is returns the response without the stats information +func (PrometheusResponseExtractor) ResponseWithoutStats(resp Response) Response { promRes := resp.(*PrometheusResponse) return &PrometheusResponse{ Status: StatusSuccess, @@ -104,6 +126,7 @@ func (PrometheusResponseExtractor) ResponseWithoutHeaders(resp Response) Respons ResultType: promRes.Data.ResultType, Result: promRes.Data.Result, }, + Headers: promRes.Headers, } } @@ -134,11 +157,12 @@ type resultsCache struct { limits Limits splitter CacheSplitter - extractor Extractor - minCacheExtent int64 // discard any cache extent smaller than this - merger Merger - cacheGenNumberLoader CacheGenNumberLoader - shouldCache ShouldCacheFn + extractor Extractor + minCacheExtent int64 // discard any cache extent smaller than this + merger Merger + cacheGenNumberLoader CacheGenNumberLoader + shouldCache ShouldCacheFn + cacheQueryableSamplesStats bool } // NewResultsCacheMiddleware creates results cache middleware from config. @@ -172,27 +196,36 @@ func NewResultsCacheMiddleware( return MiddlewareFunc(func(next Handler) Handler { return &resultsCache{ - logger: logger, - cfg: cfg, - next: next, - cache: c, - limits: limits, - merger: merger, - extractor: extractor, - minCacheExtent: (5 * time.Minute).Milliseconds(), - splitter: splitter, - cacheGenNumberLoader: cacheGenNumberLoader, - shouldCache: shouldCache, + logger: logger, + cfg: cfg, + next: next, + cache: c, + limits: limits, + merger: merger, + extractor: extractor, + minCacheExtent: (5 * time.Minute).Milliseconds(), + splitter: splitter, + cacheGenNumberLoader: cacheGenNumberLoader, + shouldCache: shouldCache, + cacheQueryableSamplesStats: cfg.CacheQueryableSamplesStats, } }), c, nil } func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { tenantIDs, err := tenant.TenantIDs(ctx) + respWithStats := r.GetStats() != "" && s.cacheQueryableSamplesStats if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } + // If cache_queryable_samples_stats is enabled we always need request the status upstream + if s.cacheQueryableSamplesStats { + r = r.WithStats("all") + } else { + r = r.WithStats("") + } + if s.shouldCache != nil && !s.shouldCache(r) { return s.next.Do(ctx, r) } @@ -228,6 +261,9 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { s.put(ctx, key, extents) } + if err == nil && !respWithStats { + response = s.extractor.ResponseWithoutStats(response) + } return response, err } @@ -617,6 +653,23 @@ func jaegerTraceID(ctx context.Context) string { return spanContext.TraceID().String() } +// extractStats returns the stats for a given time range +// this function is similar to extractSampleStream +func extractStats(start, end int64, stats *PrometheusResponseStats) *PrometheusResponseStats { + if stats == nil || stats.Samples == nil { + return stats + } + + result := &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}} + for _, s := range stats.Samples.TotalQueryableSamplesPerStep { + if start <= s.TimestampMs && s.TimestampMs <= end { + result.Samples.TotalQueryableSamplesPerStep = append(result.Samples.TotalQueryableSamplesPerStep, s) + result.Samples.TotalQueryableSamples += s.Value + } + } + return result +} + func extractMatrix(start, end int64, matrix []SampleStream) []SampleStream { result := make([]SampleStream, 0, len(matrix)) for _, stream := range matrix { diff --git a/pkg/querier/queryrange/results_cache_test.go b/pkg/querier/queryrange/results_cache_test.go index 4ab4b84571..cf9dfbab47 100644 --- a/pkg/querier/queryrange/results_cache_test.go +++ b/pkg/querier/queryrange/results_cache_test.go @@ -20,7 +20,7 @@ import ( ) const ( - query = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&step=120" + query = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120" responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}` ) @@ -31,6 +31,7 @@ var ( End: 1536716898 * 1e3, Step: 120 * 1e3, Query: "sum(container_memory_rss) by (namespace)", + Stats: "all", } reqHeaders = []*PrometheusRequestHeader{ { @@ -46,6 +47,15 @@ var ( Query: "sum(container_memory_rss) by (namespace)", CachingOptions: CachingOptions{Disabled: true}, } + noCacheRequestWithStats = &PrometheusRequest{ + Path: "/api/v1/query_range", + Start: 1536673680 * 1e3, + End: 1536716898 * 1e3, + Step: 120 * 1e3, + Stats: "all", + Query: "sum(container_memory_rss) by (namespace)", + CachingOptions: CachingOptions{Disabled: true}, + } respHeaders = []*PrometheusResponseHeader{ { Name: "Content-Type", @@ -72,18 +82,36 @@ var ( ) func mkAPIResponse(start, end, step int64) *PrometheusResponse { + return mkAPIResponseWithStats(start, end, step, false) +} + +func mkAPIResponseWithStats(start, end, step int64, withStats bool) *PrometheusResponse { var samples []cortexpb.Sample + var stats *PrometheusResponseStats + if withStats { + stats = &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}} + } for i := start; i <= end; i += step { samples = append(samples, cortexpb.Sample{ TimestampMs: int64(i), Value: float64(i), }) + + if withStats { + stats.Samples.TotalQueryableSamplesPerStep = append(stats.Samples.TotalQueryableSamplesPerStep, &PrometheusResponseQueryableSamplesStatsPerStep{ + TimestampMs: i, + Value: i, + }) + + stats.Samples.TotalQueryableSamples += i + } } return &PrometheusResponse{ Status: StatusSuccess, Data: PrometheusData{ ResultType: matrix, + Stats: stats, Result: []SampleStream{ { Labels: []cortexpb.LabelAdapter{ @@ -96,12 +124,20 @@ func mkAPIResponse(start, end, step int64) *PrometheusResponse { } } +func mkExtentWithStats(start, end int64) Extent { + return mkExtentWithStepWithStats(start, end, 10, true) +} + func mkExtent(start, end int64) Extent { - return mkExtentWithStep(start, end, 10) + return mkExtentWithStepWithStats(start, end, 10, false) } func mkExtentWithStep(start, end, step int64) Extent { - res := mkAPIResponse(start, end, step) + return mkExtentWithStepWithStats(start, end, step, false) +} + +func mkExtentWithStepWithStats(start, end, step int64, withStats bool) Extent { + res := mkAPIResponseWithStats(start, end, step, withStats) any, err := types.MarshalAny(res) if err != nil { panic(err) @@ -113,6 +149,87 @@ func mkExtentWithStep(start, end, step int64) Extent { } } +func TestStatsCacheQuerySamples(t *testing.T) { + + for _, tc := range []struct { + name string + cacheQueryableSamplesStats bool + err error + req Request + upstreamResponse Response + expectedResponse Response + }{ + { + name: "should return error", + cacheQueryableSamplesStats: true, + req: noCacheRequest, + err: fmt.Errorf("error"), + }, + { + name: "should return response with stats", + cacheQueryableSamplesStats: true, + req: noCacheRequestWithStats, + upstreamResponse: mkAPIResponseWithStats(0, 100, 10, true), + expectedResponse: mkAPIResponseWithStats(0, 100, 10, true), + }, + { + name: "should return response strip stats if not requested", + cacheQueryableSamplesStats: true, + req: noCacheRequest, + upstreamResponse: mkAPIResponseWithStats(0, 100, 10, false), + expectedResponse: mkAPIResponseWithStats(0, 100, 10, false), + }, + { + name: "should not ask stats is cacheQueryableSamplesStats is disabled", + cacheQueryableSamplesStats: false, + req: noCacheRequest, + upstreamResponse: mkAPIResponseWithStats(0, 100, 10, false), + expectedResponse: mkAPIResponseWithStats(0, 100, 10, false), + }, + { + name: "should not forward stats when cacheQueryableSamplesStats is disabled", + cacheQueryableSamplesStats: false, + req: noCacheRequestWithStats, + upstreamResponse: mkAPIResponseWithStats(0, 100, 10, true), + expectedResponse: mkAPIResponseWithStats(0, 100, 10, false), + }, + } { + t.Run(tc.name, func(t *testing.T) { + cfg := ResultsCacheConfig{ + CacheConfig: cache.Config{ + Cache: cache.NewMockCache(), + }, + CacheQueryableSamplesStats: tc.cacheQueryableSamplesStats, + } + rcm, _, err := NewResultsCacheMiddleware( + log.NewNopLogger(), + cfg, + constSplitter(day), + mockLimits{}, + PrometheusCodec, + PrometheusResponseExtractor{}, + nil, + nil, + nil, + ) + require.NoError(t, err) + + rc := rcm.Wrap(HandlerFunc(func(_ context.Context, req Request) (Response, error) { + if tc.cacheQueryableSamplesStats { + require.Equal(t, "all", req.GetStats()) + } else { + require.Equal(t, "", req.GetStats()) + } + return tc.upstreamResponse, tc.err + })) + ctx := user.InjectOrgID(context.Background(), "1") + r, err := rc.Do(ctx, tc.req) + require.Equal(t, tc.err, err) + require.Equal(t, tc.expectedResponse, r) + }) + } +} + func TestShouldCache(t *testing.T) { maxCacheTime := int64(150 * 1000) c := &resultsCache{logger: log.NewNopLogger(), cacheGenNumberLoader: newMockCacheGenNumberLoader()} @@ -522,6 +639,125 @@ func TestPartition(t *testing.T) { mkAPIResponse(100, 105, 10), }, }, + { + name: "[Stats] Test a complete hit.", + input: &PrometheusRequest{ + Start: 0, + End: 100, + }, + prevCachedResponse: []Extent{ + mkExtentWithStats(0, 100), + }, + expectedCachedResponse: []Response{ + mkAPIResponseWithStats(0, 100, 10, true), + }, + }, + + { + name: "[Stats] Test with a complete miss.", + input: &PrometheusRequest{ + Start: 0, + End: 100, + }, + prevCachedResponse: []Extent{ + mkExtentWithStats(110, 210), + }, + expectedRequests: []Request{ + &PrometheusRequest{ + Start: 0, + End: 100, + }}, + }, + { + name: "[stats] Test a partial hit.", + input: &PrometheusRequest{ + Start: 0, + End: 100, + }, + prevCachedResponse: []Extent{ + mkExtentWithStats(50, 100), + }, + expectedRequests: []Request{ + &PrometheusRequest{ + Start: 0, + End: 50, + }, + }, + expectedCachedResponse: []Response{ + mkAPIResponseWithStats(50, 100, 10, true), + }, + }, + { + name: "[stats] Test multiple partial hits.", + input: &PrometheusRequest{ + Start: 100, + End: 200, + }, + prevCachedResponse: []Extent{ + mkExtentWithStats(50, 120), + mkExtentWithStats(160, 250), + }, + expectedRequests: []Request{ + &PrometheusRequest{ + Start: 120, + End: 160, + }, + }, + expectedCachedResponse: []Response{ + mkAPIResponseWithStats(100, 120, 10, true), + mkAPIResponseWithStats(160, 200, 10, true), + }, + }, + { + name: "[stats] Partial hits with tiny gap.", + input: &PrometheusRequest{ + Start: 100, + End: 160, + }, + prevCachedResponse: []Extent{ + mkExtentWithStats(50, 120), + mkExtentWithStats(122, 130), + }, + expectedRequests: []Request{ + &PrometheusRequest{ + Start: 120, + End: 160, + }, + }, + expectedCachedResponse: []Response{ + mkAPIResponseWithStats(100, 120, 10, true), + }, + }, + { + name: "[stats] Extent is outside the range and the request has a single step (same start and end).", + input: &PrometheusRequest{ + Start: 100, + End: 100, + }, + prevCachedResponse: []Extent{ + mkExtentWithStats(50, 90), + }, + expectedRequests: []Request{ + &PrometheusRequest{ + Start: 100, + End: 100, + }, + }, + }, + { + name: "[stats] Test when hit has a large step and only a single sample extent.", + // If there is a only a single sample in the split interval, start and end will be the same. + input: &PrometheusRequest{ + Start: 100, + End: 100, + }, + prevCachedResponse: []Extent{ + mkExtentWithStats(100, 100), + }, + expectedCachedResponse: []Response{ + mkAPIResponseWithStats(100, 105, 10, true), + }, + }, } { t.Run(tc.name, func(t *testing.T) { s := resultsCache{ @@ -791,6 +1027,7 @@ func TestResultsCacheRecent(t *testing.T) { var cfg ResultsCacheConfig flagext.DefaultValues(&cfg) cfg.CacheConfig.Cache = cache.NewMockCache() + cfg.CacheQueryableSamplesStats = true rcm, _, err := NewResultsCacheMiddleware( log.NewNopLogger(), cfg, diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 5dd9c47a30..f6b31c4f22 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -36,6 +36,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/cache" + "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -76,12 +77,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { } // Validate validates the config. -func (cfg *Config) Validate() error { +func (cfg *Config) Validate(qCfg querier.Config) error { if cfg.CacheResults { if cfg.SplitQueriesByInterval <= 0 { return errors.New("querier.cache-results may only be enabled in conjunction with querier.split-queries-by-interval. Please set the latter") } - if err := cfg.ResultsCacheConfig.Validate(); err != nil { + if err := cfg.ResultsCacheConfig.Validate(qCfg); err != nil { return errors.Wrap(err, "invalid ResultsCache config") } }