Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/bucket: fix data race #6575

Merged
merged 2 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 27 additions & 15 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,8 +962,24 @@ func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats {
return stats
}

type sortedMatchers []*labels.Matcher

func newSortedMatchers(matchers []*labels.Matcher) sortedMatchers {
sort.Slice(matchers, func(i, j int) bool {
if matchers[i].Type == matchers[j].Type {
if matchers[i].Name == matchers[j].Name {
return matchers[i].Value < matchers[j].Value
}
return matchers[i].Name < matchers[j].Name
}
return matchers[i].Type < matchers[j].Type
})

return matchers
}

func (b *blockSeriesClient) ExpandPostings(
matchers []*labels.Matcher,
matchers sortedMatchers,
seriesLimiter SeriesLimiter,
) error {
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter)
Expand Down Expand Up @@ -1284,6 +1300,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
continue
}

sortedBlockMatchers := newSortedMatchers(blockMatchers)

blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)

if s.debugLogging {
Expand Down Expand Up @@ -1326,7 +1344,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
"block.resolution": blk.meta.Thanos.Downsample.Resolution,
})

if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter); err != nil {
if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter); err != nil {
span.Finish()
return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID)
}
Expand Down Expand Up @@ -1527,6 +1545,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
continue
}

sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)

resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader()
Expand Down Expand Up @@ -1581,7 +1601,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
defer blockClient.Close()

if err := blockClient.ExpandPostings(
reqSeriesMatchersNoExtLabels,
sortedReqSeriesMatchersNoExtLabels,
seriesLimiter,
); err != nil {
return err
Expand Down Expand Up @@ -1727,6 +1747,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
reqSeriesMatchersNoExtLabels = append(reqSeriesMatchersNoExtLabels, m)
}

sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)

resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader()
Expand Down Expand Up @@ -1775,7 +1797,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
defer blockClient.Close()

if err := blockClient.ExpandPostings(
reqSeriesMatchersNoExtLabels,
sortedReqSeriesMatchersNoExtLabels,
seriesLimiter,
); err != nil {
return err
Expand Down Expand Up @@ -2196,23 +2218,13 @@ func (r *bucketIndexReader) reset() {
// Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
// Shortcut the case of `len(postingGroups) == 0`. It will only happen when no
// matchers specified, and we don't need to fetch expanded postings from cache.
if len(ms) == 0 {
return nil, nil
}

// Sort matchers to make sure we generate the same cache key.
sort.Slice(ms, func(i, j int) bool {
if ms[i].Type == ms[j].Type {
if ms[i].Name == ms[j].Name {
return ms[i].Value < ms[j].Value
}
return ms[i].Name < ms[j].Name
}
return ms[i].Type < ms[j].Type
})
hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter)
if err != nil {
return nil, err
Expand Down
121 changes: 118 additions & 3 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ func benchmarkExpandedPostings(

t.ResetTimer()
for i := 0; i < t.N(); i++ {
p, err := indexr.ExpandedPostings(context.Background(), c.matchers, NewBytesLimiterFactory(0)(nil))
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil))
testutil.Ok(t, err)
testutil.Equals(t, c.expectedLen, len(p))
}
Expand Down Expand Up @@ -1261,7 +1261,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo")
// Match nothing.
matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*")
ps, err := indexr.ExpandedPostings(context.Background(), []*labels.Matcher{matcher1, matcher2}, NewBytesLimiterFactory(0)(nil))
ps, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil))
testutil.Ok(t, err)
testutil.Equals(t, len(ps), 0)
// Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
Expand Down Expand Up @@ -2562,6 +2562,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
// TODO FIXME! testutil.Ok calls b.Fatalf under the hood, which
// must be called only from the goroutine running the Benchmark function.
testutil.Ok(b, err)
sortedMatchers := newSortedMatchers(matchers)

dummyHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{})
blockClient := newBlockSeriesClient(
Expand All @@ -2577,7 +2578,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
dummyHistogram,
nil,
)
testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter))
testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter))
defer blockClient.Close()

// Ensure at least 1 series has been returned (as expected).
Expand Down Expand Up @@ -3049,3 +3050,117 @@ func TestPostingGroupMerge(t *testing.T) {
})
}
}

// TestExpandedPostings is a test whether there is a race between multiple ExpandPostings() calls.
func TestExpandedPostingsRace(t *testing.T) {
const blockCount = 10

tmpDir := t.TempDir()
t.Cleanup(func() {
testutil.Ok(t, os.RemoveAll(tmpDir))
})

bkt := objstore.NewInMemBucket()
t.Cleanup(func() {
testutil.Ok(t, bkt.Close())
})

// Create a block.
head, _ := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, "head"),
SamplesPerSeries: 10,
ScrapeInterval: 15 * time.Second,
Series: 1000,
PrependLabels: nil,
Random: rand.New(rand.NewSource(120)),
SkipChunks: true,
})
blockID := createBlockFromHead(t, tmpDir, head)

bucketBlocks := make([]*bucketBlock, 0, blockCount)

for i := 0; i < blockCount; i++ {
ul := ulid.MustNew(uint64(i), rand.New(rand.NewSource(444)))

// Upload the block to the bucket.
thanosMeta := metadata.Thanos{
Labels: labels.Labels{{Name: "ext1", Value: fmt.Sprintf("%d", i)}}.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}
m, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String()))
testutil.Ok(t, err)

m.Thanos = thanosMeta
m.BlockMeta.ULID = ul

e2eutil.Copy(t, filepath.Join(tmpDir, blockID.String()), filepath.Join(tmpDir, ul.String()))
testutil.Ok(t, m.WriteToDir(log.NewLogfmtLogger(os.Stderr), filepath.Join(tmpDir, ul.String())))
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), log.NewLogfmtLogger(os.Stderr), bkt, filepath.Join(tmpDir, ul.String()), metadata.NoneFunc))

r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, ul, DefaultPostingOffsetInMemorySampling)
testutil.Ok(t, err)

blk, err := newBucketBlock(
context.Background(),
log.NewLogfmtLogger(os.Stderr),
newBucketStoreMetrics(nil),
m,
bkt,
filepath.Join(tmpDir, ul.String()),
noopCache{},
nil,
r,
NewGapBasedPartitioner(PartitionerMaxGapSize),
nil,
nil,
)
testutil.Ok(t, err)

bucketBlocks = append(bucketBlocks, blk)
}

tm, cancel := context.WithTimeout(context.Background(), 40*time.Second)
t.Cleanup(cancel)

l := sync.Mutex{}
previousRefs := make(map[int][]storage.SeriesRef)

for {
if tm.Err() != nil {
break
}

m := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
labels.MustNewMatcher(labels.MatchRegexp, "j", ".+"),
labels.MustNewMatcher(labels.MatchRegexp, "i", ".+"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
labels.MustNewMatcher(labels.MatchRegexp, "j", ".+"),
labels.MustNewMatcher(labels.MatchRegexp, "i", ".+"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
}

wg := &sync.WaitGroup{}
for i, bb := range bucketBlocks {
wg.Add(1)
i := i
bb := bb
go func(i int, bb *bucketBlock) {
refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil))
testutil.Ok(t, err)
defer wg.Done()

l.Lock()
defer l.Unlock()
if previousRefs[i] != nil {
testutil.Equals(t, previousRefs[i], refs)
} else {
previousRefs[i] = refs
}
}(i, bb)
}
wg.Wait()
}
}