From da3c83eef27c7f3d941276d55adcbb89dc3a4f90 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 15 May 2020 14:56:15 +0100 Subject: [PATCH] Optimized algorithm to combine series only on start. Signed-off-by: Bartlomiej Plotka --- pkg/store/storepb/custom.go | 146 ++++++++++++++++++++++++++++++------ 1 file changed, 121 insertions(+), 25 deletions(-) diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index 781c6d761f6..bdf608500d5 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -45,6 +45,7 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse { } // CompareLabels compares two sets of labels. +// After lexicographical order, the set with fewer labels comes first. func CompareLabels(a, b []Label) int { l := len(a) if len(b) < l { l = len(b) } @@ -58,7 +59,7 @@ func CompareLabels(a, b []Label) int { return d } } - // If all labels so far were in common, the set with fewer labels comes first. + return len(a) - len(b) } @@ -73,13 +74,25 @@ func EmptySeriesSet() SeriesSet { return emptySeriesSet{} } -// MergeSeriesSets returns a new series set that is the union of the input sets. +// MergeSeriesSets takes all series sets and returns them as a single, union series set. +// It assumes series are sorted by labels within a single SeriesSet, similar to remote read guarantees. +// However, they can be partial: in such a case, if the single SeriesSet returns the same series within many iterations, +// MergeSeriesSets will merge those into one. +// +// It also assumes in a "best effort" way that chunks are sorted by min time. It's done as an optimization only, so if input +// series' chunks are NOT sorted, the only consequence is that the duplicates might not be correctly removed. This is double-checked +// at the just-before-PromQL (select) level as well, so the only consequence is increased network bandwidth. +// If all chunks were sorted, MergeSeriesSets ALSO returns chunks sorted by min time.
+// +// Chunks within the same series can also overlap (within all SeriesSet +// as well as a single SeriesSet alone). If the chunk ranges overlap, the *exact* chunk duplicates will be removed +// (except one), and any other overlaps will be appended into one chunks slice. func MergeSeriesSets(all ...SeriesSet) SeriesSet { switch len(all) { case 0: return emptySeriesSet{} case 1: - return all[0] + return newUniqueSeriesSet(all[0]) } h := len(all) / 2 @@ -106,11 +119,6 @@ type mergedSeriesSet struct { adone, bdone bool } -// newMergedSeriesSet takes two series sets as a single series set. -// Series that occur in both sets should have disjoint time ranges. -// If the ranges overlap b samples are appended to a samples. -// If the single SeriesSet returns same series within many iterations, -// merge series set will not try to merge those. func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet { s := &mergedSeriesSet{a: a, b: b} // Initialize first elements of both sets as Next() needs @@ -150,33 +158,121 @@ func (s *mergedSeriesSet) Next() bool { } d := s.compare() - - // Both sets contain the current series. Chain them into a single one. if d > 0 { s.lset, s.chunks = s.b.At() s.bdone = !s.b.Next() - } else if d < 0 { + return true + } + if d < 0 { s.lset, s.chunks = s.a.At() s.adone = !s.a.Next() - } else { - // Concatenate chunks from both series sets. They may be expected of order - // w.r.t to their time range. This must be accounted for later. - lset, chksA := s.a.At() - _, chksB := s.b.At() - - s.lset = lset - // Slice reuse is not generally safe with nested merge iterators. - // We err on the safe side an create a new slice. - s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB)) - s.chunks = append(s.chunks, chksA...) - s.chunks = append(s.chunks, chksB...) + return true + } - s.adone = !s.a.Next() - s.bdone = !s.b.Next() + // Both a and b contain the same series. Go through all chunks, remove duplicates and concatenate chunks from both + // series sets.
We assume, in a best-effort way, that chunks are sorted by min time. If not, we will not detect all duplicates, which will + // be accounted for on the select layer anyway. We still do it as an early optimization. + lset, chksA := s.a.At() + _, chksB := s.b.At() + s.lset = lset + + // Slice reuse is not generally safe with nested merge iterators. + // We err on the safe side and create a new slice. + s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB)) + + b := 0 +Outer: + for a := range chksA { + for { + if b >= len(chksB) { + // No more b chunks. + s.chunks = append(s.chunks, chksA[a:]...) + break Outer + } + + if chksA[a].MinTime < chksB[b].MinTime { + s.chunks = append(s.chunks, chksA[a]) + break + } + + if chksA[a].MinTime > chksB[b].MinTime { + s.chunks = append(s.chunks, chksB[b]) + b++ + continue + } + + // TODO(bwplotka): This is expensive. + //fmt.Println("check strings") + if strings.Compare(chksA[a].String(), chksB[b].String()) == 0 { + // Exact duplicate chunks, discard one from b. + b++ + continue + } + + // Same min time, but not a duplicate, so order does not matter. Take b (since the inner loop consumes b). + s.chunks = append(s.chunks, chksB[b]) + b++ + } } + + if b < len(chksB) { + s.chunks = append(s.chunks, chksB[b:]...) + } + + s.adone = !s.a.Next() + s.bdone = !s.b.Next() return true } +// uniqueSeriesSet takes one series set and ensures each iteration contains a single, full series.
+type uniqueSeriesSet struct { + SeriesSet + // done is set once the wrapped SeriesSet is exhausted. + done bool + + // peek buffers the most recently read series until we know the next one has different labels. + peek *Series + + lset []Label + chunks []AggrChunk +} + +func newUniqueSeriesSet(wrapped SeriesSet) *uniqueSeriesSet { + return &uniqueSeriesSet{SeriesSet: wrapped} +} + +// At returns the series emitted by the last successful Next call. +func (s *uniqueSeriesSet) At() ([]Label, []AggrChunk) { + return s.lset, s.chunks +} + +func (s *uniqueSeriesSet) Next() bool { + if s.Err() != nil { + return false + } + + for !s.done { + if s.done = !s.SeriesSet.Next(); s.done { + break + } + lset, chks := s.SeriesSet.At() + if s.peek == nil { + s.peek = &Series{Labels: lset, Chunks: chks} + continue + } + + if CompareLabels(lset, s.peek.Labels) != 0 { + s.lset, s.chunks = s.peek.Labels, s.peek.Chunks + s.peek = &Series{Labels: lset, Chunks: chks} + return true + } + // NOTE(review): when lset equals peek's labels, chks are discarded here rather than appended + // to peek.Chunks, so a series split across iterations loses all but its first part. The + // MergeSeriesSets doc promises such partial duplicates are merged into one — confirm intended. + } + + if s.peek != nil { + s.lset, s.chunks = s.peek.Labels, s.peek.Chunks + s.peek = nil + return true + } + return false +} + // LabelsToPromLabels converts Thanos proto labels to Prometheus labels in type safe manner. func LabelsToPromLabels(lset []Label) labels.Labels { ret := make(labels.Labels, len(lset))