Limit queried chunks by bytes #3089

Closed · wants to merge 2 commits

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -42,6 +42,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Add `thanos_sidecar_prometheus_store_received_frames` histogram metric.
- [#3147](https://github.com/thanos-io/thanos/pull/3147) Querier: Add `query.metadata.default-time-range` flag to specify the default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. The zero value means range covers the time since the beginning.
- [#3207](https://github.com/thanos-io/thanos/pull/3207) Query Frontend: Add `cache-compression-type` flag to use compression in the query frontend cache.
- [#3089](https://github.com/thanos-io/thanos/pull/3089) Store: Add `store.grpc.series-sample-size-limit` flag to specify the maximum size of samples returned via a single Series call. The zero value means no limit.

### Changed

16 changes: 13 additions & 3 deletions cmd/thanos/store.go
@@ -68,6 +68,10 @@ func registerStore(app *extkingpin.App) {
"Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit.").
Default("0").Uint()

maxSampleSize := cmd.Flag("store.grpc.series-sample-size-limit",
"Maximum size of samples returned (in bytes) via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit.").
Default("0").Uint()

maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()

objStoreConfig := extkingpin.RegisterCommonObjStoreFlags(cmd, "", true)
@@ -133,6 +137,7 @@ func registerStore(app *extkingpin.App) {
uint64(*indexCacheSize),
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
uint64(*maxSampleSize),
*maxConcurrent,
component.Store,
debugLogging,
@@ -169,7 +174,7 @@ func runStore(
grpcGracePeriod time.Duration,
grpcCert, grpcKey, grpcClientCA, httpBindAddr string,
httpGracePeriod time.Duration,
indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount uint64,
indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount, maxSampleSize uint64,
maxConcurrency int,
component component.Component,
verbose bool,
@@ -287,7 +292,12 @@ func runStore(

queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), maxConcurrency)

bs, err := store.NewBucketStore(
bucketStoreOpts := []store.BucketStoreOption{
store.WithChunksLimit(store.NewChunksLimiterFactory(maxSampleCount / store.MaxSamplesPerChunk)), // The samples limit is an approximation based on the max number of samples per chunk.
store.WithChunksSizeLimit(store.NewChunksLimiterFactory(maxSampleSize)),
}

bs, err := store.NewBucketStoreWithOptions(
logger,
reg,
bkt,
@@ -296,14 +306,14 @@
indexCache,
queriesGate,
chunkPoolSizeBytes,
store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
verbose,
blockSyncConcurrency,
filterConf,
advertiseCompatibilityLabel,
enablePostingsCompression,
postingOffsetsInMemSampling,
false,
bucketStoreOpts...,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
4 changes: 4 additions & 0 deletions docs/components/store.md
@@ -101,6 +101,10 @@ Flags:
samples each chunk can contain), so the actual
number of samples might be lower, even though
the maximum could be hit.
--store.grpc.series-sample-size-limit=0
Maximum size of samples returned (in bytes) via
a single Series call. The Series call fails if
this limit is exceeded. 0 means no limit.
--store.grpc.series-max-concurrency=20
Maximum number of concurrent Series calls.
--objstore.config-file=<file-path>
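For a concrete sense of how the two limits interact (example values only, and assuming the pre-existing sample-count flag is `--store.grpc.series-sample-limit`, whose help text appears above): `--store.grpc.series-sample-limit=120000` is enforced internally as 120000 / 120 = 1000 chunks, since each chunk holds at most 120 samples (`store.MaxSamplesPerChunk`), while `--store.grpc.series-sample-size-limit=268435456` fails a Series call once the chunk bytes fetched for it exceed 256 MiB.
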
122 changes: 112 additions & 10 deletions pkg/store/bucket.go
@@ -64,6 +64,9 @@ const (
maxChunkSize = 16000
maxSeriesSize = 64 * 1024

noChunksLimit = 0
noChunksSizeLimit = 0

// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
// Now with newer Store Gateway advertising all the external labels it has access to, there was simple case where
@@ -270,7 +273,9 @@ type BucketStore struct {

// chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call.
chunksLimiterFactory ChunksLimiterFactory
partitioner partitioner
// chunksSizeLimiterFactory creates a new limiter used to limit the size of chunks fetched by each Series() call.
chunksSizeLimiterFactory ChunksLimiterFactory
partitioner partitioner

filterConfig *FilterConfig
advLabelSets []labelpb.ZLabelSet
@@ -286,6 +291,42 @@ type BucketStore struct {
enableSeriesResponseHints bool
}

type bucketStoreOptions struct {
chunksLimitFactory ChunksLimiterFactory
chunksSizeLimitFactory ChunksLimiterFactory
}

// BucketStoreOption overrides options of the bucket store.
type BucketStoreOption interface {
apply(*bucketStoreOptions)
}

type bucketStoreOptionFunc func(*bucketStoreOptions)

func (f bucketStoreOptionFunc) apply(o *bucketStoreOptions) {
f(o)
}

// WithChunksLimit sets chunks limit for the bucket store.
func WithChunksLimit(f ChunksLimiterFactory) BucketStoreOption {
return bucketStoreOptionFunc(func(lo *bucketStoreOptions) {
lo.chunksLimitFactory = f
})
}

// WithChunksSizeLimit sets chunks size limit for the bucket store.
func WithChunksSizeLimit(f ChunksLimiterFactory) BucketStoreOption {
return bucketStoreOptionFunc(func(lo *bucketStoreOptions) {
lo.chunksSizeLimitFactory = f
})
}

var defaultBucketStoreOptions = bucketStoreOptions{
chunksLimitFactory: NewChunksLimiterFactory(noChunksLimit),
chunksSizeLimitFactory: NewChunksLimiterFactory(noChunksSizeLimit),
}

// Deprecated. Use NewBucketStoreWithOptions instead.
// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
func NewBucketStore(
@@ -305,6 +346,49 @@ func NewBucketStore(
enablePostingsCompression bool,
postingOffsetsInMemSampling int,
enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
) (*BucketStore, error) {
bucketStoreOpts := []BucketStoreOption{
WithChunksLimit(chunksLimiterFactory),
WithChunksSizeLimit(NewChunksLimiterFactory(noChunksSizeLimit)),
}
return NewBucketStoreWithOptions(logger,
reg,
bkt,
fetcher,
dir,
indexCache,
queryGate,
maxChunkPoolBytes,
debugLogging,
blockSyncConcurrency,
filterConfig,
enableCompatibilityLabel,
enablePostingsCompression,
postingOffsetsInMemSampling,
enableSeriesResponseHints,
bucketStoreOpts...,
)
}

// NewBucketStoreWithOptions creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
func NewBucketStoreWithOptions(
Contributor comment:

@bwplotka What do you think about this? I have mixed feelings about it; it exists just to avoid adding another argument to NewBucketStore(). Maybe we can roll back the options in this PR and follow up on the functional-options discussion in #3931?
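
For readers following this thread, below is a minimal, self-contained restatement of the functional-options pattern the PR adopts. The names here are hypothetical stand-ins; only the `With*` helpers and the zero-value-means-no-limit defaults mirror the actual diff.

```go
package main

import "fmt"

// options mirrors bucketStoreOptions from this PR; the zero value of each
// field means "no limit", matching noChunksLimit/noChunksSizeLimit above.
type options struct {
	chunksLimit     uint64
	chunksSizeLimit uint64
}

// Option mirrors BucketStoreOption.
type Option func(*options)

func WithChunksLimit(n uint64) Option {
	return func(o *options) { o.chunksLimit = n }
}

func WithChunksSizeLimit(n uint64) Option {
	return func(o *options) { o.chunksSizeLimit = n }
}

// NewStore shows why the pattern scales: new knobs become new With* helpers
// instead of new positional parameters on the constructor.
func NewStore(opts ...Option) options {
	o := options{} // defaults: no limits
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := NewStore(WithChunksLimit(1000), WithChunksSizeLimit(256<<20))
	fmt.Printf("%+v\n", o)
}
```

The trade-off under discussion: options keep the constructor's signature stable as knobs accumulate, at the cost of an extra layer of indirection.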

logger log.Logger,
reg prometheus.Registerer,
bkt objstore.InstrumentedBucketReader,
fetcher block.MetadataFetcher,
dir string,
indexCache storecache.IndexCache,
queryGate gate.Gate,
maxChunkPoolBytes uint64,
debugLogging bool,
blockSyncConcurrency int,
filterConfig *FilterConfig,
enableCompatibilityLabel bool,
enablePostingsCompression bool,
postingOffsetsInMemSampling int,
enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
opt ...BucketStoreOption,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
@@ -315,6 +399,11 @@ func NewBucketStore(
return nil, errors.Wrap(err, "create chunk pool")
}

opts := defaultBucketStoreOptions
for _, o := range opt {
o.apply(&opts)
}

s := &BucketStore{
logger: logger,
bkt: bkt,
@@ -328,7 +417,8 @@
blockSyncConcurrency: blockSyncConcurrency,
filterConfig: filterConfig,
queryGate: queryGate,
chunksLimiterFactory: chunksLimiterFactory,
chunksLimiterFactory: opts.chunksLimitFactory,
chunksSizeLimiterFactory: opts.chunksSizeLimitFactory,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enablePostingsCompression: enablePostingsCompression,
@@ -679,6 +769,7 @@ func blockSeries(
matchers []*labels.Matcher,
req *storepb.SeriesRequest,
chunksLimiter ChunksLimiter,
chunksSizeLimiter ChunksLimiter,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
@@ -774,6 +865,10 @@
return nil, nil, errors.Wrap(err, "preload chunks")
}

if err := chunksSizeLimiter.Reserve(uint64(chunkr.stats.chunksFetchedSizeSum)); err != nil {
return nil, nil, errors.Wrap(err, "exceeded chunks size limit")
}

// Transform all chunks into the response format.
for _, s := range res {
for i, ref := range s.refs {
@@ -894,16 +989,22 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.MaxTime = s.limitMaxTime(req.MaxTime)

var (
ctx = srv.Context()
stats = &queryStats{}
res []storepb.SeriesSet
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
resHints = &hintspb.SeriesResponseHints{}
reqBlockMatchers []*labels.Matcher
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped)
ctx = srv.Context()
stats = &queryStats{}
res []storepb.SeriesSet
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
resHints = &hintspb.SeriesResponseHints{}
reqBlockMatchers []*labels.Matcher
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped)
chunksSizeLimiter = s.chunksSizeLimiterFactory(s.metrics.queriesDropped)
)

chunksSizeLimiter, err = chunksSizeLimiter.NewWithFailedCounterFrom(chunksLimiter)
Contributor comment:

Could you clarify why you need NewWithFailedCounterFrom()?

Contributor Author reply:

@pracucci thanks for reviewing this. The chunks limiter and the chunks size limiter should share a common failedOnce to avoid concurrent updates of the failedCounter.

Contributor Author reply:

More info on that: the store does not know, at the moment the ChunksLimit and ChunksSizeLimit factories are created, which metrics will be used; the metrics are part of the BucketStore. That is why the store operates on limiter factories, and the bucket store creates the new chunks size limiter with the sync.Once shared between limiters. Hope that helps.

Contributor comment:

I think NewWithFailedCounterFrom() is a bit overengineered. There's no problem updating a metric concurrently, but we may want to distinguish the reason why a query was dropped. An option may be adding a "reason" label to queriesDropped and passing s.metrics.queriesDropped.WithLabelValues("<reason>") to both factories.
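
To make the debate concrete, here is a minimal, runnable sketch of the shared failed-counter mechanism described above. The names echo the PR, but the bodies are assumptions, not the actual pkg/store/limiter.go code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// Limiter sketches a chunk-count/size limiter whose "dropped" metric is
// bumped at most once per query, even when several limiters guard it.
type Limiter struct {
	limit    uint64
	reserved uint64

	failedOnce    *sync.Once // shared across the limiters of one query
	failedCounter prometheus.Counter
}

func NewLimiter(limit uint64, c prometheus.Counter) *Limiter {
	return &Limiter{limit: limit, failedOnce: &sync.Once{}, failedCounter: c}
}

// NewWithFailedCounterFrom returns a copy of l that shares other's sync.Once
// and counter, mirroring the call in Series() above (signature assumed).
func (l *Limiter) NewWithFailedCounterFrom(other *Limiter) (*Limiter, error) {
	return &Limiter{limit: l.limit, failedOnce: other.failedOnce, failedCounter: other.failedCounter}, nil
}

// Reserve adds num to the running total; once the total crosses the limit it
// increments the shared counter (once) and fails. A zero limit means no limit.
func (l *Limiter) Reserve(num uint64) error {
	if l.limit == 0 {
		return nil
	}
	if reserved := atomic.AddUint64(&l.reserved, num); reserved > l.limit {
		l.failedOnce.Do(l.failedCounter.Inc)
		return fmt.Errorf("limit %d violated (got %d)", l.limit, reserved)
	}
	return nil
}

func main() {
	dropped := prometheus.NewCounter(prometheus.CounterOpts{Name: "queries_dropped_total"})
	chunksLimiter := NewLimiter(10, dropped)
	sizeLimiter, _ := NewLimiter(1<<20, dropped).NewWithFailedCounterFrom(chunksLimiter)

	fmt.Println(chunksLimiter.Reserve(11))    // trips the chunks limit; counter becomes 1
	fmt.Println(sizeLimiter.Reserve(2 << 20)) // trips the size limit; counter stays at 1
}
```

Under the reviewer's alternative, each factory would instead receive its own labeled child counter (e.g. `s.metrics.queriesDropped.WithLabelValues("chunks")` and a hypothetical `"chunks-size"` label), and the shared `sync.Once` would no longer be needed.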

if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

if req.Hints != nil {
reqHints := &hintspb.SeriesRequestHints{}
if err := types.UnmarshalAny(req.Hints, reqHints); err != nil {
@@ -957,6 +1058,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
blockMatchers,
req,
chunksLimiter,
chunksSizeLimiter,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
30 changes: 19 additions & 11 deletions pkg/store/bucket_e2e_test.go
@@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/pkg/timestamp"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/model"
@@ -125,7 +126,8 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
return
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64,
maxChunksSizeLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
@@ -154,7 +156,7 @@ }, nil)
}, nil)
testutil.Ok(t, err)

store, err := NewBucketStore(
store, err := NewBucketStoreWithOptions(
s.logger,
nil,
objstore.WithNoopInstr(bkt),
@@ -163,14 +165,15 @@
s.cache,
nil,
0,
NewChunksLimiterFactory(maxChunksLimit),
false,
20,
filterConf,
true,
true,
DefaultPostingOffsetInMemorySampling,
true,
WithChunksLimit(NewChunksLimiterFactory(maxChunksLimit)),
WithChunksSizeLimit(NewChunksLimiterFactory(maxChunksSizeLimit)),
)
testutil.Ok(t, err)
s.store = store
@@ -430,7 +433,7 @@ func TestBucketStore_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, 0, emptyRelabelConfig, allowAllFilterConf)

if ok := t.Run("no index cache", func(t *testing.T) {
s.cache.SwapWith(noopCache{})
@@ -485,7 +488,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, 0, emptyRelabelConfig, allowAllFilterConf)

indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
MaxItemSize: 1e5,
@@ -513,7 +516,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
expectedChunks := uint64(2 * 2)

s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, emptyRelabelConfig, &FilterConfig{
s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, 0, emptyRelabelConfig, &FilterConfig{
MinTime: minTimeDuration,
MaxTime: filterMaxTime,
})
@@ -558,8 +561,9 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
expectedChunks := uint64(2 * 6)

cases := map[string]struct {
maxChunksLimit uint64
expectedErr string
maxChunksLimit uint64
maxChunksSizeLimit uint64
expectedErr string
}{
"should succeed if the max chunks limit is not exceeded": {
maxChunksLimit: expectedChunks,
@@ -568,6 +572,10 @@
maxChunksLimit: expectedChunks - 1,
expectedErr: "exceeded chunks limit",
},
"should fail if the max chunks size limit is exceeded": {
maxChunksSizeLimit: maxChunkSize - 1,
expectedErr: "exceeded chunks size limit",
},
}

for testName, testData := range cases {
@@ -580,7 +588,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, testData.maxChunksSizeLimit, emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
@@ -614,7 +622,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, 0, emptyRelabelConfig, allowAllFilterConf)

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
@@ -646,7 +654,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, 0, emptyRelabelConfig, allowAllFilterConf)

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)