Skip to content

Commit

Permalink
Fix inconsistent error for series limits in Store API (thanos-io#6330)
Browse files Browse the repository at this point in the history
* store: fix inconsistent error for series limits

Signed-off-by: Thibault Mange <[email protected]>

* update changelog

Signed-off-by: Thibault Mange <[email protected]>

* Update pkg/store/bucket.go

Co-authored-by: Saswata Mukherjee <[email protected]>
Signed-off-by: Thibault Mange <[email protected]>

* Update pkg/store/bucket.go

Co-authored-by: Saswata Mukherjee <[email protected]>
Signed-off-by: Thibault Mange <[email protected]>

* rename labelValues serires liimiter test function

Signed-off-by: Thibault Mange <[email protected]>

---------

Signed-off-by: Thibault Mange <[email protected]>
Co-authored-by: Saswata Mukherjee <[email protected]>
  • Loading branch information
2 people authored and HC Zhu committed Jun 27, 2023
1 parent 80fdf11 commit aec18b0
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6222](https://github.com/thanos-io/thanos/pull/6222) mixin(Receive): Fix tenant series received charts.
- [#6218](https://github.com/thanos-io/thanos/pull/6218) mixin(Store): handle ResourceExhausted as a non-server error. As a consequence, this error won't contribute to Store's grpc errors alerts.
- [#6271](https://github.com/thanos-io/thanos/pull/6271) Receive: Fix segfault in `LabelValues` during head compaction.
- [#6330](https://github.com/thanos-io/thanos/pull/6330) Store: Fix inconsistent error for series limits.

### Changed
- [#6168](https://github.com/thanos-io/thanos/pull/6168) Receiver: Make ketama hashring fail early when configured with number of nodes lower than the replication factor.
Expand Down
12 changes: 10 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,7 +1574,11 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
s.mtx.RUnlock()

if err := g.Wait(); err != nil {
return nil, status.Error(codes.Internal, err.Error())
code := codes.Internal
if s, ok := status.FromError(errors.Cause(err)); ok {
code = s.Code()
}
return nil, status.Error(code, err.Error())
}

anyHints, err := types.MarshalAny(resHints)
Expand Down Expand Up @@ -1762,7 +1766,11 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
s.mtx.RUnlock()

if err := g.Wait(); err != nil {
return nil, status.Error(codes.Aborted, err.Error())
code := codes.Internal
if s, ok := status.FromError(errors.Cause(err)); ok {
code = s.Code()
}
return nil, status.Error(code, err.Error())
}

anyHints, err := types.MarshalAny(resHints)
Expand Down
110 changes: 110 additions & 0 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package store
import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -760,6 +761,57 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
})
}

func TestBucketStore_LabelNames_SeriesLimiter_e2e(t *testing.T) {
cases := map[string]struct {
maxSeriesLimit uint64
expectedErr string
code codes.Code
}{
"should succeed if the max series limit is not exceeded": {
maxSeriesLimit: math.MaxUint64,
},
"should fail if the max series limit is exceeded - ResourceExhausted": {
expectedErr: "exceeded series limit",
maxSeriesLimit: 1,
code: codes.ResourceExhausted,
},
}

for testName, testData := range cases {
t.Run(testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bkt := objstore.NewInMemBucket()
dir := t.TempDir()
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))
req := &storepb.LabelNamesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
Start: minTimeDuration.PrometheusTimestamp(),
End: maxTimeDuration.PrometheusTimestamp(),
}

s.cache.SwapWith(noopCache{})

_, err := s.store.LabelNames(context.Background(), req)

if testData.expectedErr == "" {
testutil.Ok(t, err)
} else {
testutil.NotOk(t, err)
testutil.Assert(t, strings.Contains(err.Error(), testData.expectedErr))

status, ok := status.FromError(err)
testutil.Equals(t, true, ok)
testutil.Equals(t, testData.code, status.Code())
}
})
}
}

func TestBucketStore_LabelValues_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -867,6 +919,64 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {
})
}

func TestBucketStore_LabelValues_SeriesLimiter_e2e(t *testing.T) {
cases := map[string]struct {
maxSeriesLimit uint64
expectedErr string
code codes.Code
}{
"should succeed if the max chunks limit is not exceeded": {
maxSeriesLimit: math.MaxUint64,
},
"should fail if the max series limit is exceeded - ResourceExhausted": {
expectedErr: "exceeded series limit",
maxSeriesLimit: 1,
code: codes.ResourceExhausted,
},
}

for testName, testData := range cases {
t.Run(testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bkt := objstore.NewInMemBucket()

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))

req := &storepb.LabelValuesRequest{
Label: "a",
Start: minTimeDuration.PrometheusTimestamp(),
End: maxTimeDuration.PrometheusTimestamp(),
Matchers: []storepb.LabelMatcher{
{
Type: storepb.LabelMatcher_EQ,
Name: "a",
Value: "1",
},
},
}

s.cache.SwapWith(noopCache{})

_, err := s.store.LabelValues(context.Background(), req)

if testData.expectedErr == "" {
testutil.Ok(t, err)
} else {
testutil.NotOk(t, err)
testutil.Assert(t, strings.Contains(err.Error(), testData.expectedErr))

status, ok := status.FromError(err)
testutil.Equals(t, true, ok)
testutil.Equals(t, testData.code, status.Code())
}
})
}
}

func emptyToNil(values []string) []string {
if len(values) == 0 {
return nil
Expand Down

0 comments on commit aec18b0

Please sign in to comment.