diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 87cd06704..cf57b521b 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -1289,9 +1289,11 @@ func (c *RegionCache) reloadRegion(regionID uint64) { // ignore error and use old region info. logutil.Logger(bo.GetCtx()).Error("load region failure", zap.Uint64("regionID", regionID), zap.Error(err)) + c.mu.RLock() if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil { oldRegion.asyncReload.Store(false) } + c.mu.RUnlock() return } c.mu.Lock() diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index e071d739a..eca28b872 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -395,26 +395,51 @@ type tryFollower struct { lastIdx AccessIndex // fromOnNotLeader indicates whether the state is changed from onNotLeader. fromOnNotLeader bool + labels []*metapb.StoreLabel } func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { - var targetReplica *replica hasDeadlineExceededErr := false - // Search replica that is not attempted from the last accessed replica - for i := 1; i < len(selector.replicas); i++ { - idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) - targetReplica = selector.replicas[idx] - hasDeadlineExceededErr = hasDeadlineExceededErr || targetReplica.deadlineErrUsingConfTimeout - if idx == state.leaderIdx { - continue + //hasDeadlineExceededErr || targetReplica.deadlineErrUsingConfTimeout + filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) { + for i := 0; i < len(selector.replicas); i++ { + idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) + if idx == state.leaderIdx { + continue + } + selectReplica := selector.replicas[idx] + hasDeadlineExceededErr = hasDeadlineExceededErr || selectReplica.deadlineErrUsingConfTimeout + if selectReplica.store.getLivenessState() != unreachable && !selectReplica.deadlineErrUsingConfTimeout && + fn(selectReplica) { + return idx, selectReplica + } } - // Each follower is only tried once - if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable && !targetReplica.deadlineErrUsingConfTimeout { + return -1, nil + } + + if len(state.labels) > 0 { + idx, selectReplica := filterReplicas(func(selectReplica *replica) bool { + return selectReplica.store.IsLabelsMatch(state.labels) + }) + if selectReplica != nil && idx >= 0 { + state.lastIdx = idx + selector.targetIdx = idx + } + // labels only take effect for first try. + state.labels = nil + } + + if selector.targetIdx < 0 { + // Search replica that is not attempted from the last accessed replica + idx, selectReplica := filterReplicas(func(selectReplica *replica) bool { + return !selectReplica.isExhausted(1) + }) + if selectReplica != nil && idx >= 0 { state.lastIdx = idx selector.targetIdx = idx - break } } + // If all followers are tried and fail, backoff and retry. if selector.targetIdx < 0 { if hasDeadlineExceededErr { @@ -427,22 +452,24 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) ( } rpcCtx, err := selector.buildRPCContext(bo) if err != nil || rpcCtx == nil { - return nil, err + return rpcCtx, err } - // If the state is changed from onNotLeader, the `replicaRead` flag should not be set as leader read would still be used. if !state.fromOnNotLeader { - replicaRead := selector.targetIdx != state.leaderIdx + replicaRead := true rpcCtx.contextPatcher.replicaRead = &replicaRead } - disableStaleRead := false - rpcCtx.contextPatcher.staleRead = &disableStaleRead + staleRead := false + rpcCtx.contextPatcher.staleRead = &staleRead return rpcCtx, nil } func (state *tryFollower) onSendSuccess(selector *replicaSelector) { if state.fromOnNotLeader { - if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) { - panic("the store must exist") + peer := selector.targetReplica().peer + if !selector.region.switchWorkLeaderToPeer(peer) { + logutil.BgLogger().Warn("the store must exist", + zap.Uint64("store", peer.StoreId), + zap.Uint64("peer", peer.Id)) } } } @@ -565,6 +592,10 @@ type accessFollower struct { learnerOnly bool } +// Follower read will try followers first, if no follower is available, it will fallback to leader. +// Specially, for stale read, it tries local peer(can be either leader or follower), then use snapshot read in the leader, +// if the leader read receive server-is-busy and connection errors, the region cache is still valid, +// and the state will be changed to tryFollower, which will read by replica read. func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { replicaSize := len(selector.replicas) resetStaleRead := false @@ -633,7 +664,8 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector // If there is no candidate, fallback to the leader. if selector.targetIdx < 0 { leader := selector.replicas[state.leaderIdx] - leaderInvalid := leader.isEpochStale() || state.IsLeaderExhausted(leader) + leaderEpochStale := leader.isEpochStale() + leaderInvalid := leaderEpochStale || state.IsLeaderExhausted(leader) if len(state.option.labels) > 0 { logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels", zap.Uint64("region", selector.region.GetID()), @@ -645,6 +677,20 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector return nil, nil } if leaderInvalid { + // In stale-read, the request will fallback to leader after the local follower failure. + // If the leader is also unavailable, we can fallback to the follower and use replica-read flag again, + // The remote follower not tried yet, and the local follower can retry without stale-read flag. + if state.isStaleRead { + selector.state = &tryFollower{ + leaderIdx: state.leaderIdx, + lastIdx: state.leaderIdx, + labels: state.option.labels, + } + if leaderEpochStale { + selector.regionCache.scheduleReloadRegion(selector.region) + } + return nil, stateChanged{} + } metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() selector.invalidateRegion() return nil, nil @@ -695,23 +741,25 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout { return false } - // The request can only be sent to the leader. - if state.option.leaderOnly && idx == state.leaderIdx { - return true + if state.option.leaderOnly { + // The request can only be sent to the leader. + return idx == state.leaderIdx } - // Choose a replica with matched labels. - followerCandidate := !state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && - replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner) - if !followerCandidate { + if !state.tryLeader && idx == state.leaderIdx { + // The request cannot be sent to leader. return false } + if state.learnerOnly { + // The request can only be sent to the learner. + return replica.peer.Role == metapb.PeerRole_Learner + } // And If the leader store is abnormal to be accessed under `ReplicaReadPreferLeader` mode, we should choose other valid followers // as candidates to serve the Read request. if state.option.preferLeader && replica.store.isSlow() { return false } - // If the stores are limited, check if the store is in the list. - return replica.store.IsStoreMatch(state.option.stores) + // Choose a replica with matched labels. + return replica.store.IsStoreMatch(state.option.stores) && replica.store.IsLabelsMatch(state.option.labels) } // tryIdleReplica is the state where we find the leader is busy and retry the request using replica read. @@ -1101,6 +1149,9 @@ func (s *replicaSelector) onServerIsBusy( // Mark the server is busy (the next incoming READs could be redirect // to expected followers. ) ctx.Store.markAlreadySlow() + if s.canFallback2Follower() { + return true, nil + } } err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) if err != nil { @@ -1109,6 +1160,23 @@ func (s *replicaSelector) onServerIsBusy( return true, nil } +// For some reasons, the leader is unreachable by now, try followers instead. +// the state is changed in accessFollower.next when leader is unavailable. +func (s *replicaSelector) canFallback2Follower() bool { + if s == nil || s.state == nil { + return false + } + state, ok := s.state.(*accessFollower) + if !ok { + return false + } + if !state.isStaleRead { + return false + } + // can fallback to follower only when the leader is exhausted. + return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx]) +} + func (s *replicaSelector) invalidateRegion() { if s.region != nil { s.region.invalidate(Other) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index f56b4022f..bae33b7cf 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -348,7 +348,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) rpcCtx, err := replicaSelector.next(s.bo) s.Nil(err) - // Should swith to the next follower. + // Should switch to the next follower. s.Equal(AccessIndex(tikvLearnerAccessIdx), accessLearner.lastIdx) AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil) } @@ -590,7 +590,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ { rpcCtx, err := replicaSelector.next(s.bo) s.Nil(err) - // Should swith to the next follower. + // Should switch to the next follower. s.NotEqual(lastIdx, state3.lastIdx) // Shouldn't access the leader if followers aren't exhausted. s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx) @@ -1284,3 +1284,95 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { } } } + +func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() { + leaderStore, _ := s.loadAndGetLeaderStore() + leaderLabel := []*metapb.StoreLabel{ + { + Key: "id", + Value: strconv.FormatUint(leaderStore.StoreID(), 10), + }, + } + var followerID *uint64 + for _, storeID := range s.storeIDs { + if storeID != leaderStore.storeID { + id := storeID + followerID = &id + break + } + } + s.NotNil(followerID) + followerLabel := []*metapb.StoreLabel{ + { + Key: "id", + Value: strconv.FormatUint(*followerID, 10), + }, + } + + regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + s.NotNil(regionLoc) + + dataIsNotReady := false + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + select { + case <-ctx.Done(): + return nil, errors.New("timeout") + default: + } + if dataIsNotReady && req.StaleRead { + dataIsNotReady = false + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + DataIsNotReady: &errorpb.DataIsNotReady{}, + }}}, nil + } + if addr == leaderStore.addr { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + ServerIsBusy: &errorpb.ServerIsBusy{}, + }}}, nil + } + if !req.ReplicaRead { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + NotLeader: &errorpb.NotLeader{}, + }}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + }} + + for _, localLeader := range []bool{true, false} { + dataIsNotReady = true + // data is not ready, then server is busy in the first round, + // directly server is busy in the second round. + for i := 0; i < 2; i++ { + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleRead() + req.ReplicaReadType = kv.ReplicaReadMixed + var ops []StoreSelectorOption + if localLeader { + ops = append(ops, WithMatchLabels(leaderLabel)) + } else { + ops = append(ops, WithMatchLabels(followerLabel)) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second) + bo := retry.NewBackoffer(ctx, -1) + s.Nil(err) + resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...) + s.Nil(err) + + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.Nil(regionErr) + getResp, ok := resp.Resp.(*kvrpcpb.GetResponse) + s.True(ok) + if localLeader { + s.NotEqual(getResp.Value, []byte("store"+leaderLabel[0].Value)) + } else { + s.Equal(getResp.Value, []byte("store"+followerLabel[0].Value)) + } + cancel() + } + } +}