Skip to content

Commit

Permalink
Fixed --store.grpc.series-sample-limit
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci committed Jul 8, 2020
1 parent 7743fa0 commit 61d8981
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
* [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
- [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase
- [#2858](https://github.com/thanos-io/thanos/pull/2858) Store: Fix `--store.grpc.series-sample-limit` implementation. The limit is now applied to the sum of all samples fetched across all queried blocks via a single Series call, instead of applying it individually to each block.

### Changed

Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
Default("2GB").Bytes()

maxSampleCount := cmd.Flag("store.grpc.series-sample-limit",
"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: For efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
"Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit.").
Default("0").Uint()

maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()
Expand Down Expand Up @@ -298,7 +298,7 @@ func runStore(
indexCache,
queriesGate,
chunkPoolSizeBytes,
maxSampleCount,
store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
verbose,
blockSyncConcurrency,
filterConf,
Expand Down
13 changes: 8 additions & 5 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,14 @@ Flags:
memory.
--store.grpc.series-sample-limit=0
Maximum amount of samples returned via a single
Series call. 0 means no limit. NOTE: For
efficiency we take 120 as the number of samples
in chunk (it cannot be bigger than that), so
the actual number of samples might be lower,
even though the maximum could be hit.
Series call. The Series call fails if this
limit is exceeded. 0 means no limit. NOTE: For
efficiency the limit is internally implemented
as 'chunks limit' considering each chunk
contains 120 samples (it's the max number of
samples each chunk can contain), so the actual
number of samples might be lower, even though
the maximum could be hit.
--store.grpc.series-max-concurrency=20
Maximum number of concurrent Series calls.
--objstore.config-file=<file-path>
Expand Down
40 changes: 17 additions & 23 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ import (
)

const (
// maxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
// MaxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
// for precalculating the number of samples that we may have to retrieve and decode for any given query
// without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know
// where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way
// because you barely get any improvements in compression when the number of samples is beyond this.
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
maxSamplesPerChunk = 120
MaxSamplesPerChunk = 120
maxChunkSize = 16000
maxSeriesSize = 64 * 1024

Expand Down Expand Up @@ -240,9 +240,9 @@ type BucketStore struct {
// Query gate which limits the maximum amount of concurrent queries.
queryGate gate.Gate

// samplesLimiter limits the number of samples per each Series() call.
samplesLimiter SampleLimiter
partitioner partitioner
// chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call.
chunksLimiterFactory ChunksLimiterFactory
partitioner partitioner

filterConfig *FilterConfig
advLabelSets []storepb.LabelSet
Expand All @@ -269,7 +269,7 @@ func NewBucketStore(
indexCache storecache.IndexCache,
queryGate gate.Gate,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
chunksLimiterFactory ChunksLimiterFactory,
debugLogging bool,
blockSyncConcurrency int,
filterConfig *FilterConfig,
Expand All @@ -287,7 +287,6 @@ func NewBucketStore(
return nil, errors.Wrap(err, "create chunk pool")
}

metrics := newBucketStoreMetrics(reg)
s := &BucketStore{
logger: logger,
bkt: bkt,
Expand All @@ -301,14 +300,14 @@ func NewBucketStore(
blockSyncConcurrency: blockSyncConcurrency,
filterConfig: filterConfig,
queryGate: queryGate,
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
chunksLimiterFactory: chunksLimiterFactory,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enablePostingsCompression: enablePostingsCompression,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
metrics: newBucketStoreMetrics(reg),
}
s.metrics = metrics

if err := os.MkdirAll(dir, 0777); err != nil {
return nil, errors.Wrap(err, "create dir")
Expand Down Expand Up @@ -649,7 +648,7 @@ func blockSeries(
chunkr *bucketChunkReader,
matchers []*labels.Matcher,
req *storepb.SeriesRequest,
samplesLimiter SampleLimiter,
chunksLimiter ChunksLimiter,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
Expand Down Expand Up @@ -722,12 +721,16 @@ func blockSeries(
s.refs = append(s.refs, meta.Ref)
}
if len(s.chks) > 0 {
if err := chunksLimiter.Reserve(uint64(len(s.chks))); err != nil {
return nil, nil, errors.Wrap(err, "exceeded chunks limit")
}

res = append(res, s)
}
}

// Preload all chunks that were marked in the previous stage.
if err := chunkr.preload(samplesLimiter); err != nil {
if err := chunkr.preload(); err != nil {
return nil, nil, errors.Wrap(err, "preload chunks")
}

Expand Down Expand Up @@ -858,6 +861,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
g, gctx = errgroup.WithContext(ctx)
resHints = &hintspb.SeriesResponseHints{}
reqBlockMatchers []*labels.Matcher
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped)
)

if req.Hints != nil {
Expand Down Expand Up @@ -909,7 +913,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
chunkr,
blockMatchers,
req,
s.samplesLimiter,
chunksLimiter,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
Expand Down Expand Up @@ -1983,19 +1987,9 @@ func (r *bucketChunkReader) addPreload(id uint64) error {
}

// preload all added chunk IDs. Must be called before the first call to Chunk is made.
func (r *bucketChunkReader) preload(samplesLimiter SampleLimiter) error {
func (r *bucketChunkReader) preload() error {
g, ctx := errgroup.WithContext(r.ctx)

numChunks := uint64(0)
for _, offsets := range r.preloads {
for range offsets {
numChunks++
}
}
if err := samplesLimiter.Check(numChunks * maxSamplesPerChunk); err != nil {
return errors.Wrap(err, "exceeded samples limit")
}

for seq, offsets := range r.preloads {
sort.Slice(offsets, func(i, j int) bool {
return offsets[i] < offsets[j]
Expand Down
62 changes: 59 additions & 3 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -123,7 +124,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
return
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
Expand Down Expand Up @@ -161,7 +162,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
s.cache,
nil,
0,
maxSampleCount,
NewChunksLimiterFactory(maxChunksLimit),
false,
20,
filterConf,
Expand Down Expand Up @@ -504,7 +505,10 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
hourAfter := time.Now().Add(1 * time.Hour)
filterMaxTime := model.TimeOrDurationValue{Time: &hourAfter}

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 241, emptyRelabelConfig, &FilterConfig{
// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
expectedChunks := uint64(2 * 2)

s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, emptyRelabelConfig, &FilterConfig{
MinTime: minTimeDuration,
MaxTime: filterMaxTime,
})
Expand Down Expand Up @@ -543,3 +547,55 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
testutil.Equals(t, 1, len(s.Chunks))
}
}

func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
// The query will fetch 2 series from 6 blocks, so we do expect to hit a total of 12 chunks.
expectedChunks := uint64(2 * 6)

cases := map[string]struct {
maxChunksLimit uint64
expectedErr string
}{
"should succeed if the max chunks limit is not exceeded": {
maxChunksLimit: expectedChunks,
},
"should fail if the max chunks limit is exceeded": {
maxChunksLimit: expectedChunks - 1,
expectedErr: "exceeded chunks limit",
},
}

for testName, testData := range cases {
t.Run(testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bkt := objstore.NewInMemBucket()

dir, err := ioutil.TempDir("", "test_bucket_chunks_limiter_e2e")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
MinTime: minTimeDuration.PrometheusTimestamp(),
MaxTime: maxTimeDuration.PrometheusTimestamp(),
}

s.cache.SwapWith(noopCache{})
srv := newStoreSeriesServer(ctx)
err = s.store.Series(req, srv)

if testData.expectedErr == "" {
testutil.Ok(t, err)
} else {
testutil.NotOk(t, err)
testutil.Assert(t, strings.Contains(err.Error(), testData.expectedErr))
}
})
}
}
20 changes: 8 additions & 12 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func TestBucketStore_Info(t *testing.T) {
noopCache{},
nil,
2e5,
0,
NewChunksLimiterFactory(0),
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -828,7 +828,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
noopCache{},
nil,
0,
0,
NewChunksLimiterFactory(0),
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -1422,8 +1422,8 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) {
blockSets: map[uint64]*bucketBlockSet{
labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{blocks}},
},
queryGate: noopGate{},
samplesLimiter: noopLimiter{},
queryGate: noopGate{},
chunksLimiterFactory: NewChunksLimiterFactory(0),
}

for _, block := range blocks {
Expand Down Expand Up @@ -1509,10 +1509,6 @@ type noopGate struct{}
func (noopGate) Start(context.Context) error { return nil }
func (noopGate) Done() {}

type noopLimiter struct{}

func (noopLimiter) Check(uint64) error { return nil }

type benchSeriesCase struct {
name string
req *storepb.SeriesRequest
Expand Down Expand Up @@ -1677,8 +1673,8 @@ func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
b1.meta.ULID: b1,
b2.meta.ULID: b2,
},
queryGate: noopGate{},
samplesLimiter: noopLimiter{},
queryGate: noopGate{},
chunksLimiterFactory: NewChunksLimiterFactory(0),
}

t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {
Expand Down Expand Up @@ -1779,7 +1775,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
indexCache,
nil,
1000000,
10000,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
false,
10,
nil,
Expand Down Expand Up @@ -1888,7 +1884,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
indexCache,
nil,
1000000,
10000,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
false,
10,
nil,
Expand Down
Loading

0 comments on commit 61d8981

Please sign in to comment.