Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fallback to follower when leader is busy (#916) #923

Merged
merged 2 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1289,9 +1289,11 @@ func (c *RegionCache) reloadRegion(regionID uint64) {
// ignore error and use old region info.
logutil.Logger(bo.GetCtx()).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
c.mu.RLock()
if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
oldRegion.asyncReload.Store(false)
}
c.mu.RUnlock()
return
}
c.mu.Lock()
Expand Down
124 changes: 96 additions & 28 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,26 +395,51 @@ type tryFollower struct {
lastIdx AccessIndex
// fromOnNotLeader indicates whether the state is changed from onNotLeader.
fromOnNotLeader bool
labels []*metapb.StoreLabel
}

func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
var targetReplica *replica
hasDeadlineExceededErr := false
// Search replica that is not attempted from the last accessed replica
for i := 1; i < len(selector.replicas); i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
targetReplica = selector.replicas[idx]
hasDeadlineExceededErr = hasDeadlineExceededErr || targetReplica.deadlineErrUsingConfTimeout
if idx == state.leaderIdx {
continue
//hasDeadlineExceededErr || targetReplica.deadlineErrUsingConfTimeout
filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) {
for i := 0; i < len(selector.replicas); i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
if idx == state.leaderIdx {
continue
}
selectReplica := selector.replicas[idx]
hasDeadlineExceededErr = hasDeadlineExceededErr || selectReplica.deadlineErrUsingConfTimeout
if selectReplica.store.getLivenessState() != unreachable && !selectReplica.deadlineErrUsingConfTimeout &&
fn(selectReplica) {
return idx, selectReplica
}
}
// Each follower is only tried once
if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable && !targetReplica.deadlineErrUsingConfTimeout {
return -1, nil
}

if len(state.labels) > 0 {
idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
return selectReplica.store.IsLabelsMatch(state.labels)
})
if selectReplica != nil && idx >= 0 {
state.lastIdx = idx
selector.targetIdx = idx
}
// labels only take effect for first try.
state.labels = nil
}

if selector.targetIdx < 0 {
// Search replica that is not attempted from the last accessed replica
idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
return !selectReplica.isExhausted(1)
})
if selectReplica != nil && idx >= 0 {
state.lastIdx = idx
selector.targetIdx = idx
break
}
}

// If all followers are tried and fail, backoff and retry.
if selector.targetIdx < 0 {
if hasDeadlineExceededErr {
Expand All @@ -427,22 +452,24 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
}
rpcCtx, err := selector.buildRPCContext(bo)
if err != nil || rpcCtx == nil {
return nil, err
return rpcCtx, err
}
// If the state is changed from onNotLeader, the `replicaRead` flag should not be set as leader read would still be used.
if !state.fromOnNotLeader {
replicaRead := selector.targetIdx != state.leaderIdx
replicaRead := true
rpcCtx.contextPatcher.replicaRead = &replicaRead
}
disableStaleRead := false
rpcCtx.contextPatcher.staleRead = &disableStaleRead
staleRead := false
rpcCtx.contextPatcher.staleRead = &staleRead
return rpcCtx, nil
}

func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
if state.fromOnNotLeader {
if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
panic("the store must exist")
peer := selector.targetReplica().peer
if !selector.region.switchWorkLeaderToPeer(peer) {
logutil.BgLogger().Warn("the store must exist",
zap.Uint64("store", peer.StoreId),
zap.Uint64("peer", peer.Id))
}
}
}
Expand Down Expand Up @@ -565,6 +592,10 @@ type accessFollower struct {
learnerOnly bool
}

// Follower read will try followers first, if no follower is available, it will fallback to leader.
// Specially, for stale read, it tries local peer(can be either leader or follower), then use snapshot read in the leader,
// if the leader read receive server-is-busy and connection errors, the region cache is still valid,
// and the state will be changed to tryFollower, which will read by replica read.
func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
replicaSize := len(selector.replicas)
resetStaleRead := false
Expand Down Expand Up @@ -633,7 +664,8 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
leader := selector.replicas[state.leaderIdx]
leaderInvalid := leader.isEpochStale() || state.IsLeaderExhausted(leader)
leaderEpochStale := leader.isEpochStale()
leaderInvalid := leaderEpochStale || state.IsLeaderExhausted(leader)
if len(state.option.labels) > 0 {
logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels",
zap.Uint64("region", selector.region.GetID()),
Expand All @@ -645,6 +677,20 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
return nil, nil
}
if leaderInvalid {
// In stale-read, the request will fallback to leader after the local follower failure.
// If the leader is also unavailable, we can fallback to the follower and use replica-read flag again,
// The remote follower not tried yet, and the local follower can retry without stale-read flag.
if state.isStaleRead {
selector.state = &tryFollower{
leaderIdx: state.leaderIdx,
lastIdx: state.leaderIdx,
labels: state.option.labels,
}
if leaderEpochStale {
selector.regionCache.scheduleReloadRegion(selector.region)
}
return nil, stateChanged{}
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
Expand Down Expand Up @@ -695,23 +741,25 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout {
return false
}
// The request can only be sent to the leader.
if state.option.leaderOnly && idx == state.leaderIdx {
return true
if state.option.leaderOnly {
// The request can only be sent to the leader.
return idx == state.leaderIdx
}
// Choose a replica with matched labels.
followerCandidate := !state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) &&
replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner)
if !followerCandidate {
if !state.tryLeader && idx == state.leaderIdx {
// The request cannot be sent to leader.
return false
}
if state.learnerOnly {
// The request can only be sent to the learner.
return replica.peer.Role == metapb.PeerRole_Learner
}
// And If the leader store is abnormal to be accessed under `ReplicaReadPreferLeader` mode, we should choose other valid followers
// as candidates to serve the Read request.
if state.option.preferLeader && replica.store.isSlow() {
return false
}
// If the stores are limited, check if the store is in the list.
return replica.store.IsStoreMatch(state.option.stores)
// Choose a replica with matched labels.
return replica.store.IsStoreMatch(state.option.stores) && replica.store.IsLabelsMatch(state.option.labels)
}

// tryIdleReplica is the state where we find the leader is busy and retry the request using replica read.
Expand Down Expand Up @@ -1101,6 +1149,9 @@ func (s *replicaSelector) onServerIsBusy(
// Mark the server is busy (the next incoming READs could be redirect
// to expected followers. )
ctx.Store.markAlreadySlow()
if s.canFallback2Follower() {
return true, nil
}
}
err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
if err != nil {
Expand All @@ -1109,6 +1160,23 @@ func (s *replicaSelector) onServerIsBusy(
return true, nil
}

// For some reasons, the leader is unreachable by now, try followers instead.
// the state is changed in accessFollower.next when leader is unavailable.
func (s *replicaSelector) canFallback2Follower() bool {
if s == nil || s.state == nil {
return false
}
state, ok := s.state.(*accessFollower)
if !ok {
return false
}
if !state.isStaleRead {
return false
}
// can fallback to follower only when the leader is exhausted.
return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
}

func (s *replicaSelector) invalidateRegion() {
if s.region != nil {
s.region.invalidate(Other)
Expand Down
96 changes: 94 additions & 2 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
rpcCtx, err := replicaSelector.next(s.bo)
s.Nil(err)
// Should swith to the next follower.
// Should switch to the next follower.
s.Equal(AccessIndex(tikvLearnerAccessIdx), accessLearner.lastIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
}
Expand Down Expand Up @@ -590,7 +590,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ {
rpcCtx, err := replicaSelector.next(s.bo)
s.Nil(err)
// Should swith to the next follower.
// Should switch to the next follower.
s.NotEqual(lastIdx, state3.lastIdx)
// Shouldn't access the leader if followers aren't exhausted.
s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
Expand Down Expand Up @@ -1284,3 +1284,95 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
}
}
}

func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
},
}
var followerID *uint64
for _, storeID := range s.storeIDs {
if storeID != leaderStore.storeID {
id := storeID
followerID = &id
break
}
}
s.NotNil(followerID)
followerLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(*followerID, 10),
},
}

regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)

dataIsNotReady := false
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
if dataIsNotReady && req.StaleRead {
dataIsNotReady = false
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
}
if addr == leaderStore.addr {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
ServerIsBusy: &errorpb.ServerIsBusy{},
}}}, nil
}
if !req.ReplicaRead {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
NotLeader: &errorpb.NotLeader{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil
}}

for _, localLeader := range []bool{true, false} {
dataIsNotReady = true
// data is not ready, then server is busy in the first round,
// directly server is busy in the second round.
for i := 0; i < 2; i++ {
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
req.ReplicaReadType = kv.ReplicaReadMixed
var ops []StoreSelectorOption
if localLeader {
ops = append(ops, WithMatchLabels(leaderLabel))
} else {
ops = append(ops, WithMatchLabels(followerLabel))
}

ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second)
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)

regionErr, err := resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
s.True(ok)
if localLeader {
s.NotEqual(getResp.Value, []byte("store"+leaderLabel[0].Value))
} else {
s.Equal(getResp.Value, []byte("store"+followerLabel[0].Value))
}
cancel()
}
}
}