From c4523c2e95b9356257ca8df394f2051158affbab Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Sun, 20 Nov 2022 08:43:56 +0100
Subject: [PATCH] Compact tenant TSDBs after 3h inactivity

The Receiver tenant retention mechanism has an edge case when a tenant
TSDB gets evicted and uploaded to S3. Since there is a delay between
uploads to S3 and downloads in Store Gateways, a user who runs a query
between the upload and the download may not see the latest head block
from the evicted tenant.

This commit therefore decouples head compaction from tenant eviction.
As in Prometheus, the head is compacted once there have been no new
appends for 1.5x the max block duration, and the shipper then uploads
the compacted block to S3. Eviction kicks in afterwards and deletes the
tenant TSDB. By that time the latest head block will already have been
picked up by the store gateway and will be available during query
execution.

Signed-off-by: Filip Petkovski
---
 pkg/receive/multitsdb.go      | 12 +++++++++---
 pkg/receive/multitsdb_test.go |  8 ++++----
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go
index 9636c4b61a6..8972fdf88c5 100644
--- a/pkg/receive/multitsdb.go
+++ b/pkg/receive/multitsdb.go
@@ -322,16 +322,22 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
 		return false, nil
 	}
 
-	sinceLastAppend := time.Since(time.UnixMilli(head.MaxTime()))
-	if sinceLastAppend.Milliseconds() <= t.tsdbOpts.RetentionDuration {
+	sinceLastAppendMillis := time.Since(time.UnixMilli(head.MaxTime())).Milliseconds()
+	compactThreshold := int64(1.5 * float64(t.tsdbOpts.MaxBlockDuration))
+	if sinceLastAppendMillis <= compactThreshold {
 		return false, nil
 	}
 
-	level.Info(logger).Log("msg", "Pruning tenant")
+	level.Info(logger).Log("msg", "Compacting tenant")
 	if err := tdb.CompactHead(tsdb.NewRangeHead(head, head.MinTime(), head.MaxTime())); err != nil {
 		return false, err
 	}
 
+	if sinceLastAppendMillis <= t.tsdbOpts.RetentionDuration {
+		return false, nil
+	}
+
+	level.Info(logger).Log("msg", "Pruning tenant")
 	if tenantInstance.shipper() != nil {
 		uploaded, err := tenantInstance.shipper().Sync(ctx)
 		if err != nil {
diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go
index a89e24c2c5d..bc100945041 100644
--- a/pkg/receive/multitsdb_test.go
+++ b/pkg/receive/multitsdb_test.go
@@ -388,14 +388,14 @@ func TestMultiTSDBPrune(t *testing.T) {
 		{
 			name:            "prune tsdbs without object storage",
 			bucket:          nil,
-			expectedTenants: 1,
+			expectedTenants: 2,
 			expectedUploads: 0,
 		},
 		{
 			name:            "prune tsdbs with object storage",
 			bucket:          objstore.NewInMemBucket(),
-			expectedTenants: 1,
-			expectedUploads: 2,
+			expectedTenants: 2,
+			expectedUploads: 1,
 		},
 	}
 
@@ -419,7 +419,7 @@ func TestMultiTSDBPrune(t *testing.T) {
 
 	for i := 0; i < 100; i++ {
 		testutil.Ok(t, appendSample(m, "foo", time.UnixMilli(int64(10+i))))
-		testutil.Ok(t, appendSample(m, "bar", time.UnixMilli(int64(10+i))))
+		testutil.Ok(t, appendSample(m, "bar", time.Now().Add(-4*time.Hour)))
 		testutil.Ok(t, appendSample(m, "baz", time.Now().Add(time.Duration(i)*time.Second)))
 	}
 	testutil.Equals(t, 3, len(m.TSDBLocalClients()))
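
Note for reviewers: below is a minimal, standalone Go sketch of the decision
order this patch introduces. It is not Thanos code; decide() and its
parameters are hypothetical stand-ins for the values used in
MultiTSDB.pruneTSDB (time since the head's max time, tsdbOpts.MaxBlockDuration,
tsdbOpts.RetentionDuration), and the retention value in main is an arbitrary
example.

package main

import (
	"fmt"
	"time"
)

// decide mirrors the ordering introduced by the patch: compact the head once
// there have been no appends for 1.5x the max block duration, and only prune
// (evict) the tenant once the idle time also exceeds the retention period.
func decide(sinceLastAppend, maxBlockDuration, retention time.Duration) (compact, prune bool) {
	compactThreshold := time.Duration(1.5 * float64(maxBlockDuration))
	if sinceLastAppend <= compactThreshold {
		// Recent appends: neither compact nor prune.
		return false, false
	}
	compact = true // compact so the shipper can upload the resulting block
	if sinceLastAppend <= retention {
		// Compacted, but the tenant is not evicted yet.
		return compact, false
	}
	// Idle past retention: the compacted block should already be in object
	// storage, so the tenant TSDB can be deleted safely.
	return compact, true
}

func main() {
	const maxBlockDuration = 2 * time.Hour // with the 1.5x factor this gives the 3h in the subject
	const retention = 24 * time.Hour       // arbitrary example retention
	for _, idle := range []time.Duration{time.Hour, 4 * time.Hour, 30 * time.Hour} {
		c, p := decide(idle, maxBlockDuration, retention)
		fmt.Printf("idle=%v compact=%v prune=%v\n", idle, c, p)
	}
}

With these example values, a tenant idle for 1h is untouched, a tenant idle
for 4h is compacted but kept (its block is left for the shipper to upload),
and a tenant idle for 30h is compacted, uploaded, and pruned. This is the
same pattern the updated TestMultiTSDBPrune expectations encode: two tenants
remain and one upload happens during pruning.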