[dbnode] Add config option to set repair strategy (default/full_sweep) and concurrency #3573

Merged: 14 commits, Jul 1, 2021

Changes from 3 commits
6 changes: 6 additions & 0 deletions src/cmd/services/m3dbnode/config/config.go
@@ -540,6 +540,9 @@ type RepairPolicy struct {
// Type is the type of repair to run.
Type repair.Type `yaml:"type"`

// Strategy is the repair strategy to use.
Strategy repair.Strategy `yaml:"strategy"`

// Force the repair to run regardless of whether namespaces have repair enabled or not.
Force bool `yaml:"force"`

@@ -549,6 +552,9 @@ type RepairPolicy struct {
// The repair check interval.
CheckInterval time.Duration `yaml:"checkInterval"`

// Concurrency sets the repair shard concurrency when non-zero.
Concurrency int `yaml:"concurrency"`

// Whether debug shadow comparisons are enabled.
DebugShadowComparisonsEnabled bool `yaml:"debugShadowComparisonsEnabled"`

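For context, a minimal sketch of how the two new fields round-trip through YAML. It assumes the repo's gopkg.in/yaml.v2 dependency and the repair.Strategy type added later in this diff; the anonymous struct is a local stand-in for RepairPolicy, not the real config type.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"

	"github.com/m3db/m3/src/dbnode/storage/repair"
)

func main() {
	// Local stand-in for the two fields added to RepairPolicy above.
	var policy struct {
		Strategy    repair.Strategy `yaml:"strategy"`
		Concurrency int             `yaml:"concurrency"`
	}
	doc := []byte("strategy: full_sweep\nconcurrency: 4\n")
	if err := yaml.Unmarshal(doc, &policy); err != nil {
		panic(err)
	}
	// Strategy implements YAML unmarshalling, so the string form parses
	// directly; an omitted or empty strategy falls back to DefaultStrategy.
	fmt.Println(policy.Strategy, policy.Concurrency) // full_sweep 4
}
```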
2 changes: 2 additions & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
@@ -454,9 +454,11 @@ func TestConfiguration(t *testing.T) {
repair:
enabled: false
type: 0
strategy: 0
force: false
throttle: 2m0s
checkInterval: 1m0s
concurrency: 0
debugShadowComparisonsEnabled: false
debugShadowComparisonsPercentage: 0
replication: null
4 changes: 4 additions & 0 deletions src/dbnode/server/server.go
@@ -924,6 +924,7 @@ func Run(runOpts RunOptions) {
if repairCfg := cfg.Repair; repairCfg != nil {
repairOpts = repairOpts.
SetType(repairCfg.Type).
SetStrategy(repairCfg.Strategy).
SetForce(repairCfg.Force).
SetResultOptions(rsOpts).
SetDebugShadowComparisonsEnabled(cfg.Repair.DebugShadowComparisonsEnabled)
@@ -933,6 +934,9 @@
if cfg.Repair.CheckInterval > 0 {
repairOpts = repairOpts.SetRepairCheckInterval(cfg.Repair.CheckInterval)
}
if cfg.Repair.Concurrency > 0 {
repairOpts = repairOpts.SetRepairShardConcurrency(cfg.Repair.Concurrency)
}

if cfg.Repair.DebugShadowComparisonsPercentage > 0 {
// Set conditionally to avoid stomping on the default value of 1.0.
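These guards only override an option when the config value is non-zero, so leaving concurrency unset keeps the package default. A hedged sketch of the equivalent programmatic wiring (e.g. from a test), using only setters that appear in this diff:

```go
package main

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/storage/repair"
)

func main() {
	// Mirrors the config-driven path above: choose a strategy and raise
	// the repair shard concurrency from its default.
	opts := repair.NewOptions().
		SetStrategy(repair.FullSweepStrategy).
		SetRepairShardConcurrency(4)
	fmt.Println(opts.Strategy()) // full_sweep
}
```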
23 changes: 22 additions & 1 deletion src/dbnode/storage/repair.go
@@ -666,6 +666,20 @@ func (r *dbRepairer) Repair() error {
return err
}

var (
strategy = r.ropts.Strategy()
repairBlockStartShortCircuitRepair bool
)
switch strategy {
case repair.DefaultStrategy:
repairBlockStartShortCircuitRepair = true
case repair.FullSweepStrategy:
repairBlockStartShortCircuitRepair = false
default:
// Unrecognized strategy.
return fmt.Errorf("unknown repair strategy: %v", strategy)
}

for _, n := range namespaces {
repairRange := r.namespaceRepairTimeRange(n)
blockSize := n.Options().RetentionOptions().BlockSize()
@@ -681,6 +695,13 @@ func (r *dbRepairer) Repair() error {
leastRecentlyRepairedBlockStartLastRepairTime xtime.UnixNano
)
repairRange.IterateBackward(blockSize, func(blockStart xtime.UnixNano) bool {
// Update metrics around progress of repair.
blockStartUnixSeconds := blockStart.ToTime().Unix()
Collaborator review comment: nit: blockStart.Seconds()

r.scope.Tagged(map[string]string{
"namespace": n.ID().String(),
}).Gauge("timestamp-current-block-repair").Update(float64(blockStartUnixSeconds))

// Update state for later reporting of least recently repaired block.
repairState, ok := r.repairStatesByNs.repairStates(n.ID(), blockStart)
if ok && (leastRecentlyRepairedBlockStart.IsZero() ||
repairState.LastAttempt.Before(leastRecentlyRepairedBlockStartLastRepairTime)) {
@@ -694,7 +715,7 @@ func (r *dbRepairer) Repair() error {

// Failed or unrepaired block from this point onwards.
numUnrepairedBlocks++
if hasRepairedABlockStart {
if hasRepairedABlockStart && repairBlockStartShortCircuitRepair {
// Only want to repair one namespace/blockStart per call to Repair()
// so once we've repaired a single blockStart we don't perform any
// more actual repairs although we do keep iterating so that we can
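To make the strategy difference concrete, here is a toy, self-contained simulation of a single Repair() pass. repairPass and its inputs are invented for illustration; this is not M3 code.

```go
package main

import "fmt"

// repairPass walks block starts from newest to oldest, as the repairer does.
// With shortCircuit set (DefaultStrategy) it stops doing real repairs after
// the first repaired blockStart; without it (FullSweepStrategy) it repairs
// every dirty block back to the end of retention.
func repairPass(blocks []string, dirty map[string]bool, shortCircuit bool) []string {
	var repaired []string
	for _, b := range blocks {
		if !dirty[b] {
			continue
		}
		if len(repaired) > 0 && shortCircuit {
			// Keep iterating (to update metrics and repair state) but skip
			// any further actual repairs this pass.
			continue
		}
		repaired = append(repaired, b)
	}
	return repaired
}

func main() {
	blocks := []string{"t3", "t2", "t1", "t0"} // newest -> oldest block starts
	dirty := map[string]bool{"t3": true, "t1": true, "t0": true}
	fmt.Println(repairPass(blocks, dirty, true))  // [t3], default strategy
	fmt.Println(repairPass(blocks, dirty, false)) // [t3 t1 t0], full sweep
}
```

Under the default strategy the skipped older blocks are picked up on subsequent Repair() ticks, newest first, which is what keeps the most recent data freshest.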
15 changes: 15 additions & 0 deletions src/dbnode/storage/repair/options.go
@@ -31,6 +31,8 @@ import (
)

const (
defaultRepairType = DefaultRepair
defaultRepairStrategy = DefaultStrategy
// Allow repairs to progress when a single peer is down (i.e. during single node failure
// or deployments).
defaultRepairConsistencyLevel = topology.ReadConsistencyLevelUnstrictMajority
@@ -52,6 +54,7 @@

type options struct {
repairType Type
strategy Strategy
force bool
adminClients []client.AdminClient
repairConsistencyLevel topology.ReadConsistencyLevel
@@ -67,6 +70,8 @@
// NewOptions creates new repair options.
func NewOptions() Options {
return &options{
repairType: defaultRepairType,
strategy: defaultRepairStrategy,
repairConsistencyLevel: defaultRepairConsistencyLevel,
repairShardConcurrency: defaultRepairShardConcurrency,
repairCheckInterval: defaultRepairCheckInterval,
@@ -88,6 +93,16 @@ func (o *options) Type() Type {
return o.repairType
}

func (o *options) SetStrategy(value Strategy) Options {
opts := *o
opts.strategy = value
return &opts
}

func (o *options) Strategy() Strategy {
return o.strategy
}

func (o *options) SetForce(value bool) Options {
opts := *o
opts.force = value
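SetStrategy follows the package's copy-on-set convention: each setter shallow-copies the options struct and returns the copy, so shared options values never change underneath their holders. A minimal standalone sketch of the pattern, with a hypothetical opts type standing in for the real one:

```go
package main

import "fmt"

// opts is a hypothetical stand-in for the repair options struct.
type opts struct{ strategy string }

// SetStrategy copies the receiver and mutates only the copy.
func (o *opts) SetStrategy(v string) *opts {
	c := *o // shallow copy; the receiver is never mutated
	c.strategy = v
	return &c
}

func main() {
	base := &opts{strategy: "default"}
	full := base.SetStrategy("full_sweep")
	fmt.Println(base.strategy, full.strategy) // default full_sweep
}
```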
66 changes: 66 additions & 0 deletions src/dbnode/storage/repair/types.go
@@ -82,6 +82,66 @@ func (t Type) String() string {
return "unknown"
}

// Strategy defines the repair strategy.
type Strategy uint

const (
// DefaultStrategy compares blocks iterating backwards from the most
// recent block start and, after repairing a block that needs repair,
// restarts from the latest block start and works backwards again.
// This strategy is best at keeping the most recent data repaired as
// quickly as possible, but when enabling repair for the first time in a
// cluster you may want to run full sweep repairs for a while first.
DefaultStrategy Strategy = iota
// FullSweepStrategy compares blocks iterating backwards and repairs
// blocks needing repair until it reaches the end of retention; only then
// does the repair restart evaluating blocks from the most recent block
// starts again.
// This mode may be preferable in clusters that have never had repair
// enabled, to ensure historical data gets repaired at least once in a
// full sweep before switching back to the default strategy.
FullSweepStrategy
)

var validStrategies = []Strategy{
DefaultStrategy,
FullSweepStrategy,
}

// UnmarshalYAML unmarshals a Strategy from its string representation.
func (t *Strategy) UnmarshalYAML(unmarshal func(interface{}) error) error {
var str string
if err := unmarshal(&str); err != nil {
return err
}

// If unspecified, use the default strategy.
if str == "" {
*t = DefaultStrategy
return nil
}

for _, valid := range validStrategies {
if str == valid.String() {
*t = valid
return nil
}
}
return fmt.Errorf("invalid repair Strategy '%s', valid strategies are: %s",
str, validStrategies)
}

// String returns the repair strategy as a string.
func (t Strategy) String() string {
switch t {
case DefaultStrategy:
return "default"
case FullSweepStrategy:
return "full_sweep"
}
return "unknown"
}

// ReplicaMetadataSlice captures a slice of block.ReplicaMetadata.
type ReplicaMetadataSlice interface {
// Add adds the metadata to the slice.
@@ -272,6 +332,12 @@ type Options interface {
// Type returns the type of repair to run.
Type() Type

// SetStrategy sets the repair strategy.
SetStrategy(value Strategy) Options

// Strategy returns the repair strategy.
Strategy() Strategy

// SetForce sets whether to force repairs to run for all namespaces.
SetForce(value bool) Options

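Only the strings produced by String() are accepted; anything else fails fast at config load rather than silently defaulting. A small sketch of both paths, again assuming gopkg.in/yaml.v2 (the exact error text follows the format string above):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"

	"github.com/m3db/m3/src/dbnode/storage/repair"
)

func main() {
	var s repair.Strategy
	if err := yaml.Unmarshal([]byte("full_sweep"), &s); err == nil {
		fmt.Println(s) // full_sweep
	}

	// Unknown strings are rejected with the list of valid strategies.
	err := yaml.Unmarshal([]byte("reverse"), &s)
	fmt.Println(err) // invalid repair Strategy 'reverse', valid strategies are: [default full_sweep]
}
```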
73 changes: 47 additions & 26 deletions src/dbnode/storage/repair_test.go
@@ -586,9 +586,6 @@ type expectedRepair struct {
}

func TestDatabaseRepairPrioritizationLogic(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

var (
rOpts = retention.NewOptions().
SetRetentionPeriod(retention.NewOptions().BlockSize() * 2)
@@ -607,22 +604,25 @@ func TestDatabaseRepairPrioritizationLogic(t *testing.T) {
require.Equal(t, blockSize, flushTimeEnd.Sub(flushTimeStart))

testCases := []struct {
title string
repairState repairStatesByNs
expectedNS1Repair expectedRepair
expectedNS2Repair expectedRepair
title string
strategy repair.Strategy
repairState repairStatesByNs
expectedNS1Repairs []expectedRepair
expectedNS2Repairs []expectedRepair
}{
{
title: "repairs most recent block if no repair state",
expectedNS1Repair: expectedRepair{
expectedRepairRange: xtime.Range{Start: flushTimeEnd, End: flushTimeEnd.Add(blockSize)},
title: "repairs most recent block if no repair state",
strategy: repair.DefaultStrategy,
expectedNS1Repairs: []expectedRepair{
{expectedRepairRange: xtime.Range{Start: flushTimeEnd, End: flushTimeEnd.Add(blockSize)}},
},
expectedNS2Repair: expectedRepair{
expectedRepairRange: xtime.Range{Start: flushTimeEnd, End: flushTimeEnd.Add(blockSize)},
expectedNS2Repairs: []expectedRepair{
{expectedRepairRange: xtime.Range{Start: flushTimeEnd, End: flushTimeEnd.Add(blockSize)}},
},
},
{
title: "repairs next unrepaired block in reverse order if some (but not all) blocks have been repaired",
title: "repairs next unrepaired block in reverse order if some (but not all) blocks have been repaired",
strategy: repair.DefaultStrategy,
repairState: repairStatesByNs{
"ns1": namespaceRepairStateByTime{
flushTimeEnd: repairState{
@@ -637,15 +637,16 @@ func TestDatabaseRepairPrioritizationLogic(t *testing.T) {
},
},
},
expectedNS1Repair: expectedRepair{
expectedRepairRange: xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)},
expectedNS1Repairs: []expectedRepair{
{expectedRepairRange: xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)}},
},
expectedNS2Repair: expectedRepair{
expectedRepairRange: xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)},
expectedNS2Repairs: []expectedRepair{
{expectedRepairRange: xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)}},
},
},
{
title: "repairs least recently repaired block if all blocks have been repaired",
title: "repairs least recently repaired block if all blocks have been repaired",
strategy: repair.DefaultStrategy,
repairState: repairStatesByNs{
"ns1": namespaceRepairStateByTime{
flushTimeStart: repairState{
@@ -668,19 +669,35 @@ func TestDatabaseRepairPrioritizationLogic(t *testing.T) {
},
},
},
expectedNS1Repair: expectedRepair{
expectedRepairRange: xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)},
expectedNS1Repairs: []expectedRepair{
{expectedRepairRange: xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)}},
},
expectedNS2Repair: expectedRepair{
expectedRepairRange: xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)},
expectedNS2Repairs: []expectedRepair{
{expectedRepairRange: xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)}},
},
},
{
title: "repairs all blocks if no repair state with full sweep strategy",
strategy: repair.FullSweepStrategy,
expectedNS1Repairs: []expectedRepair{
{expectedRepairRange: xtime.Range{Start: flushTimeEnd, End: flushTimeEnd.Add(blockSize)}},
{expectedRepairRange: xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)}},
},
expectedNS2Repairs: []expectedRepair{
{expectedRepairRange: xtime.Range{Start: flushTimeEnd, End: flushTimeEnd.Add(blockSize)}},
{expectedRepairRange: xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)}},
},
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.title, func(t *testing.T) {
opts := DefaultTestOptions().SetRepairOptions(testRepairOptions(ctrl))
ctrl := xtest.NewController(t)
defer ctrl.Finish()

repairOpts := testRepairOptions(ctrl).SetStrategy(tc.strategy)
opts := DefaultTestOptions().SetRepairOptions(repairOpts)
mockDatabase := NewMockdatabase(ctrl)

databaseRepairer, err := newDatabaseRepairer(mockDatabase, opts)
@@ -707,8 +724,12 @@ func TestDatabaseRepairPrioritizationLogic(t *testing.T) {
ns1.EXPECT().ID().Return(ident.StringID("ns1")).AnyTimes()
ns2.EXPECT().ID().Return(ident.StringID("ns2")).AnyTimes()

ns1.EXPECT().Repair(gomock.Any(), tc.expectedNS1Repair.expectedRepairRange, NamespaceRepairOptions{})
ns2.EXPECT().Repair(gomock.Any(), tc.expectedNS2Repair.expectedRepairRange, NamespaceRepairOptions{})
for _, expected := range tc.expectedNS1Repairs {
ns1.EXPECT().Repair(gomock.Any(), expected.expectedRepairRange, NamespaceRepairOptions{})
}
for _, expected := range tc.expectedNS2Repairs {
ns2.EXPECT().Repair(gomock.Any(), expected.expectedRepairRange, NamespaceRepairOptions{})
}

mockDatabase.EXPECT().OwnedNamespaces().Return(namespaces, nil)
require.Nil(t, repairer.Repair())
@@ -718,7 +739,7 @@ func TestDatabaseRepairPrioritizationLogic(t *testing.T) {

// Database repairer repairs blocks in decreasing time ranges for each namespace. If database repairer fails to
// repair a time range of a namespace then instead of skipping repair of all past time ranges of that namespace, test
// that database repairer tries to repair the past corrupt time range of that namespace.
// that database repairer tries to repair the past corrupt time range of that namespace.
func TestDatabaseRepairSkipsPoisonShard(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
Expand Down