diff --git a/CHANGELOG.md b/CHANGELOG.md
index aa0821284c..1409f75058 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -34,6 +34,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (relevant only to rolling upgrades).
 - [#5408](https://github.com/thanos-io/thanos/pull/5391) Receive: Add support for consistent hashrings.
 - [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Implement api/v1/status/tsdb.
 - [#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: export metrics regarding size of remote write requests
+- [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants.
 
 ### Changed
 
diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go
index 491838e6e5..c01c67fb06 100644
--- a/cmd/thanos/receive.go
+++ b/cmd/thanos/receive.go
@@ -293,6 +293,21 @@ func runReceive(
        )
    }
 
+   level.Debug(logger).Log("msg", "setting up periodic tenant pruning")
+   {
+       ctx, cancel := context.WithCancel(context.Background())
+       g.Add(func() error {
+           return runutil.Repeat(2*time.Hour, ctx.Done(), func() error {
+               if err := dbs.Prune(ctx); err != nil {
+                   level.Error(logger).Log("msg", "failed to prune tenants", "err", err)
+               }
+               return nil
+           })
+       }, func(err error) {
+           cancel()
+       })
+   }
+
    level.Info(logger).Log("msg", "starting receiver")
    return nil
 }
@@ -778,7 +793,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
 
    rc.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
 
-   rc.retention = extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention.").Default("15d"))
+   rc.retention = extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention. For more details on how retention is enforced for individual tenants, please refer to the Tenant lifecycle management section in the Receive documentation: https://thanos.io/tip/components/receive.md/#tenant-lifecycle-management").Default("15d"))
 
    cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration. A watcher is initialized to watch changes and update the hashring dynamically.").PlaceHolder("<path>").StringVar(&rc.hashringsFilePath)
 
diff --git a/docs/components/receive.md b/docs/components/receive.md
index b6aa60b5dd..30296221b7 100644
--- a/docs/components/receive.md
+++ b/docs/components/receive.md
@@ -18,6 +18,26 @@ Thanos Receive supports getting TSDB stats using the `/api/v1/status/tsdb` endpoint.
 
 Note that each Thanos Receive will only expose local stats and replicated series will not be included in the response.
 
+## Tenant lifecycle management
+
+Tenants in Receivers are created dynamically and do not need to be provisioned upfront. When a new value is detected in the tenant HTTP header, Receivers will provision and start managing an independent TSDB for that tenant. TSDB blocks that are uploaded to object storage will contain a unique `tenant_id` label, which can be used to compact blocks independently for each tenant.
+
+A Receiver will automatically decommission a tenant once no new samples have been received for longer than the `--tsdb.retention` period configured for the Receiver. The decommissioning process includes flushing all in-memory samples for that tenant to disk, uploading all unshipped blocks to object storage, and removing the tenant TSDB from the filesystem. If a tenant receives new samples after being decommissioned, a new TSDB will be created for it.
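+
+For example, with the default tenant header (`THANOS-TENANT`) and the default remote-write listen address, the first write request carrying a new header value is enough to provision a tenant. A minimal sketch, assuming a Receiver on `localhost:19291` and a pre-built snappy-compressed remote-write protobuf in `payload.snappy` (a hypothetical file):
+
+```bash
+# The first write observed for tenant "team-a" creates an independent TSDB
+# for it on the fly; no upfront provisioning is needed.
+curl -X POST http://localhost:19291/api/v1/receive \
+  -H "THANOS-TENANT: team-a" \
+  -H "Content-Type: application/x-protobuf" \
+  -H "Content-Encoding: snappy" \
+  --data-binary @payload.snappy
+```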
+
+Note that because of the built-in decommissioning process, the semantics of the `--tsdb.retention` flag in the Receiver are different from those in Prometheus. For Receivers, `--tsdb.retention=t` indicates that the data for a tenant will be kept for `t` amount of time after its last received sample, whereas in Prometheus, `--tsdb.retention=t` denotes that the last `t` duration of data will be maintained in TSDB. In other words, Prometheus will keep the last `t` duration of data even when it stops getting new samples.
+
 ## Example
 
 ```bash
@@ -233,7 +241,12 @@ Flags:
                                 next startup.
      --tsdb.path="./data"       Data directory of TSDB.
      --tsdb.retention=15d       How long to retain raw samples on local
-                                storage. 0d - disables this retention.
+                                storage. 0d - disables this retention. For more
+                                details on how retention is enforced for
+                                individual tenants, please refer to the Tenant
+                                lifecycle management section in the Receive
+                                documentation:
+                                https://thanos.io/tip/components/receive.md/#tenant-lifecycle-management
      --tsdb.wal-compression     Compress the tsdb WAL.
      --version                  Show application version.
 
diff --git a/pkg/errutil/multierror.go b/pkg/errutil/multierror.go
index aa53706dde..40bce8a36a 100644
--- a/pkg/errutil/multierror.go
+++ b/pkg/errutil/multierror.go
@@ -6,6 +6,7 @@ package errutil
 import (
    "bytes"
    "fmt"
+   "sync"
 )
 
 // The MultiError type implements the error interface, and contains the
@@ -32,6 +33,31 @@ func (es MultiError) Err() error {
    return NonNilMultiError(es)
 }
 
+// SyncMultiError is a thread-safe implementation of MultiError.
+type SyncMultiError struct {
+   mtx sync.Mutex
+   es  MultiError
+}
+
+// Add adds the error to the error list if it is not nil.
+func (es *SyncMultiError) Add(err error) {
+   if err == nil {
+       return
+   }
+   es.mtx.Lock()
+   defer es.mtx.Unlock()
+
+   es.es.Add(err)
+}
+
+// Err returns the error list as an error or nil if it is empty.
+func (es *SyncMultiError) Err() error {
+   es.mtx.Lock()
+   defer es.mtx.Unlock()
+
+   return es.es.Err()
+}
+
 type NonNilMultiError MultiError
 
 // Returns a concatenated string of the contained errors.
diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go
index 7ffab5cfc3..e98cded856 100644
--- a/pkg/receive/multitsdb.go
+++ b/pkg/receive/multitsdb.go
@@ -10,6 +10,7 @@ import (
    "path"
    "path/filepath"
    "sync"
+   "time"
 
    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
@@ -200,6 +201,104 @@ func (t *MultiTSDB) Close() error {
    return merr.Err()
 }
 
+// Prune flushes and closes the TSDB for tenants that haven't received
+// any new samples for longer than the TSDB retention period.
+func (t *MultiTSDB) Prune(ctx context.Context) error {
+   // Retention of 0 means infinite retention.
+   if t.tsdbOpts.RetentionDuration == 0 {
+       return nil
+   }
+
+   t.mtx.Lock()
+   defer t.mtx.Unlock()
+
+   var (
+       wg   sync.WaitGroup
+       merr errutil.SyncMultiError
+
+       // Pruned tenants are collected here and deleted from the tenants
+       // map only after wg.Wait(), since deleting inside the goroutines
+       // below would be a concurrent map write.
+       prunedMtx     sync.Mutex
+       prunedTenants []string
+   )
+
+   for tenantID, tenantInstance := range t.tenants {
+       wg.Add(1)
+       go func(tenantID string, tenantInstance *tenant) {
+           defer wg.Done()
+           tlog := log.With(t.logger, "tenant", tenantID)
+           pruned, err := t.pruneTSDB(ctx, tlog, tenantInstance)
+           if err != nil {
+               merr.Add(err)
+               return
+           }
+
+           if pruned {
+               prunedMtx.Lock()
+               prunedTenants = append(prunedTenants, tenantID)
+               prunedMtx.Unlock()
+           }
+       }(tenantID, tenantInstance)
+   }
+   wg.Wait()
+
+   for _, tenantID := range prunedTenants {
+       level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
+       delete(t.tenants, tenantID)
+   }
+
+   return merr.Err()
+}
+
+// pruneTSDB removes a tenant's TSDB once it is past the retention period.
+// It compacts the TSDB head, uploads remaining blocks to object storage, and removes the TSDB from disk.
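+// A tenant is considered stale once time.Since(head.MaxTime()) exceeds the
+// configured retention period, i.e. no sample has been appended within it.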
+func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (bool, error) {
+   tenantTSDB := tenantInstance.readyStorage().get()
+   if tenantTSDB == nil {
+       return false, nil
+   }
+   tdb := tenantTSDB.db
+   head := tdb.Head()
+   // An empty head has MaxTime set to math.MinInt64, so a negative MaxTime
+   // means no samples have ever been appended.
+   if head.MaxTime() < 0 {
+       return false, nil
+   }
+
+   sinceLastAppend := time.Since(time.UnixMilli(head.MaxTime()))
+   if sinceLastAppend.Milliseconds() <= t.tsdbOpts.RetentionDuration {
+       return false, nil
+   }
+
+   level.Info(logger).Log("msg", "Pruning tenant")
+   if err := tdb.CompactHead(tsdb.NewRangeHead(head, head.MinTime(), head.MaxTime())); err != nil {
+       return false, err
+   }
+
+   if tenantInstance.shipper() != nil {
+       uploaded, err := tenantInstance.shipper().Sync(ctx)
+       if err != nil {
+           return false, err
+       }
+
+       if uploaded > 0 {
+           level.Info(logger).Log("msg", "Uploaded head block")
+       }
+   }
+
+   if err := tdb.Close(); err != nil {
+       return false, err
+   }
+
+   if err := os.RemoveAll(tdb.Dir()); err != nil {
+       return false, err
+   }
+
+   return true, nil
+}
+
 func (t *MultiTSDB) Sync(ctx context.Context) (int, error) {
    if t.bucket == nil {
        return 0, errors.New("bucket is not specified, Sync should not be invoked")
diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go
index bc7ccea787..efa55a8b4f 100644
--- a/pkg/receive/multitsdb_test.go
+++ b/pkg/receive/multitsdb_test.go
@@ -11,6 +11,8 @@ import (
    "testing"
    "time"
 
+   "github.com/thanos-io/thanos/pkg/objstore"
+
    "github.com/go-kit/log"
    "github.com/gogo/protobuf/types"
    "github.com/prometheus/client_golang/prometheus"
@@ -412,6 +414,94 @@ func checkExemplarsResponse(t *testing.T, name string, expected, data []exemplarspb.ExemplarData) {
    }
 }
 
+func TestMultiTSDBPrune(t *testing.T) {
+   tests := []struct {
+       name            string
+       bucket          objstore.Bucket
+       expectedTenants int
+       expectedUploads int
+   }{
+       {
+           name:            "prune tsdbs without object storage",
+           bucket:          nil,
+           expectedTenants: 1,
+           expectedUploads: 0,
+       },
+       {
+           name:            "prune tsdbs with object storage",
+           bucket:          objstore.NewInMemBucket(),
+           expectedTenants: 1,
+           expectedUploads: 2,
+       },
+   }
+
+   for _, test := range tests {
+       t.Run(test.name, func(t *testing.T) {
+           dir, err := ioutil.TempDir("", "multitsdb-prune")
+           testutil.Ok(t, err)
+           defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
+
+           m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(),
+               &tsdb.Options{
+                   MinBlockDuration:  (2 * time.Hour).Milliseconds(),
+                   MaxBlockDuration:  (2 * time.Hour).Milliseconds(),
+                   RetentionDuration: (6 * time.Hour).Milliseconds(),
+               },
+               labels.FromStrings("replica", "test"),
+               "tenant_id",
+               test.bucket,
+               false,
+               metadata.NoneFunc,
+           )
+           defer func() { testutil.Ok(t, m.Close()) }()
+
+           for i := 0; i < 100; i++ {
+               testutil.Ok(t, appendSample(m, "foo", time.UnixMilli(int64(10+i))))
+               testutil.Ok(t, appendSample(m, "bar", time.UnixMilli(int64(10+i))))
+               testutil.Ok(t, appendSample(m, "baz", time.Now().Add(time.Duration(i)*time.Second)))
+           }
+           testutil.Equals(t, 3, len(m.TSDBStores()))
+
+           testutil.Ok(t, m.Prune(context.Background()))
+           testutil.Equals(t, test.expectedTenants, len(m.TSDBStores()))
+
+           var shippedBlocks int
+           if test.bucket != nil {
+               testutil.Ok(t, test.bucket.Iter(context.Background(), "", func(s string) error {
+                   shippedBlocks++
+                   return nil
+               }))
+           }
+           testutil.Equals(t, test.expectedUploads, shippedBlocks)
+       })
+   }
+}
+
+func appendSample(m *MultiTSDB, tenant string, timestamp time.Time) error {
+   ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+   defer cancel()
+
+   app, err := m.TenantAppendable(tenant)
+   if err != nil {
+       return err
+   }
+
+   var a storage.Appender
+   if err := runutil.Retry(1*time.Second, ctx.Done(), func() error {
+       a, err = app.Appender(ctx)
+       return err
+   }); err != nil {
+       return err
+   }
+
+   _, err = a.Append(0, labels.FromStrings("foo", "bar"), timestamp.UnixMilli(), 10)
+   if err != nil {
+       return err
+   }
+
+   return a.Commit()
+}
+
 func BenchmarkMultiTSDB(b *testing.B) {
    dir, err := ioutil.TempDir("", "multitsdb")
    testutil.Ok(b, err)