From 91a5f9fa29a0aa778cf86890eacf6ef1112eb7f2 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 10 Jan 2024 14:11:46 +0800 Subject: [PATCH 1/9] limit scan region concurency during restore --- br/pkg/restore/split.go | 13 ++++++++++--- br/pkg/restore/split/split.go | 2 +- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index ac5d711b309e6..193f1b3d7385f 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" + "github.com/pingcap/tidb/pkg/util/mathutil" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -44,6 +45,11 @@ const ( CoarseGrained Granularity = "coarse-grained" ) +const ( + splitRegionKeysConcurrency = 8 + splitRegionRangesConcurrency = 16 +) + type SplitContext struct { isRawKv bool needScatter bool @@ -164,8 +170,9 @@ func (rs *RegionSplitter) executeSplitByRanges( } } } - - workerPool := utils.NewWorkerPool(uint(splitContext.storeCount), "split ranges") + // pd cannot handling too many scan regions requests. + poolSize := mathutil.Min(uint(splitContext.storeCount), splitRegionRangesConcurrency) + workerPool := utils.NewWorkerPool(poolSize, "split ranges") eg, ectx := errgroup.WithContext(ctx) for rID, rgs := range splitRangeMap { region := regionMap[rID] @@ -256,7 +263,7 @@ func (rs *RegionSplitter) executeSplitByKeys( for _, region := range regions { regionMap[region.Region.GetId()] = region } - workerPool := utils.NewWorkerPool(8, "split keys") + workerPool := utils.NewWorkerPool(splitRegionKeysConcurrency, "split keys") eg, ectx := errgroup.WithContext(ctx) for regionID, splitKeys := range splitKeyMap { region := regionMap[regionID] diff --git a/br/pkg/restore/split/split.go b/br/pkg/restore/split/split.go index 95704383582d9..99d804aa07170 100644 --- a/br/pkg/restore/split/split.go +++ b/br/pkg/restore/split/split.go @@ -27,7 +27,7 @@ var ( const ( SplitRetryTimes = 32 SplitRetryInterval = 50 * time.Millisecond - SplitMaxRetryInterval = time.Second + SplitMaxRetryInterval = 4 * time.Second SplitCheckMaxRetryTimes = 64 SplitCheckInterval = 8 * time.Millisecond From fef1e180f8c0364a242a7c7c693b389b0bc5f713 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 10 Jan 2024 15:46:38 +0800 Subject: [PATCH 2/9] fix unstable test --- br/pkg/restore/split.go | 9 ++++++--- br/pkg/restore/util_test.go | 4 ++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 193f1b3d7385f..47934458ad669 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -111,6 +111,7 @@ func (rs *RegionSplitter) ExecuteSplit( sortedKeys = append(sortedKeys, r.EndKey) totalRangeSize += r.Size } + scanStartKey := sortedRanges[0].StartKey sctx := SplitContext{ isRawKv: isRawKv, needScatter: true, @@ -121,7 +122,7 @@ func (rs *RegionSplitter) ExecuteSplit( if granularity == string(CoarseGrained) { return rs.executeSplitByRanges(ctx, sctx, sortedRanges) } - return rs.executeSplitByKeys(ctx, sctx, sortedKeys) + return rs.executeSplitByKeys(ctx, sctx, scanStartKey, sortedKeys) } func (rs *RegionSplitter) executeSplitByRanges( @@ -191,6 +192,7 @@ func (rs *RegionSplitter) executeSplitByRanges( rangeSize += rg.Size allKeys = append(allKeys, rg.EndKey) } + scanStartKey := ranges[0].StartKey expectSplitSize := rangeSize / uint64(sctx.storeCount) size := uint64(0) keys := make([][]byte, 0, sctx.storeCount) @@ -223,7 +225,7 @@ func (rs *RegionSplitter) executeSplitByRanges( } sctx.onSplit(keys) sctx.needScatter = false - return rs.executeSplitByKeys(ectx, sctx, allKeys) + return rs.executeSplitByKeys(ectx, sctx, scanStartKey, allKeys) }) } return eg.Wait() @@ -243,11 +245,12 @@ func (rs *RegionSplitter) executeSplitByRanges( func (rs *RegionSplitter) executeSplitByKeys( ctx context.Context, splitContext SplitContext, + scanStartKey []byte, sortedKeys [][]byte, ) error { var mutex sync.Mutex startTime := time.Now() - minKey := codec.EncodeBytesExt(nil, sortedKeys[0], splitContext.isRawKv) + minKey := codec.EncodeBytesExt(nil, scanStartKey, splitContext.isRawKv) maxKey := codec.EncodeBytesExt(nil, sortedKeys[len(sortedKeys)-1], splitContext.isRawKv) scatterRegions := make([]*split.RegionInfo, 0) regionsMap := make(map[uint64]*split.RegionInfo) diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go index 43c16a656ed73..f7527c18a191f 100644 --- a/br/pkg/restore/util_test.go +++ b/br/pkg/restore/util_test.go @@ -282,6 +282,10 @@ func TestPaginateScanRegion(t *testing.T) { require.NoError(t, err) require.Equal(t, regions[1:2], batch) + batch, err = split.PaginateScanRegion( + ctx, NewTestClient(stores, regionMap, 0), regions[1].Region.EndKey, regions[1].Region.EndKey, 3) + require.Error(t, err) + _, err = split.PaginateScanRegion(ctx, NewTestClient(stores, regionMap, 0), []byte{2}, []byte{1}, 3) require.Error(t, err) require.True(t, berrors.ErrRestoreInvalidRange.Equal(err)) From bdc0733ea54bdfd2e6c456ebcdfd6e78ca56c969 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 10 Jan 2024 16:52:19 +0800 Subject: [PATCH 3/9] add test case --- br/pkg/restore/split.go | 8 ++++++++ br/pkg/restore/split_test.go | 16 ++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 47934458ad669..3b0648be1a1c7 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -105,12 +105,18 @@ func (rs *RegionSplitter) ExecuteSplit( if errSplit != nil { return errors.Trace(errSplit) } + if len(sortedRanges) == 0 { + log.Info("skip split regions after sorted, no range") + return nil + } sortedKeys := make([][]byte, 0, len(sortedRanges)) totalRangeSize := uint64(0) for _, r := range sortedRanges { sortedKeys = append(sortedKeys, r.EndKey) totalRangeSize += r.Size } + // need use first range's start key to scan region + // and the range size must be greater than 0 here scanStartKey := sortedRanges[0].StartKey sctx := SplitContext{ isRawKv: isRawKv, @@ -192,6 +198,8 @@ func (rs *RegionSplitter) executeSplitByRanges( rangeSize += rg.Size allKeys = append(allKeys, rg.EndKey) } + // need use first range's start key to scan region + // and the range size must be greater than 0 here scanStartKey := ranges[0].StartKey expectSplitSize := rangeSize / uint64(sctx.storeCount) size := uint64(0) diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 793d3e1a36d05..3e852eb13acfb 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -230,6 +230,9 @@ func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit c.InjectTimes -= 1 return nil, status.Error(codes.Unavailable, "not leader") } + if bytes.Compare(key, endKey) == 0 { + return nil, status.Error(codes.Internal, "key and endKey are the same") + } infos := c.regionsInfo.ScanRange(key, endKey, limit) regions := make([]*split.RegionInfo, 0, len(infos)) @@ -286,6 +289,19 @@ func (b *assertRetryLessThanBackoffer) NextBackoff(err error) time.Duration { func (b *assertRetryLessThanBackoffer) Attempt() int { return b.max - b.already } +func TestScanEmptyRegion(t *testing.T) { + client := initTestClient(false) + ranges := initRanges() + // make ranges has only one + ranges = ranges[0:1] + rewriteRules := initRewriteRules() + regionSplitter := restore.NewRegionSplitter(client) + + ctx := context.Background() + err := regionSplitter.ExecuteSplit(ctx, ranges, rewriteRules, 0, "", false, func(key [][]byte) {}) + // should not return error with only one range entry + require.NoError(t, err) +} func TestScatterFinishInTime(t *testing.T) { client := initTestClient(false) From 55e45453fd9e983d2003fa0bd80a5e70c43ddfdb Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 10 Jan 2024 17:57:55 +0800 Subject: [PATCH 4/9] update --- br/pkg/restore/split.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 3b0648be1a1c7..43089c65f31f9 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -201,6 +201,10 @@ func (rs *RegionSplitter) executeSplitByRanges( // need use first range's start key to scan region // and the range size must be greater than 0 here scanStartKey := ranges[0].StartKey + // if ranges is less than store count, we can't split it by range + if len(ranges) <= sctx.storeCount { + return rs.executeSplitByKeys(ectx, sctx, scanStartKey, allKeys) + } expectSplitSize := rangeSize / uint64(sctx.storeCount) size := uint64(0) keys := make([][]byte, 0, sctx.storeCount) From eea825f57cedd0bd63cb1270f8d71e8a8c7ca9ac Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 11 Jan 2024 16:00:49 +0800 Subject: [PATCH 5/9] remove scan region timeout --- br/pkg/restore/import.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go index 842f3690db35f..928021ec35542 100644 --- a/br/pkg/restore/import.go +++ b/br/pkg/restore/import.go @@ -55,8 +55,7 @@ const ( ) const ( - importScanRegionTime = 20 * time.Second - gRPCBackOffMaxDelay = 3 * time.Second + gRPCBackOffMaxDelay = 3 * time.Second // Todo: make it configable gRPCTimeOut = 25 * time.Minute ) @@ -545,11 +544,9 @@ func (importer *FileImporter) ImportSSTFiles( } err = utils.WithRetry(ctx, func() error { - tctx, cancel := context.WithTimeout(ctx, importScanRegionTime) - defer cancel() // Scan regions covered by the file range regionInfos, errScanRegion := split.PaginateScanRegion( - tctx, importer.metaClient, startKey, endKey, split.ScanRegionPaginationLimit) + ctx, importer.metaClient, startKey, endKey, split.ScanRegionPaginationLimit) if errScanRegion != nil { return errors.Trace(errScanRegion) } From 8ae378e353fb90fc5a15edf140060edb8235e7f2 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 11 Jan 2024 17:13:45 +0800 Subject: [PATCH 6/9] reorder the token reset function --- br/pkg/restore/import.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go index 928021ec35542..9a2c5d06ed3d9 100644 --- a/br/pkg/restore/import.go +++ b/br/pkg/restore/import.go @@ -990,14 +990,14 @@ func (importer *FileImporter) downloadSSTV2( } else { importer.storeWorkerPoolRWLock.RUnlock() } - defer func() { - workerCh <- struct{}{} - }() select { case <-ectx.Done(): return ectx.Err() case <-workerCh: } + defer func() { + workerCh <- struct{}{} + }() for _, file := range files { req, ok := downloadReqsMap[file.Name] if !ok { From 124f351afc26dddd6b6caf3241504ae6f782a62e Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 11 Jan 2024 17:17:42 +0800 Subject: [PATCH 7/9] udpate --- br/pkg/restore/split.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 43089c65f31f9..913deedd2fef8 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -178,7 +178,7 @@ func (rs *RegionSplitter) executeSplitByRanges( } } // pd cannot handling too many scan regions requests. - poolSize := mathutil.Min(uint(splitContext.storeCount), splitRegionRangesConcurrency) + poolSize := mathutil.Clamp(uint(splitContext.storeCount), 1, splitRegionRangesConcurrency) workerPool := utils.NewWorkerPool(poolSize, "split ranges") eg, ectx := errgroup.WithContext(ctx) for rID, rgs := range splitRangeMap { From 58cbc3af46b22a1313b29617b5a224de93cabbfe Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 11 Jan 2024 18:05:07 +0800 Subject: [PATCH 8/9] fix build --- br/pkg/restore/split_test.go | 2 +- br/pkg/restore/util_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 3e852eb13acfb..fd4d2c0ce93f8 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -230,7 +230,7 @@ func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit c.InjectTimes -= 1 return nil, status.Error(codes.Unavailable, "not leader") } - if bytes.Compare(key, endKey) == 0 { + if bytes.Equal(key, endKey) { return nil, status.Error(codes.Internal, "key and endKey are the same") } diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go index f7527c18a191f..2740594a79cf1 100644 --- a/br/pkg/restore/util_test.go +++ b/br/pkg/restore/util_test.go @@ -282,7 +282,7 @@ func TestPaginateScanRegion(t *testing.T) { require.NoError(t, err) require.Equal(t, regions[1:2], batch) - batch, err = split.PaginateScanRegion( + _, err = split.PaginateScanRegion( ctx, NewTestClient(stores, regionMap, 0), regions[1].Region.EndKey, regions[1].Region.EndKey, 3) require.Error(t, err) From d7df3541b1d0b1abecbee69360992dfed54b6ffa Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 15 Jan 2024 12:04:15 +0800 Subject: [PATCH 9/9] fix test --- br/pkg/restore/split_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index fd4d2c0ce93f8..aeee4998388be 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -230,7 +230,7 @@ func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit c.InjectTimes -= 1 return nil, status.Error(codes.Unavailable, "not leader") } - if bytes.Equal(key, endKey) { + if len(key) != 0 && bytes.Equal(key, endKey) { return nil, status.Error(codes.Internal, "key and endKey are the same") }