Skip to content

Commit

Permalink
Hook migrations into fs bootstrap process
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles committed Aug 10, 2020
1 parent 005bddc commit 945e340
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 5 deletions.
3 changes: 2 additions & 1 deletion src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ func (bsc BootstrapConfiguration) New(
SetBoostrapDataNumProcessors(fsCfg.numCPUs()).
SetRuntimeOptionsManager(opts.RuntimeOptionsManager()).
SetIdentifierPool(opts.IdentifierPool()).
SetMigrationOptions(fsCfg.migration().NewOptions())
SetMigrationOptions(fsCfg.migration().NewOptions()).
SetStorageOptions(opts)
if err := validator.ValidateFilesystemBootstrapperOptions(fsbOpts); err != nil {
return nil, err
}
Expand Down
13 changes: 13 additions & 0 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/migration"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/index/compaction"
Expand Down Expand Up @@ -69,6 +70,7 @@ type options struct {
runtimeOptsMgr runtime.OptionsManager
identifierPool ident.Pool
migrationOpts migration.Options
storageOpts storage.Options
}

// NewOptions creates new bootstrap options
Expand All @@ -87,6 +89,7 @@ func NewOptions() Options {
runtimeOptsMgr: runtime.NewOptionsManager(),
identifierPool: idPool,
migrationOpts: migration.NewOptions(),
storageOpts: storage.NewOptions(),
}
}

Expand Down Expand Up @@ -221,3 +224,13 @@ func (o *options) SetMigrationOptions(value migration.Options) Options {
func (o *options) MigrationOptions() migration.Options {
return o.migrationOpts
}

func (o *options) SetStorageOptions(value storage.Options) Options {
opts := *o
opts.storageOpts = value
return &opts
}

func (o *options) StorageOptions() storage.Options {
return o.storageOpts
}
32 changes: 31 additions & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/migration"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/index/convert"
Expand Down Expand Up @@ -151,6 +153,9 @@ func (s *fileSystemSource) Read(
// Preload info file results so they can be used to bootstrap data filesets and data migrations
infoFilesByNamespace := s.loadInfoFiles(namespaces)

// Perform any necessary migrations but don't block bootstrap process on failure
s.runMigrations(infoFilesByNamespace)

// NB(r): Perform all data bootstrapping first then index bootstrapping
// to more clearly deliniate which process is slower than the other.
nowFn := s.opts.ResultOptions().ClockOptions().NowFn()
Expand Down Expand Up @@ -219,6 +224,30 @@ func (s *fileSystemSource) Read(
return results, nil
}

func (s *fileSystemSource) runMigrations(infoFilesByNamespace map[namespace.Metadata]fs.InfoFileResultsPerShard) {
// Only one migration for now, so just short circuit entirely if not enabled
if s.opts.MigrationOptions().ToVersion() != migration.MigrateVersion_1_1 {
return
}

migrator, err := migrator.NewMigrator(migrator.NewOptions().
SetNewMigrationFn(migration.NewToVersion1_1Task).
SetShouldMigrateFn(migration.ShouldMigrateToVersion1_1).
SetInfoFilesByNamespace(infoFilesByNamespace).
SetMigrationOptions(s.opts.MigrationOptions()).
SetFilesystemOptions(s.fsopts).
SetInstrumentOptions(s.opts.InstrumentOptions()).
SetStorageOptions(s.opts.StorageOptions()))
if err != nil {
s.log.Error("error creating migrator. continuing bootstrap", zap.Error(err))
}

err = migrator.Run()
if err != nil {
s.log.Error("error performing migrations. continuing bootstrap", zap.Error(err))
}
}

func (s *fileSystemSource) availability(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
Expand Down Expand Up @@ -909,7 +938,8 @@ func (s *fileSystemSource) loadInfoFiles(
result := make(fs.InfoFileResultsPerShard, shardTimeRanges.Len())
for shard := range shardTimeRanges.Iter() {
result[shard] = fs.ReadInfoFiles(s.fsopts.FilePathPrefix(),
namespace.Metadata.ID(), shard, s.fsopts.InfoReaderBufferSize(), s.fsopts.DecodingOptions())
namespace.Metadata.ID(), shard, s.fsopts.InfoReaderBufferSize(), s.fsopts.DecodingOptions(),
persist.FileSetFlushType)
}

infoFilesByNamespace[namespace.Metadata] = result
Expand Down
70 changes: 67 additions & 3 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/migration"
"github.com/m3db/m3/src/dbnode/persist/fs/msgpack"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/index"
Expand All @@ -44,6 +48,7 @@ import (
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"
Expand Down Expand Up @@ -191,6 +196,10 @@ func testBootstrappingIndexShardTimeRanges() result.ShardTimeRanges {
}

func writeGoodFiles(t *testing.T, dir string, namespace ident.ID, shard uint32) {
writeGoodFilesWithFsOpts(t, namespace, shard, newTestFsOptions(dir))
}

func writeGoodFilesWithFsOpts(t *testing.T, namespace ident.ID, shard uint32, fsOpts fs.Options) {
inputs := []struct {
start time.Time
id string
Expand All @@ -203,8 +212,8 @@ func writeGoodFiles(t *testing.T, dir string, namespace ident.ID, shard uint32)
}

for _, input := range inputs {
writeTSDBFiles(t, dir, namespace, shard, input.start,
[]testSeries{{input.id, input.tags, input.data}})
writeTSDBFilesWithFsOpts(t, namespace, shard, input.start,
[]testSeries{{input.id, input.tags, input.data}}, fsOpts)
}
}

Expand Down Expand Up @@ -246,7 +255,18 @@ func writeTSDBFiles(
start time.Time,
series []testSeries,
) {
w, err := fs.NewWriter(newTestFsOptions(dir))
writeTSDBFilesWithFsOpts(t, namespace, shard, start, series, newTestFsOptions(dir))
}

func writeTSDBFilesWithFsOpts(
t require.TestingT,
namespace ident.ID,
shard uint32,
start time.Time,
series []testSeries,
opts fs.Options,
) {
w, err := fs.NewWriter(opts)
require.NoError(t, err)
writerOpts := fs.DataWriterOpenOptions{
Identifier: fs.FileSetFileIdentifier{
Expand Down Expand Up @@ -859,3 +879,47 @@ func TestReadTags(t *testing.T) {
require.Equal(t, tags, reader.Tags)
tester.EnsureNoWrites()
}

func TestReadRunMigrations(t *testing.T) {
dir := createTempDir(t)
defer os.RemoveAll(dir)

// Write existing data filesets with legacy encoding
eOpts := msgpack.LegacyEncodingOptions{
EncodeLegacyIndexInfoVersion: msgpack.LegacyEncodingIndexVersionV4, // MinorVersion 0
EncodeLegacyIndexEntryVersion: msgpack.LegacyEncodingIndexEntryVersionV2, // No checksum data
}
enc := msgpack.NewEncoderWithOptions(eOpts)

writeGoodFilesWithFsOpts(t, testNs1ID, testShard, newTestFsOptions(dir).SetEncoder(enc))

opts := newTestOptions(t, dir)
sOpts, closer := newTestStorageOptions(t, opts.PersistManager())
defer closer()

src, err := newFileSystemSource(opts.
SetMigrationOptions(migration.NewOptions().
SetToVersion(migration.MigrateVersion_1_1)).
SetStorageOptions(sOpts))
require.NoError(t, err)

validateReadResults(t, src, dir, testShardTimeRanges())
}

func newTestStorageOptions(t *testing.T, pm persist.Manager) (storage.Options, index.Closer) {
plCache, closer, err := index.NewPostingsListCache(1, index.PostingsListCacheOptions{
InstrumentOptions: instrument.NewOptions(),
})
require.NoError(t, err)

md, err := namespace.NewMetadata(testNs1ID, testNamespaceOptions)
require.NoError(t, err)

return storage.NewOptions().
SetPersistManager(pm).
SetNamespaceInitializer(namespace.NewStaticInitializer([]namespace.Metadata{md})).
SetRepairEnabled(false).
SetIndexOptions(index.NewOptions().
SetPostingsListCache(plCache)).
SetBlockLeaseManager(block.NewLeaseManager(nil)), closer
}
7 changes: 7 additions & 0 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/migration"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/index/compaction"
Expand Down Expand Up @@ -108,4 +109,10 @@ type Options interface {

// MigrationOptions gets the migration options.
MigrationOptions() migration.Options

// SetStorageOptions sets storage options.
SetStorageOptions(value storage.Options) Options

// StorageOptions gets the storage options.
StorageOptions() storage.Options
}

0 comments on commit 945e340

Please sign in to comment.