Skip to content

Commit

Permalink
Add RunState and thread through bootstrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles committed Sep 6, 2020
1 parent 9f176c8 commit dc2c4cb
Show file tree
Hide file tree
Showing 20 changed files with 247 additions and 98 deletions.
2 changes: 1 addition & 1 deletion src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (bsc BootstrapConfiguration) New(
if bsc.CacheSeriesMetadata != nil {
providerOpts = providerOpts.SetCacheSeriesMetadata(*bsc.CacheSeriesMetadata)
}
return bootstrap.NewProcessProvider(bs, providerOpts, rsOpts)
return bootstrap.NewProcessProvider(bs, providerOpts, rsOpts, fsOpts)
}

func (bsc BootstrapConfiguration) filesystemConfig() BootstrapFilesystemConfiguration {
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func newDefaultBootstrappableTestSetups(
processOpts := bootstrap.NewProcessOptions().
SetTopologyMapProvider(setup).
SetOrigin(setup.Origin())
provider, err := bootstrap.NewProcessProvider(fsBootstrapper, processOpts, bsOpts)
provider, err := bootstrap.NewProcessProvider(fsBootstrapper, processOpts, bsOpts, fsOpts)
require.NoError(t, err)

setup.SetStorageOpts(setup.StorageOpts().SetBootstrapProcessProvider(provider))
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ func (ts *testSetup) InitializeBootstrappers(opts InitializeBootstrappersOptions
processOpts := bootstrap.NewProcessOptions().
SetTopologyMapProvider(ts).
SetOrigin(ts.Origin())
process, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts)
process, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts, fsOpts)
if err != nil {
return err
}
Expand Down
6 changes: 0 additions & 6 deletions src/dbnode/persist/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,9 +622,3 @@ type Segments interface {
AbsoluteFilePaths() []string
BlockStart() time.Time
}

// InfoFileResultsPerShard maps shards to info files.
type InfoFileResultsPerShard map[uint32][]ReadInfoFileResult

// InfoFilesByNamespace maps a namespace to info files grouped by shard.
type InfoFilesByNamespace map[namespace.Metadata]InfoFileResultsPerShard
9 changes: 5 additions & 4 deletions src/dbnode/storage/bootstrap/bootstrapper/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (b baseBootstrapper) String() string {
func (b baseBootstrapper) Bootstrap(
ctx context.Context,
namespaces bootstrap.Namespaces,
runState bootstrap.RunState,
) (bootstrap.NamespaceResults, error) {
logFields := []zapcore.Field{
zap.String("bootstrapper", b.name),
Expand All @@ -96,7 +97,7 @@ func (b baseBootstrapper) Bootstrap(
logFields, currNamespace)

dataAvailable, err := b.src.AvailableData(currNamespace.Metadata,
currNamespace.DataRunOptions.ShardTimeRanges.Copy(),
currNamespace.DataRunOptions.ShardTimeRanges.Copy(), runState,
currNamespace.DataRunOptions.RunOptions)
if err != nil {
return bootstrap.NamespaceResults{}, err
Expand All @@ -107,7 +108,7 @@ func (b baseBootstrapper) Bootstrap(
// Prepare index if required.
if currNamespace.Metadata.Options().IndexOptions().Enabled() {
indexAvailable, err := b.src.AvailableIndex(currNamespace.Metadata,
currNamespace.IndexRunOptions.ShardTimeRanges.Copy(),
currNamespace.IndexRunOptions.ShardTimeRanges.Copy(), runState,
currNamespace.IndexRunOptions.RunOptions)
if err != nil {
return bootstrap.NamespaceResults{}, err
Expand Down Expand Up @@ -137,7 +138,7 @@ func (b baseBootstrapper) Bootstrap(
b.log.Info("bootstrap from source started", logFields...)

// Run the bootstrap source.
currResults, err := b.src.Read(ctx, curr)
currResults, err := b.src.Read(ctx, curr, runState)

logFields = append(logFields, zap.Duration("took", nowFn().Sub(begin)))
if err != nil {
Expand Down Expand Up @@ -166,7 +167,7 @@ func (b baseBootstrapper) Bootstrap(
// If there are some time ranges the current bootstrapper could not fulfill,
// that we can attempt then pass it along to the next bootstrapper.
if next.Namespaces.Len() > 0 {
nextResults, err := b.next.Bootstrap(ctx, next)
nextResults, err := b.next.Bootstrap(ctx, next, runState)
if err != nil {
return bootstrap.NamespaceResults{}, err
}
Expand Down
10 changes: 6 additions & 4 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func newCommitLogSource(
func (s *commitLogSource) AvailableData(
ns namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
_ bootstrap.RunState,
runOpts bootstrap.RunOptions,
) (result.ShardTimeRanges, error) {
return s.availability(ns, shardsTimeRanges, runOpts)
Expand All @@ -154,6 +155,7 @@ func (s *commitLogSource) AvailableData(
func (s *commitLogSource) AvailableIndex(
ns namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
_ bootstrap.RunState,
runOpts bootstrap.RunOptions,
) (result.ShardTimeRanges, error) {
return s.availability(ns, shardsTimeRanges, runOpts)
Expand All @@ -171,6 +173,7 @@ type readNamespaceResult struct {
func (s *commitLogSource) Read(
ctx context.Context,
namespaces bootstrap.Namespaces,
runState bootstrap.RunState,
) (bootstrap.NamespaceResults, error) {
ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperCommitLogSourceRead)
defer span.Finish()
Expand Down Expand Up @@ -235,7 +238,7 @@ func (s *commitLogSource) Read(
for shard, tr := range shardTimeRanges.Iter() {
err := s.bootstrapShardSnapshots(
ns.Metadata, accumulator, shard, tr, blockSize,
mostRecentCompleteSnapshotByBlockShard)
mostRecentCompleteSnapshotByBlockShard, runState)
if err != nil {
return bootstrap.NamespaceResults{}, err
}
Expand Down Expand Up @@ -661,14 +664,13 @@ func (s *commitLogSource) bootstrapShardSnapshots(
shardTimeRanges xtime.Ranges,
blockSize time.Duration,
mostRecentCompleteSnapshotByBlockShard map[xtime.UnixNano]map[uint32]fs.FileSetFile,
runState bootstrap.RunState,
) error {
// NB(bodu): We use info files on disk to check if a snapshot should be loaded in as cold or warm.
// We do this instead of cross refing blockstarts and current time to handle the case of bootstrapping a
// once warm block start after a node has been shut down for a long time. We consider all block starts we
// haven't flushed data for yet a warm block start.
fsOpts := s.opts.CommitLogOptions().FilesystemOptions()
readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), ns.ID(), shard,
fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions(), persist.FileSetFlushType)
readInfoFilesResults := runState.InfoFilesForNamespace(ns)[shard]
shardBlockStartsOnDisk := make(map[xtime.UnixNano]struct{})
for _, result := range readInfoFilesResults {
if err := result.Err.Error(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/migration"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/instrument"
Expand All @@ -44,7 +45,7 @@ type worker struct {
// the info files.
type Migrator struct {
migrationTaskFn MigrationTaskFn
infoFilesByNamespace fs.InfoFilesByNamespace
infoFilesByNamespace bootstrap.InfoFilesByNamespace
migrationOpts migration.Options
fsOpts fs.Options
instrumentOpts instrument.Options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs/migration"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
Expand All @@ -50,7 +51,7 @@ func TestMigratorRun(t *testing.T) {

// Create some dummy ReadInfoFileResults as these are used to determine if we need to run a migration or not.
// Put some in a state requiring migrations and others not to flex both paths.
infoFilesByNamespace := fs.InfoFilesByNamespace{
infoFilesByNamespace := bootstrap.InfoFilesByNamespace{
md1: {
1: {testInfoFileWithVolumeIndex(0), testInfoFileWithVolumeIndex(1)},
2: {testInfoFileWithVolumeIndex(0), testInfoFileWithVolumeIndex(1)},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/migration"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/x/instrument"
)

Expand All @@ -40,7 +41,7 @@ var (

type options struct {
migrationTaskFn MigrationTaskFn
infoFilesByNamespace fs.InfoFilesByNamespace
infoFilesByNamespace bootstrap.InfoFilesByNamespace
migrationOpts migration.Options
fsOpts fs.Options
instrumentOpts instrument.Options
Expand Down Expand Up @@ -93,13 +94,13 @@ func (o *options) MigrationTaskFn() MigrationTaskFn {
return o.migrationTaskFn
}

func (o *options) SetInfoFilesByNamespace(value fs.InfoFilesByNamespace) Options {
func (o *options) SetInfoFilesByNamespace(value bootstrap.InfoFilesByNamespace) Options {
opts := *o
opts.infoFilesByNamespace = value
return &opts
}

func (o *options) InfoFilesByNamespace() fs.InfoFilesByNamespace {
func (o *options) InfoFilesByNamespace() bootstrap.InfoFilesByNamespace {
return o.infoFilesByNamespace
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/migration"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
xtest "github.com/m3db/m3/src/x/test"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -109,7 +110,7 @@ func newTestOptions(ctrl *gomock.Controller) Options {
SetMigrationTaskFn(func(result fs.ReadInfoFileResult) (migration.NewTaskFn, bool) {
return nil, false
}).
SetInfoFilesByNamespace(make(fs.InfoFilesByNamespace)).
SetInfoFilesByNamespace(make(bootstrap.InfoFilesByNamespace)).
SetStorageOptions(mockOpts).
SetFilesystemOptions(fs.NewOptions())
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/migration"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/x/instrument"
)

Expand All @@ -42,10 +43,10 @@ type Options interface {
MigrationTaskFn() MigrationTaskFn

// SetInfoFilesByNamespaces sets the info file results to operate on keyed by namespace.
SetInfoFilesByNamespace(value fs.InfoFilesByNamespace) Options
SetInfoFilesByNamespace(value bootstrap.InfoFilesByNamespace) Options

// InfoFilesByNamespaces returns the info file results to operate on keyed by namespace.
InfoFilesByNamespace() fs.InfoFilesByNamespace
InfoFilesByNamespace() bootstrap.InfoFilesByNamespace

// SetMigrationOptions sets the migration options.
SetMigrationOptions(value migration.Options) Options
Expand Down
Loading

0 comments on commit dc2c4cb

Please sign in to comment.