-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add
--query-range.request-downsampled
flag to Query Frontend (#2641)
Signed-off-by: Vladimir Kononov <[email protected]>
- Loading branch information
1 parent
c7a171f
commit 87e84da
Showing
6 changed files
with
121 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package queryfrontend | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/cortexproject/cortex/pkg/querier/queryrange" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
"github.com/thanos-io/thanos/pkg/compact/downsample" | ||
) | ||
|
||
// DownsampledMiddleware creates a new Middleware that requests downsampled data | ||
// should response to original request with auto max_source_resolution not contain data points. | ||
func DownsampledMiddleware(merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware { | ||
return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler { | ||
return downsampled{ | ||
next: next, | ||
merger: merger, | ||
extraCounter: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ | ||
Namespace: "thanos", | ||
Name: "frontend_downsampled_extra_queries_total", | ||
Help: "Total number of extra queries for downsampled data", | ||
}), | ||
} | ||
}) | ||
} | ||
|
||
type downsampled struct { | ||
next queryrange.Handler | ||
merger queryrange.Merger | ||
|
||
// Metrics. | ||
extraCounter prometheus.Counter | ||
} | ||
|
||
var resolutions = []int64{downsample.ResLevel1, downsample.ResLevel2} | ||
|
||
func (d downsampled) Do(ctx context.Context, req queryrange.Request) (queryrange.Response, error) { | ||
tqrr, ok := req.(*ThanosQueryRangeRequest) | ||
if !ok || !tqrr.AutoDownsampling { | ||
return d.next.Do(ctx, req) | ||
} | ||
|
||
var ( | ||
resps = make([]queryrange.Response, 0) | ||
resp queryrange.Response | ||
err error | ||
i int | ||
) | ||
|
||
forLoop: | ||
for i < len(resolutions) { | ||
if i > 0 { | ||
d.extraCounter.Inc() | ||
} | ||
r := *tqrr | ||
resp, err = d.next.Do(ctx, &r) | ||
if err != nil { | ||
return nil, err | ||
} | ||
resps = append(resps, resp) | ||
// Set MaxSourceResolution for next request, if any. | ||
for i < len(resolutions) { | ||
if tqrr.MaxSourceResolution < resolutions[i] { | ||
tqrr.AutoDownsampling = false | ||
tqrr.MaxSourceResolution = resolutions[i] | ||
break | ||
} | ||
i++ | ||
} | ||
m := minResponseTime(resp) | ||
switch m { | ||
case tqrr.Start: // Response not impacted by retention policy. | ||
break forLoop | ||
case -1: // Empty response, retry with higher MaxSourceResolution. | ||
continue | ||
default: // Data partially present, query for empty part with higher MaxSourceResolution. | ||
tqrr.End = m - tqrr.Step | ||
} | ||
if tqrr.Start > tqrr.End { | ||
break forLoop | ||
} | ||
} | ||
response, err := d.merger.MergeResponse(resps...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return response, nil | ||
} | ||
|
||
func minResponseTime(r queryrange.Response) int64 { | ||
var res = r.(*queryrange.PrometheusResponse).Data.Result | ||
if len(res) == 0 { | ||
return -1 | ||
} | ||
if len(res[0].Samples) == 0 { | ||
return -1 | ||
} | ||
return res[0].Samples[0].TimestampMs | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters