Skip to content

Commit

Permalink
Implement tenant expiration (#5420)
Browse files Browse the repository at this point in the history
* Implement tenant expiration

This commit adds dynamic TSDB pruning for tenants which have not
received new samples within a certain period of time.

Signed-off-by: Filip Petkovski <[email protected]>

* Add link to receiver tenant-lifecycle-management

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski authored Jun 27, 2022
1 parent eb65de3 commit 37cc176
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5408](https://github.com/thanos-io/thanos/pull/5391) Receive: Add support for consistent hashrings.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Implement api/v1/status/tsdb.
- [#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: export metrics regarding size of remote write requests
- [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants.

### Changed

Expand Down
17 changes: 16 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,21 @@ func runReceive(
)
}

level.Debug(logger).Log("msg", "setting up periodic tenant pruning")
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(2*time.Hour, ctx.Done(), func() error {
if err := dbs.Prune(ctx); err != nil {
level.Error(logger).Log("err", err)
}
return nil
})
}, func(err error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting receiver")
return nil
}
Expand Down Expand Up @@ -778,7 +793,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)

rc.retention = extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention.").Default("15d"))
rc.retention = extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention. For more details on how retention is enforced for individual tenants, please refer to the Tenant lifecycle management section in the Receive documentation: https://thanos.io/tip/components/receive.md/#tenant-lifecycle-management").Default("15d"))

cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration. A watcher is initialized to watch changes and update the hashring dynamically.").PlaceHolder("<path>").StringVar(&rc.hashringsFilePath)

Expand Down
15 changes: 14 additions & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ Thanos Receive supports getting TSDB stats using the `/api/v1/status/tsdb` endpo

Note that each Thanos Receive will only expose local stats and replicated series will not be included in the response.

## Tenant lifecycle management

Tenants in Receivers are created dynamically and do not need to be provisioned upfront. When a new value is detected in the tenant HTTP header, Receivers will provision and start managing an independent TSDB for that tenant. TSDB blocks that are sent to S3 will contain a unique `tenant_id` label which can be used to compact blocks independently for each tenant.

A Receiver will automatically decommission a tenant once new samples have not been seen for longer than the `--tsdb.retention` period configured for the Receiver. The tenant decommission process includes flushing all in-memory samples for that tenant to disk, sending all unsent blocks to S3, and removing the tenant TSDB from the filesystem. If a tenant receives new samples after being decommissioned, a new TSDB will be created for the tenant.

Note that because of the built-in decommissioning process, the semantic of the `--tsdb.retention` flag in the Receiver is different than the one in Prometheus. For Receivers, `--tsdb.retention=t` indicates that the data for a tenant will be kept for `t` amount of time, whereas in Prometheus, `--tsdb.retention=t` denotes that the last `t` duration of data will be maintained in TSDB. In other words, Prometheus will keep the last `t` duration of data even when it stops getting new samples.

## Example

```bash
Expand Down Expand Up @@ -233,7 +241,12 @@ Flags:
next startup.
--tsdb.path="./data" Data directory of TSDB.
--tsdb.retention=15d How long to retain raw samples on local
storage. 0d - disables this retention.
storage. 0d - disables this retention. For more
details on how retention is enforced for
individual tenants, please refer to the Tenant
lifecycle management section in the Receive
documentation:
https://thanos.io/tip/components/receive.md/#tenant-lifecycle-management
--tsdb.wal-compression Compress the tsdb WAL.
--version Show application version.
Expand Down
26 changes: 26 additions & 0 deletions pkg/errutil/multierror.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package errutil
import (
"bytes"
"fmt"
"sync"
)

// The MultiError type implements the error interface, and contains the
Expand All @@ -32,6 +33,31 @@ func (es MultiError) Err() error {
return NonNilMultiError(es)
}

// SyncMultiError is a thread-safe implementation of MultiError.
type SyncMultiError struct {
mtx sync.Mutex
es MultiError
}

// Add adds the error to the error list if it is not nil.
func (es *SyncMultiError) Add(err error) {
if err == nil {
return
}
es.mtx.Lock()
defer es.mtx.Unlock()

es.Add(err)
}

// Err returns the error list as an error or nil if it is empty.
func (es *SyncMultiError) Err() error {
es.mtx.Lock()
defer es.mtx.Unlock()

return es.es.Err()
}

type NonNilMultiError MultiError

// Returns a concatenated string of the contained errors.
Expand Down
84 changes: 84 additions & 0 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"path/filepath"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -200,6 +201,89 @@ func (t *MultiTSDB) Close() error {
return merr.Err()
}

// Prune flushes and closes the TSDB for tenants that haven't received
// any new samples for longer than the TSDB retention period.
func (t *MultiTSDB) Prune(ctx context.Context) error {
// Retention of 0 means infinite retention.
if t.tsdbOpts.RetentionDuration == 0 {
return nil
}

t.mtx.Lock()
defer t.mtx.Unlock()

var (
wg sync.WaitGroup
merr errutil.SyncMultiError
)

for tenantID, tenantInstance := range t.tenants {
wg.Add(1)
go func(tenantID string, tenantInstance *tenant) {
defer wg.Done()
tlog := log.With(t.logger, "tenant", tenantID)
pruned, err := t.pruneTSDB(ctx, tlog, tenantInstance)
if err != nil {
merr.Add(err)
return
}

if pruned {
level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
delete(t.tenants, tenantID)
}
}(tenantID, tenantInstance)
}
wg.Wait()

return merr.Err()
}

// pruneTSDB removes a TSDB if its past the retention period.
// It compacts the TSDB head, sends all remaining blocks to S3 and removes the TSDB from disk.
func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (bool, error) {
tenantTSDB := tenantInstance.readyStorage().get()
if tenantTSDB == nil {
return false, nil
}
tdb := tenantTSDB.db
head := tdb.Head()
if head.MaxTime() < 0 {
return false, nil
}

sinceLastAppend := time.Since(time.UnixMilli(head.MaxTime()))
if sinceLastAppend.Milliseconds() <= t.tsdbOpts.RetentionDuration {
return false, nil
}

level.Info(logger).Log("msg", "Pruning tenant")
if err := tdb.CompactHead(tsdb.NewRangeHead(head, head.MinTime(), head.MaxTime())); err != nil {
return false, err
}

if tenantInstance.shipper() != nil {
uploaded, err := tenantInstance.shipper().Sync(ctx)
if err != nil {
return false, err
}

if uploaded > 0 {
level.Info(logger).Log("msg", "Uploaded head block")
}
}

if err := tdb.Close(); err != nil {
return false, err
}

if err := os.RemoveAll(tenantTSDB.db.Dir()); err != nil {
return false, err
}

return true, nil
}

func (t *MultiTSDB) Sync(ctx context.Context) (int, error) {
if t.bucket == nil {
return 0, errors.New("bucket is not specified, Sync should not be invoked")
Expand Down
90 changes: 90 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

"github.com/thanos-io/thanos/pkg/objstore"

"github.com/go-kit/log"
"github.com/gogo/protobuf/types"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -412,6 +414,94 @@ func checkExemplarsResponse(t *testing.T, name string, expected, data []exemplar
}
}

func TestMultiTSDBPrune(t *testing.T) {
tests := []struct {
name string
bucket objstore.Bucket
expectedTenants int
expectedUploads int
}{
{
name: "prune tsdbs without object storage",
bucket: nil,
expectedTenants: 1,
expectedUploads: 0,
},
{
name: "prune tsdbs with object storage",
bucket: objstore.NewInMemBucket(),
expectedTenants: 1,
expectedUploads: 2,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dir, err := ioutil.TempDir("", "multitsdb-prune")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(),
&tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
},
labels.FromStrings("replica", "test"),
"tenant_id",
test.bucket,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()

for i := 0; i < 100; i++ {
testutil.Ok(t, appendSample(m, "foo", time.UnixMilli(int64(10+i))))
testutil.Ok(t, appendSample(m, "bar", time.UnixMilli(int64(10+i))))
testutil.Ok(t, appendSample(m, "baz", time.Now().Add(time.Duration(i)*time.Second)))
}
testutil.Equals(t, 3, len(m.TSDBStores()))

testutil.Ok(t, m.Prune(context.Background()))
testutil.Equals(t, test.expectedTenants, len(m.TSDBStores()))

var shippedBlocks int
if test.bucket != nil {
testutil.Ok(t, test.bucket.Iter(context.Background(), "", func(s string) error {
shippedBlocks++
return nil
}))
}
testutil.Equals(t, test.expectedUploads, shippedBlocks)
})
}
}

func appendSample(m *MultiTSDB, tenant string, timestamp time.Time) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

app, err := m.TenantAppendable(tenant)
if err != nil {
return err
}

var a storage.Appender
if err := runutil.Retry(1*time.Second, ctx.Done(), func() error {
a, err = app.Appender(ctx)
return err
}); err != nil {
return err
}

_, err = a.Append(0, labels.FromStrings("foo", "bar"), timestamp.UnixMilli(), 10)
if err != nil {
return err
}

return a.Commit()
}

func BenchmarkMultiTSDB(b *testing.B) {
dir, err := ioutil.TempDir("", "multitsdb")
testutil.Ok(b, err)
Expand Down

0 comments on commit 37cc176

Please sign in to comment.