From f6f55dd7611dcb579fe42bbfd94c1835dd064bcf Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 25 Jul 2023 14:08:32 +0800 Subject: [PATCH 1/3] only fallback to leader read with data-is-not-ready error Signed-off-by: you06 typo Signed-off-by: you06 add test Signed-off-by: you06 remove comment Signed-off-by: you06 remove comment Signed-off-by: you06 update PR Signed-off-by: you06 Update internal/locate/region_request.go Co-authored-by: cfzjywxk --- error/error.go | 4 +- internal/locate/region_request.go | 31 +++++++----- internal/locate/region_request3_test.go | 65 ++++++++++++++++++++++++- 3 files changed, 84 insertions(+), 16 deletions(-) diff --git a/error/error.go b/error/error.go index ff6754da2..860c7647c 100644 --- a/error/error.go +++ b/error/error.go @@ -93,8 +93,8 @@ var ( ErrRegionFlashbackInProgress = errors.New("region is in the flashback progress") // ErrRegionFlashbackNotPrepared is the error when a region is not prepared for the flashback first. ErrRegionFlashbackNotPrepared = errors.New("region is not prepared for the flashback") - // ErrUnknown is the unknow error. - ErrUnknown = errors.New("unknow") + // ErrUnknown is the unknown error. + ErrUnknown = errors.New("unknown") // ErrResultUndetermined is the error when execution result is unknown. ErrResultUndetermined = errors.New("execution result undetermined") ) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index f54320c08..71a32ec9f 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -517,11 +517,12 @@ func (state *tryNewProxy) onNoLeader(selector *replicaSelector) { type accessFollower struct { stateBase // If tryLeader is true, the request can also be sent to the leader. 
- tryLeader bool - isStaleRead bool - option storeSelectorOp - leaderIdx AccessIndex - lastIdx AccessIndex + tryLeader bool + isStaleRead bool + resetStaleRead bool + option storeSelectorOp + leaderIdx AccessIndex + lastIdx AccessIndex } func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { @@ -542,7 +543,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector } } else { // Stale Read request will retry the leader only by using the WithLeaderOnly option, - if state.isStaleRead { + if state.isStaleRead && state.resetStaleRead { WithLeaderOnly()(&state.option) // retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read. resetStaleRead = true @@ -617,7 +618,7 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool // The request can only be sent to the leader. ((state.option.leaderOnly && idx == state.leaderIdx) || // Choose a replica with matched labels. - (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels))) && + (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && (state.lastIdx >= 0 || replica.store.IsLabelsMatch(state.option.labels)))) && // Make sure the replica is not unreachable. replica.store.getLivenessState() != unreachable } @@ -1669,13 +1670,19 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()), zap.Stringer("ctx", ctx), ) - if !req.IsGlobalStaleRead() { - // only backoff local stale reads as global should retry immediately against the leader as a normal read - err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready")) - if err != nil { - return false, err + // stale reads should retry immediately against the leader as a normal read. 
+ if s.replicaSelector != nil && s.replicaSelector.state != nil { + if af, ok := s.replicaSelector.state.(*accessFollower); ok && !af.resetStaleRead { + af.resetStaleRead = true + // always retry data-is-not-ready error. + return true, nil } } + // backoff other cases to avoid infinite retries. + err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready")) + if err != nil { + return false, err + } return true, nil } diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 850f24d30..d98f9e677 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -51,6 +51,7 @@ import ( "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" ) @@ -873,12 +874,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { failureOnFollower := 0 failureOnLeader := 0 s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + dataIsNotReadyErr := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}} if addr != s.cluster.GetStore(s.storeIDs[0]).Address { failureOnFollower++ - return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil + return dataIsNotReadyErr, nil } else if failureOnLeader == 0 && i%2 == 0 { failureOnLeader++ - return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil + return dataIsNotReadyErr, nil } else { return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil } @@ -1100,3 +1102,62 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() { s.NotNil(regionErr.GetEpochNotMatch()) 
s.Nil(regionErr.GetDiskFull()) } + +func (s *testRegionRequestToThreeStoresSuite) TestResetStaleReadFallbackDataIsNotReadyOnly() { + originBoTiKVServerBusy := retry.BoTiKVServerBusy + defer func() { + retry.BoTiKVServerBusy = originBoTiKVServerBusy + }() + retry.BoTiKVServerBusy = retry.NewConfig("tikvServerBusy", &metrics.BackoffHistogramServerBusy, retry.NewBackoffFnCfg(2, 10, retry.EqualJitter), tikverr.ErrTiKVServerBusy) + + regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + s.NotNil(regionLoc) + + tryTime := 0 + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + select { + case <-ctx.Done(): + return nil, errors.New("timeout") + default: + } + if tryTime == 0 || addr == s.cluster.GetStore(s.storeIDs[0]).Address { + tryTime++ + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + ServerIsBusy: &errorpb.ServerIsBusy{}, + }}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil + }} + + region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID()) + s.True(region.isValid()) + + for _, local := range []bool{false, true} { + tryTime = 0 + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) + var ops []StoreSelectorOption + if !local { + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + } else { + ops = append(ops, WithMatchLabels([]*metapb.StoreLabel{ + { + Key: "id", + Value: "2", + }, + })) + } + req.EnableStaleRead() + + ctx, _ := context.WithTimeout(context.Background(), 1000*time.Second) + bo := retry.NewBackoffer(ctx, -1) + s.Nil(err) + resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...) 
+ s.Nil(err) + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.Nil(regionErr) + s.Equal(resp.Resp.(*kvrpcpb.GetResponse).Value, []byte("value")) + } +} From 31ad539d21dfb9867f2c4b3999d5e88b41fa14d5 Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 28 Jul 2023 18:00:38 +0800 Subject: [PATCH 2/3] fix test Signed-off-by: you06 --- internal/locate/region_request.go | 11 ++++++----- internal/locate/region_request3_test.go | 6 ++++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 71a32ec9f..16ba84c80 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -552,10 +552,11 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector } reloadRegion := false + leaderExhaust := state.IsLeaderExhausted(selector.replicas[state.leaderIdx]) for i := 0; i < len(selector.replicas) && !state.option.leaderOnly; i++ { idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) selectReplica := selector.replicas[idx] - if state.isCandidate(idx, selectReplica) { + if state.isCandidate(idx, selectReplica, leaderExhaust) { state.lastIdx = idx selector.targetIdx = idx break @@ -575,7 +576,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector logutil.BgLogger().Warn("unable to find stores with given labels") } leader := selector.replicas[state.leaderIdx] - if leader.isEpochStale() || state.IsLeaderExhausted(leader) { + if leader.isEpochStale() || leaderExhaust { metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() selector.invalidateRegion() return nil, nil @@ -613,12 +614,12 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic } } -func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool { +func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica, leaderExhaust bool) bool { return 
!replica.isEpochStale() && !replica.isExhausted(1) && // The request can only be sent to the leader. ((state.option.leaderOnly && idx == state.leaderIdx) || // Choose a replica with matched labels. - (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && (state.lastIdx >= 0 || replica.store.IsLabelsMatch(state.option.labels)))) && + (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && (leaderExhaust || replica.store.IsLabelsMatch(state.option.labels)))) && // Make sure the replica is not unreachable. replica.store.getLivenessState() != unreachable } @@ -1681,7 +1682,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // backoff other cases to avoid infinite retries. err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready")) if err != nil { - return false, err + return false, err } return true, nil } diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index d98f9e677..32e2763e3 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -533,10 +533,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ { rpcCtx, err := replicaSelector.next(s.bo) s.Nil(err) - // Should swith to the next follower. + // Should switch to the next follower. s.NotEqual(lastIdx, state3.lastIdx) // Shouldn't access the leader if followers aren't exhausted. 
-		s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
+		if regionStore.workTiKVIdx == state3.lastIdx {
+			s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
+		}
 		s.Equal(replicaSelector.targetIdx, state3.lastIdx)
 		assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
 		lastIdx = state3.lastIdx

From 64749f8597dd6766121f574fc602a1dcb7d2b434 Mon Sep 17 00:00:00 2001
From: you06
Date: Fri, 28 Jul 2023 19:29:52 +0800
Subject: [PATCH 3/3] clean code

Signed-off-by: you06
---
 internal/locate/region_request.go | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 16ba84c80..b82853bf0 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -615,13 +615,23 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
 }
 
 func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica, leaderExhaust bool) bool {
-	return !replica.isEpochStale() && !replica.isExhausted(1) &&
+	if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
+		return false
+	}
+	if state.option.leaderOnly {
 		// The request can only be sent to the leader.
-		((state.option.leaderOnly && idx == state.leaderIdx) ||
-		// Choose a replica with matched labels.
-		(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && (leaderExhaust || replica.store.IsLabelsMatch(state.option.labels)))) &&
-		// Make sure the replica is not unreachable.
-		replica.store.getLivenessState() != unreachable
+		if idx == state.leaderIdx {
+			return true
+		}
+	} else if !state.tryLeader && idx == state.leaderIdx {
+		// The request cannot be sent to the leader.
+		return false
+	}
+	// The replica should match the given labels, but when the leader is exhausted, try the remaining available replicas.
+ if leaderExhaust || replica.store.IsLabelsMatch(state.option.labels) { + return true + } + return false } type invalidStore struct {