Skip to content

Commit

Permalink
import: fix memory leak (pingcap#39332) (pingcap#39408)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 5, 2022
1 parent 2e9f2c8 commit 4b777b4
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 2 deletions.
24 changes: 22 additions & 2 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"go.uber.org/zap"
)

const maxAvailableBufSize int = 20

// invalidIterator is a trimmed down Iterator type which is invalid.
type invalidIterator struct {
kv.Iterator
Expand Down Expand Up @@ -91,15 +93,33 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) {
buf.idx = 0
buf.cap = len(buf.buf)
mb.Lock()
if len(mb.availableBufs) >= maxAvailableBufSize {
// too many byte buffers, evict one byte buffer and continue
evictedByteBuf := mb.availableBufs[0]
evictedByteBuf.destroy()
mb.availableBufs = mb.availableBufs[1:]
}
mb.availableBufs = append(mb.availableBufs, buf)
mb.Unlock()
}

func (mb *kvMemBuf) AllocateBuf(size int) {
mb.Lock()
size = utils.MaxInt(units.MiB, int(utils.NextPowerOfTwo(int64(size)))*2)
if len(mb.availableBufs) > 0 && mb.availableBufs[0].cap >= size {
mb.buf = mb.availableBufs[0]
var (
existingBuf *bytesBuf
existingBufIdx int
)
for i, buf := range mb.availableBufs {
if buf.cap >= size {
existingBuf = buf
existingBufIdx = i
break
}
}
if existingBuf != nil {
mb.buf = existingBuf
mb.availableBufs[existingBufIdx] = mb.availableBufs[0]
mb.availableBufs = mb.availableBufs[1:]
} else {
mb.buf = newBytesBuf(size)
Expand Down
101 changes: 101 additions & 0 deletions br/pkg/lightning/backend/kv/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kv
import (
"testing"

"github.com/docker/go-units"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/parser/mysql"
)
Expand All @@ -34,3 +35,103 @@ func (s *kvSuite) TestSession(c *C) {
_, err := session.Txn(true)
c.Assert(err, IsNil)
}

func (s *kvSuite) TestKVMemBufInterweaveAllocAndRecycle(c *C) {
type testCase struct {
AllocSizes []int
FinalAvailableByteBufCaps []int
}
for _, tc := range []testCase{
{
AllocSizes: []int{
1 * units.MiB,
2 * units.MiB,
3 * units.MiB,
4 * units.MiB,
5 * units.MiB,
},
// [2] => [2,4] => [2,4,8] => [4,2,8] => [4,2,8,16]
FinalAvailableByteBufCaps: []int{
4 * units.MiB,
2 * units.MiB,
8 * units.MiB,
16 * units.MiB,
},
},
{
AllocSizes: []int{
5 * units.MiB,
4 * units.MiB,
3 * units.MiB,
2 * units.MiB,
1 * units.MiB,
},
// [16] => [16] => [16] => [16] => [16]
FinalAvailableByteBufCaps: []int{16 * units.MiB},
},
{
AllocSizes: []int{5, 4, 3, 2, 1},
// [1] => [1] => [1] => [1] => [1]
FinalAvailableByteBufCaps: []int{1 * units.MiB},
},
{
AllocSizes: []int{
1 * units.MiB,
2 * units.MiB,
3 * units.MiB,
2 * units.MiB,
1 * units.MiB,
5 * units.MiB,
},
// [2] => [2,4] => [2,4,8] => [2,8,4] => [8,4,2] => [8,4,2,16]
FinalAvailableByteBufCaps: []int{
8 * units.MiB,
4 * units.MiB,
2 * units.MiB,
16 * units.MiB,
},
},
} {
testKVMemBuf := &kvMemBuf{}
for _, allocSize := range tc.AllocSizes {
testKVMemBuf.AllocateBuf(allocSize)
testKVMemBuf.Recycle(testKVMemBuf.buf)
}
c.Assert(len(testKVMemBuf.availableBufs), Equals, len(tc.FinalAvailableByteBufCaps))
for i, bb := range testKVMemBuf.availableBufs {
c.Assert(bb.cap, Equals, tc.FinalAvailableByteBufCaps[i])
}
}
}

func (s *kvSuite) TestKVMemBufBatchAllocAndRecycle(c *C) {
testKVMemBuf := &kvMemBuf{}
bBufs := []*bytesBuf{}
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(1 * units.MiB)
bBufs = append(bBufs, testKVMemBuf.buf)
}
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(2 * units.MiB)
bBufs = append(bBufs, testKVMemBuf.buf)
}
for _, bb := range bBufs {
testKVMemBuf.Recycle(bb)
}
c.Assert(len(testKVMemBuf.availableBufs), Equals, maxAvailableBufSize)
for _, bb := range testKVMemBuf.availableBufs {
c.Assert(bb.cap, Equals, 4*units.MiB)
}
bBufs = bBufs[:0]
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(1 * units.MiB)
bb := testKVMemBuf.buf
c.Assert(bb.cap, Equals, 4*units.MiB)
bBufs = append(bBufs, bb)
c.Assert(len(testKVMemBuf.availableBufs), Equals, maxAvailableBufSize-i-1)
}
for _, bb := range bBufs {
testKVMemBuf.Recycle(bb)
}
c.Assert(len(testKVMemBuf.availableBufs), Equals, maxAvailableBufSize)
}

0 comments on commit 4b777b4

Please sign in to comment.