Skip to content

Commit

Permalink
Fix shard time ranges superset logic. Cleanup and add tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Dec 2, 2020
1 parent ae0403a commit 3454070
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 47 deletions.
34 changes: 33 additions & 1 deletion src/dbnode/storage/bootstrap/result/shard_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,47 @@ func (r shardTimeRanges) Equal(other ShardTimeRanges) bool {

// IsSuperset returns whether the current shard time ranges is a superset of the
// other shard time ranges.
//
// The following are superset conditions.
// - Always superset if other shard time ranges are empty. (Even empty vs empty).
// - Shards must be a superset of shards in other.
// - Time ranges must be a superset of time ranges in other.
//
// The following are superset failure conditions:
// - Partially overlapping shards.
// - Non overlapping shards.
// - Partially overlapping time ranges.
// - Non overlapping time ranges.
func (r shardTimeRanges) IsSuperset(other ShardTimeRanges) bool {
// If other shard time ranges is empty then the curr shard time ranges
// is considered a superset.
if other.Len() == 0 {
return true
}

// Short-circuit if we have more shards than other.
if len(r) < other.Len() {
return false
}

// Check if other ranges has any shards we do not have.
for shard := range other.Iter() {
if _, ok := r.Get(shard); !ok {
return false
}
}

for shard, ranges := range r {
otherRanges := other.GetOrAdd(shard)
otherRanges, ok := other.Get(shard)
if !ok {
continue
}

// Short-circuit if we have more ranges than other.
if ranges.Len() < otherRanges.Len() {
return false
}

it := ranges.Iter()
otherIt := otherRanges.Iter()

Expand Down
122 changes: 76 additions & 46 deletions src/dbnode/storage/bootstrap/result/shard_ranges_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,62 +50,92 @@ func TestShardTimeRangesAdd(t *testing.T) {

func TestShardTimeRangesIsSuperset(t *testing.T) {
start := time.Now().Truncate(testBlockSize)
times := []time.Time{start, start.Add(testBlockSize), start.Add(2 * testBlockSize), start.Add(3 * testBlockSize)}

sr1 := []ShardTimeRanges{
NewShardTimeRangesFromRange(times[0], times[1], 1, 2, 3),
NewShardTimeRangesFromRange(times[1], times[2], 1, 2, 3),
NewShardTimeRangesFromRange(times[2], times[3], 1, 2, 3),
}
ranges1 := NewShardTimeRanges()
for _, r := range sr1 {
ranges1.AddRanges(r)
}
sr2 := []ShardTimeRanges{
NewShardTimeRangesFromRange(times[0], times[1], 2),
NewShardTimeRangesFromRange(times[1], times[2], 1, 2),
NewShardTimeRangesFromRange(times[2], times[3], 1),
}
ranges2 := NewShardTimeRanges()
for _, r := range sr2 {
ranges2.AddRanges(r)
}
sr3 := []ShardTimeRanges{
NewShardTimeRangesFromRange(times[1], times[2], 1, 2, 3),
}
ranges3 := NewShardTimeRanges()
for _, r := range sr3 {
ranges3.AddRanges(r)
}
ranges1 := createShardTimeRanges(start, testBlockSize, 3, []uint32{1, 2, 3})
ranges2 := createShardTimeRanges(start, testBlockSize, 3, []uint32{1, 2})
ranges3 := createShardTimeRanges(start, testBlockSize, 1, []uint32{1, 2, 3})

require.True(t, ranges1.IsSuperset(ranges2))
require.True(t, ranges1.IsSuperset(ranges1))
require.True(t, ranges1.IsSuperset(ranges3))

// Reverse sanity checks.
// Subset of shards fails superset condition.
require.False(t, ranges2.IsSuperset(ranges1))
// Subset of time ranges fails superset condition.
require.False(t, ranges3.IsSuperset(ranges1))
}

// Added some more false checks for non overlapping time ranges and no time ranges.
sr1 = []ShardTimeRanges{
NewShardTimeRangesFromRange(times[0], times[1], 1, 2, 3),
}
ranges1 = NewShardTimeRanges()
for _, r := range sr1 {
ranges1.AddRanges(r)
}
sr2 = []ShardTimeRanges{
NewShardTimeRangesFromRange(times[1], times[2], 1, 2, 3),
}
ranges2 = NewShardTimeRanges()
for _, r := range sr2 {
ranges2.AddRanges(r)
}
ranges3 = NewShardTimeRanges()
func TestShardTimeRangesIsSupersetNoOverlapShards(t *testing.T) {
start := time.Now().Truncate(testBlockSize)

ranges1 := createShardTimeRanges(start, testBlockSize, 3, []uint32{1, 3, 5})
ranges2 := createShardTimeRanges(start, testBlockSize, 3, []uint32{2, 4, 6})

// Neither are supersets of each other as they have non overlapping shards.
require.False(t, ranges1.IsSuperset(ranges2))
require.False(t, ranges2.IsSuperset(ranges1))
}

func TestShardTimeRangesIsSupersetPartialOverlapShards(t *testing.T) {
start := time.Now().Truncate(testBlockSize)

ranges1 := createShardTimeRanges(start, testBlockSize, 3, []uint32{1, 3, 5})
ranges2 := createShardTimeRanges(start, testBlockSize, 3, []uint32{3, 4, 6})

// Neither are supersets of each other as they only have partially overlapping shards.
require.False(t, ranges1.IsSuperset(ranges2))
require.True(t, ranges2.IsSuperset(ranges3))
require.False(t, ranges3.IsSuperset(ranges1))
require.False(t, ranges3.IsSuperset(ranges2))
require.False(t, ranges2.IsSuperset(ranges1))
}

func TestShardTimeRangesIsSupersetNoOverlapTimeRanges(t *testing.T) {
start := time.Now().Truncate(testBlockSize)

ranges1 := createShardTimeRanges(start, testBlockSize, 3, []uint32{1, 3, 5})
ranges2 := createShardTimeRanges(start.Add(testBlockSize*3), testBlockSize, 3,
[]uint32{1, 3, 5})

// Neither are supersets of each other as they have non overlapping time ranges.
require.False(t, ranges1.IsSuperset(ranges2))
require.False(t, ranges2.IsSuperset(ranges1))
}

func TestShardTimeRangesIsSupersetPartialOverlapTimeRanges(t *testing.T) {
start := time.Now().Truncate(testBlockSize)

ranges1 := createShardTimeRanges(start, testBlockSize, 3, []uint32{1, 3, 5})
ranges2 := createShardTimeRanges(start.Add(testBlockSize*2), testBlockSize, 3,
[]uint32{1, 3, 5})

// Neither are supersets of each other as they have non overlapping time ranges.
require.False(t, ranges1.IsSuperset(ranges2))
require.False(t, ranges2.IsSuperset(ranges1))
}

func TestShardTimeRangesIsSupersetEmpty(t *testing.T) {
start := time.Now().Truncate(testBlockSize)

ranges1 := createShardTimeRanges(start, testBlockSize, 3, []uint32{1, 3, 5})
ranges2 := NewShardTimeRanges()

require.True(t, ranges1.IsSuperset(ranges2))
require.False(t, ranges2.IsSuperset(ranges1))
require.True(t, ranges2.IsSuperset(ranges2))
}

func createShardTimeRanges(
start time.Time,
blockSize time.Duration,
numBlocks int,
shards []uint32,
) ShardTimeRanges {
ranges := NewShardTimeRanges()
for i := 0; i < numBlocks; i++ {
blockStart := start.Add(time.Duration(i) * blockSize)
ranges.AddRanges(NewShardTimeRangesFromRange(
blockStart,
blockStart.Add(blockSize),
shards...,
))
}
return ranges
}

0 comments on commit 3454070

Please sign in to comment.