diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index f1074a14163a0..78f9c14db30ba 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1069,6 +1069,7 @@ func (local *local) writeAndIngestByRange( ctx, cancel := context.WithCancel(ctxt) defer cancel() +<<<<<<< HEAD WriteAndIngest: for retry := 0; retry < maxRetryTimes; { if retry != 0 { @@ -1112,6 +1113,11 @@ WriteAndIngest: logutil.Key("endKey", end), log.ShortError(err), zap.Int("retry", retry)) continue WriteAndIngest } +======= + err = local.SplitAndScatterRegionInBatches(ctx, initialSplitRanges, needSplit, maxBatchSplitRanges) + if err == nil || common.IsContextCanceledError(err) { + break +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) } return err diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go index 92c8824c7dc91..61243c36175ec 100644 --- a/br/pkg/lightning/backend/local/localhelper.go +++ b/br/pkg/lightning/backend/local/localhelper.go @@ -63,9 +63,7 @@ var ( func (local *local) SplitAndScatterRegionInBatches( ctx context.Context, ranges []Range, - tableInfo *checkpoints.TidbTableInfo, needSplit bool, - regionSplitSize int64, batchCnt int, ) error { for i := 0; i < len(ranges); i += batchCnt { @@ -73,7 +71,7 @@ func (local *local) SplitAndScatterRegionInBatches( if len(batch) > batchCnt { batch = batch[:batchCnt] } - if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil { + if err := local.SplitAndScatterRegionByRanges(ctx, batch, needSplit); err != nil { return errors.Trace(err) } } @@ -87,10 +85,13 @@ func (local *local) SplitAndScatterRegionInBatches( func (local *local) SplitAndScatterRegionByRanges( ctx context.Context, ranges []Range, - tableInfo *checkpoints.TidbTableInfo, needSplit bool, +<<<<<<< HEAD regionSplitSize int64, ) error { +======= +) (err error) { +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) if len(ranges) == 0 { return nil } @@ -106,9 +107,14 @@ func (local *local) SplitAndScatterRegionByRanges( scatterRegions := make([]*split.RegionInfo, 0) var retryKeys [][]byte waitTime := splitRegionBaseBackOffTime +<<<<<<< HEAD skippedKeys := 0 for i := 0; i < SplitRetryTimes; i++ { log.L().Info("split and scatter region", +======= + for i := 0; i < splitRetryTimes; i++ { + log.FromContext(ctx).Info("split and scatter region", +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) logutil.Key("minKey", minKey), logutil.Key("maxKey", maxKey), zap.Int("retry", i), @@ -168,6 +174,7 @@ func (local *local) SplitAndScatterRegionByRanges( return nil } +<<<<<<< HEAD var tableRegionStats map[uint64]int64 if tableInfo != nil { tableRegionStats, err = fetchTableRegionSizeStats(ctx, db, tableInfo.ID) @@ -178,6 +185,8 @@ func (local *local) SplitAndScatterRegionByRanges( } } +======= +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) regionMap := make(map[uint64]*split.RegionInfo) for _, region := range regions { regionMap[region.Region.GetId()] = region @@ -287,6 +296,7 @@ func (local *local) SplitAndScatterRegionByRanges( } sendLoop: for regionID, keys := range splitKeyMap { +<<<<<<< HEAD // if region not in tableRegionStats, that means this region is newly split, so // we can skip split it again. regionSize, ok := tableRegionStats[regionID] @@ -296,6 +306,8 @@ func (local *local) SplitAndScatterRegionByRanges( if len(keys) == 1 && regionSize < regionSplitSize { skippedKeys++ } +======= +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) select { case ch <- &splitInfo{region: regionMap[regionID], keys: keys}: case <-ctx.Done(): @@ -338,12 +350,19 @@ func (local *local) SplitAndScatterRegionByRanges( scatterCount++ } if scatterCount == len(scatterRegions) { +<<<<<<< HEAD log.L().Info("waiting for scattering regions done", zap.Int("skipped_keys", skippedKeys), zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) } else { log.L().Info("waiting for scattering regions timeout", zap.Int("skipped_keys", skippedKeys), +======= + log.FromContext(ctx).Info("waiting for scattering regions done", + zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) + } else { + log.FromContext(ctx).Info("waiting for scattering regions timeout", +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) zap.Int("scatterCount", scatterCount), zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index 560e7ca5a61f4..5171e50d136c6 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -431,11 +431,18 @@ func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, h start = end } +<<<<<<< HEAD err = local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000) if len(errPat) == 0 { c.Assert(err, IsNil) } else { c.Assert(err, ErrorMatches, errPat) +======= + err = local.SplitAndScatterRegionByRanges(ctx, ranges, true) + if len(errPat) != 0 { + require.Error(t, err) + require.Regexp(t, errPat, err.Error()) +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) return } @@ -454,8 +461,98 @@ func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, h checkRegionRanges(c, regions, result) } +<<<<<<< HEAD func (s *localSuite) TestBatchSplitRegionByRanges(c *C) { s.doTestBatchSplitRegionByRanges(context.Background(), c, nil, "", nil) +======= +func TestBatchSplitRegionByRanges(t *testing.T) { + doTestBatchSplitRegionByRanges(context.Background(), t, nil, "", nil) +} + +type checkScatterClient struct { + *testSplitClient + + mu sync.Mutex + notFoundFirstTime map[uint64]struct{} + scatterCounter atomic.Int32 +} + +func newCheckScatterClient(inner *testSplitClient) *checkScatterClient { + return &checkScatterClient{ + testSplitClient: inner, + notFoundFirstTime: map[uint64]struct{}{}, + scatterCounter: atomic.Int32{}, + } +} + +func (c *checkScatterClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error { + c.scatterCounter.Add(1) + return nil +} + +func (c *checkScatterClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) { + c.mu.Lock() + defer c.mu.Unlock() + if _, ok := c.notFoundFirstTime[regionID]; !ok { + c.notFoundFirstTime[regionID] = struct{}{} + return nil, nil + } + return c.testSplitClient.GetRegionByID(ctx, regionID) +} + +func TestMissingScatter(t *testing.T) { + ctx := context.Background() + splitHook := defaultHook{} + deferFunc := splitHook.setup(t) + defer deferFunc() + + keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} + client := initTestSplitClient(keys, nil) + checkClient := newCheckScatterClient(client) + local := &Backend{ + splitCli: checkClient, + logger: log.L(), + } + local.RegionSplitBatchSize = 4 + local.RegionSplitConcurrency = 4 + + // current region ranges: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) + rangeStart := codec.EncodeBytes([]byte{}, []byte("b")) + rangeEnd := codec.EncodeBytes([]byte{}, []byte("c")) + regions, err := split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5) + require.NoError(t, err) + // regions is: [aay, bba), [bba, bbh), [bbh, cca) + checkRegionRanges(t, regions, [][]byte{[]byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca")}) + + // generate: ranges [b, ba), [ba, bb), [bb, bc), ... [by, bz) + ranges := make([]Range, 0) + start := []byte{'b'} + for i := byte('a'); i <= 'z'; i++ { + end := []byte{'b', i} + ranges = append(ranges, Range{start: start, end: end}) + start = end + } + + err = local.SplitAndScatterRegionByRanges(ctx, ranges, true) + require.NoError(t, err) + + splitHook.check(t, client) + + // check split ranges + regions, err = split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5) + require.NoError(t, err) + result := [][]byte{ + []byte("b"), []byte("ba"), []byte("bb"), []byte("bba"), []byte("bbh"), []byte("bc"), + []byte("bd"), []byte("be"), []byte("bf"), []byte("bg"), []byte("bh"), []byte("bi"), []byte("bj"), + []byte("bk"), []byte("bl"), []byte("bm"), []byte("bn"), []byte("bo"), []byte("bp"), []byte("bq"), + []byte("br"), []byte("bs"), []byte("bt"), []byte("bu"), []byte("bv"), []byte("bw"), []byte("bx"), + []byte("by"), []byte("bz"), []byte("cca"), + } + checkRegionRanges(t, regions, result) + + // the old regions will not be scattered. They are [..., bba), [bba, bbh), [..., cca) + require.Equal(t, len(result)-3, int(checkClient.scatterCounter.Load())) +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) } type batchSizeHook struct{} @@ -592,8 +689,13 @@ func (s *localSuite) TestSplitAndScatterRegionInBatches(c *C) { }) } +<<<<<<< HEAD err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4) c.Assert(err, IsNil) +======= + err := local.SplitAndScatterRegionInBatches(ctx, ranges, true, 4) + require.NoError(t, err) +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) rangeStart := codec.EncodeBytes([]byte{}, []byte("a")) rangeEnd := codec.EncodeBytes([]byte{}, []byte("b")) @@ -688,8 +790,13 @@ func (s *localSuite) doTestBatchSplitByRangesWithClusteredIndex(c *C, hook clien start = e } +<<<<<<< HEAD err := local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000) c.Assert(err, IsNil) +======= + err := local.SplitAndScatterRegionByRanges(ctx, ranges, true) + require.NoError(t, err) +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) startKey := codec.EncodeBytes([]byte{}, rangeKeys[0]) endKey := codec.EncodeBytes([]byte{}, rangeKeys[len(rangeKeys)-1])