Skip to content

Commit

Permalink
fallback to follower when leader is busy (tikv#916)
Browse files Browse the repository at this point in the history
* fallback to follower when leader is busy

Signed-off-by: you06 <[email protected]>
Co-authored-by: cfzjywxk <[email protected]>
  • Loading branch information
you06 and cfzjywxk committed Aug 7, 2023
1 parent a47e1c2 commit cf84acf
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 24 deletions.
2 changes: 2 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1289,9 +1289,11 @@ func (c *RegionCache) reloadRegion(regionID uint64) {
// ignore error and use old region info.
logutil.Logger(bo.GetCtx()).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
c.mu.RLock()
if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
oldRegion.asyncReload.Store(false)
}
c.mu.RUnlock()
return
}
c.mu.Lock()
Expand Down
128 changes: 104 additions & 24 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,38 +389,77 @@ func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
// the leader will be updated to replicas[0] and give it another chance.
type tryFollower struct {
stateBase
leaderIdx AccessIndex
lastIdx AccessIndex
// if the leader is unavailable, but it still holds the leadership, fallbackFromLeader is true and replica read is enabled.
fallbackFromLeader bool
leaderIdx AccessIndex
lastIdx AccessIndex
labels []*metapb.StoreLabel
}

func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
var targetReplica *replica
// Search replica that is not attempted from the last accessed replica
for i := 1; i < len(selector.replicas); i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
if idx == state.leaderIdx {
continue
filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) {
for i := 0; i < len(selector.replicas); i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
if idx == state.leaderIdx {
continue
}
selectReplica := selector.replicas[idx]
if fn(selectReplica) && selectReplica.store.getLivenessState() != unreachable {
return idx, selectReplica
}
}
targetReplica = selector.replicas[idx]
// Each follower is only tried once
if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable {
return -1, nil
}

if len(state.labels) > 0 {
idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
return selectReplica.store.IsLabelsMatch(state.labels)
})
if selectReplica != nil && idx >= 0 {
state.lastIdx = idx
selector.targetIdx = idx
}
// labels only take effect for first try.
state.labels = nil
}
if selector.targetIdx < 0 {
// Search replica that is not attempted from the last accessed replica
idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
return !selectReplica.isExhausted(1)
})
if selectReplica != nil && idx >= 0 {
state.lastIdx = idx
selector.targetIdx = idx
break
}
}

// If all followers are tried and fail, backoff and retry.
if selector.targetIdx < 0 {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
}
return selector.buildRPCContext(bo)
rpcCtx, err := selector.buildRPCContext(bo)
if err != nil || rpcCtx == nil {
return rpcCtx, err
}
if state.fallbackFromLeader {
staleRead := false
rpcCtx.contextPatcher.staleRead = &staleRead
replicaRead := true
rpcCtx.contextPatcher.replicaRead = &replicaRead
}
return rpcCtx, nil
}

func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
panic("the store must exist")
if !state.fallbackFromLeader {
peer := selector.targetReplica().peer
if !selector.region.switchWorkLeaderToPeer(peer) {
logutil.BgLogger().Warn("the store must exist",
zap.Uint64("store", peer.StoreId),
zap.Uint64("peer", peer.Id))
}
}
}

Expand Down Expand Up @@ -542,6 +581,10 @@ type accessFollower struct {
learnerOnly bool
}

// Follower read will try followers first, if no follower is available, it will fallback to leader.
// Specially, for stale read, it tries local peer(can be either leader or follower), then use snapshot read in the leader,
// if the leader read receive server-is-busy and connection errors, the region cache is still valid,
// and the state will be changed to tryFollower, which will read by replica read.
func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
replicaSize := len(selector.replicas)
resetStaleRead := false
Expand Down Expand Up @@ -610,14 +653,30 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
leader := selector.replicas[state.leaderIdx]
leaderInvalid := leader.isEpochStale() || state.IsLeaderExhausted(leader)
leaderEpochStale := leader.isEpochStale()
leaderInvalid := leaderEpochStale || state.IsLeaderExhausted(leader)
if len(state.option.labels) > 0 {
logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels",
zap.Uint64("region", selector.region.GetID()),
zap.Bool("leader-invalid", leaderInvalid),
zap.Any("labels", state.option.labels))
}
if leaderInvalid {
// In stale-read, the request will fallback to leader after the local follower failure.
// If the leader is also unavailable, we can fallback to the follower and use replica-read flag again,
// The remote follower not tried yet, and the local follower can retry without stale-read flag.
if state.isStaleRead {
selector.state = &tryFollower{
fallbackFromLeader: true,
leaderIdx: state.leaderIdx,
lastIdx: state.leaderIdx,
labels: state.option.labels,
}
if leaderEpochStale {
selector.regionCache.scheduleReloadRegion(selector.region)
}
return nil, stateChanged{}
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
Expand Down Expand Up @@ -668,22 +727,23 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
return false
}
// The request can only be sent to the leader.
if state.option.leaderOnly && idx == state.leaderIdx {
return true
if state.option.leaderOnly {
// The request can only be sent to the leader.
return idx == state.leaderIdx
} else if !state.tryLeader && idx == state.leaderIdx {
// The request cannot be sent to leader.
return false
}
// Choose a replica with matched labels.
followerCandidate := !state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) &&
replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner)
if !followerCandidate {
if state.learnerOnly && replica.peer.Role == metapb.PeerRole_Learner {
// The request can only be sent to the learner.
return false
}
// And If the leader store is abnormal to be accessed under `ReplicaReadPreferLeader` mode, we should choose other valid followers
// as candidates to serve the Read request.
if state.option.preferLeader && replica.store.isSlow() {
return false
}
// If the stores are limited, check if the store is in the list.
// Choose a replica with matched labels.
return replica.store.IsStoreMatch(state.option.stores)
}

Expand Down Expand Up @@ -1064,6 +1124,9 @@ func (s *replicaSelector) onServerIsBusy(
// Mark the server is busy (the next incoming READs could be redirect
// to expected followers. )
ctx.Store.markAlreadySlow()
if s.canFallback2Follower() {
return true, nil
}
}
err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
if err != nil {
Expand All @@ -1072,6 +1135,23 @@ func (s *replicaSelector) onServerIsBusy(
return true, nil
}

// For some reasons, the leader is unreachable by now, try followers instead.
// the state is changed in accessFollower.next when leader is unavailable.
func (s *replicaSelector) canFallback2Follower() bool {
if s == nil || s.state == nil {
return false
}
state, ok := s.state.(*accessFollower)
if !ok {
return false
}
if !state.isStaleRead {
return false
}
// can fallback to follower only when the leader is exhausted.
return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
}

func (s *replicaSelector) invalidateRegion() {
if s.region != nil {
s.region.invalidate(Other)
Expand Down
90 changes: 90 additions & 0 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,3 +1199,93 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
s.True(ok)
s.Equal(getResp.Value, value)
}

func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
},
}
var followerID *uint64
for _, storeID := range s.storeIDs {
if storeID != leaderStore.storeID {
followerID = &storeID
break
}
}
s.NotNil(followerID)
followerLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(*followerID, 10),
},
}

regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)

dataIsNotReady := false
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
if dataIsNotReady && req.StaleRead {
dataIsNotReady = false
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
}
if addr == leaderStore.addr {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
ServerIsBusy: &errorpb.ServerIsBusy{},
}}}, nil
}
if !req.ReplicaRead {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
NotLeader: &errorpb.NotLeader{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil
}}

for _, localLeader := range []bool{true, false} {
dataIsNotReady = true
// data is not ready, then server is busy in the first round,
// directly server is busy in the second round.
for i := 0; i < 2; i++ {
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
req.ReplicaReadType = kv.ReplicaReadMixed
var ops []StoreSelectorOption
if localLeader {
ops = append(ops, WithMatchLabels(leaderLabel))
} else {
ops = append(ops, WithMatchLabels(followerLabel))
}

ctx, _ := context.WithTimeout(context.Background(), 10000*time.Second)
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)

regionErr, err := resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
s.True(ok)
if localLeader {
s.NotEqual(getResp.Value, []byte("store"+leaderLabel[0].Value))
} else {
s.Equal(getResp.Value, []byte("store"+followerLabel[0].Value))
}
}
}
}

0 comments on commit cf84acf

Please sign in to comment.