Skip to content

Commit

Permalink
Added check for duplicated chunks during downsampling.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Apr 30, 2020
1 parent de02447 commit 38b8b32
Show file tree
Hide file tree
Showing 3 changed files with 361 additions and 362 deletions.
26 changes: 20 additions & 6 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ func Downsample(
if err := indexr.Series(postings.At(), &lset, &chks); err != nil {
return id, errors.Wrapf(err, "get series %d", postings.At())
}

for i, c := range chks[1:] {
if chks[i].MaxTime >= c.MinTime {
return id, errors.Errorf("found overlapping chunks within series %d. Chunks expected to be ordered by min time and non-overlapping, got: %v", postings.At(), chks)
}
}

// While #183 exists, we sanitize the chunks we retrieved from the block
// before retrieving their samples.
for i, c := range chks {
Expand All @@ -129,6 +136,9 @@ func Downsample(
// Raw and already downsampled data need different processing.
if origMeta.Thanos.Downsample.Resolution == 0 {
for _, c := range chks {
// TODO(bwplotka): We can optimze this further by using in WriteSeries iterators of each chunk instead of
// samples. Also ensure 120 sample limit, otherwise we have gigantic chunks.
// https://github.com/thanos-io/thanos/issues/2542
if err := expandChunkIterator(c.Chunk.Iterator(reuseIt), &all); err != nil {
return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At())
}
Expand Down Expand Up @@ -338,12 +348,12 @@ func downsampleRawLoop(data []sample, resolution int64, numChunks int) []chunks.

ab := newAggrChunkBuilder()

// Encode first raw value; see CounterSeriesIterator.
// Encode first raw value; see CounterAggrChunksIterator.
ab.apps[AggrCounter].Append(batch[0].t, batch[0].v)

lastT := downsampleBatch(batch, resolution, ab.add)

// Encode last raw value; see CounterSeriesIterator.
// Encode last raw value; see CounterAggrChunksIterator.
ab.apps[AggrCounter].Append(lastT, batch[len(batch)-1].v)

chks = append(chks, ab.encode())
Expand Down Expand Up @@ -528,7 +538,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
ab.chunks[AggrCounter] = chunkenc.NewXORChunk()
ab.apps[AggrCounter], _ = ab.chunks[AggrCounter].Appender()

// Retain first raw value; see CounterSeriesIterator.
// Retain first raw value; see CounterAggrChunksIterator.
ab.apps[AggrCounter].Append((*buf)[0].t, (*buf)[0].v)

lastT := downsampleBatch(*buf, resolution, func(t int64, a *aggregator) {
Expand All @@ -540,7 +550,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
ab.apps[AggrCounter].Append(t, a.counter)
})

// Retain last raw value; see CounterSeriesIterator.
// Retain last raw value; see CounterAggrChunksIterator.
ab.apps[AggrCounter].Append(lastT, it.lastV)

ab.mint = mint
Expand All @@ -556,7 +566,8 @@ type sample struct {
// CounterSeriesIterator generates monotonically increasing values by iterating
// over an ordered sequence of chunks, which should be raw or aggregated chunks
// of counter values. The generated samples can be used by PromQL functions
// like 'rate' that calculate differences between counter values.
// like 'rate' that calculate differences between counter values. Stale Markers
// are removed as well.
//
// Counter aggregation chunks must have the first and last values from their
// original raw series: the first raw value should be the first value encoded
Expand All @@ -565,7 +576,10 @@ type sample struct {
// comparison between the last raw value of the earlier chunk and the first raw
// value of the later chunk ensures that counter resets between chunks are
// recognized and that the correct value delta is calculated.
// NOTE: It handles overlapped chunks.
//
// It handles overlapped chunks (removes overlaps.
// NOTE: It is important to deduplicate with care ensuring that you don't hit
// issue https://github.com/thanos-io/thanos/issues/2401#issuecomment-621958839.
type CounterSeriesIterator struct {
chks []chunkenc.Iterator
i int // Current chunk.
Expand Down
Loading

0 comments on commit 38b8b32

Please sign in to comment.