From d2d53e575b489a8cbfc9e1723d0e3f62a68faf39 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Wed, 20 Oct 2021 22:56:15 +0200 Subject: [PATCH] Fix data race in `BucketedBytes` pool (#4792) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix data race in BucketedBytes pool Previous test didn't detect the data race: we copied the bytes header to the bytes.Buffer so when appending to the slice we were not modifying the original one. However, the usage of this in bucketChunkReader.save() actually modifies the referenced slice, so the test was modified to test that it can be done safely. The race condition happened because we were reading the referenced slice capacity after putting it back to the pool, when someone else might already retrieved and modified it. Before modifying the implementation, this was the data race reported: ================== WARNING: DATA RACE Read at 0x00c0000bc900 by goroutine 36: github.com/thanos-io/thanos/pkg/pool.(*BucketedBytes).Put() /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool.go:124 +0x1f9 github.com/thanos-io/thanos/pkg/pool.TestRacePutGet.func1() /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:108 +0xfa github.com/thanos-io/thanos/pkg/pool.TestRacePutGet·dwrap·3() /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:119 +0x65 Previous write at 0x00c0000bc900 by goroutine 27: github.com/thanos-io/thanos/pkg/pool.TestRacePutGet.func1() /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:94 +0x1fa github.com/thanos-io/thanos/pkg/pool.TestRacePutGet·dwrap·3() /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:119 +0x65 Goroutine 36 (running) created at: github.com/thanos-io/thanos/pkg/pool.TestRacePutGet() /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:119 +0x257 testing.tRunner() /usr/local/Cellar/go/1.17/libexec/src/testing/testing.go:1259 +0x22f testing.(*T).Run·dwrap·21() 1 Fix data race in BucketedBytes pool /usr/local/Cellar/go/1.17/libexec/src/testing/testing.go:1306 +0x47 Goroutine 27 (running) created at: github.com/thanos-io/thanos/pkg/pool.TestRacePutGet() /Users/oleg/w/github.com/thanos-io/thanos/pkg/pool/pool_test.go:119 +0x257 testing.tRunner() /usr/local/Cellar/go/1.17/libexec/src/testing/testing.go:1259 +0x22f testing.(*T).Run·dwrap·21() /usr/local/Cellar/go/1.17/libexec/src/testing/testing.go:1306 +0x47 ================== Signed-off-by: Oleg Zaytsev * Update CHANGELOG.md Signed-off-by: Oleg Zaytsev * goimports fix Signed-off-by: Oleg Zaytsev --- CHANGELOG.md | 1 + pkg/pool/pool.go | 9 ++++--- pkg/pool/pool_test.go | 56 +++++++++++++++++++++++-------------------- 3 files changed, 35 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ceefc193a..e28c8a1242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races. - [#4754](https://github.com/thanos-io/thanos/pull/4754) Query: Fix possible panic on stores endpoint. - [#4753](https://github.com/thanos-io/thanos/pull/4753) Store: validate block sync concurrency parameter +- [#4792](https://github.com/thanos-io/thanos/pull/4792) Store: Fix data race in BucketedBytes pool. ## [v0.23.1](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.10.1 diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index cbd034e9e7..a7eb98c854 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -107,8 +107,9 @@ func (p *BucketedBytes) Put(b *[]byte) { return } + sz := cap(*b) for i, bktSize := range p.sizes { - if cap(*b) > bktSize { + if sz > bktSize { continue } *b = (*b)[:0] @@ -118,13 +119,11 @@ func (p *BucketedBytes) Put(b *[]byte) { p.mtx.Lock() defer p.mtx.Unlock() - // We could assume here that our users will not make the slices larger // but lets be on the safe side to avoid an underflow of p.usedTotal. - sz := uint64(cap(*b)) - if sz >= p.usedTotal { + if uint64(sz) >= p.usedTotal { p.usedTotal = 0 } else { - p.usedTotal -= sz + p.usedTotal -= uint64(sz) } } diff --git a/pkg/pool/pool_test.go b/pkg/pool/pool_test.go index a4140361d2..14c8350acb 100644 --- a/pkg/pool/pool_test.go +++ b/pkg/pool/pool_test.go @@ -4,8 +4,7 @@ package pool import ( - "bytes" - "fmt" + "strings" "sync" "testing" "time" @@ -71,52 +70,57 @@ func TestRacePutGet(t *testing.T) { s := sync.WaitGroup{} - // Start two goroutines: they always Get and Put two byte slices - // to which they write 'foo' / 'barbazbaz' and check if the data is still + const goroutines = 100 + + // Start multiple goroutines: they always Get and Put two byte slices + // to which they write their contents and check if the data is still // there after writing it, before putting it back. - errs := make(chan error, 2) - stop := make(chan bool, 2) + errs := make(chan error, goroutines) + stop := make(chan struct{}) - f := func(txt string) { + f := func(txt string, grow bool) { defer s.Done() for { select { case <-stop: return default: - c, err := chunkPool.Get(3) - if err != nil { - errs <- errors.Wrapf(err, "goroutine %s", txt) - return - } - - buf := bytes.NewBuffer(*c) - - _, err = fmt.Fprintf(buf, "%s", txt) + c, err := chunkPool.Get(len(txt)) if err != nil { errs <- errors.Wrapf(err, "goroutine %s", txt) return } - if buf.String() != txt { + *c = append(*c, txt...) + if string(*c) != txt { errs <- errors.New("expected to get the data just written") return } + if grow { + *c = append(*c, txt...) + *c = append(*c, txt...) + if string(*c) != txt+txt+txt { + errs <- errors.New("expected to get the data just written") + return + } + } - b := buf.Bytes() - chunkPool.Put(&b) + chunkPool.Put(c) } } } - s.Add(2) - go f("foo") - go f("barbazbaz") - - time.Sleep(5 * time.Second) - stop <- true - stop <- true + for i := 0; i < goroutines; i++ { + s.Add(1) + // make sure we start multiple goroutines with same len buf requirements, to hit same pools + s := strings.Repeat(string(byte(i)), i%10) + // some of the goroutines will append more elements to the provided slice + grow := i%2 == 0 + go f(s, grow) + } + time.Sleep(1 * time.Second) + close(stop) s.Wait() select { case err := <-errs: