From 65315fdaace7d17d32c7603a1a28766872d237a5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Tue, 1 Aug 2023 13:23:30 +0300
Subject: [PATCH 1/2] store/bucket: remove sort.Slice data race
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The matchers slice is sorted on each call, but that introduces a data
race because the slice is shared between all calls. Do the sorting
once, in the outermost function.

Signed-off-by: Giedrius Statkevičius
---
 pkg/store/bucket.go      | 42 ++++++++++++++++++++++++++--------------
 pkg/store/bucket_test.go |  7 ++++---
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index d16ad04c18..6811bd14b1 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -962,8 +962,24 @@ func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats {
 	return stats
 }
 
+type sortedMatchers []*labels.Matcher
+
+func newSortedMatchers(matchers []*labels.Matcher) sortedMatchers {
+	sort.Slice(matchers, func(i, j int) bool {
+		if matchers[i].Type == matchers[j].Type {
+			if matchers[i].Name == matchers[j].Name {
+				return matchers[i].Value < matchers[j].Value
+			}
+			return matchers[i].Name < matchers[j].Name
+		}
+		return matchers[i].Type < matchers[j].Type
+	})
+
+	return matchers
+}
+
 func (b *blockSeriesClient) ExpandPostings(
-	matchers []*labels.Matcher,
+	matchers sortedMatchers,
 	seriesLimiter SeriesLimiter,
 ) error {
 	ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter)
@@ -1284,6 +1300,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 			continue
 		}
 
+		sortedBlockMatchers := newSortedMatchers(blockMatchers)
+
 		blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)
 
 		if s.debugLogging {
@@ -1326,7 +1344,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 				"block.resolution": blk.meta.Thanos.Downsample.Resolution,
 			})
 
-			if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter); err != nil {
+			if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter); err != nil {
 				span.Finish()
 				return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID)
 			}
@@ -1527,6 +1545,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 			continue
 		}
 
+		sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)
+
 		resHints.AddQueriedBlock(b.meta.ULID)
 
 		indexr := b.indexReader()
@@ -1581,7 +1601,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 			defer blockClient.Close()
 
 			if err := blockClient.ExpandPostings(
-				reqSeriesMatchersNoExtLabels,
+				sortedReqSeriesMatchersNoExtLabels,
 				seriesLimiter,
 			); err != nil {
 				return err
@@ -1727,6 +1747,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
 			reqSeriesMatchersNoExtLabels = append(reqSeriesMatchersNoExtLabels, m)
 		}
 
+		sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)
+
 		resHints.AddQueriedBlock(b.meta.ULID)
 
 		indexr := b.indexReader()
@@ -1775,7 +1797,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
 			defer blockClient.Close()
 
 			if err := blockClient.ExpandPostings(
-				reqSeriesMatchersNoExtLabels,
+				sortedReqSeriesMatchersNoExtLabels,
 				seriesLimiter,
 			); err != nil {
 				return err
@@ -2196,23 +2218,13 @@ func (r *bucketIndexReader) reset() {
 // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
 // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
 // single label name=value.
-func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
+func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
 	// Shortcut the case of `len(postingGroups) == 0`. It will only happen when no
 	// matchers specified, and we don't need to fetch expanded postings from cache.
 	if len(ms) == 0 {
 		return nil, nil
 	}
 
-	// Sort matchers to make sure we generate the same cache key.
-	sort.Slice(ms, func(i, j int) bool {
-		if ms[i].Type == ms[j].Type {
-			if ms[i].Name == ms[j].Name {
-				return ms[i].Value < ms[j].Value
-			}
-			return ms[i].Name < ms[j].Name
-		}
-		return ms[i].Type < ms[j].Type
-	})
 	hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter)
 	if err != nil {
 		return nil, err
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 6cd806c659..67a43cf74b 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -1231,7 +1231,7 @@ func benchmarkExpandedPostings(
 
 	t.ResetTimer()
 	for i := 0; i < t.N(); i++ {
-		p, err := indexr.ExpandedPostings(context.Background(), c.matchers, NewBytesLimiterFactory(0)(nil))
+		p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil))
 		testutil.Ok(t, err)
 		testutil.Equals(t, c.expectedLen, len(p))
 	}
@@ -1264,7 +1264,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
 	matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo")
 	// Match nothing.
 	matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*")
-	ps, err := indexr.ExpandedPostings(context.Background(), []*labels.Matcher{matcher1, matcher2}, NewBytesLimiterFactory(0)(nil))
+	ps, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil))
 	testutil.Ok(t, err)
 	testutil.Equals(t, len(ps), 0)
 	// Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
@@ -2565,6 +2565,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
 		// TODO FIXME! testutil.Ok calls b.Fatalf under the hood, which
 		// must be called only from the goroutine running the Benchmark function.
 		testutil.Ok(b, err)
+		sortedMatchers := newSortedMatchers(matchers)
 
 		dummyHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{})
 		blockClient := newBlockSeriesClient(
@@ -2580,7 +2581,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
 			dummyHistogram,
 			nil,
 		)
-		testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter))
+		testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter))
 		defer blockClient.Close()
 
 		// Ensure at least 1 series has been returned (as expected).
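Why the sort raced: sort.Slice mutates the slice in place, and the matcher slice is shared between concurrent calls, so two goroutines could swap elements of the same backing array at once even though both would produce the same final order. Below is a minimal, self-contained sketch of the problem and of the fix (illustrative only, not Thanos code; the shared []int stands in for the shared matcher slice):

    // race_sketch.go: run with `go run -race race_sketch.go` to see the
    // race detector flag the concurrent sorts.
    package main

    import (
        "fmt"
        "sort"
        "sync"
    )

    func main() {
        shared := []int{3, 1, 2} // stands in for the shared []*labels.Matcher

        var wg sync.WaitGroup
        for i := 0; i < 2; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                // Racy: sort.Slice swaps elements of the shared backing
                // array, so concurrent callers write to the same memory.
                sort.Slice(shared, func(i, j int) bool { return shared[i] < shared[j] })
            }()
        }
        wg.Wait()

        // The fix mirrors newSortedMatchers: sort once, up front, before
        // the slice is shared; callers then treat it as read-only.
        sort.Slice(shared, func(i, j int) bool { return shared[i] < shared[j] })
        fmt.Println(shared)
    }

The sortedMatchers type also encodes the invariant in the signatures: ExpandPostings and ExpandedPostings can no longer be handed an unsorted slice by accident, and the sort (still needed so equivalent matcher sets generate the same expanded-postings cache key) happens exactly once per request.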
From e4294ceac56df4b031c975886e9d16d15b078569 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Tue, 1 Aug 2023 23:48:42 -0700
Subject: [PATCH 2/2] fix data race on chunk reader stats (#6578)

Signed-off-by: Ben Ye
---
 pkg/store/bucket.go | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 6811bd14b1..5125a8d007 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -3226,9 +3226,14 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
 // loadChunks will read range [start, end] from the segment file with sequence number seq.
 // This data range covers chunks starting at supplied offsets.
 func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error {
+	var locked bool
 	fetchBegin := time.Now()
 	defer func() {
+		if !locked {
+			r.mtx.Lock()
+		}
 		r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
+		r.mtx.Unlock()
 	}()
 
 	// Get a reader for the required range.
@@ -3239,15 +3244,9 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 	defer runutil.CloseWithLogOnErr(r.block.logger, reader, "readChunkRange close range reader")
 	bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize)
 
-	locked := true
+	locked = true
 	r.mtx.Lock()
 
-	defer func() {
-		if locked {
-			r.mtx.Unlock()
-		}
-	}()
-
 	r.stats.chunksFetchCount++
 	r.stats.chunksFetched += len(pIdxs)
 	r.stats.ChunksFetchedSizeSum += units.Base2Bytes(int(part.End - part.Start))
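A note on the locking pattern above: the deferred function must update r.stats.ChunksFetchDurationSum under r.mtx, but by the time the defer runs the body may or may not already hold that mutex (it is taken only after the range reader has been obtained). The locked flag tells the defer whether it still has to Lock before the final Unlock. Here is a minimal sketch of the idiom (illustrative only, not Thanos code; reader, load, and the single duration field are stand-ins):

    package main

    import (
        "errors"
        "fmt"
        "sync"
        "time"
    )

    type reader struct {
        mtx   sync.Mutex
        stats time.Duration // stands in for queryStats shared across goroutines
    }

    func (r *reader) load(fail bool) error {
        var locked bool
        begin := time.Now()
        defer func() {
            if !locked {
                r.mtx.Lock() // early-return path: the body never took the lock
            }
            r.stats += time.Since(begin)
            r.mtx.Unlock() // one Unlock covers both paths
        }()

        if fail {
            return errors.New("range read failed") // lock not held yet
        }

        locked = true
        r.mtx.Lock()
        // ... update further stats fields while holding the lock ...
        return nil
    }

    func main() {
        r := &reader{}
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func(n int) {
                defer wg.Done()
                _ = r.load(n%2 == 0)
            }(i)
        }
        wg.Wait()
        fmt.Println("total fetch duration:", r.stats)
    }

Whichever way the function exits, exactly one Unlock runs: the error path locks inside the defer, while the success path reuses the lock taken in the body, so the duration sum is always updated under the mutex.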