From f175b1badafa788bd229cd6aa2905a12ad8b7257 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 28 Feb 2024 16:53:45 +0200 Subject: [PATCH 1/3] receive/multitsdb: do not delete not uploaded blocks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a block hasn't been uploaded yet then tell the TSDB layer not to delete them. This prevents a nasty race where the TSDB layer can delete a block before the shipper gets to it. I saw this happen with a very small block. Signed-off-by: Giedrius Statkevičius --- pkg/receive/multitsdb.go | 67 +++++++++++++++++++++++++++++----- pkg/receive/multitsdb_test.go | 69 +++++++++++++++++++++++++++++++++++ pkg/receive/writer.go | 4 +- pkg/shipper/shipper.go | 51 ++++++++++++++++++++++++++ 4 files changed, 179 insertions(+), 12 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index a2e65a6280..5b0d75980c 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/oklog/ulid" "github.com/pkg/errors" "go.uber.org/atomic" "golang.org/x/exp/slices" @@ -154,7 +155,34 @@ type tenant struct { exemplarsTSDB *exemplars.TSDB ship *shipper.Shipper - mtx *sync.RWMutex + mtx *sync.RWMutex + tsdb *tsdb.DB + + // For tests. + blocksToDeleteFn func(db *tsdb.DB) tsdb.BlocksToDeleteFunc +} + +func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} { + t.mtx.RLock() + defer t.mtx.RUnlock() + + if t.tsdb == nil { + return nil + } + + deletable := t.blocksToDeleteFn(t.tsdb)(blocks) + if t.ship == nil { + return deletable + } + + uploaded := t.ship.UploadedBlocks() + for deletableID := range deletable { + if _, ok := uploaded[deletableID]; !ok { + delete(deletable, deletableID) + } + } + + return deletable } func newTenant() *tenant { @@ -202,14 +230,15 @@ func (t *tenant) shipper() *shipper.Shipper { func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) { t.readyS.Set(tenantTSDB) t.mtx.Lock() - t.setComponents(storeTSDB, ship, exemplarsTSDB) + t.setComponents(storeTSDB, ship, exemplarsTSDB, tenantTSDB) t.mtx.Unlock() } -func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) { +func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) { t.storeTSDB = storeTSDB t.ship = ship t.exemplarsTSDB = exemplarsTSDB + t.tsdb = tenantTSDB } func (t *MultiTSDB) Open() error { @@ -353,7 +382,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error { // pruneTSDB removes a TSDB if its past the retention period. // It compacts the TSDB head, sends all remaining blocks to S3 and removes the TSDB from disk. -func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (bool, error) { +func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (pruned bool, rerr error) { tenantTSDB := tenantInstance.readyStorage() if tenantTSDB == nil { return false, nil @@ -383,9 +412,23 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst tenantTSDB.mtx.Lock() defer tenantTSDB.mtx.Unlock() - // Lock the entire tenant to make sure the shipper is not running in parallel. + // Make sure the shipper is not running in parallel. 
tenantInstance.mtx.Lock() - defer tenantInstance.mtx.Unlock() + shipper := tenantInstance.ship + tenantInstance.ship = nil + tenantInstance.mtx.Unlock() + shipper.DisableWait() + + defer func() { + if pruned { + return + } + // If the tenant was not pruned, re-enable the shipper. + tenantInstance.mtx.Lock() + tenantInstance.ship = shipper + shipper.Enable() + tenantInstance.mtx.Unlock() + }() sinceLastAppendMillis = time.Since(time.UnixMilli(head.MaxTime())).Milliseconds() if sinceLastAppendMillis <= compactThreshold { @@ -402,8 +445,10 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst } level.Info(logger).Log("msg", "Pruning tenant") - if tenantInstance.ship != nil { - uploaded, err := tenantInstance.ship.Sync(ctx) + if shipper != nil { + // No other code can reach this shipper anymore so enable it again to be able to sync manually. + shipper.Enable() + uploaded, err := shipper.Sync(ctx) if err != nil { return false, err } @@ -421,8 +466,10 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst return false, err } + tenantInstance.mtx.Lock() tenantInstance.readyS.set(nil) - tenantInstance.setComponents(nil, nil, nil) + tenantInstance.setComponents(nil, nil, nil, nil) + tenantInstance.mtx.Unlock() return true, nil } @@ -574,6 +621,8 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant level.Info(logger).Log("msg", "opening TSDB") opts := *t.tsdbOpts + opts.BlocksToDelete = tenant.blocksToDelete + tenant.blocksToDeleteFn = tsdb.DefaultBlocksToDelete // NOTE(GiedriusS): always set to false to properly handle OOO samples - OOO samples are written into the WBL // which gets later converted into a block. Without setting this flag to false, the block would get compacted diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 89aeebab73..53d99cabfa 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -9,17 +9,21 @@ import ( "math" "os" "path" + "path/filepath" "strings" + "sync" "testing" "time" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" + "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -28,6 +32,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -466,8 +471,16 @@ func TestMultiTSDBPrune(t *testing.T) { } testutil.Ok(t, m.Prune(ctx)) + if test.bucket != nil { + _, err := m.Sync(ctx) + testutil.Ok(t, err) + } + testutil.Equals(t, test.expectedTenants, len(m.TSDBLocalClients())) var shippedBlocks int + if test.bucket == nil && shippedBlocks > 0 { + t.Fatal("can't expect uploads when there is no bucket") + } if test.bucket != nil { testutil.Ok(t, test.bucket.Iter(context.Background(), "", func(s string) error { shippedBlocks++ @@ -829,3 +842,59 @@ func BenchmarkMultiTSDB(b *testing.B) { _, _ = a.Append(0, l, int64(i), float64(i)) } } + +func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) { + tenant := &tenant{ + 
mtx: &sync.RWMutex{}, + } + + t.Run("no blocks", func(t *testing.T) { + require.Equal(t, (map[ulid.ULID]struct{})(nil), tenant.blocksToDelete(nil)) + }) + + tenant.tsdb = &tsdb.DB{} + + mockBlockIDs := []ulid.ULID{ + ulid.MustNew(1, nil), + ulid.MustNew(2, nil), + } + + t.Run("no shipper", func(t *testing.T) { + tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc { + return func(_ []*tsdb.Block) map[ulid.ULID]struct{} { + return map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + } + } + } + + require.Equal(t, map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + }, tenant.blocksToDelete(nil)) + }) + + t.Run("some blocks uploaded", func(t *testing.T) { + tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc { + return func(_ []*tsdb.Block) map[ulid.ULID]struct{} { + return map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + } + } + } + + td := t.TempDir() + + require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), filepath.Join(td, shipper.DefaultMetaFilename), &shipper.Meta{ + Version: shipper.MetaVersion1, + Uploaded: []ulid.ULID{mockBlockIDs[0]}, + })) + + tenant.ship = shipper.New(log.NewNopLogger(), nil, td, nil, nil, metadata.BucketUploadSource, nil, false, metadata.NoneFunc, "") + require.Equal(t, map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + }, tenant.blocksToDelete(nil)) + }) +} diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index ec41466ce4..26fd679f54 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -225,9 +225,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR numExemplarsLabelLength++ level.Debug(exLogger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength) default: - if err != nil { - level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err) - } + level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err) } } } diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index f01c4b2b35..6f3d9bed7c 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -82,6 +82,8 @@ type Shipper struct { labels func() labels.Labels mtx sync.RWMutex + closed bool + wg sync.WaitGroup } // New creates a new shipper that detects new TSDB blocks in dir and uploads them to @@ -247,6 +249,30 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo return nil } +// DisableWait disables the shipper and waits for all ongoing syncs to finish. +// Useful when you want to sync one last time before pruning a TSDB. +func (s *Shipper) DisableWait() { + if s == nil { + return + } + s.mtx.Lock() + s.closed = true + s.mtx.Unlock() + s.wg.Wait() +} + +// Enable enables the shipper again. +// Useful when you want to sync one last time before pruning a TSDB. +// Remove all references to the shipper, call DisableWait, call Enable, and then call Sync() one last time. +func (s *Shipper) Enable() { + if s == nil { + return + } + s.mtx.Lock() + s.closed = false + s.mtx.Unlock() +} + // Sync performs a single synchronization, which ensures all non-compacted local blocks have been uploaded // to the object bucket once. // @@ -254,6 +280,16 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo // // It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok). 
func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { + s.mtx.Lock() + if s.closed { + s.mtx.Unlock() + return 0, nil + } + s.wg.Add(1) + s.mtx.Unlock() + + defer s.wg.Done() + meta, err := ReadMetaFile(s.metadataFilePath) if err != nil { // If we encounter any error, proceed with an empty meta file and overwrite it later. @@ -355,6 +391,21 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { return uploaded, nil } +func (s *Shipper) UploadedBlocks() map[ulid.ULID]struct{} { + meta, err := ReadMetaFile(s.metadataFilePath) + if err != nil { + // NOTE(GiedriusS): Sync() will inform users about any problems. + return nil + } + + ret := make(map[ulid.ULID]struct{}, len(meta.Uploaded)) + for _, id := range meta.Uploaded { + ret[id] = struct{}{} + } + + return ret +} + // sync uploads the block if not exists in remote storage. // TODO(khyatisoneji): Double check if block does not have deletion-mark.json for some reason, otherwise log it or return error. func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { From 36f5ff1edee8dc2b81bd1ec216dfa89ee9b43741 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 7 Mar 2024 16:28:56 +0200 Subject: [PATCH 2/3] receive/multitsdb: change order MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- pkg/receive/multitsdb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 5b0d75980c..b74b780176 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -416,8 +416,8 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst tenantInstance.mtx.Lock() shipper := tenantInstance.ship tenantInstance.ship = nil - tenantInstance.mtx.Unlock() shipper.DisableWait() + tenantInstance.mtx.Unlock() defer func() { if pruned { From 48f502c78979c776c5039b86d18389b25a1e444d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 2 Apr 2024 17:26:48 +0300 Subject: [PATCH 3/3] shipper/receive: just use a single lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- pkg/receive/multitsdb.go | 3 --- pkg/shipper/shipper.go | 46 +++------------------------------------- 2 files changed, 3 insertions(+), 46 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index b74b780176..5ea7bfcc5b 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -416,7 +416,6 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst tenantInstance.mtx.Lock() shipper := tenantInstance.ship tenantInstance.ship = nil - shipper.DisableWait() tenantInstance.mtx.Unlock() defer func() { @@ -426,7 +425,6 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst // If the tenant was not pruned, re-enable the shipper. tenantInstance.mtx.Lock() tenantInstance.ship = shipper - shipper.Enable() tenantInstance.mtx.Unlock() }() @@ -447,7 +445,6 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst level.Info(logger).Log("msg", "Pruning tenant") if shipper != nil { // No other code can reach this shipper anymore so enable it again to be able to sync manually. 
- shipper.Enable() uploaded, err := shipper.Sync(ctx) if err != nil { return false, err diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 6f3d9bed7c..6b7be6cd12 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -82,8 +82,6 @@ type Shipper struct { labels func() labels.Labels mtx sync.RWMutex - closed bool - wg sync.WaitGroup } // New creates a new shipper that detects new TSDB blocks in dir and uploads them to @@ -138,13 +136,6 @@ func (s *Shipper) SetLabels(lbls labels.Labels) { s.labels = func() labels.Labels { return lbls } } -func (s *Shipper) getLabels() labels.Labels { - s.mtx.RLock() - defer s.mtx.RUnlock() - - return s.labels() -} - // Timestamps returns the minimum timestamp for which data is available and the highest timestamp // of blocks that were successfully uploaded. func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) { @@ -249,30 +240,6 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo return nil } -// DisableWait disables the shipper and waits for all ongoing syncs to finish. -// Useful when you want to sync one last time before pruning a TSDB. -func (s *Shipper) DisableWait() { - if s == nil { - return - } - s.mtx.Lock() - s.closed = true - s.mtx.Unlock() - s.wg.Wait() -} - -// Enable enables the shipper again. -// Useful when you want to sync one last time before pruning a TSDB. -// Remove all references to the shipper, call DisableWait, call Enable, and then call Sync() one last time. -func (s *Shipper) Enable() { - if s == nil { - return - } - s.mtx.Lock() - s.closed = false - s.mtx.Unlock() -} - // Sync performs a single synchronization, which ensures all non-compacted local blocks have been uploaded // to the object bucket once. // @@ -281,14 +248,7 @@ func (s *Shipper) Enable() { // It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok). func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { s.mtx.Lock() - if s.closed { - s.mtx.Unlock() - return 0, nil - } - s.wg.Add(1) - s.mtx.Unlock() - - defer s.wg.Done() + defer s.mtx.Unlock() meta, err := ReadMetaFile(s.metadataFilePath) if err != nil { @@ -311,7 +271,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { meta.Uploaded = nil var ( - checker = newLazyOverlapChecker(s.logger, s.bucket, s.getLabels) + checker = newLazyOverlapChecker(s.logger, s.bucket, func() labels.Labels { return s.labels() }) uploadErrs int ) @@ -433,7 +393,7 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { return errors.Wrap(err, "hard link block") } // Attach current labels and write a new meta file with Thanos extensions. - if lset := s.getLabels(); !lset.IsEmpty() { + if lset := s.labels(); !lset.IsEmpty() { lset.Range(func(l labels.Label) { meta.Thanos.Labels[l.Name] = l.Value })
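
For readers skimming the series, here is a condensed, self-contained sketch (not the actual Thanos code; `blockID`, `block`, `uploadedLister` and `fakeShipper` are hypothetical stand-ins for `ulid.ULID`, `*tsdb.Block` and `shipper.Shipper`) of the core idea in patch 1: the TSDB's deletion candidates are intersected with the shipper's uploaded set, so a block that has not been uploaded yet is never deleted from local disk.

```go
package main

import (
	"fmt"
	"sync"
)

// blockID stands in for ulid.ULID and block for *tsdb.Block; uploadedLister
// covers the one shipper method this sketch needs.
type blockID string

type block struct{ id blockID }

type uploadedLister interface {
	UploadedBlocks() map[blockID]struct{}
}

type tenant struct {
	mtx  sync.RWMutex
	ship uploadedLister
	// defaultBlocksToDelete stands in for tsdb.DefaultBlocksToDelete(db).
	defaultBlocksToDelete func(blocks []*block) map[blockID]struct{}
}

// blocksToDelete narrows the TSDB's deletion candidates down to blocks the
// shipper has already uploaded, so an un-shipped block is never removed.
func (t *tenant) blocksToDelete(blocks []*block) map[blockID]struct{} {
	t.mtx.RLock()
	defer t.mtx.RUnlock()

	deletable := t.defaultBlocksToDelete(blocks)
	if t.ship == nil {
		return deletable
	}

	uploaded := t.ship.UploadedBlocks()
	for id := range deletable {
		if _, ok := uploaded[id]; !ok {
			delete(deletable, id) // keep blocks that have not been shipped yet
		}
	}
	return deletable
}

type fakeShipper struct{ uploaded map[blockID]struct{} }

func (f *fakeShipper) UploadedBlocks() map[blockID]struct{} { return f.uploaded }

func main() {
	ten := &tenant{
		ship: &fakeShipper{uploaded: map[blockID]struct{}{"A": {}}},
		defaultBlocksToDelete: func(blocks []*block) map[blockID]struct{} {
			// Pretend the TSDB considers every candidate deletable.
			out := make(map[blockID]struct{}, len(blocks))
			for _, b := range blocks {
				out[b.id] = struct{}{}
			}
			return out
		},
	}

	// Only "A" may be deleted; "B" has not been uploaded yet.
	fmt.Println(ten.blocksToDelete([]*block{{id: "A"}, {id: "B"}}))
}
```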
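
A second sketch, again with hypothetical `mini*` types rather than the real `Shipper` and `MultiTSDB`, shows the locking scheme the series converges on in patch 3: `Shipper.Sync` simply holds the shipper's own mutex for its entire run, and pruning detaches the shipper from the tenant under the tenant mutex before performing one last sync on the detached instance, which is what makes the `DisableWait`/`Enable` pair from patch 1 unnecessary.

```go
package main

import (
	"fmt"
	"sync"
)

// miniShipper mirrors the post-patch-3 shape: Sync holds a single mutex for
// its whole run instead of coordinating via a closed flag and a WaitGroup.
type miniShipper struct {
	mtx    sync.Mutex
	synced int
}

func (s *miniShipper) Sync() int {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.synced++ // placeholder for uploading non-compacted local blocks
	return s.synced
}

// miniTenant mirrors pruneTSDB after the series: detach the shipper under the
// tenant mutex so no other caller can reach it, then run one final Sync on
// the detached instance before dropping the tenant.
type miniTenant struct {
	mtx  sync.Mutex
	ship *miniShipper
}

func (t *miniTenant) prune() {
	t.mtx.Lock()
	ship := t.ship
	t.ship = nil
	t.mtx.Unlock()

	if ship != nil {
		fmt.Println("syncs completed before prune finished:", ship.Sync())
	}
}

func main() {
	t := &miniTenant{ship: &miniShipper{}}
	t.prune()
}
```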