From acc491b3ed17aa0a38d4804c691db1988bac3c7a Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 3 Oct 2018 10:40:00 -0400 Subject: [PATCH] Refactor bootstrapper AvailableData and AvailableIndex to optionally return an error (#920) --- src/dbnode/integration/bootstrap_helpers.go | 20 +++---- .../peers_bootstrap_none_available_test.go | 10 +++- .../storage/bootstrap/bootstrap_mock.go | 10 ++-- .../storage/bootstrap/bootstrapper/base.go | 6 +- .../bootstrap/bootstrapper/base_data_step.go | 11 +++- .../bootstrap/bootstrapper/base_index_step.go | 11 +++- .../bootstrap/bootstrapper/base_step.go | 2 +- .../bootstrap/bootstrapper/base_test.go | 12 ++-- .../bootstrapper/commitlog/source.go | 21 +++---- .../commitlog/source_data_test.go | 9 ++- .../bootstrapper/commitlog/source_test.go | 26 ++++++--- .../bootstrap/bootstrapper/fs/source.go | 8 +-- .../bootstrapper/fs/source_data_test.go | 18 ++++-- .../bootstrap/bootstrapper/peers/source.go | 23 ++++---- .../bootstrapper/peers/source_data_test.go | 11 ++-- .../bootstrapper/peers/source_test.go | 27 ++++++--- .../bootstrapper/uninitialized/source.go | 35 ++++++------ .../bootstrapper/uninitialized/source_test.go | 55 +++++++++++-------- src/dbnode/storage/bootstrap/types.go | 4 +- 19 files changed, 189 insertions(+), 130 deletions(-) diff --git a/src/dbnode/integration/bootstrap_helpers.go b/src/dbnode/integration/bootstrap_helpers.go index 9e8dcb2d48..67be4f33cf 100644 --- a/src/dbnode/integration/bootstrap_helpers.go +++ b/src/dbnode/integration/bootstrap_helpers.go @@ -50,8 +50,8 @@ func newTestBootstrapperSource( if opts.availableData != nil { src.availableData = opts.availableData } else { - src.availableData = func(_ namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, _ bootstrap.RunOptions) result.ShardTimeRanges { - return shardsTimeRanges + src.availableData = func(_ namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, _ bootstrap.RunOptions) (result.ShardTimeRanges, error) { + return shardsTimeRanges, nil } } @@ -66,8 +66,8 @@ func newTestBootstrapperSource( if opts.availableIndex != nil { src.availableIndex = opts.availableIndex } else { - src.availableIndex = func(_ namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, _ bootstrap.RunOptions) result.ShardTimeRanges { - return shardsTimeRanges + src.availableIndex = func(_ namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, _ bootstrap.RunOptions) (result.ShardTimeRanges, error) { + return shardsTimeRanges, nil } } @@ -110,9 +110,9 @@ type testBootstrapper struct { type testBootstrapperSourceOptions struct { can func(bootstrap.Strategy) bool - availableData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) result.ShardTimeRanges + availableData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error) readData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.DataBootstrapResult, error) - availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) result.ShardTimeRanges + availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error) readIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.IndexBootstrapResult, error) } @@ -120,9 +120,9 @@ var _ bootstrap.Source = &testBootstrapperSource{} type testBootstrapperSource struct { can func(bootstrap.Strategy) bool - availableData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) result.ShardTimeRanges + availableData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error) readData func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.DataBootstrapResult, error) - availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) result.ShardTimeRanges + availableIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.ShardTimeRanges, error) readIndex func(namespace.Metadata, result.ShardTimeRanges, bootstrap.RunOptions) (result.IndexBootstrapResult, error) } @@ -134,7 +134,7 @@ func (t testBootstrapperSource) AvailableData( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { return t.availableData(ns, shardsTimeRanges, runOpts) } @@ -150,7 +150,7 @@ func (t testBootstrapperSource) AvailableIndex( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { return t.availableIndex(ns, shardsTimeRanges, runOpts) } diff --git a/src/dbnode/integration/peers_bootstrap_none_available_test.go b/src/dbnode/integration/peers_bootstrap_none_available_test.go index b9426eba56..720f5ae5d8 100644 --- a/src/dbnode/integration/peers_bootstrap_none_available_test.go +++ b/src/dbnode/integration/peers_bootstrap_none_available_test.go @@ -63,7 +63,7 @@ func TestPeersBootstrapNoneAvailable(t *testing.T) { maxShard := uint32(opts.NumShards()) - uint32(1) start := []services.ServiceInstance{ node(t, 0, newClusterShardsRange(minShard, maxShard, shard.Initializing)), - node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Unknown)), + node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Initializing)), } hostShardSets := []topology.HostShardSet{} @@ -106,11 +106,15 @@ func TestPeersBootstrapNoneAvailable(t *testing.T) { // Start both servers "simultaneously" go func() { - require.NoError(t, setups[0].startServer()) + if err := setups[0].startServer(); err != nil { + panic(err) + } serversAreUp.Done() }() go func() { - require.NoError(t, setups[1].startServer()) + if err := setups[1].startServer(); err != nil { + panic(err) + } serversAreUp.Done() }() diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index 20e63574c1..6d148b5e7b 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -465,10 +465,11 @@ func (mr *MockSourceMockRecorder) Can(strategy interface{}) *gomock.Call { } // AvailableData mocks base method -func (m *MockSource) AvailableData(ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts RunOptions) result.ShardTimeRanges { +func (m *MockSource) AvailableData(ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts RunOptions) (result.ShardTimeRanges, error) { ret := m.ctrl.Call(m, "AvailableData", ns, shardsTimeRanges, runOpts) ret0, _ := ret[0].(result.ShardTimeRanges) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // AvailableData indicates an expected call of AvailableData @@ -490,10 +491,11 @@ func (mr *MockSourceMockRecorder) ReadData(ns, shardsTimeRanges, runOpts interfa } // AvailableIndex mocks base method -func (m *MockSource) AvailableIndex(ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, opts RunOptions) result.ShardTimeRanges { +func (m *MockSource) AvailableIndex(ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, opts RunOptions) (result.ShardTimeRanges, error) { ret := m.ctrl.Call(m, "AvailableIndex", ns, shardsTimeRanges, opts) ret0, _ := ret[0].(result.ShardTimeRanges) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // AvailableIndex indicates an expected call of AvailableIndex diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base.go b/src/dbnode/storage/bootstrap/bootstrapper/base.go index 504e15b019..00ba21c8c9 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base.go @@ -115,8 +115,12 @@ func (b baseBootstrapper) runBootstrapStep( totalRanges result.ShardTimeRanges, step bootstrapStep, ) error { + prepareResult, err := step.prepare(totalRanges) + if err != nil { + return err + } + var ( - prepareResult = step.prepare(totalRanges) wg sync.WaitGroup currStatus, nextStatus bootstrapStepStatus currErr, nextErr error diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base_data_step.go b/src/dbnode/storage/bootstrap/bootstrapper/base_data_step.go index 204b0bc301..ac8b24f42c 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base_data_step.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base_data_step.go @@ -58,10 +58,15 @@ func newBootstrapDataStep( func (s *bootstrapData) prepare( totalRanges result.ShardTimeRanges, -) bootstrapStepPreparedResult { - return bootstrapStepPreparedResult{ - currAvailable: s.curr.AvailableData(s.namespace, totalRanges, s.opts), +) (bootstrapStepPreparedResult, error) { + currAvailable, err := s.curr.AvailableData(s.namespace, totalRanges, s.opts) + if err != nil { + return bootstrapStepPreparedResult{}, err } + + return bootstrapStepPreparedResult{ + currAvailable: currAvailable, + }, nil } func (s *bootstrapData) runCurrStep( diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base_index_step.go b/src/dbnode/storage/bootstrap/bootstrapper/base_index_step.go index 8d45a9cac1..1fa67ef7d9 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base_index_step.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base_index_step.go @@ -58,10 +58,15 @@ func newBootstrapIndexStep( func (s *bootstrapIndex) prepare( totalRanges result.ShardTimeRanges, -) bootstrapStepPreparedResult { - return bootstrapStepPreparedResult{ - currAvailable: s.curr.AvailableIndex(s.namespace, totalRanges, s.opts), +) (bootstrapStepPreparedResult, error) { + currAvailable, err := s.curr.AvailableIndex(s.namespace, totalRanges, s.opts) + if err != nil { + return bootstrapStepPreparedResult{}, err } + + return bootstrapStepPreparedResult{ + currAvailable: currAvailable, + }, nil } func (s *bootstrapIndex) runCurrStep( diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base_step.go b/src/dbnode/storage/bootstrap/bootstrapper/base_step.go index 77a4816b75..31514b2571 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base_step.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base_step.go @@ -26,7 +26,7 @@ import ( ) type bootstrapStep interface { - prepare(totalRanges result.ShardTimeRanges) bootstrapStepPreparedResult + prepare(totalRanges result.ShardTimeRanges) (bootstrapStepPreparedResult, error) runCurrStep(targetRanges result.ShardTimeRanges) (bootstrapStepStatus, error) runNextStep(targetRanges result.ShardTimeRanges) (bootstrapStepStatus, error) mergeResults(totalUnfulfilled result.ShardTimeRanges) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go index df5762d9d3..e510204ead 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go @@ -256,7 +256,7 @@ func TestBaseBootstrapperCurrentNoUnfulfilled(t *testing.T) { source.EXPECT(). AvailableData(testNs, targetRanges, testDefaultRunOpts). - Return(targetRanges) + Return(targetRanges, nil) source.EXPECT(). ReadData(testNs, targetRanges, testDefaultRunOpts). Return(result, nil) @@ -294,7 +294,7 @@ func TestBaseBootstrapperCurrentSomeUnfulfilled(t *testing.T) { source.EXPECT(). AvailableData(testNs, targetRanges, testDefaultRunOpts). - Return(targetRanges) + Return(targetRanges, nil) source.EXPECT(). ReadData(testNs, targetRanges, testDefaultRunOpts). Return(currResult, nil) @@ -331,7 +331,7 @@ func testBasebootstrapperNext(t *testing.T, nextUnfulfilled result.ShardTimeRang source.EXPECT(). AvailableData(testNs, targetRanges, testDefaultRunOpts). - Return(nil) + Return(nil, nil) source.EXPECT(). ReadData(testNs, shardTimeRangesMatcher{nil}, testDefaultRunOpts). @@ -407,7 +407,9 @@ func TestBaseBootstrapperBoth(t *testing.T) { }) source.EXPECT().Can(bootstrap.BootstrapParallel).Return(true) - source.EXPECT().AvailableData(testNs, targetRanges, testDefaultRunOpts).Return(availableRanges) + source.EXPECT(). + AvailableData(testNs, targetRanges, testDefaultRunOpts). + Return(availableRanges, nil) source.EXPECT(). ReadData(testNs, shardTimeRangesMatcher{availableRanges}, testDefaultRunOpts). @@ -475,7 +477,7 @@ func TestBaseBootstrapperIndexHalfCurrentHalfNext(t *testing.T) { source.EXPECT().Can(bootstrap.BootstrapParallel).Return(false) source.EXPECT(). AvailableIndex(testNs, shardTimeRangesMatcher{targetRanges}, testDefaultRunOpts). - Return(firstHalf) + Return(firstHalf, nil) source.EXPECT(). ReadIndex(testNs, shardTimeRangesMatcher{firstHalf}, testDefaultRunOpts). Return(currResult, nil) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 9bc97d4c5e..fef5db801f 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -105,7 +105,7 @@ func (s *commitLogSource) AvailableData( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { return s.availability(ns, shardsTimeRanges, runOpts) } @@ -1250,7 +1250,7 @@ func (s *commitLogSource) AvailableIndex( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { return s.availability(ns, shardsTimeRanges, runOpts) } @@ -1432,7 +1432,7 @@ func (s *commitLogSource) availability( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { var ( topoState = runOpts.InitialTopologyState() availableShardTimeRanges = result.ShardTimeRanges{} @@ -1449,13 +1449,11 @@ func (s *commitLogSource) availability( originHostShardState, ok := hostShardStates[topology.HostID(topoState.Origin.ID())] if !ok { - // TODO(rartoul): Make this a hard error once we refactor the interface to support - // returning errors. + errMsg := fmt.Sprintf("initial topology state does not contain shard state for origin node and shard: %d", shardIDUint) iOpts := s.opts.CommitLogOptions().InstrumentOptions() invariantLogger := instrument.EmitInvariantViolationAndGetLogger(iOpts) - invariantLogger.Errorf( - "initial topology state does not contain shard state for origin node and shard: %d", shardIDUint) - continue + invariantLogger.Error(errMsg) + return nil, errors.New(errMsg) } originShardState := originHostShardState.ShardState @@ -1484,14 +1482,11 @@ func (s *commitLogSource) availability( case shard.Unknown: fallthrough default: - // TODO(rartoul): Make this a hard error once we refactor the interface to support - // returning errors. - s.log.Errorf("unknown shard state: %v", originShardState) - return result.ShardTimeRanges{} + return result.ShardTimeRanges{}, fmt.Errorf("unknown shard state: %v", originShardState) } } - return availableShardTimeRanges + return availableShardTimeRanges, nil } func newReadSeriesPredicate(ns namespace.Metadata) commitlog.SeriesFilterPredicate { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go index 9a68b8a2ee..996626059c 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go @@ -86,9 +86,12 @@ func testOptions() Options { } func TestAvailableEmptyRangeError(t *testing.T) { - opts := testDefaultOpts - src := newCommitLogSource(opts, fs.Inspection{}) - res := src.AvailableData(testNsMetadata(t), result.ShardTimeRanges{}, testDefaultRunOpts) + var ( + opts = testDefaultOpts + src = newCommitLogSource(opts, fs.Inspection{}) + res, err = src.AvailableData(testNsMetadata(t), result.ShardTimeRanges{}, testDefaultRunOpts) + ) + require.NoError(t, err) require.True(t, result.ShardTimeRanges{}.Equal(res)) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go index 96cbc56485..605debddb1 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_test.go @@ -21,6 +21,7 @@ package commitlog import ( + "errors" "testing" "time" @@ -61,6 +62,7 @@ func TestAvailableData(t *testing.T) { topoState *topology.StateSnapshot shardsTimeRangesToBootstrap result.ShardTimeRanges expectedAvailableShardsTimeRanges result.ShardTimeRanges + expectedErr error }{ { title: "Single node - Shard initializing", @@ -77,6 +79,7 @@ func TestAvailableData(t *testing.T) { }), shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedErr: errors.New("unknown shard state: Unknown"), }, { title: "Single node - Shard leaving", @@ -116,17 +119,26 @@ func TestAvailableData(t *testing.T) { for _, tc := range testCases { t.Run(tc.title, func(t *testing.T) { - var ( - src = newCommitLogSource(testOptions(), fs.Inspection{}) - runOpts = testDefaultRunOpts.SetInitialTopologyState(tc.topoState) - dataRes = src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + src = newCommitLogSource(testOptions(), fs.Inspection{}) + runOpts = testDefaultRunOpts.SetInitialTopologyState(tc.topoState) + dataRes, err = src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) ) - require.Equal(t, tc.expectedAvailableShardsTimeRanges, dataRes) + if tc.expectedErr != nil { + require.Equal(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + require.Equal(t, tc.expectedAvailableShardsTimeRanges, dataRes) + } - indexRes := src.AvailableIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) - require.Equal(t, tc.expectedAvailableShardsTimeRanges, indexRes) + indexRes, err := src.AvailableIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + if tc.expectedErr != nil { + require.Equal(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + require.Equal(t, tc.expectedAvailableShardsTimeRanges, indexRes) + } }) } } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index 1dbd78604e..fa95913635 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -126,7 +126,7 @@ func (s *fileSystemSource) AvailableData( md namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { return s.availability(md, shardsTimeRanges) } @@ -146,7 +146,7 @@ func (s *fileSystemSource) AvailableIndex( md namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { return s.availability(md, shardsTimeRanges) } @@ -165,12 +165,12 @@ func (s *fileSystemSource) ReadIndex( func (s *fileSystemSource) availability( md namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { result := make(map[uint32]xtime.Ranges) for shard, ranges := range shardsTimeRanges { result[shard] = s.shardAvailability(md.ID(), shard, ranges) } - return result + return result, nil } func (s *fileSystemSource) shardAvailability( diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go index 3abdafe67c..f3cd348adf 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go @@ -246,22 +246,24 @@ func validateTimeRanges(t *testing.T, tr xtime.Ranges, expected xtime.Ranges) { func TestAvailableEmptyRangeError(t *testing.T) { src := newFileSystemSource(newTestOptions("foo")) - res := src.AvailableData( + res, err := src.AvailableData( testNsMetadata(t), map[uint32]xtime.Ranges{0: xtime.Ranges{}}, testDefaultRunOpts, ) + require.NoError(t, err) require.NotNil(t, res) require.True(t, res.IsEmpty()) } func TestAvailablePatternError(t *testing.T) { src := newFileSystemSource(newTestOptions("[[")) - res := src.AvailableData( + res, err := src.AvailableData( testNsMetadata(t), testShardTimeRanges(), testDefaultRunOpts, ) + require.NoError(t, err) require.NotNil(t, res) require.True(t, res.IsEmpty()) } @@ -278,11 +280,12 @@ func TestAvailableReadInfoError(t *testing.T) { writeInfoFile(t, dir, testNs1ID, shard, testStart, []byte{0x1, 0x2}) src := newFileSystemSource(newTestOptions(dir)) - res := src.AvailableData( + res, err := src.AvailableData( testNsMetadata(t), testShardTimeRanges(), testDefaultRunOpts, ) + require.NoError(t, err) require.NotNil(t, res) require.True(t, res.IsEmpty()) } @@ -299,11 +302,12 @@ func TestAvailableDigestOfDigestMismatch(t *testing.T) { writeDigestFile(t, dir, testNs1ID, shard, testStart, nil) src := newFileSystemSource(newTestOptions(dir)) - res := src.AvailableData( + res, err := src.AvailableData( testNsMetadata(t), testShardTimeRanges(), testDefaultRunOpts, ) + require.NoError(t, err) require.NotNil(t, res) require.True(t, res.IsEmpty()) } @@ -316,11 +320,12 @@ func TestAvailableTimeRangeFilter(t *testing.T) { writeGoodFiles(t, dir, testNs1ID, shard) src := newFileSystemSource(newTestOptions(dir)) - res := src.AvailableData( + res, err := src.AvailableData( testNsMetadata(t), testShardTimeRanges(), testDefaultRunOpts, ) + require.NoError(t, err) require.NotNil(t, res) require.Equal(t, 1, len(res)) require.NotNil(t, res[testShard]) @@ -341,11 +346,12 @@ func TestAvailableTimeRangePartialError(t *testing.T) { writeInfoFile(t, dir, testNs1ID, shard, testStart.Add(4*time.Hour), []byte{0x1, 0x2}) src := newFileSystemSource(newTestOptions(dir)) - res := src.AvailableData( + res, err := src.AvailableData( testNsMetadata(t), testShardTimeRanges(), testDefaultRunOpts, ) + require.NoError(t, err) require.NotNil(t, res) require.Equal(t, 1, len(res)) require.NotNil(t, res[testShard]) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index a04768de3b..4a33fbf6ac 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -82,9 +82,10 @@ func (s *peersSource) AvailableData( nsMetadata namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { - // TODO: Call validateRunOpts here when we modify this interface - // to support returning errors. +) (result.ShardTimeRanges, error) { + if err := s.validateRunOpts(runOpts); err != nil { + return nil, err + } return s.peerAvailability(nsMetadata, shardsTimeRanges, runOpts) } @@ -550,9 +551,10 @@ func (s *peersSource) AvailableIndex( nsMetadata namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { - // TODO: Call validateRunOpts here when we modify this interface - // to support returning errors. +) (result.ShardTimeRanges, error) { + if err := s.validateRunOpts(runOpts); err != nil { + return nil, err + } return s.peerAvailability(nsMetadata, shardsTimeRanges, runOpts) } @@ -711,7 +713,7 @@ func (s *peersSource) peerAvailability( nsMetadata namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { var ( peerAvailabilityByShard = map[topology.ShardID]*shardPeerAvailability{} initialTopologyState = runOpts.InitialTopologyState() @@ -753,10 +755,7 @@ func (s *peersSource) peerAvailability( case shard.Unknown: fallthrough default: - // TODO(rartoul): Make this a hard error once we refactor the interface to support - // returning errors. - s.log.Errorf("unknown shard state: %v", shardState) - return result.ShardTimeRanges{} + return nil, fmt.Errorf("unknown shard state: %v", shardState) } } } @@ -791,7 +790,7 @@ func (s *peersSource) peerAvailability( availableShardTimeRanges[shardIDUint] = shardsTimeRanges[shardIDUint] } - return availableShardTimeRanges + return availableShardTimeRanges, nil } func (s *peersSource) markIndexResultErrorAsUnfulfilled( diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go index 05a39c4c68..dc8c1fa499 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go @@ -131,13 +131,14 @@ func TestPeersSourceEmptyShardTimeRanges(t *testing.T) { target = result.ShardTimeRanges{} runOpts = testDefaultRunOpts.SetInitialTopologyState(&topology.StateSnapshot{}) ) - available := src.AvailableData(nsMetdata, target, runOpts) - assert.Equal(t, target, available) + available, err := src.AvailableData(nsMetdata, target, runOpts) + require.NoError(t, err) + require.Equal(t, target, available) r, err := src.ReadData(nsMetdata, target, testDefaultRunOpts) - assert.NoError(t, err) - assert.Equal(t, 0, len(r.ShardResults())) - assert.True(t, r.Unfulfilled().IsEmpty()) + require.NoError(t, err) + require.Equal(t, 0, len(r.ShardResults())) + require.True(t, r.Unfulfilled().IsEmpty()) } func TestPeersSourceReturnsErrorForAdminSession(t *testing.T) { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go index c75d6b39fc..5399843c0a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go @@ -21,6 +21,7 @@ package peers import ( + "errors" "strings" "testing" "time" @@ -71,6 +72,7 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { bootstrapReadConsistency topology.ReadConsistencyLevel shardsTimeRangesToBootstrap result.ShardTimeRanges expectedAvailableShardsTimeRanges result.ShardTimeRanges + expectedErr error }{ { title: "Returns empty if only self is available", @@ -91,6 +93,7 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { bootstrapReadConsistency: topology.ReadConsistencyLevelMajority, shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + expectedErr: errors.New("unknown shard state: Unknown"), }, { title: "Returns success if consistency can be met (available/leaving)", @@ -149,14 +152,22 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { src, err := newPeersSource(opts) require.NoError(t, err) - var ( - runOpts = testDefaultRunOpts.SetInitialTopologyState(tc.topoState) - dataRes = src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) - ) - require.Equal(t, tc.expectedAvailableShardsTimeRanges, dataRes) - - indexRes := src.AvailableIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) - require.Equal(t, tc.expectedAvailableShardsTimeRanges, indexRes) + runOpts := testDefaultRunOpts.SetInitialTopologyState(tc.topoState) + dataRes, err := src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + if tc.expectedErr != nil { + require.Equal(t, tc.expectedErr, err) + } else { + require.NoError(t, err) + require.Equal(t, tc.expectedAvailableShardsTimeRanges, dataRes) + } + + indexRes, err := src.AvailableIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + if tc.expectedErr != nil { + require.Equal(t, tc.expectedErr, err) + } else { + require.NoError(t, err) + require.Equal(t, tc.expectedAvailableShardsTimeRanges, indexRes) + } }) } } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go index 0e40be5876..d3c34433fe 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go @@ -21,6 +21,8 @@ package uninitialized import ( + "fmt" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/namespace" @@ -61,7 +63,7 @@ func (s *uninitializedTopologySource) AvailableData( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { return s.availability(ns, shardsTimeRanges, runOpts) } @@ -69,7 +71,7 @@ func (s *uninitializedTopologySource) AvailableIndex( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { return s.availability(ns, shardsTimeRanges, runOpts) } @@ -77,7 +79,7 @@ func (s *uninitializedTopologySource) availability( ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, -) result.ShardTimeRanges { +) (result.ShardTimeRanges, error) { var ( topoState = runOpts.InitialTopologyState() availableShardTimeRanges = result.ShardTimeRanges{} @@ -119,10 +121,7 @@ func (s *uninitializedTopologySource) availability( case shard.Unknown: fallthrough default: - // TODO(rartoul): Make this a hard error once we refactor the interface to support - // returning errors. - s.opts.InstrumentOptions().Logger().Errorf("unknown shard state: %v", shardState) - return result.ShardTimeRanges{} + return nil, fmt.Errorf("unknown shard state: %v", shardState) } } @@ -138,7 +137,7 @@ func (s *uninitializedTopologySource) availability( } } - return availableShardTimeRanges + return availableShardTimeRanges, nil } func (s *uninitializedTopologySource) ReadData( @@ -146,10 +145,12 @@ func (s *uninitializedTopologySource) ReadData( shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) (result.DataBootstrapResult, error) { - var ( - availability = s.availability(ns, shardsTimeRanges, runOpts) - missing = shardsTimeRanges.Copy() - ) + availability, err := s.availability(ns, shardsTimeRanges, runOpts) + if err != nil { + return nil, err + } + + missing := shardsTimeRanges.Copy() missing.Subtract(availability) if missing.IsEmpty() { @@ -164,10 +165,12 @@ func (s *uninitializedTopologySource) ReadIndex( shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) (result.IndexBootstrapResult, error) { - var ( - availability = s.availability(ns, shardsTimeRanges, runOpts) - missing = shardsTimeRanges.Copy() - ) + availability, err := s.availability(ns, shardsTimeRanges, runOpts) + if err != nil { + return nil, err + } + + missing := shardsTimeRanges.Copy() missing.Subtract(availability) if missing.IsEmpty() { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go index be20411eea..da84108159 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go @@ -21,6 +21,7 @@ package uninitialized import ( + "errors" "testing" "time" @@ -68,6 +69,7 @@ func TestUnitializedTopologySourceAvailableDataAndAvailableIndex(t *testing.T) { topoState *topology.StateSnapshot shardsTimeRangesToBootstrap result.ShardTimeRanges expectedAvailableShardsTimeRanges result.ShardTimeRanges + expectedErr error }{ // Snould return that it can bootstrap everything because // it's a new namespace. @@ -86,8 +88,8 @@ func TestUnitializedTopologySourceAvailableDataAndAvailableIndex(t *testing.T) { topoState: tu.NewStateSnapshot(1, tu.HostShardStates{ tu.SelfID: tu.ShardsRange(0, numShards, shard.Unknown), }), - shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedErr: errors.New("unknown shard state: Unknown"), }, // Snould return that it can't bootstrap anything because it's not // a new namespace. @@ -168,8 +170,8 @@ func TestUnitializedTopologySourceAvailableDataAndAvailableIndex(t *testing.T) { notSelfID1: tu.ShardsRange(0, numShards, shard.Available), notSelfID2: tu.ShardsRange(0, numShards, shard.Unknown), }), - shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, - expectedAvailableShardsTimeRanges: result.ShardTimeRanges{}, + shardsTimeRangesToBootstrap: shardTimeRangesToBootstrap, + expectedErr: errors.New("unknown shard state: Unknown"), }, } @@ -177,30 +179,35 @@ func TestUnitializedTopologySourceAvailableDataAndAvailableIndex(t *testing.T) { t.Run(tc.title, func(t *testing.T) { var ( - srcOpts = NewOptions().SetInstrumentOptions(instrument.NewOptions()) - src = newTopologyUninitializedSource(srcOpts) - runOpts = testDefaultRunOpts.SetInitialTopologyState(tc.topoState) - dataAvailabilityResult = src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) - indexAvailabilityResult = src.AvailableIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + srcOpts = NewOptions().SetInstrumentOptions(instrument.NewOptions()) + src = newTopologyUninitializedSource(srcOpts) + runOpts = testDefaultRunOpts.SetInitialTopologyState(tc.topoState) ) + dataAvailabilityResult, dataErr := src.AvailableData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + indexAvailabilityResult, indexErr := src.AvailableIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) - // Make sure AvailableData and AvailableIndex return the correct result - require.Equal(t, tc.expectedAvailableShardsTimeRanges, dataAvailabilityResult) - require.Equal(t, tc.expectedAvailableShardsTimeRanges, indexAvailabilityResult) + if tc.expectedErr != nil { + require.Equal(t, tc.expectedErr, dataErr) + require.Equal(t, tc.expectedErr, indexErr) + } else { + // Make sure AvailableData and AvailableIndex return the correct result + require.Equal(t, tc.expectedAvailableShardsTimeRanges, dataAvailabilityResult) + require.Equal(t, tc.expectedAvailableShardsTimeRanges, indexAvailabilityResult) - // Make sure ReadData marks anything that AvailableData wouldn't return as unfulfilled - dataResult, err := src.ReadData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) - require.NoError(t, err) - expectedDataUnfulfilled := tc.shardsTimeRangesToBootstrap.Copy() - expectedDataUnfulfilled.Subtract(tc.expectedAvailableShardsTimeRanges) - require.Equal(t, expectedDataUnfulfilled, dataResult.Unfulfilled()) + // Make sure ReadData marks anything that AvailableData wouldn't return as unfulfilled + dataResult, err := src.ReadData(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + require.NoError(t, err) + expectedDataUnfulfilled := tc.shardsTimeRangesToBootstrap.Copy() + expectedDataUnfulfilled.Subtract(tc.expectedAvailableShardsTimeRanges) + require.Equal(t, expectedDataUnfulfilled, dataResult.Unfulfilled()) - // Make sure ReadIndex marks anything that AvailableIndex wouldn't return as unfulfilled - indexResult, err := src.ReadIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) - require.NoError(t, err) - expectedIndexUnfulfilled := tc.shardsTimeRangesToBootstrap.Copy() - expectedIndexUnfulfilled.Subtract(tc.expectedAvailableShardsTimeRanges) - require.Equal(t, expectedIndexUnfulfilled, indexResult.Unfulfilled()) + // Make sure ReadIndex marks anything that AvailableIndex wouldn't return as unfulfilled + indexResult, err := src.ReadIndex(nsMetadata, tc.shardsTimeRangesToBootstrap, runOpts) + require.NoError(t, err) + expectedIndexUnfulfilled := tc.shardsTimeRangesToBootstrap.Copy() + expectedIndexUnfulfilled.Subtract(tc.expectedAvailableShardsTimeRanges) + require.Equal(t, expectedIndexUnfulfilled, indexResult.Unfulfilled()) + } }) } } diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index a81f416922..87fa62810c 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -186,7 +186,7 @@ type Source interface { ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, runOpts RunOptions, - ) result.ShardTimeRanges + ) (result.ShardTimeRanges, error) // ReadData returns raw series for a given set of shards & specified time ranges and // the time ranges it's unable to fulfill. A bootstrapper source should only return @@ -203,7 +203,7 @@ type Source interface { ns namespace.Metadata, shardsTimeRanges result.ShardTimeRanges, opts RunOptions, - ) result.ShardTimeRanges + ) (result.ShardTimeRanges, error) // ReadIndex returns series index blocks. ReadIndex(