diff --git a/CHANGELOG.md b/CHANGELOG.md index d8fd8a9096..11fa0e033a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re ## Unreleased ### Added +- [#3903](https://github.com/thanos-io/thanos/pull/3903) Store: Returning custom grpc code when reaching series/chunk limits. ### Fixed - [#3204](https://github.com/thanos-io/thanos/pull/3204) Mixin: Use sidecar's metric timestamp for healthcheck. diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index f0301f3052..4fabe004c2 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1026,7 +1026,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie err = g.Wait() }) if err != nil { - return status.Error(codes.Aborted, err.Error()) + code := codes.Aborted + if s, ok := status.FromError(errors.Cause(err)); ok { + code = s.Code() + } + return status.Error(code, err.Error()) } stats.blocksQueried = len(res) stats.getAllDuration = time.Since(begin) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 990d5c1e5a..c5bb2563f2 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -14,10 +14,15 @@ import ( "time" "github.com/go-kit/kit/log" + "github.com/gogo/status" "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/weaveworks/common/httpgrpc" + "google.golang.org/grpc/codes" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/model" @@ -45,6 +50,20 @@ type swappableCache struct { ptr storecache.IndexCache } +type customLimiter struct { + limiter *Limiter + code codes.Code +} + +func (c *customLimiter) Reserve(num uint64) error { + err := c.limiter.Reserve(num) + if err != nil { + return httpgrpc.Errorf(int(c.code), err.Error()) + } + + return nil +} + func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) { c.ptr = ptr2 } @@ -113,7 +132,25 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o return } -func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { +func newCustomChunksLimiterFactory(limit uint64, code codes.Code) ChunksLimiterFactory { + return func(failedCounter prometheus.Counter) ChunksLimiter { + return &customLimiter{ + limiter: NewLimiter(limit, failedCounter), + code: code, + } + } +} + +func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterFactory { + return func(failedCounter prometheus.Counter) SeriesLimiter { + return &customLimiter{ + limiter: NewLimiter(limit, failedCounter), + code: code, + } + } +} + +func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { series := []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -151,8 +188,8 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m s.cache, nil, nil, - NewChunksLimiterFactory(maxChunksLimit), - NewSeriesLimiterFactory(0), + chunksLimiterFactory, + seriesLimiterFactory, NewGapBasedPartitioner(PartitionerMaxGapSize), false, 20, @@ -425,7 +462,7 @@ func TestBucketStore_e2e(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) if ok := t.Run("no index cache", func(t *testing.T) { s.cache.SwapWith(noopCache{}) @@ -480,7 +517,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{ MaxItemSize: 1e5, @@ -508,7 +545,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { // The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks. expectedChunks := uint64(2 * 2) - s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, emptyRelabelConfig, &FilterConfig{ + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{ MinTime: minTimeDuration, MaxTime: filterMaxTime, }) @@ -554,14 +591,28 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { cases := map[string]struct { maxChunksLimit uint64 + maxSeriesLimit uint64 expectedErr string + code codes.Code }{ "should succeed if the max chunks limit is not exceeded": { maxChunksLimit: expectedChunks, }, - "should fail if the max chunks limit is exceeded": { + "should fail if the max chunks limit is exceeded - ResourceExhausted": { maxChunksLimit: expectedChunks - 1, expectedErr: "exceeded chunks limit", + code: codes.ResourceExhausted, + }, + "should fail if the max chunks limit is exceeded - 422": { + maxChunksLimit: expectedChunks - 1, + expectedErr: "exceeded chunks limit", + code: 422, + }, + "should fail if the max series limit is exceeded - 422": { + maxChunksLimit: expectedChunks, + expectedErr: "exceeded series limit", + maxSeriesLimit: 1, + code: 422, }, } @@ -575,7 +626,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), emptyRelabelConfig, allowAllFilterConf) testutil.Ok(t, s.store.SyncBlocks(ctx)) req := &storepb.SeriesRequest{ @@ -595,6 +646,9 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { } else { testutil.NotOk(t, err) testutil.Assert(t, strings.Contains(err.Error(), testData.expectedErr)) + status, ok := status.FromError(err) + testutil.Equals(t, true, ok) + testutil.Equals(t, testData.code, status.Code()) } }) } @@ -609,7 +663,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) mint, maxt := s.store.TimeRange() testutil.Equals(t, s.minTime, mint) @@ -642,7 +696,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) mint, maxt := s.store.TimeRange() testutil.Equals(t, s.minTime, mint)