diff --git a/pkg/query/remote_engine.go b/pkg/query/remote_engine.go index c625cab5ba..77a74c9a6f 100644 --- a/pkg/query/remote_engine.go +++ b/pkg/query/remote_engine.go @@ -126,12 +126,10 @@ func (r *remoteEngine) MinT() int64 { highestMintByLabelSet[key] = lset.MinTime continue } - if lset.MinTime > lsetMinT { highestMintByLabelSet[key] = lset.MinTime } } - var mint int64 = math.MaxInt64 for _, m := range highestMintByLabelSet { if m < mint { @@ -190,7 +188,6 @@ func (r *remoteEngine) adjustedInfos() infopb.TSDBInfos { labelpb.ZLabelsFromPromLabels(builder.Labels())), ) } - return infos } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index eec1de1005..963de83fbe 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -922,17 +922,15 @@ func (s *BucketStore) TSDBInfos() []infopb.TSDBInfo { sort.Slice(infos, func(i, j int) bool { return infos[i].MinTime < infos[j].MinTime }) cur := infos[0] - for i, info := range infos { + for _, info := range infos { if info.MinTime > cur.MaxTime { res = append(res, cur) cur = info continue } cur.MaxTime = info.MaxTime - if i == len(infos)-1 { - res = append(res, cur) - } } + res = append(res, cur) } return res diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index ec3e0bee7e..e8dffd093b 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -682,6 +682,8 @@ func TestBucketStore_TSDBInfo(t *testing.T) { {mint: 3500, maxt: 5000, extLabels: labels.FromStrings("a", "b")}, {mint: 0, maxt: 1000, extLabels: labels.FromStrings("a", "c")}, {mint: 500, maxt: 2000, extLabels: labels.FromStrings("a", "c")}, + {mint: 0, maxt: 1000, extLabels: labels.FromStrings("a", "d")}, + {mint: 2000, maxt: 3000, extLabels: labels.FromStrings("a", "d")}, } { id1, err := e2eutil.CreateBlock(ctx, dir, series, 10, tt.mint, tt.maxt, tt.extLabels, 0, metadata.NoneFunc) testutil.Ok(t, err) @@ -738,6 +740,16 @@ func TestBucketStore_TSDBInfo(t *testing.T) { MinTime: 0, MaxTime: 2000, }, + { + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: "a", Value: "d"}}}, + MinTime: 0, + MaxTime: 1000, + }, + { + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: "a", Value: "d"}}}, + MinTime: 2000, + MaxTime: 3000, + }, }) } diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 763e5cd9cb..5d1ccf82ee 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -2376,3 +2376,92 @@ func TestDistributedEngineWithExtendedFunctions(t *testing.T) { }, time.Now, promclient.QueryOptions{}, 1) testutil.Equals(t, model.SampleValue(0), result[0].Value) } + +func TestDistributedEngineWithDisjointTSDBs(t *testing.T) { + e, err := e2e.New(e2e.WithName("dist-disj-tsdbs")) + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + ctx := context.Background() + l := log.NewLogfmtLogger(os.Stdout) + now := time.Now() + + bucket1 := "dist-disj-tsdbs-test1" + minio1 := e2edb.NewMinio(e, "1", bucket1, e2edb.WithMinioTLS()) + testutil.Ok(t, e2e.StartAndWaitReady(minio1)) + + bkt1, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket1, minio1.Endpoint("http"), minio1.Dir()), "test") + testutil.Ok(t, err) + + // Setup a storage GW with 2 blocks that have a gap to trigger distributed query MinT bug + dir1 := filepath.Join(e.SharedDir(), "tmp1") + testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir1), os.ModePerm)) + blockID1, err := e2eutil.CreateBlockWithBlockDelay(ctx, + dir1, + []labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")}, + 1000, + timestamp.FromTime(now.Add(-10*time.Hour)), + timestamp.FromTime(now.Add(-8*time.Hour)), + 30*time.Minute, + labels.FromStrings("prometheus", "p1", "replica", "0"), + 0, + metadata.NoneFunc, + ) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID1.String()), blockID1.String())) + + blockID2, err := e2eutil.CreateBlockWithBlockDelay(ctx, + dir1, + []labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")}, + 1000, + timestamp.FromTime(now.Add(-4*time.Hour)), + timestamp.FromTime(now.Add(-2*time.Hour)), + 30*time.Minute, + labels.FromStrings("prometheus", "p1", "replica", "0"), + 0, + metadata.NoneFunc, + ) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID2.String()), blockID2.String())) + store1 := e2ethanos.NewStoreGW( + e, + "s1", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket1, minio1.InternalEndpoint("http"), minio1.InternalDir()), + }, + "", + "", + nil, + ) + testutil.Ok(t, e2e.StartAndWaitReady(store1)) + + querierLeaf1 := e2ethanos.NewQuerierBuilder(e, "1", store1.InternalEndpoint("grpc")).Init() + + // We need another querier to circumvent the passthrough optimizer + promConfig2 := e2ethanos.DefaultPromConfig("p2", 0, "", "", e2ethanos.LocalPrometheusTarget) + prom2, sidecar2 := e2ethanos.NewPrometheusWithSidecar(e, "p2", promConfig2, "", e2ethanos.DefaultPrometheusImage(), "") + querierLeaf2 := e2ethanos.NewQuerierBuilder(e, "2", sidecar2.InternalEndpoint("grpc")).Init() + + querierDistributed := e2ethanos.NewQuerierBuilder(e, "3", + querierLeaf1.InternalEndpoint("grpc"), + querierLeaf2.InternalEndpoint("grpc"), + ). + WithEngine(v1.PromqlEngineThanos). + WithQueryMode("distributed"). + Init() + + testutil.Ok(t, e2e.StartAndWaitReady(querierLeaf1, prom2, sidecar2, querierLeaf2, querierDistributed)) + + // We would expect 2x2h ranges for the 2 blocks containing foo samples. That would be around 240 expected sample pairs in the result matrix. + // We assert on more then 200 to reduce flakiness + rangeQuery(t, ctx, querierDistributed.Endpoint("http"), func() string { return "foo" }, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now), 60, promclient.QueryOptions{}, func(res model.Matrix) error { + if res.Len() < 1 { + return errors.New("No result series returned") + } + if nvals := len(res[0].Values); nvals < 200 { + return errors.Errorf("Too few values in result matrix, got %d, expected > 200", nvals) + } + return nil + }) +}