Skip to content

Commit

Permalink
Revert "Fix MemPostings.Add and MemPostings.Get data race (promet…
Browse files Browse the repository at this point in the history
…heus#15141)"

This reverts commit 50ef0dc.

Memory allocation goes so high in Prombench that the system is unusable.
  • Loading branch information
bboreham committed Nov 3, 2024
1 parent e2e01c1 commit d87ec32
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 78 deletions.
8 changes: 4 additions & 4 deletions tsdb/head_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
defer h.Close()

for i := 0; i < b.N; i++ {
h.getOrCreate(uint64(i), labels.FromStrings(labels.MetricName, "test", "a", strconv.Itoa(i), "b", strconv.Itoa(i%10), "c", strconv.Itoa(i%100), "d", strconv.Itoa(i/2), "e", strconv.Itoa(i/4)))
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
}
}

Expand All @@ -61,8 +61,8 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
i := int(count.Inc())
h.getOrCreate(uint64(i), labels.FromStrings(labels.MetricName, "test", "a", strconv.Itoa(i), "b", strconv.Itoa(i%10), "c", strconv.Itoa(i%100), "d", strconv.Itoa(i/2), "e", strconv.Itoa(i/4)))
i := count.Inc()
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))))
}
})
}
Expand All @@ -82,7 +82,7 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
defer h.Close()

for i := 0; i < b.N; i++ {
h.getOrCreate(uint64(i), labels.FromStrings(labels.MetricName, "test", "a", strconv.Itoa(i), "b", strconv.Itoa(i%10), "c", strconv.Itoa(i%100), "d", strconv.Itoa(i/2), "e", strconv.Itoa(i/4)))
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
}
}

Expand Down
27 changes: 8 additions & 19 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,13 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) {
p.mtx.Unlock()
}

func appendWithExponentialGrowth[T any](a []T, v T) (_ []T, copied bool) {
func appendWithExponentialGrowth[T any](a []T, v T) []T {
if cap(a) < len(a)+1 {
newList := make([]T, len(a), len(a)*2+1)
copy(newList, a)
a = newList
copied = true
}
return append(a, v), copied
return append(a, v)
}

func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
Expand All @@ -361,26 +360,16 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
nm = map[string][]storage.SeriesRef{}
p.m[l.Name] = nm
}
list, copied := appendWithExponentialGrowth(nm[l.Value], id)
list := appendWithExponentialGrowth(nm[l.Value], id)
nm[l.Value] = list

// Return if it shouldn't be ordered, if it only has one element or if it's already ordered.
// The invariant is that the first n-1 items in the list are already sorted.
if !p.ordered || len(list) == 1 || list[len(list)-1] >= list[len(list)-2] {
if !p.ordered {
return
}

if !copied {
// We have appended to the existing slice,
// and readers may already have a copy of this postings slice,
// so we need to copy it before sorting.
old := list
list = make([]storage.SeriesRef, len(old), cap(old))
copy(list, old)
nm[l.Value] = list
}

// Repair order violations.
// There is no guarantee that no higher ID was inserted before as they may
// be generated independently before adding them to postings.
// We repair order violations on insert. The invariant is that the first n-1
// items in the list are already sorted.
for i := len(list) - 1; i >= 1; i-- {
if list[i] >= list[i-1] {
break
Expand Down
55 changes: 0 additions & 55 deletions tsdb/index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1475,58 +1475,3 @@ func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
require.Error(t, p.Err())
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}

func TestMemPostings_Unordered_Add_Get(t *testing.T) {
mp := NewMemPostings()
for ref := storage.SeriesRef(1); ref < 8; ref += 2 {
// First, add next series.
next := ref + 1
mp.Add(next, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(next))))
nextPostings := mp.Get(labels.MetricName, "test")

// Now add current ref.
mp.Add(ref, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(ref))))

// Next postings should still reference the next series.
nextExpanded, err := ExpandPostings(nextPostings)
require.NoError(t, err)
require.Len(t, nextExpanded, int(ref))
require.Equal(t, next, nextExpanded[len(nextExpanded)-1])
}
}

func TestMemPostings_Concurrent_Add_Get(t *testing.T) {
refs := make(chan storage.SeriesRef)
wg := sync.WaitGroup{}
wg.Add(1)
t.Cleanup(wg.Wait)
t.Cleanup(func() { close(refs) })

mp := NewMemPostings()
go func() {
defer wg.Done()
for ref := range refs {
mp.Add(ref, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(ref))))
p := mp.Get(labels.MetricName, "test")

_, err := ExpandPostings(p)
if err != nil {
t.Errorf("unexpected error: %s", err)
}
}
}()

for ref := storage.SeriesRef(1); ref < 8; ref += 2 {
// Add next ref in another goroutine so they would race.
refs <- ref + 1
// Add current ref here
mp.Add(ref, labels.FromStrings(labels.MetricName, "test", "series", strconv.Itoa(int(ref))))

// We don't read the value of the postings here,
// this is tested in TestMemPostings_Unordered_Add_Get where it's easier to achieve the determinism.
// This test just checks that there's no data race.
p := mp.Get(labels.MetricName, "test")
_, err := ExpandPostings(p)
require.NoError(t, err)
}
}

0 comments on commit d87ec32

Please sign in to comment.