Skip to content

Commit

Permalink
[dbnode] Add migration configuration and options (#2519)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles authored Aug 10, 2020
1 parent a801229 commit 6e8840e
Show file tree
Hide file tree
Showing 9 changed files with 378 additions and 1 deletion.
37 changes: 36 additions & 1 deletion src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/migration"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
Expand Down Expand Up @@ -70,18 +71,51 @@ type BootstrapConfiguration struct {
type BootstrapFilesystemConfiguration struct {
// NumProcessorsPerCPU is the number of processors per CPU.
NumProcessorsPerCPU float64 `yaml:"numProcessorsPerCPU" validate:"min=0.0"`

// Migration configuration specifies what version, if any, existing data filesets should be migrated to
// if necessary.
Migration *BootstrapMigrationConfiguration `yaml:"migration"`
}

func (c BootstrapFilesystemConfiguration) numCPUs() int {
return int(math.Ceil(float64(c.NumProcessorsPerCPU * float64(runtime.NumCPU()))))
}

func (c BootstrapFilesystemConfiguration) migration() BootstrapMigrationConfiguration {
if cfg := c.Migration; cfg != nil {
return *cfg
}
return BootstrapMigrationConfiguration{}
}

func newDefaultBootstrapFilesystemConfiguration() BootstrapFilesystemConfiguration {
return BootstrapFilesystemConfiguration{
NumProcessorsPerCPU: defaultNumProcessorsPerCPU,
Migration: &BootstrapMigrationConfiguration{},
}
}

// BootstrapMigrationConfiguration specifies configuration for data migrations during bootstrapping.
type BootstrapMigrationConfiguration struct {
// TargetMigrationVersion indicates that we should attempt to upgrade filesets to
// what’s expected of the specified version.
TargetMigrationVersion migration.MigrationVersion `yaml:"targetMigrationVersion"`

// Concurrency sets the number of concurrent workers performing migrations.
Concurrency int `yaml:"concurrency"`
}

// NewOptions generates migration.Options from the configuration.
func (m BootstrapMigrationConfiguration) NewOptions() migration.Options {
opts := migration.NewOptions().SetTargetMigrationVersion(m.TargetMigrationVersion)

if m.Concurrency > 0 {
opts = opts.SetConcurrency(m.Concurrency)
}

return opts
}

// BootstrapCommitlogConfiguration specifies config for the commitlog bootstrapper.
type BootstrapCommitlogConfiguration struct {
// ReturnUnfulfilledForCorruptCommitLogFiles controls whether the commitlog bootstrapper
Expand Down Expand Up @@ -182,7 +216,8 @@ func (bsc BootstrapConfiguration) New(
SetCompactor(compactor).
SetBoostrapDataNumProcessors(fsCfg.numCPUs()).
SetRuntimeOptionsManager(opts.RuntimeOptionsManager()).
SetIdentifierPool(opts.IdentifierPool())
SetIdentifierPool(opts.IdentifierPool()).
SetMigrationOptions(fsCfg.migration().NewOptions())
if err := validator.ValidateFilesystemBootstrapperOptions(fsbOpts); err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func TestConfiguration(t *testing.T) {
- noop-all
fs:
numProcessorsPerCPU: 0.42
migration: null
commitlog:
returnUnfulfilledForCorruptCommitLogFiles: false
peers: null
Expand Down
71 changes: 71 additions & 0 deletions src/dbnode/persist/fs/migration/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package migration

import (
"fmt"
"runtime"
)

// defaultMigrationConcurrency is the default number of concurrent workers to perform migrations.
var defaultMigrationConcurrency = runtime.NumCPU()

type options struct {
targetMigrationVersion MigrationVersion
concurrency int
}

// NewOptions creates new migration options.
func NewOptions() Options {
return &options{
concurrency: defaultMigrationConcurrency,
}
}

func (o *options) Validate() error {
if err := ValidateMigrationVersion(o.targetMigrationVersion); err != nil {
return err
}
if o.concurrency < 1 {
return fmt.Errorf("concurrency value %d must be >= 1", o.concurrency)
}
return nil
}

func (o *options) SetTargetMigrationVersion(value MigrationVersion) Options {
opts := *o
opts.targetMigrationVersion = value
return &opts
}

func (o *options) TargetMigrationVersion() MigrationVersion {
return o.targetMigrationVersion
}

func (o *options) SetConcurrency(value int) Options {
opts := *o
opts.concurrency = value
return &opts
}

func (o *options) Concurrency() int {
return o.concurrency
}
51 changes: 51 additions & 0 deletions src/dbnode/persist/fs/migration/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package migration

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestOptionsTargetMigrationVersion(t *testing.T) {
opts := NewOptions()
require.Equal(t, MigrationVersionNone, opts.TargetMigrationVersion())

opts = opts.SetTargetMigrationVersion(MigrationVersion_1_1)
require.Equal(t, MigrationVersion_1_1, opts.TargetMigrationVersion())
}

func TestOptionsConcurrency(t *testing.T) {
opts := NewOptions()
require.Equal(t, defaultMigrationConcurrency, opts.Concurrency())

opts = opts.SetConcurrency(100)
require.Equal(t, 100, opts.Concurrency())
}

func TestOptionsValidate(t *testing.T) {
opts := NewOptions()
require.NoError(t, opts.Validate())

require.Error(t, opts.SetTargetMigrationVersion(2).Validate())
require.Error(t, opts.SetConcurrency(0).Validate())
}
42 changes: 42 additions & 0 deletions src/dbnode/persist/fs/migration/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package migration

// Options represents the options for migrations.
type Options interface {
// Validate validates migration options.
Validate() error

// SetTargetMigrationVersion sets the target version for a migration
SetTargetMigrationVersion(value MigrationVersion) Options

// TargetMigrationVersion is the target version for a migration.
TargetMigrationVersion() MigrationVersion

// SetConcurrency sets the number of concurrent workers performing migrations.
SetConcurrency(value int) Options

// Concurrency gets the number of concurrent workers performing migrations.
Concurrency() int
}

// MigrationVersion is an enum that corresponds to the major and minor version number to migrate data files to.
type MigrationVersion uint
87 changes: 87 additions & 0 deletions src/dbnode/persist/fs/migration/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package migration

import "fmt"

const (
// MigrationVersionNone indicates node should not attempt to perform any migrations.
MigrationVersionNone MigrationVersion = iota
// MigrationVersion_1_1 indicates node should attempt to migrate data files up to version 1.1.
MigrationVersion_1_1
)

var (
validMigrationVersions = []MigrationVersion{
MigrationVersionNone,
MigrationVersion_1_1,
}
)

func (m *MigrationVersion) String() string {
switch *m {
case MigrationVersionNone:
return "none"
case MigrationVersion_1_1:
return "1.1"
default:
return "unknown"
}
}

// ParseMigrationVersion parses a string for a MigrationVersion.
func ParseMigrationVersion(str string) (MigrationVersion, error) {
for _, valid := range validMigrationVersions {
if str == valid.String() {
return valid, nil
}
}

return 0, fmt.Errorf("unrecognized migrate version: %v", str)
}

// ValidateMigrationVersion validates a stored metrics type.
func ValidateMigrationVersion(m MigrationVersion) error {
for _, valid := range validMigrationVersions {
if valid == m {
return nil
}
}

return fmt.Errorf("invalid migrate version '%v': should be one of %v",
m, validMigrationVersions)
}

// UnmarshalYAML unmarshals a migrate version.
func (m *MigrationVersion) UnmarshalYAML(unmarshal func(interface{}) error) error {
var str string
if err := unmarshal(&str); err != nil {
return err
}

if value, err := ParseMigrationVersion(str); err == nil {
*m = value
return nil
}

return fmt.Errorf("invalid MigrationVersion '%s' valid types are: %v",
str, validMigrationVersions)
}
63 changes: 63 additions & 0 deletions src/dbnode/persist/fs/migration/version_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package migration

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
yaml "gopkg.in/yaml.v2"
)

func TestParseMigrateVersion(t *testing.T) {
v, err := ParseMigrationVersion("none")
require.NoError(t, err)
require.Equal(t, MigrationVersionNone, v)

v, err = ParseMigrationVersion("1.1")
require.NoError(t, err)
require.Equal(t, MigrationVersion_1_1, v)
}

func TestValidateMigrateVersion(t *testing.T) {
err := ValidateMigrationVersion(MigrationVersion_1_1)
require.NoError(t, err)

err = ValidateMigrationVersion(2)
require.Error(t, err)
}

func TestUnmarshalYAML(t *testing.T) {
type config struct {
Version MigrationVersion `yaml:"version"`
}

for _, value := range validMigrationVersions {
str := fmt.Sprintf("version: %s\n", value.String())
var cfg config
require.NoError(t, yaml.Unmarshal([]byte(str), &cfg))
require.Equal(t, value, cfg.Version)
}

var cfg config
require.Error(t, yaml.Unmarshal([]byte("version: abc"), &cfg))
}
Loading

0 comments on commit 6e8840e

Please sign in to comment.