Skip to content

Commit

Permalink
Add streaming series limit at block series client (thanos-io#6972)
Browse files Browse the repository at this point in the history
* add series limit that is applied when streaming using block series client

Signed-off-by: Ben Ye <[email protected]>

* changelog

Signed-off-by: Ben Ye <[email protected]>

* add unit tests

Signed-off-by: Ben Ye <[email protected]>

* address comments

Signed-off-by: Ben Ye <[email protected]>

* fix comment

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Dec 12, 2023
1 parent 7b8eb86 commit fc1a6ed
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6925](https://github.com/thanos-io/thanos/pull/6925) Store Gateway: Support float native histogram.
- [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs.
- [#6943](https://github.com/thanos-io/thanos/pull/6943) Ruler: Added `keep_firing_for` field in alerting rule.
- [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Apply series limit when streaming series for series actually matched if lazy postings is enabled.

### Changed

Expand Down
48 changes: 33 additions & 15 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,8 +952,10 @@ type blockSeriesClient struct {
indexr *bucketIndexReader
chunkr *bucketChunkReader
loadAggregates []storepb.Aggr
chunksLimiter ChunksLimiter
bytesLimiter BytesLimiter

seriesLimiter SeriesLimiter
chunksLimiter ChunksLimiter
bytesLimiter BytesLimiter

lazyExpandedPostingEnabled bool
lazyExpandedPostingsCount prometheus.Counter
Expand Down Expand Up @@ -986,7 +988,8 @@ func newBlockSeriesClient(
logger log.Logger,
b *bucketBlock,
req *storepb.SeriesRequest,
limiter ChunksLimiter,
seriesLimiter SeriesLimiter,
chunksLimiter ChunksLimiter,
bytesLimiter BytesLimiter,
blockMatchers []*labels.Matcher,
shardMatcher *storepb.ShardMatcher,
Expand Down Expand Up @@ -1022,7 +1025,8 @@ func newBlockSeriesClient(
maxt: req.MaxTime,
indexr: b.indexReader(),
chunkr: chunkr,
chunksLimiter: limiter,
seriesLimiter: seriesLimiter,
chunksLimiter: chunksLimiter,
bytesLimiter: bytesLimiter,
skipChunks: req.SkipChunks,
seriesFetchDurationSum: seriesFetchDurationSum,
Expand Down Expand Up @@ -1091,20 +1095,21 @@ func (b *blockSeriesClient) ExpandPostings(
}
b.lazyPostings = ps

// If lazy expanded posting enabled, it is possible to fetch more series
// so easier to hit the series limit.
if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
}

if b.batchSize > len(ps.postings) {
b.batchSize = len(ps.postings)
}
if b.lazyPostings.lazyExpanded() {
// Assume lazy expansion could cut actual expanded postings length to 50%.
b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2)
b.lazyExpandedPostingsCount.Inc()
} else {
// Apply series limiter eargerly if lazy postings not enabled.
if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
}
}

if b.batchSize > len(ps.postings) {
b.batchSize = len(ps.postings)
}

b.entries = make([]seriesEntry, 0, b.batchSize)
return nil
}
Expand Down Expand Up @@ -1169,6 +1174,7 @@ func (b *blockSeriesClient) nextBatch(tenant string) error {
return errors.Wrap(err, "preload series")
}

seriesMatched := 0
b.entries = b.entries[:0]
OUTER:
for i := 0; i < len(postingsBatch); i++ {
Expand Down Expand Up @@ -1209,6 +1215,7 @@ OUTER:
continue
}

seriesMatched++
s := seriesEntry{lset: completeLabelset}
if b.skipChunks {
b.entries = append(b.entries, s)
Expand Down Expand Up @@ -1238,6 +1245,13 @@ OUTER:
b.entries = append(b.entries, s)
}

if b.lazyPostings.lazyExpanded() {
// Apply series limit before fetching chunks, for actual series matched.
if err := b.seriesLimiter.Reserve(uint64(seriesMatched)); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
}
}

if !b.skipChunks {
if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant); err != nil {
return errors.Wrap(err, "load chunks")
Expand Down Expand Up @@ -1405,8 +1419,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
g, gctx = errgroup.WithContext(ctx)
resHints = &hintspb.SeriesResponseHints{}
reqBlockMatchers []*labels.Matcher
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant))
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))

chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant))
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))

queryStatsEnabled = false
)
Expand Down Expand Up @@ -1464,6 +1479,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
s.logger,
blk,
req,
seriesLimiter,
chunksLimiter,
bytesLimiter,
sortedBlockMatchers,
Expand Down Expand Up @@ -1764,6 +1780,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
s.logger,
b,
seriesReq,
seriesLimiter,
nil,
bytesLimiter,
reqSeriesMatchersNoExtLabels,
Expand Down Expand Up @@ -1967,6 +1984,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
s.logger,
b,
seriesReq,
seriesLimiter,
nil,
bytesLimiter,
reqSeriesMatchersNoExtLabels,
Expand Down
100 changes: 100 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2777,6 +2777,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
nil,
blk,
req,
seriesLimiter,
chunksLimiter,
NewBytesLimiterFactory(0)(nil),
matchers,
Expand Down Expand Up @@ -3649,3 +3650,102 @@ func TestQueryStatsMerge(t *testing.T) {
s.merge(o)
testutil.Equals(t, e, s)
}

func TestBucketStoreStreamingSeriesLimit(t *testing.T) {
logger := log.NewNopLogger()
tmpDir := t.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
auxDir := filepath.Join(tmpDir, "aux")
metaDir := filepath.Join(tmpDir, "meta")
extLset := labels.FromStrings("region", "eu-west")

testutil.Ok(t, os.MkdirAll(metaDir, os.ModePerm))
testutil.Ok(t, os.MkdirAll(auxDir, os.ModePerm))

bkt, err := filesystem.NewBucket(bktDir)
testutil.Ok(t, err)
t.Cleanup(func() { testutil.Ok(t, bkt.Close()) })

headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = tmpDir
headOpts.ChunkRange = 1000
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
testutil.Ok(t, err)
t.Cleanup(func() { testutil.Ok(t, h.Close()) })

app := h.Appender(context.Background())
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "1"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "2"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "3"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "4"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "5"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "6"), 0, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())

id := createBlockFromHead(t, auxDir, h)

auxBlockDir := filepath.Join(auxDir, id.String())
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}, nil)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(t, err)

insBkt := objstore.WithNoopInstr(bkt)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
})
testutil.Ok(t, err)

// Set series limit to 2. Only pass if series limiter applies
// for lazy postings only.
bucketStore, err := NewBucketStore(
objstore.WithNoopInstr(bkt),
metaFetcher,
"",
NewChunksLimiterFactory(10e6),
NewSeriesLimiterFactory(2),
NewBytesLimiterFactory(10e6),
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
DefaultPostingOffsetInMemorySampling,
false,
false,
1*time.Minute,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
WithLazyExpandedPostings(true),
WithBlockEstimatedMaxSeriesFunc(func(_ metadata.Meta) uint64 {
return 1
}),
)
testutil.Ok(t, err)
t.Cleanup(func() { testutil.Ok(t, bucketStore.Close()) })

testutil.Ok(t, bucketStore.SyncBlocks(context.Background()))

req := &storepb.SeriesRequest{
MinTime: timestamp.FromTime(minTime),
MaxTime: timestamp.FromTime(maxTime),
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
{Type: storepb.LabelMatcher_RE, Name: "z", Value: "1|2"},
},
}
srv := newStoreSeriesServer(context.Background())
testutil.Ok(t, bucketStore.Series(req, srv))
testutil.Equals(t, 2, len(srv.SeriesSet))
}

0 comments on commit fc1a6ed

Please sign in to comment.