diff --git a/br/pkg/lightning/backend/kv/BUILD.bazel b/br/pkg/lightning/backend/kv/BUILD.bazel new file mode 100644 index 0000000000000..b0da8a0e7deb4 --- /dev/null +++ b/br/pkg/lightning/backend/kv/BUILD.bazel @@ -0,0 +1,78 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "kv", + srcs = [ + "allocator.go", + "kv2sql.go", + "session.go", + "sql2kv.go", + "types.go", + ], + importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/kv", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/lightning/common", + "//br/pkg/lightning/log", + "//br/pkg/lightning/manual", + "//br/pkg/lightning/metric", + "//br/pkg/lightning/verification", + "//br/pkg/logutil", + "//br/pkg/redact", + "//br/pkg/utils", + "//expression", + "//kv", + "//meta/autoid", + "//parser/model", + "//parser/mysql", + "//sessionctx", + "//sessionctx/variable", + "//table", + "//table/tables", + "//tablecodec", + "//types", + "//util/chunk", + "//util/mathutil", + "//util/topsql/stmtstats", + "@com_github_docker_go_units//:go-units", + "@com_github_pingcap_errors//:errors", + "@org_golang_x_exp//slices", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", + ], +) + +go_test( + name = "kv_test", + timeout = "short", + srcs = [ + "session_internal_test.go", + "session_test.go", + "sql2kv_test.go", + ], + embed = [":kv"], + flaky = True, + race = "on", + deps = [ + "//br/pkg/lightning/common", + "//br/pkg/lightning/log", + "//br/pkg/lightning/verification", + "//ddl", + "//kv", + "//meta/autoid", + "//parser", + "//parser/ast", + "//parser/model", + "//parser/mysql", + "//sessionctx", + "//table", + "//table/tables", + "//tablecodec", + "//types", + "//util/mock", + "@com_github_docker_go_units//:go-units", + "@com_github_stretchr_testify//require", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", + ], +) diff --git a/br/pkg/lightning/backend/kv/session.go b/br/pkg/lightning/backend/kv/session.go index 0e2751135c061..770a9bdde66de 100644 --- a/br/pkg/lightning/backend/kv/session.go +++ b/br/pkg/lightning/backend/kv/session.go @@ -38,6 +38,8 @@ import ( "go.uber.org/zap" ) +const maxAvailableBufSize int = 20 + // invalidIterator is a trimmed down Iterator type which is invalid. type invalidIterator struct { kv.Iterator @@ -92,6 +94,12 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) { buf.idx = 0 buf.cap = len(buf.buf) mb.Lock() + if len(mb.availableBufs) >= maxAvailableBufSize { + // too many byte buffers, evict one byte buffer and continue + evictedByteBuf := mb.availableBufs[0] + evictedByteBuf.destroy() + mb.availableBufs = mb.availableBufs[1:] + } mb.availableBufs = append(mb.availableBufs, buf) mb.Unlock() } @@ -99,8 +107,20 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) { func (mb *kvMemBuf) AllocateBuf(size int) { mb.Lock() size = mathutil.Max(units.MiB, int(utils.NextPowerOfTwo(int64(size)))*2) - if len(mb.availableBufs) > 0 && mb.availableBufs[0].cap >= size { - mb.buf = mb.availableBufs[0] + var ( + existingBuf *bytesBuf + existingBufIdx int + ) + for i, buf := range mb.availableBufs { + if buf.cap >= size { + existingBuf = buf + existingBufIdx = i + break + } + } + if existingBuf != nil { + mb.buf = existingBuf + mb.availableBufs[existingBufIdx] = mb.availableBufs[0] mb.availableBufs = mb.availableBufs[1:] } else { mb.buf = newBytesBuf(size) diff --git a/br/pkg/lightning/backend/kv/session_internal_test.go b/br/pkg/lightning/backend/kv/session_internal_test.go new file mode 100644 index 0000000000000..97ebd8cc82d1b --- /dev/null +++ b/br/pkg/lightning/backend/kv/session_internal_test.go @@ -0,0 +1,126 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "testing" + + "github.com/docker/go-units" + "github.com/stretchr/testify/require" +) + +func TestKVMemBufInterweaveAllocAndRecycle(t *testing.T) { + type testCase struct { + AllocSizes []int + FinalAvailableByteBufCaps []int + } + for _, tc := range []testCase{ + { + AllocSizes: []int{ + 1 * units.MiB, + 2 * units.MiB, + 3 * units.MiB, + 4 * units.MiB, + 5 * units.MiB, + }, + // [2] => [2,4] => [2,4,8] => [4,2,8] => [4,2,8,16] + FinalAvailableByteBufCaps: []int{ + 4 * units.MiB, + 2 * units.MiB, + 8 * units.MiB, + 16 * units.MiB, + }, + }, + { + AllocSizes: []int{ + 5 * units.MiB, + 4 * units.MiB, + 3 * units.MiB, + 2 * units.MiB, + 1 * units.MiB, + }, + // [16] => [16] => [16] => [16] => [16] + FinalAvailableByteBufCaps: []int{16 * units.MiB}, + }, + { + AllocSizes: []int{5, 4, 3, 2, 1}, + // [1] => [1] => [1] => [1] => [1] + FinalAvailableByteBufCaps: []int{1 * units.MiB}, + }, + { + AllocSizes: []int{ + 1 * units.MiB, + 2 * units.MiB, + 3 * units.MiB, + 2 * units.MiB, + 1 * units.MiB, + 5 * units.MiB, + }, + // [2] => [2,4] => [2,4,8] => [2,8,4] => [8,4,2] => [8,4,2,16] + FinalAvailableByteBufCaps: []int{ + 8 * units.MiB, + 4 * units.MiB, + 2 * units.MiB, + 16 * units.MiB, + }, + }, + } { + testKVMemBuf := &kvMemBuf{} + for _, allocSize := range tc.AllocSizes { + testKVMemBuf.AllocateBuf(allocSize) + testKVMemBuf.Recycle(testKVMemBuf.buf) + } + require.Equal(t, len(tc.FinalAvailableByteBufCaps), len(testKVMemBuf.availableBufs)) + for i, bb := range testKVMemBuf.availableBufs { + require.Equal(t, tc.FinalAvailableByteBufCaps[i], bb.cap) + } + } +} + +func TestKVMemBufBatchAllocAndRecycle(t *testing.T) { + type testCase struct { + AllocSizes []int + FinalAvailableByteBufCaps []int + } + testKVMemBuf := &kvMemBuf{} + bBufs := []*bytesBuf{} + for i := 0; i < maxAvailableBufSize; i++ { + testKVMemBuf.AllocateBuf(1 * units.MiB) + bBufs = append(bBufs, testKVMemBuf.buf) + } + for i := 0; i < maxAvailableBufSize; i++ { + testKVMemBuf.AllocateBuf(2 * units.MiB) + bBufs = append(bBufs, testKVMemBuf.buf) + } + for _, bb := range bBufs { + testKVMemBuf.Recycle(bb) + } + require.Equal(t, maxAvailableBufSize, len(testKVMemBuf.availableBufs)) + for _, bb := range testKVMemBuf.availableBufs { + require.Equal(t, 4*units.MiB, bb.cap) + } + bBufs = bBufs[:0] + for i := 0; i < maxAvailableBufSize; i++ { + testKVMemBuf.AllocateBuf(1 * units.MiB) + bb := testKVMemBuf.buf + require.Equal(t, 4*units.MiB, bb.cap) + bBufs = append(bBufs, bb) + require.Equal(t, maxAvailableBufSize-i-1, len(testKVMemBuf.availableBufs)) + } + for _, bb := range bBufs { + testKVMemBuf.Recycle(bb) + } + require.Equal(t, maxAvailableBufSize, len(testKVMemBuf.availableBufs)) +}