From e2b4a8a11e65a12a9e0a051d13fb6dcf7c6acbf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=CE=BBinas?= Date: Wed, 17 Nov 2021 11:14:23 +0200 Subject: [PATCH] [dbnode] Avoid loading blocks in memory for namespaces with snapshots disabled during bootstrapping (#3919) * Use `persist.FileSetFlushType` instead of `persist.FileSetSnapshotType` for second target data and index ranges if bootstrapping namespace is read only. Because of this change, bootstrappers won't keep second range blocks in memory for read only namespaces (`shouldPersist` will evaluate to true for them). * Inline `namespace.ReadOnly` instead of declaring it as var. * Use `snapshotEnabled` instead of `namespace.ReadOnly`. * Removed unused field. * Updated unit test names to align with snapshotEnabled. --- src/dbnode/storage/bootstrap/process.go | 83 +++++++++++--------- src/dbnode/storage/bootstrap/process_test.go | 33 ++++++++ src/dbnode/storage/bootstrap_test.go | 15 ++-- 3 files changed, 85 insertions(+), 46 deletions(-) diff --git a/src/dbnode/storage/bootstrap/process.go b/src/dbnode/storage/bootstrap/process.go index 904370cc70..ca9cc2518e 100644 --- a/src/dbnode/storage/bootstrap/process.go +++ b/src/dbnode/storage/bootstrap/process.go @@ -34,7 +34,6 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" - "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/tracepoint" @@ -188,14 +187,16 @@ func (b bootstrapProcess) Run( } namespaceDetails := make([]NamespaceDetails, 0, len(namespaces)) for _, namespace := range namespaces { - ropts := namespace.Metadata.Options().RetentionOptions() - idxopts := namespace.Metadata.Options().IndexOptions() - dataRanges := b.targetRangesForData(at, ropts) - indexRanges := b.targetRangesForIndex(at, ropts, idxopts) - firstRanges := b.newShardTimeRanges( - dataRanges.firstRangeWithPersistTrue.Range, - namespace.Shards, + var ( + nsOpts = namespace.Metadata.Options() + dataRanges = b.targetRangesForData(at, nsOpts) + indexRanges = b.targetRangesForIndex(at, nsOpts) + firstRanges = b.newShardTimeRanges( + dataRanges.firstRangeWithPersistTrue.Range, + namespace.Shards, + ) ) + namespacesRunFirst.Namespaces.Set(namespace.Metadata.ID(), Namespace{ Metadata: namespace.Metadata, Shards: namespace.Shards, @@ -215,23 +216,23 @@ func (b bootstrapProcess) Run( }, }) secondRanges := b.newShardTimeRanges( - dataRanges.secondRangeWithPersistFalse.Range, namespace.Shards) + dataRanges.secondRange.Range, namespace.Shards) namespacesRunSecond.Namespaces.Set(namespace.Metadata.ID(), Namespace{ Metadata: namespace.Metadata, Shards: namespace.Shards, DataAccumulator: namespace.DataAccumulator, Hooks: namespace.Hooks, - DataTargetRange: dataRanges.secondRangeWithPersistFalse, - IndexTargetRange: indexRanges.secondRangeWithPersistFalse, + DataTargetRange: dataRanges.secondRange, + IndexTargetRange: indexRanges.secondRange, DataRunOptions: NamespaceRunOptions{ ShardTimeRanges: secondRanges.Copy(), TargetShardTimeRanges: secondRanges.Copy(), - RunOptions: dataRanges.secondRangeWithPersistFalse.RunOptions, + RunOptions: dataRanges.secondRange.RunOptions, }, IndexRunOptions: NamespaceRunOptions{ ShardTimeRanges: secondRanges.Copy(), TargetShardTimeRanges: secondRanges.Copy(), - RunOptions: indexRanges.secondRangeWithPersistFalse.RunOptions, + RunOptions: indexRanges.secondRange.RunOptions, }, }) namespaceDetails = append(namespaceDetails, NamespaceDetails{ @@ -247,12 +248,12 @@ func (b bootstrapProcess) Run( return NamespaceResults{}, err } - bootstrapResult := NewNamespaceResults(namespacesRunFirst) - for _, namespaces := range []Namespaces{ - namespacesRunFirst, - namespacesRunSecond, - } { - + var ( + bootstrapResult = NewNamespaceResults(namespacesRunFirst) + namespacesToRun = []Namespaces{namespacesRunFirst, namespacesRunSecond} + lastRunIndex = len(namespacesToRun) - 1 + ) + for runIndex, namespaces := range namespacesToRun { for _, entry := range namespaces.Namespaces.Iter() { ns := entry.Value() @@ -266,24 +267,22 @@ func (b bootstrapProcess) Run( continue } - // Check if snapshot-type ranges have advanced while bootstrapping previous ranges. - // If yes, return an error to force a retry - if persistConf := ns.DataRunOptions.RunOptions.PersistConfig(); persistConf.Enabled && - persistConf.FileSetType == persist.FileSetSnapshotType { + // If last run, check if ranges have advanced while bootstrapping previous ranges. + // If yes, return an error to force a retry. + if runIndex == lastRunIndex { var ( now = xtime.ToUnixNano(b.nowFn()) nsOptions = ns.Metadata.Options() - upToDateDataRanges = b.targetRangesForData(now, nsOptions.RetentionOptions()) + upToDateDataRanges = b.targetRangesForData(now, nsOptions) ) // Only checking data ranges. Since index blocks can only be a multiple of // data block size, the ranges for index could advance only if data ranges // have advanced, too (while opposite is not necessarily true) - if !upToDateDataRanges.secondRangeWithPersistFalse.Range.Equal(ns.DataTargetRange.Range) { - upToDateIndexRanges := b.targetRangesForIndex(now, nsOptions.RetentionOptions(), - nsOptions.IndexOptions()) + if !upToDateDataRanges.secondRange.Range.Equal(ns.DataTargetRange.Range) { + upToDateIndexRanges := b.targetRangesForIndex(now, nsOptions) fields := b.logFields(ns.Metadata, ns.Shards, - upToDateDataRanges.secondRangeWithPersistFalse.Range, - upToDateIndexRanges.secondRangeWithPersistFalse.Range) + upToDateDataRanges.secondRange.Range, + upToDateIndexRanges.secondRange.Range) b.log.Error("time ranges of snapshot-type blocks advanced", fields...) return NamespaceResults{}, ErrFileSetSnapshotTypeRangeAdvanced } @@ -444,28 +443,31 @@ func (b bootstrapProcess) logBootstrapResult( func (b bootstrapProcess) targetRangesForData( at xtime.UnixNano, - ropts retention.Options, + nsOpts namespace.Options, ) targetRangesResult { + ropts := nsOpts.RetentionOptions() return b.targetRanges(at, targetRangesOptions{ retentionPeriod: ropts.RetentionPeriod(), futureRetentionPeriod: ropts.FutureRetentionPeriod(), blockSize: ropts.BlockSize(), bufferPast: ropts.BufferPast(), bufferFuture: ropts.BufferFuture(), + snapshotEnabled: nsOpts.SnapshotEnabled(), }) } func (b bootstrapProcess) targetRangesForIndex( at xtime.UnixNano, - ropts retention.Options, - idxopts namespace.IndexOptions, + nsOpts namespace.Options, ) targetRangesResult { + ropts := nsOpts.RetentionOptions() return b.targetRanges(at, targetRangesOptions{ retentionPeriod: ropts.RetentionPeriod(), futureRetentionPeriod: ropts.FutureRetentionPeriod(), - blockSize: idxopts.BlockSize(), + blockSize: nsOpts.IndexOptions().BlockSize(), bufferPast: ropts.BufferPast(), bufferFuture: ropts.BufferFuture(), + snapshotEnabled: nsOpts.SnapshotEnabled(), }) } @@ -475,11 +477,12 @@ type targetRangesOptions struct { blockSize time.Duration bufferPast time.Duration bufferFuture time.Duration + snapshotEnabled bool } type targetRangesResult struct { - firstRangeWithPersistTrue TargetRange - secondRangeWithPersistFalse TargetRange + firstRangeWithPersistTrue TargetRange + secondRange TargetRange } func (b bootstrapProcess) targetRanges( @@ -499,6 +502,12 @@ func (b bootstrapProcess) targetRanges( Truncate(opts.blockSize). Add(opts.blockSize) + secondRangeFilesetType := persist.FileSetSnapshotType + if !opts.snapshotEnabled { + // NB: If snapshots are disabled for a namespace, we want to use flush type. + secondRangeFilesetType = persist.FileSetFlushType + } + // NB(r): We want the large initial time range bootstrapped to // bootstrap with persistence so we don't keep the full raw // data in process until we finish bootstrapping which could @@ -514,7 +523,7 @@ func (b bootstrapProcess) targetRanges( FileSetType: persist.FileSetFlushType, }), }, - secondRangeWithPersistFalse: TargetRange{ + secondRange: TargetRange{ Range: xtime.Range{Start: midPoint, End: cutover}, RunOptions: b.newRunOptions().SetPersistConfig(PersistConfig{ Enabled: true, @@ -522,7 +531,7 @@ func (b bootstrapProcess) targetRanges( // in memory, but we want to snapshot them as we receive them // so that once bootstrapping completes we can still recover // from just the commit log bootstrapper. - FileSetType: persist.FileSetSnapshotType, + FileSetType: secondRangeFilesetType, }), }, } diff --git a/src/dbnode/storage/bootstrap/process_test.go b/src/dbnode/storage/bootstrap/process_test.go index 6e8cb63fc0..8fdc33faa5 100644 --- a/src/dbnode/storage/bootstrap/process_test.go +++ b/src/dbnode/storage/bootstrap/process_test.go @@ -29,6 +29,7 @@ import ( "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/sharding" @@ -135,3 +136,35 @@ func TestBootstrapProcessRunActiveBlockAdvanced(t *testing.T) { }) } } + +func TestTargetRangesFileSetTypeForSnapshotDisabledNamespace(t *testing.T) { + sut := bootstrapProcess{processOpts: NewProcessOptions()} + nsOpts := namespace.NewOptions().SetSnapshotEnabled(false) + + rangesForData := sut.targetRangesForData(xtime.Now(), nsOpts) + rangesForIndex := sut.targetRangesForIndex(xtime.Now(), nsOpts) + + requireFilesetTypes(t, rangesForData, persist.FileSetFlushType) + requireFilesetTypes(t, rangesForIndex, persist.FileSetFlushType) +} + +func TestTargetRangesFileSetTypeForSnapshotEnabledNamespace(t *testing.T) { + sut := bootstrapProcess{processOpts: NewProcessOptions()} + nsOpts := namespace.NewOptions().SetSnapshotEnabled(true) + + rangesForData := sut.targetRangesForData(xtime.Now(), nsOpts) + rangesForIndex := sut.targetRangesForIndex(xtime.Now(), nsOpts) + + requireFilesetTypes(t, rangesForData, persist.FileSetSnapshotType) + requireFilesetTypes(t, rangesForIndex, persist.FileSetSnapshotType) +} + +func requireFilesetTypes(t *testing.T, ranges targetRangesResult, expectedSecond persist.FileSetType) { + persistConfigFirstRange := ranges.firstRangeWithPersistTrue.RunOptions.PersistConfig() + require.True(t, persistConfigFirstRange.Enabled) + require.Equal(t, persist.FileSetFlushType, persistConfigFirstRange.FileSetType) + + persistConfigSecondRange := ranges.secondRange.RunOptions.PersistConfig() + require.True(t, persistConfigSecondRange.Enabled) + require.Equal(t, expectedSecond, persistConfigSecondRange.FileSetType) +} diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 0a69ef826e..92d49cd80f 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -26,15 +26,14 @@ import ( "testing" "time" - "github.com/m3db/m3/src/dbnode/namespace" - "github.com/m3db/m3/src/dbnode/storage/bootstrap" - "github.com/m3db/m3/src/x/context" - "github.com/m3db/m3/src/x/ident" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" + "github.com/m3db/m3/src/x/context" + "github.com/m3db/m3/src/x/ident" xtest "github.com/m3db/m3/src/x/test" ) @@ -141,9 +140,8 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil).AnyTimes() - ns.EXPECT().Metadata().Return(meta).AnyTimes() - + ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil).Times(2) + ns.EXPECT().Metadata().Return(meta).Times(2) ns.EXPECT(). Bootstrap(gomock.Any(), gomock.Any()). Return(nil). @@ -216,7 +214,6 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) { ns.EXPECT().PrepareBootstrap(gomock.Any()).Return(shards, nil).AnyTimes() ns.EXPECT().Metadata().Return(meta).AnyTimes() - ns.EXPECT(). Bootstrap(gomock.Any(), gomock.Any()). Return(nil).