Skip to content

Commit

Permalink
store the first raw value of a chunk during downsampling
Browse files Browse the repository at this point in the history
As discussed in thanos-io#1568, storing only the last raw value
of a chunk will lose a counter reset when:
a) the reset occurs at a chunk boundary, and
b) the last raw value of the earlier chunk is less than
the first aggregated value of the later chunk.

This commit stores the first raw value of a chunk during
the initial raw aggregation, and retains it during
subsequent aggregations. This is similar to the existing
handling for the last raw value of a chunk.

With this change, when counterSeriesIterator iterates over
a chunk boundary, it will see the last raw value of the
earlier chunk, then the first raw value of the later chunk,
and then the first aggregated value of the later chunk. The
first raw value will always be less than or equal to the
first aggregated value, so the only difference in
counterSeriesIterator's output will be the possible detection
of a reset and an extra sample after the chunk boundary.

Fixes: thanos-io#1568

Signed-off-by: Alfred Landrum <[email protected]>
  • Loading branch information
alfred-landrum committed Nov 3, 2019
1 parent 64af185 commit 75ec66c
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 22 deletions.
61 changes: 39 additions & 22 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,13 @@ func (b *aggrChunkBuilder) add(t int64, aggr *aggregator) {
b.added++
}

func (b *aggrChunkBuilder) finalizeChunk(lastT int64, trueSample float64) {
// firstRawSample records the first raw (true) sample of the chunk in the
// counter aggregate. Preserving the first raw value across aggregation
// iterations lets counterSeriesIterator detect a counter reset that occurs
// at a chunk boundary (see thanos-io#1568).
func (b *aggrChunkBuilder) firstRawSample(firstT int64, trueSample float64) {
	// This must be the first sample given to the counter appender.
	b.apps[AggrCounter].Append(firstT, trueSample)
}

// lastRawSample records the last raw (true) sample of the chunk in the
// counter aggregate, so the true final value of the original series is
// preserved across aggregation iterations.
func (b *aggrChunkBuilder) lastRawSample(lastT int64, trueSample float64) {
	// This must be the last sample given to the counter appender.
	b.apps[AggrCounter].Append(lastT, trueSample)
}

Expand All @@ -306,14 +312,17 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta {
if len(data) == 0 {
return nil
}
var (
mint, maxt = data[0].t, data[len(data)-1].t
// We assume a raw resolution of 1 minute. In practice it will often be lower
// but this is sufficient for our heuristic to produce well-sized chunks.
numChunks = targetChunkCount(mint, maxt, 1*60*1000, resolution, len(data))
chks = make([]chunks.Meta, 0, numChunks)
batchSize = (len(data) / numChunks) + 1
)

mint, maxt := data[0].t, data[len(data)-1].t
// We assume a raw resolution of 1 minute. In practice it will often be lower
// but this is sufficient for our heuristic to produce well-sized chunks.
numChunks := targetChunkCount(mint, maxt, 1*60*1000, resolution, len(data))
return downsampleRawLoop(data, resolution, numChunks)
}

func downsampleRawLoop(data []sample, resolution int64, numChunks int) []chunks.Meta {
batchSize := (len(data) / numChunks) + 1
chks := make([]chunks.Meta, 0, numChunks)

for len(data) > 0 {
j := batchSize
Expand All @@ -327,14 +336,16 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta {
for ; j < len(data) && data[j].t <= curW; j++ {
}

ab := newAggrChunkBuilder()
batch := data[:j]
data = data[j:]

ab := newAggrChunkBuilder()

ab.firstRawSample(batch[0].t, batch[0].v)

lastT := downsampleBatch(batch, resolution, ab.add)

// Inject the chunk's counter aggregate with the last true sample.
ab.finalizeChunk(lastT, batch[len(batch)-1].v)
ab.lastRawSample(lastT, batch[len(batch)-1].v)

chks = append(chks, ab.encode())
}
Expand Down Expand Up @@ -379,18 +390,20 @@ func downsampleBatch(data []sample, resolution int64, add func(int64, *aggregato

// downsampleAggr downsamples a sequence of aggregation chunks to the given resolution.
func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes int64) ([]chunks.Meta, error) {
// We downsample aggregates only along chunk boundaries. This is required for counters
// to be downsampled correctly since a chunks' last counter value is the true last value
// of the original series. We need to preserve it even across multiple aggregation iterations.
var numSamples int
for _, c := range chks {
numSamples += c.NumSamples()
}
var (
numChunks = targetChunkCount(mint, maxt, inRes, outRes, numSamples)
res = make([]chunks.Meta, 0, numChunks)
batchSize = len(chks) / numChunks
)
numChunks := targetChunkCount(mint, maxt, inRes, outRes, numSamples)
return downsampleAggrLoop(chks, buf, outRes, numChunks)
}

func downsampleAggrLoop(chks []*AggrChunk, buf *[]sample, resolution int64, numChunks int) ([]chunks.Meta, error) {
// We downsample aggregates only along chunk boundaries. This is required for counters
// to be downsampled correctly since a chunks' last counter value is the true last value
// of the original series. We need to preserve it even across multiple aggregation iterations.
res := make([]chunks.Meta, 0, numChunks)
batchSize := len(chks) / numChunks

for len(chks) > 0 {
j := batchSize
Expand All @@ -400,12 +413,13 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes
part := chks[:j]
chks = chks[j:]

chk, err := downsampleAggrBatch(part, buf, outRes)
chk, err := downsampleAggrBatch(part, buf, resolution)
if err != nil {
return nil, err
}
res = append(res, chk)
}

return res, nil
}

Expand Down Expand Up @@ -512,6 +526,8 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
ab.chunks[AggrCounter] = chunkenc.NewXORChunk()
ab.apps[AggrCounter], _ = ab.chunks[AggrCounter].Appender()

ab.firstRawSample((*buf)[0].t, (*buf)[0].v)

lastT := downsampleBatch(*buf, resolution, func(t int64, a *aggregator) {
if t < mint {
mint = t
Expand All @@ -520,7 +536,8 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
}
ab.apps[AggrCounter].Append(t, a.counter)
})
ab.apps[AggrCounter].Append(lastT, it.lastV)

ab.lastRawSample(lastT, it.lastV)

ab.mint = mint
ab.maxt = maxt
Expand Down
118 changes: 118 additions & 0 deletions pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,124 @@ import (
"github.com/thanos-io/thanos/pkg/testutil"
)

// TestDownsampleCounterBoundaryReset verifies that counter resets occurring
// at chunk boundaries survive both the raw downsampling pass and a subsequent
// aggregate downsampling pass. This depends on the first raw value of each
// chunk being stored in the counter aggregate (see #1568).
func TestDownsampleCounterBoundaryReset(t *testing.T) {
	// toAggrChunks asserts that every chunk meta wraps an *AggrChunk and
	// returns the unwrapped chunks.
	toAggrChunks := func(t *testing.T, cm []chunks.Meta) (res []*AggrChunk) {
		for i := range cm {
			achk, ok := cm[i].Chunk.(*AggrChunk)
			testutil.Assert(t, ok, "expected *AggrChunk")
			res = append(res, achk)
		}
		return
	}

	// counterSamples collects the raw samples stored in each chunk's
	// counter aggregate, in chunk order.
	counterSamples := func(t *testing.T, achks []*AggrChunk) (res []sample) {
		for _, achk := range achks {
			chk, err := achk.Get(AggrCounter)
			testutil.Ok(t, err)

			iter := chk.Iterator(nil)
			for iter.Next() {
				// Named ts so the timestamp does not shadow the
				// *testing.T parameter.
				ts, v := iter.At()
				res = append(res, sample{t: ts, v: v})
			}
		}
		return
	}

	// counterIterate runs counterSeriesIterator over all counter
	// aggregates and collects its reset-adjusted output.
	counterIterate := func(t *testing.T, achks []*AggrChunk) (res []sample) {
		var iters []chunkenc.Iterator
		for _, achk := range achks {
			chk, err := achk.Get(AggrCounter)
			testutil.Ok(t, err)
			iters = append(iters, chk.Iterator(nil))
		}

		citer := NewCounterSeriesIterator(iters...)
		for citer.Next() {
			ts, v := citer.At()
			res = append(res, sample{t: ts, v: v})
		}
		return
	}

	type test struct {
		raw                   []sample
		rawAggrResolution     int64
		expectedRawAggrChunks int
		rawCounterSamples     []sample
		rawCounterIterate     []sample
		aggrAggrResolution    int64
		aggrChunks            int
		aggrCounterSamples    []sample
		aggrCounterIterate    []sample
	}

	tests := []test{
		{
			// In this test case, counter resets occur at the
			// boundaries between the t=49,t=99 and t=99,t=149
			// windows, and the values in the t=49, t=99, and
			// t=149 windows are high enough that the resets
			// will only be accounted for if the first raw value
			// of a chunk is maintained during aggregation.
			// See #1568 for more details.
			raw: []sample{
				{t: 10, v: 1}, {t: 20, v: 3}, {t: 30, v: 5},
				{t: 50, v: 1}, {t: 60, v: 8}, {t: 70, v: 10},
				{t: 120, v: 1}, {t: 130, v: 18}, {t: 140, v: 20},
				{t: 160, v: 21}, {t: 170, v: 38}, {t: 180, v: 40},
			},
			rawAggrResolution:     50,
			expectedRawAggrChunks: 4,
			rawCounterSamples: []sample{
				{t: 10, v: 1}, {t: 30, v: 5}, {t: 30, v: 5},
				{t: 50, v: 1}, {t: 70, v: 10}, {t: 70, v: 10},
				{t: 120, v: 1}, {t: 140, v: 20}, {t: 140, v: 20},
				{t: 160, v: 21}, {t: 180, v: 40}, {t: 180, v: 40},
			},
			rawCounterIterate: []sample{
				{t: 10, v: 1}, {t: 30, v: 5},
				{t: 50, v: 6}, {t: 70, v: 15},
				{t: 120, v: 16}, {t: 140, v: 35},
				{t: 160, v: 36}, {t: 180, v: 55},
			},
			aggrAggrResolution: 2 * 50,
			aggrChunks:         2,
			aggrCounterSamples: []sample{
				{t: 10, v: 1}, {t: 70, v: 15}, {t: 70, v: 10},
				{t: 120, v: 1}, {t: 180, v: 40}, {t: 180, v: 40},
			},
			aggrCounterIterate: []sample{
				{t: 10, v: 1}, {t: 70, v: 15},
				{t: 120, v: 16}, {t: 180, v: 55},
			},
		},
	}

	doTest := func(t *testing.T, test *test) {
		// Asking for more chunks than raw samples ensures that downsampleRawLoop
		// will create chunks with samples from a single window.
		cm := downsampleRawLoop(test.raw, test.rawAggrResolution, len(test.raw)+1)
		testutil.Equals(t, test.expectedRawAggrChunks, len(cm))

		rawAggrChunks := toAggrChunks(t, cm)
		testutil.Equals(t, test.rawCounterSamples, counterSamples(t, rawAggrChunks))
		testutil.Equals(t, test.rawCounterIterate, counterIterate(t, rawAggrChunks))

		var buf []sample
		acm, err := downsampleAggrLoop(rawAggrChunks, &buf, test.aggrAggrResolution, test.aggrChunks)
		testutil.Ok(t, err)
		testutil.Equals(t, test.aggrChunks, len(acm))

		aggrAggrChunks := toAggrChunks(t, acm)
		testutil.Equals(t, test.aggrCounterSamples, counterSamples(t, aggrAggrChunks))
		testutil.Equals(t, test.aggrCounterIterate, counterIterate(t, aggrAggrChunks))
	}

	// Run every table entry, not just the first, so newly added cases
	// are not silently skipped.
	for i := range tests {
		doTest(t, &tests[i])
	}
}

func TestExpandChunkIterator(t *testing.T) {
// Validate that expanding the chunk iterator filters out-of-order samples
// and staleness markers.
Expand Down

0 comments on commit 75ec66c

Please sign in to comment.