Skip to content

Commit

Permalink
restore: limit scan region concurrency during restore (#50255)
Browse files Browse the repository at this point in the history
close #50280
  • Loading branch information
3pointer authored Jan 15, 2024
1 parent f6f4639 commit 2ca9f98
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 15 deletions.
13 changes: 5 additions & 8 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ const (
)

const (
importScanRegionTime = 20 * time.Second
gRPCBackOffMaxDelay = 3 * time.Second
gRPCBackOffMaxDelay = 3 * time.Second
// TODO: make it configurable
gRPCTimeOut = 25 * time.Minute
)
Expand Down Expand Up @@ -545,11 +544,9 @@ func (importer *FileImporter) ImportSSTFiles(
}

err = utils.WithRetry(ctx, func() error {
tctx, cancel := context.WithTimeout(ctx, importScanRegionTime)
defer cancel()
// Scan regions covered by the file range
regionInfos, errScanRegion := split.PaginateScanRegion(
tctx, importer.metaClient, startKey, endKey, split.ScanRegionPaginationLimit)
ctx, importer.metaClient, startKey, endKey, split.ScanRegionPaginationLimit)
if errScanRegion != nil {
return errors.Trace(errScanRegion)
}
Expand Down Expand Up @@ -993,14 +990,14 @@ func (importer *FileImporter) downloadSSTV2(
} else {
importer.storeWorkerPoolRWLock.RUnlock()
}
defer func() {
workerCh <- struct{}{}
}()
select {
case <-ectx.Done():
return ectx.Err()
case <-workerCh:
}
defer func() {
workerCh <- struct{}{}
}()
for _, file := range files {
req, ok := downloadReqsMap[file.Name]
if !ok {
Expand Down
34 changes: 28 additions & 6 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils/iter"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/mathutil"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -44,6 +45,11 @@ const (
CoarseGrained Granularity = "coarse-grained"
)

// Concurrency limits for the worker pools used while splitting regions.
const (
// splitRegionKeysConcurrency is the size of the "split keys" worker pool.
splitRegionKeysConcurrency = 8
// splitRegionRangesConcurrency is the upper bound applied to the size of
// the "split ranges" worker pool, which is otherwise derived from the store count.
splitRegionRangesConcurrency = 16
)

type SplitContext struct {
isRawKv bool
needScatter bool
Expand Down Expand Up @@ -99,12 +105,19 @@ func (rs *RegionSplitter) ExecuteSplit(
if errSplit != nil {
return errors.Trace(errSplit)
}
if len(sortedRanges) == 0 {
log.Info("skip split regions after sorted, no range")
return nil
}
sortedKeys := make([][]byte, 0, len(sortedRanges))
totalRangeSize := uint64(0)
for _, r := range sortedRanges {
sortedKeys = append(sortedKeys, r.EndKey)
totalRangeSize += r.Size
}
// Use the first range's start key as the start key for scanning regions;
// sortedRanges is guaranteed to be non-empty here.
scanStartKey := sortedRanges[0].StartKey
sctx := SplitContext{
isRawKv: isRawKv,
needScatter: true,
Expand All @@ -115,7 +128,7 @@ func (rs *RegionSplitter) ExecuteSplit(
if granularity == string(CoarseGrained) {
return rs.executeSplitByRanges(ctx, sctx, sortedRanges)
}
return rs.executeSplitByKeys(ctx, sctx, sortedKeys)
return rs.executeSplitByKeys(ctx, sctx, scanStartKey, sortedKeys)
}

func (rs *RegionSplitter) executeSplitByRanges(
Expand Down Expand Up @@ -164,8 +177,9 @@ func (rs *RegionSplitter) executeSplitByRanges(
}
}
}

workerPool := utils.NewWorkerPool(uint(splitContext.storeCount), "split ranges")
// PD cannot handle too many concurrent scan region requests.
poolSize := mathutil.Clamp(uint(splitContext.storeCount), 1, splitRegionRangesConcurrency)
workerPool := utils.NewWorkerPool(poolSize, "split ranges")
eg, ectx := errgroup.WithContext(ctx)
for rID, rgs := range splitRangeMap {
region := regionMap[rID]
Expand All @@ -184,6 +198,13 @@ func (rs *RegionSplitter) executeSplitByRanges(
rangeSize += rg.Size
allKeys = append(allKeys, rg.EndKey)
}
// Use the first range's start key as the start key for scanning regions;
// ranges must be non-empty here.
scanStartKey := ranges[0].StartKey
// if the number of ranges does not exceed the store count, we can't split by range
if len(ranges) <= sctx.storeCount {
return rs.executeSplitByKeys(ectx, sctx, scanStartKey, allKeys)
}
expectSplitSize := rangeSize / uint64(sctx.storeCount)
size := uint64(0)
keys := make([][]byte, 0, sctx.storeCount)
Expand Down Expand Up @@ -216,7 +237,7 @@ func (rs *RegionSplitter) executeSplitByRanges(
}
sctx.onSplit(keys)
sctx.needScatter = false
return rs.executeSplitByKeys(ectx, sctx, allKeys)
return rs.executeSplitByKeys(ectx, sctx, scanStartKey, allKeys)
})
}
return eg.Wait()
Expand All @@ -236,11 +257,12 @@ func (rs *RegionSplitter) executeSplitByRanges(
func (rs *RegionSplitter) executeSplitByKeys(
ctx context.Context,
splitContext SplitContext,
scanStartKey []byte,
sortedKeys [][]byte,
) error {
var mutex sync.Mutex
startTime := time.Now()
minKey := codec.EncodeBytesExt(nil, sortedKeys[0], splitContext.isRawKv)
minKey := codec.EncodeBytesExt(nil, scanStartKey, splitContext.isRawKv)
maxKey := codec.EncodeBytesExt(nil, sortedKeys[len(sortedKeys)-1], splitContext.isRawKv)
scatterRegions := make([]*split.RegionInfo, 0)
regionsMap := make(map[uint64]*split.RegionInfo)
Expand All @@ -256,7 +278,7 @@ func (rs *RegionSplitter) executeSplitByKeys(
for _, region := range regions {
regionMap[region.Region.GetId()] = region
}
workerPool := utils.NewWorkerPool(8, "split keys")
workerPool := utils.NewWorkerPool(splitRegionKeysConcurrency, "split keys")
eg, ectx := errgroup.WithContext(ctx)
for regionID, splitKeys := range splitKeyMap {
region := regionMap[regionID]
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
const (
SplitRetryTimes = 32
SplitRetryInterval = 50 * time.Millisecond
SplitMaxRetryInterval = time.Second
SplitMaxRetryInterval = 4 * time.Second

SplitCheckMaxRetryTimes = 64
SplitCheckInterval = 8 * time.Millisecond
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit
c.InjectTimes -= 1
return nil, status.Error(codes.Unavailable, "not leader")
}
if len(key) != 0 && bytes.Equal(key, endKey) {
return nil, status.Error(codes.Internal, "key and endKey are the same")
}

infos := c.regionsInfo.ScanRange(key, endKey, limit)
regions := make([]*split.RegionInfo, 0, len(infos))
Expand Down Expand Up @@ -286,6 +289,19 @@ func (b *assertRetryLessThanBackoffer) NextBackoff(err error) time.Duration {
func (b *assertRetryLessThanBackoffer) Attempt() int {
return b.max - b.already
}
// TestScanEmptyRegion runs ExecuteSplit with a single range and expects it
// to finish without an error.
func TestScanEmptyRegion(t *testing.T) {
	testClient := initTestClient(false)
	// keep only the first range
	singleRange := initRanges()[:1]
	rules := initRewriteRules()
	splitter := restore.NewRegionSplitter(testClient)

	err := splitter.ExecuteSplit(
		context.Background(), singleRange, rules, 0, "", false,
		func(key [][]byte) {},
	)
	// a single range entry must not make the split fail
	require.NoError(t, err)
}

func TestScatterFinishInTime(t *testing.T) {
client := initTestClient(false)
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/restore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ func TestPaginateScanRegion(t *testing.T) {
require.NoError(t, err)
require.Equal(t, regions[1:2], batch)

_, err = split.PaginateScanRegion(
ctx, NewTestClient(stores, regionMap, 0), regions[1].Region.EndKey, regions[1].Region.EndKey, 3)
require.Error(t, err)

_, err = split.PaginateScanRegion(ctx, NewTestClient(stores, regionMap, 0), []byte{2}, []byte{1}, 3)
require.Error(t, err)
require.True(t, berrors.ErrRestoreInvalidRange.Equal(err))
Expand Down

0 comments on commit 2ca9f98

Please sign in to comment.