Add streaming series limit at block series client #6972

Merged
merged 5 commits into from
Dec 12, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6925](https://github.com/thanos-io/thanos/pull/6925) Store Gateway: Support float native histogram.
- [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs.
- [#6943](https://github.com/thanos-io/thanos/pull/6943) Ruler: Added `keep_firing_for` field in alerting rule.
- [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Apply the series limit to the series actually matched when streaming series, if lazy postings is enabled.

### Changed

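For context on the new changelog entry: the limit in question is enforced through the store gateway's `SeriesLimiter`, whose only operation used in this diff is `Reserve(n uint64) error`, which reserves `n` series against a per-request budget and fails once the budget is exceeded. A minimal sketch of that pattern (the concrete `seriesLimiter` type below is illustrative, not Thanos' implementation; only the `Reserve` shape is taken from the diff):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// seriesLimiter is an illustrative stand-in for the SeriesLimiter used in
// pkg/store: Reserve adds n to a running total and returns an error once the
// total exceeds the configured limit (a limit of 0 means "unlimited").
type seriesLimiter struct {
	limit    uint64
	reserved atomic.Uint64
}

func (l *seriesLimiter) Reserve(n uint64) error {
	if l.limit == 0 {
		return nil
	}
	if total := l.reserved.Add(n); total > l.limit {
		return fmt.Errorf("limit %d violated (got %d)", l.limit, total)
	}
	return nil
}

func main() {
	l := &seriesLimiter{limit: 2}
	fmt.Println(l.Reserve(2)) // <nil>: 2 series fit the budget
	fmt.Println(l.Reserve(1)) // error: the third series exceeds the budget
}
```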
48 changes: 33 additions & 15 deletions pkg/store/bucket.go
@@ -952,8 +952,10 @@ type blockSeriesClient struct {
indexr *bucketIndexReader
chunkr *bucketChunkReader
loadAggregates []storepb.Aggr
chunksLimiter ChunksLimiter
bytesLimiter BytesLimiter

seriesLimiter SeriesLimiter
chunksLimiter ChunksLimiter
bytesLimiter BytesLimiter

lazyExpandedPostingEnabled bool
lazyExpandedPostingsCount prometheus.Counter
@@ -986,7 +988,8 @@ func newBlockSeriesClient(
logger log.Logger,
b *bucketBlock,
req *storepb.SeriesRequest,
limiter ChunksLimiter,
seriesLimiter SeriesLimiter,
chunksLimiter ChunksLimiter,
bytesLimiter BytesLimiter,
blockMatchers []*labels.Matcher,
shardMatcher *storepb.ShardMatcher,
@@ -1022,7 +1025,8 @@
maxt: req.MaxTime,
indexr: b.indexReader(),
chunkr: chunkr,
chunksLimiter: limiter,
seriesLimiter: seriesLimiter,
chunksLimiter: chunksLimiter,
bytesLimiter: bytesLimiter,
skipChunks: req.SkipChunks,
seriesFetchDurationSum: seriesFetchDurationSum,
@@ -1091,20 +1095,21 @@ func (b *blockSeriesClient) ExpandPostings(
}
b.lazyPostings = ps

// If lazy expanded posting enabled, it is possible to fetch more series
// so easier to hit the series limit.
if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
}

if b.batchSize > len(ps.postings) {
b.batchSize = len(ps.postings)
}
if b.lazyPostings.lazyExpanded() {
// Assume lazy expansion could cut actual expanded postings length to 50%.
b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2)
b.lazyExpandedPostingsCount.Inc()
} else {
// Apply series limiter eagerly if lazy postings is not enabled.
if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
}
}

if b.batchSize > len(ps.postings) {
b.batchSize = len(ps.postings)
}

b.entries = make([]seriesEntry, 0, b.batchSize)
return nil
}
@@ -1169,6 +1174,7 @@ func (b *blockSeriesClient) nextBatch(tenant string) error {
return errors.Wrap(err, "preload series")
}

seriesMatched := 0
b.entries = b.entries[:0]
OUTER:
for i := 0; i < len(postingsBatch); i++ {
@@ -1209,6 +1215,7 @@
continue
}

seriesMatched++
s := seriesEntry{lset: completeLabelset}
if b.skipChunks {
b.entries = append(b.entries, s)
@@ -1238,6 +1245,13 @@ OUTER:
b.entries = append(b.entries, s)
}

if b.lazyPostings.lazyExpanded() {
// Apply series limit before fetching chunks, for actual series matched.
if err := b.seriesLimiter.Reserve(uint64(seriesMatched)); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
}
}

if !b.skipChunks {
if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant); err != nil {
return errors.Wrap(err, "load chunks")
@@ -1405,8 +1419,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
g, gctx = errgroup.WithContext(ctx)
resHints = &hintspb.SeriesResponseHints{}
reqBlockMatchers []*labels.Matcher
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant))
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))

chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant))
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))

queryStatsEnabled = false
)
@@ -1464,6 +1479,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
s.logger,
blk,
req,
seriesLimiter,
chunksLimiter,
bytesLimiter,
sortedBlockMatchers,
@@ -1764,6 +1780,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
s.logger,
b,
seriesReq,
seriesLimiter,
nil,
bytesLimiter,
reqSeriesMatchersNoExtLabels,
@@ -1967,6 +1984,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
s.logger,
b,
seriesReq,
seriesLimiter,
nil,
bytesLimiter,
reqSeriesMatchersNoExtLabels,
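The heart of this file's change is where the reservation happens. Without lazy expanded postings, every expanded posting becomes a streamed series, so reserving eagerly in `ExpandPostings` is exact. With lazy postings, the expanded postings can be a superset of what actually matches, so the reservation moves into `nextBatch` and covers only `seriesMatched`, right before chunks are fetched. A condensed sketch of that control flow (struct and method bodies are simplified stand-ins; only the names `ExpandPostings`, `nextBatch`, `seriesMatched`, and the `Reserve` calls mirror the diff):

```go
package main

import "fmt"

// limiter is a simplified stand-in for SeriesLimiter.
type limiter struct{ limit, used uint64 }

func (l *limiter) Reserve(n uint64) error {
	l.used += n
	if l.limit > 0 && l.used > l.limit {
		return fmt.Errorf("exceeded series limit: %d > %d", l.used, l.limit)
	}
	return nil
}

type blockSeriesClient struct {
	lazyExpanded  bool // corresponds to b.lazyPostings.lazyExpanded()
	seriesLimiter *limiter
}

// expandPostings mirrors ExpandPostings: reserve eagerly only when lazy
// expanded postings are NOT in use, because then every posting is streamed.
func (b *blockSeriesClient) expandPostings(numPostings int) error {
	if !b.lazyExpanded {
		return b.seriesLimiter.Reserve(uint64(numPostings))
	}
	return nil
}

// nextBatch mirrors the streaming path: with lazy postings, reserve only the
// series that actually matched in this batch, before their chunks are loaded.
func (b *blockSeriesClient) nextBatch(seriesMatched int) error {
	if b.lazyExpanded {
		return b.seriesLimiter.Reserve(uint64(seriesMatched))
	}
	return nil
}

func main() {
	b := &blockSeriesClient{lazyExpanded: true, seriesLimiter: &limiter{limit: 2}}
	fmt.Println(b.expandPostings(6)) // <nil>: 6 postings expanded, nothing reserved yet
	fmt.Println(b.nextBatch(2))      // <nil>: 2 matched series fit the limit
	fmt.Println(b.nextBatch(1))      // error: a third matched series exceeds the limit
}
```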
100 changes: 100 additions & 0 deletions pkg/store/bucket_test.go
@@ -2777,6 +2777,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
nil,
blk,
req,
seriesLimiter,
chunksLimiter,
NewBytesLimiterFactory(0)(nil),
matchers,
@@ -3649,3 +3650,102 @@ func TestQueryStatsMerge(t *testing.T) {
s.merge(o)
testutil.Equals(t, e, s)
}

func TestBucketStoreStreamingSeriesLimit(t *testing.T) {
logger := log.NewNopLogger()
tmpDir := t.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
auxDir := filepath.Join(tmpDir, "aux")
metaDir := filepath.Join(tmpDir, "meta")
extLset := labels.FromStrings("region", "eu-west")

testutil.Ok(t, os.MkdirAll(metaDir, os.ModePerm))
testutil.Ok(t, os.MkdirAll(auxDir, os.ModePerm))

bkt, err := filesystem.NewBucket(bktDir)
testutil.Ok(t, err)
t.Cleanup(func() { testutil.Ok(t, bkt.Close()) })

headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = tmpDir
headOpts.ChunkRange = 1000
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
testutil.Ok(t, err)
t.Cleanup(func() { testutil.Ok(t, h.Close()) })

app := h.Appender(context.Background())
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "1"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "2"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "3"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "4"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "5"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("a", "1", "z", "6"), 0, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())

id := createBlockFromHead(t, auxDir, h)

auxBlockDir := filepath.Join(auxDir, id.String())
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}, nil)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(t, err)

insBkt := objstore.WithNoopInstr(bkt)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
})
testutil.Ok(t, err)

// Set series limit to 2. The test only passes if the series limiter is
// applied to the actually matched series (lazy postings path) rather than
// to all expanded postings.
bucketStore, err := NewBucketStore(
objstore.WithNoopInstr(bkt),
metaFetcher,
"",
NewChunksLimiterFactory(10e6),
NewSeriesLimiterFactory(2),
NewBytesLimiterFactory(10e6),
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
DefaultPostingOffsetInMemorySampling,
false,
false,
1*time.Minute,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
WithLazyExpandedPostings(true),
WithBlockEstimatedMaxSeriesFunc(func(_ metadata.Meta) uint64 {
return 1
}),
)
testutil.Ok(t, err)
t.Cleanup(func() { testutil.Ok(t, bucketStore.Close()) })

testutil.Ok(t, bucketStore.SyncBlocks(context.Background()))

req := &storepb.SeriesRequest{
MinTime: timestamp.FromTime(minTime),
MaxTime: timestamp.FromTime(maxTime),
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
{Type: storepb.LabelMatcher_RE, Name: "z", Value: "1|2"},
},
}
srv := newStoreSeriesServer(context.Background())
testutil.Ok(t, bucketStore.Series(req, srv))
testutil.Equals(t, 2, len(srv.SeriesSet))
}
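A complementary over-limit case is not included in this PR's test, but given the limit of 2 and the error text produced by `httpgrpc.Errorf` in `bucket.go`, a matcher that selects three of the six series would be expected to fail. A hedged sketch of such an assertion, reusing the test's setup (the widened matcher value and the `strings.Contains` check are assumptions, not part of the PR; it would also need the `strings` import):

```go
// Hypothetical follow-up inside TestBucketStoreStreamingSeriesLimit: with the
// series limit still at 2, widening the regexp to match three series should
// trip the limiter on the lazy postings streaming path.
reqOverLimit := &storepb.SeriesRequest{
	MinTime: timestamp.FromTime(minTime),
	MaxTime: timestamp.FromTime(maxTime),
	Matchers: []storepb.LabelMatcher{
		{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
		{Type: storepb.LabelMatcher_RE, Name: "z", Value: "1|2|3"}, // assumed matcher: 3 series
	},
}
srvOverLimit := newStoreSeriesServer(context.Background())
err = bucketStore.Series(reqOverLimit, srvOverLimit)
testutil.NotOk(t, err)
// The message comes from the httpgrpc.Errorf call shown in bucket.go above.
testutil.Assert(t, strings.Contains(err.Error(), "exceeded series limit"))
```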