diff --git a/br/pkg/lightning/backend/external/bench_test.go b/br/pkg/lightning/backend/external/bench_test.go
index 66144569ef4dd..82910eb72b562 100644
--- a/br/pkg/lightning/backend/external/bench_test.go
+++ b/br/pkg/lightning/backend/external/bench_test.go
@@ -20,19 +20,16 @@ import (
 	"flag"
 	"fmt"
 	"io"
-	"os"
-	"runtime/pprof"
-	"sync"
 	"testing"
 	"time"
 
 	"github.com/docker/go-units"
-	"github.com/felixge/fgprof"
 	"github.com/pingcap/tidb/br/pkg/membuf"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/util/intest"
 	"github.com/pingcap/tidb/pkg/util/size"
+	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
 	"golang.org/x/sync/errgroup"
 )
@@ -171,34 +168,20 @@ func TestCompareWriter(t *testing.T) {
 	seed := time.Now().Nanosecond()
 	t.Logf("random seed: %d", seed)
 	var (
-		err     error
 		now     time.Time
 		elapsed time.Duration
-		fileCPU       *os.File
-		cpuProfCloser func() error
-
-		filenameHeap   string
-		heapProfDoneCh chan struct{}
-		heapWg         *sync.WaitGroup
+		p = newProfiler(true, true)
 	)
 	beforeTest := func() {
 		testIdx++
-		fileCPU, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", testIdx))
-		intest.AssertNoError(err)
-		cpuProfCloser = fgprof.Start(fileCPU, fgprof.FormatPprof)
-
-		filenameHeap = fmt.Sprintf("heap-profile-%d.prof", testIdx)
-		heapProfDoneCh, heapWg = recordHeapForMaxInUse(filenameHeap)
+		p.beforeTest()
 		now = time.Now()
 	}
 	afterClose := func() {
 		elapsed = time.Since(now)
-		err = cpuProfCloser()
-		intest.AssertNoError(err)
-		close(heapProfDoneCh)
-		heapWg.Wait()
+		p.afterTest()
 	}
 
 	suite := &writeTestSuite{
@@ -392,46 +375,20 @@ func TestCompareReaderEvenlyDistributedContent(t *testing.T) {
 	kvCnt, _, _ := createEvenlyDistributedFiles(store, fileSize, fileCnt, subDir)
 	memoryLimit := 64 * 1024 * 1024
-	fileIdx := 0
 	var (
-		err     error
-		now     time.Time
 		elapsed time.Duration
-		fileCPU       *os.File
-		cpuProfCloser func() error
-
-		filenameHeap   string
-		heapProfDoneCh chan struct{}
-		heapWg         *sync.WaitGroup
+		p = newProfiler(true, true)
 	)
-	beforeTest := func() {
-		fileIdx++
-		fileCPU, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx))
-		intest.AssertNoError(err)
-		cpuProfCloser = fgprof.Start(fileCPU, fgprof.FormatPprof)
-
-		filenameHeap = fmt.Sprintf("heap-profile-%d.prof", fileIdx)
-		heapProfDoneCh, heapWg = recordHeapForMaxInUse(filenameHeap)
-
-		now = time.Now()
-	}
-	afterClose := func() {
-		elapsed = time.Since(now)
-		err = cpuProfCloser()
-		intest.AssertNoError(err)
-		close(heapProfDoneCh)
-		heapWg.Wait()
-	}
 	suite := &readTestSuite{
 		store:              store,
 		totalKVCnt:         kvCnt,
 		concurrency:        100,
 		memoryLimit:        memoryLimit,
-		beforeCreateReader: beforeTest,
-		afterReaderClose:   afterClose,
+		beforeCreateReader: p.beforeTest,
+		afterReaderClose:   p.afterTest,
 		subDir:             subDir,
 	}
@@ -495,41 +452,15 @@ func testCompareReaderWithContent(
 	if !*skipCreate {
 		kvCnt, _, _ = createFn(store, *fileSize, *fileCount, *objectPrefix)
 	}
-	fileIdx := 0
-
-	var (
-		err error
-
-		fileCPU       *os.File
-		cpuProfCloser func() error
-
-		filenameHeap   string
-		heapProfDoneCh chan struct{}
-		heapWg         *sync.WaitGroup
-	)
-	beforeTest := func() {
-		fileIdx++
-		fileCPU, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx))
-		intest.AssertNoError(err)
-		cpuProfCloser = fgprof.Start(fileCPU, fgprof.FormatPprof)
-
-		filenameHeap = fmt.Sprintf("heap-profile-%d.prof", fileIdx)
-		heapProfDoneCh, heapWg = recordHeapForMaxInUse(filenameHeap)
-	}
-	afterClose := func() {
-		err = cpuProfCloser()
-		intest.AssertNoError(err)
-		close(heapProfDoneCh)
-		heapWg.Wait()
-	}
+	p := newProfiler(true, true)
 	suite := &readTestSuite{
 		store:              store,
 		totalKVCnt:         kvCnt,
 		concurrency:        *concurrency,
 		memoryLimit:        *memoryLimit,
-		beforeCreateReader: beforeTest,
-		afterReaderClose:   afterClose,
+		beforeCreateReader: p.beforeTest,
+		afterReaderClose:   p.afterTest,
 		subDir:             *objectPrefix,
 	}
@@ -645,7 +576,9 @@ func testCompareMergeWithContent(
 	t *testing.T,
 	concurrency int,
 	createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) (int, kv.Key, kv.Key),
-	fn func(t *testing.T, suite *mergeTestSuite)) {
+	fn func(t *testing.T, suite *mergeTestSuite),
+	p *profiler,
+) {
 	store := openTestingStorage(t)
 	kvCnt := 0
 	var minKey, maxKey kv.Key
@@ -653,29 +586,13 @@ func testCompareMergeWithContent(
 		kvCnt, minKey, maxKey = createFn(store, *fileSize, *fileCount, *objectPrefix)
 	}
 
-	fileIdx := 0
-	var (
-		file *os.File
-		err  error
-	)
-	beforeTest := func() {
-		file, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx))
-		intest.AssertNoError(err)
-		err = pprof.StartCPUProfile(file)
-		intest.AssertNoError(err)
-	}
-
-	afterTest := func() {
-		pprof.StopCPUProfile()
-	}
-
 	suite := &mergeTestSuite{
 		store:       store,
 		totalKVCnt:  kvCnt,
 		concurrency: concurrency,
 		memoryLimit: *memoryLimit,
-		beforeMerge: beforeTest,
-		afterMerge:  afterTest,
+		beforeMerge: p.beforeTest,
+		afterMerge:  p.afterTest,
 		subDir:      *objectPrefix,
 		minKey:      minKey,
 		maxKey:      maxKey,
@@ -686,16 +603,17 @@
 }
 
 func TestMergeBench(t *testing.T) {
-	testCompareMergeWithContent(t, 1, createAscendingFiles, mergeStep)
-	testCompareMergeWithContent(t, 1, createEvenlyDistributedFiles, mergeStep)
-	testCompareMergeWithContent(t, 2, createAscendingFiles, mergeStep)
-	testCompareMergeWithContent(t, 2, createEvenlyDistributedFiles, mergeStep)
-	testCompareMergeWithContent(t, 4, createAscendingFiles, mergeStep)
-	testCompareMergeWithContent(t, 4, createEvenlyDistributedFiles, mergeStep)
-	testCompareMergeWithContent(t, 8, createAscendingFiles, mergeStep)
-	testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, mergeStep)
-	testCompareMergeWithContent(t, 8, createAscendingFiles, newMergeStep)
-	testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, newMergeStep)
+	p := newProfiler(true, true)
+	testCompareMergeWithContent(t, 1, createAscendingFiles, mergeStep, p)
+	testCompareMergeWithContent(t, 1, createEvenlyDistributedFiles, mergeStep, p)
+	testCompareMergeWithContent(t, 2, createAscendingFiles, mergeStep, p)
+	testCompareMergeWithContent(t, 2, createEvenlyDistributedFiles, mergeStep, p)
+	testCompareMergeWithContent(t, 4, createAscendingFiles, mergeStep, p)
+	testCompareMergeWithContent(t, 4, createEvenlyDistributedFiles, mergeStep, p)
+	testCompareMergeWithContent(t, 8, createAscendingFiles, mergeStep, p)
+	testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, mergeStep, p)
+	testCompareMergeWithContent(t, 8, createAscendingFiles, newMergeStep, p)
+	testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, newMergeStep, p)
 }
 
 func TestReadAllDataLargeFiles(t *testing.T) {
@@ -739,13 +657,174 @@ func TestReadAllDataLargeFiles(t *testing.T) {
 	intest.AssertNoError(err)
 	endKey, err := hex.DecodeString("00a00000000000000000")
 	intest.AssertNoError(err)
-	bufPool := membuf.NewPool(
+	smallBlockBufPool := membuf.NewPool(
+		membuf.WithBlockNum(0),
+		membuf.WithBlockSize(smallBlockSize),
+	)
+	largeBlockBufPool := membuf.NewPool(
 		membuf.WithBlockNum(0),
 		membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
 	)
 	output := &memKVsAndBuffers{}
 	now := time.Now()
-	err = readAllData(ctx, store, dataFiles, statFiles, startKey, endKey, bufPool, output)
+
+	err = readAllData(ctx, store, dataFiles, statFiles, startKey, endKey, smallBlockBufPool, largeBlockBufPool, output)
 	t.Logf("read all data cost: %s", time.Since(now))
 	intest.AssertNoError(err)
 }
+
+func TestReadAllData(t *testing.T) {
+	// Test the case of thread=16, where we load ~3.2GB of data at once and have at
+	// most 4000 files to read in one step. The file layout under test is:
+	//
+	// 1000 files with one ~100B KV each, 1000 files of ~900KB, 90 files of 10MB,
+	// and 1 file of 1GB. Total read size = 1000*100B + 1000*900KB + 90*10MB + 1*1GB ≈ 2.8GB.
+
+	ctx := context.Background()
+	store := openTestingStorage(t)
+	readRangeStart := []byte("key00")
+	readRangeEnd := []byte("key88888888")
+	keyAfterRange := []byte("key9")
+	keyAfterRange2 := []byte("key9")
+	eg := errgroup.Group{}
+
+	fileIdx := 0
+	val := make([]byte, 90)
+	if *skipCreate {
+		goto finishCreateFiles
+	}
+
+	cleanOldFiles(ctx, store, "/")
+
+	for ; fileIdx < 1000; fileIdx++ {
+		fileIdx := fileIdx
+		eg.Go(func() error {
+			fileName := fmt.Sprintf("/test%d", fileIdx)
+			writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
+			err := writer.Init(ctx, 5*1024*1024)
+			require.NoError(t, err)
+			key := []byte(fmt.Sprintf("key0%d", fileIdx))
+			err = writer.WriteRow(ctx, key, val)
+			require.NoError(t, err)
+
+			// write some extra data whose keys are greater than readRangeEnd
+			err = writer.WriteRow(ctx, keyAfterRange, val)
+			require.NoError(t, err)
+			err = writer.WriteRow(ctx, keyAfterRange2, make([]byte, 100*1024))
+			require.NoError(t, err)
+
+			return writer.Close(ctx)
+		})
+	}
+	require.NoError(t, eg.Wait())
+	t.Log("finish writing 1000 files of 100B")
+
+	for ; fileIdx < 2000; fileIdx++ {
+		fileIdx := fileIdx
+		eg.Go(func() error {
+			fileName := fmt.Sprintf("/test%d", fileIdx)
+			writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
+			err := writer.Init(ctx, 5*1024*1024)
+			require.NoError(t, err)
+
+			kvSize := 0
+			keyIdx := 0
+			for kvSize < 900*1024 {
+				key := []byte(fmt.Sprintf("key%06d_%d", keyIdx, fileIdx))
+				keyIdx++
+				kvSize += len(key) + len(val)
+				err = writer.WriteRow(ctx, key, val)
+				require.NoError(t, err)
+			}
+
+			// write some extra data whose keys are greater than readRangeEnd
+			err = writer.WriteRow(ctx, keyAfterRange, val)
+			require.NoError(t, err)
+			err = writer.WriteRow(ctx, keyAfterRange2, make([]byte, 300*1024))
+			require.NoError(t, err)
+			return writer.Close(ctx)
+		})
+	}
+	require.NoError(t, eg.Wait())
+	t.Log("finish writing 1000 files of 900KB")
+
+	for ; fileIdx < 2090; fileIdx++ {
+		fileIdx := fileIdx
+		eg.Go(func() error {
+			fileName := fmt.Sprintf("/test%d", fileIdx)
+			writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
+			err := writer.Init(ctx, 5*1024*1024)
+			require.NoError(t, err)
+
+			kvSize := 0
+			keyIdx := 0
+			for kvSize < 10*1024*1024 {
+				key := []byte(fmt.Sprintf("key%09d_%d", keyIdx, fileIdx))
+				keyIdx++
+				kvSize += len(key) + len(val)
+				err = writer.WriteRow(ctx, key, val)
+				require.NoError(t, err)
+			}
+
+			// write some extra data whose keys are greater than readRangeEnd
+			err = writer.WriteRow(ctx, keyAfterRange, val)
+			require.NoError(t, err)
+			err = writer.WriteRow(ctx, keyAfterRange2, make([]byte, 900*1024))
+			require.NoError(t, err)
+			return writer.Close(ctx)
+		})
+	}
+	require.NoError(t, eg.Wait())
+	t.Log("finish writing 90 files of 10MB")
+
+	for ; fileIdx < 2091; fileIdx++ {
+		fileName := fmt.Sprintf("/test%d", fileIdx)
+		writer := NewWriterBuilder().BuildOneFile(store, fileName, "writerID")
fileName, "writerID") + err := writer.Init(ctx, 5*1024*1024) + require.NoError(t, err) + + kvSize := 0 + keyIdx := 0 + for kvSize < 1024*1024*1024 { + key := []byte(fmt.Sprintf("key%010d_%d", keyIdx, fileIdx)) + keyIdx++ + kvSize += len(key) + len(val) + err = writer.WriteRow(ctx, key, val) + require.NoError(t, err) + } + + // write some extra data that is greater than readRangeEnd + err = writer.WriteRow(ctx, keyAfterRange, val) + require.NoError(t, err) + err = writer.WriteRow(ctx, keyAfterRange2, make([]byte, 900*1024)) + require.NoError(t, err) + err = writer.Close(ctx) + require.NoError(t, err) + } + t.Log("finish writing 1 file of 1G") + +finishCreateFiles: + + dataFiles, statFiles, err := GetAllFileNames(ctx, store, "/") + require.NoError(t, err) + require.Equal(t, 2091, len(dataFiles)) + + p := newProfiler(true, true) + smallBlockBufPool := membuf.NewPool( + membuf.WithBlockNum(0), + membuf.WithBlockSize(smallBlockSize), + ) + largeBlockBufPool := membuf.NewPool( + membuf.WithBlockNum(0), + membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc), + ) + output := &memKVsAndBuffers{} + p.beforeTest() + now := time.Now() + err = readAllData(ctx, store, dataFiles, statFiles, readRangeStart, readRangeEnd, smallBlockBufPool, largeBlockBufPool, output) + require.NoError(t, err) + output.build(ctx) + elapsed := time.Since(now) + p.afterTest() + t.Logf("readAllData time cost: %s, size: %d", elapsed.String(), output.size) +} diff --git a/br/pkg/lightning/backend/external/byte_reader.go b/br/pkg/lightning/backend/external/byte_reader.go index de9301206025a..53df918b012ba 100644 --- a/br/pkg/lightning/backend/external/byte_reader.go +++ b/br/pkg/lightning/backend/external/byte_reader.go @@ -31,8 +31,10 @@ import ( var ( // ConcurrentReaderBufferSizePerConc is the buffer size for concurrent reader per // concurrency. - ConcurrentReaderBufferSizePerConc = int(2 * size.MB) - readAllDataConcThreshold = uint64(16) + ConcurrentReaderBufferSizePerConc = int(8 * size.MB) + // in readAllData, expected concurrency less than this value will not use + // concurrent reader. + readAllDataConcThreshold = uint64(4) ) // byteReader provides structured reading on a byte stream of external storage. diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index cafe9cecff4f4..98fdc27dd1ff1 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -50,9 +50,11 @@ import ( const maxCloudStorageConnections = 1000 type memKVsAndBuffers struct { - mu sync.Mutex - keys [][]byte - values [][]byte + mu sync.Mutex + keys [][]byte + values [][]byte + // memKVBuffers contains two types of buffer, first half are used for small block + // buffer, second half are used for large one. memKVBuffers []*membuf.Buffer size int droppedSize int @@ -92,14 +94,15 @@ func (b *memKVsAndBuffers) build(ctx context.Context) { // Engine stored sorted key/value pairs in an external storage. 
 type Engine struct {
-	storage         storage.ExternalStorage
-	dataFiles       []string
-	statsFiles      []string
-	startKey        []byte
-	endKey          []byte
-	splitKeys       [][]byte
-	regionSplitSize int64
-	bufPool         *membuf.Pool
+	storage           storage.ExternalStorage
+	dataFiles         []string
+	statsFiles        []string
+	startKey          []byte
+	endKey            []byte
+	splitKeys         [][]byte
+	regionSplitSize   int64
+	smallBlockBufPool *membuf.Pool
+	largeBlockBufPool *membuf.Pool
 
 	memKVsAndBuffers memKVsAndBuffers
 
@@ -126,7 +129,10 @@ type Engine struct {
 	importedKVCount *atomic.Int64
 }
 
-const memLimit = 12 * units.GiB
+const (
+	memLimit       = 12 * units.GiB
+	smallBlockSize = units.MiB
+)
 
 // NewExternalEngine creates an (external) engine.
 func NewExternalEngine(
@@ -156,7 +162,12 @@ func NewExternalEngine(
 		endKey:          endKey,
 		splitKeys:       splitKeys,
 		regionSplitSize: regionSplitSize,
-		bufPool: membuf.NewPool(
+		smallBlockBufPool: membuf.NewPool(
+			membuf.WithBlockNum(0),
+			membuf.WithPoolMemoryLimiter(memLimiter),
+			membuf.WithBlockSize(smallBlockSize),
+		),
+		largeBlockBufPool: membuf.NewPool(
 			membuf.WithBlockNum(0),
 			membuf.WithPoolMemoryLimiter(memLimiter),
 			membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
@@ -222,16 +233,28 @@ func getFilesReadConcurrency(
 	}
 	startOffs, endOffs := offsets[0], offsets[1]
 	for i := range statsFiles {
-		result[i] = (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
+		expectedConc := (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
 		// let the stat internals cover the [startKey, endKey) since seekPropsOffsets
 		// always return an offset that is less than or equal to the key.
-		result[i] += 1
-		logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
-			zap.String("filename", statsFiles[i]),
-			zap.Uint64("startOffset", startOffs[i]),
-			zap.Uint64("endOffset", endOffs[i]),
-			zap.Uint64("expected concurrency", result[i]),
-		)
+		expectedConc += 1
+		// readAllData enables concurrent read and uses the large buffer when
+		// result[i] > 1. When expectedConc < readAllDataConcThreshold, we skip
+		// the concurrent read to reduce its overhead.
+		if expectedConc >= readAllDataConcThreshold {
+			result[i] = expectedConc
+		} else {
+			result[i] = 1
+		}
+		// only log for files with expected concurrency > 1, to avoid too many logs
+		if expectedConc > 1 {
+			logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
+				zap.String("filename", statsFiles[i]),
+				zap.Uint64("startOffset", startOffs[i]),
+				zap.Uint64("endOffset", endOffs[i]),
+				zap.Uint64("expectedConc", expectedConc),
+				zap.Uint64("concurrency", result[i]),
+			)
+		}
 	}
 	return result, startOffs, nil
 }
@@ -252,7 +275,8 @@ func (e *Engine) loadBatchRegionData(ctx context.Context, startKey, endKey []byt
 		e.statsFiles,
 		startKey,
 		endKey,
-		e.bufPool,
+		e.smallBlockBufPool,
+		e.largeBlockBufPool,
 		&e.memKVsAndBuffers,
 	)
 	if err != nil {
@@ -443,9 +467,13 @@ func (e *Engine) SplitRanges(
 
 // Close implements common.Engine.
 func (e *Engine) Close() error {
-	if e.bufPool != nil {
-		e.bufPool.Destroy()
-		e.bufPool = nil
+	if e.smallBlockBufPool != nil {
+		e.smallBlockBufPool.Destroy()
+		e.smallBlockBufPool = nil
+	}
+	if e.largeBlockBufPool != nil {
+		e.largeBlockBufPool.Destroy()
+		e.largeBlockBufPool = nil
 	}
 	e.storage.Close()
 	return nil
@@ -453,11 +481,21 @@ func (e *Engine) Close() error {
 
 // Reset resets the memory buffer pool.
 func (e *Engine) Reset() error {
-	if e.bufPool != nil {
-		e.bufPool.Destroy()
-		memLimiter := membuf.NewLimiter(memLimit)
-		e.bufPool = membuf.NewPool(
+	memLimiter := membuf.NewLimiter(memLimit)
+	if e.smallBlockBufPool != nil {
+		e.smallBlockBufPool.Destroy()
+		e.smallBlockBufPool = membuf.NewPool(
+			membuf.WithBlockNum(0),
 			membuf.WithPoolMemoryLimiter(memLimiter),
+			membuf.WithBlockSize(smallBlockSize),
+		)
+	}
+	if e.largeBlockBufPool != nil {
+		e.largeBlockBufPool.Destroy()
+		e.largeBlockBufPool = membuf.NewPool(
+			membuf.WithBlockNum(0),
+			membuf.WithPoolMemoryLimiter(memLimiter),
+			membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
 		)
 	}
 	return nil
diff --git a/br/pkg/lightning/backend/external/merge_v2.go b/br/pkg/lightning/backend/external/merge_v2.go
index 4a5dea8e12f88..5f5a6833bb050 100644
--- a/br/pkg/lightning/backend/external/merge_v2.go
+++ b/br/pkg/lightning/backend/external/merge_v2.go
@@ -119,6 +119,7 @@ func MergeOverlappingFilesV2(
 			curStart,
 			curEnd,
 			bufPool,
+			bufPool,
 			loaded,
 		)
 		if err1 != nil {
diff --git a/br/pkg/lightning/backend/external/misc_bench_test.go b/br/pkg/lightning/backend/external/misc_bench_test.go
index 59ff4cce60167..ddfcdb80e5dbb 100644
--- a/br/pkg/lightning/backend/external/misc_bench_test.go
+++ b/br/pkg/lightning/backend/external/misc_bench_test.go
@@ -27,6 +27,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/felixge/fgprof"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/util/intest"
@@ -39,6 +40,7 @@ func openTestingStorage(t *testing.T) storage.ExternalStorage {
 	}
 	s, err := storage.NewFromURL(context.Background(), *testingStorageURI)
 	require.NoError(t, err)
+	t.Cleanup(s.Close)
 	return s
 }
 
@@ -347,6 +349,48 @@ func createAscendingFiles(
 	return kvCnt, minKey, maxKey
 }
 
+type profiler struct {
+	onAndOffCPUProf bool
+	peakMemProf     bool
+
+	caseIdx int
+
+	onAndOffCPUProfCloser func() error
+	heapProfDoneCh        chan struct{}
+	heapProfWg            *sync.WaitGroup
+}
+
+func newProfiler(onAndOffCPUProf, peakMemProf bool) *profiler {
+	return &profiler{
+		onAndOffCPUProf: onAndOffCPUProf,
+		peakMemProf:     peakMemProf,
+	}
+}
+
+func (p *profiler) beforeTest() {
+	p.caseIdx++
+	if p.onAndOffCPUProf {
+		fileCPU, err := os.Create(fmt.Sprintf("on-and-off-cpu-%d.prof", p.caseIdx))
+		intest.AssertNoError(err)
+		p.onAndOffCPUProfCloser = fgprof.Start(fileCPU, fgprof.FormatPprof)
+	}
+	if p.peakMemProf {
+		fileHeap := fmt.Sprintf("heap-%d.prof", p.caseIdx)
+		p.heapProfDoneCh, p.heapProfWg = recordHeapForMaxInUse(fileHeap)
+	}
+}
+
+func (p *profiler) afterTest() {
+	if p.onAndOffCPUProf {
+		err := p.onAndOffCPUProfCloser()
+		intest.AssertNoError(err)
+	}
+	if p.peakMemProf {
+		close(p.heapProfDoneCh)
+		p.heapProfWg.Wait()
+	}
+}
+
 func recordHeapForMaxInUse(filename string) (chan struct{}, *sync.WaitGroup) {
 	doneCh := make(chan struct{})
 	var wg sync.WaitGroup
diff --git a/br/pkg/lightning/backend/external/reader.go b/br/pkg/lightning/backend/external/reader.go
index 151d20736cd69..27a79cee02477 100644
--- a/br/pkg/lightning/backend/external/reader.go
+++ b/br/pkg/lightning/backend/external/reader.go
@@ -35,7 +35,8 @@ func readAllData(
 	store storage.ExternalStorage,
 	dataFiles, statsFiles []string,
 	startKey, endKey []byte,
-	bufPool *membuf.Pool,
+	smallBlockBufPool *membuf.Pool,
+	largeBlockBufPool *membuf.Pool,
 	output *memKVsAndBuffers,
 ) (err error) {
 	task := log.BeginTask(logutil.Logger(ctx), "read all data")
@@ -69,36 +70,59 @@ func readAllData(
 		startKey,
 		endKey,
 	)
-	// TODO(lance6716): refine adjust concurrency
-	for i, c := range concurrences {
-		if c < readAllDataConcThreshold {
-			concurrences[i] = 1
-		}
-	}
-
 	if err != nil {
 		return err
 	}
+
 	eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
-	// limit the concurrency to avoid open too many connections at the same time
-	eg.SetLimit(1000)
-	for i := range dataFiles {
-		i := i
+	readConn := 1000
+	readConn = min(readConn, len(dataFiles))
+	taskCh := make(chan int)
+	output.memKVBuffers = make([]*membuf.Buffer, readConn*2)
+	for readIdx := 0; readIdx < readConn; readIdx++ {
+		readIdx := readIdx
 		eg.Go(func() error {
-			err2 := readOneFile(
-				egCtx,
-				store,
-				dataFiles[i],
-				startKey,
-				endKey,
-				startOffsets[i],
-				concurrences[i],
-				bufPool,
-				output,
-			)
-			return errors.Annotatef(err2, "failed to read file %s", dataFiles[i])
+			output.memKVBuffers[readIdx] = smallBlockBufPool.NewBuffer()
+			output.memKVBuffers[readIdx+readConn] = largeBlockBufPool.NewBuffer()
+			smallBlockBuf := output.memKVBuffers[readIdx]
+			largeBlockBuf := output.memKVBuffers[readIdx+readConn]
+
+			for {
+				select {
+				case <-egCtx.Done():
+					return egCtx.Err()
+				case fileIdx, ok := <-taskCh:
+					if !ok {
+						return nil
+					}
+					err2 := readOneFile(
+						egCtx,
+						store,
+						dataFiles[fileIdx],
+						startKey,
+						endKey,
+						startOffsets[fileIdx],
+						concurrences[fileIdx],
+						smallBlockBuf,
+						largeBlockBuf,
+						output,
+					)
+					if err2 != nil {
+						return errors.Annotatef(err2, "failed to read file %s", dataFiles[fileIdx])
+					}
+				}
+			}
 		})
 	}
+
+	for fileIdx := range dataFiles {
+		select {
+		case <-egCtx.Done():
+			return eg.Wait()
+		case taskCh <- fileIdx:
+		}
+	}
+	close(taskCh)
 	return eg.Wait()
 }
 
@@ -109,7 +133,8 @@ func readOneFile(
 	startKey, endKey []byte,
 	startOffset uint64,
 	concurrency uint64,
-	bufPool *membuf.Pool,
+	smallBlockBuf *membuf.Buffer,
+	largeBlockBuf *membuf.Buffer,
 	output *memKVsAndBuffers,
 ) error {
 	readAndSortDurHist := metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("read_one_file")
@@ -127,7 +152,7 @@ func readOneFile(
 			dataFile,
 			int(concurrency),
 			ConcurrentReaderBufferSizePerConc,
-			bufPool.NewBuffer(),
+			largeBlockBuf,
 		)
 		err = rd.byteReader.switchConcurrentMode(true)
 		if err != nil {
@@ -135,8 +160,6 @@ func readOneFile(
 		}
 	}
 
-	// this buffer is associated with data slices and will return to caller
-	memBuf := bufPool.NewBuffer()
 	keys := make([][]byte, 0, 1024)
 	values := make([][]byte, 0, 1024)
 	size := 0
@@ -159,15 +182,14 @@ func readOneFile(
 	}
 	// TODO(lance6716): we are copying every KV from rd's buffer to memBuf, can we
 	// directly read into memBuf?
-		keys = append(keys, memBuf.AddBytes(k))
-		values = append(values, memBuf.AddBytes(v))
+		keys = append(keys, smallBlockBuf.AddBytes(k))
+		values = append(values, smallBlockBuf.AddBytes(v))
 		size += len(k) + len(v)
 	}
 	readAndSortDurHist.Observe(time.Since(ts).Seconds())
 	output.mu.Lock()
 	output.keysPerFile = append(output.keysPerFile, keys)
 	output.valuesPerFile = append(output.valuesPerFile, values)
-	output.memKVBuffers = append(output.memKVBuffers, memBuf)
 	output.size += size
 	output.droppedSizePerFile = append(output.droppedSizePerFile, droppedSize)
 	output.mu.Unlock()
diff --git a/br/pkg/lightning/backend/external/reader_test.go b/br/pkg/lightning/backend/external/reader_test.go
index 12f6e5151ffb5..153b25f3b4faf 100644
--- a/br/pkg/lightning/backend/external/reader_test.go
+++ b/br/pkg/lightning/backend/external/reader_test.go
@@ -139,7 +139,11 @@ func TestReadLargeFile(t *testing.T) {
 	failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/external/assertReloadAtMostOnce", "return()")
 	defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/external/assertReloadAtMostOnce")
 
-	bufPool := membuf.NewPool(
+	smallBlockBufPool := membuf.NewPool(
+		membuf.WithBlockNum(0),
+		membuf.WithBlockSize(smallBlockSize),
+	)
+	largeBlockBufPool := membuf.NewPool(
 		membuf.WithBlockNum(0),
 		membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
 	)
@@ -147,7 +151,7 @@ func TestReadLargeFile(t *testing.T) {
 	startKey := []byte("key000000")
 	maxKey := []byte("key004998")
 	endKey := []byte("key004999")
-	err = readAllData(ctx, memStore, datas, stats, startKey, endKey, bufPool, output)
+	err = readAllData(ctx, memStore, datas, stats, startKey, endKey, smallBlockBufPool, largeBlockBufPool, output)
 	require.NoError(t, err)
 	output.build(ctx)
 	require.Equal(t, startKey, output.keys[0])
diff --git a/br/pkg/lightning/backend/external/testutil.go b/br/pkg/lightning/backend/external/testutil.go
index fe4c882550bfe..38de84aaf9c55 100644
--- a/br/pkg/lightning/backend/external/testutil.go
+++ b/br/pkg/lightning/backend/external/testutil.go
@@ -79,6 +79,7 @@ func testReadAndCompare(
 			curStart,
 			curEnd,
 			bufPool,
+			bufPool,
 			loaded,
 		)
 		require.NoError(t, err)
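// NOTE (reviewer sketch, not part of the patch): the new readAllData above replaces
// errgroup.SetLimit with a fixed pool of reader goroutines that pull file indices
// from a channel, so each worker can own one reusable small-block buffer and one
// large-block buffer instead of allocating a buffer per file. A minimal,
// self-contained sketch of that scheduling pattern, assuming illustrative names
// (readsketch, readFiles) that do not exist in the patch:
package readsketch

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// readFiles fans out len(files) read tasks to a fixed number of workers.
func readFiles(ctx context.Context, files []string, workers int) error {
	eg, egCtx := errgroup.WithContext(ctx)
	taskCh := make(chan int)

	for w := 0; w < workers; w++ {
		eg.Go(func() error {
			// In the real code each worker allocates its small-block and
			// large-block membuf buffers here and reuses them for every file.
			for {
				select {
				case <-egCtx.Done():
					return egCtx.Err()
				case fileIdx, ok := <-taskCh:
					if !ok {
						return nil // producer closed the channel, no more work
					}
					// Stand-in for readOneFile(egCtx, ..., files[fileIdx], ...).
					fmt.Println("read", files[fileIdx])
				}
			}
		})
	}

	// Producer: hand out file indices, but stop early if any worker failed.
	for i := range files {
		select {
		case <-egCtx.Done():
			return eg.Wait()
		case taskCh <- i:
		}
	}
	close(taskCh)
	return eg.Wait()
}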
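// NOTE (reviewer sketch, not part of the patch): the profiler helper added in
// misc_bench_test.go bundles fgprof's on-/off-CPU profiling and the existing
// recordHeapForMaxInUse heap sampler behind beforeTest/afterTest. A hypothetical
// extra test in the same package would use it roughly like this (TestProfilerSketch
// and the sleep are illustrative only):
func TestProfilerSketch(t *testing.T) {
	p := newProfiler(true, true)       // enable on-and-off-CPU profile and peak-heap profile
	p.beforeTest()                     // creates on-and-off-cpu-1.prof and starts the heap sampler
	time.Sleep(100 * time.Millisecond) // stand-in for the workload being measured
	p.afterTest()                      // stops fgprof and writes heap-1.prof
}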