Skip to content

Commit

Permalink
Deduplicate chunk dups on proxy StoreAPI level. Recommend chunk sorti…
Browse files Browse the repository at this point in the history
…ng for StoreAPI.

Also: Merge the same series together at the proxy level instead of at the select level. This allows better dedup efficiency.

Partially fixes: #2303

Cases like overlapping data from a store and a sidecar, as well as 1:1 duplicates, are now optimized as early as possible.
This case was highly visible on GitLab repro data and exists in most Thanos setups.

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed May 14, 2020
1 parent 34859f1 commit 5bfcbe9
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [2513](https://github.com/thanos-io/thanos/pull/2513) Tools: Moved `thanos bucket` commands to `thanos tools bucket`, also
moved `thanos check rules` to `thanos tools rules-check`. `thanos tools rules-check` also takes rules by `--rules` repeated flag not argument
anymore.
- [2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks return exactly overlapping chunks (e.g. Store GW and sidecar, or brute-force Store Gateway HA).

## [v0.12.2](https://github.com/thanos-io/thanos/releases/tag/v0.12.2) - 2020.04.30

Expand Down
5 changes: 2 additions & 3 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,9 +981,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()

// Merge series set into an union of all block sets. This exposes all blocks are single seriesSet.
// Chunks of returned series might be out of order w.r.t to their time range.
// This must be accounted for later by clients.
// NOTE: We "carefully" assume series and chunks are sorted within each SeriesSet. This should be guaranteed by
// the blockSeries method. In the worst case the deduplication logic won't deduplicate correctly, which will be accounted for later.
set := storepb.MergeSeriesSets(res...)
for set.Next() {
var series storepb.Series
Expand Down
1 change: 1 addition & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
}
return s.currSeries.Labels, s.currSeries.Chunks
}

func (s *streamSeriesSet) Err() error {
s.errMtx.Lock()
defer s.errMtx.Unlock()
Expand Down
230 changes: 195 additions & 35 deletions pkg/store/storepb/custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (errSeriesSet) At() ([]Label, []AggrChunk) { return nil, nil }

func (e errSeriesSet) Err() error { return e.err }

func TestMergeSeriesSet(t *testing.T) {
func TestMergeSeriesSets(t *testing.T) {
for _, tcase := range []struct {
desc string
in [][]rawSeries
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestMergeSeriesSet(t *testing.T) {
},
},
{
desc: "two seriesSets, {a=c} series to merge",
desc: "two seriesSets, {a=c} series to merge, sorted",
in: [][]rawSeries{
{
{
Expand All @@ -165,14 +165,12 @@ func TestMergeSeriesSet(t *testing.T) {
chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}},
}, {
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}, {{7, 1}, {8, 2}}, {{9, 3}, {10, 4}, {11, 4444}}},
chunks: [][]sample{{{7, 1}, {8, 2}}, {{9, 3}, {10, 4}, {11, 4444}}, {{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}},
},
},
},
{
// SeriesSet can return same series within different iterations. MergeSeries should not try to merge those.
// We do it on last step possible: Querier promSet.
desc: "single seriesSets, {a=c} series to merge.",
desc: "single seriesSets, {a=c} series to merge, nothing merged",
in: [][]rawSeries{
{
{
Expand Down Expand Up @@ -203,18 +201,182 @@ func TestMergeSeriesSet(t *testing.T) {
},
},
},
{
// SeriesSet can return same series within different iterations. MergeSeries will merge those as long as there are more than one series sets.
desc: "single seriesSets, {a=c} series to merge, merged",
in: [][]rawSeries{
{
{
lset: labels.FromStrings("a", "a"),
chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}},
},
{
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{{{7, 1}, {8, 2}}, {{9, 3}, {10, 4}, {11, 4444}}},
},
{
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}},
},
},
{
{
lset: labels.FromStrings("a", "d"),
chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}},
},
},
},

expected: []rawSeries{
{
lset: labels.FromStrings("a", "a"),
chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}},
}, {
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{{{7, 1}, {8, 2}}, {{9, 3}, {10, 4}, {11, 4444}}, {{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}},
},
{
lset: labels.FromStrings("a", "d"),
chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}},
},
},
},
{
desc: "four seriesSets, {a=c} series to merge AND deduplicate exactly the same chunks",
in: [][]rawSeries{
{
{
lset: labels.FromStrings("a", "a"),
chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}},
},
{
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{
{{11, 11}, {12, 12}, {13, 13}, {14, 14}},
{{15, 15}, {16, 16}, {17, 17}, {18, 18}},
},
},
{
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{
{{20, 20}, {21, 21}, {22, 22}, {24, 24}},
},
},
},
{
{
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{
{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
{{11, 11}, {12, 12}, {13, 13}, {14, 14}}, // Same chunk as in set 1.
},
},
{
lset: labels.FromStrings("a", "d"),
chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}},
},
},
{
{
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{
{{11, 11}, {12, 12}, {13, 13}, {14, 14}}, // Same chunk as in set 1.
{{20, 20}, {21, 21}, {22, 23}, {24, 24}}, // Almost same chunk as in set 1 (one value is different).
},
},
},
{
{
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{
{{11, 11}, {12, 12}, {14, 14}}, // Almost same chunk as in set 1 (one sample is missing).
{{20, 20}, {21, 21}, {22, 22}, {24, 24}}, // Same chunk as in set 1.
},
},
},
},

expected: []rawSeries{
{
lset: labels.Labels{labels.Label{Name: "a", Value: "a"}},
chunks: [][]sample{{{t: 1, v: 1}, {t: 2, v: 2}}, {{t: 3, v: 3}, {t: 4, v: 4}}},
}, {
lset: labels.Labels{labels.Label{Name: "a", Value: "c"}},
chunks: [][]sample{
{{t: 1, v: 1}, {t: 2, v: 2}, {t: 3, v: 3}, {t: 4, v: 4}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 14, v: 14}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 15, v: 15}, {t: 16, v: 16}, {t: 17, v: 17}, {t: 18, v: 18}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 22}, {t: 24, v: 24}},
},
}, {
lset: labels.Labels{labels.Label{Name: "a", Value: "d"}},
chunks: [][]sample{{{t: 11, v: 1}, {t: 12, v: 2}}, {{t: 13, v: 3}, {t: 14, v: 4}}},
},
},
},
{
desc: "four seriesSets, {a=c} series to merge, unsorted chunks, so dedup is expected to not be fully done",
in: [][]rawSeries{
{
{
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{
{{20, 20}, {21, 21}, {22, 22}, {24, 24}},
},
},
{
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{
{{11, 11}, {12, 12}, {13, 13}, {14, 14}},
{{15, 15}, {16, 16}, {17, 17}, {18, 18}},
},
},
},
{
{
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{
{{11, 11}, {12, 12}, {13, 13}, {14, 14}}, // Same chunk as in set 1.
{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
},
},
},
{
{
lset: labels.FromStrings("a", "c"),
chunks: [][]sample{
{{20, 20}, {21, 21}, {22, 23}, {24, 24}}, // Almost same chunk as in set 1 (one value is different).
{{11, 11}, {12, 12}, {13, 13}, {14, 14}}, // Same chunk as in set 1.
},
},
},
},

expected: []rawSeries{
{
lset: labels.Labels{labels.Label{Name: "a", Value: "c"}},
chunks: [][]sample{
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 1, v: 1}, {t: 2, v: 2}, {t: 3, v: 3}, {t: 4, v: 4}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 23}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 20, v: 20}, {t: 21, v: 21}, {t: 22, v: 22}, {t: 24, v: 24}},
{{t: 11, v: 11}, {t: 12, v: 12}, {t: 13, v: 13}, {t: 14, v: 14}},
{{t: 15, v: 15}, {t: 16, v: 16}, {t: 17, v: 17}, {t: 18, v: 18}},
},
},
},
},
} {
if ok := t.Run(tcase.desc, func(t *testing.T) {
t.Run(tcase.desc, func(t *testing.T) {
var input []SeriesSet
for _, iss := range tcase.in {
input = append(input, newListSeriesSet(t, iss))
}
ss := MergeSeriesSets(input...)
seriesEquals(t, tcase.expected, ss)
testutil.Ok(t, ss.Err())
}); !ok {
return
}
testutil.Equals(t, tcase.expected, expandSeriesSet(t, MergeSeriesSets(input...)))
})
}
}

Expand All @@ -239,42 +401,40 @@ type rawSeries struct {
chunks [][]sample
}

func seriesEquals(t *testing.T, expected []rawSeries, gotSS SeriesSet) {
var got []Series
func expandSeriesSet(t *testing.T, gotSS SeriesSet) (ret []rawSeries) {
for gotSS.Next() {
lset, chks := gotSS.At()
got = append(got, Series{Labels: lset, Chunks: chks})
}

testutil.Equals(t, len(expected), len(got), "got: %v", got)

for i, series := range got {
testutil.Equals(t, expected[i].lset, LabelsToPromLabels(series.Labels))
testutil.Equals(t, len(expected[i].chunks), len(series.Chunks), "unexpected number of chunks")

for k, chk := range series.Chunks {
r := rawSeries{lset: LabelsToPromLabels(lset), chunks: make([][]sample, len(chks))}
for i, chk := range chks {
c, err := chunkenc.FromData(chunkenc.EncXOR, chk.Raw.Data)
testutil.Ok(t, err)

j := 0
iter := c.Iterator(nil)
for iter.Next() {
testutil.Assert(t, j < len(expected[i].chunks[k]), "more samples than expected for %s chunk %d", series.Labels, k)

tv, v := iter.At()
testutil.Equals(t, expected[i].chunks[k][j], sample{tv, v})
j++
t, v := iter.At()
r.chunks[i] = append(r.chunks[i], sample{t: t, v: v})
}
testutil.Ok(t, iter.Err())
testutil.Equals(t, len(expected[i].chunks[k]), j)
}
ret = append(ret, r)
}

testutil.Ok(t, gotSS.Err())
return ret
}

// Test the cost of merging series sets for different number of merged sets and their size.
// The subset are all equivalent so this does not capture merging of partial or non-overlapping sets well.
// This tests cases with a large number of series that share the same chunks. Since the subsets are identical, this does
// not capture merging of partial or non-overlapping sets well.
func BenchmarkMergedSeriesSet(b *testing.B) {
benchmarkMergedSeriesSet(testutil.NewTB(b))
}

func TestMergedSeriesSet_Labels(b *testing.T) {
benchmarkMergedSeriesSet(testutil.NewTB(b))
}

func benchmarkMergedSeriesSet(b testutil.TB) {
var sel func(sets []SeriesSet) SeriesSet
sel = func(sets []SeriesSet) SeriesSet {
if len(sets) == 0 {
Expand All @@ -295,7 +455,7 @@ func BenchmarkMergedSeriesSet(b *testing.B) {
20000,
} {
for _, j := range []int{1, 2, 4, 8, 16, 32} {
b.Run(fmt.Sprintf("series=%d,blocks=%d", k, j), func(b *testing.B) {
b.Run(fmt.Sprintf("series=%d,blocks=%d", k, j), func(b testutil.TB) {
lbls, err := labels.ReadLabels(filepath.Join("../../testutil/testdata", "20kseries.json"), k)
testutil.Ok(b, err)

Expand All @@ -311,7 +471,7 @@ func BenchmarkMergedSeriesSet(b *testing.B) {

b.ResetTimer()

for i := 0; i < b.N; i++ {
for i := 0; i < b.N(); i++ {
var sets []SeriesSet
for _, s := range in {
sets = append(sets, newListSeriesSet(b, s))
Expand Down
6 changes: 6 additions & 0 deletions pkg/store/storepb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/store/storepb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ service Store {
/// partition of the single series, but once a new series is started to be streamed it means that no more data will
/// be sent for previous one.
/// Series has to be sorted.
///
/// There are no requirements on chunk sorting; however, it is recommended to have chunks sorted by chunk min time.
/// This heavily optimizes resource usage on the Querier / Federated Queries.
rpc Series(SeriesRequest) returns (stream SeriesResponse);

/// LabelNames returns all label names that is available.
Expand Down

0 comments on commit 5bfcbe9

Please sign in to comment.