diff --git a/Makefile b/Makefile
index fd3f4acc61..08c38f4a9f 100644
--- a/Makefile
+++ b/Makefile
@@ -340,7 +340,11 @@ test-e2e: docker-e2e $(GOTESPLIT)
 # NOTE(GiedriusS):
 # * If you want to limit CPU time available in e2e tests then pass E2E_DOCKER_CPUS environment variable. For example, E2E_DOCKER_CPUS=0.05 limits CPU time available
 # to spawned Docker containers to 0.05 cores.
-	@$(GOTESPLIT) -total ${GH_PARALLEL} -index ${GH_INDEX} ./test/e2e/... -- ${GOTEST_OPTS}
+	@if [ -n "$(SINGLE_E2E_TEST)" ]; then \
+		$(GOTESPLIT) -total ${GH_PARALLEL} -index ${GH_INDEX} ./test/e2e -- -run $(SINGLE_E2E_TEST) ${GOTEST_OPTS}; \
+	else \
+		$(GOTESPLIT) -total ${GH_PARALLEL} -index ${GH_INDEX} ./test/e2e/... -- ${GOTEST_OPTS}; \
+	fi
 
 .PHONY: test-e2e-local
 test-e2e-local: ## Runs all thanos e2e tests locally.
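For reference, the new SINGLE_E2E_TEST variable selects a single e2e test by name; everything after the "--" is handed to go test, so the value is used as a -run pattern. A hypothetical invocation, assuming GH_PARALLEL and GH_INDEX keep their usual defaults, would be:

    make test-e2e SINGLE_E2E_TEST=TestDistributedEngineWithOverlappingIntervalsEnabled

Any value accepted by go test -run (a per-test-name regular expression) should work here.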
diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index 69ffb8ea32..f047c4f24b 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -127,6 +127,9 @@ func registerQuery(app *extkingpin.App) {
 		Strings()
 	queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations. If this is empty then all labels are used as partition labels.").Strings()
 
+	// Currently, we choose the highest MinT of an engine when querying multiple engines. This flag allows changing this behavior to choose the lowest MinT.
+	queryDistributedWithOverlappingInterval := cmd.Flag("query.distributed-with-overlapping-interval", "Allow for distributed queries using an engine's lowest MinT.").Hidden().Default("false").Bool()
+
 	instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())
 
 	defaultMetadataTimeRange := cmd.Flag("query.metadata.default-time-range", "The default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. The zero value means range covers the time since the beginning.").Default("0s").Duration()
@@ -371,12 +374,13 @@ func registerQuery(app *extkingpin.App) {
 			*tenantCertField,
 			*enforceTenancy,
 			*tenantLabel,
+			*queryDistributedWithOverlappingInterval,
 		)
 	})
 }
 
 // runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured
-// store nodes, merging and duplicating the data to satisfy user query.
+// store nodes, merging and deduplicating the data to satisfy user query.
 func runQuery(
 	g *run.Group,
 	logger log.Logger,
@@ -453,6 +457,7 @@ func runQuery(
 	tenantCertField string,
 	enforceTenancy bool,
 	tenantLabel string,
+	queryDistributedWithOverlappingInterval bool,
 ) error {
 	comp := component.Query
 	if alertQueryURL == "" {
@@ -688,11 +693,12 @@ func runQuery(
 		level.Info(logger).Log("msg", "Distributed query mode enabled, using Thanos as the default query engine.")
 		defaultEngine = string(apiv1.PromqlEngineThanos)
 		remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{
-			AutoDownsample:        enableAutodownsampling,
-			ReplicaLabels:         queryReplicaLabels,
-			PartitionLabels:       queryPartitionLabels,
-			Timeout:               queryTimeout,
-			EnablePartialResponse: enableQueryPartialResponse,
+			AutoDownsample:                          enableAutodownsampling,
+			ReplicaLabels:                           queryReplicaLabels,
+			PartitionLabels:                         queryPartitionLabels,
+			Timeout:                                 queryTimeout,
+			EnablePartialResponse:                   enableQueryPartialResponse,
+			QueryDistributedWithOverlappingInterval: queryDistributedWithOverlappingInterval,
 		})
 	}
 
diff --git a/pkg/query/remote_engine.go b/pkg/query/remote_engine.go
index 77a74c9a6f..c44dd04b8d 100644
--- a/pkg/query/remote_engine.go
+++ b/pkg/query/remote_engine.go
@@ -33,11 +33,12 @@ import (
 
 // Opts are the options for a PromQL query.
 type Opts struct {
-	AutoDownsample        bool
-	ReplicaLabels         []string
-	PartitionLabels       []string
-	Timeout               time.Duration
-	EnablePartialResponse bool
+	AutoDownsample                          bool
+	ReplicaLabels                           []string
+	PartitionLabels                         []string
+	Timeout                                 time.Duration
+	EnablePartialResponse                   bool
+	QueryDistributedWithOverlappingInterval bool
 }
 
 // Client is a query client that executes PromQL queries.
@@ -114,6 +115,7 @@ func NewRemoteEngine(logger log.Logger, queryClient Client, opts Opts) *remoteEn
 // a block due to retention before other replicas did the same.
 // See https://github.com/thanos-io/promql-engine/issues/187.
 func (r *remoteEngine) MinT() int64 {
+
 	r.mintOnce.Do(func() {
 		var (
 			hashBuf = make([]byte, 0, 128)
@@ -126,7 +128,11 @@ func (r *remoteEngine) MinT() int64 {
 				highestMintByLabelSet[key] = lset.MinTime
 				continue
 			}
-			if lset.MinTime > lsetMinT {
+			// If we are querying with overlapping intervals, we want to find the first available timestamp;
+			// otherwise, we want to find the last available timestamp.
+			if r.opts.QueryDistributedWithOverlappingInterval && lset.MinTime < lsetMinT {
+				highestMintByLabelSet[key] = lset.MinTime
+			} else if !r.opts.QueryDistributedWithOverlappingInterval && lset.MinTime > lsetMinT {
 				highestMintByLabelSet[key] = lset.MinTime
 			}
 		}
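To make the selection rule above concrete, here is a minimal, self-contained Go sketch. It is illustrative only and not part of the patch; pickMinT and the sample timestamps are hypothetical. It mirrors the branch added to remoteEngine.MinT: with the flag enabled, the lowest MinTime per label set wins (the first available timestamp); otherwise the highest one wins (the last available timestamp).

// Illustrative sketch, not part of the patch: pickMinT and the inputs are hypothetical.
package main

import "fmt"

// pickMinT chooses a per-labelset MinT from the MinTimes reported by several
// TSDBs that share the same external labels.
func pickMinT(minTimes []int64, overlapping bool) int64 {
	picked := minTimes[0]
	for _, mt := range minTimes[1:] {
		if overlapping && mt < picked { // first available timestamp
			picked = mt
		} else if !overlapping && mt > picked { // last available timestamp
			picked = mt
		}
	}
	return picked
}

func main() {
	// Two hypothetical MinTimes (milliseconds) for the same label set,
	// e.g. from an older and a newer block of a TSDB with a gap.
	minTimes := []int64{1_700_000_000_000, 1_700_021_600_000}
	fmt.Println(pickMinT(minTimes, false)) // default: 1700021600000 (highest MinT)
	fmt.Println(pickMinT(minTimes, true))  // with the new flag: 1700000000000 (lowest MinT)
}

With disjoint blocks, as in the e2e tests below (one block ending roughly 8 hours ago and one ending roughly 2 hours ago), the default highest-MinT choice starts the distributed query at the newer block and drops the older samples; the overlapping-interval mode keeps them.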
diff --git a/pkg/receive/writer_errors.go b/pkg/receive/writer_errors.go
index ee807564e6..cabc626054 100644
--- a/pkg/receive/writer_errors.go
+++ b/pkg/receive/writer_errors.go
@@ -65,7 +65,9 @@ func (a *writeErrorTracker) addSampleError(err error, tLogger log.Logger, lset l
 		level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", v, "timestamp", t)
 	case errors.Is(err, storage.ErrTooOldSample):
 		a.numSamplesTooOld++
-		level.Debug(tLogger).Log("msg", "Sample is too old", "lset", lset, "value", v, "timestamp", t)
+		// We could pass in the current head max time, but in case it is not updated, maxTime would be < the current time,
+		// so we just point to the metric that shows the current head max time.
+		level.Debug(tLogger).Log("msg", "Sample is too old", "lset", lset, "value", v, "timestamp", t, "hint", "check the prometheus_tsdb_head_max_time metric for the current head max time")
 	default:
 		level.Debug(tLogger).Log("msg", "Error ingesting sample", "err", err)
 	}
diff --git a/test/e2e/distributed_query_test.go b/test/e2e/distributed_query_test.go
index d421d6b031..3037864f23 100644
--- a/test/e2e/distributed_query_test.go
+++ b/test/e2e/distributed_query_test.go
@@ -5,14 +5,28 @@ package e2e_test
 
 import (
 	"context"
+	"os"
+	"path"
+	"path/filepath"
 	"testing"
 	"time"
 
 	"github.com/efficientgo/core/testutil"
 	"github.com/efficientgo/e2e"
+	e2edb "github.com/efficientgo/e2e/db"
+	"github.com/go-kit/log"
+	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/timestamp"
+	"github.com/thanos-io/objstore"
+	"github.com/thanos-io/objstore/client"
+	"github.com/thanos-io/objstore/providers/s3"
 
+	v1 "github.com/thanos-io/thanos/pkg/api/query"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/promclient"
+	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
 	"github.com/thanos-io/thanos/test/e2e/e2ethanos"
 )
@@ -82,3 +96,191 @@ func TestDistributedQueryExecution(t *testing.T) {
 		},
 	})
 }
+
+func TestDistributedEngineWithOverlappingIntervalsEnabled(t *testing.T) {
+	t.Parallel()
+
+	e, err := e2e.New(e2e.WithName("dist-disj-tsdbs"))
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	ctx := context.Background()
+	l := log.NewLogfmtLogger(os.Stdout)
+	now := time.Now()
+
+	bucket1 := "dist-disj-tsdbs-test1"
+	minio1 := e2edb.NewMinio(e, "1", bucket1, e2edb.WithMinioTLS())
+	testutil.Ok(t, e2e.StartAndWaitReady(minio1))
+
+	bkt1, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket1, minio1.Endpoint("http"), minio1.Dir()), "test", nil)
+	testutil.Ok(t, err)
+
+	// Set up a storage GW with 2 blocks that have a gap to trigger the distributed query MinT bug.
+	dir1 := filepath.Join(e.SharedDir(), "tmp1")
+	testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir1), os.ModePerm))
+	blockID1, err := e2eutil.CreateBlockWithBlockDelay(ctx,
+		dir1,
+		[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
+		1000,
+		timestamp.FromTime(now.Add(-10*time.Hour)),
+		timestamp.FromTime(now.Add(-8*time.Hour)),
+		30*time.Minute,
+		labels.FromStrings("prometheus", "p1", "replica", "0"),
+		0,
+		metadata.NoneFunc,
+	)
+	testutil.Ok(t, err)
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID1.String()), blockID1.String()))
+
+	blockID2, err := e2eutil.CreateBlockWithBlockDelay(ctx,
+		dir1,
+		[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
+		1000,
+		timestamp.FromTime(now.Add(-4*time.Hour)),
+		timestamp.FromTime(now.Add(-2*time.Hour)),
+		30*time.Minute,
+		labels.FromStrings("prometheus", "p1", "replica", "0"),
+		0,
+		metadata.NoneFunc,
+	)
+	testutil.Ok(t, err)
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID2.String()), blockID2.String()))
+	store1 := e2ethanos.NewStoreGW(
+		e,
+		"s1",
+		client.BucketConfig{
+			Type:   client.S3,
+			Config: e2ethanos.NewS3Config(bucket1, minio1.InternalEndpoint("http"), minio1.InternalDir()),
+		},
+		"",
+		"",
+		nil,
+	)
+	testutil.Ok(t, e2e.StartAndWaitReady(store1))
+
+	querierLeaf1 := e2ethanos.NewQuerierBuilder(e, "1", store1.InternalEndpoint("grpc")).Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(querierLeaf1))
+	// We need another querier to circumvent the passthrough optimizer.
+	promConfig2 := e2ethanos.DefaultPromConfig("p2", 0, "", "", e2ethanos.LocalPrometheusTarget)
+	prom2, sidecar2 := e2ethanos.NewPrometheusWithSidecar(e, "p2", promConfig2, "", e2ethanos.DefaultPrometheusImage(), "")
+	testutil.Ok(t, e2e.StartAndWaitReady(prom2, sidecar2))
+	querierLeaf2 := e2ethanos.NewQuerierBuilder(e, "2", sidecar2.InternalEndpoint("grpc")).Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(querierLeaf2))
+	querierDistributed := e2ethanos.NewQuerierBuilder(e, "3",
+		querierLeaf1.InternalEndpoint("grpc"),
+		querierLeaf2.InternalEndpoint("grpc"),
+	).
+		WithEngine(v1.PromqlEngineThanos).
+		WithQueryMode("distributed").
+		WithDistributedOverlap(true).
+		Init()
+
+	testutil.Ok(t, e2e.StartAndWaitReady(querierDistributed))
+
+	// We would expect 2x2h ranges for the 2 blocks containing foo samples. That would be around 240 expected sample pairs in the result matrix.
+	// We assert on more than 200 to reduce flakiness.
+	rangeQuery(t, ctx, querierDistributed.Endpoint("http"), func() string { return "foo" }, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now), 60, promclient.QueryOptions{}, func(res model.Matrix) error {
+		if res.Len() < 1 {
+			return errors.New("No result series returned")
+		}
+		if nvals := len(res[0].Values); nvals < 200 {
+			return errors.Errorf("Too few values in result matrix, got %d, expected > 200", nvals)
+		}
+		return nil
+	})
+}
+
+func TestDistributedEngineWithoutOverlappingIntervals(t *testing.T) {
+	t.Skip("skipping test as this replicates a bug")
+	t.Parallel()
+	e, err := e2e.New(e2e.WithName("dist-disj-tsdbs2"))
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	ctx := context.Background()
+	l := log.NewLogfmtLogger(os.Stdout)
+	now := time.Now()
+
+	bucket1 := "dist-disj-tsdbs2-test2"
+	minio1 := e2edb.NewMinio(e, "1", bucket1, e2edb.WithMinioTLS())
+	testutil.Ok(t, e2e.StartAndWaitReady(minio1))
+
+	bkt1, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket1, minio1.Endpoint("http"), minio1.Dir()), "test", nil)
+	testutil.Ok(t, err)
+
+	// Set up a storage GW with 2 blocks that have a gap to trigger the distributed query MinT bug.
+	dir1 := filepath.Join(e.SharedDir(), "tmp1")
+	testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir1), os.ModePerm))
+	blockID1, err := e2eutil.CreateBlockWithBlockDelay(ctx,
+		dir1,
+		[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
+		1000,
+		timestamp.FromTime(now.Add(-14*time.Hour)),
+		timestamp.FromTime(now.Add(-12*time.Hour)),
+		30*time.Minute,
+		labels.FromStrings("prometheus", "p1", "replica", "0"),
+		0,
+		metadata.NoneFunc,
+	)
+	testutil.Ok(t, err)
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID1.String()), blockID1.String()))
+
+	blockID2, err := e2eutil.CreateBlockWithBlockDelay(ctx,
+		dir1,
+		[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
+		1000,
+		timestamp.FromTime(now.Add(-4*time.Hour)),
+		timestamp.FromTime(now.Add(-2*time.Hour)),
+		30*time.Minute,
+		labels.FromStrings("prometheus", "p1", "replica", "0"),
+		0,
+		metadata.NoneFunc,
+	)
+	testutil.Ok(t, err)
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID2.String()), blockID2.String()))
+	store1 := e2ethanos.NewStoreGW(
+		e,
+		"s1",
+		client.BucketConfig{
+			Type:   client.S3,
+			Config: e2ethanos.NewS3Config(bucket1, minio1.InternalEndpoint("http"), minio1.InternalDir()),
+		},
+		"",
+		"",
+		nil,
+	)
+	testutil.Ok(t, e2e.StartAndWaitReady(store1))
+
+	querierLeaf1 := e2ethanos.NewQuerierBuilder(e, "1", store1.InternalEndpoint("grpc")).Init()
+
+	testutil.Ok(t, e2e.StartAndWaitReady(querierLeaf1))
+	// We need another querier to circumvent the passthrough optimizer.
+	promConfig2 := e2ethanos.DefaultPromConfig("p2", 0, "", "", e2ethanos.LocalPrometheusTarget)
+	prom2, sidecar2 := e2ethanos.NewPrometheusWithSidecar(e, "p2", promConfig2, "", e2ethanos.DefaultPrometheusImage(), "")
+	testutil.Ok(t, e2e.StartAndWaitReady(prom2, sidecar2))
+	querierLeaf2 := e2ethanos.NewQuerierBuilder(e, "2", sidecar2.InternalEndpoint("grpc")).Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(querierLeaf2))
+
+	querierDistributed := e2ethanos.NewQuerierBuilder(e, "3",
+		querierLeaf1.InternalEndpoint("grpc"),
+		querierLeaf2.InternalEndpoint("grpc"),
+	).
+		WithEngine(v1.PromqlEngineThanos).
+		WithQueryMode("distributed").
+		Init()
+
+	testutil.Ok(t, e2e.StartAndWaitReady(querierDistributed))
+
+	// We would expect 2x2h ranges for the 2 blocks containing foo samples. That would be around 240 expected sample pairs in the result matrix.
+	// We assert on more than 200 to reduce flakiness.
+	rangeQuery(t, ctx, querierDistributed.Endpoint("http"), func() string { return "foo" }, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now), 60, promclient.QueryOptions{}, func(res model.Matrix) error {
+		if res.Len() < 1 {
+			return errors.New("No result series returned")
+		}
+		if nvals := len(res[0].Values); nvals < 200 {
+			return errors.Errorf("Too few values in result matrix, got %d, expected > 200", nvals)
+		}
+
+		return nil
+	})
+}
diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go
index b4448aa633..5e0a005c48 100644
--- a/test/e2e/e2ethanos/services.go
+++ b/test/e2e/e2ethanos/services.go
@@ -254,9 +254,10 @@ type QuerierBuilder struct {
 	endpoints       []string
 	strictEndpoints []string
 
-	engine           apiv1.PromqlEngineType
-	queryMode        string
-	enableXFunctions bool
+	engine                                  apiv1.PromqlEngineType
+	queryMode                               string
+	queryDistributedWithOverlappingInterval bool
+	enableXFunctions                        bool
 
 	replicaLabels []string
 	tracingConfig string
@@ -376,6 +377,10 @@ func (q *QuerierBuilder) WithQueryMode(mode string) *QuerierBuilder {
 	q.queryMode = mode
 	return q
 }
+func (q *QuerierBuilder) WithDistributedOverlap(overlap bool) *QuerierBuilder {
+	q.queryDistributedWithOverlappingInterval = overlap
+	return q
+}
 
 func (q *QuerierBuilder) WithEnableXFunctions() *QuerierBuilder {
 	q.enableXFunctions = true
@@ -513,6 +518,9 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
 	if q.queryMode != "" {
 		args = append(args, "--query.mode="+q.queryMode)
 	}
+	if q.queryDistributedWithOverlappingInterval {
+		args = append(args, "--query.distributed-with-overlapping-interval")
+	}
 	if q.engine != "" {
 		args = append(args, "--query.promql-engine="+string(q.engine))
 	}
diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go
index 11bb203247..0351cc9bc8 100644
--- a/test/e2e/query_test.go
+++ b/test/e2e/query_test.go
@@ -2376,93 +2376,3 @@ func TestDistributedEngineWithExtendedFunctions(t *testing.T) {
 	}, time.Now, promclient.QueryOptions{}, 1)
 	testutil.Equals(t, model.SampleValue(0), result[0].Value)
 }
-
-func TestDistributedEngineWithDisjointTSDBs(t *testing.T) {
-	t.Skip("skipping test as this replicates a bug")
-	e, err := e2e.New(e2e.WithName("dist-disj-tsdbs"))
-	testutil.Ok(t, err)
-	t.Cleanup(e2ethanos.CleanScenario(t, e))
-
-	ctx := context.Background()
-	l := log.NewLogfmtLogger(os.Stdout)
-	now := time.Now()
-
-	bucket1 := "dist-disj-tsdbs-test1"
-	minio1 := e2edb.NewMinio(e, "1", bucket1, e2edb.WithMinioTLS())
-	testutil.Ok(t, e2e.StartAndWaitReady(minio1))
-
-	bkt1, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket1, minio1.Endpoint("http"), minio1.Dir()), "test", nil)
-	testutil.Ok(t, err)
-
-	// Setup a storage GW with 2 blocks that have a gap to trigger distributed query MinT bug
-	dir1 := filepath.Join(e.SharedDir(), "tmp1")
-	testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir1), os.ModePerm))
-	blockID1, err := e2eutil.CreateBlockWithBlockDelay(ctx,
-		dir1,
-		[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
-		1000,
-		timestamp.FromTime(now.Add(-10*time.Hour)),
-		timestamp.FromTime(now.Add(-8*time.Hour)),
-		30*time.Minute,
-		labels.FromStrings("prometheus", "p1", "replica", "0"),
-		0,
-		metadata.NoneFunc,
-	)
-	testutil.Ok(t, err)
-	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID1.String()), blockID1.String()))
-
-	blockID2, err := e2eutil.CreateBlockWithBlockDelay(ctx,
-		dir1,
-		[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
-		1000,
-		timestamp.FromTime(now.Add(-4*time.Hour)),
-		timestamp.FromTime(now.Add(-2*time.Hour)),
-		30*time.Minute,
-		labels.FromStrings("prometheus", "p1", "replica", "0"),
-		0,
-		metadata.NoneFunc,
-	)
-	testutil.Ok(t, err)
-	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID2.String()), blockID2.String()))
-	store1 := e2ethanos.NewStoreGW(
-		e,
-		"s1",
-		client.BucketConfig{
-			Type:   client.S3,
-			Config: e2ethanos.NewS3Config(bucket1, minio1.InternalEndpoint("http"), minio1.InternalDir()),
-		},
-		"",
-		"",
-		nil,
-	)
-	testutil.Ok(t, e2e.StartAndWaitReady(store1))
-
-	querierLeaf1 := e2ethanos.NewQuerierBuilder(e, "1", store1.InternalEndpoint("grpc")).Init()
-
-	// We need another querier to circumvent the passthrough optimizer
-	promConfig2 := e2ethanos.DefaultPromConfig("p2", 0, "", "", e2ethanos.LocalPrometheusTarget)
-	prom2, sidecar2 := e2ethanos.NewPrometheusWithSidecar(e, "p2", promConfig2, "", e2ethanos.DefaultPrometheusImage(), "")
-	querierLeaf2 := e2ethanos.NewQuerierBuilder(e, "2", sidecar2.InternalEndpoint("grpc")).Init()
-
-	querierDistributed := e2ethanos.NewQuerierBuilder(e, "3",
-		querierLeaf1.InternalEndpoint("grpc"),
-		querierLeaf2.InternalEndpoint("grpc"),
-	).
-		WithEngine(v1.PromqlEngineThanos).
-		WithQueryMode("distributed").
-		Init()
-
-	testutil.Ok(t, e2e.StartAndWaitReady(querierLeaf1, prom2, sidecar2, querierLeaf2, querierDistributed))
-
-	// We would expect 2x2h ranges for the 2 blocks containing foo samples. That would be around 240 expected sample pairs in the result matrix.
-	// We assert on more then 200 to reduce flakiness
-	rangeQuery(t, ctx, querierDistributed.Endpoint("http"), func() string { return "foo" }, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now), 60, promclient.QueryOptions{}, func(res model.Matrix) error {
-		if res.Len() < 1 {
-			return errors.New("No result series returned")
-		}
-		if nvals := len(res[0].Values); nvals < 200 {
-			return errors.Errorf("Too few values in result matrix, got %d, expected > 200", nvals)
-		}
-		return nil
-	})
-}
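Operationally, the new behavior is opt-in: the flag is registered with Hidden() and a default of false, so it does not show up in thanos query --help. A querier running in distributed mode would enable it roughly like this (a sketch; flag names are taken from the patch above, and the trailing ... stands for the usual endpoint and listen flags, which are unchanged):

    thanos query --query.mode=distributed --query.distributed-with-overlapping-interval ...

Note that distributed mode already switches the default engine to Thanos (see the "Distributed query mode enabled" log line above), so no separate --query.promql-engine setting is required for this sketch.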