Skip to content

Commit

Permalink
Optimized chunk comparison for overlaps.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed May 16, 2020
1 parent f6004ab commit 0ebf5e1
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 29 deletions.
77 changes: 63 additions & 14 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package storepb

import (
"bytes"
"strings"
"unsafe"

Expand Down Expand Up @@ -190,27 +191,18 @@ Outer:
break Outer
}

if chksA[a].MinTime < chksB[b].MinTime {
cmp := chksA[a].Compare(chksB[b])
if cmp > 0 {
s.chunks = append(s.chunks, chksA[a])
break
}

if chksA[a].MinTime > chksB[b].MinTime {
if cmp < 0 {
s.chunks = append(s.chunks, chksB[b])
b++
continue
}

// TODO(bwplotka): This is expensive.
//fmt.Println("check strings")
if strings.Compare(chksA[a].String(), chksB[b].String()) == 0 {
// Exact duplicated chunks, discard one from b.
b++
continue
}

// Same min Time, but not duplicate, so it does not matter. Take b (since lower for loop).
s.chunks = append(s.chunks, chksB[b])
// Exact duplicated chunks, discard one from b.
b++
}
}
Expand Down Expand Up @@ -267,7 +259,6 @@ func (s *uniqueSeriesSet) Next() bool {
// We assume non-overlapping, sorted chunks. This is best effort only, if it's otherwise it
// will just be duplicated, but well handled by StoreAPI consumers.
s.peek.Chunks = append(s.peek.Chunks, chks...)

}

if s.peek == nil {
Expand All @@ -279,6 +270,64 @@ func (s *uniqueSeriesSet) Next() bool {
return true
}

// Compare returns 1 if chunk m is smaller than b, -1 if larger, ordering by
// min time, then max time, then by the per-aggregate chunk contents.
// It returns 0 if the chunks are exactly the same.
//
// NOTE: The sign convention is inverted relative to the usual Compare
// contract (the smaller chunk yields 1); callers in the merge path rely on
// this, so do not "fix" it.
func (m AggrChunk) Compare(b AggrChunk) int {
	if m.MinTime < b.MinTime {
		return 1
	}
	if m.MinTime > b.MinTime {
		return -1
	}

	// Same min time; break the tie on max time.
	if m.MaxTime < b.MaxTime {
		return 1
	}
	if m.MaxTime > b.MaxTime {
		return -1
	}

	// Same time range: fall back to comparing the aggregate payloads in a
	// fixed order. We could use proto.Equal, but we need ordering as well.
	// Comparing directly (instead of iterating a slice of closures) keeps
	// this allocation-free, which matters on the hot merge path this method
	// was added to optimize.
	if c := m.Raw.Compare(b.Raw); c != 0 {
		return c
	}
	if c := m.Count.Compare(b.Count); c != 0 {
		return c
	}
	if c := m.Sum.Compare(b.Sum); c != 0 {
		return c
	}
	if c := m.Min.Compare(b.Min); c != 0 {
		return c
	}
	if c := m.Max.Compare(b.Max); c != 0 {
		return c
	}
	return m.Counter.Compare(b.Counter)
}

// Compare returns 1 if chunk m sorts before ("is smaller than") b, -1 if it
// sorts after, and 0 when both chunks are exactly the same.
// A nil chunk sorts after a non-nil one; otherwise ordering is by chunk type
// first, then lexicographically by the raw chunk bytes.
//
// NOTE: the sign convention is inverted relative to the usual Compare
// contract, matching AggrChunk.Compare.
func (m *Chunk) Compare(b *Chunk) int {
	switch {
	case m == nil && b == nil:
		return 0
	case b == nil:
		// Only b is nil, so m sorts first.
		return 1
	case m == nil:
		return -1
	case m.Type < b.Type:
		return 1
	case m.Type > b.Type:
		return -1
	}
	// Same type: fall back to byte-wise comparison of the encoded data.
	return bytes.Compare(m.Data, b.Data)
}

// LabelsToPromLabels converts Thanos proto labels to Prometheus labels in type safe manner.
func LabelsToPromLabels(lset []Label) labels.Labels {
ret := make(labels.Labels, len(lset))
Expand Down
47 changes: 32 additions & 15 deletions pkg/store/storepb/custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,11 @@ func TestMergeSeriesSets(t *testing.T) {
lset: labels.Labels{labels.Label{Name: "a", Value: "c"}},
chunks: [][]sample{
{{t: 1, v: 1}, {t: 2, v: 2}, {t: 3, v: 3}, {t: 4, v: 4}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 14, v: 14}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 14, v: 14}},
{{t: 15, v: 15}, {t: 16, v: 16}, {t: 17, v: 17}, {t: 18, v: 18}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 22}, {t: 24, v: 24}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
},
}, {
lset: labels.Labels{labels.Label{Name: "a", Value: "d"}},
Expand Down Expand Up @@ -317,11 +317,11 @@ func TestMergeSeriesSets(t *testing.T) {
chunks: [][]sample{
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 1, v: 1}, {t: 2, v: 2}, {t: 3, v: 3}, {t: 4, v: 4}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 22}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 15, v: 15}, {t: 16, v: 16}, {t: 17, v: 17}, {t: 18, v: 18}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
},
},
},
Expand Down Expand Up @@ -381,17 +381,25 @@ func expandSeriesSet(t *testing.T, gotSS SeriesSet) (ret []rawSeries) {
}

// Test the cost of merging series sets for different number of merged sets and their size.
// This tests cases with large number of series, with same chunks. Since the subset are unique, this does not capture
// merging of partial or non-overlapping sets well.
func BenchmarkMergedSeriesSet(b *testing.B) {
	// Benchmark both chunk layouts so the cost of the overlap/duplicate
	// detection in the merge path is visible separately from the plain
	// disjoint-merge case.
	b.Run("overlapping chunks", func(b *testing.B) {
		benchmarkMergedSeriesSet(testutil.NewTB(b), true)
	})
	b.Run("non-overlapping chunks", func(b *testing.B) {
		benchmarkMergedSeriesSet(testutil.NewTB(b), false)
	})
}

func TestMergedSeriesSet_Labels(b *testing.T) {
benchmarkMergedSeriesSet(testutil.NewTB(b))
func TestMergedSeriesSet_Labels(t *testing.T) {
	// Reuses the benchmark driver as a correctness test (testutil.TB abstracts
	// over *testing.T and *testing.B), covering both chunk layouts.
	t.Run("overlapping chunks", func(t *testing.T) {
		benchmarkMergedSeriesSet(testutil.NewTB(t), true)
	})
	t.Run("non-overlapping chunks", func(t *testing.T) {
		benchmarkMergedSeriesSet(testutil.NewTB(t), false)
	})
}

func benchmarkMergedSeriesSet(b testutil.TB) {
func benchmarkMergedSeriesSet(b testutil.TB, overlappingChunks bool) {
var sel func(sets []SeriesSet) SeriesSet
sel = func(sets []SeriesSet) SeriesSet {
if len(sets) == 0 {
Expand All @@ -418,19 +426,28 @@ func benchmarkMergedSeriesSet(b testutil.TB) {

sort.Sort(labels.Slice(lbls))

in := make([][]rawSeries, j)

blocks := make([][]rawSeries, j)
for _, l := range lbls {
for j := range in {
in[j] = append(in[j], rawSeries{lset: l, chunks: chunks})
for j := range blocks {
if overlappingChunks {
blocks[j] = append(blocks[j], rawSeries{lset: l, chunks: chunks})
continue
}
blocks[j] = append(blocks[j], rawSeries{
lset: l,
chunks: [][]sample{
{{int64(4*j) + 1, 1}, {int64(4*j) + 2, 2}},
{{int64(4*j) + 3, 3}, {int64(4*j) + 4, 4}},
},
})
}
}

b.ResetTimer()

for i := 0; i < b.N(); i++ {
var sets []SeriesSet
for _, s := range in {
for _, s := range blocks {
sets = append(sets, newListSeriesSet(b, s))
}
ms := sel(sets)
Expand Down

0 comments on commit 0ebf5e1

Please sign in to comment.