From 066e95682ab004037a949e9da3a9e470a084ec60 Mon Sep 17 00:00:00 2001
From: Bo Du
Date: Wed, 9 Dec 2020 01:46:44 -0500
Subject: [PATCH] [dbnode] Skip out of retention index segments during bootstrap. (#2992)

---
 src/dbnode/persist/fs/index_claims_manager.go |  6 +--
 .../persist/fs/index_claims_manager_test.go   |  2 +-
 .../bootstrap/bootstrapper/fs/source.go       | 40 +++++++++------
 .../bootstrap/bootstrapper/peers/source.go    | 50 +++++++++++++------
 4 files changed, 65 insertions(+), 33 deletions(-)

diff --git a/src/dbnode/persist/fs/index_claims_manager.go b/src/dbnode/persist/fs/index_claims_manager.go
index aaedf1cfab..2c3734de58 100644
--- a/src/dbnode/persist/fs/index_claims_manager.go
+++ b/src/dbnode/persist/fs/index_claims_manager.go
@@ -40,9 +40,9 @@ var (
     // errMustUseSingleClaimsManager returned when a second claims manager
     // created, since this is a violation of expected behavior.
     errMustUseSingleClaimsManager = errors.New("not using single global claims manager")
-    // errOutOfRetentionClaim returned when reserving a claim that is
+    // ErrOutOfRetentionClaim returned when reserving a claim that is
     // out of retention.
-    errOutOfRetentionClaim = errors.New("out of retention index volume claim")
+    ErrOutOfRetentionClaim = errors.New("out of retention index volume claim")
 
     globalIndexClaimsManagers uint64
 )
@@ -110,7 +110,7 @@ func (i *indexClaimsManager) ClaimNextIndexFileSetVolumeIndex(
 
     // Reject out of retention claims.
     if blockStart.Before(earliestBlockStart) {
-        return 0, errOutOfRetentionClaim
+        return 0, ErrOutOfRetentionClaim
     }
 
     volumeIndexClaimsByBlockStart, ok := i.volumeIndexClaims[md.ID().String()]
diff --git a/src/dbnode/persist/fs/index_claims_manager_test.go b/src/dbnode/persist/fs/index_claims_manager_test.go
index beece3d5d8..e99ad125d8 100644
--- a/src/dbnode/persist/fs/index_claims_manager_test.go
+++ b/src/dbnode/persist/fs/index_claims_manager_test.go
@@ -129,7 +129,7 @@ func TestIndexClaimsManagerOutOfRetention(t *testing.T) {
         md,
         blockStart,
     )
-    require.Equal(t, errOutOfRetentionClaim, err)
+    require.Equal(t, ErrOutOfRetentionClaim, err)
 
     // Verify that the out of retention entry has been deleted as well.
     _, ok = mgr.volumeIndexClaims[md.ID().String()][xtime.ToUnixNano(blockStart)]
diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
index a9a790d15f..ea17ffbaef 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
@@ -21,6 +21,7 @@
 package fs
 
 import (
+    "errors"
     "fmt"
     "sync"
     "time"
@@ -83,8 +84,9 @@ type fileSystemSource struct {
 }
 
 type fileSystemSourceMetrics struct {
-    persistedIndexBlocksRead  tally.Counter
-    persistedIndexBlocksWrite tally.Counter
+    persistedIndexBlocksRead           tally.Counter
+    persistedIndexBlocksWrite          tally.Counter
+    persistedIndexBlocksOutOfRetention tally.Counter
 }
 
 func newFileSystemSource(opts Options) (bootstrap.Source, error) {
@@ -105,8 +107,9 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) {
         idPool:      opts.IdentifierPool(),
         newReaderFn: fs.NewReader,
         metrics: fileSystemSourceMetrics{
-            persistedIndexBlocksRead:  scope.Counter("persist-index-blocks-read"),
-            persistedIndexBlocksWrite: scope.Counter("persist-index-blocks-write"),
+            persistedIndexBlocksRead:           scope.Counter("persist-index-blocks-read"),
+            persistedIndexBlocksWrite:          scope.Counter("persist-index-blocks-write"),
+            persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"),
         },
     }
     s.newReaderPoolOpts.Alloc = s.newReader
@@ -408,6 +411,17 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
     requestedRanges := timeWindowReaders.Ranges
     remainingRanges := requestedRanges.Copy()
     shardReaders := timeWindowReaders.Readers
+    defer func() {
+        // Return readers to pool.
+        for _, shardReaders := range shardReaders {
+            for _, r := range shardReaders.Readers {
+                if err := r.Close(); err == nil {
+                    readerPool.Put(r)
+                }
+            }
+        }
+    }()
+
     for shard, shardReaders := range shardReaders {
         shard := uint32(shard)
         readers := shardReaders.Readers
@@ -590,7 +604,14 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
                 blockStart,
                 blockEnd,
             )
-            if err != nil {
+            if errors.Is(err, fs.ErrOutOfRetentionClaim) {
+                // Bail early if the index segment is already out of retention.
+                // This can happen when the edge of the requested ranges captured at
+                // data bootstrap time has since fallen out of retention.
+                s.log.Debug("skipping out of retention index segment", buildIndexLogFields...)
+                s.metrics.persistedIndexBlocksOutOfRetention.Inc(1)
+                return
+            } else if err != nil {
                 instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) {
                     l.Error("persist fs index bootstrap failed",
                         zap.Error(err),
@@ -637,15 +658,6 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
         runResult.Unlock()
     }
 
-    // Return readers to pool.
-    for _, shardReaders := range shardReaders {
-        for _, r := range shardReaders.Readers {
-            if err := r.Close(); err == nil {
-                readerPool.Put(r)
-            }
-        }
-    }
-
     s.markRunResultErrorsAndUnfulfilled(runResult, requestedRanges,
         remainingRanges, timesWithErrors)
 }
diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
index f51c0816f3..a0bf66d87b 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
@@ -21,11 +21,17 @@
 package peers
 
 import (
+    "errors"
     "fmt"
     "io"
     "sync"
     "time"
 
+    "github.com/opentracing/opentracing-go"
+    "github.com/uber-go/tally"
+    "go.uber.org/zap"
+    "go.uber.org/zap/zapcore"
+
     "github.com/m3db/m3/src/cluster/shard"
     "github.com/m3db/m3/src/dbnode/client"
     "github.com/m3db/m3/src/dbnode/namespace"
@@ -50,10 +56,6 @@ import (
     xresource "github.com/m3db/m3/src/x/resource"
     xsync "github.com/m3db/m3/src/x/sync"
     xtime "github.com/m3db/m3/src/x/time"
-
-    "github.com/opentracing/opentracing-go"
-    "go.uber.org/zap"
-    "go.uber.org/zap/zapcore"
 )
 
 type peersSource struct {
@@ -61,6 +63,11 @@ type peersSource struct {
     log               *zap.Logger
     newPersistManager func() (persist.Manager, error)
     nowFn             clock.NowFn
+    metrics           peersSourceMetrics
+}
+
+type peersSourceMetrics struct {
+    persistedIndexBlocksOutOfRetention tally.Counter
 }
 
 type persistenceFlush struct {
@@ -76,6 +83,8 @@ func newPeersSource(opts Options) (bootstrap.Source, error) {
     }
 
     iopts := opts.ResultOptions().InstrumentOptions()
+    scope := iopts.MetricsScope().SubScope("peers-bootstrapper")
+    iopts = iopts.SetMetricsScope(scope)
     return &peersSource{
         opts: opts,
         log:  iopts.Logger().With(zap.String("bootstrapper", "peers")),
@@ -83,6 +92,9 @@ func newPeersSource(opts Options) (bootstrap.Source, error) {
             return fs.NewPersistManager(opts.FilesystemOptions())
         },
         nowFn: opts.ResultOptions().ClockOptions().NowFn(),
+        metrics: peersSourceMetrics{
+            persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"),
+        },
     }, nil
 }
@@ -824,7 +836,17 @@ func (s *peersSource) processReaders(
         timesWithErrors []time.Time
         totalEntries    int
     )
-    defer docsPool.Put(batch)
+    defer func() {
+        docsPool.Put(batch)
+        // Return readers to pool.
+        for _, shardReaders := range timeWindowReaders.Readers {
+            for _, r := range shardReaders.Readers {
+                if err := r.Close(); err == nil {
+                    readerPool.Put(r)
+                }
+            }
+        }
+    }()
 
     requestedRanges := timeWindowReaders.Ranges
     remainingRanges := requestedRanges.Copy()
@@ -934,7 +956,14 @@ func (s *peersSource) processReaders(
             blockStart,
             blockEnd,
         )
-        if err != nil {
+        if errors.Is(err, fs.ErrOutOfRetentionClaim) {
+            // Bail early if the index segment is already out of retention.
+            // This can happen when the edge of the requested ranges captured at
+            // data bootstrap time has since fallen out of retention.
+            s.log.Debug("skipping out of retention index segment", buildIndexLogFields...)
+            s.metrics.persistedIndexBlocksOutOfRetention.Inc(1)
+            return remainingRanges, timesWithErrors
+        } else if err != nil {
             instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) {
                 l.Error("persist fs index bootstrap failed",
                     zap.Stringer("namespace", ns.ID()),
@@ -978,15 +1007,6 @@ func (s *peersSource) processReaders(
     r.IndexResults()[xtime.ToUnixNano(blockStart)].SetBlock(idxpersist.DefaultIndexVolumeType,
         result.NewIndexBlock(segments, newFulfilled))
     resultLock.Unlock()
 
-    // Return readers to pool.
-    for _, shardReaders := range timeWindowReaders.Readers {
-        for _, r := range shardReaders.Readers {
-            if err := r.Close(); err == nil {
-                readerPool.Put(r)
-            }
-        }
-    }
-
     return remainingRanges, timesWithErrors
 }
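
Reviewer note: `errOutOfRetentionClaim` is exported as `ErrOutOfRetentionClaim` so that the `fs` and `peers` bootstrappers, which live outside the `persist/fs` package, can branch on it with `errors.Is` and skip the segment instead of escalating an invariant violation. Below is a minimal, self-contained sketch of that pattern; `claimNextVolumeIndex` is a hypothetical stand-in for the claims manager's `ClaimNextIndexFileSetVolumeIndex`, and the sentinel is redeclared locally only so the snippet compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// errOutOfRetention mirrors the sentinel this patch exports as
// fs.ErrOutOfRetentionClaim; redeclared here so the sketch is self-contained.
var errOutOfRetention = errors.New("out of retention index volume claim")

// claimNextVolumeIndex is a hypothetical stand-in for
// ClaimNextIndexFileSetVolumeIndex: it rejects any claim whose block
// start falls before the earliest retained block start.
func claimNextVolumeIndex(blockStart, earliestBlockStart int64) (int, error) {
	if blockStart < earliestBlockStart {
		return 0, errOutOfRetention
	}
	return 1, nil
}

func main() {
	// Mirrors the bootstrapper change: branch on the sentinel with
	// errors.Is and skip the segment rather than treating the failed
	// claim as a bootstrap error.
	_, err := claimNextVolumeIndex(100, 200)
	switch {
	case errors.Is(err, errOutOfRetention):
		fmt.Println("skipping out of retention index segment")
	case err != nil:
		fmt.Println("persist fs index bootstrap failed:", err)
	default:
		fmt.Println("claimed next index volume")
	}
}
```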