Skip to content

Commit

Permalink
Optimized algorithm to combine series only on start.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed May 15, 2020
1 parent 5bfcbe9 commit da3c83e
Showing 1 changed file with 121 additions and 25 deletions.
146 changes: 121 additions & 25 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse {
}

// CompareLabels compares two sets of labels.
// After lexicographical order, the set with fewer labels comes first.
func CompareLabels(a, b []Label) int {
l := len(a)
if len(b) < l {
Expand All @@ -58,7 +59,7 @@ func CompareLabels(a, b []Label) int {
return d
}
}
// If all labels so far were in common, the set with fewer labels comes first.

return len(a) - len(b)
}

Expand All @@ -73,13 +74,25 @@ func EmptySeriesSet() SeriesSet {
return emptySeriesSet{}
}

// MergeSeriesSets returns a new series set that is the union of the input sets.
// MergeSeriesSets takes all series sets and returns as a union single series set.
// It assumes series are sorted by labels within single SeriesSet, similar to remote read guarantees.
// However, they can be partial: in such case, if the single SeriesSet returns the same series within many iterations,
// MergeSeriesSets will merge those into one.
//
// It also assumes, in a "best effort" way, that chunks are sorted by min time. This is done as an optimization only, so if the input
// series' chunks are NOT sorted, the only consequence is that duplicates might not be removed correctly. Deduplication is checked
// again just before the PromQL level, so the only consequence is increased network bandwidth.
// If all chunks were sorted, MergeSeriesSets ALSO returns chunks sorted by min time.
//
// Chunks within the same series can also overlap (across all SeriesSets
// as well as within a single SeriesSet alone). If the chunk ranges overlap, the *exact* chunk duplicates will be removed
// (except one), and any other overlaps will be appended into one chunks slice.
func MergeSeriesSets(all ...SeriesSet) SeriesSet {
switch len(all) {
case 0:
return emptySeriesSet{}
case 1:
return all[0]
return newUniqueSeriesSet(all[0])
}
h := len(all) / 2

Expand All @@ -106,11 +119,6 @@ type mergedSeriesSet struct {
adone, bdone bool
}

// newMergedSeriesSet takes two series sets as a single series set.
// Series that occur in both sets should have disjoint time ranges.
// If the ranges overlap b samples are appended to a samples.
// If the single SeriesSet returns same series within many iterations,
// merge series set will not try to merge those.
func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
s := &mergedSeriesSet{a: a, b: b}
// Initialize first elements of both sets as Next() needs
Expand Down Expand Up @@ -150,33 +158,121 @@ func (s *mergedSeriesSet) Next() bool {
}

d := s.compare()

// Both sets contain the current series. Chain them into a single one.
if d > 0 {
s.lset, s.chunks = s.b.At()
s.bdone = !s.b.Next()
} else if d < 0 {
return true
}
if d < 0 {
s.lset, s.chunks = s.a.At()
s.adone = !s.a.Next()
} else {
// Concatenate chunks from both series sets. They may be out of order
// w.r.t. their time range. This must be accounted for later.
lset, chksA := s.a.At()
_, chksB := s.b.At()

s.lset = lset
// Slice reuse is not generally safe with nested merge iterators.
// We err on the safe side and create a new slice.
s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB))
s.chunks = append(s.chunks, chksA...)
s.chunks = append(s.chunks, chksB...)
return true
}

s.adone = !s.a.Next()
s.bdone = !s.b.Next()
// Both a and b contain the same series. Go through all chunks, remove duplicates and concatenate chunks from both
// series sets. On a best-effort basis we assume chunks are sorted by min time. If they are not, we will not detect all duplicates,
// which are accounted for on the select layer anyway. We still do it as an early optimization.
lset, chksA := s.a.At()
_, chksB := s.b.At()
s.lset = lset

// Slice reuse is not generally safe with nested merge iterators.
// We err on the safe side and create a new slice.
s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB))

b := 0
Outer:
for a := range chksA {
for {
if b >= len(chksB) {
// No more b chunks.
s.chunks = append(s.chunks, chksA[a:]...)
break Outer
}

if chksA[a].MinTime < chksB[b].MinTime {
s.chunks = append(s.chunks, chksA[a])
break
}

if chksA[a].MinTime > chksB[b].MinTime {
s.chunks = append(s.chunks, chksB[b])
b++
continue
}

// TODO(bwplotka): This is expensive.
//fmt.Println("check strings")
if strings.Compare(chksA[a].String(), chksB[b].String()) == 0 {
// Exact duplicated chunks, discard one from b.
b++
continue
}

// Same min Time, but not duplicate, so it does not matter. Take b (since lower for loop).
s.chunks = append(s.chunks, chksB[b])
b++
}
}

if b < len(chksB) {
s.chunks = append(s.chunks, chksB[b:]...)
}

s.adone = !s.a.Next()
s.bdone = !s.b.Next()
return true
}

// uniqueSeriesSet takes one series set and ensures each iteration yields a distinct series:
// consecutive entries of the wrapped set whose labels compare equal (per CompareLabels)
// are surfaced as a single iteration.
type uniqueSeriesSet struct {
// SeriesSet is the wrapped set that iterations are read from.
SeriesSet
// done is set once the wrapped set's Next has returned false.
done bool

// peek buffers the series most recently read from the wrapped set that has not
// been handed out through At yet; nil when nothing is buffered.
peek *Series

// lset and chunks hold the current series, i.e. the values returned by At.
lset []Label
chunks []AggrChunk
}

// newUniqueSeriesSet returns a uniqueSeriesSet reading from wrapped.
// All other fields start at their zero value; nothing is read from wrapped
// until Next is called.
func newUniqueSeriesSet(wrapped SeriesSet) *uniqueSeriesSet {
	set := new(uniqueSeriesSet)
	set.SeriesSet = wrapped
	return set
}

// At returns the labels and chunks of the current series, as stored by the
// most recent Next call.
func (s *uniqueSeriesSet) At() ([]Label, []AggrChunk) {
	lset, chunks := s.lset, s.chunks
	return lset, chunks
}

// Next advances to the next unique series and reports whether one is available.
//
// It reads ahead in the wrapped set: the series read most recently is buffered
// in s.peek and only handed out (via s.lset/s.chunks) once a series with
// different labels shows up or the wrapped set is exhausted. If the wrapped set
// returns the same labels in several consecutive iterations, their chunks are
// concatenated, in arrival order, into the buffered series so that no chunks
// are lost. No sorting or deduplication of chunks is attempted here.
//
// Next returns false as soon as the wrapped set reports an error.
func (s *uniqueSeriesSet) Next() bool {
	if s.Err() != nil {
		return false
	}

	for !s.done {
		if s.done = !s.SeriesSet.Next(); s.done {
			break
		}
		lset, chks := s.SeriesSet.At()
		if s.peek == nil {
			// Nothing buffered yet; keep reading to see whether the series continues.
			s.peek = &Series{Labels: lset, Chunks: chks}
			continue
		}

		if CompareLabels(lset, s.peek.Labels) != 0 {
			// A different series started: emit the buffered one, buffer the new one.
			s.lset, s.chunks = s.peek.Labels, s.peek.Chunks
			s.peek = &Series{Labels: lset, Chunks: chks}
			return true
		}

		// Same series returned again by the wrapped set: merge its chunks into the
		// buffered series instead of dropping them. The capacity is clipped so that
		// append copies into a fresh array rather than writing into a backing array
		// still owned by the wrapped set.
		n := len(s.peek.Chunks)
		s.peek.Chunks = append(s.peek.Chunks[:n:n], chks...)
	}

	// Wrapped set is exhausted; flush whatever is still buffered.
	if s.peek != nil {
		s.lset, s.chunks = s.peek.Labels, s.peek.Chunks
		s.peek = nil
		return true
	}
	return false
}

// LabelsToPromLabels converts Thanos proto labels to Prometheus labels in type safe manner.
func LabelsToPromLabels(lset []Label) labels.Labels {
ret := make(labels.Labels, len(lset))
Expand Down

0 comments on commit da3c83e

Please sign in to comment.