Skip to content

Commit

Permalink
expose downloaded data size in query hints (thanos-io#6409)
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored and HC Zhu committed Jun 27, 2023
1 parent 540f95c commit cfedce8
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 41 deletions.
11 changes: 11 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2393,6 +2393,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache))
}

// Iterate over all groups and fetch posting from cache.
Expand Down Expand Up @@ -2466,6 +2467,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
if err := bytesLimiter.Reserve(uint64(length)); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching postings: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(length)
}

g, ctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -2626,6 +2628,7 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
if err := bytesLimiter.Reserve(uint64(len(b))); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(b))
}

parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) {
Expand All @@ -2651,6 +2654,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
if err := bytesLimiter.Reserve(uint64(end - start)); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching series: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(end - start)
}

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
Expand Down Expand Up @@ -2923,6 +2927,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(p.End - p.Start)
}

for _, p := range parts {
Expand Down Expand Up @@ -3040,6 +3045,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
if err := bytesLimiter.Reserve(uint64(chunkLen)); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(chunkLen)
nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
if err != nil {
return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen)
Expand Down Expand Up @@ -3147,6 +3153,8 @@ type queryStats struct {
mergedSeriesCount int
mergedChunksCount int
MergeDuration time.Duration

DataDownloadedSizeSum units.Base2Bytes
}

func (s queryStats) merge(o *queryStats) *queryStats {
Expand Down Expand Up @@ -3187,6 +3195,8 @@ func (s queryStats) merge(o *queryStats) *queryStats {
s.mergedChunksCount += o.mergedChunksCount
s.MergeDuration += o.MergeDuration

s.DataDownloadedSizeSum += o.DataDownloadedSizeSum

return &s
}

Expand All @@ -3211,6 +3221,7 @@ func (s queryStats) toHints() *hintspb.QueryStats {
ChunksFetchCount: int64(s.chunksFetchCount),
MergedSeriesCount: int64(s.mergedSeriesCount),
MergedChunksCount: int64(s.mergedChunksCount),
DataDownloadedSizeSum: int64(s.DataDownloadedSizeSum),
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/store/hintspb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (m *QueryStats) Merge(other *QueryStats) {
m.BlocksQueried += other.BlocksQueried
m.MergedSeriesCount += other.MergedSeriesCount
m.MergedChunksCount += other.MergedChunksCount
m.DataDownloadedSizeSum += other.DataDownloadedSizeSum

m.PostingsFetched += other.PostingsFetched
m.PostingsToFetch += other.PostingsToFetch
Expand Down
113 changes: 72 additions & 41 deletions pkg/store/hintspb/hints.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/store/hintspb/hints.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,6 @@ message QueryStats {
int64 chunks_fetched = 17;
int64 chunks_fetched_size_sum = 18;
int64 chunks_fetch_count = 19;

int64 data_downloaded_size_sum = 20;
}

0 comments on commit cfedce8

Please sign in to comment.