Merge pull request #65 from vinted/0320006fixes
*: pull in fixes
GiedriusS authored Aug 2, 2023
2 parents 068bd6d + e4294ce commit 78f55b3
Showing 2 changed files with 37 additions and 25 deletions.
55 changes: 33 additions & 22 deletions pkg/store/bucket.go
@@ -962,8 +962,24 @@ func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats {
     return stats
 }
 
+type sortedMatchers []*labels.Matcher
+
+func newSortedMatchers(matchers []*labels.Matcher) sortedMatchers {
+    sort.Slice(matchers, func(i, j int) bool {
+        if matchers[i].Type == matchers[j].Type {
+            if matchers[i].Name == matchers[j].Name {
+                return matchers[i].Value < matchers[j].Value
+            }
+            return matchers[i].Name < matchers[j].Name
+        }
+        return matchers[i].Type < matchers[j].Type
+    })
+
+    return matchers
+}
+
 func (b *blockSeriesClient) ExpandPostings(
-    matchers []*labels.Matcher,
+    matchers sortedMatchers,
     seriesLimiter SeriesLimiter,
 ) error {
     ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter)
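
The hub of this commit: `newSortedMatchers` normalizes matcher order once, sorting by matcher type, then name, then value, so any permutation of the same matchers yields the same slice, and the `sortedMatchers` parameter type lets `ExpandPostings` assume that invariant. A minimal sketch of how the property could be checked from a test in this package; `illustrativeKey` and its `;`-joined format are hypothetical stand-ins, not the cache-key encoding the store actually uses:

package store

import (
    "strings"
    "testing"

    "github.com/prometheus/prometheus/model/labels"
)

// illustrativeKey derives a stand-in cache key from already-sorted matchers.
func illustrativeKey(ms sortedMatchers) string {
    parts := make([]string, 0, len(ms))
    for _, m := range ms {
        parts = append(parts, m.String()) // e.g. `job="api"` or `instance=~"10.*"`
    }
    return strings.Join(parts, ";")
}

func TestSortedMatchersGiveStableKey(t *testing.T) {
    a := labels.MustNewMatcher(labels.MatchEqual, "job", "api")
    b := labels.MustNewMatcher(labels.MatchRegexp, "instance", "10.*")

    // Both input orders normalize to the same slice, hence the same key.
    k1 := illustrativeKey(newSortedMatchers([]*labels.Matcher{a, b}))
    k2 := illustrativeKey(newSortedMatchers([]*labels.Matcher{b, a}))
    if k1 != k2 {
        t.Fatalf("expected identical keys, got %q vs %q", k1, k2)
    }
}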
@@ -1284,6 +1300,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
             continue
         }
 
+        sortedBlockMatchers := newSortedMatchers(blockMatchers)
+
         blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)
 
         if s.debugLogging {
@@ -1326,7 +1344,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
                 "block.resolution": blk.meta.Thanos.Downsample.Resolution,
             })
 
-            if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter); err != nil {
+            if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter); err != nil {
                 span.Finish()
                 return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID)
             }
@@ -1527,6 +1545,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
             continue
         }
 
+        sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)
+
         resHints.AddQueriedBlock(b.meta.ULID)
 
         indexr := b.indexReader()
@@ -1581,7 +1601,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
             defer blockClient.Close()
 
             if err := blockClient.ExpandPostings(
-                reqSeriesMatchersNoExtLabels,
+                sortedReqSeriesMatchersNoExtLabels,
                 seriesLimiter,
             ); err != nil {
                 return err
@@ -1727,6 +1747,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
             reqSeriesMatchersNoExtLabels = append(reqSeriesMatchersNoExtLabels, m)
         }
 
+        sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)
+
         resHints.AddQueriedBlock(b.meta.ULID)
 
         indexr := b.indexReader()
@@ -1775,7 +1797,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
             defer blockClient.Close()
 
             if err := blockClient.ExpandPostings(
-                reqSeriesMatchersNoExtLabels,
+                sortedReqSeriesMatchersNoExtLabels,
                 seriesLimiter,
             ); err != nil {
                 return err
@@ -2196,23 +2218,13 @@ func (r *bucketIndexReader) reset() {
 // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
 // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
 // single label name=value.
-func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
+func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
     // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no
     // matchers specified, and we don't need to fetch expanded postings from cache.
     if len(ms) == 0 {
         return nil, nil
     }
 
-    // Sort matchers to make sure we generate the same cache key.
-    sort.Slice(ms, func(i, j int) bool {
-        if ms[i].Type == ms[j].Type {
-            if ms[i].Name == ms[j].Name {
-                return ms[i].Value < ms[j].Value
-            }
-            return ms[i].Name < ms[j].Name
-        }
-        return ms[i].Type < ms[j].Type
-    })
     hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter)
     if err != nil {
         return nil, err
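
This is where the ordering used to be established: `ExpandedPostings` re-sorted the caller's slice on every call purely so the expanded-postings cache key came out identical for equivalent queries. With the sort hoisted into `newSortedMatchers`, each call is read-only on the shared matchers, which matters because the same request's slice is handed to one reader per block. If those per-block calls ever run concurrently, in-place sorting of a shared slice is a data race; a stripped-down sketch of that hazard (ints stand in for matchers, and the concurrency here is assumed for illustration, not shown in this diff):

package main

import (
    "sort"
    "sync"
)

func main() {
    shared := []int{3, 1, 2} // stands in for a request's shared matcher slice

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ { // stands in for per-block readers
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Every call rearranges the same backing array; `go run -race`
            // flags these concurrent writes. Sorting once up front and
            // sharing the result read-only avoids the race entirely.
            sort.Slice(shared, func(a, b int) bool { return shared[a] < shared[b] })
        }()
    }
    wg.Wait()
}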
@@ -3214,9 +3226,14 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
 // loadChunks will read range [start, end] from the segment file with sequence number seq.
 // This data range covers chunks starting at supplied offsets.
 func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error {
+    var locked bool
     fetchBegin := time.Now()
     defer func() {
+        if !locked {
+            r.mtx.Lock()
+        }
         r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
+        r.mtx.Unlock()
     }()
 
     // Get a reader for the required range.
@@ -3227,15 +3244,9 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
     defer runutil.CloseWithLogOnErr(r.block.logger, reader, "readChunkRange close range reader")
     bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize)
 
-    locked := true
+    locked = true
     r.mtx.Lock()
 
-    defer func() {
-        if locked {
-            r.mtx.Unlock()
-        }
-    }()
-
     r.stats.chunksFetchCount++
     r.stats.chunksFetched += len(pIdxs)
     r.stats.ChunksFetchedSizeSum += units.Base2Bytes(int(part.End - part.Start))
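
The chunk-reader hunks fix how `ChunksFetchDurationSum` is finalized. In the old code the stats update and the unlock lived in separate defers; defers run last-in-first-out, so the unlock defer released `r.mtx` before the stats defer wrote the duration, and early returns taken before `r.mtx.Lock()` wrote it with no lock at all. The rewrite folds both into one defer that acquires the mutex itself whenever the function bailed out before locking. A stripped-down sketch of the pattern; `reader`, `load`, and `failEarly` are invented for illustration:

package main

import (
    "fmt"
    "sync"
    "time"
)

type reader struct {
    mtx      sync.Mutex
    duration time.Duration // shared stat, must only be touched under mtx
}

func (r *reader) load(failEarly bool) error {
    var locked bool
    begin := time.Now()
    defer func() {
        if !locked {
            r.mtx.Lock() // early return: the lock was never taken
        }
        r.duration += time.Since(begin) // always updated under the mutex
        r.mtx.Unlock()
    }()

    if failEarly {
        return fmt.Errorf("failed before acquiring the lock")
    }

    locked = true
    r.mtx.Lock()
    // ... critical section mutating shared stats ...
    return nil
}

func main() {
    r := &reader{}
    _ = r.load(true)  // stats still recorded safely on the error path
    _ = r.load(false) // stats recorded while the lock is already held
    fmt.Println(r.duration > 0) // true
}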
7 changes: 4 additions & 3 deletions pkg/store/bucket_test.go
@@ -1231,7 +1231,7 @@ func benchmarkExpandedPostings(
 
     t.ResetTimer()
     for i := 0; i < t.N(); i++ {
-        p, err := indexr.ExpandedPostings(context.Background(), c.matchers, NewBytesLimiterFactory(0)(nil))
+        p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil))
         testutil.Ok(t, err)
         testutil.Equals(t, c.expectedLen, len(p))
     }
@@ -1264,7 +1264,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
     matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo")
     // Match nothing.
     matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*")
-    ps, err := indexr.ExpandedPostings(context.Background(), []*labels.Matcher{matcher1, matcher2}, NewBytesLimiterFactory(0)(nil))
+    ps, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil))
     testutil.Ok(t, err)
     testutil.Equals(t, len(ps), 0)
     // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
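
The comment the hunk ends on pins down negative caching: a matcher set that selects nothing still produces a cache entry, so the next identical query is served from cache instead of re-reading index data from object storage. A toy illustration of why the empty result must be stored as a value distinct from a miss; none of these names are Thanos APIs:

package main

import "fmt"

type postingsCache struct {
    entries map[string][]uint64 // key -> expanded postings (possibly empty)
}

func (c *postingsCache) fetch(key string) ([]uint64, bool) {
    ps, ok := c.entries[key]
    return ps, ok // ok is true even when ps is an empty slice
}

func (c *postingsCache) store(key string, ps []uint64) {
    if ps == nil {
        ps = []uint64{} // normalize: "no matches" is a valid cached value
    }
    c.entries[key] = ps
}

func main() {
    c := &postingsCache{entries: map[string][]uint64{}}
    c.store(`i=~"500.*"`, nil) // cache an empty expansion

    ps, hit := c.fetch(`i=~"500.*"`)
    fmt.Println(hit, len(ps)) // true 0 -> a hit, no refetch needed
}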
@@ -2565,6 +2565,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
             // TODO FIXME! testutil.Ok calls b.Fatalf under the hood, which
             // must be called only from the goroutine running the Benchmark function.
             testutil.Ok(b, err)
+            sortedMatchers := newSortedMatchers(matchers)
 
             dummyHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{})
             blockClient := newBlockSeriesClient(
@@ -2580,7 +2581,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
                 dummyHistogram,
                 nil,
             )
-            testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter))
+            testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter))
             defer blockClient.Close()
 
             // Ensure at least 1 series has been returned (as expected).