diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index f34a31da8ee8a..4fbf8ef95da0d 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -87,6 +87,8 @@ const ( // lower the max-key-count to avoid tikv trigger region auto split regionMaxKeyCount = 1_280_000 defaultRegionSplitSize = 96 * units.MiB + // The max ranges count in a batch to split and scatter. + maxBatchSplitRanges = 4096 propRangeIndex = "tikv.range_index" @@ -1347,7 +1349,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys // split region by given ranges for i := 0; i < maxRetryTimes; i++ { - err = local.SplitAndScatterRegionByRanges(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize) + err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges) if err == nil || common.IsContextCanceledError(err) { break } diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go index bc7a7a65d2a4c..92c8824c7dc91 100644 --- a/br/pkg/lightning/backend/local/localhelper.go +++ b/br/pkg/lightning/backend/local/localhelper.go @@ -58,10 +58,32 @@ var ( splitRegionBaseBackOffTime = time.Second ) -// TODO remove this file and use br internal functions -// This File include region split & scatter operation just like br. +// SplitAndScatterRegionInBatches splits&scatter regions in batches. +// Too many split&scatter requests may put a lot of pressure on TiKV and PD. +func (local *local) SplitAndScatterRegionInBatches( + ctx context.Context, + ranges []Range, + tableInfo *checkpoints.TidbTableInfo, + needSplit bool, + regionSplitSize int64, + batchCnt int, +) error { + for i := 0; i < len(ranges); i += batchCnt { + batch := ranges[i:] + if len(batch) > batchCnt { + batch = batch[:batchCnt] + } + if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil { + return errors.Trace(err) + } + } + return nil +} + +// SplitAndScatterRegionByRanges include region split & scatter operation just like br. // we can simply call br function, but we need to change some function signature of br // When the ranges total size is small, we can skip the split to avoid generate empty regions. +// TODO: remove this file and use br internal functions func (local *local) SplitAndScatterRegionByRanges( ctx context.Context, ranges []Range, @@ -423,16 +445,17 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) { } func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) { - regionID := regionInfo.Region.GetId() for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ { - ok, err := local.isScatterRegionFinished(ctx, regionID) - if err != nil { - log.L().Warn("scatter region failed: do not have the region", - logutil.Region(regionInfo.Region)) + ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo) + if ok { return } - if ok { - break + if err != nil { + if !utils.IsRetryableError(err) { + log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err)) + return + } + log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err)) } select { case <-time.After(time.Second): @@ -442,8 +465,8 @@ func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split. } } -func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) { - resp, err := local.splitCli.GetOperator(ctx, regionID) +func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) { + resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId()) if err != nil { return false, err } @@ -461,9 +484,20 @@ func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64 return false, errors.Errorf("get operator error: %s", respErr.GetType()) } // If the current operator of the region is not 'scatter-region', we could assume - // that 'scatter-operator' has finished or timeout - ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING - return ok, nil + // that 'scatter-operator' has finished. + if string(resp.GetDesc()) != "scatter-region" { + return true, nil + } + switch resp.GetStatus() { + case pdpb.OperatorStatus_RUNNING: + return false, nil + case pdpb.OperatorStatus_SUCCESS: + return true, nil + default: + log.L().Warn("scatter-region operator status is abnormal, will scatter region again", + logutil.Region(regionInfo.Region), zap.Stringer("status", resp.GetStatus())) + return false, local.splitCli.ScatterRegion(ctx, regionInfo) + } } func getSplitKeysByRanges(ranges []Range, regions []*split.RegionInfo) map[uint64][][]byte { diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index 52a9b71286087..560e7ca5a61f4 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -17,6 +17,7 @@ package local import ( "bytes" "context" + "fmt" "math" "math/rand" "sort" @@ -296,7 +297,8 @@ func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo { return &restore.RegionInfo{Region: r, Leader: l} } -// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) +// For keys ["", "aay", "bba", "bbh", "cca", ""], the key ranges of +// regions are [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ). func initTestClient(keys [][]byte, hook clientHook) *testClient { peers := make([]*metapb.Peer, 1) peers[0] = &metapb.Peer{ @@ -567,6 +569,44 @@ func (s *localSuite) TestBatchSplitByRangesNoValidKeys(c *C) { s.doTestBatchSplitRegionByRanges(context.Background(), c, &splitRegionNoValidKeyHook{returnErrTimes: math.MaxInt32}, ".*no valid key.*", defaultHook{}) } +func (s *localSuite) TestSplitAndScatterRegionInBatches(c *C) { + splitHook := defaultHook{} + deferFunc := splitHook.setup(c) + defer deferFunc() + + keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")} + client := initTestClient(keys, nil) + local := &local{ + splitCli: client, + g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ranges []Range + for i := 0; i < 20; i++ { + ranges = append(ranges, Range{ + start: []byte(fmt.Sprintf("a%02d", i)), + end: []byte(fmt.Sprintf("a%02d", i+1)), + }) + } + + err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4) + c.Assert(err, IsNil) + + rangeStart := codec.EncodeBytes([]byte{}, []byte("a")) + rangeEnd := codec.EncodeBytes([]byte{}, []byte("b")) + regions, err := restore.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5) + c.Assert(err, IsNil) + result := [][]byte{[]byte("a"), []byte("a00"), []byte("a01"), []byte("a02"), []byte("a03"), []byte("a04"), + []byte("a05"), []byte("a06"), []byte("a07"), []byte("a08"), []byte("a09"), []byte("a10"), []byte("a11"), + []byte("a12"), []byte("a13"), []byte("a14"), []byte("a15"), []byte("a16"), []byte("a17"), []byte("a18"), + []byte("a19"), []byte("a20"), []byte("b"), + } + checkRegionRanges(c, regions, result) +} + type reportAfterSplitHook struct { noopHook ch chan<- struct{}