diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index f4e8849904..3235a3220e 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -294,15 +294,11 @@ func TestProxyStore_Series(t *testing.T) { expectedSeries: []rawSeries{ { lset: []storepb.Label{{Name: "a", Value: "a"}}, - chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}, {{4, 3}}}, - }, - { - lset: []storepb.Label{{Name: "a", Value: "a"}}, - chunks: [][]sample{{{5, 4}}}, + chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}, {{4, 3}}, {{5, 4}}}, }, { lset: []storepb.Label{{Name: "a", Value: "b"}}, - chunks: [][]sample{{{2, 2}, {3, 3}, {4, 4}}, {{1, 1}, {2, 2}, {3, 3}}}, // No sort merge. + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{2, 2}, {3, 3}, {4, 4}}}, }, { lset: []storepb.Label{{Name: "a", Value: "c"}}, @@ -343,7 +339,7 @@ func TestProxyStore_Series(t *testing.T) { expectedSeries: []rawSeries{ { lset: []storepb.Label{{Name: "a", Value: "b"}}, - chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{1, 11}, {2, 22}, {3, 33}}}, + chunks: [][]sample{{{1, 11}, {2, 22}, {3, 33}}, {{1, 1}, {2, 2}, {3, 3}}}, }, }, }, @@ -1220,7 +1216,7 @@ type rawSeries struct { } func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) { - testutil.Equals(t, len(expected), len(got), "got: %v", got) + testutil.Equals(t, len(expected), len(got), "got unexpected number of series: \n %v", got) for i, series := range got { testutil.Equals(t, expected[i].lset, series.Labels) diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index 9d6e3132aa..dfc1a4f8ec 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -4,6 +4,7 @@ package storepb import ( + "bytes" "strings" "unsafe" @@ -190,27 +191,18 @@ Outer: break Outer } - if chksA[a].MinTime < chksB[b].MinTime { + cmp := chksA[a].Compare(chksB[b]) + if cmp > 0 { s.chunks = append(s.chunks, chksA[a]) break } - - if chksA[a].MinTime > chksB[b].MinTime { + if cmp < 0 { s.chunks = append(s.chunks, chksB[b]) b++ continue } - // TODO(bwplotka): This is expensive. - //fmt.Println("check strings") - if strings.Compare(chksA[a].String(), chksB[b].String()) == 0 { - // Exact duplicated chunks, discard one from b. - b++ - continue - } - - // Same min Time, but not duplicate, so it does not matter. Take b (since lower for loop). - s.chunks = append(s.chunks, chksB[b]) + // Exact duplicated chunks, discard one from b. b++ } } @@ -267,7 +259,6 @@ func (s *uniqueSeriesSet) Next() bool { // We assume non-overlapping, sorted chunks. This is best effort only, if it's otherwise it // will just be duplicated, but well handled by StoreAPI consumers. s.peek.Chunks = append(s.peek.Chunks, chks...) - } if s.peek == nil { @@ -279,6 +270,64 @@ func (s *uniqueSeriesSet) Next() bool { return true } +// Compare returns positive 1 if chunk is smaller -1 if larger than b by min time, then max time. +// It returns 0 if chunks are exactly the same. +func (m AggrChunk) Compare(b AggrChunk) int { + if m.MinTime < b.MinTime { + return 1 + } + if m.MinTime > b.MinTime { + return -1 + } + + // Same min time. + if m.MaxTime < b.MaxTime { + return 1 + } + if m.MaxTime > b.MaxTime { + return -1 + } + + // We could use proto.Equal, but we need ordering as well. + for _, cmp := range []func() int{ + func() int { return m.Raw.Compare(b.Raw) }, + func() int { return m.Count.Compare(b.Count) }, + func() int { return m.Sum.Compare(b.Sum) }, + func() int { return m.Min.Compare(b.Min) }, + func() int { return m.Max.Compare(b.Max) }, + func() int { return m.Counter.Compare(b.Counter) }, + } { + if c := cmp(); c == 0 { + continue + } else { + return c + } + } + return 0 +} + +// Compare returns positive 1 if chunk is smaller -1 if larger. +// It returns 0 if chunks are exactly the same. +func (m *Chunk) Compare(b *Chunk) int { + if m == nil && b == nil { + return 0 + } + if b == nil { + return 1 + } + if m == nil { + return -1 + } + + if m.Type < b.Type { + return 1 + } + if m.Type > b.Type { + return -1 + } + return bytes.Compare(m.Data, b.Data) +} + // LabelsToPromLabels converts Thanos proto labels to Prometheus labels in type safe manner. func LabelsToPromLabels(lset []Label) labels.Labels { ret := make(labels.Labels, len(lset)) diff --git a/pkg/store/storepb/custom_test.go b/pkg/store/storepb/custom_test.go index e7c9f05b11..832c6e0b9b 100644 --- a/pkg/store/storepb/custom_test.go +++ b/pkg/store/storepb/custom_test.go @@ -261,11 +261,11 @@ func TestMergeSeriesSets(t *testing.T) { lset: labels.Labels{labels.Label{Name: "a", Value: "c"}}, chunks: [][]sample{ {{t: 1, v: 1}, {t: 2, v: 2}, {t: 3, v: 3}, {t: 4, v: 4}}, - {{t: 11, v: 11}, {t: 12, v: 12}, {t: 14, v: 14}}, {{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}}, + {{t: 11, v: 11}, {t: 12, v: 12}, {t: 14, v: 14}}, {{t: 15, v: 15}, {t: 16, v: 16}, {t: 17, v: 17}, {t: 18, v: 18}}, - {{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}}, {{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 22}, {t: 24, v: 24}}, + {{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}}, }, }, { lset: labels.Labels{labels.Label{Name: "a", Value: "d"}}, @@ -317,11 +317,11 @@ func TestMergeSeriesSets(t *testing.T) { chunks: [][]sample{ {{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}}, {{t: 1, v: 1}, {t: 2, v: 2}, {t: 3, v: 3}, {t: 4, v: 4}}, - {{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}}, - {{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}}, {{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 22}, {t: 24, v: 24}}, {{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}}, {{t: 15, v: 15}, {t: 16, v: 16}, {t: 17, v: 17}, {t: 18, v: 18}}, + {{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}}, + {{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}}, }, }, }, @@ -381,17 +381,25 @@ func expandSeriesSet(t *testing.T, gotSS SeriesSet) (ret []rawSeries) { } // Test the cost of merging series sets for different number of merged sets and their size. -// This tests cases with large number of series, with same chunks. Since the subset are unique, this does not capture -// merging of partial or non-overlapping sets well. func BenchmarkMergedSeriesSet(b *testing.B) { - benchmarkMergedSeriesSet(testutil.NewTB(b)) + b.Run("overlapping chunks", func(b *testing.B) { + benchmarkMergedSeriesSet(testutil.NewTB(b), true) + }) + b.Run("non-overlapping chunks", func(b *testing.B) { + benchmarkMergedSeriesSet(testutil.NewTB(b), false) + }) } -func TestMergedSeriesSet_Labels(b *testing.T) { - benchmarkMergedSeriesSet(testutil.NewTB(b)) +func TestMergedSeriesSet_Labels(t *testing.T) { + t.Run("overlapping chunks", func(t *testing.T) { + benchmarkMergedSeriesSet(testutil.NewTB(t), true) + }) + t.Run("non-overlapping chunks", func(t *testing.T) { + benchmarkMergedSeriesSet(testutil.NewTB(t), false) + }) } -func benchmarkMergedSeriesSet(b testutil.TB) { +func benchmarkMergedSeriesSet(b testutil.TB, overlappingChunks bool) { var sel func(sets []SeriesSet) SeriesSet sel = func(sets []SeriesSet) SeriesSet { if len(sets) == 0 { @@ -418,11 +426,20 @@ func benchmarkMergedSeriesSet(b testutil.TB) { sort.Sort(labels.Slice(lbls)) - in := make([][]rawSeries, j) - + blocks := make([][]rawSeries, j) for _, l := range lbls { - for j := range in { - in[j] = append(in[j], rawSeries{lset: l, chunks: chunks}) + for j := range blocks { + if overlappingChunks { + blocks[j] = append(blocks[j], rawSeries{lset: l, chunks: chunks}) + continue + } + blocks[j] = append(blocks[j], rawSeries{ + lset: l, + chunks: [][]sample{ + {{int64(4*j) + 1, 1}, {int64(4*j) + 2, 2}}, + {{int64(4*j) + 3, 3}, {int64(4*j) + 4, 4}}, + }, + }) } } @@ -430,7 +447,7 @@ func benchmarkMergedSeriesSet(b testutil.TB) { for i := 0; i < b.N(); i++ { var sets []SeriesSet - for _, s := range in { + for _, s := range blocks { sets = append(sets, newListSeriesSet(b, s)) } ms := sel(sets)