Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/proxy: Deduplicate chunks on StoreAPI level. Recommend chunk sorting for StoreAPI + Optimized iter chunk dedup. #2710

Merged
merged 4 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- TSDB now does memory-mapping of Head chunks and reduces memory usage.
- [#2667](https://github.com/thanos-io/thanos/pull/2667) Store: removed support to the legacy `index.cache.json`. The hidden flag `--store.disable-index-header` was removed.
- [#2667](https://github.com/thanos-io/thanos/pull/2667) Compact: the deprecated flag `--index.generate-missing-cache-file` and the metric `thanos_compact_generated_index_total` were removed.
- [#2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks return exactly overlapping chunks (e.g. Store GW and sidecar or brute force Store Gateway HA).

## [v0.13.0](https://github.com/thanos-io/thanos/releases) - IN PROGRESS

Expand Down
8 changes: 3 additions & 5 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,8 @@ func (s *promSeriesSet) Next() bool {
return s.currChunks[i].MinTime < s.currChunks[j].MinTime
})

// newChunkSeriesIterator will handle overlaps well, however we don't need to iterate over those samples,
// removed early duplicates here.
// TODO(bwplotka): Remove chunk duplicates on proxy level as well to avoid decoding those.
// https://github.com/thanos-io/thanos/issues/2546, consider skipping removal here then.
// Proxy handles duplicates between different series; let's handle duplicates within a single series here as well.
// This way we don't need to decode those chunks.
s.currChunks = removeExactDuplicates(s.currChunks)
return true
}
Expand All @@ -81,7 +79,7 @@ func removeExactDuplicates(chks []storepb.AggrChunk) []storepb.AggrChunk {
ret = append(ret, chks[0])

for _, c := range chks[1:] {
if ret[len(ret)-1].String() == c.String() {
if ret[len(ret)-1].Compare(c) == 0 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brancz solving regression here.

continue
}
ret = append(ret, c)
Expand Down
5 changes: 2 additions & 3 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,9 +984,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()

// Merge series set into an union of all block sets. This exposes all blocks are single seriesSet.
// Chunks of returned series might be out of order w.r.t to their time range.
// This must be accounted for later by clients.
// NOTE: We "carefully" assume series and chunks are sorted within each SeriesSet. This should be guaranteed by
// the blockSeries method. In the worst case the deduplication logic won't deduplicate correctly, which will be accounted for later.
set := storepb.MergeSeriesSets(res...)
for set.Next() {
var series storepb.Series
Expand Down
1 change: 1 addition & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
}
return s.currSeries.Labels, s.currSeries.Chunks
}

func (s *streamSeriesSet) Err() error {
s.errMtx.Lock()
defer s.errMtx.Unlock()
Expand Down
12 changes: 4 additions & 8 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,11 @@ func TestProxyStore_Series(t *testing.T) {
expectedSeries: []rawSeries{
{
lset: []storepb.Label{{Name: "a", Value: "a"}},
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}, {{4, 3}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "a"}},
chunks: [][]sample{{{5, 4}}},
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}, {{4, 3}}, {{5, 4}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "b"}},
chunks: [][]sample{{{2, 2}, {3, 3}, {4, 4}}, {{1, 1}, {2, 2}, {3, 3}}}, // No sort merge.
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{2, 2}, {3, 3}, {4, 4}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "c"}},
Expand Down Expand Up @@ -343,7 +339,7 @@ func TestProxyStore_Series(t *testing.T) {
expectedSeries: []rawSeries{
{
lset: []storepb.Label{{Name: "a", Value: "b"}},
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{1, 11}, {2, 22}, {3, 33}}},
chunks: [][]sample{{{1, 11}, {2, 22}, {3, 33}}, {{1, 1}, {2, 2}, {3, 3}}},
},
},
},
Expand Down Expand Up @@ -1220,7 +1216,7 @@ type rawSeries struct {
}

func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) {
testutil.Equals(t, len(expected), len(got), "got: %v", got)
testutil.Equals(t, len(expected), len(got), "got unexpected number of series: \n %v", got)

for i, series := range got {
testutil.Equals(t, expected[i].lset, series.Labels)
Expand Down
201 changes: 176 additions & 25 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package storepb

import (
"bytes"
"fmt"
"sort"
"strconv"
Expand Down Expand Up @@ -50,6 +51,7 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse {
}

// CompareLabels compares two sets of labels.
// After lexicographical order, the set with fewer labels comes first.
func CompareLabels(a, b []Label) int {
l := len(a)
if len(b) < l {
Expand All @@ -63,7 +65,7 @@ func CompareLabels(a, b []Label) int {
return d
}
}
// If all labels so far were in common, the set with fewer labels comes first.

return len(a) - len(b)
}

Expand All @@ -78,13 +80,25 @@ func EmptySeriesSet() SeriesSet {
return emptySeriesSet{}
}

// MergeSeriesSets returns a new series set that is the union of the input sets.
// MergeSeriesSets takes all series sets and returns as a union single series set.
// It assumes series are sorted by labels within single SeriesSet, similar to remote read guarantees.
// However, they can be partial: in such case, if the single SeriesSet returns the same series within many iterations,
// MergeSeriesSets will merge those into one.
//
// It also assumes in a "best effort" way that chunks are sorted by min time. It's done as an optimization only, so if input
// series' chunks are NOT sorted, the only consequence is that duplicates might not be correctly removed. This is double-checked
// just before the PromQL level as well, so the only consequence is increased network bandwidth.
// If all chunks were sorted, MergeSeriesSet ALSO returns sorted chunks by min time.
//
// Chunks within the same series can also overlap (within all SeriesSet
// as well as single SeriesSet alone). If the chunk ranges overlap, the *exact* chunk duplicates will be removed
// (except one), and any other overlapping chunks will all be appended into one chunks slice.
func MergeSeriesSets(all ...SeriesSet) SeriesSet {
switch len(all) {
case 0:
return emptySeriesSet{}
case 1:
return all[0]
return newUniqueSeriesSet(all[0])
}
h := len(all) / 2

Expand All @@ -111,11 +125,6 @@ type mergedSeriesSet struct {
adone, bdone bool
}

// newMergedSeriesSet takes two series sets as a single series set.
// Series that occur in both sets should have disjoint time ranges.
// If the ranges overlap b samples are appended to a samples.
// If the single SeriesSet returns same series within many iterations,
// merge series set will not try to merge those.
func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
s := &mergedSeriesSet{a: a, b: b}
// Initialize first elements of both sets as Next() needs
Expand Down Expand Up @@ -155,33 +164,175 @@ func (s *mergedSeriesSet) Next() bool {
}

d := s.compare()

// Both sets contain the current series. Chain them into a single one.
if d > 0 {
s.lset, s.chunks = s.b.At()
s.bdone = !s.b.Next()
} else if d < 0 {
return true
}
if d < 0 {
s.lset, s.chunks = s.a.At()
s.adone = !s.a.Next()
} else {
// Concatenate chunks from both series sets. They may be expected of order
// w.r.t to their time range. This must be accounted for later.
lset, chksA := s.a.At()
_, chksB := s.b.At()

s.lset = lset
// Slice reuse is not generally safe with nested merge iterators.
// We err on the safe side an create a new slice.
s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB))
s.chunks = append(s.chunks, chksA...)
s.chunks = append(s.chunks, chksB...)
return true
}

s.adone = !s.a.Next()
s.bdone = !s.b.Next()
// Both a and b contain the same series. Go through all chunks, remove duplicates and concatenate chunks from both
// series sets. We assume, on a best-effort basis, that chunks are sorted by min time. If they are not, we will not detect
// all duplicates, which will be accounted for on the select layer anyway. We still do it here as an early optimization.
lset, chksA := s.a.At()
_, chksB := s.b.At()
s.lset = lset

// Slice reuse is not generally safe with nested merge iterators.
// We err on the safe side and create a new slice.
s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB))

b := 0
Outer:
for a := range chksA {
for {
if b >= len(chksB) {
// No more b chunks.
s.chunks = append(s.chunks, chksA[a:]...)
break Outer
}

cmp := chksA[a].Compare(chksB[b])
if cmp > 0 {
s.chunks = append(s.chunks, chksA[a])
break
}
if cmp < 0 {
s.chunks = append(s.chunks, chksB[b])
b++
continue
}

// Exact duplicated chunks, discard one from b.
b++
}
}

if b < len(chksB) {
s.chunks = append(s.chunks, chksB[b:]...)
}

s.adone = !s.a.Next()
s.bdone = !s.b.Next()
return true
}

// uniqueSeriesSet takes one series set and ensures each iteration contains single, full series.
type uniqueSeriesSet struct {
	// SeriesSet is the wrapped, label-sorted set being deduplicated.
	SeriesSet
	// done flips to true once the wrapped set is exhausted.
	done bool

	// peek buffers the series currently being accumulated; consecutive
	// iterations of the wrapped set that carry equal label sets are merged
	// into it before being emitted.
	peek *Series

	// lset and chunks hold the values returned by At for the series emitted
	// by the most recent successful Next call.
	lset   []Label
	chunks []AggrChunk
}

// newUniqueSeriesSet wraps the given SeriesSet so that consecutive series
// sharing the same label set are collapsed into a single series.
func newUniqueSeriesSet(wrapped SeriesSet) *uniqueSeriesSet {
	u := &uniqueSeriesSet{}
	u.SeriesSet = wrapped
	return u
}

// At returns the labels and chunks of the series produced by the most recent
// successful call to Next.
func (s *uniqueSeriesSet) At() ([]Label, []AggrChunk) {
	lset, chks := s.lset, s.chunks
	return lset, chks
}

// Next advances to the next unique series. It drains consecutive series with
// identical label sets from the wrapped SeriesSet and concatenates their
// chunks into one series before emitting it via At.
func (s *uniqueSeriesSet) Next() bool {
	if s.Err() != nil {
		return false
	}

	for !s.done {
		if s.done = !s.SeriesSet.Next(); s.done {
			break
		}
		lset, chks := s.SeriesSet.At()
		if s.peek == nil {
			// First series seen: buffer it so we can detect whether the
			// following iterations carry the same label set.
			s.peek = &Series{Labels: lset, Chunks: chks}
			continue
		}

		if CompareLabels(lset, s.peek.Labels) != 0 {
			// Labels changed: emit the buffered series and start buffering
			// the new one.
			s.lset, s.chunks = s.peek.Labels, s.peek.Chunks
			s.peek = &Series{Labels: lset, Chunks: chks}
			return true
		}

		// Same labels as the buffered series: merge by concatenating chunks.
		// We assume non-overlapping, sorted chunks. This is best effort only, if it's otherwise it
		// will just be duplicated, but well handled by StoreAPI consumers.
		s.peek.Chunks = append(s.peek.Chunks, chks...)
	}

	// Wrapped set exhausted: flush the last buffered series, if any.
	if s.peek == nil {
		return false
	}

	s.lset, s.chunks = s.peek.Labels, s.peek.Chunks
	s.peek = nil
	return true
}

// Compare returns 1 if chunk m is smaller than b, -1 if m is larger, ordering
// first by min time, then by max time, then by the chunk payloads.
// It returns 0 if the chunks are exactly the same.
// NOTE(review): the sign convention is inverted w.r.t the usual one — 1 means
// m orders first; callers such as mergedSeriesSet.Next rely on this.
func (m AggrChunk) Compare(b AggrChunk) int {
	if m.MinTime < b.MinTime {
		return 1
	}
	if m.MinTime > b.MinTime {
		return -1
	}

	// Same min time; fall back to max time.
	if m.MaxTime < b.MaxTime {
		return 1
	}
	if m.MaxTime > b.MaxTime {
		return -1
	}

	// Same time range: order by the encoded payloads, checking each aggregate
	// in a fixed order. We could use proto.Equal, but we need ordering as well.
	// Compare the fields directly instead of building a []func() int of
	// closures: this method runs per chunk pair on the merge hot path and the
	// closure slice would allocate on every call.
	if c := m.Raw.Compare(b.Raw); c != 0 {
		return c
	}
	if c := m.Count.Compare(b.Count); c != 0 {
		return c
	}
	if c := m.Sum.Compare(b.Sum); c != 0 {
		return c
	}
	if c := m.Min.Compare(b.Min); c != 0 {
		return c
	}
	if c := m.Max.Compare(b.Max); c != 0 {
		return c
	}
	return m.Counter.Compare(b.Counter)
}

// Compare orders chunks by nil-ness, then type, then data bytes, returning 1
// when m sorts first (m is "smaller"), -1 when b sorts first, and 0 when both
// chunks are exactly the same (or both nil).
func (m *Chunk) Compare(b *Chunk) int {
	switch {
	case m == nil && b == nil:
		return 0
	case b == nil:
		// Only b is missing: m sorts first.
		return 1
	case m == nil:
		return -1
	}

	switch {
	case m.Type < b.Type:
		return 1
	case m.Type > b.Type:
		return -1
	}
	// Same type: compare the encoded bytes.
	// NOTE(review): bytes.Compare returns -1 when m.Data < b.Data, which is
	// the opposite sign convention to the Type comparison above — confirm the
	// mixed convention is intentional.
	return bytes.Compare(m.Data, b.Data)
}

// LabelsToPromLabels converts Thanos proto labels to Prometheus labels in type safe manner.
// NOTE: It allocates memory.
func LabelsToPromLabels(lset []Label) labels.Labels {
Expand Down
Loading