From dc2c4cbd688b7c388f342ea7d0d6a34b0a6f0fe7 Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Sun, 6 Sep 2020 14:34:21 -0400 Subject: [PATCH] Add RunState and thread through bootstrapper --- src/cmd/services/m3dbnode/config/bootstrap.go | 2 +- src/dbnode/integration/integration.go | 2 +- src/dbnode/integration/setup.go | 2 +- src/dbnode/persist/fs/types.go | 6 -- .../storage/bootstrap/bootstrapper/base.go | 9 +- .../bootstrapper/commitlog/source.go | 10 +- .../bootstrapper/fs/migrator/migrator.go | 3 +- .../bootstrapper/fs/migrator/migrator_test.go | 3 +- .../bootstrapper/fs/migrator/options.go | 7 +- .../bootstrapper/fs/migrator/options_test.go | 3 +- .../bootstrapper/fs/migrator/types.go | 5 +- .../bootstrap/bootstrapper/fs/source.go | 63 ++++------- .../storage/bootstrap/bootstrapper/noop.go | 6 +- .../bootstrap/bootstrapper/peers/source.go | 10 +- .../storage/bootstrap/bootstrapper/readers.go | 23 +--- .../bootstrapper/uninitialized/source.go | 5 +- src/dbnode/storage/bootstrap/process.go | 22 +++- src/dbnode/storage/bootstrap/run_state.go | 102 ++++++++++++++++++ src/dbnode/storage/bootstrap/types.go | 46 +++++++- src/dbnode/storage/bootstrap/util.go | 16 ++- 20 files changed, 247 insertions(+), 98 deletions(-) create mode 100644 src/dbnode/storage/bootstrap/run_state.go diff --git a/src/cmd/services/m3dbnode/config/bootstrap.go b/src/cmd/services/m3dbnode/config/bootstrap.go index db32771f67..f39ef73fdd 100644 --- a/src/cmd/services/m3dbnode/config/bootstrap.go +++ b/src/cmd/services/m3dbnode/config/bootstrap.go @@ -283,7 +283,7 @@ func (bsc BootstrapConfiguration) New( if bsc.CacheSeriesMetadata != nil { providerOpts = providerOpts.SetCacheSeriesMetadata(*bsc.CacheSeriesMetadata) } - return bootstrap.NewProcessProvider(bs, providerOpts, rsOpts) + return bootstrap.NewProcessProvider(bs, providerOpts, rsOpts, fsOpts) } func (bsc BootstrapConfiguration) filesystemConfig() BootstrapFilesystemConfiguration { diff --git a/src/dbnode/integration/integration.go 
b/src/dbnode/integration/integration.go index b0fdba6c39..f6091f7cf4 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -299,7 +299,7 @@ func newDefaultBootstrappableTestSetups( processOpts := bootstrap.NewProcessOptions(). SetTopologyMapProvider(setup). SetOrigin(setup.Origin()) - provider, err := bootstrap.NewProcessProvider(fsBootstrapper, processOpts, bsOpts) + provider, err := bootstrap.NewProcessProvider(fsBootstrapper, processOpts, bsOpts, fsOpts) require.NoError(t, err) setup.SetStorageOpts(setup.StorageOpts().SetBootstrapProcessProvider(provider)) diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index 9186052bc7..6e07a5e150 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -952,7 +952,7 @@ func (ts *testSetup) InitializeBootstrappers(opts InitializeBootstrappersOptions processOpts := bootstrap.NewProcessOptions(). SetTopologyMapProvider(ts). SetOrigin(ts.Origin()) - process, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts) + process, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts, fsOpts) if err != nil { return err } diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 2c28c7e359..ee1ddd4bc5 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -622,9 +622,3 @@ type Segments interface { AbsoluteFilePaths() []string BlockStart() time.Time } - -// InfoFileResultsPerShard maps shards to info files. -type InfoFileResultsPerShard map[uint32][]ReadInfoFileResult - -// InfoFilesByNamespace maps a namespace to info files grouped by shard. 
-type InfoFilesByNamespace map[namespace.Metadata]InfoFileResultsPerShard diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base.go b/src/dbnode/storage/bootstrap/bootstrapper/base.go index c6e78d36cb..604fe53ff1 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base.go @@ -78,6 +78,7 @@ func (b baseBootstrapper) String() string { func (b baseBootstrapper) Bootstrap( ctx context.Context, namespaces bootstrap.Namespaces, + runState bootstrap.RunState, ) (bootstrap.NamespaceResults, error) { logFields := []zapcore.Field{ zap.String("bootstrapper", b.name), @@ -96,7 +97,7 @@ func (b baseBootstrapper) Bootstrap( logFields, currNamespace) dataAvailable, err := b.src.AvailableData(currNamespace.Metadata, - currNamespace.DataRunOptions.ShardTimeRanges.Copy(), + currNamespace.DataRunOptions.ShardTimeRanges.Copy(), runState, currNamespace.DataRunOptions.RunOptions) if err != nil { return bootstrap.NamespaceResults{}, err @@ -107,7 +108,7 @@ func (b baseBootstrapper) Bootstrap( // Prepare index if required. if currNamespace.Metadata.Options().IndexOptions().Enabled() { indexAvailable, err := b.src.AvailableIndex(currNamespace.Metadata, - currNamespace.IndexRunOptions.ShardTimeRanges.Copy(), + currNamespace.IndexRunOptions.ShardTimeRanges.Copy(), runState, currNamespace.IndexRunOptions.RunOptions) if err != nil { return bootstrap.NamespaceResults{}, err @@ -137,7 +138,7 @@ func (b baseBootstrapper) Bootstrap( b.log.Info("bootstrap from source started", logFields...) // Run the bootstrap source. - currResults, err := b.src.Read(ctx, curr) + currResults, err := b.src.Read(ctx, curr, runState) logFields = append(logFields, zap.Duration("took", nowFn().Sub(begin))) if err != nil { @@ -166,7 +167,7 @@ func (b baseBootstrapper) Bootstrap( // If there are some time ranges the current bootstrapper could not fulfill, // that we can attempt then pass it along to the next bootstrapper. 
if next.Namespaces.Len() > 0 { - nextResults, err := b.next.Bootstrap(ctx, next) + nextResults, err := b.next.Bootstrap(ctx, next, runState) if err != nil { return bootstrap.NamespaceResults{}, err } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 91af1ba4a3..6738661c30 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -146,6 +146,7 @@ func newCommitLogSource( func (s *commitLogSource) AvailableData( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, + _ bootstrap.RunState, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { return s.availability(ns, shardsTimeRanges, runOpts) @@ -154,6 +155,7 @@ func (s *commitLogSource) AvailableData( func (s *commitLogSource) AvailableIndex( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, + _ bootstrap.RunState, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { return s.availability(ns, shardsTimeRanges, runOpts) @@ -171,6 +173,7 @@ type readNamespaceResult struct { func (s *commitLogSource) Read( ctx context.Context, namespaces bootstrap.Namespaces, + runState bootstrap.RunState, ) (bootstrap.NamespaceResults, error) { ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperCommitLogSourceRead) defer span.Finish() @@ -235,7 +238,7 @@ func (s *commitLogSource) Read( for shard, tr := range shardTimeRanges.Iter() { err := s.bootstrapShardSnapshots( ns.Metadata, accumulator, shard, tr, blockSize, - mostRecentCompleteSnapshotByBlockShard) + mostRecentCompleteSnapshotByBlockShard, runState) if err != nil { return bootstrap.NamespaceResults{}, err } @@ -661,14 +664,13 @@ func (s *commitLogSource) bootstrapShardSnapshots( shardTimeRanges xtime.Ranges, blockSize time.Duration, mostRecentCompleteSnapshotByBlockShard map[xtime.UnixNano]map[uint32]fs.FileSetFile, + runState 
bootstrap.RunState, ) error { // NB(bodu): We use info files on disk to check if a snapshot should be loaded in as cold or warm. // We do this instead of cross refing blockstarts and current time to handle the case of bootstrapping a // once warm block start after a node has been shut down for a long time. We consider all block starts we // haven't flushed data for yet a warm block start. - fsOpts := s.opts.CommitLogOptions().FilesystemOptions() - readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), ns.ID(), shard, - fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions(), persist.FileSetFlushType) + readInfoFilesResults := runState.InfoFilesForNamespace(ns)[shard] shardBlockStartsOnDisk := make(map[xtime.UnixNano]struct{}) for _, result := range readInfoFilesResults { if err := result.Err.Error(); err != nil { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/migrator.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/migrator.go index c9e7489031..5dcbe60ba3 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/migrator.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/migrator.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/migration" "github.com/m3db/m3/src/dbnode/storage" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/instrument" @@ -44,7 +45,7 @@ type worker struct { // the info files. 
type Migrator struct { migrationTaskFn MigrationTaskFn - infoFilesByNamespace fs.InfoFilesByNamespace + infoFilesByNamespace bootstrap.InfoFilesByNamespace migrationOpts migration.Options fsOpts fs.Options instrumentOpts instrument.Options diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/migrator_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/migrator_test.go index 67e4cea200..dda1fbc183 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/migrator_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/migrator_test.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist/fs/migration" "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/storage" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" @@ -50,7 +51,7 @@ func TestMigratorRun(t *testing.T) { // Create some dummy ReadInfoFileResults as these are used to determine if we need to run a migration or not. // Put some in a state requiring migrations and others not to flex both paths. 
- infoFilesByNamespace := fs.InfoFilesByNamespace{ + infoFilesByNamespace := bootstrap.InfoFilesByNamespace{ md1: { 1: {testInfoFileWithVolumeIndex(0), testInfoFileWithVolumeIndex(1)}, 2: {testInfoFileWithVolumeIndex(0), testInfoFileWithVolumeIndex(1)}, diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/options.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/options.go index be077a602d..716ad19171 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/options.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/options.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/migration" "github.com/m3db/m3/src/dbnode/storage" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/x/instrument" ) @@ -40,7 +41,7 @@ var ( type options struct { migrationTaskFn MigrationTaskFn - infoFilesByNamespace fs.InfoFilesByNamespace + infoFilesByNamespace bootstrap.InfoFilesByNamespace migrationOpts migration.Options fsOpts fs.Options instrumentOpts instrument.Options @@ -93,13 +94,13 @@ func (o *options) MigrationTaskFn() MigrationTaskFn { return o.migrationTaskFn } -func (o *options) SetInfoFilesByNamespace(value fs.InfoFilesByNamespace) Options { +func (o *options) SetInfoFilesByNamespace(value bootstrap.InfoFilesByNamespace) Options { opts := *o opts.infoFilesByNamespace = value return &opts } -func (o *options) InfoFilesByNamespace() fs.InfoFilesByNamespace { +func (o *options) InfoFilesByNamespace() bootstrap.InfoFilesByNamespace { return o.infoFilesByNamespace } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/options_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/options_test.go index ef3c9589ca..288889338e 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/options_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/options_test.go @@ -26,6 +26,7 @@ import ( 
"github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/migration" "github.com/m3db/m3/src/dbnode/storage" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" xtest "github.com/m3db/m3/src/x/test" "github.com/golang/mock/gomock" @@ -109,7 +110,7 @@ func newTestOptions(ctrl *gomock.Controller) Options { SetMigrationTaskFn(func(result fs.ReadInfoFileResult) (migration.NewTaskFn, bool) { return nil, false }). - SetInfoFilesByNamespace(make(fs.InfoFilesByNamespace)). + SetInfoFilesByNamespace(make(bootstrap.InfoFilesByNamespace)). SetStorageOptions(mockOpts). SetFilesystemOptions(fs.NewOptions()) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/types.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/types.go index 7e5ff20465..d4aa53a1d9 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/types.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/types.go @@ -24,6 +24,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/migration" "github.com/m3db/m3/src/dbnode/storage" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/x/instrument" ) @@ -42,10 +43,10 @@ type Options interface { MigrationTaskFn() MigrationTaskFn // SetInfoFilesByNamespaces sets the info file results to operate on keyed by namespace. - SetInfoFilesByNamespace(value fs.InfoFilesByNamespace) Options + SetInfoFilesByNamespace(value bootstrap.InfoFilesByNamespace) Options // InfoFilesByNamespaces returns the info file results to operate on keyed by namespace. - InfoFilesByNamespace() fs.InfoFilesByNamespace + InfoFilesByNamespace() bootstrap.InfoFilesByNamespace // SetMigrationOptions sets the migration options. 
SetMigrationOptions(value migration.Options) Options diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index 924bdaac00..ff8cd5da1f 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -123,22 +123,25 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) { func (s *fileSystemSource) AvailableData( md namespace.Metadata, shardTimeRanges result.ShardTimeRanges, - runOpts bootstrap.RunOptions, + runState bootstrap.RunState, + _ bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { - return s.availability(md, shardTimeRanges) + return s.availability(md, shardTimeRanges, runState.InfoFilesForNamespace(md)) } func (s *fileSystemSource) AvailableIndex( md namespace.Metadata, shardTimeRanges result.ShardTimeRanges, - runOpts bootstrap.RunOptions, + runState bootstrap.RunState, + _ bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { - return s.availability(md, shardTimeRanges) + return s.availability(md, shardTimeRanges, runState.InfoFilesForNamespace(md)) } func (s *fileSystemSource) Read( ctx context.Context, namespaces bootstrap.Namespaces, + runState bootstrap.RunState, ) (bootstrap.NamespaceResults, error) { ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperFilesystemSourceRead) defer span.Finish() @@ -154,13 +157,10 @@ func (s *fileSystemSource) Read( } builder := result.NewIndexBuilder(segBuilder) - // Preload info file results so they can be used to bootstrap data filesets and data migrations - infoFilesByNamespace := s.loadInfoFiles(namespaces) - // Perform any necessary migrations but don't block bootstrap process on failure. Will update info file // in-memory structures in place if migrations have written new files to disk. This saves us the need from // having to re-read migrated info files. 
- s.runMigrations(ctx, infoFilesByNamespace) + s.runMigrations(ctx, runState.ReadInfoFiles()) // NB(r): Perform all data bootstrapping first then index bootstrapping // to more clearly deliniate which process is slower than the other. @@ -177,7 +177,7 @@ func (s *fileSystemSource) Read( r, err := s.read(bootstrapDataRunType, md, namespace.DataAccumulator, namespace.DataRunOptions.ShardTimeRanges, - namespace.DataRunOptions.RunOptions, builder, span, infoFilesByNamespace) + namespace.DataRunOptions.RunOptions, builder, span, runState.InfoFilesForNamespace(md)) if err != nil { return bootstrap.NamespaceResults{}, err } @@ -207,7 +207,7 @@ func (s *fileSystemSource) Read( r, err := s.read(bootstrapIndexRunType, md, namespace.DataAccumulator, namespace.IndexRunOptions.ShardTimeRanges, - namespace.IndexRunOptions.RunOptions, builder, span, infoFilesByNamespace) + namespace.IndexRunOptions.RunOptions, builder, span, runState.InfoFilesForNamespace(md)) if err != nil { return bootstrap.NamespaceResults{}, err } @@ -229,7 +229,7 @@ func (s *fileSystemSource) Read( return results, nil } -func (s *fileSystemSource) runMigrations(ctx context.Context, infoFilesByNamespace fs.InfoFilesByNamespace) { +func (s *fileSystemSource) runMigrations(ctx context.Context, infoFilesByNamespace bootstrap.InfoFilesByNamespace) { // Only one migration for now, so just short circuit entirely if not enabled if s.opts.MigrationOptions().TargetMigrationVersion() != migration.MigrationVersion_1_1 { return @@ -263,10 +263,11 @@ func (s *fileSystemSource) runMigrations(ctx context.Context, infoFilesByNamespa func (s *fileSystemSource) availability( md namespace.Metadata, shardTimeRanges result.ShardTimeRanges, + infoFilesByShard bootstrap.InfoFileResultsPerShard, ) (result.ShardTimeRanges, error) { result := result.NewShardTimeRangesFromSize(shardTimeRanges.Len()) for shard, ranges := range shardTimeRanges.Iter() { - result.Set(shard, s.shardAvailability(md.ID(), shard, ranges)) + result.Set(shard, 
s.shardAvailability(md.ID(), shard, ranges, infoFilesByShard)) } return result, nil } @@ -275,16 +276,12 @@ func (s *fileSystemSource) shardAvailability( namespace ident.ID, shard uint32, targetRangesForShard xtime.Ranges, + infoFilesByShard bootstrap.InfoFileResultsPerShard, ) xtime.Ranges { if targetRangesForShard.IsEmpty() { return xtime.NewRanges() } - - readInfoFilesResults := fs.ReadInfoFiles(s.fsopts.FilePathPrefix(), - namespace, shard, s.fsopts.InfoReaderBufferSize(), s.fsopts.DecodingOptions(), - persist.FileSetFlushType) - - return s.shardAvailabilityWithInfoFiles(namespace, shard, targetRangesForShard, readInfoFilesResults) + return s.shardAvailabilityWithInfoFiles(namespace, shard, targetRangesForShard, infoFilesByShard[shard]) } func (s *fileSystemSource) shardAvailabilityWithInfoFiles( @@ -735,7 +732,7 @@ func (s *fileSystemSource) read( runOpts bootstrap.RunOptions, builder *result.IndexBuilder, span opentracing.Span, - infoFilesByNamespace fs.InfoFilesByNamespace, + infoFilesByShard bootstrap.InfoFileResultsPerShard, ) (*runResult, error) { var ( seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() @@ -760,7 +757,7 @@ func (s *fileSystemSource) read( if seriesCachePolicy != series.CacheAll { // Unless we're caching all series (or all series metadata) in memory, we // return just the availability of the files we have. 
- return s.bootstrapDataRunResultFromAvailability(md, shardTimeRanges, infoFilesByNamespace), nil + return s.bootstrapDataRunResultFromAvailability(md, shardTimeRanges, infoFilesByShard), nil } } @@ -820,6 +817,7 @@ func (s *fileSystemSource) read( Logger: s.log, Span: span, NowFn: s.nowFn, + InfoFilesByShard: infoFilesByShard, }) bootstrapFromDataReadersResult := s.bootstrapFromReaders(run, md, accumulator, runOpts, readerPool, readersCh, builder) @@ -838,7 +836,7 @@ func (s *fileSystemSource) newReader() (fs.DataFileSetReader, error) { func (s *fileSystemSource) bootstrapDataRunResultFromAvailability( md namespace.Metadata, shardTimeRanges result.ShardTimeRanges, - infoFilesByNamespace fs.InfoFilesByNamespace, + infoFilesByShard bootstrap.InfoFileResultsPerShard, ) *runResult { runResult := newRunResult() unfulfilled := runResult.data.Unfulfilled() @@ -846,7 +844,7 @@ func (s *fileSystemSource) bootstrapDataRunResultFromAvailability( if ranges.IsEmpty() { continue } - availability := s.shardAvailabilityWithInfoFiles(md.ID(), shard, ranges, infoFilesByNamespace[md][shard]) + availability := s.shardAvailabilityWithInfoFiles(md.ID(), shard, ranges, infoFilesByShard[shard]) remaining := ranges.Clone() remaining.RemoveRanges(availability) if !remaining.IsEmpty() { @@ -967,27 +965,6 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( return res, nil } -func (s *fileSystemSource) loadInfoFiles( - namespaces bootstrap.Namespaces, -) fs.InfoFilesByNamespace { - infoFilesByNamespace := make(fs.InfoFilesByNamespace) - - for _, elem := range namespaces.Namespaces.Iter() { - namespace := elem.Value() - shardTimeRanges := namespace.DataRunOptions.ShardTimeRanges - result := make(fs.InfoFileResultsPerShard, shardTimeRanges.Len()) - for shard := range shardTimeRanges.Iter() { - result[shard] = fs.ReadInfoFiles(s.fsopts.FilePathPrefix(), - namespace.Metadata.ID(), shard, s.fsopts.InfoReaderBufferSize(), s.fsopts.DecodingOptions(), - persist.FileSetFlushType) - } - - 
infoFilesByNamespace[namespace.Metadata] = result - } - - return infoFilesByNamespace -} - type runResult struct { sync.RWMutex data result.DataBootstrapResult diff --git a/src/dbnode/storage/bootstrap/bootstrapper/noop.go b/src/dbnode/storage/bootstrap/bootstrapper/noop.go index e8e2aaf8f1..d68dd700b3 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/noop.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/noop.go @@ -62,8 +62,9 @@ func (noop noOpNoneBootstrapper) String() string { } func (noop noOpNoneBootstrapper) Bootstrap( - ctx context.Context, + _ context.Context, namespaces bootstrap.Namespaces, + _ bootstrap.RunState, ) (bootstrap.NamespaceResults, error) { results := bootstrap.NewNamespaceResults(namespaces) for _, elem := range results.Results.Iter() { @@ -119,8 +120,9 @@ func (noop noOpAllBootstrapper) String() string { } func (noop noOpAllBootstrapper) Bootstrap( - ctx context.Context, + _ context.Context, namespaces bootstrap.Namespaces, + _ bootstrap.RunState, ) (bootstrap.NamespaceResults, error) { return bootstrap.NewNamespaceResults(namespaces), nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index d76efed0df..4b369a66e5 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -95,6 +95,7 @@ type shardPeerAvailability struct { func (s *peersSource) AvailableData( nsMetadata namespace.Metadata, shardTimeRanges result.ShardTimeRanges, + _ bootstrap.RunState, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { if err := s.validateRunOpts(runOpts); err != nil { @@ -106,6 +107,7 @@ func (s *peersSource) AvailableData( func (s *peersSource) AvailableIndex( nsMetadata namespace.Metadata, shardTimeRanges result.ShardTimeRanges, + _ bootstrap.RunState, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { if err := s.validateRunOpts(runOpts); err != nil { @@ 
-117,6 +119,7 @@ func (s *peersSource) AvailableIndex( func (s *peersSource) Read( ctx context.Context, namespaces bootstrap.Namespaces, + runState bootstrap.RunState, ) (bootstrap.NamespaceResults, error) { ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) defer span.Finish() @@ -193,7 +196,8 @@ func (s *peersSource) Read( namespace.IndexRunOptions.ShardTimeRanges, builder, namespace.IndexRunOptions.RunOptions, - span) + span, + runState.InfoFilesForNamespace(md)) if err != nil { return bootstrap.NamespaceResults{}, err } @@ -661,6 +665,7 @@ func (s *peersSource) readIndex( builder *result.IndexBuilder, opts bootstrap.RunOptions, span opentracing.Span, + infoFilesByShard bootstrap.InfoFileResultsPerShard, ) (result.IndexBootstrapResult, error) { if err := s.validateRunOpts(opts); err != nil { return nil, err @@ -707,6 +712,7 @@ func (s *peersSource) readIndex( Logger: s.log, Span: span, NowFn: s.nowFn, + InfoFilesByShard: infoFilesByShard, }) for timeWindowReaders := range readersCh { @@ -984,7 +990,7 @@ func (s *peersSource) readBlockMetadataAndIndex( } func (s *peersSource) peerAvailability( - nsMetadata namespace.Metadata, + _ namespace.Metadata, shardTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/readers.go b/src/dbnode/storage/bootstrap/bootstrapper/readers.go index a44c823e81..a210914b29 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/readers.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/readers.go @@ -26,7 +26,6 @@ import ( "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/namespace" - "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/bootstrap" @@ -77,6 +76,7 @@ type EnqueueReadersOptions struct { Logger *zap.Logger Span opentracing.Span NowFn clock.NowFn + InfoFilesByShard 
bootstrap.InfoFileResultsPerShard } // EnqueueReaders into a readers channel grouped by data block. @@ -87,7 +87,6 @@ func EnqueueReaders(opts EnqueueReadersOptions) { // Normal run, open readers enqueueReadersGroupedByBlockSize( opts.NsMD, - opts.RunOpts, opts.FsOpts, opts.ShardTimeRanges, opts.ReaderPool, @@ -97,12 +96,12 @@ func EnqueueReaders(opts EnqueueReadersOptions) { opts.Logger, opts.Span, opts.NowFn, + opts.InfoFilesByShard, ) } func enqueueReadersGroupedByBlockSize( ns namespace.Metadata, - runOpts bootstrap.RunOptions, fsOpts fs.Options, shardTimeRanges result.ShardTimeRanges, readerPool *ReaderPool, @@ -112,31 +111,17 @@ func enqueueReadersGroupedByBlockSize( logger *zap.Logger, span opentracing.Span, nowFn clock.NowFn, + infoFilesByShard bootstrap.InfoFileResultsPerShard, ) { // Group them by block size. groupFn := NewShardTimeRangesTimeWindowGroups groupedByBlockSize := groupFn(shardTimeRanges, blockSize) - // Cache info files by shard. - readInfoFilesResultsByShard := make(map[uint32][]fs.ReadInfoFileResult) - // Now enqueue across all shards by block size. 
for _, group := range groupedByBlockSize { readers := make(map[ShardID]ShardReaders, group.Ranges.Len()) for shard, tr := range group.Ranges.Iter() { - readInfoFilesResults, ok := readInfoFilesResultsByShard[shard] - if !ok { - start := nowFn() - logger.Debug("enqueue readers read info files start", - zap.Uint32("shard", shard)) - readInfoFilesResults = fs.ReadInfoFiles(fsOpts.FilePathPrefix(), - ns.ID(), shard, fsOpts.InfoReaderBufferSize(), - fsOpts.DecodingOptions(), persist.FileSetFlushType) - logger.Debug("enqueue readers read info files done", - zap.Uint32("shard", shard), - zap.Duration("took", nowFn().Sub(start))) - readInfoFilesResultsByShard[shard] = readInfoFilesResults - } + readInfoFilesResults := infoFilesByShard[shard] shardReaders := newShardReaders(ns, fsOpts, readerPool, shard, tr, optimizedReadMetadataOnly, logger, span, nowFn, readInfoFilesResults) readers[ShardID(shard)] = shardReaders diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go index 1493f189b1..6339717bb2 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go @@ -55,6 +55,7 @@ func newTopologyUninitializedSource(opts Options) bootstrap.Source { func (s *uninitializedTopologySource) AvailableData( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, + _ bootstrap.RunState, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { return s.availability(ns, shardsTimeRanges, runOpts) @@ -63,13 +64,14 @@ func (s *uninitializedTopologySource) AvailableData( func (s *uninitializedTopologySource) AvailableIndex( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, + _ bootstrap.RunState, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { return s.availability(ns, shardsTimeRanges, runOpts) } func (s *uninitializedTopologySource) availability( - ns 
namespace.Metadata, + _ namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) (result.ShardTimeRanges, error) { @@ -140,6 +142,7 @@ func (s *uninitializedTopologySource) availability( func (s *uninitializedTopologySource) Read( ctx context.Context, namespaces bootstrap.Namespaces, + _ bootstrap.RunState, ) (bootstrap.NamespaceResults, error) { ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperUninitializedSourceRead) defer span.Finish() diff --git a/src/dbnode/storage/bootstrap/process.go b/src/dbnode/storage/bootstrap/process.go index f875918fae..fcb0e575db 100644 --- a/src/dbnode/storage/bootstrap/process.go +++ b/src/dbnode/storage/bootstrap/process.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" @@ -45,6 +46,7 @@ type bootstrapProcessProvider struct { sync.RWMutex processOpts ProcessOptions resultOpts result.Options + fsOpts fs.Options log *zap.Logger bootstrapperProvider BootstrapperProvider } @@ -61,6 +63,7 @@ func NewProcessProvider( bootstrapperProvider BootstrapperProvider, processOpts ProcessOptions, resultOpts result.Options, + fsOpts fs.Options, ) (ProcessProvider, error) { if err := processOpts.Validate(); err != nil { return nil, err @@ -69,6 +72,7 @@ func NewProcessProvider( return &bootstrapProcessProvider{ processOpts: processOpts, resultOpts: resultOpts, + fsOpts: fsOpts, log: resultOpts.InstrumentOptions().Logger(), bootstrapperProvider: bootstrapperProvider, }, nil @@ -102,6 +106,7 @@ func (b *bootstrapProcessProvider) Provide() (Process, error) { return bootstrapProcess{ processOpts: b.processOpts, resultOpts: b.resultOpts, + fsOpts: b.fsOpts, nowFn: b.resultOpts.ClockOptions().NowFn(), log: b.log, 
bootstrapper: bootstrapper, @@ -147,6 +152,7 @@ func (b *bootstrapProcessProvider) newInitialTopologyState() (*topology.StateSna type bootstrapProcess struct { processOpts ProcessOptions resultOpts result.Options + fsOpts fs.Options nowFn clock.NowFn log *zap.Logger bootstrapper Bootstrapper @@ -164,6 +170,7 @@ func (b bootstrapProcess) Run( namespacesRunSecond := Namespaces{ Namespaces: NewNamespacesMap(NamespacesMapOptions{}), } + finders := make([]InfoFilesFinder, 0, len(namespaces)) for _, namespace := range namespaces { ropts := namespace.Metadata.Options().RetentionOptions() idxopts := namespace.Metadata.Options().IndexOptions() @@ -211,6 +218,16 @@ func (b bootstrapProcess) Run( RunOptions: indexRanges.secondRangeWithPersistFalse.RunOptions, }, }) + finders = append(finders, InfoFilesFinder{ + Namespace: namespace.Metadata, + Shards: namespace.Shards, + }) + } + runState, err := NewRunState(NewRunStateOptions(). + SetFilesystemOptions(b.fsOpts). + SetInfoFilesFinders(finders)) + if err != nil { + return NamespaceResults{}, err } bootstrapResult := NewNamespaceResults(namespacesRunFirst) @@ -218,7 +235,7 @@ func (b bootstrapProcess) Run( namespacesRunFirst, namespacesRunSecond, } { - res, err := b.runPass(ctx, namespaces) + res, err := b.runPass(ctx, namespaces, runState) if err != nil { return NamespaceResults{}, err } @@ -232,6 +249,7 @@ func (b bootstrapProcess) Run( func (b bootstrapProcess) runPass( ctx context.Context, namespaces Namespaces, + runState RunState, ) (NamespaceResults, error) { ctx, span, sampled := ctx.StartSampledTraceSpan(tracepoint.BootstrapProcessRun) defer span.Finish() @@ -258,7 +276,7 @@ func (b bootstrapProcess) runPass( } begin := b.nowFn() - res, err := b.bootstrapper.Bootstrap(ctx, namespaces) + res, err := b.bootstrapper.Bootstrap(ctx, namespaces, runState) took := b.nowFn().Sub(begin) if err != nil { b.log.Error("bootstrap process error", diff --git a/src/dbnode/storage/bootstrap/run_state.go 
b/src/dbnode/storage/bootstrap/run_state.go new file mode 100644 index 0000000000..15034f3036 --- /dev/null +++ b/src/dbnode/storage/bootstrap/run_state.go @@ -0,0 +1,102 @@ +package bootstrap + +import ( + "errors" + + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/persist/fs" +) + +var ( + errFilesystemOptsNotSet = errors.New("filesystemOptions not set") + errInfoFilesFindersNotSet = errors.New("infoFilesFinders not set") +) + +// NewRunState creates state specifically to be used during the bootstrap process. +// Primarily a mechanism for passing info files along without needing to re-read them at each +// stage of the bootstrap process. +func NewRunState(options RunStateOptions) (RunState, error) { + // TODO(nate): enable this once properly instantiating in NamespacesTester in util.go + // if err := options.Validate(); err != nil { + // return RunState{}, err + // } + return RunState{ + fsOpts: options.FilesystemOptions(), + infoFilesFinders: options.InfoFilesFinders(), + }, nil +} + +// InfoFilesForNamespace returns the info files grouped by shard for the provided namespace. +func (r *RunState) InfoFilesForNamespace(ns namespace.Metadata) InfoFileResultsPerShard { + return r.ReadInfoFiles()[ns] +} + +// TODO(nate): Make this threadsafe? If so, we'll need to clone the map +// before returning, provide an update method, and incorporate locking. +// +// ReadInfoFiles returns info file results for each shard grouped by namespace. A cached copy +// is returned if the info files have already been read. The method takes no arguments, so +// there is currently no way to invalidate the cache and force a re-read. 
+func (r *RunState) ReadInfoFiles() InfoFilesByNamespace { + if r.infoFilesByNamespace != nil { + return r.infoFilesByNamespace + } + + r.infoFilesByNamespace = make(InfoFilesByNamespace, len(r.infoFilesFinders)) + for _, finder := range r.infoFilesFinders { + result := make(InfoFileResultsPerShard, len(finder.Shards)) + for _, shard := range finder.Shards { + result[shard] = fs.ReadInfoFiles(r.fsOpts.FilePathPrefix(), + finder.Namespace.ID(), shard, r.fsOpts.InfoReaderBufferSize(), r.fsOpts.DecodingOptions(), + persist.FileSetFlushType) + } + + r.infoFilesByNamespace[finder.Namespace] = result + } + + return r.infoFilesByNamespace +} + +type runStateOptions struct { + fsOpts fs.Options + infoFilesFinders []InfoFilesFinder +} + +// NewRunStateOptions creates new RunStateOptions. +func NewRunStateOptions() RunStateOptions { + return &runStateOptions{} +} + +func (r *runStateOptions) Validate() error { + if r.fsOpts == nil { + return errFilesystemOptsNotSet + } + if err := r.fsOpts.Validate(); err != nil { + return err + } + if len(r.infoFilesFinders) == 0 { + return errInfoFilesFindersNotSet + } + return nil +} + +func (r *runStateOptions) SetFilesystemOptions(value fs.Options) RunStateOptions { + opts := *r + opts.fsOpts = value + return &opts +} + +func (r *runStateOptions) FilesystemOptions() fs.Options { + return r.fsOpts +} + +func (r *runStateOptions) SetInfoFilesFinders(value []InfoFilesFinder) RunStateOptions { + opts := *r + opts.infoFilesFinders = value + return &opts +} + +func (r *runStateOptions) InfoFilesFinders() []InfoFilesFinder { + return r.infoFilesFinders +} diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index f6bb4b932b..3947e9e67a 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/persist/fs" 
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/topology" @@ -390,7 +391,7 @@ type Bootstrapper interface { // A bootstrapper should only return an error should it want to entirely // cancel the bootstrapping of the node, i.e. non-recoverable situation // like not being able to read from the filesystem. - Bootstrap(ctx context.Context, namespaces Namespaces) (NamespaceResults, error) + Bootstrap(ctx context.Context, namespaces Namespaces, runState RunState) (NamespaceResults, error) } // Source represents a bootstrap source. Note that a source can and will be reused so @@ -401,6 +402,7 @@ type Source interface { AvailableData( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, + runState RunState, runOpts RunOptions, ) (result.ShardTimeRanges, error) @@ -408,6 +410,7 @@ type Source interface { AvailableIndex( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, + runState RunState, opts RunOptions, ) (result.ShardTimeRanges, error) @@ -416,5 +419,44 @@ type Source interface { // A bootstrapper source should only return an error should it want to // entirely cancel the bootstrapping of the node, i.e. non-recoverable // situation like not being able to read from the filesystem. - Read(ctx context.Context, namespaces Namespaces) (NamespaceResults, error) + Read(ctx context.Context, namespaces Namespaces, runState RunState) (NamespaceResults, error) +} + +// InfoFileResultsPerShard maps shards to info files. +type InfoFileResultsPerShard map[uint32][]fs.ReadInfoFileResult + +// InfoFilesByNamespace maps a namespace to info files grouped by shard. +type InfoFilesByNamespace map[namespace.Metadata]InfoFileResultsPerShard + +// RunState provides a snapshot of info files for use throughout all stages of the bootstrap. 
+type RunState struct { + fsOpts fs.Options + infoFilesFinders []InfoFilesFinder + infoFilesByNamespace InfoFilesByNamespace +} + +// RunStateOptions represents the options for RunState. +type RunStateOptions interface { + // Validate will validate the options and return an error if not valid. + Validate() error + + // SetFilesystemOptions sets the filesystem options. + SetFilesystemOptions(value fs.Options) RunStateOptions + + // FilesystemOptions returns the filesystem options. + FilesystemOptions() fs.Options + + // SetInfoFilesFinders sets the finders used to load info files for all namespaces provided. + SetInfoFilesFinders(value []InfoFilesFinder) RunStateOptions + + // InfoFilesFinders returns the finders used to load info files for all namespaces provided. + InfoFilesFinders() []InfoFilesFinder +} + +// InfoFilesFinder is used to lookup info files for the given combination of namespace and shards. +type InfoFilesFinder struct { + // Namespace is the namespace to retrieve info files for. + Namespace namespace.Metadata + // Shards are the shards to retrieve info files for in the specified namespace. + Shards []uint32 } diff --git a/src/dbnode/storage/bootstrap/util.go b/src/dbnode/storage/bootstrap/util.go index 560b9fc48f..d291cbf37c 100644 --- a/src/dbnode/storage/bootstrap/util.go +++ b/src/dbnode/storage/bootstrap/util.go @@ -307,6 +307,8 @@ type NamespacesTester struct { // Namespaces are the namespaces for this tester. Namespaces Namespaces + // RunState is a snapshot of state useful during bootstrapping. + RunState RunState // Results are the namespace results after bootstrapping. 
Results NamespaceResults } @@ -359,6 +361,7 @@ func BuildNamespacesTesterWithReaderIteratorPool( ctrl := xtest.NewController(t) namespacesMap := NewNamespacesMap(NamespacesMapOptions{}) accumulators := make([]*TestDataAccumulator, 0, len(mds)) + finders := make([]InfoFilesFinder, 0, len(mds)) for _, md := range mds { nsCtx := namespace.NewContextFrom(md) acc := &TestDataAccumulator{ @@ -388,13 +391,22 @@ func BuildNamespacesTesterWithReaderIteratorPool( RunOptions: runOpts, }, }) + finders = append(finders, InfoFilesFinder{ + Namespace: md, + Shards: shards, + }) } + // TODO(nate): configure and set fs opts + runState, err := NewRunState(NewRunStateOptions(). + SetInfoFilesFinders(finders)) + require.NoError(t, err) return NamespacesTester{ t: t, ctrl: ctrl, pool: iterPool, Accumulators: accumulators, + RunState: runState, Namespaces: Namespaces{ Namespaces: namespacesMap, }, @@ -540,7 +552,7 @@ func (nt *NamespacesTester) ResultForNamespace(id ident.ID) NamespaceResult { func (nt *NamespacesTester) TestBootstrapWith(b Bootstrapper) { ctx := context.NewContext() defer ctx.Close() - res, err := b.Bootstrap(ctx, nt.Namespaces) + res, err := b.Bootstrap(ctx, nt.Namespaces, nt.RunState) assert.NoError(nt.t, err) nt.Results = res } @@ -550,7 +562,7 @@ func (nt *NamespacesTester) TestBootstrapWith(b Bootstrapper) { func (nt *NamespacesTester) TestReadWith(s Source) { ctx := context.NewContext() defer ctx.Close() - res, err := s.Read(ctx, nt.Namespaces) + res, err := s.Read(ctx, nt.Namespaces, nt.RunState) require.NoError(nt.t, err) nt.Results = res }