From 3debaeba1a0ec2eee771b801d887c1597dcf0ced Mon Sep 17 00:00:00 2001 From: Alfred Landrum Date: Sat, 9 Nov 2019 15:47:42 +0100 Subject: [PATCH] store the first raw value of a chunk during downsampling (#1709) * store the first raw value of a chunk during downsampling As discussed in #1568, storing only the last raw value of a chunk will lose a counter reset when: a) the reset occurs at a chunk boundary, and b) the last raw value of the earlier chunk is less than the first aggregated value of the later chunk. This commit stores the first raw value of a chunk during the initial raw aggregation, and retains it during subsequent aggregations. This is similar to the existing handling for the last raw value of a chunk. With this change, when counterSeriesIterator iterates over a chunk boundary, it will see the last raw value of the earlier chunk, then the first raw value of the later chunk, and then the first aggregated value of the later chunk. The first raw value will always be less than or equal to the first aggregated value, so the only difference in counterSeriesIterator's output will be the possible detection of a reset and an extra sample after the chunk boundary. 
Fixes: https://github.com/thanos-io/thanos/issues/1568 Signed-off-by: Alfred Landrum * changelog for #1709 Signed-off-by: Alfred Landrum * adjust existing downsampling tests Signed-off-by: Alfred Landrum * add counter aggregation comments to CounterSeriesIterator Signed-off-by: Alfred Landrum --- CHANGELOG.md | 1 + pkg/compact/downsample/downsample.go | 77 +++++++++----- pkg/compact/downsample/downsample_test.go | 122 +++++++++++++++++++++- 3 files changed, 169 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b8f4785457..8945f3425a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1656](https://github.com/thanos-io/thanos/pull/1656) Thanos Store now starts metric and status probe HTTP server earlier in its start-up sequence. `/-/healthy` endpoint now starts to respond with success earlier. `/metrics` endpoint starts serving metrics earlier as well. Make sure to point your readiness probes to the `/-/ready` endpoint rather than `/metrics`. - [#1669](https://github.com/thanos-io/thanos/pull/1669) Fixed store sharding. Now it does not load excluded meta.jsons and load/fetch index-cache.json files. - [#1670](https://github.com/thanos-io/thanos/pull/1670) Fixed un-ordered blocks upload. Sidecar now uploads the oldest blocks first. +- [#1709](https://github.com/thanos-io/thanos/pull/1709) Thanos Store now retains the first raw value of a chunk during downsampling to avoid losing some counter resets that occur on an aggregation boundary. 
### Changed diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 05b1c1e46d..4f223064c6 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -289,10 +289,6 @@ func (b *aggrChunkBuilder) add(t int64, aggr *aggregator) { b.added++ } -func (b *aggrChunkBuilder) finalizeChunk(lastT int64, trueSample float64) { - b.apps[AggrCounter].Append(lastT, trueSample) -} - func (b *aggrChunkBuilder) encode() chunks.Meta { return chunks.Meta{ MinTime: b.mint, @@ -306,14 +302,17 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta { if len(data) == 0 { return nil } - var ( - mint, maxt = data[0].t, data[len(data)-1].t - // We assume a raw resolution of 1 minute. In practice it will often be lower - // but this is sufficient for our heuristic to produce well-sized chunks. - numChunks = targetChunkCount(mint, maxt, 1*60*1000, resolution, len(data)) - chks = make([]chunks.Meta, 0, numChunks) - batchSize = (len(data) / numChunks) + 1 - ) + + mint, maxt := data[0].t, data[len(data)-1].t + // We assume a raw resolution of 1 minute. In practice it will often be lower + // but this is sufficient for our heuristic to produce well-sized chunks. + numChunks := targetChunkCount(mint, maxt, 1*60*1000, resolution, len(data)) + return downsampleRawLoop(data, resolution, numChunks) +} + +func downsampleRawLoop(data []sample, resolution int64, numChunks int) []chunks.Meta { + batchSize := (len(data) / numChunks) + 1 + chks := make([]chunks.Meta, 0, numChunks) for len(data) > 0 { j := batchSize @@ -327,14 +326,18 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta { for ; j < len(data) && data[j].t <= curW; j++ { } - ab := newAggrChunkBuilder() batch := data[:j] data = data[j:] + ab := newAggrChunkBuilder() + + // Encode first raw value; see CounterSeriesIterator. 
+ ab.apps[AggrCounter].Append(batch[0].t, batch[0].v) + lastT := downsampleBatch(batch, resolution, ab.add) - // InjectThanosMeta the chunk's counter aggregate with the last true sample. - ab.finalizeChunk(lastT, batch[len(batch)-1].v) + // Encode last raw value; see CounterSeriesIterator. + ab.apps[AggrCounter].Append(lastT, batch[len(batch)-1].v) chks = append(chks, ab.encode()) } @@ -379,18 +382,21 @@ func downsampleBatch(data []sample, resolution int64, add func(int64, *aggregato // downsampleAggr downsamples a sequence of aggregation chunks to the given resolution. func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes int64) ([]chunks.Meta, error) { - // We downsample aggregates only along chunk boundaries. This is required for counters - // to be downsampled correctly since a chunks' last counter value is the true last value - // of the original series. We need to preserve it even across multiple aggregation iterations. var numSamples int for _, c := range chks { numSamples += c.NumSamples() } - var ( - numChunks = targetChunkCount(mint, maxt, inRes, outRes, numSamples) - res = make([]chunks.Meta, 0, numChunks) - batchSize = len(chks) / numChunks - ) + numChunks := targetChunkCount(mint, maxt, inRes, outRes, numSamples) + return downsampleAggrLoop(chks, buf, outRes, numChunks) +} + +func downsampleAggrLoop(chks []*AggrChunk, buf *[]sample, resolution int64, numChunks int) ([]chunks.Meta, error) { + // We downsample aggregates only along chunk boundaries. This is required + // for counters to be downsampled correctly since a chunk's first and last + // counter values are the true values of the original series. We need + // to preserve them even across multiple aggregation iterations. 
+ res := make([]chunks.Meta, 0, numChunks) + batchSize := len(chks) / numChunks for len(chks) > 0 { j := batchSize @@ -400,12 +406,13 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes part := chks[:j] chks = chks[j:] - chk, err := downsampleAggrBatch(part, buf, outRes) + chk, err := downsampleAggrBatch(part, buf, resolution) if err != nil { return nil, err } res = append(res, chk) } + return res, nil } @@ -512,6 +519,9 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch ab.chunks[AggrCounter] = chunkenc.NewXORChunk() ab.apps[AggrCounter], _ = ab.chunks[AggrCounter].Appender() + // Retain first raw value; see CounterSeriesIterator. + ab.apps[AggrCounter].Append((*buf)[0].t, (*buf)[0].v) + lastT := downsampleBatch(*buf, resolution, func(t int64, a *aggregator) { if t < mint { mint = t @@ -520,6 +530,8 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch } ab.apps[AggrCounter].Append(t, a.counter) }) + + // Retain last raw value; see CounterSeriesIterator. ab.apps[AggrCounter].Append(lastT, it.lastV) ab.mint = mint @@ -532,11 +544,18 @@ type sample struct { v float64 } -// CounterSeriesIterator iterates over an ordered sequence of chunks and treats decreasing -// values as counter reset. -// Additionally, it can deal with downsampled counter chunks, which set the last value of a chunk -// to the original last value. The last value can be detected by checking whether the timestamp -// did not increase w.r.t to the previous sample. +// CounterSeriesIterator generates monotonically increasing values by iterating +// over an ordered sequence of chunks, which should be raw or aggregated chunks +// of counter values. The generated samples can be used by PromQL functions +// like 'rate' that calculate differences between counter values. 
+// +// Counter aggregation chunks must have the first and last values from their +// original raw series: the first raw value should be the first value encoded +// in the chunk, and the last raw value is encoded by the duplication of the +// previous sample's timestamp. As iteration occurs between chunks, the +// comparison between the last raw value of the earlier chunk and the first raw +// value of the later chunk ensures that counter resets between chunks are +// recognized and that the correct value delta is calculated. type CounterSeriesIterator struct { chks []chunkenc.Iterator i int // Current chunk. diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index b5e6a40634..3c0fce8851 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -23,6 +23,124 @@ import ( "github.com/thanos-io/thanos/pkg/testutil" ) +func TestDownsampleCounterBoundaryReset(t *testing.T) { + + toAggrChunks := func(t *testing.T, cm []chunks.Meta) (res []*AggrChunk) { + for i := range cm { + achk, ok := cm[i].Chunk.(*AggrChunk) + testutil.Assert(t, ok, "expected *AggrChunk") + res = append(res, achk) + } + return + } + + counterSamples := func(t *testing.T, achks []*AggrChunk) (res []sample) { + for _, achk := range achks { + chk, err := achk.Get(AggrCounter) + testutil.Ok(t, err) + + iter := chk.Iterator(nil) + for iter.Next() { + t, v := iter.At() + res = append(res, sample{t, v}) + } + } + return + } + + counterIterate := func(t *testing.T, achks []*AggrChunk) (res []sample) { + var iters []chunkenc.Iterator + for _, achk := range achks { + chk, err := achk.Get(AggrCounter) + testutil.Ok(t, err) + iters = append(iters, chk.Iterator(nil)) + } + + citer := NewCounterSeriesIterator(iters...) 
+ for citer.Next() { + t, v := citer.At() + res = append(res, sample{t: t, v: v}) + } + return + } + + type test struct { + raw []sample + rawAggrResolution int64 + expectedRawAggrChunks int + rawCounterSamples []sample + rawCounterIterate []sample + aggrAggrResolution int64 + aggrChunks int + aggrCounterSamples []sample + aggrCounterIterate []sample + } + + tests := []test{ + { + // In this test case, counter resets occur at the + // boundaries between the t=49,t=99 and t=99,t=149 + // windows, and the values in the t=49, t=99, and + // t=149 windows are high enough that the resets + // will only be accounted for if the first raw value + // of a chunk is maintained during aggregation. + // See #1568 for more details. + raw: []sample{ + {t: 10, v: 1}, {t: 20, v: 3}, {t: 30, v: 5}, + {t: 50, v: 1}, {t: 60, v: 8}, {t: 70, v: 10}, + {t: 120, v: 1}, {t: 130, v: 18}, {t: 140, v: 20}, + {t: 160, v: 21}, {t: 170, v: 38}, {t: 180, v: 40}, + }, + rawAggrResolution: 50, + expectedRawAggrChunks: 4, + rawCounterSamples: []sample{ + {t: 10, v: 1}, {t: 30, v: 5}, {t: 30, v: 5}, + {t: 50, v: 1}, {t: 70, v: 10}, {t: 70, v: 10}, + {t: 120, v: 1}, {t: 140, v: 20}, {t: 140, v: 20}, + {t: 160, v: 21}, {t: 180, v: 40}, {t: 180, v: 40}, + }, + rawCounterIterate: []sample{ + {t: 10, v: 1}, {t: 30, v: 5}, + {t: 50, v: 6}, {t: 70, v: 15}, + {t: 120, v: 16}, {t: 140, v: 35}, + {t: 160, v: 36}, {t: 180, v: 55}, + }, + aggrAggrResolution: 2 * 50, + aggrChunks: 2, + aggrCounterSamples: []sample{ + {t: 10, v: 1}, {t: 70, v: 15}, {t: 70, v: 10}, + {t: 120, v: 1}, {t: 180, v: 40}, {t: 180, v: 40}, + }, + aggrCounterIterate: []sample{ + {t: 10, v: 1}, {t: 70, v: 15}, + {t: 120, v: 16}, {t: 180, v: 55}, + }, + }, + } + + doTest := func(t *testing.T, test *test) { + // Asking for more chunks than raw samples ensures that downsampleRawLoop + // will create chunks with samples from a single window. 
+ cm := downsampleRawLoop(test.raw, test.rawAggrResolution, len(test.raw)+1) + testutil.Equals(t, test.expectedRawAggrChunks, len(cm)) + + rawAggrChunks := toAggrChunks(t, cm) + testutil.Equals(t, test.rawCounterSamples, counterSamples(t, rawAggrChunks)) + testutil.Equals(t, test.rawCounterIterate, counterIterate(t, rawAggrChunks)) + + var buf []sample + acm, err := downsampleAggrLoop(rawAggrChunks, &buf, test.aggrAggrResolution, test.aggrChunks) + testutil.Ok(t, err) + testutil.Equals(t, test.aggrChunks, len(acm)) + + aggrAggrChunks := toAggrChunks(t, acm) + testutil.Equals(t, test.aggrCounterSamples, counterSamples(t, aggrAggrChunks)) + testutil.Equals(t, test.aggrCounterIterate, counterIterate(t, aggrAggrChunks)) + } + + doTest(t, &tests[0]) +} + func TestExpandChunkIterator(t *testing.T) { // Validate that expanding the chunk iterator filters out-of-order samples // and staleness markers. @@ -56,7 +174,7 @@ func TestDownsampleRaw(t *testing.T) { AggrSum: {{99, 7}, {199, 17}, {250, 1}}, AggrMin: {{99, 1}, {199, 2}, {250, 1}}, AggrMax: {{99, 3}, {199, 10}, {250, 1}}, - AggrCounter: {{99, 4}, {199, 13}, {250, 14}, {250, 1}}, + AggrCounter: {{20, 1}, {99, 4}, {199, 13}, {250, 14}, {250, 1}}, }, }, } @@ -93,7 +211,7 @@ func TestDownsampleAggr(t *testing.T) { AggrSum: {{499, 29}, {999, 100}}, AggrMin: {{499, -3}, {999, 0}}, AggrMax: {{499, 10}, {999, 100}}, - AggrCounter: {{499, 210}, {999, 320}, {1299, 430}, {1299, 110}}, + AggrCounter: {{99, 100}, {499, 210}, {999, 320}, {1299, 430}, {1299, 110}}, }, }, }