Skip to content

Commit

Permalink
store: make changes according to the review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Giedrius Statkevičius committed Feb 8, 2019
1 parent 12db24a commit 9d0b8a7
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 50 deletions.
3 changes: 2 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
Default("2GB").Bytes()

maxSampleCount := cmd.Flag("grpc-sample-limit", "Maximum amount of samples returned via a single Series call. 0 means no limit.").
maxSampleCount := cmd.Flag("grpc-sample-limit",
"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: may unlikely underestimate the number of samples that would be needed to download.").
Default("50000000").Uint()

maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int()
Expand Down
87 changes: 39 additions & 48 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ import (
"google.golang.org/grpc/status"
)

// Approximately this is the max number of samples that we may have in any given chunk. This is needed
// for precalculating the number of samples that we may have to retrieve and decode for any given query
// without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know
// where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way
// because you barely get any improvements in compression when the number of samples is beyond this.
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
const maxSamplesPerSeriesRef uint64 = 120

type bucketStoreMetrics struct {
blocksLoaded prometheus.Gauge
blockLoads prometheus.Counter
Expand All @@ -57,6 +65,7 @@ type bucketStoreMetrics struct {
resultSeriesCount prometheus.Summary
chunkSizeBytes prometheus.Histogram
queriesLimited prometheus.Counter
queriesLimit prometheus.Gauge
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Expand Down Expand Up @@ -133,8 +142,12 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
})

m.queriesLimited = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_queries_limited_total",
Help: "Total number of queries that were dropped due to the sample limit.",
Name: "thanos_bucket_store_queries_limited",
Help: "Number of queries that were dropped due to the sample limit.",
})
m.queriesLimit = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_limit",
Help: "Number of maximum concurrent queries.",
})

if reg != nil {
Expand All @@ -154,6 +167,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
m.resultSeriesCount,
m.chunkSizeBytes,
m.queriesLimited,
m.queriesLimit,
)
}
return &m
Expand All @@ -179,12 +193,11 @@ type BucketStore struct {
// Number of goroutines to use when syncing blocks from object storage.
blockSyncConcurrency int

// The maximum of samples Thanos Store could return in one Series() call.
// Set to 0 to remove this limit (not recommended).
maxSampleCount uint64

// Query gate which limits the maximum amount of concurrent queries.
queryGate *Gate

// Samples limiter which limits the number of samples per each Series() call.
samplesLimiter *Limiter
}

// NewBucketStore creates a new bucket backed store that implements the store API against
Expand Down Expand Up @@ -220,17 +233,19 @@ func NewBucketStore(
chunkPool: chunkPool,
blocks: map[ulid.ULID]*bucketBlock{},
blockSets: map[uint64]*bucketBlockSet{},
maxSampleCount: maxSampleCount,
debugLogging: debugLogging,
blockSyncConcurrency: blockSyncConcurrency,
queryGate: NewGate(maxConcurrent, reg),
samplesLimiter: NewLimiter(maxSampleCount),
}
s.metrics = newBucketStoreMetrics(reg)

if err := os.MkdirAll(dir, 0777); err != nil {
return nil, errors.Wrap(err, "create dir")
}

s.metrics.queriesLimit.Set(float64(maxConcurrent))

return s, nil
}

Expand Down Expand Up @@ -481,16 +496,15 @@ func (s *bucketSeriesSet) Err() error {
return s.err
}

func (bs *BucketStore) blockSeries(
func blockSeries(
ctx context.Context,
ulid ulid.ULID,
extLset map[string]string,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
matchers []labels.Matcher,
req *storepb.SeriesRequest,
samples *uint64,
samplesLock *sync.Mutex,
samplesLimiter *Limiter,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
Expand Down Expand Up @@ -568,7 +582,7 @@ func (bs *BucketStore) blockSeries(
}

// Preload all chunks that were marked in the previous stage.
if err := chunkr.preload(); err != nil {
if err := chunkr.preload(samplesLimiter); err != nil {
return nil, nil, errors.Wrap(err, "preload chunks")
}

Expand All @@ -579,7 +593,7 @@ func (bs *BucketStore) blockSeries(
if err != nil {
return nil, nil, errors.Wrap(err, "get chunk")
}
if err := bs.populateChunk(&s.chks[i], chk, req.Aggregates, samples, samplesLock); err != nil {
if err := populateChunk(&s.chks[i], chk, req.Aggregates); err != nil {
return nil, nil, errors.Wrap(err, "populate chunk")
}
}
Expand All @@ -588,30 +602,9 @@ func (bs *BucketStore) blockSeries(
return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
}

func (bs *BucketStore) checkSamples(gotSamples uint64, samples *uint64, samplesLock *sync.Mutex) error {
samplesLock.Lock()
*samples += gotSamples
if bs.maxSampleCount > 0 && *samples > bs.maxSampleCount {
samplesLock.Unlock()
bs.metrics.queriesLimited.Inc()
return errors.Errorf("sample limit violated (got %v, limit %v)", *samples, bs.maxSampleCount)
}
samplesLock.Unlock()
return nil
}

func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr,
samples *uint64, samplesLock *sync.Mutex) error {
func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error {

if in.Encoding() == chunkenc.EncXOR {
ch, err := chunkenc.FromData(in.Encoding(), in.Bytes())
if err != nil {
return errors.Errorf("failed to create a chunk")
}
err = bs.checkSamples(uint64(ch.NumSamples()), samples, samplesLock)
if err != nil {
return errors.Wrapf(err, "check samples")
}
out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()}
return nil
}
Expand All @@ -620,10 +613,6 @@ func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk,
}

ac := downsample.AggrChunk(in.Bytes())
err := bs.checkSamples(uint64(ac.NumSamples()), samples, samplesLock)
if err != nil {
return errors.Wrapf(err, "check samples")
}

for _, at := range aggrs {
switch at {
Expand Down Expand Up @@ -710,12 +699,10 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
return status.Error(codes.InvalidArgument, err.Error())
}
var (
stats = &queryStats{}
g run.Group
res []storepb.SeriesSet
mtx sync.Mutex
samples uint64
samplesLock sync.Mutex
stats = &queryStats{}
g run.Group
res []storepb.SeriesSet
mtx sync.Mutex
)
s.mtx.RLock()

Expand Down Expand Up @@ -745,15 +732,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block")

g.Add(func() error {
part, pstats, err := s.blockSeries(ctx,
part, pstats, err := blockSeries(ctx,
b.meta.ULID,
b.meta.Thanos.Labels,
indexr,
chunkr,
blockMatchers,
req,
&samples,
&samplesLock,
s.samplesLimiter,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
Expand Down Expand Up @@ -1570,12 +1556,17 @@ func (r *bucketChunkReader) addPreload(id uint64) error {
}

// preload all added chunk IDs. Must be called before the first call to Chunk is made.
func (r *bucketChunkReader) preload() error {
func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error {
const maxChunkSize = 16000
const maxGapSize = 512 * 1024

var g run.Group

numSamples := uint64(len(r.preloads)) * maxSamplesPerSeriesRef
if err := samplesLimiter.Check(numSamples); err != nil {
return err
}

for seq, offsets := range r.preloads {
sort.Slice(offsets, func(i, j int) bool {
return offsets[i] < offsets[j]
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func TestBucketStore_e2e(t *testing.T) {

// Test the samples limit.
testutil.Ok(t, os.RemoveAll(dir))
s = prepareStoreWithTestBlocks(t, dir, bkt, 30)
s = prepareStoreWithTestBlocks(t, dir, bkt, 120)
mint, maxt = s.store.TimeRange()
defer s.Close()

Expand Down
24 changes: 24 additions & 0 deletions pkg/store/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package store

import "github.com/pkg/errors"

// Limiter is a simple mechanism for checking if something has passed a certain threshold.
type Limiter struct {
limit uint64
}

// NewLimiter returns a new limiter with a specified limit. 0 disables the limit.
func NewLimiter(limit uint64) *Limiter {
return &Limiter{limit: limit}
}

// Check checks if the passed number exceeds the limits or not.
func (l *Limiter) Check(num uint64) error {
if l.limit == 0 {
return nil
}
if num > l.limit {
return errors.Errorf("limit %v violated", l.limit)
}
return nil
}

0 comments on commit 9d0b8a7

Please sign in to comment.