From 651a4a440e8c433476ba6047bf9d423f89e85496 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 13 Jun 2024 09:04:22 -0700 Subject: [PATCH] Enhanced bytes limiter with data type param (#7414) * Refactor existing stats incrementation for touched and fetched data Signed-off-by: Justin Jung * Add TypedBytesLimiter Signed-off-by: Justin Jung * Remove addAndCheck func Signed-off-by: Justin Jung * Update BytesLimiter interface to accept dataType param Signed-off-by: Justin Jung * Added tests Signed-off-by: Justin Jung * Fix build + changelog Signed-off-by: Justin Jung * Fix wrong data type Signed-off-by: Justin Jung * Changed storeDataType to be exported Signed-off-by: Justin Jung * Revert []BytesLimiter to BytesLimtier Signed-off-by: Justin Jung * Lint Signed-off-by: Justin Jung * More reverts Signed-off-by: Justin Jung * More Signed-off-by: Justin Jung * Rename DefaultBytesLimiterFactory back to NewBytesLimiterFactory Signed-off-by: Justin Jung * Changed StoreDataType from string to int Signed-off-by: Justin Jung * Removed nil check for bytesLimiter Signed-off-by: Justin Jung * nit Signed-off-by: Justin Jung * Removed changelog Signed-off-by: Justin Jung --------- Signed-off-by: Justin Jung --- pkg/store/bucket.go | 101 +++++++++++++++++++++-------------- pkg/store/bucket_e2e_test.go | 40 ++++++++++++++ pkg/store/bucket_test.go | 45 +++++++++++++++- pkg/store/limiter.go | 6 ++- 4 files changed, 151 insertions(+), 41 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 49a84d8000..6270f5a1e3 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -62,6 +62,17 @@ import ( "github.com/thanos-io/thanos/pkg/tracing" ) +type StoreDataType int + +const ( + PostingsFetched StoreDataType = iota + PostingsTouched + SeriesFetched + SeriesTouched + ChunksFetched + ChunksTouched +) + const ( // MaxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed // for precalculating the number of samples that we may have to retrieve and decode for any given query @@ -388,10 +399,10 @@ type BucketStore struct { // seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call, // or LabelName and LabelValues calls when used with matchers. seriesLimiterFactory SeriesLimiterFactory - // bytesLimiterFactory creates a new limiter used to limit the amount of bytes fetched/touched by each Series() call. bytesLimiterFactory BytesLimiterFactory - partitioner Partitioner + + partitioner Partitioner filterConfig *FilterConfig advLabelSets []labelpb.ZLabelSet @@ -2869,12 +2880,11 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, if !hit { return false, nil, nil } - if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { + if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil { return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err) } - r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache)) - r.stats.postingsTouched++ - r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(dataFromCache)) + + r.stats.add(PostingsTouched, 1, len(dataFromCache)) p, closeFns, err := r.decodeCachedPostings(dataFromCache) defer func() { for _, closeFn := range closeFns { @@ -2943,10 +2953,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab // Fetch postings from the cache with a single call. 
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant) for _, dataFromCache := range fromCache { - if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { + if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil { return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err) } - r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache)) } // Iterate over all groups and fetch posting from cache. @@ -2958,8 +2967,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab } // Get postings for the given key from cache first. if b, ok := fromCache[key]; ok { - r.stats.postingsTouched++ - r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(b)) + r.stats.add(PostingsTouched, 1, len(b)) l, closer, err := r.decodeCachedPostings(b) if err != nil { @@ -3000,10 +3008,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab start := int64(part.Start) length := int64(part.End) - start - if err := bytesLimiter.Reserve(uint64(length)); err != nil { + if err := bytesLimiter.ReserveWithType(uint64(length), PostingsFetched); err != nil { return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching postings: %s", err) } - r.stats.DataDownloadedSizeSum += units.Base2Bytes(length) } g, ctx := errgroup.WithContext(ctx) @@ -3035,8 +3042,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length) stats.postingsFetchCount++ - stats.postingsFetched += j - i - stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length)) + stats.add(PostingsFetched, j-i, int(length)) for rdr.Next() { diffVarintPostings, postingsCount, keyID := rdr.AtDiffVarint() @@ -3050,12 +3056,11 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab return errors.Wrap(err, "encoding with snappy") } - stats.postingsTouched++ - stats.PostingsTouchedSizeSum += units.Base2Bytes(int(len(diffVarintPostings))) stats.cachedPostingsCompressions += 1 stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(diffVarintPostings)) stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(len(dataToCache)) stats.CachedPostingsCompressionTimeSum += time.Since(startCompression) + stats.add(PostingsTouched, 1, len(diffVarintPostings)) r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache, tenant) } @@ -3178,10 +3183,9 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids, tenant) for id, b := range fromCache { r.loadedSeries[id] = b - if err := bytesLimiter.Reserve(uint64(len(b))); err != nil { + if err := bytesLimiter.ReserveWithType(uint64(len(b)), SeriesTouched); err != nil { return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err) } - r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(b)) } parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) { @@ -3207,11 +3211,8 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series r.stats.merge(stats) }() - if bytesLimiter != nil { - if err := bytesLimiter.Reserve(uint64(end - start)); err != nil { - return httpgrpc.Errorf(int(codes.ResourceExhausted), 
"exceeded bytes limit while fetching series: %s", err) - } - stats.DataDownloadedSizeSum += units.Base2Bytes(end - start) + if err := bytesLimiter.ReserveWithType(uint64(end-start), SeriesFetched); err != nil { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching series: %s", err) } b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), r.logger) @@ -3220,9 +3221,8 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series } stats.seriesFetchCount++ - stats.seriesFetched += len(ids) stats.SeriesFetchDurationSum += time.Since(begin) - stats.SeriesFetchedSizeSum += units.Base2Bytes(int(end - start)) + stats.add(SeriesFetched, len(ids), int(end-start)) for i, id := range ids { c := b[uint64(id)-start:] @@ -3325,8 +3325,7 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref storage.SeriesRef, lset *[]sym return false, errors.Errorf("series %d not found", ref) } - r.stats.seriesTouched++ - r.stats.SeriesTouchedSizeSum += units.Base2Bytes(len(b)) + r.stats.add(SeriesTouched, 1, len(b)) return decodeSeriesForTime(b, lset, chks, skipChunks, mint, maxt) } @@ -3514,10 +3513,9 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ }) for _, p := range parts { - if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil { + if err := bytesLimiter.ReserveWithType(uint64(p.End-p.Start), ChunksFetched); err != nil { return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err) } - r.stats.DataDownloadedSizeSum += units.Base2Bytes(p.End - p.Start) } for _, p := range parts { @@ -3551,8 +3549,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize) stats.chunksFetchCount++ - stats.chunksFetched += len(pIdxs) - stats.ChunksFetchedSizeSum += units.Base2Bytes(int(part.End - part.Start)) + stats.add(ChunksFetched, len(pIdxs), int(part.End-part.Start)) var ( buf []byte @@ -3613,8 +3610,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a if err != nil { return errors.Wrap(err, "populate chunk") } - stats.chunksTouched++ - stats.ChunksTouchedSizeSum += units.Base2Bytes(int(chunkDataLen)) + stats.add(ChunksTouched, 1, int(chunkDataLen)) continue } @@ -3623,10 +3619,9 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a fetchBegin = time.Now() // Read entire chunk into new buffer. // TODO: readChunkRange call could be avoided for any chunk but last in this particular part. 
- if err := bytesLimiter.Reserve(uint64(chunkLen)); err != nil { + if err := bytesLimiter.ReserveWithType(uint64(chunkLen), ChunksTouched); err != nil { return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err) } - stats.DataDownloadedSizeSum += units.Base2Bytes(chunkLen) nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}, r.logger) if err != nil { @@ -3636,16 +3631,15 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a return errors.Errorf("preloaded chunk too small, expecting %d", chunkLen) } - stats.chunksFetchCount++ - stats.ChunksFetchedSizeSum += units.Base2Bytes(len(*nb)) + stats.add(ChunksFetched, 1, len(*nb)) c := rawChunk((*nb)[n:]) err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), &c, aggrs, r.save, calculateChunkChecksum) if err != nil { r.block.chunkPool.Put(nb) return errors.Wrap(err, "populate chunk") } - stats.chunksTouched++ - stats.ChunksTouchedSizeSum += units.Base2Bytes(int(chunkDataLen)) + + stats.add(ChunksTouched, 1, int(chunkDataLen)) r.block.chunkPool.Put(nb) } @@ -3746,6 +3740,35 @@ type queryStats struct { DataDownloadedSizeSum units.Base2Bytes } +func (s *queryStats) add(dataType StoreDataType, dataCount int, dataSize int) { + s.mtx.Lock() + defer s.mtx.Unlock() + + switch dataType { + case PostingsFetched: + s.postingsFetched += dataCount + s.PostingsFetchedSizeSum += units.Base2Bytes(dataSize) + case PostingsTouched: + s.postingsTouched += dataCount + s.PostingsTouchedSizeSum += units.Base2Bytes(dataSize) + case SeriesFetched: + s.seriesFetched += dataCount + s.SeriesFetchedSizeSum += units.Base2Bytes(dataSize) + case SeriesTouched: + s.seriesTouched += dataCount + s.SeriesTouchedSizeSum += units.Base2Bytes(dataSize) + case ChunksFetched: + s.chunksFetched += dataCount + s.ChunksFetchedSizeSum += units.Base2Bytes(dataSize) + case ChunksTouched: + s.chunksTouched += dataCount + s.ChunksTouchedSizeSum += units.Base2Bytes(dataSize) + default: + return + } + s.DataDownloadedSizeSum += units.Base2Bytes(dataSize) +} + func (s *queryStats) merge(o *queryStats) { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index c91fb4096d..cec72b374b 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -677,6 +677,46 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { } } +func TestBucketStore_Series_CustomBytesLimiters_e2e(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + bkt := objstore.NewInMemBucket() + + dir := t.TempDir() + + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), func(_ prometheus.Counter) BytesLimiter { + return &bytesLimiterMock{ + limitFunc: func(_ uint64, dataType StoreDataType) error { + if dataType == PostingsFetched { + return fmt.Errorf("error reserving data type: PostingsFetched") + } + + return nil + }, + } + }, emptyRelabelConfig, allowAllFilterConf) + testutil.Ok(t, s.store.SyncBlocks(ctx)) + + req := &storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + }, + MinTime: minTimeDuration.PrometheusTimestamp(), + MaxTime: maxTimeDuration.PrometheusTimestamp(), + } + + s.cache.SwapWith(noopCache{}) + srv := newStoreSeriesServer(ctx) + err := s.store.Series(req, srv) + + testutil.NotOk(t, err) + testutil.Assert(t, 
strings.Contains(err.Error(), "exceeded bytes limit")) + testutil.Assert(t, strings.Contains(err.Error(), "error reserving data type: PostingsFetched")) + status, ok := status.FromError(err) + testutil.Equals(t, true, ok) + testutil.Equals(t, codes.ResourceExhausted, status.Code()) +} + func TestBucketStore_LabelNames_e2e(t *testing.T) { objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 23f7de0523..7afda70ebd 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -3852,6 +3852,9 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) { }) testutil.Ok(t, err) + firstBytesLimiterChecked := false + secondBytesLimiterChecked := false + // Set series limit to 2. Only pass if series limiter applies // for lazy postings only. bucketStore, err := NewBucketStore( @@ -3860,7 +3863,24 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) { "", NewChunksLimiterFactory(10e6), NewSeriesLimiterFactory(2), - NewBytesLimiterFactory(10e6), + func(_ prometheus.Counter) BytesLimiter { + return &compositeBytesLimiterMock{ + limiters: []BytesLimiter{ + &bytesLimiterMock{ + limitFunc: func(_ uint64, _ StoreDataType) error { + firstBytesLimiterChecked = true + return nil + }, + }, + &bytesLimiterMock{ + limitFunc: func(_ uint64, _ StoreDataType) error { + secondBytesLimiterChecked = true + return nil + }, + }, + }, + } + }, NewGapBasedPartitioner(PartitionerMaxGapSize), 20, true, @@ -3891,4 +3911,27 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) { srv := newStoreSeriesServer(context.Background()) testutil.Ok(t, bucketStore.Series(req, srv)) testutil.Equals(t, 2, len(srv.SeriesSet)) + testutil.Equals(t, true, firstBytesLimiterChecked) + testutil.Equals(t, true, secondBytesLimiterChecked) +} + +type bytesLimiterMock struct { + limitFunc func(uint64, StoreDataType) error +} + +func (m *bytesLimiterMock) ReserveWithType(num uint64, dataType StoreDataType) error { + return m.limitFunc(num, dataType) +} + +type compositeBytesLimiterMock struct { + limiters []BytesLimiter +} + +func (m *compositeBytesLimiterMock) ReserveWithType(num uint64, dataType StoreDataType) error { + for _, l := range m.limiters { + if err := l.ReserveWithType(num, dataType); err != nil { + return err + } + } + return nil } diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index f564e11443..993330cc85 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -34,7 +34,7 @@ type BytesLimiter interface { // Reserve bytes out of the total amount of bytes enforced by the limiter. // Returns an error if the limit has been exceeded. This function must be // goroutine safe. - Reserve(num uint64) error + ReserveWithType(num uint64, dataType StoreDataType) error } // ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for @@ -64,6 +64,10 @@ func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter { // Reserve implements ChunksLimiter. func (l *Limiter) Reserve(num uint64) error { + return l.ReserveWithType(num, 0) +} + +func (l *Limiter) ReserveWithType(num uint64, _ StoreDataType) error { if l == nil { return nil }
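
Note on the interface change above: the core of this patch is in pkg/store/limiter.go, where BytesLimiter's Reserve(num uint64) becomes ReserveWithType(num uint64, dataType StoreDataType), so every byte reservation in bucket.go is now tagged with the kind of data being fetched or touched (postings, series, or chunks) and queryStats.add folds the per-type counters and DataDownloadedSizeSum into one place. The built-in Limiter keeps a single budget and ignores the type, but the extra parameter lets callers plug in a limiter that budgets each data type separately. The sketch below is one such limiter; perTypeBytesLimiter and its budgets are illustrative assumptions and not part of this patch, and StoreDataType/BytesLimiter are redeclared locally (mirroring the diff) only so the example compiles on its own.

package main

import (
	"fmt"
	"sync/atomic"
)

// StoreDataType and BytesLimiter mirror the declarations introduced by this
// patch in pkg/store; they are repeated here only to keep the sketch self-contained.
type StoreDataType int

const (
	PostingsFetched StoreDataType = iota
	PostingsTouched
	SeriesFetched
	SeriesTouched
	ChunksFetched
	ChunksTouched
)

type BytesLimiter interface {
	// ReserveWithType reserves num bytes of the given data type and returns an
	// error once the relevant budget is exceeded. It must be goroutine safe.
	ReserveWithType(num uint64, dataType StoreDataType) error
}

// perTypeBytesLimiter is a hypothetical limiter that keeps an independent byte
// budget per StoreDataType, which is the kind of policy the new dataType
// parameter makes possible. A missing or zero limit means that type is unlimited.
type perTypeBytesLimiter struct {
	limits   map[StoreDataType]uint64
	reserved [ChunksTouched + 1]atomic.Uint64
}

func newPerTypeBytesLimiter(limits map[StoreDataType]uint64) *perTypeBytesLimiter {
	return &perTypeBytesLimiter{limits: limits}
}

func (l *perTypeBytesLimiter) ReserveWithType(num uint64, dataType StoreDataType) error {
	limit := l.limits[dataType]
	if limit == 0 {
		return nil
	}
	if reserved := l.reserved[dataType].Add(num); reserved > limit {
		return fmt.Errorf("data type %d exceeded bytes limit (reserved %d, limit %d)", dataType, reserved, limit)
	}
	return nil
}

func main() {
	limiter := newPerTypeBytesLimiter(map[StoreDataType]uint64{
		PostingsFetched: 1 << 20, // 1 MiB budget for postings fetched from the bucket.
		ChunksFetched:   8 << 20, // 8 MiB budget for chunk bytes fetched from the bucket.
	})

	fmt.Println(limiter.ReserveWithType(512, SeriesTouched))     // <nil>: series bytes are not limited here.
	fmt.Println(limiter.ReserveWithType(2<<20, PostingsFetched)) // error: postings budget exceeded.
}

A limiter along these lines would be handed to NewBucketStore wrapped in a BytesLimiterFactory (a func(prometheus.Counter) BytesLimiter), the same way TestBucketStore_Series_CustomBytesLimiters_e2e above injects bytesLimiterMock.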