From 275bcca6c8d4f35aa4a5d3319db5da31c0730303 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Thu, 5 Mar 2020 12:53:26 -0500 Subject: [PATCH] Revise iface. --- .../bootstrap/bootstrapper/commitlog/source.go | 8 +++++--- .../storage/bootstrap/bootstrapper/fs/source.go | 6 +++--- .../bootstrap/bootstrapper/fs/source_data_test.go | 14 ++++++++++---- .../storage/bootstrap/bootstrapper/peers/source.go | 6 ++++-- .../bootstrap/bootstrapper/uninitialized/source.go | 4 +++- .../storage/bootstrap/result/shard_ranges.go | 9 +++++---- src/dbnode/storage/bootstrap/result/types.go | 2 +- src/dbnode/storage/bootstrap/util.go | 4 ++-- 8 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index f44a0c27a2..6fe515a24b 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -495,8 +495,8 @@ func (s *commitLogSource) Read( // NB(r): This can occur when a topology change happens then we // bootstrap from the commit log data that the node no longer owns. shard := seriesEntry.series.Shard - bootstrappingShard := seriesEntry.namespace.dataAndIndexShardRanges.Get(shard) - if bootstrappingShard == nil { + _, ok = seriesEntry.namespace.dataAndIndexShardRanges.Get(shard) + if !ok { datapointsSkippedNotBootstrappingShard++ continue } @@ -1035,7 +1035,9 @@ func (s *commitLogSource) availability( // to distinguish between "unfulfilled" data and "corrupt" data, then // modify this to only say the commit log bootstrapper can fullfil // "unfulfilled" data, but not corrupt data. - availableShardTimeRanges.Set(shardIDUint, shardsTimeRanges.Get(shardIDUint)) + if tr, ok := shardsTimeRanges.Get(shardIDUint); ok { + availableShardTimeRanges.Set(shardIDUint, tr) + } case shard.Unknown: fallthrough default: diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index dd883d45a6..546e43eb0b 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -767,12 +767,12 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( } willFulfill := result.NewShardTimeRanges() for _, shard := range info.Shards { - tr := shardsTimeRanges.Get(shard) - if tr == nil { + tr, ok := shardsTimeRanges.Get(shard) + if !ok { // No ranges match for this shard. continue } - if willFulfill.Get(shard) == nil { + if _, ok := willFulfill.Get(shard); !ok { willFulfill.Set(shard, xtime.NewRanges()) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go index 20571bc3d2..7bff19b3f0 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go @@ -390,12 +390,15 @@ func TestAvailableTimeRangeFilter(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) require.Equal(t, 1, res.Len()) - require.NotNil(t, res.Get(testShard)) + _, ok := res.Get(testShard) + require.True(t, ok) expected := xtime.NewRanges( xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)}, xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)}) - validateTimeRanges(t, res.Get(testShard), expected) + tr, ok := res.Get(testShard) + require.True(t, ok) + validateTimeRanges(t, tr, expected) } func TestAvailableTimeRangePartialError(t *testing.T) { @@ -417,12 +420,15 @@ func TestAvailableTimeRangePartialError(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) require.Equal(t, 1, res.Len()) - require.NotNil(t, res.Get(testShard)) + _, ok := res.Get(testShard) + require.True(t, ok) expected := xtime.NewRanges( xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)}, xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)}) - validateTimeRanges(t, res.Get(testShard), expected) + tr, ok := res.Get(testShard) + require.True(t, ok) + validateTimeRanges(t, tr, expected) } // NB: too real :'( diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 354efe68a9..41273c4c7f 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -995,7 +995,9 @@ func (s *peersSource) peerAvailability( // all the data. This assumption is safe, as the shard/block ranges // will simply be marked unfulfilled if the peers are not able to // satisfy the requests. - availableShardTimeRanges.Set(shardIDUint, shardsTimeRanges.Get(shardIDUint)) + if tr, ok := shardsTimeRanges.Get(shardIDUint); ok { + availableShardTimeRanges.Set(shardIDUint, tr) + } } return availableShardTimeRanges, nil @@ -1010,7 +1012,7 @@ func (s *peersSource) markIndexResultErrorAsUnfulfilled( ) { // NB(r): We explicitly do not remove entries from the index results // as they are additive and get merged together with results from other - // bootstrappers by just appending the result (unlike data bootstrap + // bootstrappers by just appending the result (ounlike data bootstrap // results that when merged replace the block with the current block). // It would also be difficult to remove only series that were added to the // index block as results from a specific data block can be subsets of the diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go index e2749f81a6..ce0c83b930 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source.go @@ -126,7 +126,9 @@ func (s *uninitializedTopologySource) availability( // factor to actually increase correctly. shardHasNeverBeenCompletelyInitialized := numInitializing-numLeaving > 0 if shardHasNeverBeenCompletelyInitialized { - availableShardTimeRanges.Set(shardIDUint, shardsTimeRanges.Get(shardIDUint)) + if tr, ok := shardsTimeRanges.Get(shardIDUint); ok { + availableShardTimeRanges.Set(shardIDUint, tr) + } } } diff --git a/src/dbnode/storage/bootstrap/result/shard_ranges.go b/src/dbnode/storage/bootstrap/result/shard_ranges.go index 7716c530fc..aea75365b3 100644 --- a/src/dbnode/storage/bootstrap/result/shard_ranges.go +++ b/src/dbnode/storage/bootstrap/result/shard_ranges.go @@ -50,8 +50,9 @@ func NewShardTimeRanges() ShardTimeRanges { } // Get time ranges for a shard. -func (r shardTimeRanges) Get(shard uint32) xtime.Ranges { - return r[shard] +func (r shardTimeRanges) Get(shard uint32) (xtime.Ranges, bool) { + tr, ok := r[shard] + return tr, ok } // Set time ranges for a shard. @@ -166,8 +167,8 @@ func (r shardTimeRanges) Subtract(other ShardTimeRanges) { return } for shard, ranges := range r { - otherRanges := other.Get(shard) - if otherRanges == nil { + otherRanges, ok := other.Get(shard) + if !ok { continue } diff --git a/src/dbnode/storage/bootstrap/result/types.go b/src/dbnode/storage/bootstrap/result/types.go index 8659199aeb..cc3ef06c6e 100644 --- a/src/dbnode/storage/bootstrap/result/types.go +++ b/src/dbnode/storage/bootstrap/result/types.go @@ -125,7 +125,7 @@ type ShardResults map[uint32]ShardResult // ShardTimeRanges is a map of shards to time ranges. type ShardTimeRanges interface { // Get time ranges for a shard. - Get(shard uint32) xtime.Ranges + Get(shard uint32) (xtime.Ranges, bool) // Set time ranges for a shard. Set(shard uint32, ranges xtime.Ranges) ShardTimeRanges diff --git a/src/dbnode/storage/bootstrap/util.go b/src/dbnode/storage/bootstrap/util.go index 7ed16e886c..9422c8e84a 100644 --- a/src/dbnode/storage/bootstrap/util.go +++ b/src/dbnode/storage/bootstrap/util.go @@ -587,8 +587,8 @@ func validateShardTimeRanges( seen := make(map[uint32]struct{}, r.Len()) for k, val := range r.Iter() { - expectedVal := ex.Get(k) - if expectedVal == nil { + expectedVal, ok := ex.Get(k) + if !ok { return fmt.Errorf("expected shard map %v does not have shard %d; "+ "actual: %v", ex, k, r) }