diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index c8c03870014..b1013ba8d26 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -642,6 +642,7 @@ func (it *ApplyCounterResetsSeriesIterator) At() (t int64, v float64) { } func (it *ApplyCounterResetsSeriesIterator) Seek(x int64) bool { + // Don't use underlying Seek, but iterate over next to not miss counter resets. for { if t, _ := it.At(); t >= x { return true diff --git a/pkg/query/iter.go b/pkg/query/iter.go index a42e471d797..19ffdc239f4 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -477,56 +477,170 @@ func (s *dedupSeries) Labels() labels.Labels { return s.lset } -func (s *dedupSeries) Iterator() (it storage.SeriesIterator) { - it = s.replicas[0].Iterator() +func (s *dedupSeries) Iterator() storage.SeriesIterator { + var it extendedSeriesIterator = &seriesIteratorToExtendedAdapter{SeriesIterator: s.replicas[0].Iterator()} + for _, o := range s.replicas[1:] { - it = newDedupSeriesIterator(it, o.Iterator()) + var replicaIter extendedSeriesIterator + if s.isCounter { + replicaIter = &counterErrAdjustSeriesIterator{SeriesIterator: o.Iterator()} + } else { + replicaIter = &seriesIteratorToExtendedAdapter{SeriesIterator: o.Iterator()} + } + + it = newDedupSeriesIterator(it, replicaIter) } - if s.isCounter { - return newCounterDedupAdjustSeriesIterator(it) + return &extendedToSeriesIteratorAdapter{extendedSeriesIterator: it} +} + +// extendedSeriesIterator iterates over the data of a time series. +type extendedSeriesIterator interface { + // Seek advances the iterator forward to the value at or after + // the given timestamp. It allows passing lastValue assessed by consumer, so implementation handling counters can + // adjust for potential counter error. + Seek(t int64, lastValue float64) bool + // At returns the current timestamp/value pair. + At() (t int64, v float64) + // Next advances the iterator by one. 
It allows passing lastValue assessed by consumer, so implementation handling counters can + // adjust for potential counter error. + Next(lastValue float64) bool + // Err returns the current error. + Err() error +} + +type seriesIteratorToExtendedAdapter struct { + storage.SeriesIterator +} + +func (it *seriesIteratorToExtendedAdapter) Seek(t int64, _ float64) bool { + return it.SeriesIterator.Seek(t) +} + +func (it *seriesIteratorToExtendedAdapter) Next(float64) bool { + return it.SeriesIterator.Next() +} + +type extendedToSeriesIteratorAdapter struct { + extendedSeriesIterator +} + +func (it *extendedToSeriesIteratorAdapter) Seek(t int64) bool { + return it.extendedSeriesIterator.Seek(t, float64(math.MinInt64)) +} + +func (it *extendedToSeriesIteratorAdapter) Next() bool { + return it.extendedSeriesIterator.Next(float64(math.MinInt64)) +} + +// counterErrAdjustSeriesIterator is extendedSeriesIterator used when we deduplicate counter. +// It makes sure we always adjust for the latest seen last counter value for all replicas. +// Let's consider following example: +// +// Replica 1 counter scrapes: 20 30 40 Nan - 0 5 +// Replica 2 counter scrapes: 25 35 45 Nan - 2 +// +// Now for downsampling purposes we are accounting the resets so our replicas before going to dedup iterator looks like this: +// +// Replica 1 counter total: 20 30 40 - - 40 45 +// Replica 2 counter total: 25 35 45 - - 47 +// +// Now if at any point we will switch our focus from replica 2 to replica 1 we will experience lower value than previous, +// which will trigger false positive counter reset in PromQL. +// +// We mitigate this by always adjusting the "behind" replica value to be not smaller than the highest sample seen. +// This is also what is closest to the truth (last seen counter value on this target). +// +// This is a short-term solution to mitigate https://github.com/thanos-io/thanos/issues/2401. 
+// TODO(bwplotka): Find better deduplication algorithm that does not require knowledge if the given +// series is counter or not: https://github.com/thanos-io/thanos/issues/2547. +type counterErrAdjustSeriesIterator struct { + storage.SeriesIterator + + errAdjust float64 +} + +func (it *counterErrAdjustSeriesIterator) Next(lastValue float64) bool { + if it.SeriesIterator.Next() { + // Get current value with the current error adjust applied. + _, v := it.At() + if lastValue > v { + it.errAdjust += lastValue - v + } + return true + } + return false +} + +func (it *counterErrAdjustSeriesIterator) Seek(t int64, lastValue float64) bool { + if it.SeriesIterator.Seek(t) { + // Get current value with the current error adjust applied. + _, v := it.At() + if lastValue > v { + it.errAdjust += lastValue - v + } + return true } - return it + return false +} + +func (it *counterErrAdjustSeriesIterator) At() (int64, float64) { + t, v := it.SeriesIterator.At() + return t, v + it.errAdjust } type dedupSeriesIterator struct { - a, b storage.SeriesIterator + a, b extendedSeriesIterator + + aok, bok bool + + // TODO(bwplotka): Don't base on LastT, but on detected scrape interval. This will allow us to be more + // responsive to gaps: https://github.com/thanos-io/thanos/issues/981, let's do it in next PR. + lastT int64 + lastV float64 - aok, bok bool - lastT int64 penA, penB int64 useA bool } -func newDedupSeriesIterator(a, b storage.SeriesIterator) *dedupSeriesIterator { +func newDedupSeriesIterator(a, b extendedSeriesIterator) *dedupSeriesIterator { + lastV := float64(math.MinInt64) return &dedupSeriesIterator{ a: a, b: b, lastT: math.MinInt64, - aok: a.Next(), - bok: b.Next(), + lastV: lastV, + aok: a.Next(lastV), + bok: b.Next(lastV), } } -func (it *dedupSeriesIterator) Next() bool { +func (it *dedupSeriesIterator) Next(lastValue float64) bool { + // This dedup iterator can be deduplicated with yet another replica. 
Make sure we adapt to the biggest lastValue + // seen across all replicas. This is used only if underlying implementations are on counters. + if lastValue > it.lastV { + it.lastV = lastValue + } + // Advance both iterators to at least the next highest timestamp plus the potential penalty. if it.aok { - it.aok = it.a.Seek(it.lastT + 1 + it.penA) + it.aok = it.a.Seek(it.lastT+1+it.penA, it.lastV) } if it.bok { - it.bok = it.b.Seek(it.lastT + 1 + it.penB) + it.bok = it.b.Seek(it.lastT+1+it.penB, it.lastV) } + // Handle basic cases where one iterator is exhausted before the other. if !it.aok { it.useA = false if it.bok { - it.lastT, _ = it.b.At() + it.lastT, it.lastV = it.b.At() it.penB = 0 } return it.bok } if !it.bok { it.useA = true - it.lastT, _ = it.a.At() + it.lastT, it.lastV = it.a.At() it.penA = 0 return true } @@ -534,8 +648,8 @@ func (it *dedupSeriesIterator) Next() bool { // with the smaller timestamp. // The applied penalty potentially already skipped potential samples already // that would have resulted in exaggerated sampling frequency. - ta, _ := it.a.At() - tb, _ := it.b.At() + ta, va := it.a.At() + tb, vb := it.b.At() it.useA = ta <= tb @@ -546,35 +660,38 @@ func (it *dedupSeriesIterator) Next() bool { // timestamp assignment. // If we don't know a delta yet, we pick 5000 as a constant, which is based on the knowledge // that timestamps are in milliseconds and sampling frequencies typically multiple seconds long. 
- const initialPenality = 5000 + const initialPenalty = 5000 if it.useA { if it.lastT != math.MinInt64 { it.penB = 2 * (ta - it.lastT) } else { - it.penB = initialPenality + it.penB = initialPenalty } it.penA = 0 it.lastT = ta + it.lastV = va return true } if it.lastT != math.MinInt64 { it.penA = 2 * (tb - it.lastT) } else { - it.penA = initialPenality + it.penA = initialPenalty } it.penB = 0 it.lastT = tb + it.lastV = vb return true } -func (it *dedupSeriesIterator) Seek(t int64) bool { +func (it *dedupSeriesIterator) Seek(t int64, lastValue float64) bool { + // Don't use underlying Seek, but iterate over next to not miss gaps. for { ts, _ := it.At() if ts > 0 && ts >= t { return true } - if !it.Next() { + if !it.Next(lastValue) { return false } } @@ -593,56 +710,3 @@ func (it *dedupSeriesIterator) Err() error { } return it.b.Err() } - -// counterDedupAdjustSeriesIterator is used when we deduplicate counter. -// It makes sure we always adjust for the latest seen last counter value for all replicas. -// Let's consider following example: -// -// Replica 1 counter scrapes: 20 30 40 Nan - 0 5 -// Replica 2 counter scrapes: 25 35 45 Nan - 2 -// -// Now for downsampling purposes we are accounting the resets so our replicas before going to dedup iterator looks like this: -// -// Replica 1 counter total: 20 30 40 - - 40 45 -// Replica 2 counter total: 25 35 45 - - 47 -// -// Now if at any point we will switch our focus from replica 2 to replica 1 we will experience lower value than previous, -// which will trigger false positive counter reset in PromQL. -// -// We mitigate this by taking always adjusting for the "behind" replica value to be not smaller than highest sample seen. -// This is also what is closest to the truth (last seen counter value on this target). -// -// This short-term solution to mitigate https://github.com/thanos-io/thanos/issues/2401. 
-// TODO(bwplotka): Find better deduplication algorithm that does not require knowledge if the given -// series is counter or not: https://github.com/thanos-io/thanos/issues/2547. -type counterDedupAdjustSeriesIterator struct { - storage.SeriesIterator - - lastV float64 - adjust float64 -} - -func newCounterDedupAdjustSeriesIterator(iter storage.SeriesIterator) storage.SeriesIterator { - return &counterDedupAdjustSeriesIterator{ - SeriesIterator: iter, - lastV: -1 * math.MaxFloat64, - } - -} - -func (it *counterDedupAdjustSeriesIterator) Next() bool { - if it.SeriesIterator.Next() { - _, v := it.SeriesIterator.At() - if it.lastV > v { - it.adjust += it.lastV - v - } - it.lastV = v - return true - } - return false -} - -func (it *counterDedupAdjustSeriesIterator) At() (int64, float64) { - t, v := it.SeriesIterator.At() - return t, v + it.adjust -} diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 873fa72e1df..f3ef4fb4242 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -1104,21 +1104,30 @@ func TestDedupSeriesSet(t *testing.T) { {10000, 8.0}, // Smaller timestamp, this will be chosen. CurrValue = 8.0. {20000, 9.0}, // Same. CurrValue = 9.0. // {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator. - {50001, 9 + 1.0}}, // Next after 20000+1 has a bit higher than timestamp then in second series. Penalty 5000 will be added. + {50001, 9 + 1.0}, // Next after 20000+1 has a bit higher than timestamp then in second series. Penalty 5000 will be added. + {60000, 9 + 2.0}, + {70000, 9 + 3.0}, + {80000, 9 + 4.0}, + {90000, 9 + 5.0}, // This should be now taken, and we expect 14 to be correct value now. + {100000, 9 + 6.0}, + }, }, { lset: labels.Labels{{Name: "replica", Value: "02"}}, samples: []sample{ {10001, 8.0}, // Penalty 5000 will be added. // 20001 was app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator. 
Penalty 2 * (20000 - 10000) will be added. // 30001 no sample. Within penalty, ignored. - {45001, 8.0 + 0.5}, // Smaller timestamp, this will be chosen. CurrValue = 8.5 which is smaller than last chosen value. + {45001, 8 + 0.5}, // Smaller timestamp, this will be chosen. CurrValue = 8.5 which is smaller than last chosen value. + {55001, 8 + 1.5}, + {65001, 8 + 2.5}, + // {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator. }, }, }, exp: []series{ { lset: labels.Labels{}, - samples: []sample{{10000, 8}, {20000, 9}, {45001, 9}}, + samples: []sample{{10000, 8}, {20000, 9}, {45001, 9}, {55001, 10}, {65001, 11}, {90000, 14}, {100000, 15}}, }, }, dedupLabels: map[string]struct{}{ @@ -1253,10 +1262,10 @@ func TestDedupSeriesIterator(t *testing.T) { for i, c := range cases { t.Logf("case %d:", i) it := newDedupSeriesIterator( - newMockedSeriesIterator(c.a), - newMockedSeriesIterator(c.b), + &seriesIteratorToExtendedAdapter{newMockedSeriesIterator(c.a)}, + &seriesIteratorToExtendedAdapter{newMockedSeriesIterator(c.b)}, ) - res := expandSeries(t, it) + res := expandSeries(t, &extendedToSeriesIteratorAdapter{it}) testutil.Equals(t, c.exp, res) } } @@ -1264,13 +1273,13 @@ func TestDedupSeriesIterator(t *testing.T) { func BenchmarkDedupSeriesIterator(b *testing.B) { run := func(b *testing.B, s1, s2 []sample) { it := newDedupSeriesIterator( - newMockedSeriesIterator(s1), - newMockedSeriesIterator(s2), + &seriesIteratorToExtendedAdapter{newMockedSeriesIterator(s1)}, + &seriesIteratorToExtendedAdapter{newMockedSeriesIterator(s2)}, ) b.ResetTimer() var total int64 - for it.Next() { + for it.Next(float64(math.MinInt64)) { t, _ := it.At() total += t }