Skip to content

Commit

Permalink
Revise iface.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Mar 5, 2020
1 parent 9a8aba1 commit 275bcca
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 20 deletions.
8 changes: 5 additions & 3 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,8 @@ func (s *commitLogSource) Read(
// NB(r): This can occur when a topology change happens then we
// bootstrap from the commit log data that the node no longer owns.
shard := seriesEntry.series.Shard
bootstrappingShard := seriesEntry.namespace.dataAndIndexShardRanges.Get(shard)
if bootstrappingShard == nil {
_, ok = seriesEntry.namespace.dataAndIndexShardRanges.Get(shard)
if !ok {
datapointsSkippedNotBootstrappingShard++
continue
}
Expand Down Expand Up @@ -1035,7 +1035,9 @@ func (s *commitLogSource) availability(
// to distinguish between "unfulfilled" data and "corrupt" data, then
// modify this to only say the commit log bootstrapper can fullfil
// "unfulfilled" data, but not corrupt data.
availableShardTimeRanges.Set(shardIDUint, shardsTimeRanges.Get(shardIDUint))
if tr, ok := shardsTimeRanges.Get(shardIDUint); ok {
availableShardTimeRanges.Set(shardIDUint, tr)
}
case shard.Unknown:
fallthrough
default:
Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,12 +767,12 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
}
willFulfill := result.NewShardTimeRanges()
for _, shard := range info.Shards {
tr := shardsTimeRanges.Get(shard)
if tr == nil {
tr, ok := shardsTimeRanges.Get(shard)
if !ok {
// No ranges match for this shard.
continue
}
if willFulfill.Get(shard) == nil {
if _, ok := willFulfill.Get(shard); !ok {
willFulfill.Set(shard, xtime.NewRanges())
}

Expand Down
14 changes: 10 additions & 4 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,15 @@ func TestAvailableTimeRangeFilter(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, 1, res.Len())
require.NotNil(t, res.Get(testShard))
_, ok := res.Get(testShard)
require.True(t, ok)

expected := xtime.NewRanges(
xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)},
xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)})
validateTimeRanges(t, res.Get(testShard), expected)
tr, ok := res.Get(testShard)
require.True(t, ok)
validateTimeRanges(t, tr, expected)
}

func TestAvailableTimeRangePartialError(t *testing.T) {
Expand All @@ -417,12 +420,15 @@ func TestAvailableTimeRangePartialError(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, 1, res.Len())
require.NotNil(t, res.Get(testShard))
_, ok := res.Get(testShard)
require.True(t, ok)

expected := xtime.NewRanges(
xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)},
xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)})
validateTimeRanges(t, res.Get(testShard), expected)
tr, ok := res.Get(testShard)
require.True(t, ok)
validateTimeRanges(t, tr, expected)
}

// NB: too real :'(
Expand Down
6 changes: 4 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,9 @@ func (s *peersSource) peerAvailability(
// all the data. This assumption is safe, as the shard/block ranges
// will simply be marked unfulfilled if the peers are not able to
// satisfy the requests.
availableShardTimeRanges.Set(shardIDUint, shardsTimeRanges.Get(shardIDUint))
if tr, ok := shardsTimeRanges.Get(shardIDUint); ok {
availableShardTimeRanges.Set(shardIDUint, tr)
}
}

return availableShardTimeRanges, nil
Expand All @@ -1010,7 +1012,7 @@ func (s *peersSource) markIndexResultErrorAsUnfulfilled(
) {
// NB(r): We explicitly do not remove entries from the index results
// as they are additive and get merged together with results from other
// bootstrappers by just appending the result (unlike data bootstrap
// bootstrappers by just appending the result (ounlike data bootstrap
// results that when merged replace the block with the current block).
// It would also be difficult to remove only series that were added to the
// index block as results from a specific data block can be subsets of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ func (s *uninitializedTopologySource) availability(
// factor to actually increase correctly.
shardHasNeverBeenCompletelyInitialized := numInitializing-numLeaving > 0
if shardHasNeverBeenCompletelyInitialized {
availableShardTimeRanges.Set(shardIDUint, shardsTimeRanges.Get(shardIDUint))
if tr, ok := shardsTimeRanges.Get(shardIDUint); ok {
availableShardTimeRanges.Set(shardIDUint, tr)
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/dbnode/storage/bootstrap/result/shard_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ func NewShardTimeRanges() ShardTimeRanges {
}

// Get time ranges for a shard.
func (r shardTimeRanges) Get(shard uint32) xtime.Ranges {
return r[shard]
func (r shardTimeRanges) Get(shard uint32) (xtime.Ranges, bool) {
tr, ok := r[shard]
return tr, ok
}

// Set time ranges for a shard.
Expand Down Expand Up @@ -166,8 +167,8 @@ func (r shardTimeRanges) Subtract(other ShardTimeRanges) {
return
}
for shard, ranges := range r {
otherRanges := other.Get(shard)
if otherRanges == nil {
otherRanges, ok := other.Get(shard)
if !ok {
continue
}

Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/bootstrap/result/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type ShardResults map[uint32]ShardResult
// ShardTimeRanges is a map of shards to time ranges.
type ShardTimeRanges interface {
// Get time ranges for a shard.
Get(shard uint32) xtime.Ranges
Get(shard uint32) (xtime.Ranges, bool)

// Set time ranges for a shard.
Set(shard uint32, ranges xtime.Ranges) ShardTimeRanges
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/storage/bootstrap/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,8 @@ func validateShardTimeRanges(

seen := make(map[uint32]struct{}, r.Len())
for k, val := range r.Iter() {
expectedVal := ex.Get(k)
if expectedVal == nil {
expectedVal, ok := ex.Get(k)
if !ok {
return fmt.Errorf("expected shard map %v does not have shard %d; "+
"actual: %v", ex, k, r)
}
Expand Down

0 comments on commit 275bcca

Please sign in to comment.