From 2ca9f98f55db27ef68a81972d8973a59b86a2001 Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Mon, 15 Jan 2024 15:25:45 +0800
Subject: [PATCH] restore: limit scan region concurrency during restore
 (#50255)

close pingcap/tidb#50280
---
 br/pkg/restore/import.go      | 13 +++++--------
 br/pkg/restore/split.go       | 34 ++++++++++++++++++++++++++++------
 br/pkg/restore/split/split.go |  2 +-
 br/pkg/restore/split_test.go  | 16 ++++++++++++++++
 br/pkg/restore/util_test.go   |  4 ++++
 5 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go
index 842f3690db35f..9a2c5d06ed3d9 100644
--- a/br/pkg/restore/import.go
+++ b/br/pkg/restore/import.go
@@ -55,8 +55,7 @@ const (
 )
 
 const (
-	importScanRegionTime = 20 * time.Second
-	gRPCBackOffMaxDelay  = 3 * time.Second
+	gRPCBackOffMaxDelay = 3 * time.Second // TODO: make it configurable
 
 	gRPCTimeOut = 25 * time.Minute
 )
@@ -545,11 +544,9 @@ func (importer *FileImporter) ImportSSTFiles(
 	}
 
 	err = utils.WithRetry(ctx, func() error {
-		tctx, cancel := context.WithTimeout(ctx, importScanRegionTime)
-		defer cancel()
 		// Scan regions covered by the file range
 		regionInfos, errScanRegion := split.PaginateScanRegion(
-			tctx, importer.metaClient, startKey, endKey, split.ScanRegionPaginationLimit)
+			ctx, importer.metaClient, startKey, endKey, split.ScanRegionPaginationLimit)
 		if errScanRegion != nil {
 			return errors.Trace(errScanRegion)
 		}
@@ -993,14 +990,14 @@ func (importer *FileImporter) downloadSSTV2(
 	} else {
 		importer.storeWorkerPoolRWLock.RUnlock()
 	}
-	defer func() {
-		workerCh <- struct{}{}
-	}()
 	select {
 	case <-ectx.Done():
 		return ectx.Err()
 	case <-workerCh:
 	}
+	defer func() {
+		workerCh <- struct{}{}
+	}()
 	for _, file := range files {
 		req, ok := downloadReqsMap[file.Name]
 		if !ok {
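The downloadSSTV2 hunk above fixes the ordering of a channel-based semaphore: the old code deferred the token release (workerCh <- struct{}{}) before it had actually acquired a token, so an acquire aborted by context cancellation would still push a token back and quietly raise the concurrency cap. A minimal, self-contained sketch of the corrected acquire-then-release pattern; the names tokens and doWork are illustrative, not from the BR code:

package main

import (
	"context"
	"fmt"
)

// acquireThenRelease takes a token from the semaphore before scheduling its
// return. With the old order (defer first), a cancellation during the acquire
// would still release a token that was never taken, growing the pool.
func acquireThenRelease(ctx context.Context, tokens chan struct{}, doWork func() error) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-tokens: // acquire a worker slot
	}
	defer func() { tokens <- struct{}{} }() // release only what was acquired
	return doWork()
}

func main() {
	tokens := make(chan struct{}, 2) // capacity = max concurrent downloads
	for i := 0; i < 2; i++ {
		tokens <- struct{}{}
	}
	err := acquireThenRelease(context.Background(), tokens, func() error {
		fmt.Println("download one batch of SST files")
		return nil
	})
	fmt.Println("err:", err)
}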
diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go
index ac5d711b309e6..913deedd2fef8 100644
--- a/br/pkg/restore/split.go
+++ b/br/pkg/restore/split.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/utils/iter"
 	"github.com/pingcap/tidb/pkg/tablecodec"
 	"github.com/pingcap/tidb/pkg/util/codec"
+	"github.com/pingcap/tidb/pkg/util/mathutil"
 	"go.uber.org/multierr"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
@@ -44,6 +45,11 @@ const (
 	CoarseGrained Granularity = "coarse-grained"
 )
 
+const (
+	splitRegionKeysConcurrency   = 8
+	splitRegionRangesConcurrency = 16
+)
+
 type SplitContext struct {
 	isRawKv     bool
 	needScatter bool
@@ -99,12 +105,19 @@ func (rs *RegionSplitter) ExecuteSplit(
 	if errSplit != nil {
 		return errors.Trace(errSplit)
 	}
+	if len(sortedRanges) == 0 {
+		log.Info("skip splitting regions, no range after sorting")
+		return nil
+	}
 	sortedKeys := make([][]byte, 0, len(sortedRanges))
 	totalRangeSize := uint64(0)
 	for _, r := range sortedRanges {
 		sortedKeys = append(sortedKeys, r.EndKey)
 		totalRangeSize += r.Size
 	}
+	// need to use the first range's start key to scan regions,
+	// and the range size must be greater than 0 here
+	scanStartKey := sortedRanges[0].StartKey
 	sctx := SplitContext{
 		isRawKv:     isRawKv,
 		needScatter: true,
@@ -115,7 +128,7 @@
 	if granularity == string(CoarseGrained) {
 		return rs.executeSplitByRanges(ctx, sctx, sortedRanges)
 	}
-	return rs.executeSplitByKeys(ctx, sctx, sortedKeys)
+	return rs.executeSplitByKeys(ctx, sctx, scanStartKey, sortedKeys)
 }
 
 func (rs *RegionSplitter) executeSplitByRanges(
@@ -164,8 +177,9 @@
 			}
 		}
 	}
-
-	workerPool := utils.NewWorkerPool(uint(splitContext.storeCount), "split ranges")
+	// PD cannot handle too many scan region requests.
+	poolSize := mathutil.Clamp(uint(splitContext.storeCount), 1, splitRegionRangesConcurrency)
+	workerPool := utils.NewWorkerPool(poolSize, "split ranges")
 	eg, ectx := errgroup.WithContext(ctx)
 	for rID, rgs := range splitRangeMap {
 		region := regionMap[rID]
@@ -184,6 +198,13 @@
 			rangeSize += rg.Size
 			allKeys = append(allKeys, rg.EndKey)
 		}
+		// need to use the first range's start key to scan regions,
+		// and the range size must be greater than 0 here
+		scanStartKey := ranges[0].StartKey
+		// if there are fewer ranges than stores, we can't split by range
+		if len(ranges) <= sctx.storeCount {
+			return rs.executeSplitByKeys(ectx, sctx, scanStartKey, allKeys)
+		}
 		expectSplitSize := rangeSize / uint64(sctx.storeCount)
 		size := uint64(0)
 		keys := make([][]byte, 0, sctx.storeCount)
@@ -216,7 +237,7 @@
 			}
 			sctx.onSplit(keys)
 			sctx.needScatter = false
-			return rs.executeSplitByKeys(ectx, sctx, allKeys)
+			return rs.executeSplitByKeys(ectx, sctx, scanStartKey, allKeys)
 		})
 	}
 	return eg.Wait()
@@ -236,11 +257,12 @@
 func (rs *RegionSplitter) executeSplitByKeys(
 	ctx context.Context,
 	splitContext SplitContext,
+	scanStartKey []byte,
 	sortedKeys [][]byte,
 ) error {
 	var mutex sync.Mutex
 	startTime := time.Now()
-	minKey := codec.EncodeBytesExt(nil, sortedKeys[0], splitContext.isRawKv)
+	minKey := codec.EncodeBytesExt(nil, scanStartKey, splitContext.isRawKv)
 	maxKey := codec.EncodeBytesExt(nil, sortedKeys[len(sortedKeys)-1], splitContext.isRawKv)
 	scatterRegions := make([]*split.RegionInfo, 0)
 	regionsMap := make(map[uint64]*split.RegionInfo)
@@ -256,7 +278,7 @@
 	for _, region := range regions {
 		regionMap[region.Region.GetId()] = region
 	}
-	workerPool := utils.NewWorkerPool(8, "split keys")
+	workerPool := utils.NewWorkerPool(splitRegionKeysConcurrency, "split keys")
 	eg, ectx := errgroup.WithContext(ctx)
 	for regionID, splitKeys := range splitKeyMap {
 		region := regionMap[regionID]
diff --git a/br/pkg/restore/split/split.go b/br/pkg/restore/split/split.go
index 95704383582d9..99d804aa07170 100644
--- a/br/pkg/restore/split/split.go
+++ b/br/pkg/restore/split/split.go
@@ -27,7 +27,7 @@ var (
 const (
 	SplitRetryTimes       = 32
 	SplitRetryInterval    = 50 * time.Millisecond
-	SplitMaxRetryInterval = time.Second
+	SplitMaxRetryInterval = 4 * time.Second
 
 	SplitCheckMaxRetryTimes = 64
 	SplitCheckInterval      = 8 * time.Millisecond
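Raising SplitMaxRetryInterval from 1s to 4s lets split retries back off further when PD is under pressure, complementing the clamped worker pools above. A rough sketch of capped exponential backoff, assuming a simple doubling step for illustration (BR's real backoffer carries more state):

package main

import (
	"fmt"
	"time"
)

// nextInterval doubles the current retry interval and caps it at upper. This
// is the usual shape behind constants like SplitRetryInterval (50ms) and the
// raised SplitMaxRetryInterval (4s); the doubling step is an assumption, not
// BR's exact backoffer.
func nextInterval(cur, upper time.Duration) time.Duration {
	cur *= 2
	if cur > upper {
		return upper
	}
	return cur
}

func main() {
	interval := 50 * time.Millisecond
	for attempt := 0; attempt < 9; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, interval)
		interval = nextInterval(interval, 4*time.Second)
	}
}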
diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go
index 793d3e1a36d05..aeee4998388be 100644
--- a/br/pkg/restore/split_test.go
+++ b/br/pkg/restore/split_test.go
@@ -230,6 +230,9 @@ func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit
 		c.InjectTimes -= 1
 		return nil, status.Error(codes.Unavailable, "not leader")
 	}
+	if len(key) != 0 && bytes.Equal(key, endKey) {
+		return nil, status.Error(codes.Internal, "key and endKey are the same")
+	}
 
 	infos := c.regionsInfo.ScanRange(key, endKey, limit)
 	regions := make([]*split.RegionInfo, 0, len(infos))
@@ -286,6 +289,19 @@ func (b *assertRetryLessThanBackoffer) NextBackoff(err error) time.Duration {
 func (b *assertRetryLessThanBackoffer) Attempt() int {
 	return b.max - b.already
 }
+func TestScanEmptyRegion(t *testing.T) {
+	client := initTestClient(false)
+	ranges := initRanges()
+	// keep only the first range
+	ranges = ranges[0:1]
+	rewriteRules := initRewriteRules()
+	regionSplitter := restore.NewRegionSplitter(client)
+
+	ctx := context.Background()
+	err := regionSplitter.ExecuteSplit(ctx, ranges, rewriteRules, 0, "", false, func(key [][]byte) {})
+	// should not return an error with only one range entry
+	require.NoError(t, err)
+}
 
 func TestScatterFinishInTime(t *testing.T) {
 	client := initTestClient(false)
diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go
index 43c16a656ed73..2740594a79cf1 100644
--- a/br/pkg/restore/util_test.go
+++ b/br/pkg/restore/util_test.go
@@ -282,6 +282,10 @@ func TestPaginateScanRegion(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, regions[1:2], batch)
 
+	_, err = split.PaginateScanRegion(
+		ctx, NewTestClient(stores, regionMap, 0), regions[1].Region.EndKey, regions[1].Region.EndKey, 3)
+	require.Error(t, err)
+
 	_, err = split.PaginateScanRegion(ctx, NewTestClient(stores, regionMap, 0), []byte{2}, []byte{1}, 3)
 	require.Error(t, err)
 	require.True(t, berrors.ErrRestoreInvalidRange.Equal(err))
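Both new test cases pin down the degenerate scan range where the start key equals the end key: TestScanEmptyRegion drives ExecuteSplit with a single range, and the extra PaginateScanRegion assertion expects an error for start == end. A hypothetical guard illustrating the invariant the tests rely on; checkScanRange is not a BR function:

package main

import (
	"bytes"
	"fmt"
)

// checkScanRange rejects a degenerate scan range. An empty end key means
// "scan to the end of the keyspace", so only a non-empty end key that is
// equal to or ordered before the start key is invalid.
func checkScanRange(start, end []byte) error {
	if len(end) != 0 && bytes.Compare(start, end) >= 0 {
		return fmt.Errorf("invalid scan range: start %x is not before end %x", start, end)
	}
	return nil
}

func main() {
	fmt.Println(checkScanRange([]byte{1}, []byte{2})) // <nil>
	fmt.Println(checkScanRange([]byte{2}, []byte{2})) // error: start == end
	fmt.Println(checkScanRange([]byte{1}, nil))       // <nil>: open-ended scan
}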