Only fallback to leader read with data-is-not-ready error #907

Closed
wants to merge 3 commits into from
4 changes: 2 additions & 2 deletions error/error.go
@@ -93,8 +93,8 @@ var (
ErrRegionFlashbackInProgress = errors.New("region is in the flashback progress")
// ErrRegionFlashbackNotPrepared is the error when a region is not prepared for the flashback first.
ErrRegionFlashbackNotPrepared = errors.New("region is not prepared for the flashback")
// ErrUnknown is the unknow error.
ErrUnknown = errors.New("unknow")
// ErrUnknown is the unknown error.
ErrUnknown = errors.New("unknown")
// ErrResultUndetermined is the error when execution result is unknown.
ErrResultUndetermined = errors.New("execution result undetermined")
)
58 changes: 38 additions & 20 deletions internal/locate/region_request.go
@@ -517,11 +517,12 @@ func (state *tryNewProxy) onNoLeader(selector *replicaSelector) {
type accessFollower struct {
stateBase
// If tryLeader is true, the request can also be sent to the leader.
tryLeader bool
isStaleRead bool
option storeSelectorOp
leaderIdx AccessIndex
lastIdx AccessIndex
tryLeader bool
isStaleRead bool
resetStaleRead bool
option storeSelectorOp
leaderIdx AccessIndex
lastIdx AccessIndex
}

func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
@@ -542,7 +543,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
} else {
// Stale Read request will retry the leader only by using the WithLeaderOnly option,
if state.isStaleRead {
if state.isStaleRead && state.resetStaleRead {
WithLeaderOnly()(&state.option)
// retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read.
resetStaleRead = true
@@ -551,10 +552,11 @@
}

reloadRegion := false
leaderExhaust := state.IsLeaderExhausted(selector.replicas[state.leaderIdx])
for i := 0; i < len(selector.replicas) && !state.option.leaderOnly; i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
selectReplica := selector.replicas[idx]
if state.isCandidate(idx, selectReplica) {
if state.isCandidate(idx, selectReplica, leaderExhaust) {
A contributor commented:
The leaderExhaust value, which is the result of state.IsLeaderExhausted, should not be used here in the isCandidate function, because leaderExhaust checks isExhausted with 2 while isCandidate checks only 1. leaderExhaust is designed to be used only in the if selector.targetIdx < 0 { branch at L574, to allow an extra retry on the leader peer when no other peers are available.
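
For context, a minimal sketch of the attempt-based exhaustion check this comment refers to; the replica type and isExhausted helper below are simplified stand-ins for the internal/locate implementation, not the actual client-go code.

package main

import "fmt"

// replica is a simplified stand-in; only the per-replica attempt counter
// matters for this illustration.
type replica struct {
    attempts int
}

// isExhausted reports whether the replica has already been tried at least
// maxAttempt times.
func (r *replica) isExhausted(maxAttempt int) bool {
    return r.attempts >= maxAttempt
}

func main() {
    leader := &replica{attempts: 1}
    // The isCandidate check uses isExhausted(1): one attempt has been made,
    // so the leader is skipped as an ordinary candidate.
    fmt.Println(leader.isExhausted(1)) // true
    // IsLeaderExhausted uses isExhausted(2): the leader still has one final
    // attempt left, reserved for the no-candidate fallback.
    fmt.Println(leader.isExhausted(2)) // false
}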

cfzjywxk (Contributor) commented on Jul 28, 2023:
I'm still trying to figure out the results of the different combinations here and whether they all work as expected. The combinations to consider include:

  1. Whether it is a stale read or a follower read.
  2. Whether the local peer matching the labels is the leader peer or not.
  3. Whether any fallback happens.
  4. After falling back to leader read, if the leader is still unavailable, whether the error is returned to the upper layer as expected.

This is getting quite difficult to maintain...

state.lastIdx = idx
selector.targetIdx = idx
break
@@ -574,7 +576,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
logutil.BgLogger().Warn("unable to find stores with given labels")
}
leader := selector.replicas[state.leaderIdx]
if leader.isEpochStale() || state.IsLeaderExhausted(leader) {
if leader.isEpochStale() || leaderExhaust {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
@@ -612,14 +614,24 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
}
}

func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
return !replica.isEpochStale() && !replica.isExhausted(1) &&
func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica, leaderExhaust bool) bool {
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
return false
}
if state.option.leaderOnly {
// The request can only be sent to the leader.
((state.option.leaderOnly && idx == state.leaderIdx) ||
// Choose a replica with matched labels.
(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels))) &&
// Make sure the replica is not unreachable.
replica.store.getLivenessState() != unreachable
if idx == state.leaderIdx {
return true
}
} else if !state.tryLeader && idx == state.leaderIdx {
// The request cannot be sent to leader.
return false
}
// the replica should match labels, but when leader is exhausted, we need to try the rest available replica.
if leaderExhaust || replica.store.IsLabelsMatch(state.option.labels) {
return true
}
return false
}

type invalidStore struct {
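
Because the old and new lines of isCandidate are interleaved in the diff view above, here is a self-contained sketch of the decision order the new implementation follows; replicaInfo and selectorState are simplified stand-ins for the real replica and store-selector types, used for illustration only.

package main

import "fmt"

// replicaInfo and selectorState are simplified stand-ins for the real
// *replica and accessFollower/storeSelectorOp types in internal/locate.
type replicaInfo struct {
    isLeader    bool
    epochStale  bool
    exhausted   bool // already attempted once
    unreachable bool
    labelsMatch bool
}

type selectorState struct {
    leaderOnly bool
    tryLeader  bool
}

// isCandidate mirrors the order of checks in the new code: hard exclusions
// first, then the leader-only / try-leader rules, and finally label matching,
// which is bypassed once the leader is exhausted.
func isCandidate(s selectorState, r replicaInfo, leaderExhausted bool) bool {
    if r.epochStale || r.exhausted || r.unreachable {
        return false
    }
    if s.leaderOnly {
        // The request can only be sent to the leader; note that a non-leader
        // replica does not return false here but falls through to the label
        // check below, exactly as in the diff.
        if r.isLeader {
            return true
        }
    } else if !s.tryLeader && r.isLeader {
        // The request cannot be sent to the leader.
        return false
    }
    // Prefer replicas matching the requested labels; once the leader is
    // exhausted, any remaining reachable replica is accepted.
    return leaderExhausted || r.labelsMatch
}

func main() {
    follower := replicaInfo{labelsMatch: false}
    fmt.Println(isCandidate(selectorState{tryLeader: true}, follower, false)) // false: labels do not match
    fmt.Println(isCandidate(selectorState{tryLeader: true}, follower, true))  // true: leader exhausted, so any reachable replica is tried
}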
@@ -1669,13 +1681,19 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
zap.Stringer("ctx", ctx),
)
if !req.IsGlobalStaleRead() {
// only backoff local stale reads as global should retry immediately against the leader as a normal read
err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
if err != nil {
return false, err
// stale reads should retry immediately against the leader as a normal read.
if s.replicaSelector != nil && s.replicaSelector.state != nil {
if af, ok := s.replicaSelector.state.(*accessFollower); ok && !af.resetStaleRead {
af.resetStaleRead = true
// always retry data-is-not-ready error.
return true, nil
}
}
// backoff other cases to avoid infinite retries.
err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
if err != nil {
return false, err
}
return true, nil
}
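
To make the new retry flow easier to follow, here is a minimal model of the DataIsNotReady handling introduced above; accessFollowerState and onDataIsNotReady are illustrative stand-ins rather than the actual RegionRequestSender API. The first data-is-not-ready error on a stale read flips resetStaleRead, so the next attempt (see the accessFollower change above) goes to the leader as a normal read and is retried immediately; any further occurrence is retried only after the BoMaxDataNotReady backoff.

package main

import "fmt"

// accessFollowerState is a simplified stand-in for the accessFollower
// selector state; only the resetStaleRead flag matters here.
type accessFollowerState struct {
    resetStaleRead bool
}

// onDataIsNotReady models the new handling: the first error marks the
// selector and asks for an immediate retry; later errors still retry, but
// only after backing off, avoiding infinite fast retries.
func onDataIsNotReady(state *accessFollowerState) (retry bool, needBackoff bool) {
    if state != nil && !state.resetStaleRead {
        state.resetStaleRead = true
        return true, false
    }
    return true, true
}

func main() {
    state := &accessFollowerState{}
    fmt.Println(onDataIsNotReady(state)) // true false: immediate retry against the leader
    fmt.Println(onDataIsNotReady(state)) // true true: retry after backing off
}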

71 changes: 67 additions & 4 deletions internal/locate/region_request3_test.go
@@ -51,6 +51,7 @@ import (
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
)
@@ -532,10 +533,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ {
rpcCtx, err := replicaSelector.next(s.bo)
s.Nil(err)
// Should swith to the next follower.
// Should switch to the next follower.
s.NotEqual(lastIdx, state3.lastIdx)
// Shouldn't access the leader if followers aren't exhausted.
s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
if regionStore.workTiKVIdx == state3.lastIdx {
s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
}
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
lastIdx = state3.lastIdx
@@ -873,12 +876,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
failureOnFollower := 0
failureOnLeader := 0
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
dataIsNotReadyErr := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}}
if addr != s.cluster.GetStore(s.storeIDs[0]).Address {
failureOnFollower++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
return dataIsNotReadyErr, nil
} else if failureOnLeader == 0 && i%2 == 0 {
failureOnLeader++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
return dataIsNotReadyErr, nil
} else {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
}
@@ -1100,3 +1104,62 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
s.NotNil(regionErr.GetEpochNotMatch())
s.Nil(regionErr.GetDiskFull())
}

func (s *testRegionRequestToThreeStoresSuite) TestResetStaleReadFallbackDataIsNotReadyOnly() {
originBoTiKVServerBusy := retry.BoTiKVServerBusy
defer func() {
retry.BoTiKVServerBusy = originBoTiKVServerBusy
}()
retry.BoTiKVServerBusy = retry.NewConfig("tikvServerBusy", &metrics.BackoffHistogramServerBusy, retry.NewBackoffFnCfg(2, 10, retry.EqualJitter), tikverr.ErrTiKVServerBusy)

regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)

tryTime := 0
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
if tryTime == 0 || addr == s.cluster.GetStore(s.storeIDs[0]).Address {
tryTime++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
ServerIsBusy: &errorpb.ServerIsBusy{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil
}}

region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID())
s.True(region.isValid())

for _, local := range []bool{false, true} {
tryTime = 0
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
var ops []StoreSelectorOption
if !local {
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
} else {
ops = append(ops, WithMatchLabels([]*metapb.StoreLabel{
{
Key: "id",
Value: "2",
},
}))
}
req.EnableStaleRead()

ctx, _ := context.WithTimeout(context.Background(), 1000*time.Second)
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)
regionErr, err := resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
s.Equal(resp.Resp.(*kvrpcpb.GetResponse).Value, []byte("value"))
}
}