Skip to content

Commit

Permalink
Refactor bootstrapper AvailableData and AvailableIndex to optionally …
Browse files Browse the repository at this point in the history
…return an error (#920)
  • Loading branch information
richardartoul authored Oct 3, 2018
1 parent d1891c7 commit acc491b
Show file tree
Hide file tree
Showing 19 changed files with 189 additions and 130 deletions.
20 changes: 10 additions & 10 deletions src/dbnode/integration/bootstrap_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func newTestBootstrapperSource(
if opts.availableData != nil {
src.availableData = opts.availableData
} else {
src.availableData = func(_ namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, _ bootstrap.RunOptions) result.ShardTimeRanges {
return shardsTimeRanges
src.availableData = func(_ namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, _ bootstrap.RunOptions) (result.ShardTimeRanges, error) {
return shardsTimeRanges, nil
}
}

Expand All @@ -66,8 +66,8 @@ func newTestBootstrapperSource(
if opts.availableIndex != nil {
src.availableIndex = opts.availableIndex
} else {
src.availableIndex = func(_ namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, _ bootstrap.RunOptions) result.ShardTimeRanges {
return shardsTimeRanges
src.availableIndex = func(_ namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, _ bootstrap.RunOptions) (result.ShardTimeRanges, error) {
return shardsTimeRanges, nil
}
}

Expand Down Expand Up @@ -110,19 +110,19 @@ type testBootstrapper struct {

type testBootstrapperSourceOptions struct {
can func(bootstrap.Strategy) bool
availableData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) result.ShardTimeRanges
availableData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error)
readData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.DataBootstrapResult, error)
availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) result.ShardTimeRanges
availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error)
readIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.IndexBootstrapResult, error)
}

var _ bootstrap.Source = &testBootstrapperSource{}

type testBootstrapperSource struct {
can func(bootstrap.Strategy) bool
availableData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) result.ShardTimeRanges
availableData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error)
readData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.DataBootstrapResult, error)
availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) result.ShardTimeRanges
availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error)
readIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.IndexBootstrapResult, error)
}

Expand All @@ -134,7 +134,7 @@ func (t testBootstrapperSource) AvailableData(
ns namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
) result.ShardTimeRanges {
) (result.ShardTimeRanges, error) {
return t.availableData(ns, shardsTimeRanges, runOpts)
}

Expand All @@ -150,7 +150,7 @@ func (t testBootstrapperSource) AvailableIndex(
ns namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
) result.ShardTimeRanges {
) (result.ShardTimeRanges, error) {
return t.availableIndex(ns, shardsTimeRanges, runOpts)
}

Expand Down
10 changes: 7 additions & 3 deletions src/dbnode/integration/peers_bootstrap_none_available_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestPeersBootstrapNoneAvailable(t *testing.T) {
maxShard := uint32(opts.NumShards()) - uint32(1)
start := []services.ServiceInstance{
node(t, 0, newClusterShardsRange(minShard, maxShard, shard.Initializing)),
node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Unknown)),
node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Initializing)),
}

hostShardSets := []topology.HostShardSet{}
Expand Down Expand Up @@ -106,11 +106,15 @@ func TestPeersBootstrapNoneAvailable(t *testing.T) {

// Start both servers "simultaneously"
go func() {
require.NoError(t, setups[0].startServer())
if err := setups[0].startServer(); err != nil {
panic(err)
}
serversAreUp.Done()
}()
go func() {
require.NoError(t, setups[1].startServer())
if err := setups[1].startServer(); err != nil {
panic(err)
}
serversAreUp.Done()
}()

Expand Down
10 changes: 6 additions & 4 deletions src/dbnode/storage/bootstrap/bootstrap_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ func (b baseBootstrapper) runBootstrapStep(
totalRanges result.ShardTimeRanges,
step bootstrapStep,
) error {
prepareResult, err := step.prepare(totalRanges)
if err != nil {
return err
}

var (
prepareResult = step.prepare(totalRanges)
wg sync.WaitGroup
currStatus, nextStatus bootstrapStepStatus
currErr, nextErr error
Expand Down
11 changes: 8 additions & 3 deletions src/dbnode/storage/bootstrap/bootstrapper/base_data_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,15 @@ func newBootstrapDataStep(

func (s *bootstrapData) prepare(
totalRanges result.ShardTimeRanges,
) bootstrapStepPreparedResult {
return bootstrapStepPreparedResult{
currAvailable: s.curr.AvailableData(s.namespace, totalRanges, s.opts),
) (bootstrapStepPreparedResult, error) {
currAvailable, err := s.curr.AvailableData(s.namespace, totalRanges, s.opts)
if err != nil {
return bootstrapStepPreparedResult{}, err
}

return bootstrapStepPreparedResult{
currAvailable: currAvailable,
}, nil
}

func (s *bootstrapData) runCurrStep(
Expand Down
11 changes: 8 additions & 3 deletions src/dbnode/storage/bootstrap/bootstrapper/base_index_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,15 @@ func newBootstrapIndexStep(

func (s *bootstrapIndex) prepare(
totalRanges result.ShardTimeRanges,
) bootstrapStepPreparedResult {
return bootstrapStepPreparedResult{
currAvailable: s.curr.AvailableIndex(s.namespace, totalRanges, s.opts),
) (bootstrapStepPreparedResult, error) {
currAvailable, err := s.curr.AvailableIndex(s.namespace, totalRanges, s.opts)
if err != nil {
return bootstrapStepPreparedResult{}, err
}

return bootstrapStepPreparedResult{
currAvailable: currAvailable,
}, nil
}

func (s *bootstrapIndex) runCurrStep(
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/base_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

type bootstrapStep interface {
prepare(totalRanges result.ShardTimeRanges) bootstrapStepPreparedResult
prepare(totalRanges result.ShardTimeRanges) (bootstrapStepPreparedResult, error)
runCurrStep(targetRanges result.ShardTimeRanges) (bootstrapStepStatus, error)
runNextStep(targetRanges result.ShardTimeRanges) (bootstrapStepStatus, error)
mergeResults(totalUnfulfilled result.ShardTimeRanges)
Expand Down
12 changes: 7 additions & 5 deletions src/dbnode/storage/bootstrap/bootstrapper/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestBaseBootstrapperCurrentNoUnfulfilled(t *testing.T) {

source.EXPECT().
AvailableData(testNs, targetRanges, testDefaultRunOpts).
Return(targetRanges)
Return(targetRanges, nil)
source.EXPECT().
ReadData(testNs, targetRanges, testDefaultRunOpts).
Return(result, nil)
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestBaseBootstrapperCurrentSomeUnfulfilled(t *testing.T) {

source.EXPECT().
AvailableData(testNs, targetRanges, testDefaultRunOpts).
Return(targetRanges)
Return(targetRanges, nil)
source.EXPECT().
ReadData(testNs, targetRanges, testDefaultRunOpts).
Return(currResult, nil)
Expand Down Expand Up @@ -331,7 +331,7 @@ func testBasebootstrapperNext(t *testing.T, nextUnfulfilled result.ShardTimeRang

source.EXPECT().
AvailableData(testNs, targetRanges, testDefaultRunOpts).
Return(nil)
Return(nil, nil)
source.EXPECT().
ReadData(testNs, shardTimeRangesMatcher{nil},
testDefaultRunOpts).
Expand Down Expand Up @@ -407,7 +407,9 @@ func TestBaseBootstrapperBoth(t *testing.T) {
})

source.EXPECT().Can(bootstrap.BootstrapParallel).Return(true)
source.EXPECT().AvailableData(testNs, targetRanges, testDefaultRunOpts).Return(availableRanges)
source.EXPECT().
AvailableData(testNs, targetRanges, testDefaultRunOpts).
Return(availableRanges, nil)
source.EXPECT().
ReadData(testNs, shardTimeRangesMatcher{availableRanges},
testDefaultRunOpts).
Expand Down Expand Up @@ -475,7 +477,7 @@ func TestBaseBootstrapperIndexHalfCurrentHalfNext(t *testing.T) {
source.EXPECT().Can(bootstrap.BootstrapParallel).Return(false)
source.EXPECT().
AvailableIndex(testNs, shardTimeRangesMatcher{targetRanges}, testDefaultRunOpts).
Return(firstHalf)
Return(firstHalf, nil)
source.EXPECT().
ReadIndex(testNs, shardTimeRangesMatcher{firstHalf}, testDefaultRunOpts).
Return(currResult, nil)
Expand Down
21 changes: 8 additions & 13 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *commitLogSource) AvailableData(
ns namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
) result.ShardTimeRanges {
) (result.ShardTimeRanges, error) {
return s.availability(ns, shardsTimeRanges, runOpts)
}

Expand Down Expand Up @@ -1250,7 +1250,7 @@ func (s *commitLogSource) AvailableIndex(
ns namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
) result.ShardTimeRanges {
) (result.ShardTimeRanges, error) {
return s.availability(ns, shardsTimeRanges, runOpts)
}

Expand Down Expand Up @@ -1432,7 +1432,7 @@ func (s *commitLogSource) availability(
ns namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
) result.ShardTimeRanges {
) (result.ShardTimeRanges, error) {
var (
topoState = runOpts.InitialTopologyState()
availableShardTimeRanges = result.ShardTimeRanges{}
Expand All @@ -1449,13 +1449,11 @@ func (s *commitLogSource) availability(

originHostShardState, ok := hostShardStates[topology.HostID(topoState.Origin.ID())]
if !ok {
// TODO(rartoul): Make this a hard error once we refactor the interface to support
// returning errors.
errMsg := fmt.Sprintf("initial topology state does not contain shard state for origin node and shard: %d", shardIDUint)
iOpts := s.opts.CommitLogOptions().InstrumentOptions()
invariantLogger := instrument.EmitInvariantViolationAndGetLogger(iOpts)
invariantLogger.Errorf(
"initial topology state does not contain shard state for origin node and shard: %d", shardIDUint)
continue
invariantLogger.Error(errMsg)
return nil, errors.New(errMsg)
}

originShardState := originHostShardState.ShardState
Expand Down Expand Up @@ -1484,14 +1482,11 @@ func (s *commitLogSource) availability(
case shard.Unknown:
fallthrough
default:
// TODO(rartoul): Make this a hard error once we refactor the interface to support
// returning errors.
s.log.Errorf("unknown shard state: %v", originShardState)
return result.ShardTimeRanges{}
return result.ShardTimeRanges{}, fmt.Errorf("unknown shard state: %v", originShardState)
}
}

return availableShardTimeRanges
return availableShardTimeRanges, nil
}

func newReadSeriesPredicate(ns namespace.Metadata) commitlog.SeriesFilterPredicate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ func testOptions() Options {
}

func TestAvailableEmptyRangeError(t *testing.T) {
opts := testDefaultOpts
src := newCommitLogSource(opts, fs.Inspection{})
res := src.AvailableData(testNsMetadata(t), result.ShardTimeRanges{}, testDefaultRunOpts)
var (
opts = testDefaultOpts
src = newCommitLogSource(opts, fs.Inspection{})
res, err = src.AvailableData(testNsMetadata(t), result.ShardTimeRanges{}, testDefaultRunOpts)
)
require.NoError(t, err)
require.True(t, result.ShardTimeRanges{}.Equal(res))
}

Expand Down
26 changes: 19 additions & 7 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package commitlog

import (
"errors"
"testing"
"time"

Expand Down Expand Up @@ -61,6 +62,7 @@ func TestAvailableData(t *testing.T) {
topoState *topology.StateSnapshot
shardsTimeRangesToBootstrap result.ShardTimeRanges
expectedAvailableShardsTimeRanges result.ShardTimeRanges
expectedErr error
}{
{
title: "Single node - Shard initializing",
Expand All @@ -77,6 +79,7 @@ func TestAvailableData(t *testing.T) {
}),
shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap,
expectedAvailableShardsTimeRanges: result.ShardTimeRanges{},
expectedErr: errors.New("unknown shard state: Unknown"),
},
{
title: "Single node - Shard leaving",
Expand Down Expand Up @@ -116,17 +119,26 @@ func TestAvailableData(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.title, func(t *testing.T) {

var (
src = newCommitLogSource(testOptions(), fs.Inspection{})
runOpts = testDefaultRunOpts.SetInitialTopologyState(tc.topoState)
dataRes = src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts)
src = newCommitLogSource(testOptions(), fs.Inspection{})
runOpts = testDefaultRunOpts.SetInitialTopologyState(tc.topoState)
dataRes, err = src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts)
)

require.Equal(t, tc.expectedAvailableShardsTimeRanges, dataRes)
if tc.expectedErr != nil {
require.Equal(t, err, tc.expectedErr)
} else {
require.NoError(t, err)
require.Equal(t, tc.expectedAvailableShardsTimeRanges, dataRes)
}

indexRes := src.AvailableIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts)
require.Equal(t, tc.expectedAvailableShardsTimeRanges, indexRes)
indexRes, err := src.AvailableIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts)
if tc.expectedErr != nil {
require.Equal(t, err, tc.expectedErr)
} else {
require.NoError(t, err)
require.Equal(t, tc.expectedAvailableShardsTimeRanges, indexRes)
}
})
}
}
8 changes: 4 additions & 4 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *fileSystemSource) AvailableData(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
) result.ShardTimeRanges {
) (result.ShardTimeRanges, error) {
return s.availability(md, shardsTimeRanges)
}

Expand All @@ -146,7 +146,7 @@ func (s *fileSystemSource) AvailableIndex(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
) result.ShardTimeRanges {
) (result.ShardTimeRanges, error) {
return s.availability(md, shardsTimeRanges)
}

Expand All @@ -165,12 +165,12 @@ func (s *fileSystemSource) ReadIndex(
func (s *fileSystemSource) availability(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
) result.ShardTimeRanges {
) (result.ShardTimeRanges, error) {
result := make(map[uint32]xtime.Ranges)
for shard, ranges := range shardsTimeRanges {
result[shard] = s.shardAvailability(md.ID(), shard, ranges)
}
return result
return result, nil
}

func (s *fileSystemSource) shardAvailability(
Expand Down
Loading

0 comments on commit acc491b

Please sign in to comment.