diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index a2e65a6280..5ea7bfcc5b 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/oklog/ulid" "github.com/pkg/errors" "go.uber.org/atomic" "golang.org/x/exp/slices" @@ -154,7 +155,34 @@ type tenant struct { exemplarsTSDB *exemplars.TSDB ship *shipper.Shipper - mtx *sync.RWMutex + mtx *sync.RWMutex + tsdb *tsdb.DB + + // For tests. + blocksToDeleteFn func(db *tsdb.DB) tsdb.BlocksToDeleteFunc +} + +func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} { + t.mtx.RLock() + defer t.mtx.RUnlock() + + if t.tsdb == nil { + return nil + } + + deletable := t.blocksToDeleteFn(t.tsdb)(blocks) + if t.ship == nil { + return deletable + } + + uploaded := t.ship.UploadedBlocks() + for deletableID := range deletable { + if _, ok := uploaded[deletableID]; !ok { + delete(deletable, deletableID) + } + } + + return deletable } func newTenant() *tenant { @@ -202,14 +230,15 @@ func (t *tenant) shipper() *shipper.Shipper { func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) { t.readyS.Set(tenantTSDB) t.mtx.Lock() - t.setComponents(storeTSDB, ship, exemplarsTSDB) + t.setComponents(storeTSDB, ship, exemplarsTSDB, tenantTSDB) t.mtx.Unlock() } -func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) { +func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) { t.storeTSDB = storeTSDB t.ship = ship t.exemplarsTSDB = exemplarsTSDB + t.tsdb = tenantTSDB } func (t *MultiTSDB) Open() error { @@ -353,7 +382,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error { // pruneTSDB removes a TSDB if its past the retention period. // It compacts the TSDB head, sends all remaining blocks to S3 and removes the TSDB from disk. -func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (bool, error) { +func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (pruned bool, rerr error) { tenantTSDB := tenantInstance.readyStorage() if tenantTSDB == nil { return false, nil @@ -383,9 +412,21 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst tenantTSDB.mtx.Lock() defer tenantTSDB.mtx.Unlock() - // Lock the entire tenant to make sure the shipper is not running in parallel. + // Make sure the shipper is not running in parallel. tenantInstance.mtx.Lock() - defer tenantInstance.mtx.Unlock() + shipper := tenantInstance.ship + tenantInstance.ship = nil + tenantInstance.mtx.Unlock() + + defer func() { + if pruned { + return + } + // If the tenant was not pruned, re-enable the shipper. + tenantInstance.mtx.Lock() + tenantInstance.ship = shipper + tenantInstance.mtx.Unlock() + }() sinceLastAppendMillis = time.Since(time.UnixMilli(head.MaxTime())).Milliseconds() if sinceLastAppendMillis <= compactThreshold { @@ -402,8 +443,9 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst } level.Info(logger).Log("msg", "Pruning tenant") - if tenantInstance.ship != nil { - uploaded, err := tenantInstance.ship.Sync(ctx) + if shipper != nil { + // No other code can reach this shipper anymore so enable it again to be able to sync manually. + uploaded, err := shipper.Sync(ctx) if err != nil { return false, err } @@ -421,8 +463,10 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst return false, err } + tenantInstance.mtx.Lock() tenantInstance.readyS.set(nil) - tenantInstance.setComponents(nil, nil, nil) + tenantInstance.setComponents(nil, nil, nil, nil) + tenantInstance.mtx.Unlock() return true, nil } @@ -574,6 +618,8 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant level.Info(logger).Log("msg", "opening TSDB") opts := *t.tsdbOpts + opts.BlocksToDelete = tenant.blocksToDelete + tenant.blocksToDeleteFn = tsdb.DefaultBlocksToDelete // NOTE(GiedriusS): always set to false to properly handle OOO samples - OOO samples are written into the WBL // which gets later converted into a block. Without setting this flag to false, the block would get compacted diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 89aeebab73..53d99cabfa 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -9,17 +9,21 @@ import ( "math" "os" "path" + "path/filepath" "strings" + "sync" "testing" "time" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" + "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -28,6 +32,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -466,8 +471,16 @@ func TestMultiTSDBPrune(t *testing.T) { } testutil.Ok(t, m.Prune(ctx)) + if test.bucket != nil { + _, err := m.Sync(ctx) + testutil.Ok(t, err) + } + testutil.Equals(t, test.expectedTenants, len(m.TSDBLocalClients())) var shippedBlocks int + if test.bucket == nil && shippedBlocks > 0 { + t.Fatal("can't expect uploads when there is no bucket") + } if test.bucket != nil { testutil.Ok(t, test.bucket.Iter(context.Background(), "", func(s string) error { shippedBlocks++ @@ -829,3 +842,59 @@ func BenchmarkMultiTSDB(b *testing.B) { _, _ = a.Append(0, l, int64(i), float64(i)) } } + +func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) { + tenant := &tenant{ + mtx: &sync.RWMutex{}, + } + + t.Run("no blocks", func(t *testing.T) { + require.Equal(t, (map[ulid.ULID]struct{})(nil), tenant.blocksToDelete(nil)) + }) + + tenant.tsdb = &tsdb.DB{} + + mockBlockIDs := []ulid.ULID{ + ulid.MustNew(1, nil), + ulid.MustNew(2, nil), + } + + t.Run("no shipper", func(t *testing.T) { + tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc { + return func(_ []*tsdb.Block) map[ulid.ULID]struct{} { + return map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + } + } + } + + require.Equal(t, map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + }, tenant.blocksToDelete(nil)) + }) + + t.Run("some blocks uploaded", func(t *testing.T) { + tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc { + return func(_ []*tsdb.Block) map[ulid.ULID]struct{} { + return map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + } + } + } + + td := t.TempDir() + + require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), filepath.Join(td, shipper.DefaultMetaFilename), &shipper.Meta{ + Version: shipper.MetaVersion1, + Uploaded: []ulid.ULID{mockBlockIDs[0]}, + })) + + tenant.ship = shipper.New(log.NewNopLogger(), nil, td, nil, nil, metadata.BucketUploadSource, nil, false, metadata.NoneFunc, "") + require.Equal(t, map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + }, tenant.blocksToDelete(nil)) + }) +} diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index ec41466ce4..26fd679f54 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -225,9 +225,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR numExemplarsLabelLength++ level.Debug(exLogger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength) default: - if err != nil { - level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err) - } + level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err) } } } diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index f01c4b2b35..6b7be6cd12 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -136,13 +136,6 @@ func (s *Shipper) SetLabels(lbls labels.Labels) { s.labels = func() labels.Labels { return lbls } } -func (s *Shipper) getLabels() labels.Labels { - s.mtx.RLock() - defer s.mtx.RUnlock() - - return s.labels() -} - // Timestamps returns the minimum timestamp for which data is available and the highest timestamp // of blocks that were successfully uploaded. func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) { @@ -254,6 +247,9 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo // // It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok). func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { + s.mtx.Lock() + defer s.mtx.Unlock() + meta, err := ReadMetaFile(s.metadataFilePath) if err != nil { // If we encounter any error, proceed with an empty meta file and overwrite it later. @@ -275,7 +271,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { meta.Uploaded = nil var ( - checker = newLazyOverlapChecker(s.logger, s.bucket, s.getLabels) + checker = newLazyOverlapChecker(s.logger, s.bucket, func() labels.Labels { return s.labels() }) uploadErrs int ) @@ -355,6 +351,21 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { return uploaded, nil } +func (s *Shipper) UploadedBlocks() map[ulid.ULID]struct{} { + meta, err := ReadMetaFile(s.metadataFilePath) + if err != nil { + // NOTE(GiedriusS): Sync() will inform users about any problems. + return nil + } + + ret := make(map[ulid.ULID]struct{}, len(meta.Uploaded)) + for _, id := range meta.Uploaded { + ret[id] = struct{}{} + } + + return ret +} + // sync uploads the block if not exists in remote storage. // TODO(khyatisoneji): Double check if block does not have deletion-mark.json for some reason, otherwise log it or return error. func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { @@ -382,7 +393,7 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { return errors.Wrap(err, "hard link block") } // Attach current labels and write a new meta file with Thanos extensions. - if lset := s.getLabels(); !lset.IsEmpty() { + if lset := s.labels(); !lset.IsEmpty() { lset.Range(func(l labels.Label) { meta.Thanos.Labels[l.Name] = l.Value })