From c28a4d2382f5f99eadae5136235f3b769b8a692b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 28 Feb 2024 16:53:45 +0200 Subject: [PATCH] receive/multitsdb: do not delete not uploaded blocks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a block hasn't been uploaded yet then tell the TSDB layer not to delete them. This prevents a nasty race where the TSDB layer can delete a block before the shipper gets to it. I saw this happen with a very small block. Signed-off-by: Giedrius Statkevičius --- pkg/receive/multitsdb.go | 65 ++++++++++++++++++++++++++++----- pkg/receive/multitsdb_test.go | 69 +++++++++++++++++++++++++++++++++++ pkg/receive/writer.go | 4 +- pkg/shipper/shipper.go | 51 ++++++++++++++++++++++++++ 4 files changed, 177 insertions(+), 12 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index e5a7e29c1d6..4283565ce5b 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/oklog/ulid" "github.com/pkg/errors" "go.uber.org/atomic" "golang.org/x/exp/slices" @@ -154,7 +155,34 @@ type tenant struct { exemplarsTSDB *exemplars.TSDB ship *shipper.Shipper - mtx *sync.RWMutex + mtx *sync.RWMutex + tsdb *tsdb.DB + + // For tests. + blocksToDeleteFn func(db *tsdb.DB) tsdb.BlocksToDeleteFunc +} + +func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} { + t.mtx.RLock() + defer t.mtx.RUnlock() + + if t.tsdb == nil { + return nil + } + + deletable := t.blocksToDeleteFn(t.tsdb)(blocks) + if t.ship == nil { + return deletable + } + + uploaded := t.ship.UploadedBlocks() + for deletableID := range deletable { + if _, ok := uploaded[deletableID]; !ok { + delete(deletable, deletableID) + } + } + + return deletable } func newTenant() *tenant { @@ -202,14 +230,15 @@ func (t *tenant) shipper() *shipper.Shipper { func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) { t.readyS.Set(tenantTSDB) t.mtx.Lock() - t.setComponents(storeTSDB, ship, exemplarsTSDB) + t.setComponents(storeTSDB, ship, exemplarsTSDB, tenantTSDB) t.mtx.Unlock() } -func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) { +func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) { t.storeTSDB = storeTSDB t.ship = ship t.exemplarsTSDB = exemplarsTSDB + t.tsdb = tenantTSDB } func (t *MultiTSDB) Open() error { @@ -353,7 +382,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error { // pruneTSDB removes a TSDB if its past the retention period. // It compacts the TSDB head, sends all remaining blocks to S3 and removes the TSDB from disk. -func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (bool, error) { +func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (pruned bool, rerr error) { tenantTSDB := tenantInstance.readyStorage() if tenantTSDB == nil { return false, nil @@ -383,9 +412,23 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst tenantTSDB.mtx.Lock() defer tenantTSDB.mtx.Unlock() - // Lock the entire tenant to make sure the shipper is not running in parallel. + // Make sure the shipper is not running in parallel. tenantInstance.mtx.Lock() - defer tenantInstance.mtx.Unlock() + shipper := tenantInstance.ship + tenantInstance.ship = nil + tenantInstance.mtx.Unlock() + shipper.DisableWait() + + defer func() { + if pruned { + return + } + // If the tenant was not pruned, re-enable the shipper. + tenantInstance.mtx.Lock() + tenantInstance.ship = shipper + shipper.Enable() + tenantInstance.mtx.Unlock() + }() sinceLastAppendMillis = time.Since(time.UnixMilli(head.MaxTime())).Milliseconds() if sinceLastAppendMillis <= compactThreshold { @@ -402,8 +445,10 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst } level.Info(logger).Log("msg", "Pruning tenant") - if tenantInstance.ship != nil { - uploaded, err := tenantInstance.ship.Sync(ctx) + if shipper != nil { + // No other code can reach this shipper anymore so enable it again to be able to sync manually. + shipper.Enable() + uploaded, err := shipper.Sync(ctx) if err != nil { return false, err } @@ -422,7 +467,7 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst } tenantInstance.readyS.set(nil) - tenantInstance.setComponents(nil, nil, nil) + tenantInstance.setComponents(nil, nil, nil, nil) return true, nil } @@ -574,6 +619,8 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant level.Info(logger).Log("msg", "opening TSDB") opts := *t.tsdbOpts + opts.BlocksToDelete = tenant.blocksToDelete + tenant.blocksToDeleteFn = tsdb.DefaultBlocksToDelete // NOTE(GiedriusS): always set to false to properly handle OOO samples - OOO samples are written into the WBL // which gets later converted into a block. Without setting this flag to false, the block would get compacted diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 89aeebab734..53d99cabfa6 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -9,17 +9,21 @@ import ( "math" "os" "path" + "path/filepath" "strings" + "sync" "testing" "time" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" + "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -28,6 +32,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -466,8 +471,16 @@ func TestMultiTSDBPrune(t *testing.T) { } testutil.Ok(t, m.Prune(ctx)) + if test.bucket != nil { + _, err := m.Sync(ctx) + testutil.Ok(t, err) + } + testutil.Equals(t, test.expectedTenants, len(m.TSDBLocalClients())) var shippedBlocks int + if test.bucket == nil && shippedBlocks > 0 { + t.Fatal("can't expect uploads when there is no bucket") + } if test.bucket != nil { testutil.Ok(t, test.bucket.Iter(context.Background(), "", func(s string) error { shippedBlocks++ @@ -829,3 +842,59 @@ func BenchmarkMultiTSDB(b *testing.B) { _, _ = a.Append(0, l, int64(i), float64(i)) } } + +func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) { + tenant := &tenant{ + mtx: &sync.RWMutex{}, + } + + t.Run("no blocks", func(t *testing.T) { + require.Equal(t, (map[ulid.ULID]struct{})(nil), tenant.blocksToDelete(nil)) + }) + + tenant.tsdb = &tsdb.DB{} + + mockBlockIDs := []ulid.ULID{ + ulid.MustNew(1, nil), + ulid.MustNew(2, nil), + } + + t.Run("no shipper", func(t *testing.T) { + tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc { + return func(_ []*tsdb.Block) map[ulid.ULID]struct{} { + return map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + } + } + } + + require.Equal(t, map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + }, tenant.blocksToDelete(nil)) + }) + + t.Run("some blocks uploaded", func(t *testing.T) { + tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc { + return func(_ []*tsdb.Block) map[ulid.ULID]struct{} { + return map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + } + } + } + + td := t.TempDir() + + require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), filepath.Join(td, shipper.DefaultMetaFilename), &shipper.Meta{ + Version: shipper.MetaVersion1, + Uploaded: []ulid.ULID{mockBlockIDs[0]}, + })) + + tenant.ship = shipper.New(log.NewNopLogger(), nil, td, nil, nil, metadata.BucketUploadSource, nil, false, metadata.NoneFunc, "") + require.Equal(t, map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + }, tenant.blocksToDelete(nil)) + }) +} diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index ec41466ce45..26fd679f544 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -225,9 +225,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR numExemplarsLabelLength++ level.Debug(exLogger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength) default: - if err != nil { - level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err) - } + level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err) } } } diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index a3b520d15ec..62a71119e67 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -82,6 +82,8 @@ type Shipper struct { labels func() labels.Labels mtx sync.RWMutex + closed bool + wg sync.WaitGroup } // New creates a new shipper that detects new TSDB blocks in dir and uploads them to @@ -247,6 +249,30 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo return nil } +// DisableWait disables the shipper and waits for all ongoing syncs to finish. +// Useful when you want to sync one last time before pruning a TSDB. +func (s *Shipper) DisableWait() { + if s == nil { + return + } + s.mtx.Lock() + s.closed = true + s.mtx.Unlock() + s.wg.Wait() +} + +// Enable enables the shipper again. +// Useful when you want to sync one last time before pruning a TSDB. +// Remove all references to the shipper, call DisableWait, call Enable, and then call Sync() one last time. +func (s *Shipper) Enable() { + if s == nil { + return + } + s.mtx.Lock() + s.closed = false + s.mtx.Unlock() +} + // Sync performs a single synchronization, which ensures all non-compacted local blocks have been uploaded // to the object bucket once. // @@ -254,6 +280,16 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo // // It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok). func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { + s.mtx.Lock() + if s.closed { + s.mtx.Unlock() + return 0, nil + } + s.wg.Add(1) + s.mtx.Unlock() + + defer s.wg.Done() + meta, err := ReadMetaFile(s.metadataFilePath) if err != nil { // If we encounter any error, proceed with an empty meta file and overwrite it later. @@ -355,6 +391,21 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { return uploaded, nil } +func (s *Shipper) UploadedBlocks() map[ulid.ULID]struct{} { + meta, err := ReadMetaFile(s.metadataFilePath) + if err != nil { + // NOTE(GiedriusS): Sync() will inform users about any problems. + return nil + } + + ret := make(map[ulid.ULID]struct{}, len(meta.Uploaded)) + for _, id := range meta.Uploaded { + ret[id] = struct{}{} + } + + return ret +} + // sync uploads the block if not exists in remote storage. // TODO(khyatisoneji): Double check if block does not have deletion-mark.json for some reason, otherwise log it or return error. func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {