Skip to content

Commit

Permalink
Compact tenant TSDBs after 3h inactivity
Browse files Browse the repository at this point in the history
The Receiver tenant retention mechanism has an edge case when a
tenant TSDB gets evicted and uploaded to S3. Since there is a delay
between uploads to S3 and downloads in Store Gateways, if a user
executes a query between the upload and download time, they may
not see the latest head block from the evicted tenant.

As a result, this commit decouples head compaction from tenant eviction.
Head compaction, as in Prometheus, will happen if there are no new appends
after 1.5x max-block-size. This will also cause the compacted block to be
uploaded to S3 by the shipper. Eviction will then kick in which will cause
the tenant TSDB to be deleted.

By this time, the latest head block would have been picked up by store-gateway
and would be available during query execution.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Nov 20, 2022
1 parent baac7aa commit c4523c2
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
12 changes: 9 additions & 3 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,16 +322,22 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
return false, nil
}

sinceLastAppend := time.Since(time.UnixMilli(head.MaxTime()))
if sinceLastAppend.Milliseconds() <= t.tsdbOpts.RetentionDuration {
sinceLastAppendMillis := time.Since(time.UnixMilli(head.MaxTime())).Milliseconds()
compactThreshold := int64(1.5 * float64(t.tsdbOpts.MaxBlockDuration))
if sinceLastAppendMillis <= compactThreshold {
return false, nil
}

level.Info(logger).Log("msg", "Pruning tenant")
level.Info(logger).Log("msg", "Compacting tenant")
if err := tdb.CompactHead(tsdb.NewRangeHead(head, head.MinTime(), head.MaxTime())); err != nil {
return false, err
}

if sinceLastAppendMillis <= t.tsdbOpts.RetentionDuration {
return false, nil
}

level.Info(logger).Log("msg", "Pruning tenant")
if tenantInstance.shipper() != nil {
uploaded, err := tenantInstance.shipper().Sync(ctx)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,14 @@ func TestMultiTSDBPrune(t *testing.T) {
{
name: "prune tsdbs without object storage",
bucket: nil,
expectedTenants: 1,
expectedTenants: 2,
expectedUploads: 0,
},
{
name: "prune tsdbs with object storage",
bucket: objstore.NewInMemBucket(),
expectedTenants: 1,
expectedUploads: 2,
expectedTenants: 2,
expectedUploads: 1,
},
}

Expand All @@ -419,7 +419,7 @@ func TestMultiTSDBPrune(t *testing.T) {

for i := 0; i < 100; i++ {
testutil.Ok(t, appendSample(m, "foo", time.UnixMilli(int64(10+i))))
testutil.Ok(t, appendSample(m, "bar", time.UnixMilli(int64(10+i))))
testutil.Ok(t, appendSample(m, "bar", time.Now().Add(-4*time.Hour)))
testutil.Ok(t, appendSample(m, "baz", time.Now().Add(time.Duration(i)*time.Second)))
}
testutil.Equals(t, 3, len(m.TSDBLocalClients()))
Expand Down

0 comments on commit c4523c2

Please sign in to comment.