diff --git a/.golangci.yml b/.golangci.yml index 934f6b08107..973f29bb53c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -53,6 +53,8 @@ linters-settings: exclude: ./.errcheck_excludes.txt misspell: locale: US + goconst: + min-occurrences: 5 issues: exclude-rules: diff --git a/CHANGELOG.md b/CHANGELOG.md index b56413ce57c..1d502dc2d96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#3261](https://github.com/thanos-io/thanos/pull/3261) Thanos Store: Use segment files specified in meta.json file, if present. If not present, Store does the LIST operation as before. - [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors. +- [#3276](https://github.com/thanos-io/thanos/pull/3276) Query Frontend: Support query splitting and retry for labels and series requests. ## [v0.16.0](https://github.com/thanos-io/thanos/releases) - Release in progress diff --git a/cmd/thanos/query_frontend.go b/cmd/thanos/query_frontend.go index 5ac7342bf81..09f09aa165a 100644 --- a/cmd/thanos/query_frontend.go +++ b/cmd/thanos/query_frontend.go @@ -42,33 +42,59 @@ func registerQueryFrontend(app *extkingpin.App) { cmd := app.Command(comp.String(), "query frontend") cfg := &queryFrontendConfig{ Config: queryfrontend.Config{ - CortexFrontendConfig: &cortexfrontend.Config{}, - CortexLimits: &cortexvalidation.Limits{}, - CortexResultsCacheConfig: &queryrange.ResultsCacheConfig{}, + CortexFrontendConfig: &cortexfrontend.Config{}, + QueryRangeConfig: queryfrontend.QueryRangeConfig{ + Limits: &cortexvalidation.Limits{}, + ResultsCacheConfig: &queryrange.ResultsCacheConfig{}, + }, + LabelsConfig: queryfrontend.LabelsConfig{ + Limits: &cortexvalidation.Limits{}, + ResultsCacheConfig: &queryrange.ResultsCacheConfig{}, + }, }, } cfg.http.registerFlag(cmd) - 
cmd.Flag("query-range.split-interval", "Split queries by an interval and execute in parallel, it should be greater than 0 when response-cache-config is configured."). - Default("24h").DurationVar(&cfg.SplitQueriesByInterval) + // Query range tripperware flags. + cmd.Flag("query-range.split-interval", "Split query range requests by an interval and execute in parallel, it should be greater than 0 when query-range.response-cache-config is configured."). + Default("24h").DurationVar(&cfg.QueryRangeConfig.SplitQueriesByInterval) - cmd.Flag("query-range.max-retries-per-request", "Maximum number of retries for a single request; beyond this, the downstream error is returned."). - Default("5").IntVar(&cfg.MaxRetries) + cmd.Flag("query-range.max-retries-per-request", "Maximum number of retries for a single query range request; beyond this, the downstream error is returned."). + Default("5").IntVar(&cfg.QueryRangeConfig.MaxRetries) cmd.Flag("query-range.max-query-length", "Limit the query time range (end - start time) in the query-frontend, 0 disables it."). - Default("0").DurationVar(&cfg.CortexLimits.MaxQueryLength) + Default("0").DurationVar(&cfg.QueryRangeConfig.Limits.MaxQueryLength) - cmd.Flag("query-range.max-query-parallelism", "Maximum number of queries will be scheduled in parallel by the Frontend."). - Default("14").IntVar(&cfg.CortexLimits.MaxQueryParallelism) + cmd.Flag("query-range.max-query-parallelism", "Maximum number of query range requests will be scheduled in parallel by the Frontend."). + Default("14").IntVar(&cfg.QueryRangeConfig.Limits.MaxQueryParallelism) - cmd.Flag("query-range.response-cache-max-freshness", "Most recent allowed cacheable result, to prevent caching very recent results that might still be in flux."). 
- Default("1m").DurationVar(&cfg.CortexLimits.MaxCacheFreshness) + cmd.Flag("query-range.response-cache-max-freshness", "Most recent allowed cacheable result for query range requests, to prevent caching very recent results that might still be in flux."). + Default("1m").DurationVar(&cfg.QueryRangeConfig.Limits.MaxCacheFreshness) - cmd.Flag("query-range.partial-response", "Enable partial response for queries if no partial_response param is specified. --no-query-range.partial-response for disabling."). - Default("true").BoolVar(&cfg.PartialResponseStrategy) + cmd.Flag("query-range.partial-response", "Enable partial response for query range requests if no partial_response param is specified. --no-query-range.partial-response for disabling."). + Default("true").BoolVar(&cfg.QueryRangeConfig.PartialResponseStrategy) - cfg.CachePathOrContent = *extflag.RegisterPathOrContent(cmd, "query-range.response-cache-config", "YAML file that contains response cache configuration.", false) + cfg.QueryRangeConfig.CachePathOrContent = *extflag.RegisterPathOrContent(cmd, "query-range.response-cache-config", "YAML file that contains response cache configuration.", false) + + // Labels tripperware flags. + cmd.Flag("labels.split-interval", "Split labels requests by an interval and execute in parallel, it should be greater than 0 when labels.response-cache-config is configured."). + Default("24h").DurationVar(&cfg.LabelsConfig.SplitQueriesByInterval) + + cmd.Flag("labels.max-retries-per-request", "Maximum number of retries for a single label/series API request; beyond this, the downstream error is returned."). + Default("5").IntVar(&cfg.LabelsConfig.MaxRetries) + + cmd.Flag("labels.max-query-parallelism", "Maximum number of labels requests will be scheduled in parallel by the Frontend."). 
+ Default("14").IntVar(&cfg.LabelsConfig.Limits.MaxQueryParallelism) + + cmd.Flag("labels.response-cache-max-freshness", "Most recent allowed cacheable result for labels requests, to prevent caching very recent results that might still be in flux."). + Default("1m").DurationVar(&cfg.LabelsConfig.Limits.MaxCacheFreshness) + + cmd.Flag("labels.partial-response", "Enable partial response for labels requests if no partial_response param is specified. --no-labels.partial-response for disabling."). + Default("true").BoolVar(&cfg.LabelsConfig.PartialResponseStrategy) + + cmd.Flag("labels.default-time-range", "The default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified."). + Default("24h").DurationVar(&cfg.DefaultTimeRange) cmd.Flag("cache-compression-type", "Use compression in results cache. Supported values are: 'snappy' and '' (disable compression)."). Default("").StringVar(&cfg.CacheCompression) @@ -97,20 +123,16 @@ func runQueryFrontend( cfg *queryFrontendConfig, comp component.Component, ) error { - cacheConfContentYaml, err := cfg.CachePathOrContent.Content() + queryRangeCacheConfContentYaml, err := cfg.QueryRangeConfig.CachePathOrContent.Content() if err != nil { return err } - if len(cacheConfContentYaml) > 0 { - cacheConfig, err := queryfrontend.NewCacheConfig(logger, cacheConfContentYaml) + if len(queryRangeCacheConfContentYaml) > 0 { + cacheConfig, err := queryfrontend.NewCacheConfig(logger, queryRangeCacheConfContentYaml) if err != nil { - return errors.Wrap(err, "initializing the query frontend config") - } - if cfg.CortexResultsCacheConfig.CacheConfig.Memcache.Expiration == 0 { - level.Warn(logger).Log("msg", "memcached cache valid time set to 0, so using a default of 24 hours expiration time") - cfg.CortexResultsCacheConfig.CacheConfig.Memcache.Expiration = 24 * time.Hour + return errors.Wrap(err, "initializing the query range cache config") } - cfg.CortexResultsCacheConfig = 
&queryrange.ResultsCacheConfig{ + cfg.QueryRangeConfig.ResultsCacheConfig = &queryrange.ResultsCacheConfig{ Compression: cfg.CacheCompression, CacheConfig: *cacheConfig, } @@ -128,7 +150,7 @@ func runQueryFrontend( tripperWare, err := queryfrontend.NewTripperware(cfg.Config, reg, logger) if err != nil { - return errors.Wrap(err, "setup query range middlewares") + return errors.Wrap(err, "setup tripperwares") } fe.Wrap(tripperWare) diff --git a/docs/components/query-frontend.md b/docs/components/query-frontend.md index 6e174baf375..d96a25951cb 100644 --- a/docs/components/query-frontend.md +++ b/docs/components/query-frontend.md @@ -107,71 +107,99 @@ usage: thanos query-frontend [] query frontend Flags: - -h, --help Show context-sensitive help (also try --help-long - and --help-man). - --version Show application version. - --log.level=info Log filtering level. - --log.format=logfmt Log format to use. Possible options: logfmt or - json. + -h, --help Show context-sensitive help (also try + --help-long and --help-man). + --version Show application version. + --log.level=info Log filtering level. + --log.format=logfmt Log format to use. Possible options: logfmt or + json. --tracing.config-file= - Path to YAML file with tracing configuration. See - format details: - https://thanos.io/tip/thanos/tracing.md/#configuration + Path to YAML file with tracing configuration. + See format details: + https://thanos.io/tip/thanos/tracing.md/#configuration --tracing.config= - Alternative to 'tracing.config-file' flag (lower - priority). Content of YAML file with tracing - configuration. See format details: - https://thanos.io/tip/thanos/tracing.md/#configuration + Alternative to 'tracing.config-file' flag + (lower priority). Content of YAML file with + tracing configuration. See format details: + https://thanos.io/tip/thanos/tracing.md/#configuration --http-address="0.0.0.0:10902" - Listen host:port for HTTP endpoints. 
- --http-grace-period=2m Time to wait after an interrupt received for HTTP - Server. + Listen host:port for HTTP endpoints. + --http-grace-period=2m Time to wait after an interrupt received for + HTTP Server. --query-range.split-interval=24h - Split queries by an interval and execute in - parallel, it should be greater than 0 when - response-cache-config is configured. + Split query range requests by an interval and + execute in parallel, it should be greater than + 0 when query-range.response-cache-config is + configured. --query-range.max-retries-per-request=5 - Maximum number of retries for a single request; - beyond this, the downstream error is returned. + Maximum number of retries for a single query + range request; beyond this, the downstream + error is returned. --query-range.max-query-length=0 - Limit the query time range (end - start time) in - the query-frontend, 0 disables it. + Limit the query time range (end - start time) + in the query-frontend, 0 disables it. --query-range.max-query-parallelism=14 - Maximum number of queries will be scheduled in - parallel by the Frontend. + Maximum number of query range requests will be + scheduled in parallel by the Frontend. --query-range.response-cache-max-freshness=1m - Most recent allowed cacheable result, to prevent - caching very recent results that might still be in - flux. + Most recent allowed cacheable result for query + range requests, to prevent caching very recent + results that might still be in flux. --query-range.partial-response - Enable partial response for queries if no - partial_response param is specified. - --no-query-range.partial-response for disabling. + Enable partial response for query range + requests if no partial_response param is + specified. --no-query-range.partial-response + for disabling. --query-range.response-cache-config-file= - Path to YAML file that contains response cache - configuration. + Path to YAML file that contains response cache + configuration. 
--query-range.response-cache-config= - Alternative to - 'query-range.response-cache-config-file' flag - (lower priority). Content of YAML file that - contains response cache configuration. + Alternative to + 'query-range.response-cache-config-file' flag + (lower priority). Content of YAML file that + contains response cache configuration. + --labels.split-interval=24h + Split labels requests by an interval and + execute in parallel, it should be greater than + 0 when labels.response-cache-config is + configured. + --labels.max-retries-per-request=5 + Maximum number of retries for a single + label/series API request; beyond this, the + downstream error is returned. + --labels.max-query-parallelism=14 + Maximum number of labels requests will be + scheduled in parallel by the Frontend. + --labels.response-cache-max-freshness=1m + Most recent allowed cacheable result for labels + requests, to prevent caching very recent + results that might still be in flux. + --labels.partial-response Enable partial response for labels requests if + no partial_response param is specified. + --no-labels.partial-response for disabling. + --labels.default-time-range=24h + The default metadata time range duration for + retrieving labels through Labels and Series API + when the range parameters are not specified. --cache-compression-type="" - Use compression in results cache. Supported values - are: 'snappy' and ” (disable compression). + Use compression in results cache. Supported + values are: 'snappy' and ” (disable + compression). --query-frontend.downstream-url="http://localhost:9090" - URL of downstream Prometheus Query compatible API. + URL of downstream Prometheus Query compatible + API. --query-frontend.compress-responses - Compress HTTP responses. + Compress HTTP responses. --query-frontend.log-queries-longer-than=0 - Log queries that are slower than the specified - duration. Set to 0 to disable. Set to < 0 to - enable on all queries. 
+ Log queries that are slower than the specified + duration. Set to 0 to disable. Set to < 0 to + enable on all queries. --log.request.decision=LogFinishCall - Request Logging for logging the start and end of - requests. LogFinishCall is enabled by default. - LogFinishCall : Logs the finish call of the - requests. LogStartAndFinishCall : Logs the start - and finish call of the requests. NoLogCall : - Disable request logging. + Request Logging for logging the start and end + of requests. LogFinishCall is enabled by + default. LogFinishCall : Logs the finish call + of the requests. LogStartAndFinishCall : Logs + the start and finish call of the requests. + NoLogCall : Disable request logging. ``` diff --git a/go.mod b/go.mod index 4bafa406f67..87554f58f91 100644 --- a/go.mod +++ b/go.mod @@ -44,11 +44,11 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 - github.com/prometheus/alertmanager v0.21.0 + github.com/prometheus/alertmanager v0.21.1-0.20200911160112-1fdff6b3f939 github.com/prometheus/client_golang v1.7.1 github.com/prometheus/client_model v0.2.0 - github.com/prometheus/common v0.13.0 - github.com/prometheus/prometheus v1.8.2-0.20200819132913-cb830b0a9c78 + github.com/prometheus/common v0.14.0 + github.com/prometheus/prometheus v1.8.2-0.20200923143134-7e2db3d092f3 github.com/uber/jaeger-client-go v2.25.0+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099 diff --git a/go.sum b/go.sum index d888ece2bf7..60b7ed8799c 100644 --- a/go.sum +++ b/go.sum @@ -865,8 +865,9 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prometheus/alertmanager v0.19.0/go.mod 
h1:Eyp94Yi/T+kdeb2qvq66E3RGuph5T/jm/RBVh4yz1xo= -github.com/prometheus/alertmanager v0.21.0 h1:qK51JcUR9l/unhawGA9F9B64OCYfcGewhPNprem/Acc= github.com/prometheus/alertmanager v0.21.0/go.mod h1:h7tJ81NA0VLWvWEayi1QltevFkLF3KxmC/malTcT8Go= +github.com/prometheus/alertmanager v0.21.1-0.20200911160112-1fdff6b3f939 h1:/gGoc4W45469qMuGGEMArYEs8wsk31/5oE56NUGjEN0= +github.com/prometheus/alertmanager v0.21.1-0.20200911160112-1fdff6b3f939/go.mod h1:imXRHOP6QTsE0fFsIsAV/cXimS32m7gVZOiUj11m6Ig= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= @@ -898,8 +899,10 @@ github.com/prometheus/common v0.8.0/go.mod h1:PC/OgXc+UN7B4ALwvn1yzVZmVwvhXp5Jsb github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.11.1/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= -github.com/prometheus/common v0.13.0 h1:vJlpe9wPgDRM1Z+7Wj3zUUjY1nr6/1jNKyl7llliccg= +github.com/prometheus/common v0.12.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.13.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= +github.com/prometheus/common v0.14.0 h1:RHRyE8UocrbjU+6UvRzwi6HjiDfxrrBU91TtbKzkGp4= +github.com/prometheus/common v0.14.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289 h1:dTUS1vaLWq+Y6XKOTnrFpoVsQKLCbCp1OLj24TDi7oM= github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289/go.mod h1:FGbBv5OPKjch+jNUJmEQpMZytIdyW0NdBtWFcfSKusc= github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= diff --git 
a/pkg/api/query/v1.go b/pkg/api/query/v1.go index eab980379f5..06576a9ec5b 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -55,6 +55,7 @@ const ( PartialResponseParam = "partial_response" MaxSourceResolutionParam = "max_source_resolution" ReplicaLabelsParam = "replicaLabels[]" + MatcherParam = "match[]" StoreMatcherParam = "storeMatch[]" ) @@ -459,7 +460,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")} } - if len(r.Form["match[]"]) == 0 { + if len(r.Form[MatcherParam]) == 0 { return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("no match[] parameter provided")} } @@ -469,7 +470,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr } var matcherSets [][]*labels.Matcher - for _, s := range r.Form["match[]"] { + for _, s := range r.Form[MatcherParam] { matchers, err := parser.ParseMetricSelector(s) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err} diff --git a/pkg/queryfrontend/cache_splitter.go b/pkg/queryfrontend/cache_splitter.go index 3c144fa04c7..4da5074a4e8 100644 --- a/pkg/queryfrontend/cache_splitter.go +++ b/pkg/queryfrontend/cache_splitter.go @@ -29,7 +29,7 @@ func newThanosCacheKeyGenerator(interval time.Duration) thanosCacheKeyGenerator // GenerateCacheKey generates a cache key based on the Request and interval. 
func (t thanosCacheKeyGenerator) GenerateCacheKey(_ string, r queryrange.Request) string { currentInterval := r.GetStart() / t.interval.Milliseconds() - if tr, ok := r.(*ThanosRequest); ok { + if tr, ok := r.(*ThanosQueryRangeRequest); ok { i := 0 for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ { } diff --git a/pkg/queryfrontend/cache_splitter_test.go b/pkg/queryfrontend/cache_splitter_test.go index 6051e98d4c8..3504419e7ba 100644 --- a/pkg/queryfrontend/cache_splitter_test.go +++ b/pkg/queryfrontend/cache_splitter_test.go @@ -30,7 +30,7 @@ func TestGenerateCacheKey(t *testing.T) { }, { name: "non downsampling resolution specified", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Query: "up", Start: 0, Step: 60 * seconds, @@ -39,7 +39,7 @@ func TestGenerateCacheKey(t *testing.T) { }, { name: "10s step", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Query: "up", Start: 0, Step: 10 * seconds, @@ -48,7 +48,7 @@ func TestGenerateCacheKey(t *testing.T) { }, { name: "1m downsampling resolution", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Query: "up", Start: 0, Step: 10 * seconds, @@ -58,7 +58,7 @@ func TestGenerateCacheKey(t *testing.T) { }, { name: "5m downsampling resolution, different cache key", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Query: "up", Start: 0, Step: 10 * seconds, @@ -68,7 +68,7 @@ func TestGenerateCacheKey(t *testing.T) { }, { name: "1h downsampling resolution, different cache key", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Query: "up", Start: 0, Step: 10 * seconds, diff --git a/pkg/queryfrontend/config.go b/pkg/queryfrontend/config.go index 3df378e76aa..a04380db9ae 100644 --- a/pkg/queryfrontend/config.go +++ b/pkg/queryfrontend/config.go @@ -88,6 +88,11 @@ func NewCacheConfig(logger log.Logger, confContentYaml []byte) (*cortexcache.Con level.Warn(logger).Log("message", "MaxItemSize is not yet supported by the memcached client") } + if config.Expiration == 0 { + 
level.Warn(logger).Log("msg", "memcached cache valid time set to 0, so using a default of 24 hours expiration time") + config.Expiration = 24 * time.Hour + } + return &cortexcache.Config{ Memcache: cortexcache.MemcachedConfig{ Expiration: config.Expiration, @@ -112,32 +117,59 @@ func NewCacheConfig(logger log.Logger, confContentYaml []byte) (*cortexcache.Con // Config holds the query frontend configs. type Config struct { + QueryRangeConfig + LabelsConfig + + CortexFrontendConfig *cortexfrontend.Config + CacheCompression string + RequestLoggingDecision string +} + +// QueryRangeConfig holds the config for query range tripperware. +type QueryRangeConfig struct { // PartialResponseStrategy is the default strategy used // when parsing thanos query request. PartialResponseStrategy bool - CortexFrontendConfig *cortexfrontend.Config - CortexLimits *cortexvalidation.Limits - CortexResultsCacheConfig *queryrange.ResultsCacheConfig + ResultsCacheConfig *queryrange.ResultsCacheConfig + CachePathOrContent extflag.PathOrContent - CachePathOrContent extflag.PathOrContent - CacheCompression string - RequestLoggingDecision string + SplitQueriesByInterval time.Duration + MaxRetries int + Limits *cortexvalidation.Limits +} + +// LabelsConfig holds the config for labels tripperware. +type LabelsConfig struct { + // PartialResponseStrategy is the default strategy used + // when parsing thanos query request. + PartialResponseStrategy bool + DefaultTimeRange time.Duration + + ResultsCacheConfig *queryrange.ResultsCacheConfig + CachePathOrContent extflag.PathOrContent SplitQueriesByInterval time.Duration MaxRetries int + + Limits *cortexvalidation.Limits } // Validate a fully initialized config. 
func (cfg *Config) Validate() error { - if cfg.CortexResultsCacheConfig != nil { - if cfg.SplitQueriesByInterval <= 0 { - return errors.New("split queries interval should be greater then 0") + if cfg.QueryRangeConfig.ResultsCacheConfig != nil { + if cfg.QueryRangeConfig.SplitQueriesByInterval <= 0 { + return errors.New("split queries interval should be greater than 0") } - if err := cfg.CortexResultsCacheConfig.Validate(); err != nil { + if err := cfg.QueryRangeConfig.ResultsCacheConfig.Validate(); err != nil { return errors.Wrap(err, "invalid ResultsCache config") } } + + if cfg.LabelsConfig.DefaultTimeRange == 0 { + return errors.New("labels.default-time-range cannot be set to 0") + } + if len(cfg.CortexFrontendConfig.DownstreamURL) == 0 { return errors.New("downstream URL should be configured") } diff --git a/pkg/queryfrontend/labels_codec.go b/pkg/queryfrontend/labels_codec.go new file mode 100644 index 00000000000..c9101390dd1 --- /dev/null +++ b/pkg/queryfrontend/labels_codec.go @@ -0,0 +1,377 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package queryfrontend + +import ( + "bytes" + "context" + "encoding/json" + "io/ioutil" + "math" + "net/http" + "net/url" + "sort" + "strconv" + "strings" + "time" + + "github.com/cortexproject/cortex/pkg/querier/queryrange" + cortexutil "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/spanlogger" + "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/weaveworks/common/httpgrpc" + + queryv1 "github.com/thanos-io/thanos/pkg/api/query" + "github.com/thanos-io/thanos/pkg/store/labelpb" +) + +var ( + infMinTime = time.Unix(math.MinInt64/1000+62135596801, 0) + infMaxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999) +) + +// labelsCodec is used to encode/decode Thanos labels and series requests and responses. 
+type labelsCodec struct { + queryrange.Codec + partialResponse bool + defaultMetadataTimeRange time.Duration +} + +// NewThanosLabelsCodec initializes a labelsCodec. +func NewThanosLabelsCodec(partialResponse bool, defaultMetadataTimeRange time.Duration) *labelsCodec { + return &labelsCodec{ + Codec: queryrange.PrometheusCodec, + partialResponse: partialResponse, + defaultMetadataTimeRange: defaultMetadataTimeRange, + } +} + +// MergeResponse merges labels or series responses into a single response with duplicates removed and entries sorted. No responses yields an empty successful ThanosLabelsResponse; a single response is returned unchanged. +func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) { + if len(responses) == 0 { + return &ThanosLabelsResponse{ + Status: queryrange.StatusSuccess, + Data: []string{}, + }, nil + } + + if len(responses) == 1 { + return responses[0], nil + } + + switch responses[0].(type) { + case *ThanosLabelsResponse: + set := make(map[string]struct{}) + + for _, res := range responses { + for _, value := range res.(*ThanosLabelsResponse).Data { + if _, ok := set[value]; !ok { + set[value] = struct{}{} + } + } + } + lbls := make([]string, 0, len(set)) + for label := range set { + lbls = append(lbls, label) + } + + sort.Strings(lbls) + return &ThanosLabelsResponse{ + Status: queryrange.StatusSuccess, + Data: lbls, + }, nil + case *ThanosSeriesResponse: + // uniqueSeries maps the string form of a label set to its series; the string doubles as dedup key and sort key, so it is computed once per series. 
+ uniqueSeries := make(map[string]labelpb.LabelSet) + for _, res := range responses { + for _, series := range res.(*ThanosSeriesResponse).Data { + s := labelpb.LabelsToPromLabels(series.Labels).String() + if _, ok := uniqueSeries[s]; !ok { + uniqueSeries[s] = series + } + } + } + + // Sort the keys instead of calling sort.Slice on the series with a parallel string slice: sort.Slice swaps only the slice it is given, so the parallel strings would stop matching their series after the first swap. + seriesKeys := make([]string, 0, len(uniqueSeries)) + for key := range uniqueSeries { + seriesKeys = append(seriesKeys, key) + } + sort.Strings(seriesKeys) + + seriesData := make([]labelpb.LabelSet, 0, len(seriesKeys)) + for _, key := range seriesKeys { + seriesData = append(seriesData, uniqueSeries[key]) + } + return &ThanosSeriesResponse{ + Status: queryrange.StatusSuccess, + Data: seriesData, + }, nil + default: + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format") + } +} + +func (c labelsCodec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Request, error) { + if err := r.ParseForm(); err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + + var ( + req queryrange.Request + err error + ) + switch op := getOperation(r); op { + case labelNamesOp, labelValuesOp: + req, err = c.parseLabelsRequest(r, op) + case seriesOp: + req, err = c.parseSeriesRequest(r) + } + if err != nil { + return nil, err + } + + return req, nil +} + +func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) { + var u *url.URL + switch thanosReq := r.(type) { + case *ThanosLabelsRequest: + var params = url.Values{ + "start": []string{encodeTime(thanosReq.Start)}, + "end": []string{encodeTime(thanosReq.End)}, + queryv1.PartialResponseParam: []string{strconv.FormatBool(thanosReq.PartialResponse)}, + } + if len(thanosReq.StoreMatchers) > 0 { + params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers) + } + u = &url.URL{ + Path: thanosReq.Path, + RawQuery: params.Encode(), + } + case *ThanosSeriesRequest: + var params = url.Values{ + "start": []string{encodeTime(thanosReq.Start)}, + "end": []string{encodeTime(thanosReq.End)}, + queryv1.DedupParam: 
[]string{strconv.FormatBool(thanosReq.Dedup)}, + queryv1.PartialResponseParam: []string{strconv.FormatBool(thanosReq.PartialResponse)}, + queryv1.ReplicaLabelsParam: thanosReq.ReplicaLabels, + } + if len(thanosReq.Matchers) > 0 { + params[queryv1.MatcherParam] = matchersToStringSlice(thanosReq.Matchers) + } + if len(thanosReq.StoreMatchers) > 0 { + params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers) + } + u = &url.URL{ + Path: thanosReq.Path, + RawQuery: params.Encode(), + } + default: + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format") + } + + req := &http.Request{ + Method: "GET", + RequestURI: u.String(), // This is what the httpgrpc code looks at. + URL: u, + Body: http.NoBody, + Header: http.Header{}, + } + + return req.WithContext(ctx), nil +} + +func (c labelsCodec) DecodeResponse(ctx context.Context, r *http.Response, req queryrange.Request) (queryrange.Response, error) { + if r.StatusCode/100 != 2 { + body, _ := ioutil.ReadAll(r.Body) + return nil, httpgrpc.Errorf(r.StatusCode, string(body)) + } + log, ctx := spanlogger.New(ctx, "ParseQueryResponse") //nolint:ineffassign,staticcheck + defer log.Finish() + + buf, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Error(err) //nolint:errcheck + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + } + + log.LogFields(otlog.Int("bytes", len(buf))) + + switch req.(type) { + case *ThanosLabelsRequest: + var resp ThanosLabelsResponse + if err := json.Unmarshal(buf, &resp); err != nil { + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + } + return &resp, nil + case *ThanosSeriesRequest: + var resp ThanosSeriesResponse + if err := json.Unmarshal(buf, &resp); err != nil { + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + } + return &resp, nil + default: + return nil, 
httpgrpc.Errorf(http.StatusInternalServerError, "invalid request type") + } +} + +func (c labelsCodec) EncodeResponse(ctx context.Context, res queryrange.Response) (*http.Response, error) { + sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse") + defer sp.Finish() + + var ( + b []byte + err error + ) + switch resp := res.(type) { + case *ThanosLabelsResponse: + sp.LogFields(otlog.Int("labels", len(resp.Data))) + b, err = json.Marshal(resp) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err) + } + case *ThanosSeriesResponse: + sp.LogFields(otlog.Int("series", len(resp.Data))) + b, err = json.Marshal(resp) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err) + } + default: + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format") + } + + sp.LogFields(otlog.Int("bytes", len(b))) + resp := http.Response{ + Header: http.Header{ + "Content-Type": []string{"application/json"}, + }, + Body: ioutil.NopCloser(bytes.NewBuffer(b)), + StatusCode: http.StatusOK, + } + return &resp, nil +} + +func (c labelsCodec) parseLabelsRequest(r *http.Request, op string) (queryrange.Request, error) { + var ( + result ThanosLabelsRequest + err error + ) + result.Start, result.End, err = parseMetadataTimeRange(r, c.defaultMetadataTimeRange) + if err != nil { + return nil, err + } + + result.PartialResponse, err = parsePartialResponseParam(r.FormValue(queryv1.PartialResponseParam), c.partialResponse) + if err != nil { + return nil, err + } + + result.StoreMatchers, err = parseMatchersParam(r.Form[queryv1.StoreMatcherParam]) + if err != nil { + return nil, err + } + + result.Path = r.URL.Path + + if op == labelValuesOp { + parts := strings.Split(r.URL.Path, "/") + if len(parts) > 1 { + result.Label = parts[len(parts)-2] + } + } + + for _, value := range r.Header.Values(cacheControlHeader) { + if 
strings.Contains(value, noStoreValue) { + result.CachingOptions.Disabled = true + break + } + } + + return &result, nil +} + +func (c labelsCodec) parseSeriesRequest(r *http.Request) (queryrange.Request, error) { + var ( + result ThanosSeriesRequest + err error + ) + result.Start, result.End, err = parseMetadataTimeRange(r, c.defaultMetadataTimeRange) + if err != nil { + return nil, err + } + + result.Matchers, err = parseMatchersParam(r.Form[queryv1.MatcherParam]) + if err != nil { + return nil, err + } + + result.Dedup, err = parseEnableDedupParam(r.FormValue(queryv1.DedupParam)) + if err != nil { + return nil, err + } + + result.PartialResponse, err = parsePartialResponseParam(r.FormValue(queryv1.PartialResponseParam), c.partialResponse) + if err != nil { + return nil, err + } + + if len(r.Form[queryv1.ReplicaLabelsParam]) > 0 { + result.ReplicaLabels = r.Form[queryv1.ReplicaLabelsParam] + } + + result.StoreMatchers, err = parseMatchersParam(r.Form[queryv1.StoreMatcherParam]) + if err != nil { + return nil, err + } + + result.Path = r.URL.Path + + for _, value := range r.Header.Values(cacheControlHeader) { + if strings.Contains(value, noStoreValue) { + result.CachingOptions.Disabled = true + break + } + } + + return &result, nil +} + +func parseMetadataTimeRange(r *http.Request, defaultMetadataTimeRange time.Duration) (int64, int64, error) { + // Defaults apply to whichever of start/end is missing from the query parameters: infMinTime/infMaxTime when defaultMetadataTimeRange is 0, otherwise now-defaultMetadataTimeRange and now. 
+ var defaultStartTime, defaultEndTime time.Time + if defaultMetadataTimeRange == 0 { + defaultStartTime = infMinTime + defaultEndTime = infMaxTime + } else { + now := time.Now() + defaultStartTime = now.Add(-defaultMetadataTimeRange) + defaultEndTime = now + } + + start, err := parseTimeParam(r, "start", defaultStartTime) + if err != nil { + return 0, 0, err + } + end, err := parseTimeParam(r, "end", defaultEndTime) + if err != nil { + return 0, 0, err + } + if end < start { + return 0, 0, errEndBeforeStart + } + + return start, end, nil +} + +func parseTimeParam(r *http.Request, paramName string, defaultValue time.Time) (int64, error) { + val := r.FormValue(paramName) + if val == "" { + return timestamp.FromTime(defaultValue), nil + } + result, err := cortexutil.ParseTime(val) + if err != nil { + return 0, err + } + return result, nil +} diff --git a/pkg/queryfrontend/labels_codec_test.go b/pkg/queryfrontend/labels_codec_test.go new file mode 100644 index 00000000000..9bc00e53e0e --- /dev/null +++ b/pkg/queryfrontend/labels_codec_test.go @@ -0,0 +1,366 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package queryfrontend + +import ( + "bytes" + "context" + "encoding/json" + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/weaveworks/common/httpgrpc" + + queryv1 "github.com/thanos-io/thanos/pkg/api/query" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestLabelsCodec_DecodeRequest(t *testing.T) { + for _, tc := range []struct { + name string + url string + partialResponse bool + expectedError error + expectedRequest ThanosRequest + }{ + { + name: "label_names cannot parse start", + url: "/api/v1/labels?start=foo", + partialResponse: false, + expectedError: httpgrpc.Errorf(http.StatusBadRequest, `cannot parse "foo" to a valid timestamp`), + }, + { + name: "label_values cannot parse start", + url: "/api/v1/label/__name__/values?start=foo", + partialResponse: false, + expectedError: httpgrpc.Errorf(http.StatusBadRequest, `cannot parse "foo" to a valid timestamp`), + }, + { + name: "series cannot parse start", + url: "/api/v1/series?start=foo", + partialResponse: false, + expectedError: httpgrpc.Errorf(http.StatusBadRequest, `cannot parse "foo" to a valid timestamp`), + }, + { + name: "label_names cannot parse end", + url: "/api/v1/labels?start=123&end=bar", + partialResponse: false, + expectedError: httpgrpc.Errorf(http.StatusBadRequest, `cannot parse "bar" to a valid timestamp`), + }, + { + name: "label_values cannot parse end", + url: "/api/v1/label/__name__/values?start=123&end=bar", + partialResponse: false, + expectedError: httpgrpc.Errorf(http.StatusBadRequest, `cannot parse "bar" to a valid timestamp`), + }, + { + name: "series cannot parse end", + url: "/api/v1/series?start=123&end=bar", + partialResponse: false, + expectedError: httpgrpc.Errorf(http.StatusBadRequest, `cannot parse "bar" to a valid timestamp`), + }, + { + name: "label_names end before start", + url: 
"/api/v1/labels?start=123&end=0", + partialResponse: false, + expectedError: errEndBeforeStart, + }, + { + name: "label_values end before start", + url: "/api/v1/label/__name__/values?start=123&end=0", + partialResponse: false, + expectedError: errEndBeforeStart, + }, + { + name: "series end before start", + url: "/api/v1/series?start=123&end=0", + partialResponse: false, + expectedError: errEndBeforeStart, + }, + { + name: "cannot parse partial_response", + url: "/api/v1/labels?start=123&end=456&partial_response=boo", + partialResponse: false, + expectedError: httpgrpc.Errorf(http.StatusBadRequest, "cannot parse parameter partial_response"), + }, + { + name: "label_names partial_response default to true", + url: "/api/v1/labels?start=123&end=456", + partialResponse: true, + expectedRequest: &ThanosLabelsRequest{ + Path: "/api/v1/labels", + Start: 123000, + End: 456000, + PartialResponse: true, + StoreMatchers: [][]*labels.Matcher{}, + }, + }, + { + name: "label_values partial_response default to true", + url: "/api/v1/label/__name__/values?start=123&end=456", + partialResponse: true, + expectedRequest: &ThanosLabelsRequest{ + Path: "/api/v1/label/__name__/values", + Start: 123000, + End: 456000, + PartialResponse: true, + Label: "__name__", + StoreMatchers: [][]*labels.Matcher{}, + }, + }, + { + name: "series partial_response default to true", + url: `/api/v1/series?start=123&end=456&match[]={foo="bar"}`, + partialResponse: true, + expectedRequest: &ThanosSeriesRequest{ + Path: "/api/v1/series", + Start: 123000, + End: 456000, + PartialResponse: true, + Dedup: true, + Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + StoreMatchers: [][]*labels.Matcher{}, + }, + }, + { + name: "partial_response default to false, but set to true in query", + url: "/api/v1/labels?start=123&end=456&partial_response=true", + partialResponse: false, + expectedRequest: &ThanosLabelsRequest{ + Path: "/api/v1/labels", + Start: 123000, + End: 456000, 
+ PartialResponse: true, + StoreMatchers: [][]*labels.Matcher{}, + }, + }, + { + name: "storeMatchers", + url: `/api/v1/labels?start=123&end=456&storeMatch[]={__address__="localhost:10901", cluster="test"}`, + partialResponse: false, + expectedRequest: &ThanosLabelsRequest{ + Path: "/api/v1/labels", + Start: 123000, + End: 456000, + StoreMatchers: [][]*labels.Matcher{ + { + labels.MustNewMatcher(labels.MatchEqual, "__address__", "localhost:10901"), + labels.MustNewMatcher(labels.MatchEqual, "cluster", "test"), + }, + }, + }, + }, + { + name: "series dedup set to false", + url: `/api/v1/series?start=123&dedup=false&end=456&match[]={foo="bar"}`, + partialResponse: false, + expectedRequest: &ThanosSeriesRequest{ + Path: "/api/v1/series", + Start: 123000, + End: 456000, + Dedup: false, + Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + StoreMatchers: [][]*labels.Matcher{}, + }, + }, + { + name: "series replicaLabels", + url: "/api/v1/series?start=123&end=456&replicaLabels[]=foo&replicaLabels[]=bar", + partialResponse: false, + expectedRequest: &ThanosSeriesRequest{ + Path: "/api/v1/series", + Start: 123000, + End: 456000, + Dedup: true, + ReplicaLabels: []string{"foo", "bar"}, + Matchers: [][]*labels.Matcher{}, + StoreMatchers: [][]*labels.Matcher{}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + r, err := http.NewRequest(http.MethodGet, tc.url, nil) + testutil.Ok(t, err) + + codec := NewThanosLabelsCodec(tc.partialResponse, 2*time.Hour) + req, err := codec.DecodeRequest(context.Background(), r) + if tc.expectedError != nil { + testutil.Equals(t, tc.expectedError, err) + } else { + testutil.Ok(t, err) + testutil.Equals(t, tc.expectedRequest, req) + } + }) + } +} + +func TestLabelsCodec_EncodeRequest(t *testing.T) { + const ( + start = "start" + end = "end" + startTime = "123" + endTime = "456" + ) + for _, tc := range []struct { + name string + expectedError error + checkFunc func(r *http.Request) bool + req 
queryrange.Request + }{ + { + name: "prometheus request, invalid format", + req: &queryrange.PrometheusRequest{}, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format"), + }, + { + name: "thanos query range request, invalid format", + req: &ThanosQueryRangeRequest{}, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format"), + }, + { + name: "thanos labels names request", + req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/labels"}, + checkFunc: func(r *http.Request) bool { + return r.URL.Query().Get(start) == startTime && + r.URL.Query().Get(end) == endTime && + r.URL.Path == "/api/v1/labels" + }, + }, + { + name: "thanos labels values request", + req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values"}, + checkFunc: func(r *http.Request) bool { + return r.URL.Query().Get(start) == startTime && + r.URL.Query().Get(end) == endTime && + r.URL.Path == "/api/v1/label/__name__/values" + }, + }, + { + name: "thanos labels values request, partial response set to true", + req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", PartialResponse: true}, + checkFunc: func(r *http.Request) bool { + return r.URL.Query().Get(start) == startTime && + r.URL.Query().Get(end) == endTime && + r.URL.Path == "/api/v1/label/__name__/values" && + r.URL.Query().Get(queryv1.PartialResponseParam) == "true" + }, + }, + { + name: "thanos series request with empty matchers", + req: &ThanosSeriesRequest{Start: 123000, End: 456000, Path: "/api/v1/series"}, + checkFunc: func(r *http.Request) bool { + return r.URL.Query().Get(start) == startTime && + r.URL.Query().Get(end) == endTime && + r.URL.Path == "/api/v1/series" + }, + }, + { + name: "thanos series request", + req: &ThanosSeriesRequest{ + Start: 123000, + End: 456000, + Path: "/api/v1/series", + Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "cluster", 
"test")}}, + }, + checkFunc: func(r *http.Request) bool { + return r.URL.Query().Get(start) == startTime && + r.URL.Query().Get(end) == endTime && + r.URL.Query().Get(queryv1.MatcherParam) == `{cluster="test"}` && + r.URL.Path == "/api/v1/series" + }, + }, + { + name: "thanos series request, dedup to true", + req: &ThanosSeriesRequest{ + Start: 123000, + End: 456000, + Path: "/api/v1/series", + Dedup: true, + }, + checkFunc: func(r *http.Request) bool { + return r.URL.Query().Get(start) == startTime && + r.URL.Query().Get(end) == endTime && + r.URL.Query().Get(queryv1.DedupParam) == "true" && + r.URL.Path == "/api/v1/series" + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Default partial response value doesn't matter when encoding requests. + codec := NewThanosLabelsCodec(false, time.Hour*2) + r, err := codec.EncodeRequest(context.TODO(), tc.req) + if tc.expectedError != nil { + testutil.Equals(t, tc.expectedError, err) + } else { + testutil.Ok(t, err) + testutil.Equals(t, true, tc.checkFunc(r)) + } + }) + } +} + +func TestLabelsCodec_DecodeResponse(t *testing.T) { + labelResponse := &ThanosLabelsResponse{ + Status: "success", + Data: []string{"__name__"}, + } + labelsData, err := json.Marshal(labelResponse) + testutil.Ok(t, err) + + seriesResponse := &ThanosSeriesResponse{ + Status: "success", + Data: []labelpb.LabelSet{{Labels: []labelpb.Label{{Name: "foo", Value: "bar"}}}}, + } + seriesData, err := json.Marshal(seriesResponse) + testutil.Ok(t, err) + for _, tc := range []struct { + name string + expectedError error + res http.Response + req queryrange.Request + expectedResponse queryrange.Response + }{ + { + name: "prometheus request, invalid for labelsCodec", + req: &queryrange.PrometheusRequest{}, + res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer([]byte("foo")))}, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "invalid request type"), + }, + { + name: "thanos query range request, invalid for labelsCodec", 
+ req: &ThanosQueryRangeRequest{}, + res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer([]byte("foo")))}, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "invalid request type"), + }, + { + name: "thanos labels request", + req: &ThanosLabelsRequest{}, + res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(labelsData))}, + expectedResponse: labelResponse, + }, + { + name: "thanos series request", + req: &ThanosSeriesRequest{}, + res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(seriesData))}, + expectedResponse: seriesResponse, + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Default partial response value doesn't matter when encoding requests. + codec := NewThanosLabelsCodec(false, time.Hour*2) + r, err := codec.DecodeResponse(context.TODO(), &tc.res, tc.req) + if tc.expectedError != nil { + testutil.Equals(t, err, tc.expectedError) + } else { + testutil.Ok(t, err) + testutil.Equals(t, tc.expectedResponse, r) + } + }) + } +} diff --git a/pkg/queryfrontend/codec.go b/pkg/queryfrontend/queryrange_codec.go similarity index 88% rename from pkg/queryfrontend/codec.go rename to pkg/queryfrontend/queryrange_codec.go index 338e18ef812..cb123d55839 100644 --- a/pkg/queryfrontend/codec.go +++ b/pkg/queryfrontend/queryrange_codec.go @@ -38,21 +38,23 @@ var ( errCannotParse = "cannot parse parameter %s" ) -type codec struct { +// queryRangeCodec is used to encode/decode Thanos query range requests and responses. +type queryRangeCodec struct { queryrange.Codec partialResponse bool } -func NewThanosCodec(partialResponse bool) *codec { - return &codec{ +// NewThanosQueryRangeCodec initializes a queryRangeCodec. 
+func NewThanosQueryRangeCodec(partialResponse bool) *queryRangeCodec { + return &queryRangeCodec{ Codec: queryrange.PrometheusCodec, partialResponse: partialResponse, } } -func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Request, error) { +func (c queryRangeCodec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Request, error) { var ( - result ThanosRequest + result ThanosQueryRangeRequest err error ) result.Start, err = cortexutil.ParseTime(r.FormValue("start")) @@ -89,7 +91,7 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req return nil, err } - if r.FormValue("max_source_resolution") == "auto" { + if r.FormValue(queryv1.MaxSourceResolutionParam) == "auto" { result.AutoDownsampling = true result.MaxSourceResolution = result.Step / 5 } else { @@ -108,7 +110,7 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req result.ReplicaLabels = r.Form[queryv1.ReplicaLabelsParam] } - result.StoreMatchers, err = parseStoreMatchersParam(r.Form[queryv1.StoreMatcherParam]) + result.StoreMatchers, err = parseMatchersParam(r.Form[queryv1.StoreMatcherParam]) if err != nil { return nil, err } @@ -126,8 +128,8 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req return &result, nil } -func (c codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) { - thanosReq, ok := r.(*ThanosRequest) +func (c queryRangeCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) { + thanosReq, ok := r.(*ThanosQueryRangeRequest) if !ok { return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format") } @@ -224,16 +226,16 @@ func parsePartialResponseParam(s string, defaultEnablePartialResponse bool) (boo return defaultEnablePartialResponse, nil } -func parseStoreMatchersParam(ss []string) ([][]*labels.Matcher, error) { - storeMatchers := make([][]*labels.Matcher, 0, len(ss)) +func 
parseMatchersParam(ss []string) ([][]*labels.Matcher, error) { + matchers := make([][]*labels.Matcher, 0, len(ss)) for _, s := range ss { - matchers, err := parser.ParseMetricSelector(s) + ms, err := parser.ParseMetricSelector(s) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.StoreMatcherParam) } - storeMatchers = append(storeMatchers, matchers) + matchers = append(matchers, ms) } - return storeMatchers, nil + return matchers, nil } func encodeTime(t int64) string { diff --git a/pkg/queryfrontend/codec_test.go b/pkg/queryfrontend/queryrange_codec_test.go similarity index 92% rename from pkg/queryfrontend/codec_test.go rename to pkg/queryfrontend/queryrange_codec_test.go index 885c06f00fc..e1392dd9635 100644 --- a/pkg/queryfrontend/codec_test.go +++ b/pkg/queryfrontend/queryrange_codec_test.go @@ -18,13 +18,13 @@ import ( "github.com/thanos-io/thanos/pkg/testutil" ) -func TestCodec_DecodeRequest(t *testing.T) { +func TestQueryRangeCodec_DecodeRequest(t *testing.T) { for _, tc := range []struct { name string url string partialResponse bool expectedError error - expectedRequest *ThanosRequest + expectedRequest *ThanosQueryRangeRequest }{ { name: "instant query, no params set", @@ -89,7 +89,7 @@ func TestCodec_DecodeRequest(t *testing.T) { { name: "auto downsampling enabled", url: "/api/v1/query_range?start=123&end=456&step=10&max_source_resolution=auto", - expectedRequest: &ThanosRequest{ + expectedRequest: &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 123000, End: 456000, @@ -110,7 +110,7 @@ func TestCodec_DecodeRequest(t *testing.T) { name: "partial_response default to true", url: "/api/v1/query_range?start=123&end=456&step=1", partialResponse: true, - expectedRequest: &ThanosRequest{ + expectedRequest: &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 123000, End: 456000, @@ -124,7 +124,7 @@ func TestCodec_DecodeRequest(t *testing.T) { name: "partial_response default to false, but set to true 
in query", url: "/api/v1/query_range?start=123&end=456&step=1&partial_response=true", partialResponse: false, - expectedRequest: &ThanosRequest{ + expectedRequest: &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 123000, End: 456000, @@ -138,7 +138,7 @@ func TestCodec_DecodeRequest(t *testing.T) { name: "replicaLabels", url: "/api/v1/query_range?start=123&end=456&step=1&replicaLabels[]=foo&replicaLabels[]=bar", partialResponse: false, - expectedRequest: &ThanosRequest{ + expectedRequest: &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 123000, End: 456000, @@ -152,7 +152,7 @@ func TestCodec_DecodeRequest(t *testing.T) { name: "storeMatchers", url: `/api/v1/query_range?start=123&end=456&step=1&storeMatch[]={__address__="localhost:10901", cluster="test"}`, partialResponse: false, - expectedRequest: &ThanosRequest{ + expectedRequest: &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 123000, End: 456000, @@ -171,7 +171,7 @@ func TestCodec_DecodeRequest(t *testing.T) { r, err := http.NewRequest(http.MethodGet, tc.url, nil) testutil.Ok(t, err) - codec := NewThanosCodec(tc.partialResponse) + codec := NewThanosQueryRangeCodec(tc.partialResponse) req, err := codec.DecodeRequest(context.Background(), r) if tc.expectedError != nil { testutil.Equals(t, err, tc.expectedError) @@ -183,7 +183,7 @@ func TestCodec_DecodeRequest(t *testing.T) { } } -func TestCodec_EncodeRequest(t *testing.T) { +func TestQueryRangeCodec_EncodeRequest(t *testing.T) { for _, tc := range []struct { name string expectedError error @@ -197,7 +197,7 @@ func TestCodec_EncodeRequest(t *testing.T) { }, { name: "normal thanos request", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Start: 123000, End: 456000, Step: 1000, @@ -210,7 +210,7 @@ func TestCodec_EncodeRequest(t *testing.T) { }, { name: "Dedup enabled", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Start: 123000, End: 456000, Step: 1000, @@ -225,7 +225,7 @@ func TestCodec_EncodeRequest(t 
*testing.T) { }, { name: "Partial response set to true", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Start: 123000, End: 456000, Step: 1000, @@ -240,7 +240,7 @@ func TestCodec_EncodeRequest(t *testing.T) { }, { name: "Downsampling resolution set to 5m", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Start: 123000, End: 456000, Step: 1000, @@ -255,7 +255,7 @@ func TestCodec_EncodeRequest(t *testing.T) { }, { name: "Downsampling resolution set to 1h", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Start: 123000, End: 456000, Step: 1000, @@ -271,7 +271,7 @@ func TestCodec_EncodeRequest(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { // Default partial response value doesn't matter when encoding requests. - codec := NewThanosCodec(false) + codec := NewThanosQueryRangeCodec(false) r, err := codec.EncodeRequest(context.TODO(), tc.req) if tc.expectedError != nil { testutil.Equals(t, err, tc.expectedError) diff --git a/pkg/queryfrontend/request.go b/pkg/queryfrontend/request.go index 0bed4881f87..2074815ac66 100644 --- a/pkg/queryfrontend/request.go +++ b/pkg/queryfrontend/request.go @@ -13,7 +13,13 @@ import ( "github.com/prometheus/prometheus/pkg/timestamp" ) -type ThanosRequest struct { +// TODO(yeya24): add partial result when needed. +// ThanosRequest is a common interface defined for specific thanos requests. +type ThanosRequest interface { + GetStoreMatchers() [][]*labels.Matcher +} + +type ThanosQueryRangeRequest struct { Path string Start int64 End int64 @@ -30,31 +36,23 @@ type ThanosRequest struct { } // GetStart returns the start timestamp of the request in milliseconds. -func (r *ThanosRequest) GetStart() int64 { - return r.Start -} +func (r *ThanosQueryRangeRequest) GetStart() int64 { return r.Start } // GetEnd returns the end timestamp of the request in milliseconds. 
-func (r *ThanosRequest) GetEnd() int64 { - return r.End -} +func (r *ThanosQueryRangeRequest) GetEnd() int64 { return r.End } // GetStep returns the step of the request in milliseconds. -func (r *ThanosRequest) GetStep() int64 { - return r.Step -} +func (r *ThanosQueryRangeRequest) GetStep() int64 { return r.Step } // GetQuery returns the query of the request. -func (r *ThanosRequest) GetQuery() string { - return r.Query -} +func (r *ThanosQueryRangeRequest) GetQuery() string { return r.Query } -func (r *ThanosRequest) GetCachingOptions() queryrange.CachingOptions { +func (r *ThanosQueryRangeRequest) GetCachingOptions() queryrange.CachingOptions { return r.CachingOptions } // WithStartEnd clone the current request with different start and end timestamp. -func (r *ThanosRequest) WithStartEnd(start int64, end int64) queryrange.Request { +func (r *ThanosQueryRangeRequest) WithStartEnd(start int64, end int64) queryrange.Request { q := *r q.Start = start q.End = end @@ -62,14 +60,14 @@ func (r *ThanosRequest) WithStartEnd(start int64, end int64) queryrange.Request } // WithQuery clone the current request with a different query. -func (r *ThanosRequest) WithQuery(query string) queryrange.Request { +func (r *ThanosQueryRangeRequest) WithQuery(query string) queryrange.Request { q := *r q.Query = query return &q } // LogToSpan writes information about this request to an OpenTracing span. -func (r *ThanosRequest) LogToSpan(sp opentracing.Span) { +func (r *ThanosQueryRangeRequest) LogToSpan(sp opentracing.Span) { fields := []otlog.Field{ otlog.String("query", r.GetQuery()), otlog.String("start", timestamp.Time(r.GetStart()).String()), @@ -88,12 +86,150 @@ func (r *ThanosRequest) LogToSpan(sp opentracing.Span) { // Reset implements proto.Message interface required by queryrange.Request, // which is not used in thanos. 
-func (r *ThanosRequest) Reset() {}
+func (r *ThanosQueryRangeRequest) Reset() {}
+
+// String implements proto.Message interface required by queryrange.Request,
+// which is not used in thanos.
+func (r *ThanosQueryRangeRequest) String() string { return "" }
+
+// ProtoMessage implements proto.Message interface required by queryrange.Request,
+// which is not used in thanos.
+func (r *ThanosQueryRangeRequest) ProtoMessage() {}
+
+func (r *ThanosQueryRangeRequest) GetStoreMatchers() [][]*labels.Matcher { return r.StoreMatchers }
+
+type ThanosLabelsRequest struct {
+	Start           int64
+	End             int64
+	Label           string
+	Path            string
+	StoreMatchers   [][]*labels.Matcher
+	PartialResponse bool
+	CachingOptions  queryrange.CachingOptions
+}
+
+// GetStart returns the start timestamp of the request in milliseconds.
+func (r *ThanosLabelsRequest) GetStart() int64 { return r.Start }
+
+// GetEnd returns the end timestamp of the request in milliseconds.
+func (r *ThanosLabelsRequest) GetEnd() int64 { return r.End }
+
+// GetStep always returns 0; a labels request has no step.
+func (r *ThanosLabelsRequest) GetStep() int64 { return 0 }
+
+// GetQuery always returns an empty string; a labels request has no query.
+func (r *ThanosLabelsRequest) GetQuery() string { return "" }
+
+func (r *ThanosLabelsRequest) GetCachingOptions() queryrange.CachingOptions { return r.CachingOptions }
+
+// WithStartEnd returns a shallow copy of the request with the given start and end timestamps.
+func (r *ThanosLabelsRequest) WithStartEnd(start int64, end int64) queryrange.Request {
+	q := *r
+	q.Start = start
+	q.End = end
+	return &q
+}
+
+// WithQuery returns an unmodified shallow copy of the request; the query argument is ignored.
+func (r *ThanosLabelsRequest) WithQuery(_ string) queryrange.Request {
+	q := *r
+	return &q
+}
+
+// LogToSpan writes information about this request to an OpenTracing span.
+func (r *ThanosLabelsRequest) LogToSpan(sp opentracing.Span) {
+	fields := []otlog.Field{
+		otlog.String("start", timestamp.Time(r.GetStart()).String()),
+		otlog.String("end", timestamp.Time(r.GetEnd()).String()),
+		otlog.Bool("partial_response", r.PartialResponse),
+		otlog.Object("storeMatchers", r.StoreMatchers),
+	}
+	if r.Label != "" { // Include the label name only when the request carries one.
+		fields = append(fields, otlog.Object("label", r.Label))
+	}
+
+	sp.LogFields(fields...)
+}
+
+// Reset implements proto.Message interface required by queryrange.Request,
+// which is not used in thanos.
+func (r *ThanosLabelsRequest) Reset() {}
 
 // String implements proto.Message interface required by queryrange.Request,
 // which is not used in thanos.
-func (r *ThanosRequest) String() string { return "" }
+func (r *ThanosLabelsRequest) String() string { return "" }
 
 // ProtoMessage implements proto.Message interface required by queryrange.Request,
 // which is not used in thanos.
-func (r *ThanosRequest) ProtoMessage() {}
+func (r *ThanosLabelsRequest) ProtoMessage() {}
+
+func (r *ThanosLabelsRequest) GetStoreMatchers() [][]*labels.Matcher { return r.StoreMatchers }
+
+type ThanosSeriesRequest struct {
+	Path            string
+	Start           int64
+	End             int64
+	Dedup           bool
+	PartialResponse bool
+	ReplicaLabels   []string
+	Matchers        [][]*labels.Matcher
+	StoreMatchers   [][]*labels.Matcher
+	CachingOptions  queryrange.CachingOptions
+}
+
+// GetStart returns the start timestamp of the request in milliseconds.
+func (r *ThanosSeriesRequest) GetStart() int64 { return r.Start }
+
+// GetEnd returns the end timestamp of the request in milliseconds.
+func (r *ThanosSeriesRequest) GetEnd() int64 { return r.End }
+
+// GetStep always returns 0; a series request has no step.
+func (r *ThanosSeriesRequest) GetStep() int64 { return 0 }
+
+// GetQuery always returns an empty string; a series request has no query.
+func (r *ThanosSeriesRequest) GetQuery() string { return "" }
+
+func (r *ThanosSeriesRequest) GetCachingOptions() queryrange.CachingOptions { return r.CachingOptions }
+
+// WithStartEnd returns a shallow copy of the request with the given start and end timestamps.
+func (r *ThanosSeriesRequest) WithStartEnd(start int64, end int64) queryrange.Request {
+	q := *r
+	q.Start = start
+	q.End = end
+	return &q
+}
+
+// WithQuery returns an unmodified shallow copy of the request; the query argument is ignored.
+func (r *ThanosSeriesRequest) WithQuery(_ string) queryrange.Request {
+	q := *r
+	return &q
+}
+
+// LogToSpan writes information about this request to an OpenTracing span.
+func (r *ThanosSeriesRequest) LogToSpan(sp opentracing.Span) {
+	fields := []otlog.Field{
+		otlog.String("start", timestamp.Time(r.GetStart()).String()),
+		otlog.String("end", timestamp.Time(r.GetEnd()).String()),
+		otlog.Bool("dedup", r.Dedup),
+		otlog.Bool("partial_response", r.PartialResponse),
+		otlog.Object("replicaLabels", r.ReplicaLabels),
+		otlog.Object("matchers", r.Matchers),
+		otlog.Object("storeMatchers", r.StoreMatchers),
+	}
+
+	sp.LogFields(fields...)
+}
+
+// Reset implements proto.Message interface required by queryrange.Request,
+// which is not used in thanos.
+func (r *ThanosSeriesRequest) Reset() {}
+
+// String implements proto.Message interface required by queryrange.Request,
+// which is not used in thanos.
+func (r *ThanosSeriesRequest) String() string { return "" }
+
+// ProtoMessage implements proto.Message interface required by queryrange.Request,
+// which is not used in thanos.
+func (r *ThanosSeriesRequest) ProtoMessage() {}
+
+func (r *ThanosSeriesRequest) GetStoreMatchers() [][]*labels.Matcher { return r.StoreMatchers }
diff --git a/pkg/queryfrontend/response.pb.go b/pkg/queryfrontend/response.pb.go
new file mode 100644
index 00000000000..ffceb01bc09
--- /dev/null
+++ b/pkg/queryfrontend/response.pb.go
@@ -0,0 +1,770 @@
+// Code generated by protoc-gen-gogo. DO NOT EDIT.
+// source: queryfrontend/response.proto + +package queryfrontend + +import ( + fmt "fmt" + io "io" + math "math" + math_bits "math/bits" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + _ "github.com/thanos-io/thanos/pkg/store/labelpb" + github_com_thanos_io_thanos_pkg_store_labelpb "github.com/thanos-io/thanos/pkg/store/labelpb" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type ThanosLabelsResponse struct { + Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"status"` + Data []string `protobuf:"bytes,2,rep,name=Data,proto3" json:"data"` + ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"` + Error string `protobuf:"bytes,4,opt,name=Error,proto3" json:"error,omitempty"` +} + +func (m *ThanosLabelsResponse) Reset() { *m = ThanosLabelsResponse{} } +func (m *ThanosLabelsResponse) String() string { return proto.CompactTextString(m) } +func (*ThanosLabelsResponse) ProtoMessage() {} +func (*ThanosLabelsResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_b882fa7024d92f38, []int{0} +} +func (m *ThanosLabelsResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ThanosLabelsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ThanosLabelsResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ThanosLabelsResponse) XXX_Merge(src proto.Message) { + 
xxx_messageInfo_ThanosLabelsResponse.Merge(m, src) +} +func (m *ThanosLabelsResponse) XXX_Size() int { + return m.Size() +} +func (m *ThanosLabelsResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ThanosLabelsResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ThanosLabelsResponse proto.InternalMessageInfo + +type ThanosSeriesResponse struct { + Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"status"` + Data []github_com_thanos_io_thanos_pkg_store_labelpb.LabelSet `protobuf:"bytes,2,rep,name=Data,proto3,customtype=github.com/thanos-io/thanos/pkg/store/labelpb.LabelSet" json:"data"` + ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"` + Error string `protobuf:"bytes,4,opt,name=Error,proto3" json:"error,omitempty"` +} + +func (m *ThanosSeriesResponse) Reset() { *m = ThanosSeriesResponse{} } +func (m *ThanosSeriesResponse) String() string { return proto.CompactTextString(m) } +func (*ThanosSeriesResponse) ProtoMessage() {} +func (*ThanosSeriesResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_b882fa7024d92f38, []int{1} +} +func (m *ThanosSeriesResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ThanosSeriesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ThanosSeriesResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ThanosSeriesResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ThanosSeriesResponse.Merge(m, src) +} +func (m *ThanosSeriesResponse) XXX_Size() int { + return m.Size() +} +func (m *ThanosSeriesResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ThanosSeriesResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ThanosSeriesResponse proto.InternalMessageInfo + +func init() { + proto.RegisterType((*ThanosLabelsResponse)(nil), "queryfrontend.ThanosLabelsResponse") + 
proto.RegisterType((*ThanosSeriesResponse)(nil), "queryfrontend.ThanosSeriesResponse") +} + +func init() { proto.RegisterFile("queryfrontend/response.proto", fileDescriptor_b882fa7024d92f38) } + +var fileDescriptor_b882fa7024d92f38 = []byte{ + // 334 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x51, 0x4f, 0x4e, 0xf2, 0x40, + 0x1c, 0x6d, 0x81, 0x8f, 0x7c, 0x8c, 0x31, 0x26, 0x85, 0xc4, 0x4a, 0xc8, 0x94, 0xb0, 0xc2, 0x44, + 0x3b, 0x89, 0x46, 0x0f, 0x50, 0x95, 0x95, 0xab, 0xc2, 0x05, 0xa6, 0x32, 0x96, 0xc6, 0xd2, 0x19, + 0x67, 0x7e, 0x5d, 0xf4, 0x16, 0xc6, 0xe3, 0x78, 0x02, 0x96, 0x2c, 0x8d, 0x8b, 0x46, 0x61, 0xd7, + 0x53, 0x18, 0xa6, 0x34, 0xc2, 0xd2, 0x8d, 0xbb, 0xdf, 0xbc, 0x7f, 0x79, 0x79, 0x83, 0x7a, 0xcf, + 0x29, 0x93, 0xd9, 0xa3, 0xe4, 0x09, 0xb0, 0x64, 0x4a, 0x24, 0x53, 0x82, 0x27, 0x8a, 0xb9, 0x42, + 0x72, 0xe0, 0xd6, 0xe1, 0x1e, 0xdb, 0xed, 0x84, 0x3c, 0xe4, 0x9a, 0x21, 0x9b, 0xab, 0x14, 0x75, + 0x4f, 0x14, 0x70, 0xc9, 0x48, 0x4c, 0x03, 0x16, 0x8b, 0x80, 0x40, 0x26, 0x98, 0x2a, 0xa9, 0xc1, + 0x9b, 0x89, 0x3a, 0x93, 0x19, 0x4d, 0xb8, 0xba, 0xdf, 0xb0, 0xca, 0xdf, 0xc6, 0x5b, 0x03, 0xd4, + 0x1c, 0x03, 0x85, 0x54, 0xd9, 0x66, 0xdf, 0x1c, 0xb6, 0x3c, 0x54, 0xe4, 0x4e, 0x53, 0x69, 0xc4, + 0xdf, 0x32, 0x56, 0x0f, 0x35, 0x6e, 0x29, 0x50, 0xbb, 0xd6, 0xaf, 0x0f, 0x5b, 0xde, 0xff, 0x22, + 0x77, 0x1a, 0x53, 0x0a, 0xd4, 0xd7, 0xa8, 0x75, 0x85, 0x5a, 0x77, 0x52, 0x72, 0x39, 0xc9, 0x04, + 0xb3, 0xeb, 0x3a, 0xe4, 0xb8, 0xc8, 0x9d, 0x36, 0xab, 0xc0, 0x33, 0x3e, 0x8f, 0x80, 0xcd, 0x05, + 0x64, 0xfe, 0x8f, 0xd2, 0x3a, 0x45, 0xff, 0xf4, 0xc3, 0x6e, 0x68, 0x4b, 0xbb, 0xc8, 0x9d, 0x23, + 0x6d, 0xd9, 0x91, 0x97, 0x8a, 0xc1, 0x6b, 0xad, 0x2a, 0x3f, 0x66, 0x32, 0x62, 0xbf, 0x2b, 0x0f, + 0x3b, 0xe5, 0x0f, 0x2e, 0x6c, 0x17, 0x74, 0x90, 0x3b, 0x4a, 0xe3, 0xf8, 0x86, 0x8b, 0x4c, 0xcf, + 0x31, 0x66, 0xe0, 0x8d, 0x16, 0xb9, 0x63, 0x7c, 0xe4, 0xce, 0x75, 0x18, 0xc1, 0x2c, 0x0d, 0xdc, + 0x07, 0x3e, 0x27, 0xa5, 0xf6, 0x3c, 0xe2, 
0xdb, 0x8b, 0x88, 0xa7, 0x90, 0xec, 0x8d, 0xec, 0x56, + 0xfe, 0xbf, 0x1e, 0xc5, 0xeb, 0x2d, 0xbe, 0xb0, 0xb1, 0x58, 0x61, 0x73, 0xb9, 0xc2, 0xe6, 0xe7, + 0x0a, 0x9b, 0x2f, 0x6b, 0x6c, 0x2c, 0xd7, 0xd8, 0x78, 0x5f, 0x63, 0x23, 0x68, 0xea, 0x6f, 0xbf, + 0xfc, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x6b, 0xcd, 0xb0, 0xaa, 0x56, 0x02, 0x00, 0x00, +} + +func (m *ThanosLabelsResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ThanosLabelsResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ThanosLabelsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Error) > 0 { + i -= len(m.Error) + copy(dAtA[i:], m.Error) + i = encodeVarintResponse(dAtA, i, uint64(len(m.Error))) + i-- + dAtA[i] = 0x22 + } + if len(m.ErrorType) > 0 { + i -= len(m.ErrorType) + copy(dAtA[i:], m.ErrorType) + i = encodeVarintResponse(dAtA, i, uint64(len(m.ErrorType))) + i-- + dAtA[i] = 0x1a + } + if len(m.Data) > 0 { + for iNdEx := len(m.Data) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Data[iNdEx]) + copy(dAtA[i:], m.Data[iNdEx]) + i = encodeVarintResponse(dAtA, i, uint64(len(m.Data[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if len(m.Status) > 0 { + i -= len(m.Status) + copy(dAtA[i:], m.Status) + i = encodeVarintResponse(dAtA, i, uint64(len(m.Status))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *ThanosSeriesResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ThanosSeriesResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ThanosSeriesResponse) 
MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Error) > 0 { + i -= len(m.Error) + copy(dAtA[i:], m.Error) + i = encodeVarintResponse(dAtA, i, uint64(len(m.Error))) + i-- + dAtA[i] = 0x22 + } + if len(m.ErrorType) > 0 { + i -= len(m.ErrorType) + copy(dAtA[i:], m.ErrorType) + i = encodeVarintResponse(dAtA, i, uint64(len(m.ErrorType))) + i-- + dAtA[i] = 0x1a + } + if len(m.Data) > 0 { + for iNdEx := len(m.Data) - 1; iNdEx >= 0; iNdEx-- { + { + size := m.Data[iNdEx].Size() + i -= size + if _, err := m.Data[iNdEx].MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + i = encodeVarintResponse(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if len(m.Status) > 0 { + i -= len(m.Status) + copy(dAtA[i:], m.Status) + i = encodeVarintResponse(dAtA, i, uint64(len(m.Status))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintResponse(dAtA []byte, offset int, v uint64) int { + offset -= sovResponse(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *ThanosLabelsResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Status) + if l > 0 { + n += 1 + l + sovResponse(uint64(l)) + } + if len(m.Data) > 0 { + for _, s := range m.Data { + l = len(s) + n += 1 + l + sovResponse(uint64(l)) + } + } + l = len(m.ErrorType) + if l > 0 { + n += 1 + l + sovResponse(uint64(l)) + } + l = len(m.Error) + if l > 0 { + n += 1 + l + sovResponse(uint64(l)) + } + return n +} + +func (m *ThanosSeriesResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Status) + if l > 0 { + n += 1 + l + sovResponse(uint64(l)) + } + if len(m.Data) > 0 { + for _, e := range m.Data { + l = e.Size() + n += 1 + l + sovResponse(uint64(l)) + } + } + l = len(m.ErrorType) + if l > 0 { + n += 1 + l + sovResponse(uint64(l)) + } + l = len(m.Error) + if l > 0 { + n += 1 
+ l + sovResponse(uint64(l)) + } + return n +} + +func sovResponse(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozResponse(x uint64) (n int) { + return sovResponse(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *ThanosLabelsResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ThanosLabelsResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ThanosLabelsResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Status = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + 
} + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ErrorType", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ErrorType = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Error = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipResponse(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthResponse + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthResponse + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + 
iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ThanosSeriesResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ThanosSeriesResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ThanosSeriesResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Status = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if 
postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data, github_com_thanos_io_thanos_pkg_store_labelpb.LabelSet{}) + if err := m.Data[len(m.Data)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ErrorType", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ErrorType = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Error = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipResponse(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthResponse + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthResponse + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } 
+ return nil +} +func skipResponse(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowResponse + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowResponse + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowResponse + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthResponse + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupResponse + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthResponse + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthResponse = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowResponse = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupResponse = fmt.Errorf("proto: unexpected end of group") +) diff --git a/pkg/queryfrontend/response.proto b/pkg/queryfrontend/response.proto new file mode 100644 index 00000000000..85cf4382a47 --- /dev/null +++ b/pkg/queryfrontend/response.proto @@ -0,0 +1,35 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +syntax = "proto3"; + +package queryfrontend; + +import "gogoproto/gogo.proto"; +import "store/labelpb/types.proto"; + +option (gogoproto.sizer_all) = true; +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; + +// Do not generate XXX fields to reduce memory footprint and opening a door +// for zero-copy casts to/from prometheus data types. +option (gogoproto.goproto_unkeyed_all) = false; +option (gogoproto.goproto_unrecognized_all) = false; +option (gogoproto.goproto_sizecache_all) = false; + + +message ThanosLabelsResponse { + string Status = 1 [(gogoproto.jsontag) = "status"]; + repeated string Data = 2 [(gogoproto.jsontag) = "data"]; + string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"]; + string Error = 4 [(gogoproto.jsontag) = "error,omitempty"]; +} + +message ThanosSeriesResponse { + string Status = 1 [(gogoproto.jsontag) = "status"]; + repeated thanos.FullCopyLabelSet Data = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "data", (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.LabelSet"]; + string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"]; + string Error = 4 [(gogoproto.jsontag) = "error,omitempty"]; +} diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index 4d1f290d570..1ca3f857bc7 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -5,6 +5,7 @@ package queryfrontend import ( "net/http" + "regexp" "strings" "time" @@ -20,60 +21,148 @@ import ( const ( // labels used in metrics. - labelQuery = "query" - labelQueryRange = "query_range" + rangeQueryOp = "query_range" + instantQueryOp = "query" + labelNamesOp = "label_names" + labelValuesOp = "label_values" + seriesOp = "series" ) -// NewTripperware returns a Tripperware configured with middlewares to -// limit, align, split,cache requests and retry. 
-// Not using the cortex one as it uses query parallelisations based on -// storage sharding configuration and query ASTs. -func NewTripperware( - config Config, - reg prometheus.Registerer, - logger log.Logger, -) (frontend.Tripperware, error) { +// NewTripperware returns a Tripperware which sends requests to different sub tripperwares based on the query type. +func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger) (frontend.Tripperware, error) { + var ( + queryRangeLimits, labelsLimits queryrange.Limits + err error + ) + if config.QueryRangeConfig.Limits != nil { + queryRangeLimits, err = validation.NewOverrides(*config.QueryRangeConfig.Limits, nil) + if err != nil { + return nil, errors.Wrap(err, "initialize query range limits") + } + } - queriesCount := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_query_frontend_queries_total", - Help: "Total queries passing through query frontend", - }, []string{"op"}) - queriesCount.WithLabelValues(labelQuery) - queriesCount.WithLabelValues(labelQueryRange) + if config.LabelsConfig.Limits != nil { + labelsLimits, err = validation.NewOverrides(*config.LabelsConfig.Limits, nil) + if err != nil { + return nil, errors.Wrap(err, "initialize labels limits") + } + } - limits, err := validation.NewOverrides(*config.CortexLimits, nil) + queryRangeCodec := NewThanosQueryRangeCodec(config.QueryRangeConfig.PartialResponseStrategy) + labelsCodec := NewThanosLabelsCodec(config.LabelsConfig.PartialResponseStrategy, config.DefaultTimeRange) + + queryRangeTripperware, err := newQueryRangeTripperware(config.QueryRangeConfig, queryRangeLimits, queryRangeCodec, + prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_range"}, reg), logger) if err != nil { - return nil, errors.Wrap(err, "initialize limits") + return nil, err + } + + labelsTripperware := newLabelsTripperware(config.LabelsConfig, labelsLimits, labelsCodec, + 
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "labels"}, reg), logger) + + return func(next http.RoundTripper) http.RoundTripper { + return newRoundTripper(next, queryRangeTripperware(next), labelsTripperware(next), reg) + }, nil +} + +type roundTripper struct { + next, queryRange, labels http.RoundTripper + + queriesCount *prometheus.CounterVec +} + +func newRoundTripper(next, queryRange, metadata http.RoundTripper, reg prometheus.Registerer) roundTripper { + r := roundTripper{ + next: next, + queryRange: queryRange, + labels: metadata, + queriesCount: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_query_frontend_queries_total", + Help: "Total queries passing through query frontend", + }, []string{"op"}), + } + + r.queriesCount.WithLabelValues(instantQueryOp) + r.queriesCount.WithLabelValues(rangeQueryOp) + r.queriesCount.WithLabelValues(labelNamesOp) + r.queriesCount.WithLabelValues(labelValuesOp) + r.queriesCount.WithLabelValues(seriesOp) + return r +} + +func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + switch op := getOperation(req); op { + case instantQueryOp: + r.queriesCount.WithLabelValues(instantQueryOp).Inc() + case rangeQueryOp: + r.queriesCount.WithLabelValues(rangeQueryOp).Inc() + return r.queryRange.RoundTrip(req) + case labelNamesOp, labelValuesOp, seriesOp: + r.queriesCount.WithLabelValues(op).Inc() + return r.labels.RoundTrip(req) + default: + } + + return r.next.RoundTrip(req) +} + +func getOperation(r *http.Request) string { + if r.Method == http.MethodGet || r.Method == http.MethodPost { + switch { + case strings.HasSuffix(r.URL.Path, "/api/v1/query"): + return instantQueryOp + case strings.HasSuffix(r.URL.Path, "/api/v1/query_range"): + return rangeQueryOp + case strings.HasSuffix(r.URL.Path, "/api/v1/labels"): + return labelNamesOp + case strings.HasSuffix(r.URL.Path, "/api/v1/series"): + return seriesOp + default: + matched, err := 
regexp.MatchString("/api/v1/label/.+/values$", r.URL.Path) + if err == nil && matched { + return labelValuesOp + } + } } - metrics := queryrange.NewInstrumentMiddlewareMetrics(reg) + return "" +} + +// newQueryRangeTripperware returns a Tripperware for range queries configured with middlewares of +// limit, step align, split by interval, cache requests and retry. +func newQueryRangeTripperware( + config QueryRangeConfig, + limits queryrange.Limits, + codec *queryRangeCodec, + reg prometheus.Registerer, + logger log.Logger, +) (frontend.Tripperware, error) { queryRangeMiddleware := []queryrange.Middleware{queryrange.LimitsMiddleware(limits)} + m := queryrange.NewInstrumentMiddlewareMetrics(reg) // step align middleware. queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("step_align", metrics), + queryrange.InstrumentMiddleware("step_align", m), queryrange.StepAlignMiddleware, ) - codec := NewThanosCodec(config.PartialResponseStrategy) + queryIntervalFn := func(_ queryrange.Request) time.Duration { + return config.SplitQueriesByInterval + } if config.SplitQueriesByInterval != 0 { - // TODO(yeya24): make interval dynamic in next pr. 
- queryIntervalFn := func(_ queryrange.Request) time.Duration { - return config.SplitQueriesByInterval - } queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("split_by_interval", metrics), - queryrange.SplitByIntervalMiddleware(queryIntervalFn, limits, codec, reg), + queryrange.InstrumentMiddleware("split_by_interval", m), + SplitByIntervalMiddleware(queryIntervalFn, limits, codec, reg), ) } - if config.CortexResultsCacheConfig != nil { + if config.ResultsCacheConfig != nil { queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware( logger, - *config.CortexResultsCacheConfig, + *config.ResultsCacheConfig, newThanosCacheKeyGenerator(config.SplitQueriesByInterval), limits, codec, @@ -88,7 +177,7 @@ func NewTripperware( queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("results_cache", metrics), + queryrange.InstrumentMiddleware("results_cache", m), queryCacheMiddleware, ) } @@ -96,33 +185,62 @@ func NewTripperware( if config.MaxRetries > 0 { queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("retry", metrics), + queryrange.InstrumentMiddleware("retry", m), queryrange.NewRetryMiddleware(logger, config.MaxRetries, queryrange.NewRetryMiddlewareMetrics(reg)), ) } return func(next http.RoundTripper) http.RoundTripper { - queryRangeTripper := queryrange.NewRoundTripper(next, codec, queryRangeMiddleware...) + rt := queryrange.NewRoundTripper(next, codec, queryRangeMiddleware...) 
return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) { - if strings.HasSuffix(r.URL.Path, "/api/v1/query") { - if r.Method == http.MethodGet || r.Method == http.MethodPost { - queriesCount.WithLabelValues(labelQuery).Inc() - } - } else if strings.HasSuffix(r.URL.Path, "/api/v1/query_range") { - if r.Method == http.MethodGet || r.Method == http.MethodPost { - queriesCount.WithLabelValues(labelQueryRange).Inc() - return queryRangeTripper.RoundTrip(r) - } - } - return next.RoundTrip(r) + return rt.RoundTrip(r) }) }, nil } +// newLabelsTripperware returns a Tripperware for labels and series requests +// configured with middlewares of split by interval and retry. +func newLabelsTripperware( + config LabelsConfig, + limits queryrange.Limits, + codec *labelsCodec, + reg prometheus.Registerer, + logger log.Logger, +) frontend.Tripperware { + labelsMiddleware := []queryrange.Middleware{} + m := queryrange.NewInstrumentMiddlewareMetrics(reg) + + queryIntervalFn := func(_ queryrange.Request) time.Duration { + return config.SplitQueriesByInterval + } + + if config.SplitQueriesByInterval != 0 { + labelsMiddleware = append( + labelsMiddleware, + queryrange.InstrumentMiddleware("split_interval", m), + SplitByIntervalMiddleware(queryIntervalFn, limits, codec, reg), + ) + } + + if config.MaxRetries > 0 { + labelsMiddleware = append( + labelsMiddleware, + queryrange.InstrumentMiddleware("retry", m), + queryrange.NewRetryMiddleware(logger, config.MaxRetries, queryrange.NewRetryMiddlewareMetrics(reg)), + ) + } + return func(next http.RoundTripper) http.RoundTripper { + rt := queryrange.NewRoundTripper(next, codec, labelsMiddleware...) + return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) { + return rt.RoundTrip(r) + }) + } +} + // Don't go to response cache if StoreMatchers are set. 
func shouldCache(r queryrange.Request) bool { - if thanosReq, ok := r.(*ThanosRequest); ok { - if len(thanosReq.StoreMatchers) > 0 { + if thanosReq, ok := r.(ThanosRequest); ok { + if len(thanosReq.GetStoreMatchers()) > 0 { return false } } diff --git a/pkg/queryfrontend/roundtrip_test.go b/pkg/queryfrontend/roundtrip_test.go index 047c256d215..a6efcf391eb 100644 --- a/pkg/queryfrontend/roundtrip_test.go +++ b/pkg/queryfrontend/roundtrip_test.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/user" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -69,82 +70,153 @@ func (r *fakeRoundTripper) RoundTrip(h *http.Request) (*http.Response, error) { // TestRoundTripRetryMiddleware tests the retry middleware. func TestRoundTripRetryMiddleware(t *testing.T) { - testRequest := &ThanosRequest{ + testRequest := &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 0, End: 2 * hour, Step: 10 * seconds, } + testLabelsRequest := &ThanosLabelsRequest{Path: "/api/v1/labels", Start: 0, End: 2 * hour} + testSeriesRequest := &ThanosSeriesRequest{ + Path: "/api/v1/series", + Start: 0, + End: 2 * hour, + Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + } + + queryRangeCodec := NewThanosQueryRangeCodec(true) + labelsCodec := NewThanosLabelsCodec(true, 2*time.Hour) + for _, tc := range []struct { - name string - maxRetries int - req queryrange.Request - fail bool - expected int + name string + maxRetries int + req queryrange.Request + codec queryrange.Codec + handlerFunc func(fail bool) (*int, http.Handler) + fail bool + expected int }{ { name: "not query range, retry won't be triggered 1.", maxRetries: 100, - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Path: "/api/v1/query", Start: 0, End: 2 * hour, Step: 10 * seconds, }, - expected: 1, + codec: queryRangeCodec, + handlerFunc: promqlResults, + expected: 1, }, { - name: "not 
query range, retry won't be triggered 2.", - maxRetries: 100, - req: &ThanosRequest{ - Path: "/api/v1/labels", - Start: 0, - End: 2 * hour, - Step: 10 * seconds, - }, - expected: 1, + name: "no retry, get counter value 1", + maxRetries: 0, + req: testRequest, + codec: queryRangeCodec, + handlerFunc: promqlResults, + fail: true, + expected: 1, + }, + { + name: "retry set to 1", + maxRetries: 1, + req: testRequest, + codec: queryRangeCodec, + handlerFunc: promqlResults, + fail: true, + expected: 1, + }, + { + name: "retry set to 3", + maxRetries: 3, + fail: true, + req: testRequest, + codec: queryRangeCodec, + handlerFunc: promqlResults, + expected: 3, }, { - name: "no retry, get counter value 1", - maxRetries: 0, - req: testRequest, - fail: true, - expected: 1, + name: "labels requests: no retry, get counter value 1", + maxRetries: 0, + req: testLabelsRequest, + codec: labelsCodec, + handlerFunc: labelsResults, + fail: true, + expected: 1, }, { - name: "retry set to 1", - maxRetries: 1, - req: testRequest, - fail: true, - expected: 1, + name: "labels requests: retry set to 1", + maxRetries: 1, + req: testLabelsRequest, + codec: labelsCodec, + handlerFunc: labelsResults, + fail: true, + expected: 1, }, { - name: "retry set to 3", - maxRetries: 3, - fail: true, - req: testRequest, - expected: 3, + name: "labels requests: retry set to 3", + maxRetries: 3, + fail: true, + req: testLabelsRequest, + codec: labelsCodec, + handlerFunc: labelsResults, + expected: 3, + }, + { + name: "series requests: no retry, get counter value 1", + maxRetries: 0, + req: testSeriesRequest, + codec: labelsCodec, + handlerFunc: seriesResults, + fail: true, + expected: 1, + }, + { + name: "series requests: retry set to 1", + maxRetries: 1, + req: testSeriesRequest, + codec: labelsCodec, + handlerFunc: seriesResults, + fail: true, + expected: 1, + }, + { + name: "series requests: retry set to 3", + maxRetries: 3, + fail: true, + req: testSeriesRequest, + codec: labelsCodec, + handlerFunc: 
seriesResults, + expected: 3, }, } { t.Run(tc.name, func(t *testing.T) { tpw, err := NewTripperware( Config{ - SplitQueriesByInterval: day, - MaxRetries: tc.maxRetries, - CortexLimits: defaultLimits, + QueryRangeConfig: QueryRangeConfig{ + MaxRetries: tc.maxRetries, + Limits: defaultLimits, + SplitQueriesByInterval: day, + }, + LabelsConfig: LabelsConfig{ + MaxRetries: tc.maxRetries, + Limits: defaultLimits, + SplitQueriesByInterval: day, + }, }, nil, log.NewNopLogger(), ) testutil.Ok(t, err) rt, err := newFakeRoundTripper() testutil.Ok(t, err) - res, handler := promqlResults(tc.fail) + res, handler := tc.handlerFunc(tc.fail) rt.setHandler(handler) ctx := user.InjectOrgID(context.Background(), "1") - httpReq, err := NewThanosCodec(true).EncodeRequest(ctx, tc.req) + httpReq, err := tc.codec.EncodeRequest(ctx, tc.req) testutil.Ok(t, err) _, err = tpw(rt).RoundTrip(httpReq) @@ -158,64 +230,111 @@ func TestRoundTripRetryMiddleware(t *testing.T) { // TestRoundTripSplitIntervalMiddleware tests the split interval middleware. 
func TestRoundTripSplitIntervalMiddleware(t *testing.T) { - testRequest := &ThanosRequest{ + testRequest := &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 0, End: 2 * hour, Step: 10 * seconds, } - codec := NewThanosCodec(true) + testLabelsRequest := &ThanosLabelsRequest{ + Path: "/api/v1/labels", + Start: 0, + End: 2 * hour, + } + + testSeriesRequest := &ThanosSeriesRequest{ + Path: "/api/v1/series", + Start: 0, + End: 2 * hour, + Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + } + + queryRangeCodec := NewThanosQueryRangeCodec(true) + labelsCodec := NewThanosLabelsCodec(true, 2*time.Hour) for _, tc := range []struct { name string splitInterval time.Duration req queryrange.Request + codec queryrange.Codec + handlerFunc func(bool) (*int, http.Handler) expected int }{ { name: "non query range request won't be split 1", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Path: "/api/v1/query", Start: 0, End: 2 * hour, Step: 10 * seconds, }, - splitInterval: time.Hour, - expected: 1, - }, - { - name: "non query range request won't be split 2", - req: &ThanosRequest{ - Path: "/api/v1/labels", - Start: 0, - End: 2 * hour, - Step: 10 * seconds, - }, + handlerFunc: promqlResults, + codec: queryRangeCodec, splitInterval: time.Hour, expected: 1, }, { name: "split interval == 0, disable split", req: testRequest, + handlerFunc: promqlResults, + codec: queryRangeCodec, splitInterval: 0, expected: 1, }, { name: "won't be split. Interval == time range", req: testRequest, + handlerFunc: promqlResults, + codec: queryRangeCodec, splitInterval: 2 * time.Hour, expected: 1, }, { name: "won't be split. 
Interval > time range", req: testRequest, + handlerFunc: promqlResults, + codec: queryRangeCodec, splitInterval: day, expected: 1, }, { name: "split to 2 requests", req: testRequest, + handlerFunc: promqlResults, + codec: queryRangeCodec, + splitInterval: 1 * time.Hour, + expected: 2, + }, + { + name: "labels request won't be split", + req: testLabelsRequest, + handlerFunc: labelsResults, + codec: labelsCodec, + splitInterval: day, + expected: 1, + }, + { + name: "labels request split to 2", + req: testLabelsRequest, + handlerFunc: labelsResults, + codec: labelsCodec, + splitInterval: 1 * time.Hour, + expected: 2, + }, + { + name: "series request won't be split", + req: testSeriesRequest, + handlerFunc: seriesResults, + codec: labelsCodec, + splitInterval: day, + expected: 1, + }, + { + name: "series request split to 2", + req: testSeriesRequest, + handlerFunc: seriesResults, + codec: labelsCodec, splitInterval: 1 * time.Hour, expected: 2, }, @@ -224,8 +343,14 @@ func TestRoundTripSplitIntervalMiddleware(t *testing.T) { t.Run(tc.name, func(t *testing.T) { tpw, err := NewTripperware( Config{ - SplitQueriesByInterval: tc.splitInterval, - CortexLimits: defaultLimits, + QueryRangeConfig: QueryRangeConfig{ + Limits: defaultLimits, + SplitQueriesByInterval: tc.splitInterval, + }, + LabelsConfig: LabelsConfig{ + Limits: defaultLimits, + SplitQueriesByInterval: tc.splitInterval, + }, }, nil, log.NewNopLogger(), ) testutil.Ok(t, err) @@ -233,11 +358,11 @@ func TestRoundTripSplitIntervalMiddleware(t *testing.T) { rt, err := newFakeRoundTripper() testutil.Ok(t, err) defer rt.Close() - res, handler := promqlResults(false) + res, handler := tc.handlerFunc(false) rt.setHandler(handler) ctx := user.InjectOrgID(context.Background(), "1") - httpReq, err := codec.EncodeRequest(ctx, tc.req) + httpReq, err := tc.codec.EncodeRequest(ctx, tc.req) testutil.Ok(t, err) _, err = tpw(rt).RoundTrip(httpReq) @@ -245,13 +370,12 @@ func TestRoundTripSplitIntervalMiddleware(t *testing.T) { 
testutil.Equals(t, tc.expected, *res) }) - } } // TestRoundTripCacheMiddleware tests the cache middleware. func TestRoundTripCacheMiddleware(t *testing.T) { - testRequest := &ThanosRequest{ + testRequest := &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 0, End: 2 * hour, @@ -260,7 +384,7 @@ func TestRoundTripCacheMiddleware(t *testing.T) { } // Non query range request, won't be cached. - testRequestInstant := &ThanosRequest{ + testRequestInstant := &ThanosQueryRangeRequest{ Path: "/api/v1/query", Start: 0, End: 2 * hour, @@ -269,7 +393,7 @@ func TestRoundTripCacheMiddleware(t *testing.T) { // Same query params as testRequest, different maxSourceResolution // but still in the same downsampling level, so it will be cached in this case. - testRequestSameLevelDownsampling := &ThanosRequest{ + testRequestSameLevelDownsampling := &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 0, End: 2 * hour, @@ -279,7 +403,7 @@ func TestRoundTripCacheMiddleware(t *testing.T) { // Same query params as testRequest, different maxSourceResolution // and downsampling level so it won't be cached in this case. 
- testRequestHigherLevelDownsampling := &ThanosRequest{ + testRequestHigherLevelDownsampling := &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 0, End: 2 * hour, @@ -288,7 +412,7 @@ func TestRoundTripCacheMiddleware(t *testing.T) { } // Same query params as testRequest, but with storeMatchers - testRequestWithStoreMatchers := &ThanosRequest{ + testRequestWithStoreMatchers := &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 0, End: 2 * hour, @@ -311,9 +435,11 @@ func TestRoundTripCacheMiddleware(t *testing.T) { now := time.Now() tpw, err := NewTripperware( Config{ - SplitQueriesByInterval: day, - CortexResultsCacheConfig: cacheConf, - CortexLimits: defaultLimits, + QueryRangeConfig: QueryRangeConfig{ + Limits: defaultLimits, + ResultsCacheConfig: cacheConf, + SplitQueriesByInterval: day, + }, }, nil, log.NewNopLogger(), ) testutil.Ok(t, err) @@ -339,7 +465,7 @@ func TestRoundTripCacheMiddleware(t *testing.T) { {name: "storeMatchers requests won't go to cache", req: testRequestWithStoreMatchers, expected: 5}, { name: "request but will be partitioned", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: timestamp.FromTime(now.Add(-time.Hour)), End: timestamp.FromTime(now.Add(time.Hour)), @@ -349,7 +475,7 @@ func TestRoundTripCacheMiddleware(t *testing.T) { }, { name: "same query as the previous one", - req: &ThanosRequest{ + req: &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: timestamp.FromTime(now.Add(-time.Hour)), End: timestamp.FromTime(now.Add(time.Hour)), @@ -362,7 +488,7 @@ func TestRoundTripCacheMiddleware(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "1") - httpReq, err := NewThanosCodec(true).EncodeRequest(ctx, tc.req) + httpReq, err := NewThanosQueryRangeCodec(true).EncodeRequest(ctx, tc.req) testutil.Ok(t, err) _, err = tpw(rt).RoundTrip(httpReq) @@ -374,7 +500,7 @@ func TestRoundTripCacheMiddleware(t *testing.T) { } } -// 
promqlResults is a mock handler used to test cache middleware. +// promqlResults is a mock handler used to test retry, split and cache middleware. // Modified from Loki https://github.com/grafana/loki/blob/master/pkg/querier/queryrange/roundtrip_test.go#L547. func promqlResults(fail bool) (*int, http.Handler) { count := 0 @@ -409,3 +535,51 @@ func promqlResults(fail bool) (*int, http.Handler) { count++ }) } + +// labelsResults is a mock handler used to test retry and split middleware for label names and label values requests. It returns the handler together with a pointer to its request counter, which is incremented once per handled request. +func labelsResults(fail bool) (*int, http.Handler) { + count := 0 + var lock sync.Mutex + q := ThanosLabelsResponse{ + Status: "success", + Data: []string{"__name__", "job"}, + } + + return &count, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + lock.Lock() + defer lock.Unlock() + + // Answer with a 500 status code when fail is set, to test retry. + if fail { + w.WriteHeader(500) + } + if err := json.NewEncoder(w).Encode(q); err != nil { + panic(err) + } + count++ + }) +} + +// seriesResults is a mock handler used to test retry and split middleware for series requests. It returns the handler together with a pointer to its request counter, which is incremented once per handled request. +func seriesResults(fail bool) (*int, http.Handler) { + count := 0 + var lock sync.Mutex + q := ThanosSeriesResponse{ + Status: "success", + Data: []labelpb.LabelSet{{Labels: []labelpb.Label{{Name: "__name__", Value: "up"}, {Name: "foo", Value: "bar"}}}}, + } + + return &count, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + lock.Lock() + defer lock.Unlock() + + // Answer with a 500 status code when fail is set, to test retry. + if fail { + w.WriteHeader(500) + } + if err := json.NewEncoder(w).Encode(q); err != nil { + panic(err) + } + count++ + }) +} diff --git a/pkg/queryfrontend/split_by_interval.go b/pkg/queryfrontend/split_by_interval.go new file mode 100644 index 00000000000..22b1d510594 --- /dev/null +++ b/pkg/queryfrontend/split_by_interval.go @@ -0,0 +1,104 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+
+// This is a modified copy from
+// https://github.com/cortexproject/cortex/blob/master/pkg/querier/queryrange/split_by_interval.go.
+
+package queryfrontend
+
+import (
+	"context"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/querier/queryrange"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+// SplitByIntervalMiddleware creates a new Middleware that splits requests by a given interval.
+//
+// interval is called once per request to obtain the split interval, limits is
+// passed through to queryrange.DoRequests, and merger combines the responses
+// of the sub-requests. The thanos_frontend_split_queries_total counter is
+// created against registerer every time the returned middleware wraps a
+// handler.
+func SplitByIntervalMiddleware(interval queryrange.IntervalFn, limits queryrange.Limits, merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware {
+	return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
+		return splitByInterval{
+			next:     next,
+			limits:   limits,
+			merger:   merger,
+			interval: interval,
+			splitByCounter: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
+				Namespace: "thanos",
+				Name:      "frontend_split_queries_total",
+				Help:      "Total number of underlying query requests after the split by interval is applied",
+			}),
+		}
+	})
+}
+
+// splitByInterval is the handler installed by SplitByIntervalMiddleware: it
+// fans a request out into per-interval sub-requests and merges their responses.
+type splitByInterval struct {
+	next     queryrange.Handler
+	limits   queryrange.Limits
+	merger   queryrange.Merger
+	interval queryrange.IntervalFn
+
+	// Metrics.
+	splitByCounter prometheus.Counter
+}
+
+// Do splits r by the configured interval, runs the sub-requests through the
+// next handler and returns the merged response. The split counter is increased
+// by the number of sub-requests that were built.
+func (s splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
+	// First we're going to build new requests, one for each split interval;
+	// range queries additionally get their boundaries lined up with step.
+	reqs := splitQuery(r, s.interval(r))
+	s.splitByCounter.Add(float64(len(reqs)))
+
+	reqResps, err := queryrange.DoRequests(ctx, s.next, reqs, s.limits)
+	if err != nil {
+		return nil, err
+	}
+
+	resps := make([]queryrange.Response, 0, len(reqResps))
+	for _, reqResp := range reqResps {
+		resps = append(resps, reqResp.Response)
+	}
+
+	response, err := s.merger.MergeResponse(resps...)
+	if err != nil {
+		return nil, err
+	}
+	return response, nil
+}
+
+// splitQuery cuts r into consecutive sub-requests covering at most interval each.
+//
+// Range queries (*ThanosQueryRangeRequest) are cut so that every sub-request
+// starts a multiple of step away from the original start; all other requests
+// are cut into plain interval-sized windows. A request that cannot be split is
+// returned as a single sub-request instead of being dropped.
+func splitQuery(r queryrange.Request, interval time.Duration) []queryrange.Request {
+	start, end := r.GetStart(), r.GetEnd()
+	dur := int64(interval / time.Millisecond)
+	_, isRange := r.(*ThanosQueryRangeRequest)
+
+	// Pass the request through as-is when there is nothing to split. Without
+	// this guard start >= end (e.g. start == end) yields zero sub-requests, so
+	// nothing is ever sent downstream; an interval below one millisecond
+	// divides by zero for range queries and never advances the window loop;
+	// and a zero step on a range query divides by zero.
+	if start >= end || dur <= 0 || (isRange && r.GetStep() <= 0) {
+		return []queryrange.Request{r.WithStartEnd(start, end)}
+	}
+
+	var reqs []queryrange.Request
+	if isRange {
+		step := r.GetStep()
+		for ; start < end; start = nextIntervalBoundary(start, step, interval) + step {
+			boundary := nextIntervalBoundary(start, step, interval)
+			// Fold a trailing remainder shorter than one step into this sub-request.
+			if boundary+step >= end {
+				boundary = end
+			}
+
+			reqs = append(reqs, r.WithStartEnd(start, boundary))
+		}
+		return reqs
+	}
+
+	for ; start < end; start += dur {
+		windowEnd := start + dur
+		if windowEnd > end {
+			windowEnd = end
+		}
+
+		reqs = append(reqs, r.WithStartEnd(start, windowEnd))
+	}
+	return reqs
+}
+
+// nextIntervalBoundary rounds up to the step before the next interval boundary.
+//
+// t and step are in milliseconds. step must be non-zero and interval at least
+// one millisecond, otherwise the integer divisions below panic.
+func nextIntervalBoundary(t, step int64, interval time.Duration) int64 {
+	msPerInterval := int64(interval / time.Millisecond)
+	startOfNextInterval := ((t / msPerInterval) + 1) * msPerInterval
+	// ensure that target is a multiple of steps away from the start time
+	target := startOfNextInterval - ((startOfNextInterval - t) % step)
+	if target == startOfNextInterval {
+		target -= step
+	}
+	return target
+}
diff --git a/scripts/genproto.sh b/scripts/genproto.sh index f965514856b..66b016eccab 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -25,7 +25,7 @@ PATH=${PATH}:/tmp/protobin GOGOPROTO_ROOT="$(GO111MODULE=on go list -modfile=.bingo/protoc-gen-gogofast.mod -f '{{ .Dir }}' -m github.com/gogo/protobuf)" GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf" -DIRS="store/storepb/ store/storepb/prompb/ store/labelpb rules/rulespb store/hintspb" +DIRS="store/storepb/ store/storepb/prompb/ store/labelpb rules/rulespb store/hintspb queryfrontend" echo "generating code" pushd "pkg" for dir in ${DIRS}; do diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index
8795457304f..196365a36dd 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -16,6 +16,7 @@ import ( "github.com/thanos-io/thanos/pkg/cacheutil" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/queryfrontend" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" ) @@ -96,27 +97,6 @@ func TestQueryFrontend(t *testing.T) { ) }) - t.Run("query frontend works for labels APIs", func(t *testing.T) { - // LabelNames and LabelValues API should still work via query frontend. - labelNames(t, ctx, queryFrontend.HTTPEndpoint(), timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { - return len(res) > 0 - }) - testutil.Ok(t, q.WaitSumMetricsWithOptions( - e2e.Equals(1), - []string{"http_requests_total"}, - e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "handler", "label_names"))), - ) - - labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { - return len(res) > 0 - }) - testutil.Ok(t, q.WaitSumMetricsWithOptions( - e2e.Equals(1), - []string{"http_requests_total"}, - e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "handler", "label_values"))), - ) - }) - t.Run("query frontend works for range query and it can cache results", func(t *testing.T) { rangeQuery( t, @@ -146,7 +126,7 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "querier_cache_misses_total")) // Query is only 2h so it won't be split. 
- testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "cortex_frontend_split_queries_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "thanos_frontend_split_queries_total")) testutil.Ok(t, q.WaitSumMetricsWithOptions( e2e.Equals(1), @@ -185,7 +165,10 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "querier_cache_misses_total")) // Query is only 2h so it won't be split. - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_frontend_split_queries_total")) + testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(2), []string{"thanos_frontend_split_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "query_range"))), + ) // One more request is needed in order to satisfy the req range. testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -224,7 +207,10 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "querier_cache_misses_total")) // Query is 25h so it will be split to 2 requests. - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(4), "cortex_frontend_split_queries_total")) + testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(4), []string{"thanos_frontend_split_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "query_range"))), + ) testutil.Ok(t, q.WaitSumMetricsWithOptions( e2e.Equals(4), @@ -232,6 +218,143 @@ func TestQueryFrontend(t *testing.T) { e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "handler", "query_range"))), ) }) + + t.Run("query frontend splitting works for labels names API", func(t *testing.T) { + // LabelNames and LabelValues API should still work via query frontend. 
+ labelNames(t, ctx, queryFrontend.HTTPEndpoint(), timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + return len(res) > 0 + }) + testutil.Ok(t, q.WaitSumMetricsWithOptions( + e2e.Equals(1), + []string{"http_requests_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "handler", "label_names"))), + ) + testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(1), + []string{"thanos_query_frontend_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "op", "label_names"))), + ) + // Query is only 2h so it won't be split. + testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(1), []string{"thanos_frontend_split_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "labels"))), + ) + + labelNames(t, ctx, queryFrontend.HTTPEndpoint(), timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + return len(res) > 0 + }) + testutil.Ok(t, q.WaitSumMetricsWithOptions( + e2e.Equals(3), + []string{"http_requests_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "handler", "label_names"))), + ) + testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(2), + []string{"thanos_query_frontend_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "op", "label_names"))), + ) + // Query is 25h so split to 2 requests. 
+ testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(3), []string{"thanos_frontend_split_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "labels"))), + ) + }) + + t.Run("query frontend splitting works for labels values API", func(t *testing.T) { + labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + return len(res) > 0 + }) + testutil.Ok(t, q.WaitSumMetricsWithOptions( + e2e.Equals(1), + []string{"http_requests_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "handler", "label_values"))), + ) + testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(1), + []string{"thanos_query_frontend_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "op", "label_values"))), + ) + // Query is only 2h so it won't be split. + testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(4), []string{"thanos_frontend_split_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "labels"))), + ) + + labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + return len(res) > 0 + }) + testutil.Ok(t, q.WaitSumMetricsWithOptions( + e2e.Equals(3), + []string{"http_requests_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "handler", "label_values"))), + ) + testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(2), + []string{"thanos_query_frontend_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "op", "label_values"))), + ) + // Query is 25h so split to 2 requests. 
+ testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(6), []string{"thanos_frontend_split_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "labels"))), + ) + }) + + t.Run("query frontend splitting works for series API", func(t *testing.T) { + series( + t, + ctx, + queryFrontend.HTTPEndpoint(), + []storepb.LabelMatcher{{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "up"}}, + timestamp.FromTime(now.Add(-time.Hour)), + timestamp.FromTime(now.Add(time.Hour)), + func(res []map[string]string) bool { + return len(res) > 0 + }, + ) + testutil.Ok(t, q.WaitSumMetricsWithOptions( + e2e.Equals(1), + []string{"http_requests_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "handler", "series"))), + ) + testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(1), + []string{"thanos_query_frontend_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "op", "series"))), + ) + // Query is only 2h so it won't be split. 
+ testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(7), []string{"thanos_frontend_split_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "labels"))), + ) + + series( + t, + ctx, + queryFrontend.HTTPEndpoint(), + []storepb.LabelMatcher{{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "up"}}, + timestamp.FromTime(now.Add(-24*time.Hour)), + timestamp.FromTime(now.Add(time.Hour)), + func(res []map[string]string) bool { + return len(res) > 0 + }, + ) + testutil.Ok(t, q.WaitSumMetricsWithOptions( + e2e.Equals(3), + []string{"http_requests_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "handler", "series"))), + ) + testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(2), + []string{"thanos_query_frontend_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "op", "series"))), + ) + // Query is 25h so split to 2 requests. + testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Equals(9), []string{"thanos_frontend_split_queries_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "labels"))), + ) + }) } func TestQueryFrontendMemcachedCache(t *testing.T) { @@ -322,7 +445,7 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(0), "cortex_cache_hits")) // Query is only 2h so it won't be split. - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "cortex_frontend_split_queries_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "thanos_frontend_split_queries_total")) // Run the same range query again, the result can be retrieved from cache directly. rangeQuery( @@ -347,7 +470,7 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { // Query is only 2h so it won't be split. // If it was split this would increase by more than 1. 
- testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_frontend_split_queries_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "thanos_frontend_split_queries_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_cache_fetched_keys")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "cortex_cache_hits")) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index b6a89745279..ef7f5bf2119 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -25,6 +25,7 @@ import ( "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" ) @@ -440,6 +441,7 @@ func labelNames(t *testing.T, ctx context.Context, addr string, start, end int64 })) } +//nolint:unparam func labelValues(t *testing.T, ctx context.Context, addr, label string, start, end int64, check func(res []string) bool) { t.Helper() @@ -458,6 +460,24 @@ func labelValues(t *testing.T, ctx context.Context, addr, label string, start, e })) } +// series calls the Series API at addr with matchers, start and end through runutil.RetryWithLog (time.Second interval, stopped by ctx.Done()); an attempt counts as good once check returns true, and the resulting error, if any, is handed to testutil.Ok. +func series(t *testing.T, ctx context.Context, addr string, matchers []storepb.LabelMatcher, start int64, end int64, check func(res []map[string]string) bool) { + t.Helper() + + logger := log.NewLogfmtLogger(os.Stdout) + logger = log.With(logger, "ts", log.DefaultTimestampUTC) + testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { + res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end) + if err != nil { + return err + } + if check(res) { + return nil + } + + return errors.Errorf("unexpected results size %d", len(res)) + })) +} + //nolint:unparam func rangeQuery(t *testing.T, ctx context.Context, addr string, q string, start, end, step int64, opts promclient.QueryOptions, check func(res model.Matrix) bool) { t.Helper()