diff --git a/CHANGELOG.md b/CHANGELOG.md
index 19b095f344..0caa450c5b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -42,6 +42,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Add `thanos_sidecar_prometheus_store_received_frames` histogram metric.
 - [#3147](https://github.com/thanos-io/thanos/pull/3147) Querier: Add `query.metadata.default-time-range` flag to specify the default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. The zero value means range covers the time since the beginning.
 - [#3207](https://github.com/thanos-io/thanos/pull/3207) Query Frontend: Add `cache-compression-type` flag to use compression in the query frontend cache.
+- [#3089](https://github.com/thanos-io/thanos/pull/3089) Store: Add `store.grpc.series-sample-size-limit` flag to specify the maximum size (in bytes) of samples returned via a single Series call. The zero value means no limit.
 
 ### Changed
 
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 17c1339182..98f9016cb3 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -68,6 +68,10 @@ func registerStore(app *extkingpin.App) {
 		"Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit.").
 		Default("0").Uint()
 
+	maxSampleSize := cmd.Flag("store.grpc.series-sample-size-limit",
+		"Maximum size (in bytes) of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit.").
+		Default("0").Uint()
+
 	maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()
 
 	objStoreConfig := extkingpin.RegisterCommonObjStoreFlags(cmd, "", true)
@@ -133,6 +137,7 @@ func registerStore(app *extkingpin.App) {
 			uint64(*indexCacheSize),
 			uint64(*chunkPoolSize),
 			uint64(*maxSampleCount),
+			uint64(*maxSampleSize),
 			*maxConcurrent,
 			component.Store,
 			debugLogging,
@@ -169,7 +174,7 @@ func runStore(
 	grpcGracePeriod time.Duration,
 	grpcCert, grpcKey, grpcClientCA, httpBindAddr string,
 	httpGracePeriod time.Duration,
-	indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount uint64,
+	indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount, maxSampleSize uint64,
 	maxConcurrency int,
 	component component.Component,
 	verbose bool,
@@ -287,7 +292,12 @@ func runStore(
 
 	queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), maxConcurrency)
 
-	bs, err := store.NewBucketStore(
+	bucketStoreOpts := []store.BucketStoreOption{
+		store.WithChunksLimit(store.NewChunksLimiterFactory(maxSampleCount / store.MaxSamplesPerChunk)), // The samples limit is an approximation based on the max number of samples per chunk.
+		store.WithChunksSizeLimit(store.NewChunksLimiterFactory(maxSampleSize)),
+	}
+
+	bs, err := store.NewBucketStoreWithOptions(
 		logger,
 		reg,
 		bkt,
@@ -296,7 +306,6 @@ func runStore(
 		indexCache,
 		queriesGate,
 		chunkPoolSizeBytes,
-		store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
 		verbose,
 		blockSyncConcurrency,
 		filterConf,
@@ -304,6 +313,7 @@ func runStore(
 		enablePostingsCompression,
 		postingOffsetsInMemSampling,
 		false,
+		bucketStoreOpts...,
 	)
 	if err != nil {
 		return errors.Wrap(err, "create object storage store")
diff --git a/docs/components/store.md b/docs/components/store.md
index d2318c09d9..ea81de1e4c 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -101,6 +101,10 @@ Flags:
                                  samples each chunk can contain), so the
                                  actual number of samples might be lower, even
                                  though the maximum could be hit.
+      --store.grpc.series-sample-size-limit=0
+                                 Maximum size (in bytes) of samples returned
+                                 via a single Series call. The Series call
+                                 fails if this limit is exceeded. 0 means no limit.
       --store.grpc.series-max-concurrency=20
                                  Maximum number of concurrent Series calls.
       --objstore.config-file=
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 6ec95eed57..067a97060a 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -64,6 +64,9 @@ const (
 	maxChunkSize  = 16000
 	maxSeriesSize = 64 * 1024
 
+	noChunksLimit     = 0
+	noChunksSizeLimit = 0
+
 	// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
 	// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
 	// Now with newer Store Gateway advertising all the external labels it has access to, there was simple case where
@@ -270,7 +273,9 @@ type BucketStore struct {
 	// chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call.
 	chunksLimiterFactory ChunksLimiterFactory
-	partitioner          partitioner
+	// chunksSizeLimiterFactory creates a new limiter used to limit the size of chunks fetched by each Series() call.
+	chunksSizeLimiterFactory ChunksLimiterFactory
+	partitioner              partitioner
 
 	filterConfig *FilterConfig
 	advLabelSets []labelpb.ZLabelSet
@@ -286,6 +291,42 @@ type BucketStore struct {
 	enableSeriesResponseHints bool
 }
 
+type bucketStoreOptions struct {
+	chunksLimitFactory     ChunksLimiterFactory
+	chunksSizeLimitFactory ChunksLimiterFactory
+}
+
+// BucketStoreOption overrides options of the bucket store.
+type BucketStoreOption interface {
+	apply(*bucketStoreOptions)
+}
+
+type bucketStoreOptionFunc func(*bucketStoreOptions)
+
+func (f bucketStoreOptionFunc) apply(o *bucketStoreOptions) {
+	f(o)
+}
+
+// WithChunksLimit sets the chunks limit for the bucket store.
+func WithChunksLimit(f ChunksLimiterFactory) BucketStoreOption {
+	return bucketStoreOptionFunc(func(lo *bucketStoreOptions) {
+		lo.chunksLimitFactory = f
+	})
+}
+
+// WithChunksSizeLimit sets the chunks size limit for the bucket store.
+func WithChunksSizeLimit(f ChunksLimiterFactory) BucketStoreOption {
+	return bucketStoreOptionFunc(func(lo *bucketStoreOptions) {
+		lo.chunksSizeLimitFactory = f
+	})
+}
+
+var defaultBucketStoreOptions = bucketStoreOptions{
+	chunksLimitFactory:     NewChunksLimiterFactory(noChunksLimit),
+	chunksSizeLimitFactory: NewChunksLimiterFactory(noChunksSizeLimit),
+}
+
+// Deprecated: Use NewBucketStoreWithOptions instead.
 // NewBucketStore creates a new bucket backed store that implements the store API against
 // an object store bucket. It is optimized to work against high latency backends.
 func NewBucketStore(
@@ -305,6 +346,49 @@ func NewBucketStore(
 	enablePostingsCompression bool,
 	postingOffsetsInMemSampling int,
 	enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
+) (*BucketStore, error) {
+	bucketStoreOpts := []BucketStoreOption{
+		WithChunksLimit(chunksLimiterFactory),
+		WithChunksSizeLimit(NewChunksLimiterFactory(noChunksSizeLimit)),
+	}
+	return NewBucketStoreWithOptions(logger,
+		reg,
+		bkt,
+		fetcher,
+		dir,
+		indexCache,
+		queryGate,
+		maxChunkPoolBytes,
+		debugLogging,
+		blockSyncConcurrency,
+		filterConfig,
+		enableCompatibilityLabel,
+		enablePostingsCompression,
+		postingOffsetsInMemSampling,
+		enableSeriesResponseHints,
+		bucketStoreOpts...,
+	)
+}
+
+// NewBucketStoreWithOptions creates a new bucket backed store that implements the store API against
+// an object store bucket. It is optimized to work against high latency backends.
+func NewBucketStoreWithOptions(
+	logger log.Logger,
+	reg prometheus.Registerer,
+	bkt objstore.InstrumentedBucketReader,
+	fetcher block.MetadataFetcher,
+	dir string,
+	indexCache storecache.IndexCache,
+	queryGate gate.Gate,
+	maxChunkPoolBytes uint64,
+	debugLogging bool,
+	blockSyncConcurrency int,
+	filterConfig *FilterConfig,
+	enableCompatibilityLabel bool,
+	enablePostingsCompression bool,
+	postingOffsetsInMemSampling int,
+	enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
+	opt ...BucketStoreOption,
 ) (*BucketStore, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
@@ -315,6 +399,11 @@ func NewBucketStore(
 		return nil, errors.Wrap(err, "create chunk pool")
 	}
 
+	opts := defaultBucketStoreOptions
+	for _, o := range opt {
+		o.apply(&opts)
+	}
+
 	s := &BucketStore{
 		logger:                      logger,
 		bkt:                         bkt,
@@ -328,7 +417,8 @@ func NewBucketStore(
 		blockSyncConcurrency:        blockSyncConcurrency,
 		filterConfig:                filterConfig,
 		queryGate:                   queryGate,
-		chunksLimiterFactory:        chunksLimiterFactory,
+		chunksLimiterFactory:        opts.chunksLimitFactory,
+		chunksSizeLimiterFactory:    opts.chunksSizeLimitFactory,
 		partitioner:                 gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
 		enableCompatibilityLabel:    enableCompatibilityLabel,
 		enablePostingsCompression:   enablePostingsCompression,
@@ -679,6 +769,7 @@ func blockSeries(
 	matchers []*labels.Matcher,
 	req *storepb.SeriesRequest,
 	chunksLimiter ChunksLimiter,
+	chunksSizeLimiter ChunksLimiter,
 ) (storepb.SeriesSet, *queryStats, error) {
 	ps, err := indexr.ExpandedPostings(matchers)
 	if err != nil {
@@ -774,6 +865,10 @@ func blockSeries(
 		return nil, nil, errors.Wrap(err, "preload chunks")
 	}
 
+	if err := chunksSizeLimiter.Reserve(uint64(chunkr.stats.chunksFetchedSizeSum)); err != nil {
+		return nil, nil, errors.Wrap(err, "exceeded chunks size limit")
+	}
+
 	// Transform all chunks into the response format.
 	for _, s := range res {
 		for i, ref := range s.refs {
@@ -894,16 +989,22 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 	req.MaxTime = s.limitMaxTime(req.MaxTime)
 
 	var (
-		ctx              = srv.Context()
-		stats            = &queryStats{}
-		res              []storepb.SeriesSet
-		mtx              sync.Mutex
-		g, gctx          = errgroup.WithContext(ctx)
-		resHints         = &hintspb.SeriesResponseHints{}
-		reqBlockMatchers []*labels.Matcher
-		chunksLimiter    = s.chunksLimiterFactory(s.metrics.queriesDropped)
+		ctx               = srv.Context()
+		stats             = &queryStats{}
+		res               []storepb.SeriesSet
+		mtx               sync.Mutex
+		g, gctx           = errgroup.WithContext(ctx)
+		resHints          = &hintspb.SeriesResponseHints{}
+		reqBlockMatchers  []*labels.Matcher
+		chunksLimiter     = s.chunksLimiterFactory(s.metrics.queriesDropped)
+		chunksSizeLimiter = s.chunksSizeLimiterFactory(s.metrics.queriesDropped)
 	)
 
+	chunksSizeLimiter, err = chunksSizeLimiter.NewWithFailedCounterFrom(chunksLimiter)
+	if err != nil {
+		return status.Error(codes.InvalidArgument, err.Error())
+	}
+
 	if req.Hints != nil {
 		reqHints := &hintspb.SeriesRequestHints{}
 		if err := types.UnmarshalAny(req.Hints, reqHints); err != nil {
@@ -957,6 +1058,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 				blockMatchers,
 				req,
 				chunksLimiter,
+				chunksSizeLimiter,
 			)
 			if err != nil {
 				return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 873e61eb33..598df937f2 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/relabel"
 	"github.com/prometheus/prometheus/pkg/timestamp"
+
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/model"
@@ -125,7 +126,8 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
 	return
 }
 
-func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
+func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64,
+	maxChunksSizeLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
 	series := []labels.Labels{
 		labels.FromStrings("a", "1", "b", "1"),
 		labels.FromStrings("a", "1", "b", "2"),
@@ -154,7 +156,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 	}, nil)
 	testutil.Ok(t, err)
 
-	store, err := NewBucketStore(
+	store, err := NewBucketStoreWithOptions(
 		s.logger,
 		nil,
 		objstore.WithNoopInstr(bkt),
@@ -163,7 +165,6 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 		s.cache,
 		nil,
 		0,
-		NewChunksLimiterFactory(maxChunksLimit),
 		false,
 		20,
 		filterConf,
@@ -171,6 +172,8 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 		true,
 		DefaultPostingOffsetInMemorySampling,
 		true,
+		WithChunksLimit(NewChunksLimiterFactory(maxChunksLimit)),
+		WithChunksSizeLimit(NewChunksLimiterFactory(maxChunksSizeLimit)),
 	)
 	testutil.Ok(t, err)
 	s.store = store
@@ -430,7 +433,7 @@ func TestBucketStore_e2e(t *testing.T) {
 		testutil.Ok(t, err)
 		defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-		s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
+		s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, 0, emptyRelabelConfig, allowAllFilterConf)
 
 		if ok := t.Run("no index cache", func(t *testing.T) {
 			s.cache.SwapWith(noopCache{})
@@ -485,7 +488,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
 		testutil.Ok(t, err)
 		defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-		s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig, allowAllFilterConf)
+		s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, 0, emptyRelabelConfig, allowAllFilterConf)
 
 		indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
 			MaxItemSize: 1e5,
@@ -513,7 +516,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
 	// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
 	expectedChunks := uint64(2 * 2)
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, emptyRelabelConfig, &FilterConfig{
+	s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, 0, emptyRelabelConfig, &FilterConfig{
 		MinTime: minTimeDuration,
 		MaxTime: filterMaxTime,
 	})
@@ -558,8 +561,9 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
 	expectedChunks := uint64(2 * 6)
 
 	cases := map[string]struct {
-		maxChunksLimit uint64
-		expectedErr    string
+		maxChunksLimit     uint64
+		maxChunksSizeLimit uint64
+		expectedErr        string
 	}{
 		"should succeed if the max chunks limit is not exceeded": {
 			maxChunksLimit: expectedChunks,
@@ -568,6 +572,10 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
 			maxChunksLimit: expectedChunks - 1,
 			expectedErr:    "exceeded chunks limit",
 		},
+		"should fail if the max chunks size limit is exceeded": {
+			maxChunksSizeLimit: maxChunkSize - 1,
+			expectedErr:        "exceeded chunks size limit",
+		},
 	}
 
 	for testName, testData := range cases {
@@ -580,7 +588,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
 			testutil.Ok(t, err)
 			defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-			s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, emptyRelabelConfig, allowAllFilterConf)
+			s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, testData.maxChunksSizeLimit, emptyRelabelConfig, allowAllFilterConf)
 			testutil.Ok(t, s.store.SyncBlocks(ctx))
 
 			req := &storepb.SeriesRequest{
@@ -614,7 +622,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, 0, emptyRelabelConfig, allowAllFilterConf)
 
 	mint, maxt := s.store.TimeRange()
 	testutil.Equals(t, s.minTime, mint)
@@ -646,7 +654,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, 0, emptyRelabelConfig, allowAllFilterConf)
 
 	mint, maxt := s.store.TimeRange()
 	testutil.Equals(t, s.minTime, mint)
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 5d4145e377..6053201203 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/leanovate/gopter/gen"
 	"github.com/leanovate/gopter/prop"
 	"github.com/oklog/ulid"
+	"github.com/prometheus/client_golang/prometheus"
 	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel" @@ -563,7 +564,7 @@ func TestBucketStore_Info(t *testing.T) { defer testutil.Ok(t, os.RemoveAll(dir)) - bucketStore, err := NewBucketStore( + bucketStore, err := NewBucketStoreWithOptions( nil, nil, nil, @@ -572,7 +573,6 @@ func TestBucketStore_Info(t *testing.T) { noopCache{}, nil, 2e5, - NewChunksLimiterFactory(0), false, 20, allowAllFilterConf, @@ -580,6 +580,7 @@ func TestBucketStore_Info(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, false, + WithChunksLimit(NewChunksLimiterFactory(0)), ) testutil.Ok(t, err) @@ -593,6 +594,49 @@ func TestBucketStore_Info(t *testing.T) { testutil.Equals(t, []labelpb.ZLabel(nil), resp.Labels) } +func TestSeries_ErrorInvalidLimiter(t *testing.T) { + defer testutil.TolerantVerifyLeak(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dir := t.TempDir() + store, err := NewBucketStoreWithOptions( + nil, + nil, + nil, + nil, + dir, + noopCache{}, + nil, + 2e5, + false, + 20, + allowAllFilterConf, + true, + true, + DefaultPostingOffsetInMemorySampling, + false, + WithChunksLimit(NewChunksLimiterFactory(0)), + ) + testutil.Ok(t, err) + store.chunksLimiterFactory = func(failedCounter prometheus.Counter) ChunksLimiter { + return nil + } + + req := &storepb.SeriesRequest{ + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "test"}, + }, + } + srv := newStoreSeriesServer(ctx) + + err = store.Series(req, srv) + + testutil.NotOk(t, err) + testutil.Equals(t, true, regexp.MustCompile("failed to share counter from").MatchString(err.Error())) +} + type recorder struct { mtx sync.Mutex objstore.Bucket @@ -812,7 +856,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul }, nil) testutil.Ok(t, err) - bucketStore, err := NewBucketStore( + bucketStore, err := NewBucketStoreWithOptions( logger, nil, objstore.WithNoopInstr(rec), @@ -821,7 +865,6 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul noopCache{}, nil, 0, - NewChunksLimiterFactory(0), false, 20, allowAllFilterConf, @@ -829,6 +872,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul true, DefaultPostingOffsetInMemorySampling, false, + WithChunksLimit(NewChunksLimiterFactory(0)), ) testutil.Ok(t, err) @@ -1248,8 +1292,9 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{blocks}}, }, - queryGate: noopGate{}, - chunksLimiterFactory: NewChunksLimiterFactory(0), + queryGate: noopGate{}, + chunksLimiterFactory: NewChunksLimiterFactory(0), + chunksSizeLimiterFactory: NewChunksLimiterFactory(0), } for _, block := range blocks { @@ -1454,8 +1499,9 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { b1.meta.ULID: b1, b2.meta.ULID: b2, }, - queryGate: noopGate{}, - chunksLimiterFactory: NewChunksLimiterFactory(0), + queryGate: noopGate{}, + chunksLimiterFactory: NewChunksLimiterFactory(0), + chunksSizeLimiterFactory: NewChunksLimiterFactory(0), } t.Run("invoke series for one block. 
Fill the cache on the way.", func(t *testing.T) { @@ -1565,7 +1611,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) testutil.Ok(tb, err) - store, err := NewBucketStore( + store, err := NewBucketStoreWithOptions( logger, nil, instrBkt, @@ -1574,7 +1620,6 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { indexCache, nil, 1000000, - NewChunksLimiterFactory(10000/MaxSamplesPerChunk), false, 10, nil, @@ -1582,6 +1627,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, true, + WithChunksLimit(NewChunksLimiterFactory(10000/MaxSamplesPerChunk)), ) testutil.Ok(tb, err) testutil.Ok(tb, store.SyncBlocks(context.Background())) @@ -1674,7 +1720,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) testutil.Ok(tb, err) - store, err := NewBucketStore( + store, err := NewBucketStoreWithOptions( logger, nil, instrBkt, @@ -1683,7 +1729,6 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { indexCache, nil, 1000000, - NewChunksLimiterFactory(10000/MaxSamplesPerChunk), false, 10, nil, @@ -1691,6 +1736,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, true, + WithChunksLimit(NewChunksLimiterFactory(10000/MaxSamplesPerChunk)), ) testutil.Ok(tb, err) testutil.Ok(tb, store.SyncBlocks(context.Background())) @@ -1777,7 +1823,7 @@ func TestBlockWithLargeChunks(t *testing.T) { indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) testutil.Ok(t, err) - store, err := NewBucketStore( + store, err := NewBucketStoreWithOptions( logger, nil, instrBkt, @@ -1786,7 +1832,6 @@ func TestBlockWithLargeChunks(t *testing.T) { indexCache, nil, 1000000, - NewChunksLimiterFactory(10000/MaxSamplesPerChunk), false, 10, nil, @@ -1794,6 +1839,8 @@ func TestBlockWithLargeChunks(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, true, + WithChunksLimit(NewChunksLimiterFactory(10000/MaxSamplesPerChunk)), + WithChunksSizeLimit(NewChunksLimiterFactory(0)), ) testutil.Ok(t, err) testutil.Ok(t, store.SyncBlocks(context.Background())) diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index c60be901e9..850416335f 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -12,10 +12,12 @@ import ( ) type ChunksLimiter interface { - // Reserve num chunks out of the total number of chunks enforced by the limiter. - // Returns an error if the limit has been exceeded. This function must be - // goroutine safe. + // Reserve num chunks or bytes out of the total number of chunks or bytes enforced by the limiter. + // Returns an error if the limit has been exceeded. This function must be goroutine safe. Reserve(num uint64) error + + // NewWithFailedCounterFrom creates a new chunks limiter from existing with failed counter from the argument. + NewWithFailedCounterFrom(l ChunksLimiter) (ChunksLimiter, error) } // ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for @@ -29,12 +31,12 @@ type Limiter struct { // Counter metric which we will increase if limit is exceeded. failedCounter prometheus.Counter - failedOnce sync.Once + failedOnce *sync.Once } // NewLimiter returns a new limiter with a specified limit. 0 disables the limit. 
 func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter {
-	return &Limiter{limit: limit, failedCounter: ctr}
+	return &Limiter{limit: limit, failedCounter: ctr, failedOnce: &sync.Once{}}
 }
 
 // Reserve implements ChunksLimiter.
@@ -51,6 +53,21 @@ func (l *Limiter) Reserve(num uint64) error {
 	return nil
 }
 
+// NewWithFailedCounterFrom creates a new limiter that keeps this limiter's limit and reserved count but reuses the failed counter and sync.Once of the given limiter.
+func (l *Limiter) NewWithFailedCounterFrom(l2 ChunksLimiter) (ChunksLimiter, error) {
+	from, ok := l2.(*Limiter)
+	if !ok || from == nil {
+		return &Limiter{}, errors.Errorf("failed to share counter from %#v", l2)
+	}
+
+	return &Limiter{
+		limit:         l.limit,
+		reserved:      l.reserved,
+		failedCounter: from.failedCounter,
+		failedOnce:    from.failedOnce,
+	}, nil
+}
+
 // NewChunksLimiterFactory makes a new ChunksLimiterFactory with a static limit.
 func NewChunksLimiterFactory(limit uint64) ChunksLimiterFactory {
 	return func(failedCounter prometheus.Counter) ChunksLimiter {
diff --git a/pkg/store/limiter_test.go b/pkg/store/limiter_test.go
index 3e3fc677d4..9e3714ceb5 100644
--- a/pkg/store/limiter_test.go
+++ b/pkg/store/limiter_test.go
@@ -9,6 +9,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
+
 	"github.com/thanos-io/thanos/pkg/testutil"
 )
 
@@ -28,3 +29,67 @@ func TestLimiter(t *testing.T) {
 	testutil.NotOk(t, l.Reserve(2))
 	testutil.Equals(t, float64(1), prom_testutil.ToFloat64(c))
 }
+
+func TestNewWithFailedCounterFrom(t *testing.T) {
+	c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
+	expectedLimit := uint64(42)
+	l := NewLimiter(expectedLimit, c)
+
+	t.Run("should return error when from limiter is nil", func(t *testing.T) {
+		var nilLimiter *Limiter
+		_, err := l.NewWithFailedCounterFrom(nilLimiter)
+		testutil.NotOk(t, err)
+	})
+
+	t.Run("should return error when from limiter is of different type", func(t *testing.T) {
+		unknownLimiter := struct {
+			ChunksLimiter
+		}{}
+		_, err := l.NewWithFailedCounterFrom(unknownLimiter)
+		testutil.NotOk(t, err)
+	})
+
+	t.Run("should create new limiter with given counter", func(t *testing.T) {
+		c2 := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
+		limit := uint64(1)
+		counterSource := NewLimiter(limit, c2)
+
+		err := l.Reserve(expectedLimit - 1)
+		testutil.Ok(t, err)
+
+		res, err := l.NewWithFailedCounterFrom(counterSource)
+		testutil.Ok(t, err)
+
+		resLimiter, ok := res.(*Limiter)
+		if !ok || resLimiter == nil {
+			t.Fatalf("unexpected limiter in result %#v", resLimiter)
+		}
+
+		testutil.Equals(t, resLimiter.failedCounter, counterSource.failedCounter)
+		testutil.Equals(t, resLimiter.failedOnce, counterSource.failedOnce)
+		testutil.Equals(t, resLimiter.limit, expectedLimit)
+		testutil.Equals(t, resLimiter.reserved, l.reserved)
+	})
+}
+
+func TestLimiterFailedOnce(t *testing.T) {
+	c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
+	factoryWithLimit1 := NewChunksLimiterFactory(1)
+	factoryWithLimit2 := NewChunksLimiterFactory(2)
+
+	l1 := factoryWithLimit1(c)
+	l2, err := factoryWithLimit2(c).NewWithFailedCounterFrom(l1)
+	testutil.Ok(t, err)
+
+	testutil.Ok(t, l1.Reserve(1))
+	testutil.Equals(t, float64(0), prom_testutil.ToFloat64(c))
+
+	testutil.Ok(t, l2.Reserve(2))
+	testutil.Equals(t, float64(0), prom_testutil.ToFloat64(c))
+
+	testutil.NotOk(t, l1.Reserve(1))
+	testutil.Equals(t, float64(1), prom_testutil.ToFloat64(c))
+
+	testutil.NotOk(t, l2.Reserve(1))
+	testutil.Equals(t, float64(1), prom_testutil.ToFloat64(c))
+}
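
---

For reviewers, a small self-contained sketch of the limiter-sharing behaviour that the `Series()` change relies on: both the count and size limiters are built from the same `queriesDropped` counter, and `NewWithFailedCounterFrom` makes the size limiter reuse the count limiter's counter *and* its `sync.Once`, so the metric is incremented at most once per query no matter which limit trips first. This mirrors `TestLimiterFailedOnce` above; the `main` scaffolding and the concrete limits below are illustrative only and not part of the change.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"

	"github.com/thanos-io/thanos/pkg/store"
)

func main() {
	// Stand-in for s.metrics.queriesDropped.
	queriesDropped := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{
		Name: "thanos_bucket_store_queries_dropped_total",
	})

	// As in Series(): each limit gets its own limiter, created from the same counter...
	chunksLimiter := store.NewChunksLimiterFactory(1000)(queriesDropped)          // Chunk count limit.
	chunksSizeLimiter := store.NewChunksLimiterFactory(64 * 1024)(queriesDropped) // Chunk bytes limit.

	// ...and the size limiter is then re-created to share the count limiter's
	// failed counter and its sync.Once.
	chunksSizeLimiter, err := chunksSizeLimiter.NewWithFailedCounterFrom(chunksLimiter)
	if err != nil {
		panic(err)
	}

	_ = chunksLimiter.Reserve(1001)      // Exceeds the count limit: counter goes to 1.
	_ = chunksSizeLimiter.Reserve(65537) // Exceeds the size limit: the shared Once suppresses a second increment.

	fmt.Println(prom_testutil.ToFloat64(queriesDropped)) // Prints 1.
}
```

One design note worth calling out: threading the new limit through `BucketStoreOption` rather than yet another positional argument keeps `NewBucketStore`'s signature stable for existing callers, which is why the deprecated constructor can simply delegate to `NewBucketStoreWithOptions` with a no-op size limit.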