diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 05b1c1e46d..63652015be 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -289,7 +289,13 @@ func (b *aggrChunkBuilder) add(t int64, aggr *aggregator) { b.added++ } -func (b *aggrChunkBuilder) finalizeChunk(lastT int64, trueSample float64) { +func (b *aggrChunkBuilder) firstRawSample(firstT int64, trueSample float64) { + // This must be the first sample given to the counter appender. + b.apps[AggrCounter].Append(firstT, trueSample) +} + +func (b *aggrChunkBuilder) lastRawSample(lastT int64, trueSample float64) { + // This must be the last sample given to the counter appender. b.apps[AggrCounter].Append(lastT, trueSample) } @@ -306,14 +312,17 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta { if len(data) == 0 { return nil } - var ( - mint, maxt = data[0].t, data[len(data)-1].t - // We assume a raw resolution of 1 minute. In practice it will often be lower - // but this is sufficient for our heuristic to produce well-sized chunks. - numChunks = targetChunkCount(mint, maxt, 1*60*1000, resolution, len(data)) - chks = make([]chunks.Meta, 0, numChunks) - batchSize = (len(data) / numChunks) + 1 - ) + + mint, maxt := data[0].t, data[len(data)-1].t + // We assume a raw resolution of 1 minute. In practice it will often be lower + // but this is sufficient for our heuristic to produce well-sized chunks. + numChunks := targetChunkCount(mint, maxt, 1*60*1000, resolution, len(data)) + return downsampleRawLoop(data, resolution, numChunks) +} + +func downsampleRawLoop(data []sample, resolution int64, numChunks int) []chunks.Meta { + batchSize := (len(data) / numChunks) + 1 + chks := make([]chunks.Meta, 0, numChunks) for len(data) > 0 { j := batchSize @@ -327,14 +336,16 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta { for ; j < len(data) && data[j].t <= curW; j++ { } - ab := newAggrChunkBuilder() batch := data[:j] data = data[j:] + ab := newAggrChunkBuilder() + + ab.firstRawSample(batch[0].t, batch[0].v) + lastT := downsampleBatch(batch, resolution, ab.add) - // InjectThanosMeta the chunk's counter aggregate with the last true sample. - ab.finalizeChunk(lastT, batch[len(batch)-1].v) + ab.lastRawSample(lastT, batch[len(batch)-1].v) chks = append(chks, ab.encode()) } @@ -379,18 +390,20 @@ func downsampleBatch(data []sample, resolution int64, add func(int64, *aggregato // downsampleAggr downsamples a sequence of aggregation chunks to the given resolution. func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes int64) ([]chunks.Meta, error) { - // We downsample aggregates only along chunk boundaries. This is required for counters - // to be downsampled correctly since a chunks' last counter value is the true last value - // of the original series. We need to preserve it even across multiple aggregation iterations. var numSamples int for _, c := range chks { numSamples += c.NumSamples() } - var ( - numChunks = targetChunkCount(mint, maxt, inRes, outRes, numSamples) - res = make([]chunks.Meta, 0, numChunks) - batchSize = len(chks) / numChunks - ) + numChunks := targetChunkCount(mint, maxt, inRes, outRes, numSamples) + return downsampleAggrLoop(chks, buf, outRes, numChunks) +} + +func downsampleAggrLoop(chks []*AggrChunk, buf *[]sample, resolution int64, numChunks int) ([]chunks.Meta, error) { + // We downsample aggregates only along chunk boundaries. This is required for counters + // to be downsampled correctly since a chunks' last counter value is the true last value + // of the original series. We need to preserve it even across multiple aggregation iterations. + res := make([]chunks.Meta, 0, numChunks) + batchSize := len(chks) / numChunks for len(chks) > 0 { j := batchSize @@ -400,12 +413,13 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes part := chks[:j] chks = chks[j:] - chk, err := downsampleAggrBatch(part, buf, outRes) + chk, err := downsampleAggrBatch(part, buf, resolution) if err != nil { return nil, err } res = append(res, chk) } + return res, nil } @@ -512,6 +526,8 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch ab.chunks[AggrCounter] = chunkenc.NewXORChunk() ab.apps[AggrCounter], _ = ab.chunks[AggrCounter].Appender() + ab.firstRawSample((*buf)[0].t, (*buf)[0].v) + lastT := downsampleBatch(*buf, resolution, func(t int64, a *aggregator) { if t < mint { mint = t @@ -520,7 +536,8 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch } ab.apps[AggrCounter].Append(t, a.counter) }) - ab.apps[AggrCounter].Append(lastT, it.lastV) + + ab.lastRawSample(lastT, it.lastV) ab.mint = mint ab.maxt = maxt diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index b5e6a40634..b963bfb706 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -23,6 +23,124 @@ import ( "github.com/thanos-io/thanos/pkg/testutil" ) +func TestDownsampleCounterBoundaryReset(t *testing.T) { + + toAggrChunks := func(t *testing.T, cm []chunks.Meta) (res []*AggrChunk) { + for i := range cm { + achk, ok := cm[i].Chunk.(*AggrChunk) + testutil.Assert(t, ok, "expected *AggrChunk") + res = append(res, achk) + } + return + } + + counterSamples := func(t *testing.T, achks []*AggrChunk) (res []sample) { + for _, achk := range achks { + chk, err := achk.Get(AggrCounter) + testutil.Ok(t, err) + + iter := chk.Iterator(nil) + for iter.Next() { + t, v := iter.At() + res = append(res, sample{t, v}) + } + } + return + } + + counterIterate := func(t *testing.T, achks []*AggrChunk) (res []sample) { + var iters []chunkenc.Iterator + for _, achk := range achks { + chk, err := achk.Get(AggrCounter) + testutil.Ok(t, err) + iters = append(iters, chk.Iterator(nil)) + } + + citer := NewCounterSeriesIterator(iters...) + for citer.Next() { + t, v := citer.At() + res = append(res, sample{t: t, v: v}) + } + return + } + + type test struct { + raw []sample + rawAggrResolution int64 + expectedRawAggrChunks int + rawCounterSamples []sample + rawCounterIterate []sample + aggrAggrResolution int64 + aggrChunks int + aggrCounterSamples []sample + aggrCounterIterate []sample + } + + tests := []test{ + { + // In this test case, counter resets occur at the + // boundaries between the t=49,t=99 and t=99,t=149 + // windows, and the values in the t=49, t=99, and + // t=149 windows are high enough that the resets + // will only be accounted for if the first raw value + // of a chunk is maintained during aggregation. + // See #1568 for more details. + []sample{ + {t: 10, v: 1}, {t: 20, v: 3}, {t: 30, v: 5}, + {t: 50, v: 1}, {t: 60, v: 8}, {t: 70, v: 10}, + {t: 120, v: 1}, {t: 130, v: 18}, {t: 140, v: 20}, + {t: 160, v: 21}, {t: 170, v: 38}, {t: 180, v: 40}, + }, + 50, + 4, + []sample{ + {t: 10, v: 1}, {t: 30, v: 5}, {t: 30, v: 5}, + {t: 50, v: 1}, {t: 70, v: 10}, {t: 70, v: 10}, + {t: 120, v: 1}, {t: 140, v: 20}, {t: 140, v: 20}, + {t: 160, v: 21}, {t: 180, v: 40}, {t: 180, v: 40}, + }, + []sample{ + {t: 10, v: 1}, {t: 30, v: 5}, + {t: 50, v: 6}, {t: 70, v: 15}, + {t: 120, v: 16}, {t: 140, v: 35}, + {t: 160, v: 36}, {t: 180, v: 55}, + }, + 2 * 50, + 2, + []sample{ + {t: 10, v: 1}, {t: 70, v: 15}, {t: 70, v: 10}, + {t: 120, v: 1}, {t: 180, v: 40}, {t: 180, v: 40}, + }, + []sample{ + {t: 10, v: 1}, {t: 70, v: 15}, + {t: 120, v: 16}, {t: 180, v: 55}, + }, + }, + } + + doTest := func(t *testing.T, test *test) { + // Asking for more chunks than raw samples ensures that downsampleRawLoop + // will create chunks with samples from a single window. + cm := downsampleRawLoop(test.raw, test.rawAggrResolution, len(test.raw)+1) + testutil.Equals(t, test.expectedRawAggrChunks, len(cm)) + + rawAggrChunks := toAggrChunks(t, cm) + testutil.Equals(t, test.rawCounterSamples, counterSamples(t, rawAggrChunks)) + testutil.Equals(t, test.rawCounterIterate, counterIterate(t, rawAggrChunks)) + + var buf []sample + acm, err := downsampleAggrLoop(rawAggrChunks, &buf, test.aggrAggrResolution, test.aggrChunks) + testutil.Ok(t, err) + testutil.Equals(t, test.aggrChunks, len(acm)) + + aggrAggrChunks := toAggrChunks(t, acm) + testutil.Equals(t, test.aggrCounterSamples, counterSamples(t, aggrAggrChunks)) + testutil.Equals(t, test.aggrCounterIterate, counterIterate(t, aggrAggrChunks)) + } + + doTest(t, &tests[0]) +} + func TestExpandChunkIterator(t *testing.T) { // Validate that expanding the chunk iterator filters out-of-order samples // and staleness markers.