Skip to content

Commit

Permalink
Moved err adjust to be per replica, not per already deduped value.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed May 12, 2020
1 parent 8d5b0a2 commit 34859f1
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 78 deletions.
1 change: 1 addition & 0 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ func (it *ApplyCounterResetsSeriesIterator) At() (t int64, v float64) {
}

func (it *ApplyCounterResetsSeriesIterator) Seek(x int64) bool {
// Don't use underlying Seek, but iterate over next to not miss counter resets.
for {
if t, _ := it.At(); t >= x {
return true
Expand Down
181 changes: 111 additions & 70 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,65 +477,147 @@ func (s *dedupSeries) Labels() labels.Labels {
return s.lset
}

func (s *dedupSeries) Iterator() (it storage.SeriesIterator) {
it = s.replicas[0].Iterator()
for _, o := range s.replicas[1:] {
it = newDedupSeriesIterator(it, o.Iterator())
}
func (s *dedupSeries) Iterator() storage.SeriesIterator {
var it adjustableSeriesIterator
if s.isCounter {
return newCounterDedupAdjustSeriesIterator(it)
it = &counterErrAdjustSeriesIterator{SeriesIterator: s.replicas[0].Iterator()}
} else {
it = noopAdjustableSeriesIterator{SeriesIterator: s.replicas[0].Iterator()}
}

for _, o := range s.replicas[1:] {
var replicaIter adjustableSeriesIterator
if s.isCounter {
replicaIter = &counterErrAdjustSeriesIterator{SeriesIterator: o.Iterator()}
} else {
replicaIter = noopAdjustableSeriesIterator{SeriesIterator: o.Iterator()}
}
it = newDedupSeriesIterator(it, replicaIter)
}
return it
}

// adjustableSeriesIterator iterates over the data of a time series and allows to adjust current value based on
// given lastValue iterated.
type adjustableSeriesIterator interface {
storage.SeriesIterator

// adjustAtValue allows to adjust value by implementation if needed knowing the last value. This is used by counter
// implementation which can adjust for obsolete counter value.
adjustAtValue(lastValue float64)
}

type noopAdjustableSeriesIterator struct {
storage.SeriesIterator
}

func (it noopAdjustableSeriesIterator) adjustAtValue(float64) {}

// counterErrAdjustSeriesIterator is extendedSeriesIterator used when we deduplicate counter.
// It makes sure we always adjust for the latest seen last counter value for all replicas.
// Let's consider following example:
//
// Replica 1 counter scrapes: 20 30 40 Nan - 0 5
// Replica 2 counter scrapes: 25 35 45 Nan - 2
//
// Now for downsampling purposes we are accounting the resets so our replicas before going to dedup iterator looks like this:
//
// Replica 1 counter total: 20 30 40 - - 40 45
// Replica 2 counter total: 25 35 45 - - 47
//
// Now if at any point we will switch our focus from replica 2 to replica 1 we will experience lower value than previous,
// which will trigger false positive counter reset in PromQL.
//
// We mitigate this by taking allowing invoking AdjustAtValue which adjust the value in case of last value being larger than current at.
// (Counter cannot go down)
//
// This is to mitigate https://github.com/thanos-io/thanos/issues/2401.
// TODO(bwplotka): Find better deduplication algorithm that does not require knowledge if the given
// series is counter or not: https://github.com/thanos-io/thanos/issues/2547.
type counterErrAdjustSeriesIterator struct {
storage.SeriesIterator

errAdjust float64
}

func (it *counterErrAdjustSeriesIterator) adjustAtValue(lastValue float64) {
_, v := it.At()
if lastValue > v {
// This replica has obsolete value (did not see the correct "end" of counter value before app restart). Adjust.
it.errAdjust += lastValue - v
}
}

func (it *counterErrAdjustSeriesIterator) At() (int64, float64) {
t, v := it.SeriesIterator.At()
return t, v + it.errAdjust
}

type dedupSeriesIterator struct {
a, b storage.SeriesIterator
a, b adjustableSeriesIterator

aok, bok bool

// TODO(bwplotka): Don't base on LastT, but on detected scrape interval. This will allow us to be more
// responsive to gaps: https://github.com/thanos-io/thanos/issues/981, let's do it in next PR.
lastT int64
lastV float64

aok, bok bool
lastT int64
penA, penB int64
useA bool
}

func newDedupSeriesIterator(a, b storage.SeriesIterator) *dedupSeriesIterator {
func newDedupSeriesIterator(a, b adjustableSeriesIterator) *dedupSeriesIterator {
return &dedupSeriesIterator{
a: a,
b: b,
lastT: math.MinInt64,
lastV: float64(math.MinInt64),
aok: a.Next(),
bok: b.Next(),
}
}

func (it *dedupSeriesIterator) Next() bool {
lastValue := it.lastV
lastUseA := it.useA
defer func() {
if it.useA != lastUseA {
// We switched replicas.
// Ensure values are correct bases on value before At.
it.adjustAtValue(lastValue)
}
}()

// Advance both iterators to at least the next highest timestamp plus the potential penalty.
if it.aok {
it.aok = it.a.Seek(it.lastT + 1 + it.penA)
}
if it.bok {
it.bok = it.b.Seek(it.lastT + 1 + it.penB)
}

// Handle basic cases where one iterator is exhausted before the other.
if !it.aok {
it.useA = false
if it.bok {
it.lastT, _ = it.b.At()
it.lastT, it.lastV = it.b.At()
it.penB = 0
}
return it.bok
}
if !it.bok {
it.useA = true
it.lastT, _ = it.a.At()
it.lastT, it.lastV = it.a.At()
it.penA = 0
return true
}
// General case where both iterators still have data. We pick the one
// with the smaller timestamp.
// The applied penalty potentially already skipped potential samples already
// that would have resulted in exaggerated sampling frequency.
ta, _ := it.a.At()
tb, _ := it.b.At()
ta, va := it.a.At()
tb, vb := it.b.At()

it.useA = ta <= tb

Expand All @@ -546,29 +628,41 @@ func (it *dedupSeriesIterator) Next() bool {
// timestamp assignment.
// If we don't know a delta yet, we pick 5000 as a constant, which is based on the knowledge
// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
const initialPenality = 5000
const initialPenalty = 5000

if it.useA {
if it.lastT != math.MinInt64 {
it.penB = 2 * (ta - it.lastT)
} else {
it.penB = initialPenality
it.penB = initialPenalty
}
it.penA = 0
it.lastT = ta
it.lastV = va
return true
}
if it.lastT != math.MinInt64 {
it.penA = 2 * (tb - it.lastT)
} else {
it.penA = initialPenality
it.penA = initialPenalty
}
it.penB = 0
it.lastT = tb
it.lastV = vb
return true
}

func (it *dedupSeriesIterator) adjustAtValue(lastValue float64) {
if it.aok {
it.a.adjustAtValue(lastValue)
}
if it.bok {
it.b.adjustAtValue(lastValue)
}
}

func (it *dedupSeriesIterator) Seek(t int64) bool {
// Don't use underlying Seek, but iterate over next to not miss gaps.
for {
ts, _ := it.At()
if ts > 0 && ts >= t {
Expand All @@ -593,56 +687,3 @@ func (it *dedupSeriesIterator) Err() error {
}
return it.b.Err()
}

// counterDedupAdjustSeriesIterator is used when we deduplicate counter.
// It makes sure we always adjust for the latest seen last counter value for all replicas.
// Let's consider following example:
//
// Replica 1 counter scrapes: 20 30 40 Nan - 0 5
// Replica 2 counter scrapes: 25 35 45 Nan - 2
//
// Now for downsampling purposes we are accounting the resets so our replicas before going to dedup iterator looks like this:
//
// Replica 1 counter total: 20 30 40 - - 40 45
// Replica 2 counter total: 25 35 45 - - 47
//
// Now if at any point we will switch our focus from replica 2 to replica 1 we will experience lower value than previous,
// which will trigger false positive counter reset in PromQL.
//
// We mitigate this by taking always adjusting for the "behind" replica value to be not smaller than highest sample seen.
// This is also what is closest to the truth (last seen counter value on this target).
//
// This short-term solution to mitigate https://github.com/thanos-io/thanos/issues/2401.
// TODO(bwplotka): Find better deduplication algorithm that does not require knowledge if the given
// series is counter or not: https://github.com/thanos-io/thanos/issues/2547.
type counterDedupAdjustSeriesIterator struct {
storage.SeriesIterator

lastV float64
adjust float64
}

func newCounterDedupAdjustSeriesIterator(iter storage.SeriesIterator) storage.SeriesIterator {
return &counterDedupAdjustSeriesIterator{
SeriesIterator: iter,
lastV: -1 * math.MaxFloat64,
}

}

func (it *counterDedupAdjustSeriesIterator) Next() bool {
if it.SeriesIterator.Next() {
_, v := it.SeriesIterator.At()
if it.lastV > v {
it.adjust += it.lastV - v
}
it.lastV = v
return true
}
return false
}

func (it *counterDedupAdjustSeriesIterator) At() (int64, float64) {
t, v := it.SeriesIterator.At()
return t, v + it.adjust
}
49 changes: 41 additions & 8 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,27 +1104,60 @@ func TestDedupSeriesSet(t *testing.T) {
{10000, 8.0}, // Smaller timestamp, this will be chosen. CurrValue = 8.0.
{20000, 9.0}, // Same. CurrValue = 9.0.
// {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator.
{50001, 9 + 1.0}}, // Next after 20000+1 has a bit higher than timestamp then in second series. Penalty 5000 will be added.
{50001, 9 + 1.0}, // Next after 20000+1 has a bit higher than timestamp then in second series. Penalty 5000 will be added.
{60000, 9 + 2.0},
{70000, 9 + 3.0},
{80000, 9 + 4.0},
{90000, 9 + 5.0}, // This should be now taken, and we expect 14 to be correct value now.
{100000, 9 + 6.0},
},
}, {
lset: labels.Labels{{Name: "replica", Value: "02"}},
samples: []sample{
{10001, 8.0}, // Penalty 5000 will be added.
// 20001 was app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator. Penalty 2 * (20000 - 10000) will be added.
// 30001 no sample. Within penalty, ignored.
{45001, 8.0 + 0.5}, // Smaller timestamp, this will be chosen. CurrValue = 8.5 which is smaller than last chosen value.
{45001, 8 + 0.5}, // Smaller timestamp, this will be chosen. CurrValue = 8.5 which is smaller than last chosen value.
{55001, 8 + 1.5},
{65001, 8 + 2.5},
// {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator.
},
},
},
exp: []series{
{
lset: labels.Labels{},
samples: []sample{{10000, 8}, {20000, 9}, {45001, 9}},
samples: []sample{{10000, 8}, {20000, 9}, {45001, 9}, {55001, 10}, {65001, 11}, {90000, 14}, {100000, 15}},
},
},
dedupLabels: map[string]struct{}{
"replica": {},
},
},
{
// Same thing but not for counter should not adjust antything.
isCounter: false,
input: []series{
{
lset: labels.Labels{{Name: "replica", Value: "01"}},
samples: []sample{
{10000, 8.0}, {20000, 9.0}, {50001, 9 + 1.0}, {60000, 9 + 2.0}, {70000, 9 + 3.0}, {80000, 9 + 4.0}, {90000, 9 + 5.0}, {100000, 9 + 6.0},
},
}, {
lset: labels.Labels{{Name: "replica", Value: "02"}},
samples: []sample{
{10001, 8.0}, {45001, 8 + 0.5}, {55001, 8 + 1.5}, {65001, 8 + 2.5},
},
},
},
exp: []series{
{
lset: labels.Labels{},
samples: []sample{{10000, 8}, {20000, 9}, {45001, 8.5}, {55001, 9.5}, {65001, 10.5}, {90000, 14}, {100000, 15}},
},
},
dedupLabels: map[string]struct{}{"replica": {}},
},
{
// Regression test on real data against https://github.com/thanos-io/thanos/issues/2401.
// Real data with stale marker after downsample.CounterSeriesIterator (required for downsampling + rate).
Expand Down Expand Up @@ -1253,19 +1286,19 @@ func TestDedupSeriesIterator(t *testing.T) {
for i, c := range cases {
t.Logf("case %d:", i)
it := newDedupSeriesIterator(
newMockedSeriesIterator(c.a),
newMockedSeriesIterator(c.b),
noopAdjustableSeriesIterator{newMockedSeriesIterator(c.a)},
noopAdjustableSeriesIterator{newMockedSeriesIterator(c.b)},
)
res := expandSeries(t, it)
res := expandSeries(t, noopAdjustableSeriesIterator{it})
testutil.Equals(t, c.exp, res)
}
}

func BenchmarkDedupSeriesIterator(b *testing.B) {
run := func(b *testing.B, s1, s2 []sample) {
it := newDedupSeriesIterator(
newMockedSeriesIterator(s1),
newMockedSeriesIterator(s2),
noopAdjustableSeriesIterator{newMockedSeriesIterator(s1)},
noopAdjustableSeriesIterator{newMockedSeriesIterator(s2)},
)
b.ResetTimer()
var total int64
Expand Down

0 comments on commit 34859f1

Please sign in to comment.