add metrics for bucket store empty postings count (#5721)
* add metrics for bucket store empty postings

Signed-off-by: Ben Ye <[email protected]>

* update changelog

Signed-off-by: Ben Ye <[email protected]>

* lint

Signed-off-by: Ben Ye <[email protected]>

Signed-off-by: Ben Ye <[email protected]>
yeya24 authored Sep 26, 2022
1 parent 1c18ef4 commit abaef5b
Showing 3 changed files with 49 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
* [#5654](https://github.com/thanos-io/thanos/pull/5654) Query: add `--grpc-compression` flag that controls the compression used in gRPC client. With the flag it is now possible to compress the traffic between Query and StoreAPI nodes - you get lower network usage in exchange for a bit higher CPU/RAM usage.
- [#5650](https://github.com/thanos-io/thanos/pull/5650) Query Frontend: Add sharded queries metrics.
- [#5658](https://github.com/thanos-io/thanos/pull/5658) Query Frontend: Introduce new optional parameters (`query-range.min-split-interval`, `query-range.max-split-interval`, `query-range.horizontal-shards`) to implement more dynamic horizontal query splitting.
- [#5721](https://github.com/thanos-io/thanos/pull/5721) Store: Add metric `thanos_bucket_store_empty_postings_total` for number of empty postings when fetching series.

### Changed

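For context on how the new metric is meant to be consumed: it is a Prometheus counter, so operators would typically watch its rate rather than its raw value, e.g. `rate(thanos_bucket_store_empty_postings_total[5m])` (an illustrative PromQL query, not part of this change) to see how often block series requests expand to no postings at all.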
41 changes: 39 additions & 2 deletions pkg/store/bucket.go
@@ -120,6 +120,7 @@ type bucketStoreMetrics struct {
chunkSizeBytes prometheus.Histogram
queriesDropped *prometheus.CounterVec
seriesRefetches prometheus.Counter
emptyPostingCount prometheus.Counter

cachedPostingsCompressions *prometheus.CounterVec
cachedPostingsCompressionErrors *prometheus.CounterVec
@@ -255,6 +256,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
})

m.emptyPostingCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_empty_postings_total",
Help: "Total number of empty postings when fetching block series.",
})

return &m
}
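A side note on the registration pattern above: `promauto.With(reg)` returns a factory that creates the collector and registers it on `reg` in one step (and, per its documentation, skips registration when `reg` is nil). A minimal, self-contained sketch of the same pattern — the package layout and variable names here are illustrative, not Thanos code:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()

	// Create and register the counter in one step; promauto panics if
	// registration fails (e.g. a duplicate metric name on the same registry).
	emptyPostings := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "thanos_bucket_store_empty_postings_total",
		Help: "Total number of empty postings when fetching block series.",
	})

	emptyPostings.Inc()

	// testutil.ToFloat64 reads the current value of a single-metric collector.
	fmt.Println(testutil.ToFloat64(emptyPostings)) // 1
}
```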

@@ -789,13 +795,15 @@ func blockSeries(
minTime, maxTime int64, // Series must have data in this time range to be returned.
loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks.
shardMatcher *storepb.ShardMatcher,
emptyPostingsCount prometheus.Counter,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(ctx, matchers)
if err != nil {
return nil, nil, errors.Wrap(err, "expanded matching posting")
}

if len(ps) == 0 {
emptyPostingsCount.Inc()
return storepb.EmptySeriesSet(), indexr.stats, nil
}
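Design note on the hunk above: `blockSeries` is a package-level function rather than a method on `*BucketStore`, so the counter has to be threaded through as an explicit parameter instead of being read from `s.metrics` inside the function. The increment sits on the early-exit path — when the expanded postings are empty, no series can match, so the function returns `storepb.EmptySeriesSet()` without touching chunks, and that is exactly the event the new metric counts.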

@@ -1086,6 +1094,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.MinTime, req.MaxTime,
req.Aggregates,
shardMatcher,
s.metrics.emptyPostingCount,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1293,7 +1302,21 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

result = strutil.MergeSlices(res, extRes)
} else {
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchersNoExtLabels, nil, seriesLimiter, true, req.Start, req.End, nil, nil)
seriesSet, _, err := blockSeries(
newCtx,
b.extLset,
indexr,
nil,
reqSeriesMatchersNoExtLabels,
nil,
seriesLimiter,
true,
req.Start,
req.End,
nil,
nil,
s.metrics.emptyPostingCount,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
}
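Note that the same `s.metrics.emptyPostingCount` is also passed on the `LabelNames` path here (and on the `LabelValues` path below), so the metric counts empty postings across all three request types that call `blockSeries`, not just `Series`.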
@@ -1447,7 +1470,21 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}
result = res
} else {
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchersNoExtLabels, nil, seriesLimiter, true, req.Start, req.End, nil, nil)
seriesSet, _, err := blockSeries(
newCtx,
b.extLset,
indexr,
nil,
reqSeriesMatchersNoExtLabels,
nil,
seriesLimiter,
true,
req.Start,
req.End,
nil,
nil,
s.metrics.emptyPostingCount,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
}
11 changes: 9 additions & 2 deletions pkg/store/bucket_test.go
@@ -29,17 +29,19 @@ import (
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/thanos-io/objstore/providers/filesystem"
"go.uber.org/atomic"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
@@ -2286,6 +2288,11 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
// No limits.
chunksLimiter := NewChunksLimiterFactory(0)(nil)
seriesLimiter := NewSeriesLimiterFactory(0)(nil)
dummyCounter := promauto.NewCounter(prometheus.CounterOpts{
Name: "dummy",
Help: "dummy help",
})
ctx := context.Background()

// Run multiple workers to execute the queries.
wg := sync.WaitGroup{}
@@ -2319,7 +2326,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
indexReader := blk.indexReader()
chunkReader := blk.chunkReader()

seriesSet, _, err := blockSeries(context.Background(), nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil)
seriesSet, _, err := blockSeries(ctx, nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil, dummyCounter)
testutil.Ok(b, err)

// Ensure at least 1 series has been returned (as expected).
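One caveat worth flagging about `dummyCounter` in the benchmark above: the package-level `promauto.NewCounter` registers the counter with `prometheus.DefaultRegisterer` and panics on a failed (e.g. duplicate) registration, so a helper that constructs it this way can only run once per process. An unregistered counter would sidestep that — a minimal alternative sketch, not what the commit does:

```go
// prometheus.NewCounter creates the counter without registering it
// anywhere, so it is safe to construct repeatedly in benchmarks.
dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{
	Name: "dummy",
	Help: "dummy help",
})
```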
