diff --git a/CHANGELOG.md b/CHANGELOG.md index acc9145923..4b1d6411be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re ### Added - [#3700](https://github.com/thanos-io/thanos/pull/3700) ui: make old bucket viewer UI work with vanilla Prometheus blocks +- [#2641](https://github.com/thanos-io/thanos/issues/2641) Query Frontend: Added `--query-range.request-downsampled` flag enabling additional queries for downsampled data in case of empty or incomplete response to range request. ### Changed diff --git a/cmd/thanos/query_frontend.go b/cmd/thanos/query_frontend.go index 52688a65ea..55318452c4 100644 --- a/cmd/thanos/query_frontend.go +++ b/cmd/thanos/query_frontend.go @@ -65,6 +65,9 @@ func registerQueryFrontend(app *extkingpin.App) { cmd.Flag("query-range.align-range-with-step", "Mutate incoming queries to align their start and end with their step for better cache-ability. Note: Grafana dashboards do that by default."). Default("true").BoolVar(&cfg.QueryRangeConfig.AlignRangeWithStep) + cmd.Flag("query-range.request-downsampled", "Make additional query for downsampled data in case of empty or incomplete response to range request."). + Default("true").BoolVar(&cfg.QueryRangeConfig.RequestDownsampled) + cmd.Flag("query-range.split-interval", "Split query range requests by an interval and execute in parallel, it should be greater than 0 when query-range.response-cache-config is configured."). Default("24h").DurationVar(&cfg.QueryRangeConfig.SplitQueriesByInterval) diff --git a/docs/components/query-frontend.md b/docs/components/query-frontend.md index b29d71411a..f1afeaa1dd 100644 --- a/docs/components/query-frontend.md +++ b/docs/components/query-frontend.md @@ -147,6 +147,10 @@ Flags: and end with their step for better cache-ability. Note: Grafana dashboards do that by default. + --query-range.request-downsampled + Make additional query for downsampled data in + case of empty or incomplete response to range + request. --query-range.split-interval=24h Split query range requests by an interval and execute in parallel, it should be greater than diff --git a/pkg/queryfrontend/config.go b/pkg/queryfrontend/config.go index 5656cdbcaf..8b9e4483d9 100644 --- a/pkg/queryfrontend/config.go +++ b/pkg/queryfrontend/config.go @@ -162,6 +162,7 @@ type QueryRangeConfig struct { CachePathOrContent extflag.PathOrContent AlignRangeWithStep bool + RequestDownsampled bool SplitQueriesByInterval time.Duration MaxRetries int Limits *cortexvalidation.Limits diff --git a/pkg/queryfrontend/downsampled.go b/pkg/queryfrontend/downsampled.go new file mode 100644 index 0000000000..44a48c3bcd --- /dev/null +++ b/pkg/queryfrontend/downsampled.go @@ -0,0 +1,103 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package queryfrontend + +import ( + "context" + + "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/compact/downsample" +) + +// DownsampledMiddleware creates a new Middleware that requests downsampled data +// should response to original request with auto max_source_resolution not contain data points. +func DownsampledMiddleware(merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware { + return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { + return downsampled{ + next: next, + merger: merger, + extraCounter: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: "thanos", + Name: "frontend_downsampled_extra_queries_total", + Help: "Total number of extra queries for downsampled data", + }), + } + }) +} + +type downsampled struct { + next queryrange.Handler + merger queryrange.Merger + + // Metrics. + extraCounter prometheus.Counter +} + +var resolutions = []int64{downsample.ResLevel1, downsample.ResLevel2} + +func (d downsampled) Do(ctx context.Context, req queryrange.Request) (queryrange.Response, error) { + tqrr, ok := req.(*ThanosQueryRangeRequest) + if !ok || !tqrr.AutoDownsampling { + return d.next.Do(ctx, req) + } + + var ( + resps = make([]queryrange.Response, 0) + resp queryrange.Response + err error + i int + ) + +forLoop: + for i < len(resolutions) { + if i > 0 { + d.extraCounter.Inc() + } + r := *tqrr + resp, err = d.next.Do(ctx, &r) + if err != nil { + return nil, err + } + resps = append(resps, resp) + // Set MaxSourceResolution for next request, if any. + for i < len(resolutions) { + if tqrr.MaxSourceResolution < resolutions[i] { + tqrr.AutoDownsampling = false + tqrr.MaxSourceResolution = resolutions[i] + break + } + i++ + } + m := minResponseTime(resp) + switch m { + case tqrr.Start: // Response not impacted by retention policy. + break forLoop + case -1: // Empty response, retry with higher MaxSourceResolution. + continue + default: // Data partially present, query for empty part with higher MaxSourceResolution. + tqrr.End = m - tqrr.Step + } + if tqrr.Start > tqrr.End { + break forLoop + } + } + response, err := d.merger.MergeResponse(resps...) + if err != nil { + return nil, err + } + return response, nil +} + +func minResponseTime(r queryrange.Response) int64 { + var res = r.(*queryrange.PrometheusResponse).Data.Result + if len(res) == 0 { + return -1 + } + if len(res[0].Samples) == 0 { + return -1 + } + return res[0].Samples[0].TimestampMs +} diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index 835842ff47..ab780e14fd 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -131,7 +131,7 @@ func getOperation(r *http.Request) string { } // newQueryRangeTripperware returns a Tripperware for range queries configured with middlewares of -// limit, step align, split by interval, cache requests and retry. +// limit, step align, downsampled, split by interval, cache requests and retry. func newQueryRangeTripperware( config QueryRangeConfig, limits queryrange.Limits, @@ -151,6 +151,14 @@ func newQueryRangeTripperware( ) } + if config.RequestDownsampled { + queryRangeMiddleware = append( + queryRangeMiddleware, + queryrange.InstrumentMiddleware("downsampled", m), + DownsampledMiddleware(codec, reg), + ) + } + queryIntervalFn := func(_ queryrange.Request) time.Duration { return config.SplitQueriesByInterval }