querier: Added regression tests for counter missed reset bug. #2528

Merged 2 commits on May 5, 2020
27 changes: 23 additions & 4 deletions pkg/compact/downsample/downsample.go
@@ -116,6 +116,13 @@ func Downsample(
if err := indexr.Series(postings.At(), &lset, &chks); err != nil {
return id, errors.Wrapf(err, "get series %d", postings.At())
}

for i, c := range chks[1:] {
if chks[i].MaxTime >= c.MinTime {
return id, errors.Errorf("found overlapping chunks within series %d. Chunks expected to be ordered by min time and non-overlapping, got: %v", postings.At(), chks)
}
}
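
The boundary behaviour of this check is easy to see on toy data. The sketch below is self-contained and uses a stand-in meta type rather than the real chunk metadata; it only illustrates the invariant (chunks sorted by MinTime, strictly non-overlapping, with a shared boundary counting as an overlap):

package main

import "fmt"

// meta is a stand-in for chunk metadata; only MinTime/MaxTime matter here.
type meta struct{ MinTime, MaxTime int64 }

// validateNonOverlapping mirrors the loop above: it assumes chunks are sorted
// by MinTime and fails as soon as a chunk starts at or before the end of the
// previous one.
func validateNonOverlapping(chks []meta) error {
	for i, c := range chks[1:] {
		if chks[i].MaxTime >= c.MinTime {
			return fmt.Errorf("chunks %d and %d overlap: %v", i, i+1, chks)
		}
	}
	return nil
}

func main() {
	fmt.Println(validateNonOverlapping([]meta{{0, 99}, {100, 199}}))  // <nil>
	fmt.Println(validateNonOverlapping([]meta{{0, 100}, {100, 199}})) // error: shared boundary counts as overlap
}
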

// While #183 exists, we sanitize the chunks we retrieved from the block
// before retrieving their samples.
for i, c := range chks {
@@ -129,6 +136,9 @@ func Downsample(
// Raw and already downsampled data need different processing.
if origMeta.Thanos.Downsample.Resolution == 0 {
for _, c := range chks {
// TODO(bwplotka): We can optimize this further by passing iterators of each chunk to WriteSeries instead of
// expanded samples. Also ensure the 120-sample limit, otherwise we end up with gigantic chunks.
// https://github.com/thanos-io/thanos/issues/2542.
if err := expandChunkIterator(c.Chunk.Iterator(reuseIt), &all); err != nil {
return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At())
}
@@ -139,7 +149,11 @@ func Downsample(
} else {
// Downsample a block that contains aggregated chunks already.
for _, c := range chks {
aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk))
ac, ok := c.Chunk.(*AggrChunk)
if !ok {
return id, errors.Errorf("expected downsampled chunk (*downsample.AggrChunk) got %T instead for series: %d", c.Chunk, postings.At())
}
aggrChunks = append(aggrChunks, ac)
}
downsampledChunks, err := downsampleAggr(
aggrChunks,
@@ -551,16 +565,21 @@ type sample struct {

// CounterSeriesIterator generates monotonically increasing values by iterating
// over an ordered sequence of chunks, which should be raw or aggregated chunks
// of counter values. The generated samples can be used by PromQL functions
// like 'rate' that calculate differences between counter values. Stale Markers
// are removed as well.
//
// Counter aggregation chunks must have the first and last values from their
// original raw series: the first raw value should be the first value encoded
// in the chunk, and the last raw value is encoded by the duplication of the
// previous sample's timestamp. As iteration occurs between chunks, the
// comparison between the last raw value of the earlier chunk and the first raw
// value of the later chunk ensures that counter resets between chunks are
// recognized and that the correct value delta is calculated.
//
// It handles overlapping chunks (overlaps are removed).
// NOTE: It is important to deduplicate with care, ensuring that you don't hit
// issue https://github.com/thanos-io/thanos/issues/2401#issuecomment-621958839.
type CounterSeriesIterator struct {
chks []chunkenc.Iterator
i int // Current chunk.
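
To make the chunk-boundary behaviour described in the comment above concrete, here is a toy, self-contained sketch of accumulating a counter's total increase across ordered chunks of raw values. It is not the CounterSeriesIterator itself, just an illustration of why the last value of one chunk must be compared against the first value of the next:

package main

import "fmt"

// counterIncrease walks ordered chunks of raw counter samples and sums up the
// increase, treating any drop in value (inside a chunk or across a chunk
// boundary) as a counter reset.
func counterIncrease(chunks [][]float64) float64 {
	var total, prev float64
	first := true
	for _, chk := range chunks {
		for _, v := range chk {
			if first {
				prev, first = v, false
				continue
			}
			if v < prev {
				// Reset: the counter started again from a lower value.
				total += v
			} else {
				total += v - prev
			}
			prev = v
		}
	}
	return total
}

func main() {
	// Chunk 1 ends at 30, chunk 2 starts at 5: the reset happened between
	// chunks. Dropping the cross-chunk comparison would lose that increase.
	fmt.Println(counterIncrease([][]float64{{10, 20, 30}, {5, 15}})) // 35
}
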
601 changes: 441 additions & 160 deletions pkg/compact/downsample/downsample_test.go

Large diffs are not rendered by default.

137 changes: 90 additions & 47 deletions pkg/query/iter.go
@@ -18,12 +18,12 @@ import (
// promSeriesSet implements the SeriesSet interface of the Prometheus storage
// package on top of our storepb SeriesSet.
type promSeriesSet struct {
set storepb.SeriesSet
initiated bool
done bool
set storepb.SeriesSet
done bool

mint, maxt int64
aggr resAggr
aggrs []storepb.Aggr
initiated bool

currLset []storepb.Label
currChunks []storepb.AggrChunk
@@ -39,7 +39,8 @@ func (s *promSeriesSet) Next() bool {
return false
}

// storage.Series is stricter than SeriesSet:
// * It requires storage.Series to iterate over the full series.
s.currLset, s.currChunks = s.set.At()
for {
s.done = s.set.Next()
@@ -52,14 +53,47 @@
}
s.currChunks = append(s.currChunks, nextChunks...)
}

// Samples (and therefore chunks as well) have to be sorted by time.
// TODO(bwplotka): Benchmark if we can do better.
// For example, we could iterate in the above loop and write our own binary-search-based insertion sort.
// We could also remove duplicates in the same loop.
sort.Slice(s.currChunks, func(i, j int) bool {
return s.currChunks[i].MinTime < s.currChunks[j].MinTime
})

// newChunkSeriesIterator will handle overlaps well; however, we don't need to iterate over those samples,
// so remove exact duplicates early here.
// TODO(bwplotka): Remove chunk duplicates on the proxy level as well to avoid decoding them.
// https://github.com/thanos-io/thanos/issues/2546; consider skipping the removal here then.
s.currChunks = removeExactDuplicates(s.currChunks)
return true
}

// removeExactDuplicates returns chunks without 1:1 duplicates.
// NOTE: input chunks have to be sorted by MinTime.
func removeExactDuplicates(chks []storepb.AggrChunk) []storepb.AggrChunk {
if len(chks) <= 1 {
return chks
}

ret := make([]storepb.AggrChunk, 0, len(chks))
ret = append(ret, chks[0])

for _, c := range chks[1:] {
if ret[len(ret)-1].String() == c.String() {
continue
}
ret = append(ret, c)
}
return ret
}
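
Because only neighbouring chunks are compared, the sort by MinTime in Next above is what turns this into a full removal of identical chunks. A self-contained usage sketch with a stand-in chunk type (the real code compares storepb.AggrChunk values via String()):

package main

import "fmt"

// chunk stands in for storepb.AggrChunk; key() plays the role of String().
type chunk struct{ MinTime, MaxTime int64 }

func (c chunk) key() string { return fmt.Sprintf("%d-%d", c.MinTime, c.MaxTime) }

// removeExactDuplicates keeps the first chunk of every run of identical,
// MinTime-sorted chunks, mirroring the helper above.
func removeExactDuplicates(chks []chunk) []chunk {
	if len(chks) <= 1 {
		return chks
	}
	ret := append(make([]chunk, 0, len(chks)), chks[0])
	for _, c := range chks[1:] {
		if ret[len(ret)-1].key() == c.key() {
			continue
		}
		ret = append(ret, c)
	}
	return ret
}

func main() {
	in := []chunk{{0, 99}, {0, 99}, {100, 199}} // e.g. the same chunk returned by two stores
	fmt.Println(removeExactDuplicates(in))      // [{0 99} {100 199}]
}
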

func (s *promSeriesSet) At() storage.Series {
if !s.initiated || s.set.Err() != nil {
return nil
}
return newChunkSeries(s.currLset, s.currChunks, s.mint, s.maxt, s.aggr)
return newChunkSeries(s.currLset, s.currChunks, s.mint, s.maxt, s.aggrs)
}

func (s *promSeriesSet) Err() error {
@@ -98,6 +132,7 @@ func translateMatchers(ms ...*labels.Matcher) ([]storepb.LabelMatcher, error) {

// storeSeriesSet implements a storepb SeriesSet against a list of storepb.Series.
type storeSeriesSet struct {
// TODO(bwplotka): Don't buffer everything; we only have to buffer a single series (to sort and dedup its chunks), nothing more.
series []storepb.Series
i int
}
@@ -119,29 +154,25 @@ func (storeSeriesSet) Err() error {
}

func (s storeSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
ser := s.series[s.i]
return ser.Labels, ser.Chunks
return s.series[s.i].Labels, s.series[s.i].Chunks
}

// chunkSeries implements storage.Series for a series on storepb types.
type chunkSeries struct {
lset labels.Labels
chunks []storepb.AggrChunk
mint, maxt int64
aggr resAggr
aggrs []storepb.Aggr
}

func newChunkSeries(lset []storepb.Label, chunks []storepb.AggrChunk, mint, maxt int64, aggr resAggr) *chunkSeries {
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].MinTime < chunks[j].MinTime
})

// newChunkSeries allows iterating over the samples of sorted, non-overlapping chunks.
func newChunkSeries(lset []storepb.Label, chunks []storepb.AggrChunk, mint, maxt int64, aggrs []storepb.Aggr) *chunkSeries {
return &chunkSeries{
lset: storepb.LabelsToPromLabels(lset),
chunks: chunks,
mint: mint,
maxt: maxt,
aggr: aggr,
aggrs: aggrs,
}
}

@@ -153,33 +184,47 @@ func (s *chunkSeries) Iterator() storage.SeriesIterator {
var sit storage.SeriesIterator
its := make([]chunkenc.Iterator, 0, len(s.chunks))

switch s.aggr {
case resAggrCount:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Count, c.Raw))
}
sit = newChunkSeriesIterator(its)
case resAggrSum:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Sum, c.Raw))
}
sit = newChunkSeriesIterator(its)
case resAggrMin:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Min, c.Raw))
}
sit = newChunkSeriesIterator(its)
case resAggrMax:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Max, c.Raw))
}
sit = newChunkSeriesIterator(its)
case resAggrCounter:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Counter, c.Raw))
if len(s.aggrs) == 1 {
switch s.aggrs[0] {
case storepb.Aggr_COUNT:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Count, c.Raw))
}
sit = newChunkSeriesIterator(its)
case storepb.Aggr_SUM:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Sum, c.Raw))
}
sit = newChunkSeriesIterator(its)
case storepb.Aggr_MIN:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Min, c.Raw))
}
sit = newChunkSeriesIterator(its)
case storepb.Aggr_MAX:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Max, c.Raw))
}
sit = newChunkSeriesIterator(its)
case storepb.Aggr_COUNTER:
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Counter, c.Raw))
}
sit = downsample.NewCounterSeriesIterator(its...)
default:
return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggrs)}
}
sit = downsample.NewCounterSeriesIterator(its...)
case resAggrAvg:
return newBoundedSeriesIterator(sit, s.mint, s.maxt)
}

if len(s.aggrs) != 2 {
return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggrs)}
}

switch {
case s.aggrs[0] == storepb.Aggr_SUM && s.aggrs[1] == storepb.Aggr_COUNT,
[Review thread on the line above, resolved]
Reviewer (Member): When is this case possible? aggrsFromFunc only returns the 2nd case.
bwplotka (Member, Author): This is average.
bwplotka (May 4, 2020): Ah, I know what you mean. You are right, but it's easy to swap and assume it will be handled well, so I would vote to be extra safe here.
s.aggrs[0] == storepb.Aggr_COUNT && s.aggrs[1] == storepb.Aggr_SUM:

for _, c := range s.chunks {
if c.Raw != nil {
its = append(its, getFirstIterator(c.Raw))
Expand All @@ -190,7 +235,7 @@ func (s *chunkSeries) Iterator() storage.SeriesIterator {
}
sit = newChunkSeriesIterator(its)
default:
return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggr)}
return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggrs)}
}
return newBoundedSeriesIterator(sit, s.mint, s.maxt)
}
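
For the two-aggregate branch above, the count and sum chunks are later combined into averages. The sketch below is not the iterator Thanos uses for this; it is a minimal, self-contained illustration of the combination over already-decoded, aligned samples:

package main

import "fmt"

// sample is a decoded (timestamp, value) pair.
type sample struct {
	t int64
	v float64
}

// averages divides aligned sum samples by count samples, which is conceptually
// what happens when Aggr_COUNT and Aggr_SUM are both requested for an average.
func averages(counts, sums []sample) ([]sample, error) {
	if len(counts) != len(sums) {
		return nil, fmt.Errorf("count and sum series differ in length: %d vs %d", len(counts), len(sums))
	}
	out := make([]sample, 0, len(counts))
	for i := range counts {
		if counts[i].t != sums[i].t {
			return nil, fmt.Errorf("misaligned timestamps at index %d: %d vs %d", i, counts[i].t, sums[i].t)
		}
		out = append(out, sample{t: counts[i].t, v: sums[i].v / counts[i].v})
	}
	return out, nil
}

func main() {
	counts := []sample{{1000, 2}, {2000, 4}}
	sums := []sample{{1000, 10}, {2000, 10}}
	avg, err := averages(counts, sums)
	fmt.Println(avg, err) // [{1000 5} {2000 2.5}] <nil>
}
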
Expand Down Expand Up @@ -282,7 +327,6 @@ type chunkSeriesIterator struct {
func newChunkSeriesIterator(cs []chunkenc.Iterator) storage.SeriesIterator {
if len(cs) == 0 {
// This should not happen. StoreAPI implementations should not send empty results.
// NOTE(bplotka): Metric, err log here?
return errSeriesIterator{}
}
return &chunkSeriesIterator{chunks: cs}
@@ -400,8 +444,7 @@ func (s *dedupSeriesSet) At() storage.Series {
if len(s.replicas) == 1 {
return seriesWithLabels{Series: s.replicas[0], lset: s.lset}
}
// Clients may store the series, so we must make a copy of the slice before advancing.
repl := make([]storage.Series, len(s.replicas))
copy(repl, s.replicas)
return newDedupSeries(s.lset, repl...)
@@ -453,8 +496,8 @@ func newDedupSeriesIterator(a, b storage.SeriesIterator) *dedupSeriesIterator {
a: a,
b: b,
lastT: math.MinInt64,
aok: true,
bok: true,
aok: a.Next(),
bok: b.Next(),
}
}

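
The constructor change above primes both replica iterators with their first sample instead of assuming one is already there. The sketch below uses hypothetical types (not the Thanos ones) to illustrate the Prometheus-style iterator contract this relies on: At() is only meaningful after Next() has returned true.

package main

import "fmt"

// iter follows the usual iterator contract: At() is valid only after a
// successful Next().
type iter struct {
	samples []float64
	i       int
}

func (it *iter) Next() bool  { it.i++; return it.i <= len(it.samples) }
func (it *iter) At() float64 { return it.samples[it.i-1] }

func main() {
	a := &iter{samples: []float64{1, 2, 3}}

	// Assuming a current sample exists without advancing (the old `aok: true`
	// pattern) would read an undefined position. Advancing first, as the fixed
	// constructor does, guarantees At() returns a real first sample.
	if aok := a.Next(); aok {
		fmt.Println(a.At()) // 1
	}
}
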
47 changes: 18 additions & 29 deletions pkg/query/querier.go
@@ -52,7 +52,7 @@ type queryable struct {

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.skipChunks), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks), nil
}

type querier struct {
Expand Down Expand Up @@ -137,38 +137,27 @@ func (s *seriesServer) Context() context.Context {
return s.ctx
}

type resAggr int

const (
resAggrAvg resAggr = iota
resAggrCount
resAggrSum
resAggrMin
resAggrMax
resAggrCounter
)

// aggrsFromFunc infers aggregates of the underlying data based on the wrapping
// function of a series selection.
func aggrsFromFunc(f string) ([]storepb.Aggr, resAggr) {
func aggrsFromFunc(f string) []storepb.Aggr {
if f == "min" || strings.HasPrefix(f, "min_") {
return []storepb.Aggr{storepb.Aggr_MIN}, resAggrMin
return []storepb.Aggr{storepb.Aggr_MIN}
}
if f == "max" || strings.HasPrefix(f, "max_") {
return []storepb.Aggr{storepb.Aggr_MAX}, resAggrMax
return []storepb.Aggr{storepb.Aggr_MAX}
}
if f == "count" || strings.HasPrefix(f, "count_") {
return []storepb.Aggr{storepb.Aggr_COUNT}, resAggrCount
return []storepb.Aggr{storepb.Aggr_COUNT}
}
// f == "sum" falls through here since we want the actual samples.
if strings.HasPrefix(f, "sum_") {
return []storepb.Aggr{storepb.Aggr_SUM}, resAggrSum
return []storepb.Aggr{storepb.Aggr_SUM}
}
if f == "increase" || f == "rate" {
return []storepb.Aggr{storepb.Aggr_COUNTER}, resAggrCounter
return []storepb.Aggr{storepb.Aggr_COUNTER}
}
// In the default case, we retrieve count and sum to compute an average.
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, resAggrAvg
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}
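
The mapping above is small enough to pin down in a table-driven test. The sketch below is hypothetical and not taken from this diff; it assumes it would live in the query package with the usual storepb import path:

package query

import (
	"reflect"
	"testing"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

func TestAggrsFromFunc(t *testing.T) {
	for _, tc := range []struct {
		fn   string
		want []storepb.Aggr
	}{
		{"min_over_time", []storepb.Aggr{storepb.Aggr_MIN}},
		{"max", []storepb.Aggr{storepb.Aggr_MAX}},
		{"count_over_time", []storepb.Aggr{storepb.Aggr_COUNT}},
		{"sum_over_time", []storepb.Aggr{storepb.Aggr_SUM}},
		{"rate", []storepb.Aggr{storepb.Aggr_COUNTER}},
		// Plain "sum" and anything unrecognized fall through to the
		// default COUNT+SUM request (see the comments in aggrsFromFunc).
		{"sum", []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}},
		{"avg_over_time", []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}},
	} {
		if got := aggrsFromFunc(tc.fn); !reflect.DeepEqual(got, tc.want) {
			t.Errorf("aggrsFromFunc(%q) = %v, want %v", tc.fn, got, tc.want)
		}
	}
}
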

func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
@@ -195,15 +184,15 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
return nil, nil, errors.Wrap(err, "convert matchers")
}

queryAggrs, resAggr := aggrsFromFunc(params.Func)
aggrs := aggrsFromFunc(params.Func)

resp := &seriesServer{ctx: ctx}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: params.Start,
MaxTime: params.End,
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: queryAggrs,
Aggregates: aggrs,
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
}, resp); err != nil {
@@ -218,10 +207,10 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
if !q.isDedupEnabled() {
// Return data without any deduplication.
return &promSeriesSet{
mint: q.mint,
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggr: resAggr,
mint: q.mint,
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggrs: aggrs,
}, warns, nil
}

@@ -230,10 +219,10 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
sortDedupLabels(resp.seriesSet, q.replicaLabels)

set := &promSeriesSet{
mint: q.mint,
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggr: resAggr,
mint: q.mint,
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggrs: aggrs,
}

// The merged series set assembles all potentially-overlapping time ranges