querier: Added regression tests for missed counter bug.
PR with just tests, no fix yet.

Reproduces: #2401

* Added regression tests for CounterSeriesIterator; simplified aggregators.
* Fixed edge dedup cases for Next and added tests for deduplication.
* Refactored downsampling tests, added more realistic cases.
* Added check for duplicated chunks during downsampling.
* Removed duplicate chunks in promSeriesSet for efficiency.

Signed-off-by: Bartlomiej Plotka <[email protected]>
bwplotka committed Apr 30, 2020
1 parent 9b94b90 commit 5139013
Showing 7 changed files with 1,561 additions and 1,754 deletions.
27 changes: 23 additions & 4 deletions pkg/compact/downsample/downsample.go
@@ -116,6 +116,13 @@ func Downsample(
if err := indexr.Series(postings.At(), &lset, &chks); err != nil {
return id, errors.Wrapf(err, "get series %d", postings.At())
}

for i, c := range chks[1:] {
if chks[i].MaxTime >= c.MinTime {
return id, errors.Errorf("found overlapping chunks within series %d. Chunks expected to be ordered by min time and non-overlapping, got: %v", postings.At(), chks)
}
}

// While #183 exists, we sanitize the chunks we retrieved from the block
// before retrieving their samples.
for i, c := range chks {
@@ -129,6 +136,9 @@ func Downsample(
// Raw and already downsampled data need different processing.
if origMeta.Thanos.Downsample.Resolution == 0 {
for _, c := range chks {
// TODO(bwplotka): We can optimize this further by passing to WriteSeries iterators of each chunk instead of
// samples. Also ensure the 120-sample limit, otherwise we have gigantic chunks.
// https://github.com/thanos-io/thanos/issues/2542.
if err := expandChunkIterator(c.Chunk.Iterator(reuseIt), &all); err != nil {
return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At())
}
@@ -139,7 +149,11 @@ func Downsample(
} else {
// Downsample a block that contains aggregated chunks already.
for _, c := range chks {
aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk))
ac, ok := c.Chunk.(*AggrChunk)
if !ok {
return id, errors.Errorf("expected downsampled chunk (*downsample.AggrChunk) got %T instead for series: %d", c.Chunk, postings.At())
}
aggrChunks = append(aggrChunks, ac)
}
downsampledChunks, err := downsampleAggr(
aggrChunks,
@@ -551,16 +565,21 @@ type sample struct {

// CounterSeriesIterator generates monotonically increasing values by iterating
// over an ordered sequence of chunks, which should be raw or aggregated chunks
// of counter values. The generated samples can be used by PromQL functions
// like 'rate' that calculate differences between counter values.
// of counter values. The generated samples can be used by PromQL functions
// like 'rate' that calculate differences between counter values. Stale Markers
// are removed as well.
//
// Counter aggregation chunks must have the first and last values from their
// original raw series: the first raw value should be the first value encoded
// in the chunk, and the last raw value is encoded by the duplication of the
// previous sample's timestamp. As iteration occurs between chunks, the
// previous sample's timestamp. As iteration occurs between chunks, the
// comparison between the last raw value of the earlier chunk and the first raw
// value of the later chunk ensures that counter resets between chunks are
// recognized and that the correct value delta is calculated.
//
// It handles overlapping chunks (removes overlaps).
// NOTE: It is important to deduplicate with care, ensuring that you don't hit
// issue https://github.com/thanos-io/thanos/issues/2401#issuecomment-621958839.
type CounterSeriesIterator struct {
chks []chunkenc.Iterator
i int // Current chunk.
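
To make the counter-reset handling described above concrete, here is a minimal, self-contained sketch (not the actual CounterSeriesIterator; all names are illustrative): when the first raw value of a later chunk is lower than the last raw value of the earlier chunk, the accumulated total is carried over so the emitted series stays monotonically increasing, which is what 'rate' and 'increase' rely on.

package main

import "fmt"

type rawSample struct {
	t int64
	v float64
}

// toMonotonic flattens chunks of raw counter samples into one monotonically
// increasing series, compensating for counter resets between (and within) chunks.
func toMonotonic(chunks [][]rawSample) []rawSample {
	var out []rawSample
	var total, lastRaw float64
	for _, chk := range chunks {
		for _, s := range chk {
			if s.v < lastRaw { // Raw value dropped: a counter reset.
				total += lastRaw
			}
			lastRaw = s.v
			out = append(out, rawSample{s.t, total + s.v})
		}
	}
	return out
}

func main() {
	// The second chunk starts below the first chunk's last value, i.e. a reset.
	fmt.Println(toMonotonic([][]rawSample{
		{{t: 1, v: 10}, {t: 2, v: 15}},
		{{t: 3, v: 3}, {t: 4, v: 7}},
	}))
	// Prints: [{1 10} {2 15} {3 18} {4 22}]
}
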
601 changes: 441 additions & 160 deletions pkg/compact/downsample/downsample_test.go

Large diffs are not rendered by default.

137 changes: 90 additions & 47 deletions pkg/query/iter.go
@@ -18,12 +18,12 @@ import (
// promSeriesSet implements the SeriesSet interface of the Prometheus storage
// package on top of our storepb SeriesSet.
type promSeriesSet struct {
set storepb.SeriesSet
initiated bool
done bool
set storepb.SeriesSet
done bool

mint, maxt int64
aggr resAggr
aggrs []storepb.Aggr
initiated bool

currLset []storepb.Label
currChunks []storepb.AggrChunk
@@ -39,7 +39,8 @@ func (s *promSeriesSet) Next() bool {
return false
}

// storage.Series are more strict than SeriesSet: It requires storage.Series to iterate over full series.
// storage.Series are more strict than SeriesSet:
// * It requires storage.Series to iterate over full series.
s.currLset, s.currChunks = s.set.At()
for {
s.done = s.set.Next()
@@ -52,14 +53,47 @@
}
s.currChunks = append(s.currChunks, nextChunks...)
}

// Samples (and therefore chunks as well) have to be sorted by time.
// TODO(bwplotka): Benchmark if we can do better.
// For example, we could iterate in the above loop and write our own binary-search-based insertion sort.
// We could also remove duplicates in the same loop.
sort.Slice(s.currChunks, func(i, j int) bool {
return s.currChunks[i].MinTime < s.currChunks[j].MinTime
})

// newChunkSeriesIterator will handle overlaps well; however, we don't need to iterate over those samples,
// so remove exact duplicates early here.
// TODO(bwplotka): Remove chunk duplicates on the proxy level as well to avoid decoding them.
// Once https://github.com/thanos-io/thanos/issues/2546 is done, consider skipping removal here.
s.currChunks = removeExactDuplicates(s.currChunks)
return true
}

// removeExactDuplicates returns chunks without 1:1 duplicates.
// NOTE: input chunks have to be sorted by MinTime.
func removeExactDuplicates(chks []storepb.AggrChunk) []storepb.AggrChunk {
if len(chks) <= 1 {
return chks
}

ret := make([]storepb.AggrChunk, 0, len(chks))
ret = append(ret, chks[0])

for _, c := range chks[1:] {
if ret[len(ret)-1].String() == c.String() {
continue
}
ret = append(ret, c)
}
return ret
}

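For illustration, a brief usage sketch of the function above (the chunk values are made up, not from the original): exact 1:1 duplicates, e.g. the same chunk returned by two stores, collapse to a single entry, while distinct chunks that merely touch the same time range are kept.

chks := []storepb.AggrChunk{
	{MinTime: 0, MaxTime: 100},
	{MinTime: 0, MaxTime: 100}, // Exact duplicate of the previous chunk: dropped.
	{MinTime: 100, MaxTime: 200},
}
chks = removeExactDuplicates(chks) // len(chks) == 2 afterwards.
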
func (s *promSeriesSet) At() storage.Series {
if !s.initiated || s.set.Err() != nil {
return nil
}
return newChunkSeries(s.currLset, s.currChunks, s.mint, s.maxt, s.aggr)
return newChunkSeries(s.currLset, s.currChunks, s.mint, s.maxt, s.aggrs)
}

func (s *promSeriesSet) Err() error {
@@ -98,6 +132,7 @@ func translateMatchers(ms ...*labels.Matcher) ([]storepb.LabelMatcher, error) {

// storeSeriesSet implements a storepb SeriesSet against a list of storepb.Series.
type storeSeriesSet struct {
// TODO(bwplotka): Don't buffer all series; we only need to buffer a single series at a time (to sort and dedup its chunks), nothing more.
series []storepb.Series
i int
}
@@ -119,29 +154,25 @@ func (storeSeriesSet) Err() error {
}

func (s storeSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
ser := s.series[s.i]
return ser.Labels, ser.Chunks
return s.series[s.i].Labels, s.series[s.i].Chunks
}

// chunkSeries implements storage.Series for a series on storepb types.
type chunkSeries struct {
lset labels.Labels
chunks []storepb.AggrChunk
mint, maxt int64
aggr resAggr
aggrs []storepb.Aggr
}

func newChunkSeries(lset []storepb.Label, chunks []storepb.AggrChunk, mint, maxt int64, aggr resAggr) *chunkSeries {
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].MinTime < chunks[j].MinTime
})

// newChunkSeries allows iterating over samples of sorted, non-overlapping chunks.
func newChunkSeries(lset []storepb.Label, chunks []storepb.AggrChunk, mint, maxt int64, aggrs []storepb.Aggr) *chunkSeries {
return &chunkSeries{
lset: storepb.LabelsToPromLabels(lset),
chunks: chunks,
mint: mint,
maxt: maxt,
aggr: aggr,
aggrs: aggrs,
}
}

@@ -153,33 +184,47 @@ func (s *chunkSeries) Iterator() storage.SeriesIterator {
var sit storage.SeriesIterator
its := make([]chunkenc.Iterator, 0, len(s.chunks))

switch s.aggr {
case resAggrCount:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Count, c.Raw))
}
sit = newChunkSeriesIterator(its)
case resAggrSum:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Sum, c.Raw))
}
sit = newChunkSeriesIterator(its)
case resAggrMin:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Min, c.Raw))
}
sit = newChunkSeriesIterator(its)
case resAggrMax:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Max, c.Raw))
}
sit = newChunkSeriesIterator(its)
case resAggrCounter:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Counter, c.Raw))
if len(s.aggrs) == 1 {
switch s.aggrs[0] {
case storepb.Aggr_COUNT:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Count, c.Raw))
}
sit = newChunkSeriesIterator(its)
case storepb.Aggr_SUM:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Sum, c.Raw))
}
sit = newChunkSeriesIterator(its)
case storepb.Aggr_MIN:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Min, c.Raw))
}
sit = newChunkSeriesIterator(its)
case storepb.Aggr_MAX:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Max, c.Raw))
}
sit = newChunkSeriesIterator(its)
case storepb.Aggr_COUNTER:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Counter, c.Raw))
}
sit = downsample.NewCounterSeriesIterator(its...)
default:
return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggrs)}
}
sit = downsample.NewCounterSeriesIterator(its...)
case resAggrAvg:
return newBoundedSeriesIterator(sit, s.mint, s.maxt)
}

if len(s.aggrs) != 2 {
return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggrs)}
}

switch {
case s.aggrs[0] == storepb.Aggr_SUM && s.aggrs[1] == storepb.Aggr_COUNT,
s.aggrs[0] == storepb.Aggr_COUNT && s.aggrs[1] == storepb.Aggr_SUM:

for _, c := range s.chunks {
if c.Raw != nil {
its = append(its, getFirstIterator(c.Raw))
Expand All @@ -190,7 +235,7 @@ func (s *chunkSeries) Iterator() storage.SeriesIterator {
}
sit = newChunkSeriesIterator(its)
default:
return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggr)}
return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggrs)}
}
return newBoundedSeriesIterator(sit, s.mint, s.maxt)
}
@@ -282,7 +327,6 @@ type chunkSeriesIterator struct {
func newChunkSeriesIterator(cs []chunkenc.Iterator) storage.SeriesIterator {
if len(cs) == 0 {
// This should not happen. StoreAPI implementations should not send empty results.
// NOTE(bplotka): Metric, err log here?
return errSeriesIterator{}
}
return &chunkSeriesIterator{chunks: cs}
@@ -400,8 +444,7 @@ func (s *dedupSeriesSet) At() storage.Series {
if len(s.replicas) == 1 {
return seriesWithLabels{Series: s.replicas[0], lset: s.lset}
}
// Clients may store the series, so we must make a copy of the slice
// before advancing.
// Clients may store the series, so we must make a copy of the slice before advancing.
repl := make([]storage.Series, len(s.replicas))
copy(repl, s.replicas)
return newDedupSeries(s.lset, repl...)
@@ -453,8 +496,8 @@ func newDedupSeriesIterator(a, b storage.SeriesIterator) *dedupSeriesIterator {
a: a,
b: b,
lastT: math.MinInt64,
aok: true,
bok: true,
aok: a.Next(),
bok: b.Next(),
}
}

47 changes: 18 additions & 29 deletions pkg/query/querier.go
@@ -52,7 +52,7 @@ type queryable struct {

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.skipChunks), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks), nil
}

type querier struct {
@@ -137,38 +137,27 @@ func (s *seriesServer) Context() context.Context {
return s.ctx
}

type resAggr int

const (
resAggrAvg resAggr = iota
resAggrCount
resAggrSum
resAggrMin
resAggrMax
resAggrCounter
)

// aggrsFromFunc infers aggregates of the underlying data based on the wrapping
// function of a series selection.
func aggrsFromFunc(f string) ([]storepb.Aggr, resAggr) {
func aggrsFromFunc(f string) []storepb.Aggr {
if f == "min" || strings.HasPrefix(f, "min_") {
return []storepb.Aggr{storepb.Aggr_MIN}, resAggrMin
return []storepb.Aggr{storepb.Aggr_MIN}
}
if f == "max" || strings.HasPrefix(f, "max_") {
return []storepb.Aggr{storepb.Aggr_MAX}, resAggrMax
return []storepb.Aggr{storepb.Aggr_MAX}
}
if f == "count" || strings.HasPrefix(f, "count_") {
return []storepb.Aggr{storepb.Aggr_COUNT}, resAggrCount
return []storepb.Aggr{storepb.Aggr_COUNT}
}
// f == "sum" falls through here since we want the actual samples.
if strings.HasPrefix(f, "sum_") {
return []storepb.Aggr{storepb.Aggr_SUM}, resAggrSum
return []storepb.Aggr{storepb.Aggr_SUM}
}
if f == "increase" || f == "rate" {
return []storepb.Aggr{storepb.Aggr_COUNTER}, resAggrCounter
return []storepb.Aggr{storepb.Aggr_COUNTER}
}
// In the default case, we retrieve count and sum to compute an average.
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, resAggrAvg
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

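For context, a brief illustrative sketch of what the mapping above returns for a few wrapping functions (values follow directly from the code; names of the example calls are not part of the change):

aggrsFromFunc("rate")          // []storepb.Aggr{storepb.Aggr_COUNTER}
aggrsFromFunc("max_over_time") // []storepb.Aggr{storepb.Aggr_MAX}
aggrsFromFunc("sum")           // falls through to the default (COUNT and SUM), per the comment above.
aggrsFromFunc("avg")           // default: []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, used to compute averages.
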
func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
@@ -195,15 +184,15 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
return nil, nil, errors.Wrap(err, "convert matchers")
}

queryAggrs, resAggr := aggrsFromFunc(params.Func)
aggrs := aggrsFromFunc(params.Func)

resp := &seriesServer{ctx: ctx}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: params.Start,
MaxTime: params.End,
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: queryAggrs,
Aggregates: aggrs,
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
}, resp); err != nil {
@@ -218,10 +207,10 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
if !q.isDedupEnabled() {
// Return data without any deduplication.
return &promSeriesSet{
mint: q.mint,
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggr: resAggr,
mint: q.mint,
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggrs: aggrs,
}, warns, nil
}

@@ -230,10 +219,10 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
sortDedupLabels(resp.seriesSet, q.replicaLabels)

set := &promSeriesSet{
mint: q.mint,
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggr: resAggr,
mint: q.mint,
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggrs: aggrs,
}

// The merged series set assembles all potentially-overlapping time ranges