Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#46202
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
mittalrishabh authored and ti-chi-bot committed Aug 23, 2023
1 parent a420763 commit 36bd0ad
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 4 deletions.
6 changes: 6 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,7 @@ func (local *local) writeAndIngestByRange(
ctx, cancel := context.WithCancel(ctxt)
defer cancel()

<<<<<<< HEAD
WriteAndIngest:
for retry := 0; retry < maxRetryTimes; {
if retry != 0 {
Expand Down Expand Up @@ -1112,6 +1113,11 @@ WriteAndIngest:
logutil.Key("endKey", end), log.ShortError(err), zap.Int("retry", retry))
continue WriteAndIngest
}
=======
err = local.SplitAndScatterRegionInBatches(ctx, initialSplitRanges, needSplit, maxBatchSplitRanges)
if err == nil || common.IsContextCanceledError(err) {
break
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
}

return err
Expand Down
27 changes: 23 additions & 4 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,15 @@ var (
func (local *local) SplitAndScatterRegionInBatches(
ctx context.Context,
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
regionSplitSize int64,
batchCnt int,
) error {
for i := 0; i < len(ranges); i += batchCnt {
batch := ranges[i:]
if len(batch) > batchCnt {
batch = batch[:batchCnt]
}
if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil {
if err := local.SplitAndScatterRegionByRanges(ctx, batch, needSplit); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -87,10 +85,13 @@ func (local *local) SplitAndScatterRegionInBatches(
func (local *local) SplitAndScatterRegionByRanges(
ctx context.Context,
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
<<<<<<< HEAD
regionSplitSize int64,
) error {
=======
) (err error) {
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
if len(ranges) == 0 {
return nil
}
Expand All @@ -106,9 +107,14 @@ func (local *local) SplitAndScatterRegionByRanges(
scatterRegions := make([]*split.RegionInfo, 0)
var retryKeys [][]byte
waitTime := splitRegionBaseBackOffTime
<<<<<<< HEAD
skippedKeys := 0
for i := 0; i < SplitRetryTimes; i++ {
log.L().Info("split and scatter region",
=======
for i := 0; i < splitRetryTimes; i++ {
log.FromContext(ctx).Info("split and scatter region",
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
logutil.Key("minKey", minKey),
logutil.Key("maxKey", maxKey),
zap.Int("retry", i),
Expand Down Expand Up @@ -168,6 +174,7 @@ func (local *local) SplitAndScatterRegionByRanges(
return nil
}

<<<<<<< HEAD
var tableRegionStats map[uint64]int64
if tableInfo != nil {
tableRegionStats, err = fetchTableRegionSizeStats(ctx, db, tableInfo.ID)
Expand All @@ -178,6 +185,8 @@ func (local *local) SplitAndScatterRegionByRanges(
}
}

=======
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
regionMap := make(map[uint64]*split.RegionInfo)
for _, region := range regions {
regionMap[region.Region.GetId()] = region
Expand Down Expand Up @@ -287,6 +296,7 @@ func (local *local) SplitAndScatterRegionByRanges(
}
sendLoop:
for regionID, keys := range splitKeyMap {
<<<<<<< HEAD
// if region not in tableRegionStats, that means this region is newly split, so
// we can skip split it again.
regionSize, ok := tableRegionStats[regionID]
Expand All @@ -296,6 +306,8 @@ func (local *local) SplitAndScatterRegionByRanges(
if len(keys) == 1 && regionSize < regionSplitSize {
skippedKeys++
}
=======
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
select {
case ch <- &splitInfo{region: regionMap[regionID], keys: keys}:
case <-ctx.Done():
Expand Down Expand Up @@ -338,12 +350,19 @@ func (local *local) SplitAndScatterRegionByRanges(
scatterCount++
}
if scatterCount == len(scatterRegions) {
<<<<<<< HEAD
log.L().Info("waiting for scattering regions done",
zap.Int("skipped_keys", skippedKeys),
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
} else {
log.L().Info("waiting for scattering regions timeout",
zap.Int("skipped_keys", skippedKeys),
=======
log.FromContext(ctx).Info("waiting for scattering regions done",
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
} else {
log.FromContext(ctx).Info("waiting for scattering regions timeout",
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
zap.Int("scatterCount", scatterCount),
zap.Int("regions", len(scatterRegions)),
zap.Duration("take", time.Since(startTime)))
Expand Down
107 changes: 107 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,18 @@ func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, h
start = end
}

<<<<<<< HEAD
err = local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
if len(errPat) == 0 {
c.Assert(err, IsNil)
} else {
c.Assert(err, ErrorMatches, errPat)
=======
err = local.SplitAndScatterRegionByRanges(ctx, ranges, true)
if len(errPat) != 0 {
require.Error(t, err)
require.Regexp(t, errPat, err.Error())
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
return
}

Expand All @@ -454,8 +461,98 @@ func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, h
checkRegionRanges(c, regions, result)
}

<<<<<<< HEAD
func (s *localSuite) TestBatchSplitRegionByRanges(c *C) {
s.doTestBatchSplitRegionByRanges(context.Background(), c, nil, "", nil)
=======
func TestBatchSplitRegionByRanges(t *testing.T) {
doTestBatchSplitRegionByRanges(context.Background(), t, nil, "", nil)
}

type checkScatterClient struct {
*testSplitClient

mu sync.Mutex
notFoundFirstTime map[uint64]struct{}
scatterCounter atomic.Int32
}

func newCheckScatterClient(inner *testSplitClient) *checkScatterClient {
return &checkScatterClient{
testSplitClient: inner,
notFoundFirstTime: map[uint64]struct{}{},
scatterCounter: atomic.Int32{},
}
}

func (c *checkScatterClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error {
c.scatterCounter.Add(1)
return nil
}

func (c *checkScatterClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.notFoundFirstTime[regionID]; !ok {
c.notFoundFirstTime[regionID] = struct{}{}
return nil, nil
}
return c.testSplitClient.GetRegionByID(ctx, regionID)
}

func TestMissingScatter(t *testing.T) {
ctx := context.Background()
splitHook := defaultHook{}
deferFunc := splitHook.setup(t)
defer deferFunc()

keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")}
client := initTestSplitClient(keys, nil)
checkClient := newCheckScatterClient(client)
local := &Backend{
splitCli: checkClient,
logger: log.L(),
}
local.RegionSplitBatchSize = 4
local.RegionSplitConcurrency = 4

// current region ranges: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
rangeStart := codec.EncodeBytes([]byte{}, []byte("b"))
rangeEnd := codec.EncodeBytes([]byte{}, []byte("c"))
regions, err := split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
require.NoError(t, err)
// regions is: [aay, bba), [bba, bbh), [bbh, cca)
checkRegionRanges(t, regions, [][]byte{[]byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca")})

// generate: ranges [b, ba), [ba, bb), [bb, bc), ... [by, bz)
ranges := make([]Range, 0)
start := []byte{'b'}
for i := byte('a'); i <= 'z'; i++ {
end := []byte{'b', i}
ranges = append(ranges, Range{start: start, end: end})
start = end
}

err = local.SplitAndScatterRegionByRanges(ctx, ranges, true)
require.NoError(t, err)

splitHook.check(t, client)

// check split ranges
regions, err = split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
require.NoError(t, err)
result := [][]byte{
[]byte("b"), []byte("ba"), []byte("bb"), []byte("bba"), []byte("bbh"), []byte("bc"),
[]byte("bd"), []byte("be"), []byte("bf"), []byte("bg"), []byte("bh"), []byte("bi"), []byte("bj"),
[]byte("bk"), []byte("bl"), []byte("bm"), []byte("bn"), []byte("bo"), []byte("bp"), []byte("bq"),
[]byte("br"), []byte("bs"), []byte("bt"), []byte("bu"), []byte("bv"), []byte("bw"), []byte("bx"),
[]byte("by"), []byte("bz"), []byte("cca"),
}
checkRegionRanges(t, regions, result)

// the old regions will not be scattered. They are [..., bba), [bba, bbh), [..., cca)
require.Equal(t, len(result)-3, int(checkClient.scatterCounter.Load()))
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
}

type batchSizeHook struct{}
Expand Down Expand Up @@ -592,8 +689,13 @@ func (s *localSuite) TestSplitAndScatterRegionInBatches(c *C) {
})
}

<<<<<<< HEAD
err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4)
c.Assert(err, IsNil)
=======
err := local.SplitAndScatterRegionInBatches(ctx, ranges, true, 4)
require.NoError(t, err)
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))

rangeStart := codec.EncodeBytes([]byte{}, []byte("a"))
rangeEnd := codec.EncodeBytes([]byte{}, []byte("b"))
Expand Down Expand Up @@ -688,8 +790,13 @@ func (s *localSuite) doTestBatchSplitByRangesWithClusteredIndex(c *C, hook clien
start = e
}

<<<<<<< HEAD
err := local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
c.Assert(err, IsNil)
=======
err := local.SplitAndScatterRegionByRanges(ctx, ranges, true)
require.NoError(t, err)
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))

startKey := codec.EncodeBytes([]byte{}, rangeKeys[0])
endKey := codec.EncodeBytes([]byte{}, rangeKeys[len(rangeKeys)-1])
Expand Down

0 comments on commit 36bd0ad

Please sign in to comment.