Skip to content

Commit

Permalink
Limit queried chunks by bytes
Browse files Browse the repository at this point in the history
Signed-off-by: Max Neverov <[email protected]>
  • Loading branch information
mneverov committed Sep 26, 2020
1 parent 23831cc commit b0f792e
Show file tree
Hide file tree
Showing 8 changed files with 296 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Add `thanos_sidecar_prometheus_store_received_frames` histogram metric.
- [#3147](https://github.com/thanos-io/thanos/pull/3147) Querier: Add `query.metadata.default-time-range` flag to specify the default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. The zero value means range covers the time since the beginning.
- [#3207](https://github.com/thanos-io/thanos/pull/3207) Query Frontend: Add `cache-compression-type` flag to use compression in the query frontend cache.
- [#3089](https://github.com/thanos-io/thanos/pull/3089) Store: Add `store.grpc.series-sample-size-limit` flag to specify the maximum total size, in bytes, of the chunks fetched for a single Series call. The zero value means no limit.

### Changed

Expand Down
16 changes: 13 additions & 3 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func registerStore(app *extkingpin.App) {
"Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit.").
Default("0").Uint()

maxSampleSize := cmd.Flag("store.grpc.series-sample-size-limit",
"Maximum size of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit.").
Default("0").Uint()

maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)
Expand Down Expand Up @@ -133,6 +137,7 @@ func registerStore(app *extkingpin.App) {
uint64(*indexCacheSize),
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
uint64(*maxSampleSize),
*maxConcurrent,
component.Store,
debugLogging,
Expand Down Expand Up @@ -169,7 +174,7 @@ func runStore(
grpcGracePeriod time.Duration,
grpcCert, grpcKey, grpcClientCA, httpBindAddr string,
httpGracePeriod time.Duration,
indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount uint64,
indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount, maxSampleSize uint64,
maxConcurrency int,
component component.Component,
verbose bool,
Expand Down Expand Up @@ -287,7 +292,12 @@ func runStore(

queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), maxConcurrency)

bs, err := store.NewBucketStore(
bucketStoreOpts := []store.BucketStoreOption{
store.WithChunksLimit(store.NewChunksLimiterFactory(maxSampleCount / store.MaxSamplesPerChunk)), // The samples limit is an approximation based on the max number of samples per chunk.
store.WithChunksSizeLimit(store.NewChunksLimiterFactory(maxSampleSize)),
}

bs, err := store.New(
logger,
reg,
bkt,
Expand All @@ -296,14 +306,14 @@ func runStore(
indexCache,
queriesGate,
chunkPoolSizeBytes,
store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
verbose,
blockSyncConcurrency,
filterConf,
advertiseCompatibilityLabel,
enablePostingsCompression,
postingOffsetsInMemSampling,
false,
bucketStoreOpts...,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
Expand Down
4 changes: 4 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ Flags:
samples each chunk can contain), so the actual
number of samples might be lower, even though
the maximum could be hit.
--store.grpc.series-sample-size-limit=0
Maximum size of samples returned via a single
Series call. The Series call fails if this
limit is exceeded. 0 means no limit.
--store.grpc.series-max-concurrency=20
Maximum number of concurrent Series calls.
--objstore.config-file=<file-path>
Expand Down
121 changes: 111 additions & 10 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ const (
maxChunkSize = 16000
maxSeriesSize = 64 * 1024

noChunksLimit = 0
noChunksSizeLimit = 0

// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
// Now with newer Store Gateway advertising all the external labels it has access to, there was simple case where
Expand Down Expand Up @@ -270,7 +273,9 @@ type BucketStore struct {

// chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call.
chunksLimiterFactory ChunksLimiterFactory
partitioner partitioner
// chunksSizeLimiterFactory creates a new limiter used to limit the size of chunks fetched by each Series() call.
chunksSizeLimiterFactory ChunksLimiterFactory
partitioner partitioner

filterConfig *FilterConfig
advLabelSets []storepb.LabelSet
Expand All @@ -286,6 +291,41 @@ type BucketStore struct {
enableSeriesResponseHints bool
}

// bucketStoreOptions collects the optional BucketStore settings that are
// configured through BucketStoreOption values.
type bucketStoreOptions struct {
	// chunksLimitFactory is the factory set by WithChunksLimit.
	chunksLimitFactory ChunksLimiterFactory

	// chunksSizeLimitFactory is the factory set by WithChunksSizeLimit.
	chunksSizeLimitFactory ChunksLimiterFactory
}

// BucketStoreOption overrides options of the bucket store.
// Options are passed to New, which applies them in the order given on top of
// a copy of defaultBucketStoreOptions.
type BucketStoreOption interface {
	// apply writes this option's setting into the given options struct.
	apply(*bucketStoreOptions)
}

// bucketStoreOptionFunc adapts a plain function to the BucketStoreOption
// interface.
type bucketStoreOptionFunc func(*bucketStoreOptions)

// apply invokes the wrapped function on the given options.
func (fn bucketStoreOptionFunc) apply(opts *bucketStoreOptions) { fn(opts) }

// WithChunksLimit returns a BucketStoreOption that stores f as the chunks
// limiter factory in the bucket store options.
func WithChunksLimit(f ChunksLimiterFactory) BucketStoreOption {
	setLimit := func(o *bucketStoreOptions) { o.chunksLimitFactory = f }
	return bucketStoreOptionFunc(setLimit)
}

// WithChunksSizeLimit returns a BucketStoreOption that stores f as the chunks
// size limiter factory in the bucket store options.
func WithChunksSizeLimit(f ChunksLimiterFactory) BucketStoreOption {
	setSizeLimit := func(o *bucketStoreOptions) { o.chunksSizeLimitFactory = f }
	return bucketStoreOptionFunc(setSizeLimit)
}

// defaultBucketStoreOptions holds the option values New starts from before it
// applies any caller-supplied BucketStoreOption. Both factories are built from
// the "no limit" sentinels noChunksLimit and noChunksSizeLimit.
var defaultBucketStoreOptions = bucketStoreOptions{
	chunksLimitFactory:     NewChunksLimiterFactory(noChunksLimit),
	chunksSizeLimitFactory: NewChunksLimiterFactory(noChunksSizeLimit),
}

// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
// It delegates to New, passing the given chunks limiter factory and no chunks size limit.
func NewBucketStore(
Expand All @@ -305,6 +345,49 @@ func NewBucketStore(
enablePostingsCompression bool,
postingOffsetsInMemSampling int,
enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
) (*BucketStore, error) {
bucketStoreOpts := []BucketStoreOption{
WithChunksLimit(chunksLimiterFactory),
WithChunksSizeLimit(NewChunksLimiterFactory(noChunksSizeLimit)),
}
return New(logger,
reg,
bkt,
fetcher,
dir,
indexCache,
queryGate,
maxChunkPoolBytes,
debugLogging,
blockSyncConcurrency,
filterConfig,
enableCompatibilityLabel,
enablePostingsCompression,
postingOffsetsInMemSampling,
enableSeriesResponseHints,
bucketStoreOpts...,
)
}

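// New creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
// Optional settings are supplied as BucketStoreOption values; any option that is
// not given keeps its value from defaultBucketStoreOptions.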
func New(
logger log.Logger,
reg prometheus.Registerer,
bkt objstore.InstrumentedBucketReader,
fetcher block.MetadataFetcher,
dir string,
indexCache storecache.IndexCache,
queryGate gate.Gate,
maxChunkPoolBytes uint64,
debugLogging bool,
blockSyncConcurrency int,
filterConfig *FilterConfig,
enableCompatibilityLabel bool,
enablePostingsCompression bool,
postingOffsetsInMemSampling int,
enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
opt ...BucketStoreOption,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -315,6 +398,11 @@ func NewBucketStore(
return nil, errors.Wrap(err, "create chunk pool")
}

opts := defaultBucketStoreOptions
for _, o := range opt {
o.apply(&opts)
}

s := &BucketStore{
logger: logger,
bkt: bkt,
Expand All @@ -328,7 +416,8 @@ func NewBucketStore(
blockSyncConcurrency: blockSyncConcurrency,
filterConfig: filterConfig,
queryGate: queryGate,
chunksLimiterFactory: chunksLimiterFactory,
chunksLimiterFactory: opts.chunksLimitFactory,
chunksSizeLimiterFactory: opts.chunksSizeLimitFactory,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enablePostingsCompression: enablePostingsCompression,
Expand Down Expand Up @@ -679,6 +768,7 @@ func blockSeries(
matchers []*labels.Matcher,
req *storepb.SeriesRequest,
chunksLimiter ChunksLimiter,
chunksSizeLimiter ChunksLimiter,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
Expand Down Expand Up @@ -756,6 +846,10 @@ func blockSeries(
return nil, nil, errors.Wrap(err, "preload chunks")
}

if err := chunksSizeLimiter.Reserve(uint64(chunkr.stats.chunksFetchedSizeSum)); err != nil {
return nil, nil, errors.Wrap(err, "exceeded chunks size limit")
}

// Transform all chunks into the response format.
for _, s := range res {
for i, ref := range s.refs {
Expand Down Expand Up @@ -876,16 +970,22 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.MaxTime = s.limitMaxTime(req.MaxTime)

var (
ctx = srv.Context()
stats = &queryStats{}
res []storepb.SeriesSet
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
resHints = &hintspb.SeriesResponseHints{}
reqBlockMatchers []*labels.Matcher
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped)
ctx = srv.Context()
stats = &queryStats{}
res []storepb.SeriesSet
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
resHints = &hintspb.SeriesResponseHints{}
reqBlockMatchers []*labels.Matcher
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped)
chunksSizeLimiter = s.chunksSizeLimiterFactory(s.metrics.queriesDropped)
)

chunksSizeLimiter, err = chunksSizeLimiter.NewWithFailedCounterFrom(chunksLimiter)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

if req.Hints != nil {
reqHints := &hintspb.SeriesRequestHints{}
if err := types.UnmarshalAny(req.Hints, reqHints); err != nil {
Expand Down Expand Up @@ -936,6 +1036,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
blockMatchers,
req,
chunksLimiter,
chunksSizeLimiter,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
Expand Down
30 changes: 19 additions & 11 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/pkg/timestamp"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/model"
Expand Down Expand Up @@ -124,7 +125,8 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
return
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64,
maxChunksSizeLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
Expand Down Expand Up @@ -153,7 +155,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
}, nil)
testutil.Ok(t, err)

store, err := NewBucketStore(
store, err := New(
s.logger,
nil,
objstore.WithNoopInstr(bkt),
Expand All @@ -162,14 +164,15 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
s.cache,
nil,
0,
NewChunksLimiterFactory(maxChunksLimit),
false,
20,
filterConf,
true,
true,
DefaultPostingOffsetInMemorySampling,
true,
WithChunksLimit(NewChunksLimiterFactory(maxChunksLimit)),
WithChunksSizeLimit(NewChunksLimiterFactory(maxChunksSizeLimit)),
)
testutil.Ok(t, err)
s.store = store
Expand Down Expand Up @@ -429,7 +432,7 @@ func TestBucketStore_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, 0, emptyRelabelConfig, allowAllFilterConf)

if ok := t.Run("no index cache", func(t *testing.T) {
s.cache.SwapWith(noopCache{})
Expand Down Expand Up @@ -484,7 +487,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, 0, emptyRelabelConfig, allowAllFilterConf)

indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
MaxItemSize: 1e5,
Expand Down Expand Up @@ -512,7 +515,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
expectedChunks := uint64(2 * 2)

s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, emptyRelabelConfig, &FilterConfig{
s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, 0, emptyRelabelConfig, &FilterConfig{
MinTime: minTimeDuration,
MaxTime: filterMaxTime,
})
Expand Down Expand Up @@ -557,8 +560,9 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
expectedChunks := uint64(2 * 6)

cases := map[string]struct {
maxChunksLimit uint64
expectedErr string
maxChunksLimit uint64
maxChunksSizeLimit uint64
expectedErr string
}{
"should succeed if the max chunks limit is not exceeded": {
maxChunksLimit: expectedChunks,
Expand All @@ -567,6 +571,10 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
maxChunksLimit: expectedChunks - 1,
expectedErr: "exceeded chunks limit",
},
"should fail if the max chunks size limit is exceeded": {
maxChunksSizeLimit: maxChunkSize - 1,
expectedErr: "exceeded chunks size limit",
},
}

for testName, testData := range cases {
Expand All @@ -579,7 +587,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, testData.maxChunksSizeLimit, emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
Expand Down Expand Up @@ -613,7 +621,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, 0, emptyRelabelConfig, allowAllFilterConf)

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
Expand Down Expand Up @@ -645,7 +653,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, 0, emptyRelabelConfig, allowAllFilterConf)

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
Expand Down
Loading

0 comments on commit b0f792e

Please sign in to comment.