Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: split and scatter regions in batches (#33625) #34258

Merged
merged 3 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ const (
// lower the max-key-count to avoid tikv trigger region auto split
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB
// The max ranges count in a batch to split and scatter.
maxBatchSplitRanges = 4096

propRangeIndex = "tikv.range_index"

Expand Down Expand Up @@ -1347,7 +1349,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
// split region by given ranges
for i := 0; i < maxRetryTimes; i++ {
err = local.SplitAndScatterRegionByRanges(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize)
err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
if err == nil || common.IsContextCanceledError(err) {
break
}
Expand Down
62 changes: 48 additions & 14 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,32 @@ var (
splitRegionBaseBackOffTime = time.Second
)

// TODO remove this file and use br internal functions
// This File include region split & scatter operation just like br.
// SplitAndScatterRegionInBatches splits&scatter regions in batches.
// Too many split&scatter requests may put a lot of pressure on TiKV and PD.
func (local *local) SplitAndScatterRegionInBatches(
ctx context.Context,
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
regionSplitSize int64,
batchCnt int,
) error {
for i := 0; i < len(ranges); i += batchCnt {
batch := ranges[i:]
if len(batch) > batchCnt {
batch = batch[:batchCnt]
}
if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil {
return errors.Trace(err)
}
}
return nil
}

// SplitAndScatterRegionByRanges include region split & scatter operation just like br.
// we can simply call br function, but we need to change some function signature of br
// When the ranges total size is small, we can skip the split to avoid generate empty regions.
// TODO: remove this file and use br internal functions
func (local *local) SplitAndScatterRegionByRanges(
ctx context.Context,
ranges []Range,
Expand Down Expand Up @@ -423,16 +445,17 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
}

func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
regionID := regionInfo.Region.GetId()
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
ok, err := local.isScatterRegionFinished(ctx, regionID)
if err != nil {
log.L().Warn("scatter region failed: do not have the region",
logutil.Region(regionInfo.Region))
ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
if ok {
return
}
if ok {
break
if err != nil {
if !utils.IsRetryableError(err) {
log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
return
}
log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
}
select {
case <-time.After(time.Second):
Expand All @@ -442,8 +465,8 @@ func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.
}
}

func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) {
resp, err := local.splitCli.GetOperator(ctx, regionID)
func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
if err != nil {
return false, err
}
Expand All @@ -461,9 +484,20 @@ func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64
return false, errors.Errorf("get operator error: %s", respErr.GetType())
}
// If the current operator of the region is not 'scatter-region', we could assume
// that 'scatter-operator' has finished or timeout
ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING
return ok, nil
// that 'scatter-operator' has finished.
if string(resp.GetDesc()) != "scatter-region" {
return true, nil
}
switch resp.GetStatus() {
case pdpb.OperatorStatus_RUNNING:
return false, nil
case pdpb.OperatorStatus_SUCCESS:
return true, nil
default:
log.L().Warn("scatter-region operator status is abnormal, will scatter region again",
logutil.Region(regionInfo.Region), zap.Stringer("status", resp.GetStatus()))
return false, local.splitCli.ScatterRegion(ctx, regionInfo)
}
}

func getSplitKeysByRanges(ranges []Range, regions []*split.RegionInfo) map[uint64][][]byte {
Expand Down
42 changes: 41 additions & 1 deletion br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package local
import (
"bytes"
"context"
"fmt"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -296,7 +297,8 @@ func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo {
return &restore.RegionInfo{Region: r, Leader: l}
}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
// For keys ["", "aay", "bba", "bbh", "cca", ""], the key ranges of
// regions are [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ).
func initTestClient(keys [][]byte, hook clientHook) *testClient {
peers := make([]*metapb.Peer, 1)
peers[0] = &metapb.Peer{
Expand Down Expand Up @@ -567,6 +569,44 @@ func (s *localSuite) TestBatchSplitByRangesNoValidKeys(c *C) {
s.doTestBatchSplitRegionByRanges(context.Background(), c, &splitRegionNoValidKeyHook{returnErrTimes: math.MaxInt32}, ".*no valid key.*", defaultHook{})
}

func (s *localSuite) TestSplitAndScatterRegionInBatches(c *C) {
splitHook := defaultHook{}
deferFunc := splitHook.setup(c)
defer deferFunc()

keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")}
client := initTestClient(keys, nil)
local := &local{
splitCli: client,
g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone),
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var ranges []Range
for i := 0; i < 20; i++ {
ranges = append(ranges, Range{
start: []byte(fmt.Sprintf("a%02d", i)),
end: []byte(fmt.Sprintf("a%02d", i+1)),
})
}

err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4)
c.Assert(err, IsNil)

rangeStart := codec.EncodeBytes([]byte{}, []byte("a"))
rangeEnd := codec.EncodeBytes([]byte{}, []byte("b"))
regions, err := restore.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
c.Assert(err, IsNil)
result := [][]byte{[]byte("a"), []byte("a00"), []byte("a01"), []byte("a02"), []byte("a03"), []byte("a04"),
[]byte("a05"), []byte("a06"), []byte("a07"), []byte("a08"), []byte("a09"), []byte("a10"), []byte("a11"),
[]byte("a12"), []byte("a13"), []byte("a14"), []byte("a15"), []byte("a16"), []byte("a17"), []byte("a18"),
[]byte("a19"), []byte("a20"), []byte("b"),
}
checkRegionRanges(c, regions, result)
}

type reportAfterSplitHook struct {
noopHook
ch chan<- struct{}
Expand Down