Skip to content

Commit

Permalink
Remove implicit cloning of time ranges.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Feb 26, 2020
1 parent 05064b4 commit 5422535
Show file tree
Hide file tree
Showing 15 changed files with 111 additions and 103 deletions.
11 changes: 6 additions & 5 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *fileSystemSource) shardAvailability(
w := time.Duration(info.BlockSize)
currRange := xtime.Range{Start: t, End: t.Add(w)}
if targetRangesForShard.Overlaps(currRange) {
tr = tr.AddRange(currRange)
tr.AddRange(currRange)
}
}
return tr
Expand Down Expand Up @@ -416,7 +416,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
if err == nil && run == bootstrapIndexRunType {
// Mark index block as fulfilled.
fulfilled := result.ShardTimeRanges{
shard: xtime.Ranges{}.AddRange(timeRange),
shard: xtime.NewRanges(timeRange),
}
err = runResult.index.IndexResults().MarkFulfilled(start, fulfilled,
ns.Options().IndexOptions())
Expand All @@ -429,7 +429,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(

if err == nil {
fulfilled := result.ShardTimeRanges{
shard: xtime.Ranges{}.AddRange(timeRange),
shard: xtime.NewRanges(timeRange),
}
totalFulfilledRanges.AddRanges(fulfilled)
remainingRanges.Subtract(fulfilled)
Expand Down Expand Up @@ -722,7 +722,8 @@ func (s *fileSystemSource) bootstrapDataRunResultFromAvailability(
continue
}
availability := s.shardAvailability(md.ID(), shard, ranges)
remaining := ranges.RemoveRanges(availability)
remaining := ranges.Clone()
remaining.RemoveRanges(availability)
if !remaining.IsEmpty() {
unfulfilled.AddRanges(result.ShardTimeRanges{
shard: remaining,
Expand Down Expand Up @@ -782,7 +783,7 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
if !intersects {
continue
}
willFulfill[shard] = willFulfill[shard].AddRange(intersection)
willFulfill[shard].AddRange(intersection)
}
}

Expand Down
21 changes: 12 additions & 9 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func testBootstrappingIndexShardTimeRanges() result.ShardTimeRanges {
// `testBlockSize` rather than `testIndexSize` since the files generated
// by this test use 2 hour (which is `testBlockSize`) reader blocks.
return map[uint32]xtime.Ranges{
testShard: xtime.Ranges{}.AddRange(xtime.Range{
testShard: xtime.NewRanges(xtime.Range{
Start: testStart.Add(testBlockSize),
End: testStart.Add(11 * time.Hour),
}),
Expand Down Expand Up @@ -284,10 +284,13 @@ func sortedTagsFromTagsMap(tags map[string]string) ident.Tags {

func validateTimeRanges(t *testing.T, tr xtime.Ranges, expected xtime.Ranges) {
// Make range eclipses expected
require.True(t, expected.RemoveRanges(tr).IsEmpty())
expectedWithRemovedRanges := expected.Clone()
expectedWithRemovedRanges.RemoveRanges(tr)
require.True(t, expectedWithRemovedRanges.IsEmpty())

// Now make sure no ranges outside of expected
expectedWithAddedRanges := expected.AddRanges(tr)
expectedWithAddedRanges := expected.Clone()
expectedWithAddedRanges.AddRanges(tr)

require.Equal(t, expected.Len(), expectedWithAddedRanges.Len())
iter := expected.Iter()
Expand Down Expand Up @@ -388,9 +391,9 @@ func TestAvailableTimeRangeFilter(t *testing.T) {
require.Equal(t, 1, len(res))
require.NotNil(t, res[testShard])

expected := xtime.Ranges{}.
AddRange(xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)}).
AddRange(xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)})
expected := xtime.NewRanges(
xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)},
xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)})
validateTimeRanges(t, res[testShard], expected)
}

Expand All @@ -415,9 +418,9 @@ func TestAvailableTimeRangePartialError(t *testing.T) {
require.Equal(t, 1, len(res))
require.NotNil(t, res[testShard])

expected := xtime.Ranges{}.
AddRange(xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)}).
AddRange(xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)})
expected := xtime.NewRanges(
xtime.Range{Start: testStart, End: testStart.Add(2 * time.Hour)},
xtime.Range{Start: testStart.Add(10 * time.Hour), End: testStart.Add(12 * time.Hour)})
validateTimeRanges(t, res[testShard], expected)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func BenchmarkBootstrapIndex(b *testing.B) {
max = end
}

ranges = ranges.AddRange(xtime.Range{Start: start, End: end})
ranges.AddRange(xtime.Range{Start: start, End: end})

// Override the block size if different.
namespaceOpts := testNamespaceMetadata.Options()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newTestBootstrapIndexTimes(
}

shardTimeRanges := map[uint32]xtime.Ranges{
testShard: xtime.Ranges{}.AddRange(xtime.Range{
testShard: xtime.NewRanges(xtime.Range{
Start: start,
End: end,
}),
Expand Down Expand Up @@ -567,7 +567,7 @@ func TestBootstrapIndexWithPersistForIndexBlockAtRetentionEdge(t *testing.T) {

// NB(bodu): Simulate requesting bootstrapping of two whole index blocks instead of 3 data blocks (1.5 index blocks).
times.shardTimeRanges = map[uint32]xtime.Ranges{
testShard: xtime.Ranges{}.AddRange(xtime.Range{
testShard: xtime.NewRanges(xtime.Range{
Start: firstIndexBlockStart,
End: times.end,
}),
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,15 +767,15 @@ func (s *peersSource) processReaders(
if err == nil {
// Mark index block as fulfilled.
fulfilled := result.ShardTimeRanges{
shard: xtime.Ranges{}.AddRange(timeRange),
shard: xtime.NewRanges(timeRange),
}
err = r.IndexResults().MarkFulfilled(start, fulfilled,
idxOpts)
}

if err == nil {
remainingRanges.Subtract(result.ShardTimeRanges{
shard: xtime.Ranges{}.AddRange(timeRange),
shard: xtime.NewRanges(timeRange),
})
} else {
s.log.Error(err.Error(),
Expand Down
32 changes: 16 additions & 16 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,29 +734,29 @@ func TestPeersSourceMarksUnfulfilledOnPersistenceErrors(t *testing.T) {
require.NoError(t, err)

target := result.ShardTimeRanges{
0: xtime.Ranges{}.
AddRange(xtime.Range{Start: start, End: midway}).
AddRange(xtime.Range{Start: midway, End: end}),
1: xtime.Ranges{}.
AddRange(xtime.Range{Start: start, End: midway}).
AddRange(xtime.Range{Start: midway, End: end}),
2: xtime.Ranges{}.
AddRange(xtime.Range{Start: start, End: midway}).
AddRange(xtime.Range{Start: midway, End: end}),
3: xtime.Ranges{}.
AddRange(xtime.Range{Start: start, End: midway}).
AddRange(xtime.Range{Start: midway, End: end}),
0: xtime.NewRanges(
xtime.Range{Start: start, End: midway},
xtime.Range{Start: midway, End: end}),
1: xtime.NewRanges(
xtime.Range{Start: start, End: midway},
xtime.Range{Start: midway, End: end}),
2: xtime.NewRanges(
xtime.Range{Start: start, End: midway},
xtime.Range{Start: midway, End: end}),
3: xtime.NewRanges(
xtime.Range{Start: start, End: midway},
xtime.Range{Start: midway, End: end}),
}

tester := bootstrap.BuildNamespacesTester(t, testRunOptsWithPersist, target, testNsMd)
defer tester.Finish()
tester.TestReadWith(src)

expectedRanges := result.ShardTimeRanges{
0: xtime.Ranges{}.AddRange(xtime.Range{Start: start, End: midway}),
1: xtime.Ranges{}.AddRange(xtime.Range{Start: start, End: midway}),
2: xtime.Ranges{}.AddRange(xtime.Range{Start: start, End: midway}),
3: xtime.Ranges{}.AddRange(xtime.Range{Start: start, End: midway}),
0: xtime.NewRanges(xtime.Range{Start: start, End: midway}),
1: xtime.NewRanges(xtime.Range{Start: start, End: midway}),
2: xtime.NewRanges(xtime.Range{Start: start, End: midway}),
3: xtime.NewRanges(xtime.Range{Start: start, End: midway}),
}

// NB(bodu): There is no time series data written to disk so all ranges fail to be fulfilled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) {
numShards = uint32(4)
blockStart = time.Now().Truncate(blockSize)
shardTimeRangesToBootstrap = result.ShardTimeRanges{}
bootstrapRanges = xtime.Ranges{}.AddRange(xtime.Range{
bootstrapRanges = xtime.NewRanges(xtime.Range{
Start: blockStart,
End: blockStart.Add(blockSize),
})
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func PersistBootstrapIndexSegment(
expectedRanges := make(result.ShardTimeRanges, len(requestedRanges))
for shard := range requestedRanges {
shards[shard] = struct{}{}
expectedRanges[shard] = xtime.Ranges{}.AddRange(xtime.Range{
expectedRanges[shard] = xtime.NewRanges(xtime.Range{
Start: expectedRangeStart,
End: expectedRangeEnd,
})
Expand Down Expand Up @@ -253,7 +253,7 @@ func BuildBootstrapIndexSegment(

expectedRanges := make(result.ShardTimeRanges, len(requestedRanges))
for shard := range requestedRanges {
expectedRanges[shard] = xtime.Ranges{}.AddRange(xtime.Range{
expectedRanges[shard] = xtime.NewRanges(xtime.Range{
Start: expectedRangeStart,
End: expectedRangeEnd,
})
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewShardTimeRangesTimeWindowGroups(
continue
}
// Add to this range.
group[shard] = group[shard].AddRange(intersection)
group[shard].AddRange(intersection)
}
}

Expand Down
20 changes: 6 additions & 14 deletions src/dbnode/storage/bootstrap/result/result_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,9 @@ func TestShardTimeRangesString(t *testing.T) {
}

str := ShardTimeRanges{
0: xtime.NewRanges(xtime.Range{
Start: ts[0][0],
End: ts[0][1],
}).AddRange(xtime.Range{
Start: ts[1][0],
End: ts[1][1],
}),
0: xtime.NewRanges(
xtime.Range{Start: ts[0][0], End: ts[0][1]},
xtime.Range{Start: ts[1][0], End: ts[1][1]}),
1: xtime.NewRanges(xtime.Range{
Start: ts[2][0],
End: ts[2][1],
Expand All @@ -400,13 +396,9 @@ func TestShardTimeRangesSummaryString(t *testing.T) {
start := time.Unix(1472824800, 0)

str := ShardTimeRanges{
0: xtime.NewRanges(xtime.Range{
Start: start,
End: start.Add(testBlockSize),
}).AddRange(xtime.Range{
Start: start.Add(2 * testBlockSize),
End: start.Add(4 * testBlockSize),
}),
0: xtime.NewRanges(
xtime.Range{Start: start, End: start.Add(testBlockSize)},
xtime.Range{Start: start.Add(2 * testBlockSize), End: start.Add(4 * testBlockSize)}),
1: xtime.NewRanges(xtime.Range{
Start: start,
End: start.Add(2 * testBlockSize),
Expand Down
10 changes: 7 additions & 3 deletions src/dbnode/storage/bootstrap/result/shard_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func (r ShardTimeRanges) Equal(other ShardTimeRanges) bool {
func (r ShardTimeRanges) Copy() ShardTimeRanges {
result := make(map[uint32]xtime.Ranges, len(r))
for shard, ranges := range r {
result[shard] = xtime.Ranges{}.AddRanges(ranges)
newRanges := xtime.NewRanges()
newRanges.AddRanges(ranges)
result[shard] = newRanges
}
return result
}
Expand All @@ -92,7 +94,8 @@ func (r ShardTimeRanges) AddRanges(other ShardTimeRanges) {
continue
}
if existing, ok := r[shard]; ok {
r[shard] = existing.AddRanges(ranges)
existing.AddRanges(ranges)
r[shard] = existing
} else {
r[shard] = ranges
}
Expand Down Expand Up @@ -123,7 +126,8 @@ func (r ShardTimeRanges) Subtract(other ShardTimeRanges) {
continue
}

subtractedRanges := ranges.RemoveRanges(otherRanges)
subtractedRanges := ranges.Clone()
subtractedRanges.RemoveRanges(otherRanges)
if subtractedRanges.IsEmpty() {
delete(r, shard)
} else {
Expand Down
6 changes: 4 additions & 2 deletions src/dbnode/storage/bootstrap/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,14 +551,16 @@ func (nt *NamespacesTester) TestReadWith(s Source) {

func validateRanges(ac xtime.Ranges, ex xtime.Ranges) error {
// Make range eclipses expected.
removedRange := ex.RemoveRanges(ac)
removedRange := ex.Clone()
removedRange.RemoveRanges(ac)
if !removedRange.IsEmpty() {
return fmt.Errorf("actual range %v does not match expected range %v "+
"diff: %v", ac, ex, removedRange)
}

// Now make sure no ranges outside of expected.
expectedWithAddedRanges := ex.AddRanges(ac)
expectedWithAddedRanges := ex.Clone()
expectedWithAddedRanges.AddRanges(ac)
if ex.Len() != expectedWithAddedRanges.Len() {
return fmt.Errorf("expected with re-added ranges not equal")
}
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,7 @@ func (i *nsIndex) blocksForQueryWithRLock(queryRange xtime.Ranges) ([]index.Bloc
}

// Remove this range from the query range.
queryRange = queryRange.RemoveRange(blockRange)
queryRange.RemoveRange(blockRange)

blocks = append(blocks, block)
}
Expand Down
41 changes: 22 additions & 19 deletions src/x/time/ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ type Ranges struct {

// NewRanges constructs a new Ranges object comprising the provided ranges.
func NewRanges(ranges ...Range) Ranges {
var result Ranges
res := Ranges{sortedRanges: list.New()}
for _, r := range ranges {
result = result.AddRange(r)
res.AddRange(r)
}
return result
return res
}

// Len returns the number of ranges included.
Expand Down Expand Up @@ -66,44 +66,47 @@ func (tr Ranges) Overlaps(r Range) bool {
}

// AddRange adds the time range to the collection of ranges.
func (tr Ranges) AddRange(r Range) Ranges {
res := tr.clone()
res.addRangeInPlace(r)
return res
func (tr Ranges) AddRange(r Range) {
if tr.sortedRanges == nil {
tr.sortedRanges = list.New()
}
tr.addRangeInPlace(r)
}

// AddRanges adds the time ranges.
func (tr Ranges) AddRanges(other Ranges) Ranges {
res := tr.clone()
func (tr Ranges) AddRanges(other Ranges) {
if tr.sortedRanges == nil {
tr.sortedRanges = list.New()
}
it := other.Iter()
for it.Next() {
res.addRangeInPlace(it.Value())
tr.addRangeInPlace(it.Value())
}
return res
}

// RemoveRange removes the time range from the collection of ranges.
func (tr Ranges) RemoveRange(r Range) Ranges {
res := tr.clone()
res.removeRangeInPlace(r)
return res
func (tr Ranges) RemoveRange(r Range) {
tr.removeRangeInPlace(r)
}

// RemoveRanges removes the given time ranges from the current one.
func (tr Ranges) RemoveRanges(other Ranges) Ranges {
res := tr.clone()
func (tr Ranges) RemoveRanges(other Ranges) {
it := other.Iter()
for it.Next() {
res.removeRangeInPlace(it.Value())
tr.removeRangeInPlace(it.Value())
}
return res
}

// Iter returns an iterator that iterates over the time ranges included.
func (tr Ranges) Iter() *RangeIter {
return newRangeIter(tr.sortedRanges)
}

// Clone makes a clone of the time ranges.
func (tr Ranges) Clone() Ranges {
return tr.clone()
}

// String returns the string representation of the range.
func (tr Ranges) String() string {
var buf bytes.Buffer
Expand Down
Loading

0 comments on commit 5422535

Please sign in to comment.