add region cache state test & fix some issues of replica selector #910

Merged 3 commits on Aug 7, 2023
4 changes: 3 additions & 1 deletion internal/locate/region_cache.go
@@ -452,10 +452,12 @@ func (c *RegionCache) Close() {
c.cancelFunc()
}

var reloadRegionInterval = int64(10 * time.Second)

// asyncCheckAndResolveLoop with
func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
reloadRegionTicker := time.NewTicker(10 * time.Second)
reloadRegionTicker := time.NewTicker(time.Duration(atomic.LoadInt64(&reloadRegionInterval)))
defer func() {
ticker.Stop()
reloadRegionTicker.Stop()
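The reload interval is now read atomically from a package-level variable instead of being hard-coded to 10 seconds, so a test in the same package can shorten it and exercise the reload path quickly. A minimal sketch of such a test (the test name and body are illustrative assumptions, not the PR's actual test code):

package locate

import (
	"sync/atomic"
	"testing"
	"time"
)

// Hypothetical test: shrink the reload interval so the reload branch of
// asyncCheckAndResolveLoop fires quickly, then restore the production default.
func TestReloadRegionIntervalOverride(t *testing.T) {
	old := atomic.LoadInt64(&reloadRegionInterval)
	atomic.StoreInt64(&reloadRegionInterval, int64(100*time.Millisecond))
	defer atomic.StoreInt64(&reloadRegionInterval, old)

	// ... construct a RegionCache, schedule a region for reload, and assert it
	// is reloaded well before the 10s production default would allow.
}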
61 changes: 42 additions & 19 deletions internal/locate/region_request.go
@@ -427,6 +427,8 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
return rpcCtx, err
}
if state.fallbackFromLeader {
staleRead := false
rpcCtx.contextPatcher.staleRead = &staleRead
replicaRead := true
rpcCtx.contextPatcher.replicaRead = &replicaRead
}
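For reference, the net effect of the patch above is that a request falling back from the leader is re-sent as a plain replica read; on the wire the request context ends up roughly as below (a hedged sketch of what the contextPatcher applies, not code added by the PR):

// Illustrative effect of contextPatcher when fallbackFromLeader is set:
// req.Context.StaleRead   = false // drop the stale-read flag
// req.Context.ReplicaRead = true  // read from a follower via replica read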
@@ -562,6 +564,10 @@ type accessFollower struct {
lastIdx AccessIndex
}

// Follower read tries the followers first; if no follower is available, it falls back to the leader.
// In particular, for stale read it tries the local peer first (which can be either the leader or a follower),
// then falls back to a snapshot read on the leader. If the leader read receives server-is-busy or connection
// errors, the region cache is kept valid and the state is changed to tryFollower, which retries with replica read.
func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
resetStaleRead := false
if state.lastIdx < 0 {
Expand Down Expand Up @@ -609,14 +615,30 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
leader := selector.replicas[state.leaderIdx]
leaderInvalid := leader.isEpochStale() || state.IsLeaderExhausted(leader)
leaderEpochStale := leader.isEpochStale()
leaderInvalid := leaderEpochStale || state.IsLeaderExhausted(leader)
Review comment from @cfzjywxk (Contributor), Aug 4, 2023:

Maybe we could refactor the implementation of state.IsLeaderExhausted, checking whether the leader is exhausted from the current state instead of leader.isExhausted(2).

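One possible shape for that refactor, sketched only to illustrate the suggestion (the type and field names below are hypothetical and not part of this PR): track the leader attempts the current state has already made and derive exhaustion from them, instead of comparing against the magic count in leader.isExhausted(2).

// Hypothetical sketch: count the leader attempts explicitly in the state.
type leaderAttempts struct {
	triedWithStaleRead    bool // local peer / leader attempted with the stale-read flag
	triedWithSnapshotRead bool // leader attempted again with a snapshot (leader) read
}

func (a leaderAttempts) exhausted() bool {
	return a.triedWithStaleRead && a.triedWithSnapshotRead
}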
if len(state.option.labels) > 0 {
logutil.BgLogger().Warn("unable to find stores with given labels",
zap.Uint64("region", selector.region.GetID()),
zap.Bool("leader-invalid", leaderInvalid),
zap.Any("labels", state.option.labels))
}
if leaderInvalid {
// In stale read, the request falls back to the leader after the local follower fails.
// If the leader is also unavailable, we can fall back to the followers with the replica-read flag:
// the remote followers have not been tried yet, and the local follower can be retried without the stale-read flag.
if state.isStaleRead {
Review comment from a Contributor:

This could be printed in the above log so we know whether tryFollower happens in the current turn, and likewise the fallbackFromLeader state.

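A sketch of what that could look like, extending the warning above with the fallback decision (the extra field is illustrative and not part of this PR):

logutil.BgLogger().Warn("unable to find stores with given labels",
	zap.Uint64("region", selector.region.GetID()),
	zap.Bool("leader-invalid", leaderInvalid),
	// Whether this turn falls back to tryFollower (fallbackFromLeader is set in that case).
	zap.Bool("stale-read-fallback-to-follower", leaderInvalid && state.isStaleRead),
	zap.Any("labels", state.option.labels))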
selector.state = &tryFollower{
fallbackFromLeader: true,
leaderIdx: state.leaderIdx,
lastIdx: state.leaderIdx,
labels: state.option.labels,
}
if leaderEpochStale {
selector.regionCache.scheduleReloadRegion(selector.region)
}
return nil, stateChanged{}
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
@@ -655,13 +677,17 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
}

func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
return !replica.isEpochStale() && !replica.isExhausted(1) &&
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
return false
}
if state.option.leaderOnly && idx == state.leaderIdx {
// The request can only be sent to the leader.
((state.option.leaderOnly && idx == state.leaderIdx) ||
// Choose a replica with matched labels.
(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels))) &&
// Make sure the replica is not unreachable.
replica.store.getLivenessState() != unreachable
return true
} else if !state.tryLeader && idx == state.leaderIdx {
// The request cannot be sent to leader.
return false
}
return replica.store.IsLabelsMatch(state.option.labels)
}

type invalidStore struct {
@@ -930,25 +956,21 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
s.region.invalidate(StoreNotFound)
}

// For some reason, the leader is unreachable by now, try followers instead.
func (s *replicaSelector) fallback2Follower(ctx *RPCContext) bool {
if ctx == nil || s == nil || s.state == nil {
// For some reason, the leader is unreachable by now; try followers instead.
// The state is changed in accessFollower.next when the leader is unavailable.
func (s *replicaSelector) canFallback2Follower() bool {
if s == nil || s.state == nil {
return false
}
state, ok := s.state.(*accessFollower)
if !ok {
return false
}
if state.lastIdx != state.leaderIdx {
if !state.isStaleRead {
return false
}
s.state = &tryFollower{
fallbackFromLeader: true,
leaderIdx: state.leaderIdx,
lastIdx: state.leaderIdx,
labels: state.option.labels,
}
return true
// Can fall back to a follower only when the leader is exhausted.
return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
}
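Spelled out with plain booleans, the new predicate reduces to the following readability sketch (hypothetical parameter names, not code from the PR):

// Hypothetical restatement of canFallback2Follower's conditions.
func canFallback(isAccessFollower, isStaleRead, lastAttemptWasLeader, leaderExhausted bool) bool {
	if !isAccessFollower || !isStaleRead {
		return false
	}
	// Only after a stale read has already fallen back to the leader, and the
	// leader itself is exhausted, does retrying on the followers make sense.
	return lastAttemptWasLeader && leaderExhausted
}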

func (s *replicaSelector) invalidateRegion() {
@@ -1680,6 +1702,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
}

// This peer is removed from the region. Invalidate the region since it's too stale.
// If the region error is from a follower, can we mark the peer unavailable and reload the region asynchronously?
if regionErr.GetRegionNotFound() != nil {
s.regionCache.InvalidateCachedRegion(ctx.Region)
return false, nil
@@ -1706,7 +1729,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
zap.Stringer("ctx", ctx))
if s.replicaSelector.fallback2Follower(ctx) {
if s.replicaSelector.canFallback2Follower() {
// immediately retry on followers.
return true, nil
}
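Taken together, the hunks above change the stale-read retry flow roughly as follows (a summary of the diff; retry-budget and liveness details are simplified):

1. A stale read tries the local peer first (leader or follower, subject to the label constraints).
2. If that attempt fails, accessFollower falls back to the leader and turns the request into a snapshot (leader) read.
3. If the leader read then hits ServerIsBusy, canFallback2Follower returns true (the state is accessFollower, the read is stale, the last attempt was the leader, and the leader is exhausted), so the request is retried immediately without invalidating the region.
4. On that retry, accessFollower.next switches to tryFollower with fallbackFromLeader set, and tryFollower patches the request to replicaRead=true and staleRead=false, so the remaining followers are tried with plain replica read.
5. If the leader's epoch is also stale, the region is scheduled for an asynchronous reload, driven by the now test-configurable reloadRegionInterval ticker.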