Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#52822
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
Leavrth authored and ti-chi-bot committed Apr 24, 2024
1 parent e406d57 commit f521774
Show file tree
Hide file tree
Showing 8 changed files with 882 additions and 6 deletions.
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
deps = [
"//br/pkg/checksum",
"//br/pkg/errors",
<<<<<<< HEAD:br/pkg/lightning/backend/local/BUILD.bazel
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/external",
Expand All @@ -37,6 +38,8 @@ go_library(
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/tikv",
"//br/pkg/lightning/verification",
=======
>>>>>>> 0805e850d41 (br: handle region leader miss (#52822)):pkg/lightning/backend/local/BUILD.bazel
"//br/pkg/logutil",
"//br/pkg/membuf",
"//br/pkg/pdutil",
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
<<<<<<< HEAD:br/pkg/lightning/backend/local/duplicate.go
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
"github.com/pingcap/tidb/br/pkg/lightning/log"
=======
berrors "github.com/pingcap/tidb/br/pkg/errors"
>>>>>>> 0805e850d41 (br: handle region leader miss (#52822)):pkg/lightning/backend/local/duplicate.go
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/utils"
Expand Down Expand Up @@ -313,7 +317,8 @@ func getDupDetectClient(
) (import_sstpb.ImportSST_DuplicateDetectClient, error) {
leader := region.Leader
if leader == nil {
leader = region.Region.GetPeers()[0]
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound,
"region id %d has no leader", region.Region.Id)
}
importClient, err := importClientFactory.Create(ctx, leader.GetStoreId())
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ import (
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
<<<<<<< HEAD:br/pkg/lightning/backend/local/region_job.go
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
=======
berrors "github.com/pingcap/tidb/br/pkg/errors"
>>>>>>> 0805e850d41 (br: handle region leader miss (#52822)):pkg/lightning/backend/local/region_job.go
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -627,7 +631,8 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestRe

leader := j.region.Leader
if leader == nil {
leader = j.region.Region.GetPeers()[0]
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound,
"region id %d has no leader", j.region.Region.Id)
}

cli, err := clientFactory.Create(ctx, leader.StoreId)
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,8 @@ func (importer *FileImporter) ingestSSTs(
) (*import_sstpb.IngestResponse, error) {
leader := regionInfo.Leader
if leader == nil {
leader = regionInfo.Region.GetPeers()[0]
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound,
"region id %d has no leader", regionInfo.Region.Id)
}
reqCtx := &kvrpcpb.Context{
RegionId: regionInfo.Region.GetId(),
Expand Down
49 changes: 46 additions & 3 deletions br/pkg/restore/import_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,47 @@ func assertRegions(t *testing.T, regions []*split.RegionInfo, keys ...string) {
}
}

<<<<<<< HEAD
=======
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
func initTestClient(isRawKv bool) *TestClient {
peers := make([]*metapb.Peer, 1)
peers[0] = &metapb.Peer{
Id: 1,
StoreId: 1,
}
keys := [6]string{"", "aay", "bba", "bbh", "cca", ""}
regions := make(map[uint64]*split.RegionInfo)
for i := uint64(1); i < 6; i++ {
startKey := []byte(keys[i-1])
if len(startKey) != 0 {
startKey = codec.EncodeBytesExt([]byte{}, startKey, isRawKv)
}
endKey := []byte(keys[i])
if len(endKey) != 0 {
endKey = codec.EncodeBytesExt([]byte{}, endKey, isRawKv)
}
regions[i] = &split.RegionInfo{
Leader: &metapb.Peer{
Id: i,
StoreId: 1,
},
Region: &metapb.Region{
Id: i,
Peers: peers,
StartKey: startKey,
EndKey: endKey,
},
}
}
stores := make(map[uint64]*metapb.Store)
stores[1] = &metapb.Store{
Id: 1,
}
return NewTestClient(stores, regions, 6)
}

>>>>>>> 0805e850d41 (br: handle region leader miss (#52822))
func TestScanSuccess(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
Expand Down Expand Up @@ -245,7 +286,7 @@ func TestEpochNotMatch(t *testing.T) {
{Id: 43},
},
},
Leader: &metapb.Peer{Id: 43},
Leader: &metapb.Peer{Id: 43, StoreId: 1},
}
newRegion := pdtypes.NewRegionInfo(info.Region, info.Leader)
mergeRegion := func() {
Expand Down Expand Up @@ -304,7 +345,8 @@ func TestRegionSplit(t *testing.T) {
EndKey: codec.EncodeBytes(nil, []byte("aayy")),
},
Leader: &metapb.Peer{
Id: 43,
Id: 43,
StoreId: 1,
},
},
{
Expand All @@ -314,7 +356,8 @@ func TestRegionSplit(t *testing.T) {
EndKey: target.Region.EndKey,
},
Leader: &metapb.Peer{
Id: 45,
Id: 45,
StoreId: 1,
},
},
}
Expand Down
195 changes: 195 additions & 0 deletions br/pkg/restore/split/mock_pd_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.

package split

import (
"context"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb/pkg/store/pdtypes"
"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// MockPDClientForSplit is a mock PD client for testing split and scatter.
type MockPDClientForSplit struct {
pd.Client

mu sync.Mutex

Regions *pdtypes.RegionTree
lastRegionID uint64
scanRegions struct {
errors []error
beforeHook func()
}
splitRegions struct {
count int
hijacked func() (bool, *kvrpcpb.SplitRegionResponse, error)
}
scatterRegion struct {
eachRegionFailBefore int
count map[uint64]int
}
scatterRegions struct {
notImplemented bool
regionCount int
}
getOperator struct {
responses map[uint64][]*pdpb.GetOperatorResponse
}
}

// NewMockPDClientForSplit creates a new MockPDClientForSplit.
func NewMockPDClientForSplit() *MockPDClientForSplit {
ret := &MockPDClientForSplit{}
ret.Regions = &pdtypes.RegionTree{}
ret.scatterRegion.count = make(map[uint64]int)
return ret
}

func newRegionNotFullyReplicatedErr(regionID uint64) error {
return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionID)
}

func (c *MockPDClientForSplit) SetRegions(boundaries [][]byte) []*metapb.Region {
c.mu.Lock()
defer c.mu.Unlock()

return c.setRegions(boundaries)
}

func (c *MockPDClientForSplit) setRegions(boundaries [][]byte) []*metapb.Region {
ret := make([]*metapb.Region, 0, len(boundaries)-1)
for i := 1; i < len(boundaries); i++ {
c.lastRegionID++
r := &metapb.Region{
Id: c.lastRegionID,
StartKey: boundaries[i-1],
EndKey: boundaries[i],
}
p := &metapb.Peer{
Id: c.lastRegionID,
StoreId: 1,
}
c.Regions.SetRegion(&pdtypes.Region{
Meta: r,
Leader: p,
})
ret = append(ret, r)
}
return ret
}

func (c *MockPDClientForSplit) ScanRegions(
_ context.Context,
key, endKey []byte,
limit int,
_ ...pd.GetRegionOption,
) ([]*pd.Region, error) {
c.mu.Lock()
defer c.mu.Unlock()

if len(c.scanRegions.errors) > 0 {
err := c.scanRegions.errors[0]
c.scanRegions.errors = c.scanRegions.errors[1:]
return nil, err
}

if c.scanRegions.beforeHook != nil {
c.scanRegions.beforeHook()
}

regions := c.Regions.ScanRange(key, endKey, limit)
ret := make([]*pd.Region, 0, len(regions))
for _, r := range regions {
ret = append(ret, &pd.Region{
Meta: r.Meta,
Leader: r.Leader,
})
}
return ret, nil
}

func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...pd.GetRegionOption) (*pd.Region, error) {
c.mu.Lock()
defer c.mu.Unlock()

for _, r := range c.Regions.Regions {
if r.Meta.Id == regionID {
return &pd.Region{
Meta: r.Meta,
Leader: r.Leader,
}, nil
}
}
return nil, errors.New("region not found")
}

func (c *MockPDClientForSplit) SplitRegion(
region *RegionInfo,
keys [][]byte,
isRawKV bool,
) (bool, *kvrpcpb.SplitRegionResponse, error) {
c.mu.Lock()
defer c.mu.Unlock()

c.splitRegions.count++
if c.splitRegions.hijacked != nil {
return c.splitRegions.hijacked()
}

if !isRawKV {
for i := range keys {
keys[i] = codec.EncodeBytes(nil, keys[i])
}
}

newRegionBoundaries := make([][]byte, 0, len(keys)+2)
newRegionBoundaries = append(newRegionBoundaries, region.Region.StartKey)
newRegionBoundaries = append(newRegionBoundaries, keys...)
newRegionBoundaries = append(newRegionBoundaries, region.Region.EndKey)
newRegions := c.setRegions(newRegionBoundaries)
newRegions[0].Id = region.Region.Id
return false, &kvrpcpb.SplitRegionResponse{Regions: newRegions}, nil
}

func (c *MockPDClientForSplit) ScatterRegion(_ context.Context, regionID uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

c.scatterRegion.count[regionID]++
if c.scatterRegion.count[regionID] > c.scatterRegion.eachRegionFailBefore {
return nil
}
return newRegionNotFullyReplicatedErr(regionID)
}

func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...pd.RegionsOption) (*pdpb.ScatterRegionResponse, error) {
c.mu.Lock()
defer c.mu.Unlock()

if c.scatterRegions.notImplemented {
return nil, status.Error(codes.Unimplemented, "Ah, yep")
}
c.scatterRegions.regionCount += len(regionIDs)
return &pdpb.ScatterRegionResponse{}, nil
}

func (c *MockPDClientForSplit) GetOperator(_ context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
c.mu.Lock()
defer c.mu.Unlock()

if c.getOperator.responses == nil {
return &pdpb.GetOperatorResponse{Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, nil
}
ret := c.getOperator.responses[regionID][0]
c.getOperator.responses[regionID] = c.getOperator.responses[regionID][1:]
return ret, nil
}
16 changes: 16 additions & 0 deletions br/pkg/restore/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,23 @@ func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
}

cur := regions[0]
if cur.Leader == nil {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader is nil", cur.Region.Id)
}
if cur.Leader.StoreId == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader's store id is 0", cur.Region.Id)
}
for _, r := range regions[1:] {
if r.Leader == nil {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader is nil", r.Region.Id)
}
if r.Leader.StoreId == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader's store id is 0", r.Region.Id)
}
if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's endKey not equal to next region %d's startKey, endKey: %s, startKey: %s, region epoch: %s %s",
Expand Down
Loading

0 comments on commit f521774

Please sign in to comment.