From 663a90a06006137b27ad0bbb2fb406869a54665d Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Tue, 26 Apr 2022 23:16:51 +0800 Subject: [PATCH 1/2] cherry pick #33625 to release-5.4 Signed-off-by: ti-srebot --- br/pkg/lightning/backend/local/local.go | 4 +- br/pkg/lightning/backend/local/localhelper.go | 62 ++++++++++++++----- .../backend/local/localhelper_test.go | 42 ++++++++++++- 3 files changed, 92 insertions(+), 16 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index b4a5826e5651f..74a600d849fc6 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -87,6 +87,8 @@ const ( // lower the max-key-count to avoid tikv trigger region auto split regionMaxKeyCount = 1_280_000 defaultRegionSplitSize = 96 * units.MiB + // The max ranges count in a batch to split and scatter. + maxBatchSplitRanges = 4096 propRangeIndex = "tikv.range_index" @@ -1347,7 +1349,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys // split region by given ranges for i := 0; i < maxRetryTimes; i++ { - err = local.SplitAndScatterRegionByRanges(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize) + err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges) if err == nil || common.IsContextCanceledError(err) { break } diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go index bc7a7a65d2a4c..92c8824c7dc91 100644 --- a/br/pkg/lightning/backend/local/localhelper.go +++ b/br/pkg/lightning/backend/local/localhelper.go @@ -58,10 +58,32 @@ var ( splitRegionBaseBackOffTime = time.Second ) -// TODO remove this file and use br internal functions -// This File include region split & scatter operation just like br. 
+// SplitAndScatterRegionInBatches splits and scatters regions in batches. +// Too many split&scatter requests may put a lot of pressure on TiKV and PD. +func (local *local) SplitAndScatterRegionInBatches( + ctx context.Context, + ranges []Range, + tableInfo *checkpoints.TidbTableInfo, + needSplit bool, + regionSplitSize int64, + batchCnt int, +) error { + for i := 0; i < len(ranges); i += batchCnt { + batch := ranges[i:] + if len(batch) > batchCnt { + batch = batch[:batchCnt] + } + if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil { + return errors.Trace(err) + } + } + return nil +} + +// SplitAndScatterRegionByRanges includes region split & scatter operations just like br. // we can simply call br function, but we need to change some function signature of br // When the ranges total size is small, we can skip the split to avoid generate empty regions. +// TODO: remove this file and use br internal functions func (local *local) SplitAndScatterRegionByRanges( ctx context.Context, ranges []Range, @@ -423,16 +445,17 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) { } func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) { - regionID := regionInfo.Region.GetId() for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ { - ok, err := local.isScatterRegionFinished(ctx, regionID) - if err != nil { - log.L().Warn("scatter region failed: do not have the region", - logutil.Region(regionInfo.Region)) + ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo) + if ok { return } - if ok { - break + if err != nil { + if !utils.IsRetryableError(err) { + log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err)) + return + } + log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err)) } select { case <-time.After(time.Second): 
@@ -442,8 +465,8 @@ func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split. } } -func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) { - resp, err := local.splitCli.GetOperator(ctx, regionID) +func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) { + resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId()) if err != nil { return false, err } @@ -461,9 +484,20 @@ func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64 return false, errors.Errorf("get operator error: %s", respErr.GetType()) } // If the current operator of the region is not 'scatter-region', we could assume - // that 'scatter-operator' has finished or timeout - ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING - return ok, nil + // that the 'scatter-region' operator has finished. + if string(resp.GetDesc()) != "scatter-region" { + return true, nil + } + switch resp.GetStatus() { + case pdpb.OperatorStatus_RUNNING: + return false, nil + case pdpb.OperatorStatus_SUCCESS: + return true, nil + default: + log.L().Warn("scatter-region operator status is abnormal, will scatter region again", + logutil.Region(regionInfo.Region), zap.Stringer("status", resp.GetStatus())) + return false, local.splitCli.ScatterRegion(ctx, regionInfo) + } } func getSplitKeysByRanges(ranges []Range, regions []*split.RegionInfo) map[uint64][][]byte { diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index 52a9b71286087..1bc9d8ee3e8f9 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -17,6 +17,7 @@ package local import ( "bytes" "context" + "fmt" "math" "math/rand" "sort" @@ -296,7 +297,8 @@ func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo { return 
&restore.RegionInfo{Region: r, Leader: l} } -// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) +// For keys ["", "aay", "bba", "bbh", "cca", ""], the key ranges of +// regions are [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ). func initTestClient(keys [][]byte, hook clientHook) *testClient { peers := make([]*metapb.Peer, 1) peers[0] = &metapb.Peer{ @@ -567,6 +569,44 @@ func (s *localSuite) TestBatchSplitByRangesNoValidKeys(c *C) { s.doTestBatchSplitRegionByRanges(context.Background(), c, &splitRegionNoValidKeyHook{returnErrTimes: math.MaxInt32}, ".*no valid key.*", defaultHook{}) } +func TestSplitAndScatterRegionInBatches(t *testing.T) { + splitHook := defaultHook{} + deferFunc := splitHook.setup(t) + defer deferFunc() + + keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")} + client := initTestClient(keys, nil) + local := &local{ + splitCli: client, + g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ranges []Range + for i := 0; i < 20; i++ { + ranges = append(ranges, Range{ + start: []byte(fmt.Sprintf("a%02d", i)), + end: []byte(fmt.Sprintf("a%02d", i+1)), + }) + } + + err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4) + require.NoError(t, err) + + rangeStart := codec.EncodeBytes([]byte{}, []byte("a")) + rangeEnd := codec.EncodeBytes([]byte{}, []byte("b")) + regions, err := restore.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5) + require.NoError(t, err) + result := [][]byte{[]byte("a"), []byte("a00"), []byte("a01"), []byte("a02"), []byte("a03"), []byte("a04"), + []byte("a05"), []byte("a06"), []byte("a07"), []byte("a08"), []byte("a09"), []byte("a10"), []byte("a11"), + []byte("a12"), []byte("a13"), []byte("a14"), []byte("a15"), []byte("a16"), []byte("a17"), []byte("a18"), + []byte("a19"), []byte("a20"), []byte("b"), + } + checkRegionRanges(t, regions, result) +} + type reportAfterSplitHook struct { noopHook 
ch chan<- struct{} From dafee545eb4c1a50c384efb5c0a6e13b6e4926a5 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Mon, 27 Jun 2022 14:13:16 +0800 Subject: [PATCH 2/2] fix test conflicts --- br/pkg/lightning/backend/local/localhelper_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index 1bc9d8ee3e8f9..560e7ca5a61f4 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -569,9 +569,9 @@ func (s *localSuite) TestBatchSplitByRangesNoValidKeys(c *C) { s.doTestBatchSplitRegionByRanges(context.Background(), c, &splitRegionNoValidKeyHook{returnErrTimes: math.MaxInt32}, ".*no valid key.*", defaultHook{}) } -func TestSplitAndScatterRegionInBatches(t *testing.T) { +func (s *localSuite) TestSplitAndScatterRegionInBatches(c *C) { splitHook := defaultHook{} - deferFunc := splitHook.setup(t) + deferFunc := splitHook.setup(c) defer deferFunc() keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")} @@ -593,18 +593,18 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) { } err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4) - require.NoError(t, err) + c.Assert(err, IsNil) rangeStart := codec.EncodeBytes([]byte{}, []byte("a")) rangeEnd := codec.EncodeBytes([]byte{}, []byte("b")) regions, err := restore.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5) - require.NoError(t, err) + c.Assert(err, IsNil) result := [][]byte{[]byte("a"), []byte("a00"), []byte("a01"), []byte("a02"), []byte("a03"), []byte("a04"), []byte("a05"), []byte("a06"), []byte("a07"), []byte("a08"), []byte("a09"), []byte("a10"), []byte("a11"), []byte("a12"), []byte("a13"), []byte("a14"), []byte("a15"), []byte("a16"), []byte("a17"), []byte("a18"), []byte("a19"), []byte("a20"), []byte("b"), } - checkRegionRanges(t, regions, result) + checkRegionRanges(c, regions, 
result) } type reportAfterSplitHook struct {