From 9d0b8a76dd1031d1249491ee74244b74d1a5ae73 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Fri, 8 Feb 2019 11:24:27 +0200
Subject: [PATCH] store: make changes according to the review comments

---
 cmd/thanos/store.go          |  3 +-
 pkg/store/bucket.go          | 91 +++++++++++++++++++-----------------
 pkg/store/bucket_e2e_test.go |  2 +-
 pkg/store/limiter.go         | 24 ++++++++++
 4 files changed, 70 insertions(+), 50 deletions(-)
 create mode 100644 pkg/store/limiter.go

diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 25f8512abf..7ea51a13cc 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -36,7 +36,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
 	chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
 		Default("2GB").Bytes()
 
-	maxSampleCount := cmd.Flag("grpc-sample-limit", "Maximum amount of samples returned via a single Series call. 0 means no limit.").
+	maxSampleCount := cmd.Flag("grpc-sample-limit",
+		"Maximum number of samples returned via a single Series call. 0 means no limit. NOTE: the limit is enforced against an upper-bound estimate of 120 samples per chunk, so it may overestimate, but is unlikely to underestimate, the number of samples that would have to be downloaded.").
 		Default("50000000").Uint()
 
 	maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int()
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index c1411c15fc..a6d1fd89eb 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -41,6 +41,14 @@ import (
 	"google.golang.org/grpc/status"
 )
 
+// maxSamplesPerSeriesRef is (approximately) the maximum number of samples that any given chunk may hold.
+// It is used to estimate, without downloading the chunks themselves, how many samples we might have to
+// retrieve and decode for any given query. See https://github.com/prometheus/tsdb/pull/397 for where this
+// number comes from. Long story short: TSDB cuts chunks at 120 samples because compression barely
+// improves beyond that point; see Figure 6 in the Gorilla whitepaper
+// (http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
+const maxSamplesPerSeriesRef uint64 = 120
+
 type bucketStoreMetrics struct {
 	blocksLoaded prometheus.Gauge
 	blockLoads   prometheus.Counter
@@ -57,6 +65,7 @@ type bucketStoreMetrics struct {
 	resultSeriesCount prometheus.Summary
 	chunkSizeBytes    prometheus.Histogram
 	queriesLimited    prometheus.Counter
+	queriesLimit      prometheus.Gauge
 }
 
 func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -133,8 +142,12 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 	})
 
 	m.queriesLimited = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_queries_limited_total",
-		Help: "Total number of queries that were dropped due to the sample limit.",
+		Name: "thanos_bucket_store_queries_limited",
+		Help: "Number of queries that were dropped due to the sample limit.",
+	})
+	m.queriesLimit = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "thanos_bucket_store_queries_limit",
+		Help: "Maximum number of concurrent queries.",
 	})
 
 	if reg != nil {
@@ -154,6 +167,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 			m.resultSeriesCount,
 			m.chunkSizeBytes,
 			m.queriesLimited,
+			m.queriesLimit,
 		)
 	}
 	return &m
@@ -179,12 +193,11 @@ type BucketStore struct {
 	// Number of goroutines to use when syncing blocks from object storage.
 	blockSyncConcurrency int
 
-	// The maximum of samples Thanos Store could return in one Series() call.
-	// Set to 0 to remove this limit (not recommended).
-	maxSampleCount uint64
-
 	// Query gate which limits the maximum amount of concurrent queries.
 	queryGate *Gate
+
+	// samplesLimiter limits the number of samples that may be fetched per Series() call.
+	samplesLimiter *Limiter
 }
 
 // NewBucketStore creates a new bucket backed store that implements the store API against
@@ -220,10 +233,10 @@ func NewBucketStore(
 		chunkPool:            chunkPool,
 		blocks:               map[ulid.ULID]*bucketBlock{},
 		blockSets:            map[uint64]*bucketBlockSet{},
-		maxSampleCount:       maxSampleCount,
 		debugLogging:         debugLogging,
 		blockSyncConcurrency: blockSyncConcurrency,
 		queryGate:            NewGate(maxConcurrent, reg),
+		samplesLimiter:       NewLimiter(maxSampleCount),
 	}
 	s.metrics = newBucketStoreMetrics(reg)
@@ -231,6 +244,8 @@ func NewBucketStore(
 		return nil, errors.Wrap(err, "create dir")
 	}
 
+	s.metrics.queriesLimit.Set(float64(maxConcurrent))
+
 	return s, nil
 }
@@ -481,7 +496,7 @@ func (s *bucketSeriesSet) Err() error {
 	return s.err
 }
 
-func (bs *BucketStore) blockSeries(
+func blockSeries(
 	ctx context.Context,
 	ulid ulid.ULID,
 	extLset map[string]string,
@@ -489,8 +504,7 @@ func (bs *BucketStore) blockSeries(
 	chunkr *bucketChunkReader,
 	matchers []labels.Matcher,
 	req *storepb.SeriesRequest,
-	samples *uint64,
-	samplesLock *sync.Mutex,
+	samplesLimiter *Limiter,
 ) (storepb.SeriesSet, *queryStats, error) {
 	ps, err := indexr.ExpandedPostings(matchers)
 	if err != nil {
@@ -568,7 +582,7 @@ func (bs *BucketStore) blockSeries(
 	}
 
 	// Preload all chunks that were marked in the previous stage.
-	if err := chunkr.preload(); err != nil {
+	if err := chunkr.preload(samplesLimiter); err != nil {
 		return nil, nil, errors.Wrap(err, "preload chunks")
 	}
 
@@ -579,7 +593,7 @@
 			if err != nil {
 				return nil, nil, errors.Wrap(err, "get chunk")
 			}
-			if err := bs.populateChunk(&s.chks[i], chk, req.Aggregates, samples, samplesLock); err != nil {
+			if err := populateChunk(&s.chks[i], chk, req.Aggregates); err != nil {
 				return nil, nil, errors.Wrap(err, "populate chunk")
 			}
 		}
@@ -588,30 +602,9 @@ func (bs *BucketStore) blockSeries(
 	return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
 }
 
-func (bs *BucketStore) checkSamples(gotSamples uint64, samples *uint64, samplesLock *sync.Mutex) error {
-	samplesLock.Lock()
-	*samples += gotSamples
-	if bs.maxSampleCount > 0 && *samples > bs.maxSampleCount {
-		samplesLock.Unlock()
-		bs.metrics.queriesLimited.Inc()
-		return errors.Errorf("sample limit violated (got %v, limit %v)", *samples, bs.maxSampleCount)
-	}
-	samplesLock.Unlock()
-	return nil
-}
-
-func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr,
-	samples *uint64, samplesLock *sync.Mutex) error {
+func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error {
 	if in.Encoding() == chunkenc.EncXOR {
-		ch, err := chunkenc.FromData(in.Encoding(), in.Bytes())
-		if err != nil {
-			return errors.Errorf("failed to create a chunk")
-		}
-		err = bs.checkSamples(uint64(ch.NumSamples()), samples, samplesLock)
-		if err != nil {
-			return errors.Wrapf(err, "check samples")
-		}
 		out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()}
 		return nil
 	}
@@ -620,10 +613,6 @@ func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk,
 	}
 
 	ac := downsample.AggrChunk(in.Bytes())
-	err := bs.checkSamples(uint64(ac.NumSamples()), samples, samplesLock)
-	if err != nil {
-		return errors.Wrapf(err, "check samples")
errors.Wrapf(err, "check samples") - } for _, at := range aggrs { switch at { @@ -710,12 +699,10 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie return status.Error(codes.InvalidArgument, err.Error()) } var ( - stats = &queryStats{} - g run.Group - res []storepb.SeriesSet - mtx sync.Mutex - samples uint64 - samplesLock sync.Mutex + stats = &queryStats{} + g run.Group + res []storepb.SeriesSet + mtx sync.Mutex ) s.mtx.RLock() @@ -745,15 +732,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block") g.Add(func() error { - part, pstats, err := s.blockSeries(ctx, + part, pstats, err := blockSeries(ctx, b.meta.ULID, b.meta.Thanos.Labels, indexr, chunkr, blockMatchers, req, - &samples, - &samplesLock, + s.samplesLimiter, ) if err != nil { return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) @@ -1570,12 +1556,17 @@ func (r *bucketChunkReader) addPreload(id uint64) error { } // preload all added chunk IDs. Must be called before the first call to Chunk is made. -func (r *bucketChunkReader) preload() error { +func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error { const maxChunkSize = 16000 const maxGapSize = 512 * 1024 var g run.Group + numSamples := uint64(len(r.preloads)) * maxSamplesPerSeriesRef + if err := samplesLimiter.Check(numSamples); err != nil { + return err + } + for seq, offsets := range r.preloads { sort.Slice(offsets, func(i, j int) bool { return offsets[i] < offsets[j] diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index e5e4f8db26..9ae5098397 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -218,7 +218,7 @@ func TestBucketStore_e2e(t *testing.T) { // Test the samples limit. testutil.Ok(t, os.RemoveAll(dir)) - s = prepareStoreWithTestBlocks(t, dir, bkt, 30) + s = prepareStoreWithTestBlocks(t, dir, bkt, 120) mint, maxt = s.store.TimeRange() defer s.Close() diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go new file mode 100644 index 0000000000..d27f4ce05f --- /dev/null +++ b/pkg/store/limiter.go @@ -0,0 +1,24 @@ +package store + +import "github.com/pkg/errors" + +// Limiter is a simple mechanism for checking if something has passed a certain threshold. +type Limiter struct { + limit uint64 +} + +// NewLimiter returns a new limiter with a specified limit. 0 disables the limit. +func NewLimiter(limit uint64) *Limiter { + return &Limiter{limit: limit} +} + +// Check checks if the passed number exceeds the limits or not. +func (l *Limiter) Check(num uint64) error { + if l.limit == 0 { + return nil + } + if num > l.limit { + return errors.Errorf("limit %v violated", l.limit) + } + return nil +}
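
For reviewers, a minimal, self-contained sketch of how the pieces introduced above compose: the Limiter, the maxSamplesPerSeriesRef constant, and the upper-bound check that preload() performs before downloading anything. Limiter and the constant are copied from this patch; the main() harness and the chunk counts are made up purely for illustration.

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// Limiter and NewLimiter are copied from pkg/store/limiter.go in this patch.
type Limiter struct {
	limit uint64
}

func NewLimiter(limit uint64) *Limiter {
	return &Limiter{limit: limit}
}

// Check returns an error if the given number exceeds the limit.
func (l *Limiter) Check(num uint64) error {
	if l.limit == 0 {
		return nil
	}
	if num > l.limit {
		return errors.Errorf("limit %v violated (got %v)", l.limit, num)
	}
	return nil
}

// maxSamplesPerSeriesRef mirrors the constant added to pkg/store/bucket.go:
// a TSDB chunk holds at most (approximately) 120 samples.
const maxSamplesPerSeriesRef uint64 = 120

func main() {
	// --grpc-sample-limit defaults to 50000000 in cmd/thanos/store.go.
	limiter := NewLimiter(50000000)

	// preload() checks chunks*120 against the limit before fetching any chunk
	// bytes from the bucket. 400000 chunks -> at most 48000000 samples: allowed.
	fmt.Println(limiter.Check(400000 * maxSamplesPerSeriesRef)) // <nil>

	// 500000 chunks -> at most 60000000 samples: the Series() call is dropped
	// without downloading a single chunk.
	fmt.Println(limiter.Check(500000 * maxSamplesPerSeriesRef)) // limit 50000000 violated (got 60000000)
}

Because the check counts chunks rather than decoded samples, it is cheap and happens before any chunk bytes are fetched; the trade-off is that a query touching many nearly-empty chunks may be rejected even though its real sample count is below the limit, which is what the NOTE on the --grpc-sample-limit flag warns about.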