Skip to content

Commit

Permalink
Optimized chunk comparison for overlaps.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Jun 3, 2020
1 parent 22d2c8d commit 3de467a
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 37 deletions.
12 changes: 4 additions & 8 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,11 @@ func TestProxyStore_Series(t *testing.T) {
expectedSeries: []rawSeries{
{
lset: []storepb.Label{{Name: "a", Value: "a"}},
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}, {{4, 3}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "a"}},
chunks: [][]sample{{{5, 4}}},
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}, {{4, 3}}, {{5, 4}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "b"}},
chunks: [][]sample{{{2, 2}, {3, 3}, {4, 4}}, {{1, 1}, {2, 2}, {3, 3}}}, // No sort merge.
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{2, 2}, {3, 3}, {4, 4}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "c"}},
Expand Down Expand Up @@ -343,7 +339,7 @@ func TestProxyStore_Series(t *testing.T) {
expectedSeries: []rawSeries{
{
lset: []storepb.Label{{Name: "a", Value: "b"}},
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{1, 11}, {2, 22}, {3, 33}}},
chunks: [][]sample{{{1, 11}, {2, 22}, {3, 33}}, {{1, 1}, {2, 2}, {3, 3}}},
},
},
},
Expand Down Expand Up @@ -1220,7 +1216,7 @@ type rawSeries struct {
}

func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) {
testutil.Equals(t, len(expected), len(got), "got: %v", got)
testutil.Equals(t, len(expected), len(got), "got unexpected number of series: \n %v", got)

for i, series := range got {
testutil.Equals(t, expected[i].lset, series.Labels)
Expand Down
77 changes: 63 additions & 14 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"sort"
"strconv"
"bytes"
"strings"
"unsafe"

Expand Down Expand Up @@ -195,27 +196,18 @@ Outer:
break Outer
}

if chksA[a].MinTime < chksB[b].MinTime {
cmp := chksA[a].Compare(chksB[b])
if cmp > 0 {
s.chunks = append(s.chunks, chksA[a])
break
}

if chksA[a].MinTime > chksB[b].MinTime {
if cmp < 0 {
s.chunks = append(s.chunks, chksB[b])
b++
continue
}

// TODO(bwplotka): This is expensive.
//fmt.Println("check strings")
if strings.Compare(chksA[a].String(), chksB[b].String()) == 0 {
// Exact duplicated chunks, discard one from b.
b++
continue
}

// Same min Time, but not duplicate, so it does not matter. Take b (since lower for loop).
s.chunks = append(s.chunks, chksB[b])
// Exact duplicated chunks, discard one from b.
b++
}
}
Expand Down Expand Up @@ -272,7 +264,6 @@ func (s *uniqueSeriesSet) Next() bool {
// We assume non-overlapping, sorted chunks. This is best effort only: if the assumption does not hold,
// the chunks will just be duplicated, which is handled well by StoreAPI consumers.
s.peek.Chunks = append(s.peek.Chunks, chks...)

}

if s.peek == nil {
Expand All @@ -284,6 +275,64 @@ func (s *uniqueSeriesSet) Next() bool {
return true
}

// Compare orders m against b, first by MinTime, then by MaxTime, and finally by
// the contents of the Raw, Count, Sum, Min, Max and Counter chunks (in that order).
//
// The sign convention for the time fields is inverted compared to the usual
// Go comparators: it returns 1 when m has the smaller MinTime (or, on equal
// MinTime, the smaller MaxTime) and -1 when it has the larger one.
// When both time bounds are equal, the result of the first per-chunk Compare
// that is non-zero is returned as is.
// It returns 0 only if the time bounds and all six chunks compare as equal.
func (m AggrChunk) Compare(b AggrChunk) int {
	if m.MinTime < b.MinTime {
		return 1
	}
	if m.MinTime > b.MinTime {
		return -1
	}

	// Same min time.
	if m.MaxTime < b.MaxTime {
		return 1
	}
	if m.MaxTime > b.MaxTime {
		return -1
	}

	// We could use proto.Equal, but we need ordering as well.
	// The comparisons are spelled out one by one (instead of looping over a
	// slice of closures) so that nothing has to be built per call and later
	// chunks are only looked at when all earlier ones are equal.
	if c := m.Raw.Compare(b.Raw); c != 0 {
		return c
	}
	if c := m.Count.Compare(b.Count); c != 0 {
		return c
	}
	if c := m.Sum.Compare(b.Sum); c != 0 {
		return c
	}
	if c := m.Min.Compare(b.Min); c != 0 {
		return c
	}
	if c := m.Max.Compare(b.Max); c != 0 {
		return c
	}
	return m.Counter.Compare(b.Counter)
}

// Compare orders m against b and returns 0 when both are nil, or when both
// are non-nil with the same Type and byte-wise equal Data.
//
// A non-nil chunk compared with a nil one yields 1; a nil chunk compared with
// a non-nil one yields -1. For two non-nil chunks the Type is checked first
// using the inverted convention (1 if m.Type is lower, -1 if it is higher).
// On equal Type the result of bytes.Compare(m.Data, b.Data) is returned
// unchanged, i.e. -1 if m.Data sorts lexicographically before b.Data.
//
// NOTE(review): the Data ordering runs in the opposite direction to the Type
// ordering; confirm whether that is intended before relying on the sign for
// anything other than equality and a stable merge order.
func (m *Chunk) Compare(b *Chunk) int {
	// Nil handling comes first so the field accesses below are safe.
	switch {
	case m == nil && b == nil:
		return 0
	case b == nil:
		return 1
	case m == nil:
		return -1
	}

	if m.Type != b.Type {
		if m.Type < b.Type {
			return 1
		}
		return -1
	}
	return bytes.Compare(m.Data, b.Data)
}

// LabelsToPromLabels converts Thanos proto labels to Prometheus labels in type safe manner.
// NOTE: It allocates memory.
func LabelsToPromLabels(lset []Label) labels.Labels {
Expand Down
47 changes: 32 additions & 15 deletions pkg/store/storepb/custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,11 @@ func TestMergeSeriesSets(t *testing.T) {
lset: labels.Labels{labels.Label{Name: "a", Value: "c"}},
chunks: [][]sample{
{{t: 1, v: 1}, {t: 2, v: 2}, {t: 3, v: 3}, {t: 4, v: 4}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 14, v: 14}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 14, v: 14}},
{{t: 15, v: 15}, {t: 16, v: 16}, {t: 17, v: 17}, {t: 18, v: 18}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 22}, {t: 24, v: 24}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
},
}, {
lset: labels.Labels{labels.Label{Name: "a", Value: "d"}},
Expand Down Expand Up @@ -317,11 +317,11 @@ func TestMergeSeriesSets(t *testing.T) {
chunks: [][]sample{
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 1, v: 1}, {t: 2, v: 2}, {t: 3, v: 3}, {t: 4, v: 4}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 22}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 15, v: 15}, {t: 16, v: 16}, {t: 17, v: 17}, {t: 18, v: 18}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
},
},
},
Expand Down Expand Up @@ -392,17 +392,25 @@ func TestExtendLabels(t *testing.T) {
}

// Test the cost of merging series sets for different number of merged sets and their size.
// This tests cases with large number of series, with same chunks. Since the subset are unique, this does not capture
// merging of partial or non-overlapping sets well.
func BenchmarkMergedSeriesSet(b *testing.B) {
benchmarkMergedSeriesSet(testutil.NewTB(b))
b.Run("overlapping chunks", func(b *testing.B) {
benchmarkMergedSeriesSet(testutil.NewTB(b), true)
})
b.Run("non-overlapping chunks", func(b *testing.B) {
benchmarkMergedSeriesSet(testutil.NewTB(b), false)
})
}

func TestMergedSeriesSet_Labels(b *testing.T) {
benchmarkMergedSeriesSet(testutil.NewTB(b))
func TestMergedSeriesSet_Labels(t *testing.T) {
t.Run("overlapping chunks", func(t *testing.T) {
benchmarkMergedSeriesSet(testutil.NewTB(t), true)
})
t.Run("non-overlapping chunks", func(t *testing.T) {
benchmarkMergedSeriesSet(testutil.NewTB(t), false)
})
}

func benchmarkMergedSeriesSet(b testutil.TB) {
func benchmarkMergedSeriesSet(b testutil.TB, overlappingChunks bool) {
var sel func(sets []SeriesSet) SeriesSet
sel = func(sets []SeriesSet) SeriesSet {
if len(sets) == 0 {
Expand All @@ -429,19 +437,28 @@ func benchmarkMergedSeriesSet(b testutil.TB) {

sort.Sort(labels.Slice(lbls))

in := make([][]rawSeries, j)

blocks := make([][]rawSeries, j)
for _, l := range lbls {
for j := range in {
in[j] = append(in[j], rawSeries{lset: l, chunks: chunks})
for j := range blocks {
if overlappingChunks {
blocks[j] = append(blocks[j], rawSeries{lset: l, chunks: chunks})
continue
}
blocks[j] = append(blocks[j], rawSeries{
lset: l,
chunks: [][]sample{
{{int64(4*j) + 1, 1}, {int64(4*j) + 2, 2}},
{{int64(4*j) + 3, 3}, {int64(4*j) + 4, 4}},
},
})
}
}

b.ResetTimer()

for i := 0; i < b.N(); i++ {
var sets []SeriesSet
for _, s := range in {
for _, s := range blocks {
sets = append(sets, newListSeriesSet(b, s))
}
ms := sel(sets)
Expand Down

0 comments on commit 3de467a

Please sign in to comment.