Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed --store.grpc.series-sample-limit #2858

Merged
merged 2 commits into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive, Querier: Fixed leaks on receive and querier Store API Series, which were leaking on errors.
- [#2895](https://github.com/thanos-io/thanos/pull/2895) Compact: Fix increment of `thanos_compact_downsample_total` metric for downsample of 5m resolution blocks.
- [#2858](https://github.com/thanos-io/thanos/pull/2858) Store: Fix `--store.grpc.series-sample-limit` implementation. The limit is now applied to the sum of all samples fetched across all queried blocks via a single Series call, instead of applying it individually to each block.

### Added

Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
Default("2GB").Bytes()

maxSampleCount := cmd.Flag("store.grpc.series-sample-limit",
"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: For efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
"Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit.").
Default("0").Uint()

maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()
Expand Down Expand Up @@ -296,7 +296,7 @@ func runStore(
indexCache,
queriesGate,
chunkPoolSizeBytes,
maxSampleCount,
store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
verbose,
blockSyncConcurrency,
filterConf,
Expand Down
13 changes: 8 additions & 5 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,14 @@ Flags:
memory.
--store.grpc.series-sample-limit=0
Maximum amount of samples returned via a single
Series call. 0 means no limit. NOTE: For
efficiency we take 120 as the number of samples
in chunk (it cannot be bigger than that), so
the actual number of samples might be lower,
even though the maximum could be hit.
Series call. The Series call fails if this
limit is exceeded. 0 means no limit. NOTE: For
efficiency the limit is internally implemented
as 'chunks limit' considering each chunk
contains 120 samples (it's the max number of
samples each chunk can contain), so the actual
number of samples might be lower, even though
the maximum could be hit.
--store.grpc.series-max-concurrency=20
Maximum number of concurrent Series calls.
--objstore.config-file=<file-path>
Expand Down
40 changes: 17 additions & 23 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ import (
)

const (
// maxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
// MaxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
// for precalculating the number of samples that we may have to retrieve and decode for any given query
// without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know
// where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way
// because you barely get any improvements in compression when the number of samples is beyond this.
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
maxSamplesPerChunk = 120
MaxSamplesPerChunk = 120
maxChunkSize = 16000
maxSeriesSize = 64 * 1024

Expand Down Expand Up @@ -240,9 +240,9 @@ type BucketStore struct {
// Query gate which limits the maximum amount of concurrent queries.
queryGate gate.Gate

// samplesLimiter limits the number of samples per each Series() call.
samplesLimiter SampleLimiter
partitioner partitioner
// chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call.
chunksLimiterFactory ChunksLimiterFactory
partitioner partitioner

filterConfig *FilterConfig
advLabelSets []storepb.LabelSet
Expand All @@ -269,7 +269,7 @@ func NewBucketStore(
indexCache storecache.IndexCache,
queryGate gate.Gate,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
chunksLimiterFactory ChunksLimiterFactory,
debugLogging bool,
blockSyncConcurrency int,
filterConfig *FilterConfig,
Expand All @@ -287,7 +287,6 @@ func NewBucketStore(
return nil, errors.Wrap(err, "create chunk pool")
}

metrics := newBucketStoreMetrics(reg)
s := &BucketStore{
logger: logger,
bkt: bkt,
Expand All @@ -301,14 +300,14 @@ func NewBucketStore(
blockSyncConcurrency: blockSyncConcurrency,
filterConfig: filterConfig,
queryGate: queryGate,
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
chunksLimiterFactory: chunksLimiterFactory,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enablePostingsCompression: enablePostingsCompression,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
metrics: newBucketStoreMetrics(reg),
}
s.metrics = metrics

if err := os.MkdirAll(dir, 0777); err != nil {
return nil, errors.Wrap(err, "create dir")
Expand Down Expand Up @@ -649,7 +648,7 @@ func blockSeries(
chunkr *bucketChunkReader,
matchers []*labels.Matcher,
req *storepb.SeriesRequest,
samplesLimiter SampleLimiter,
chunksLimiter ChunksLimiter,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
Expand Down Expand Up @@ -722,12 +721,16 @@ func blockSeries(
s.refs = append(s.refs, meta.Ref)
}
if len(s.chks) > 0 {
if err := chunksLimiter.Reserve(uint64(len(s.chks))); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could potentially limit chunk bytes instead TBH, but this is good for a start

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or both. In Cortex we would need by count :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue: #2861

return nil, nil, errors.Wrap(err, "exceeded chunks limit")
}

res = append(res, s)
}
}

// Preload all chunks that were marked in the previous stage.
if err := chunkr.preload(samplesLimiter); err != nil {
if err := chunkr.preload(); err != nil {
return nil, nil, errors.Wrap(err, "preload chunks")
}

Expand Down Expand Up @@ -858,6 +861,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
g, gctx = errgroup.WithContext(ctx)
resHints = &hintspb.SeriesResponseHints{}
reqBlockMatchers []*labels.Matcher
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped)
)

if req.Hints != nil {
Expand Down Expand Up @@ -909,7 +913,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
chunkr,
blockMatchers,
req,
s.samplesLimiter,
chunksLimiter,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
Expand Down Expand Up @@ -1983,19 +1987,9 @@ func (r *bucketChunkReader) addPreload(id uint64) error {
}

// preload all added chunk IDs. Must be called before the first call to Chunk is made.
func (r *bucketChunkReader) preload(samplesLimiter SampleLimiter) error {
func (r *bucketChunkReader) preload() error {
g, ctx := errgroup.WithContext(r.ctx)

numChunks := uint64(0)
for _, offsets := range r.preloads {
for range offsets {
numChunks++
}
}
if err := samplesLimiter.Check(numChunks * maxSamplesPerChunk); err != nil {
return errors.Wrap(err, "exceeded samples limit")
}

for seq, offsets := range r.preloads {
sort.Slice(offsets, func(i, j int) bool {
return offsets[i] < offsets[j]
Expand Down
62 changes: 59 additions & 3 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -123,7 +124,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
return
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
Expand Down Expand Up @@ -161,7 +162,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
s.cache,
nil,
0,
maxSampleCount,
NewChunksLimiterFactory(maxChunksLimit),
false,
20,
filterConf,
Expand Down Expand Up @@ -504,7 +505,10 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
hourAfter := time.Now().Add(1 * time.Hour)
filterMaxTime := model.TimeOrDurationValue{Time: &hourAfter}

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 241, emptyRelabelConfig, &FilterConfig{
// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
expectedChunks := uint64(2 * 2)

s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, emptyRelabelConfig, &FilterConfig{
MinTime: minTimeDuration,
MaxTime: filterMaxTime,
})
Expand Down Expand Up @@ -543,3 +547,55 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
testutil.Equals(t, 1, len(s.Chunks))
}
}

func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
// The query will fetch 2 series from 6 blocks, so we do expect to hit a total of 12 chunks.
expectedChunks := uint64(2 * 6)

cases := map[string]struct {
maxChunksLimit uint64
expectedErr string
}{
"should succeed if the max chunks limit is not exceeded": {
maxChunksLimit: expectedChunks,
},
"should fail if the max chunks limit is exceeded": {
maxChunksLimit: expectedChunks - 1,
expectedErr: "exceeded chunks limit",
},
}

for testName, testData := range cases {
t.Run(testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bkt := objstore.NewInMemBucket()

dir, err := ioutil.TempDir("", "test_bucket_chunks_limiter_e2e")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
MinTime: minTimeDuration.PrometheusTimestamp(),
MaxTime: maxTimeDuration.PrometheusTimestamp(),
}

s.cache.SwapWith(noopCache{})
srv := newStoreSeriesServer(ctx)
err = s.store.Series(req, srv)

if testData.expectedErr == "" {
testutil.Ok(t, err)
} else {
testutil.NotOk(t, err)
testutil.Assert(t, strings.Contains(err.Error(), testData.expectedErr))
}
})
}
}
20 changes: 8 additions & 12 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func TestBucketStore_Info(t *testing.T) {
noopCache{},
nil,
2e5,
0,
NewChunksLimiterFactory(0),
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -823,7 +823,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
noopCache{},
nil,
0,
0,
NewChunksLimiterFactory(0),
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -1248,8 +1248,8 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
blockSets: map[uint64]*bucketBlockSet{
labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{blocks}},
},
queryGate: noopGate{},
samplesLimiter: noopLimiter{},
queryGate: noopGate{},
chunksLimiterFactory: NewChunksLimiterFactory(0),
}

for _, block := range blocks {
Expand Down Expand Up @@ -1330,10 +1330,6 @@ type noopGate struct{}
func (noopGate) Start(context.Context) error { return nil }
func (noopGate) Done() {}

type noopLimiter struct{}

func (noopLimiter) Check(uint64) error { return nil }

// Regression test against: https://github.com/thanos-io/thanos/issues/2147.
func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "segfault-series")
Expand Down Expand Up @@ -1456,8 +1452,8 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
b1.meta.ULID: b1,
b2.meta.ULID: b2,
},
queryGate: noopGate{},
samplesLimiter: noopLimiter{},
queryGate: noopGate{},
chunksLimiterFactory: NewChunksLimiterFactory(0),
}

t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {
Expand Down Expand Up @@ -1571,7 +1567,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
indexCache,
nil,
1000000,
10000,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
false,
10,
nil,
Expand Down Expand Up @@ -1680,7 +1676,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
indexCache,
nil,
1000000,
10000,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
false,
10,
nil,
Expand Down
Loading