Skip to content

Commit

Permalink
store, query: remote engine bug (#7904)
Browse files Browse the repository at this point in the history
* Fix a storage GW bug that loses TSDB infos when joining them
* E2E test demonstrating a bug in the MinT calculation in distributed
  Engine

Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann authored Nov 15, 2024
1 parent 20af3eb commit caa972f
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 7 deletions.
3 changes: 0 additions & 3 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,10 @@ func (r *remoteEngine) MinT() int64 {
highestMintByLabelSet[key] = lset.MinTime
continue
}

if lset.MinTime > lsetMinT {
highestMintByLabelSet[key] = lset.MinTime
}
}

var mint int64 = math.MaxInt64
for _, m := range highestMintByLabelSet {
if m < mint {
Expand Down Expand Up @@ -190,7 +188,6 @@ func (r *remoteEngine) adjustedInfos() infopb.TSDBInfos {
labelpb.ZLabelsFromPromLabels(builder.Labels())),
)
}

return infos
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,17 +922,15 @@ func (s *BucketStore) TSDBInfos() []infopb.TSDBInfo {
sort.Slice(infos, func(i, j int) bool { return infos[i].MinTime < infos[j].MinTime })

cur := infos[0]
for i, info := range infos {
for _, info := range infos {
if info.MinTime > cur.MaxTime {
res = append(res, cur)
cur = info
continue
}
cur.MaxTime = info.MaxTime
if i == len(infos)-1 {
res = append(res, cur)
}
}
res = append(res, cur)
}

return res
Expand Down
12 changes: 12 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ func TestBucketStore_TSDBInfo(t *testing.T) {
{mint: 3500, maxt: 5000, extLabels: labels.FromStrings("a", "b")},
{mint: 0, maxt: 1000, extLabels: labels.FromStrings("a", "c")},
{mint: 500, maxt: 2000, extLabels: labels.FromStrings("a", "c")},
{mint: 0, maxt: 1000, extLabels: labels.FromStrings("a", "d")},
{mint: 2000, maxt: 3000, extLabels: labels.FromStrings("a", "d")},
} {
id1, err := e2eutil.CreateBlock(ctx, dir, series, 10, tt.mint, tt.maxt, tt.extLabels, 0, metadata.NoneFunc)
testutil.Ok(t, err)
Expand Down Expand Up @@ -738,6 +740,16 @@ func TestBucketStore_TSDBInfo(t *testing.T) {
MinTime: 0,
MaxTime: 2000,
},
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: "a", Value: "d"}}},
MinTime: 0,
MaxTime: 1000,
},
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: "a", Value: "d"}}},
MinTime: 2000,
MaxTime: 3000,
},
})
}

Expand Down
89 changes: 89 additions & 0 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2376,3 +2376,92 @@ func TestDistributedEngineWithExtendedFunctions(t *testing.T) {
}, time.Now, promclient.QueryOptions{}, 1)
testutil.Equals(t, model.SampleValue(0), result[0].Value)
}

func TestDistributedEngineWithDisjointTSDBs(t *testing.T) {
e, err := e2e.New(e2e.WithName("dist-disj-tsdbs"))
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

ctx := context.Background()
l := log.NewLogfmtLogger(os.Stdout)
now := time.Now()

bucket1 := "dist-disj-tsdbs-test1"
minio1 := e2edb.NewMinio(e, "1", bucket1, e2edb.WithMinioTLS())
testutil.Ok(t, e2e.StartAndWaitReady(minio1))

bkt1, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket1, minio1.Endpoint("http"), minio1.Dir()), "test")

Check failure on line 2393 in test/e2e/query_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

not enough arguments in call to s3.NewBucketWithConfig

Check failure on line 2393 in test/e2e/query_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

not enough arguments in call to s3.NewBucketWithConfig

Check failure on line 2393 in test/e2e/query_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 4)

not enough arguments in call to s3.NewBucketWithConfig

Check failure on line 2393 in test/e2e/query_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 2)

not enough arguments in call to s3.NewBucketWithConfig

Check failure on line 2393 in test/e2e/query_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 5)

not enough arguments in call to s3.NewBucketWithConfig

Check failure on line 2393 in test/e2e/query_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 3)

not enough arguments in call to s3.NewBucketWithConfig

Check failure on line 2393 in test/e2e/query_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 0)

not enough arguments in call to s3.NewBucketWithConfig

Check failure on line 2393 in test/e2e/query_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 1)

not enough arguments in call to s3.NewBucketWithConfig

Check failure on line 2393 in test/e2e/query_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 6)

not enough arguments in call to s3.NewBucketWithConfig

Check failure on line 2393 in test/e2e/query_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 7)

not enough arguments in call to s3.NewBucketWithConfig
testutil.Ok(t, err)

// Setup a storage GW with 2 blocks that have a gap to trigger distributed query MinT bug
dir1 := filepath.Join(e.SharedDir(), "tmp1")
testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir1), os.ModePerm))
blockID1, err := e2eutil.CreateBlockWithBlockDelay(ctx,
dir1,
[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
1000,
timestamp.FromTime(now.Add(-10*time.Hour)),
timestamp.FromTime(now.Add(-8*time.Hour)),
30*time.Minute,
labels.FromStrings("prometheus", "p1", "replica", "0"),
0,
metadata.NoneFunc,
)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID1.String()), blockID1.String()))

blockID2, err := e2eutil.CreateBlockWithBlockDelay(ctx,
dir1,
[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
1000,
timestamp.FromTime(now.Add(-4*time.Hour)),
timestamp.FromTime(now.Add(-2*time.Hour)),
30*time.Minute,
labels.FromStrings("prometheus", "p1", "replica", "0"),
0,
metadata.NoneFunc,
)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID2.String()), blockID2.String()))
store1 := e2ethanos.NewStoreGW(
e,
"s1",
client.BucketConfig{
Type: client.S3,
Config: e2ethanos.NewS3Config(bucket1, minio1.InternalEndpoint("http"), minio1.InternalDir()),
},
"",
"",
nil,
)
testutil.Ok(t, e2e.StartAndWaitReady(store1))

querierLeaf1 := e2ethanos.NewQuerierBuilder(e, "1", store1.InternalEndpoint("grpc")).Init()

// We need another querier to circumvent the passthrough optimizer
promConfig2 := e2ethanos.DefaultPromConfig("p2", 0, "", "", e2ethanos.LocalPrometheusTarget)
prom2, sidecar2 := e2ethanos.NewPrometheusWithSidecar(e, "p2", promConfig2, "", e2ethanos.DefaultPrometheusImage(), "")
querierLeaf2 := e2ethanos.NewQuerierBuilder(e, "2", sidecar2.InternalEndpoint("grpc")).Init()

querierDistributed := e2ethanos.NewQuerierBuilder(e, "3",
querierLeaf1.InternalEndpoint("grpc"),
querierLeaf2.InternalEndpoint("grpc"),
).
WithEngine(v1.PromqlEngineThanos).
WithQueryMode("distributed").
Init()

testutil.Ok(t, e2e.StartAndWaitReady(querierLeaf1, prom2, sidecar2, querierLeaf2, querierDistributed))

// We would expect 2x2h ranges for the 2 blocks containing foo samples. That would be around 240 expected sample pairs in the result matrix.
// We assert on more then 200 to reduce flakiness
rangeQuery(t, ctx, querierDistributed.Endpoint("http"), func() string { return "foo" }, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now), 60, promclient.QueryOptions{}, func(res model.Matrix) error {
if res.Len() < 1 {
return errors.New("No result series returned")
}
if nvals := len(res[0].Values); nvals < 200 {
return errors.Errorf("Too few values in result matrix, got %d, expected > 200", nvals)
}
return nil
})
}

0 comments on commit caa972f

Please sign in to comment.