Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log: group replica selector logging and split not leader errors #929

Merged
merged 2 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 84 additions & 50 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ type RegionRequestSender struct {
RegionRequestRuntimeStats
}

func (s *RegionRequestSender) String() string {
return fmt.Sprintf("{replicaSelector: %v}", s.replicaSelector.String())
}

// RegionRequestRuntimeStats records the runtime stats of send region requests.
type RegionRequestRuntimeStats struct {
Stats map[tikvrpc.CmdType]*RPCRuntimeStats
Expand Down Expand Up @@ -249,6 +253,10 @@ type replica struct {
deadlineErrUsingConfTimeout bool
}

func (r *replica) getEpoch() uint32 {
return atomic.LoadUint32(&r.epoch)
}

func (r *replica) isEpochStale() bool {
return r.epoch != atomic.LoadUint32(&r.store.epoch)
}
Expand All @@ -273,6 +281,64 @@ type replicaSelector struct {
busyThreshold time.Duration
}

func selectorStateToString(state selectorState) string {
replicaSelectorState := "nil"
if state != nil {
switch state.(type) {
case *accessKnownLeader:
replicaSelectorState = "accessKnownLeader"
case *accessFollower:
replicaSelectorState = "accessFollower"
case *accessByKnownProxy:
replicaSelectorState = "accessByKnownProxy"
case *tryFollower:
replicaSelectorState = "tryFollower"
case *tryNewProxy:
replicaSelectorState = "tryNewProxy"
case *invalidLeader:
replicaSelectorState = "invalidLeader"
case *invalidStore:
replicaSelectorState = "invalidStore"
case *stateBase:
replicaSelectorState = "stateBase"
case nil:
replicaSelectorState = "nil"
}
}
return replicaSelectorState
}

func (s *replicaSelector) String() string {
var replicaStatus []string
cacheRegionIsValid := "unknown"
selectorStateStr := "nil"
if s != nil {
selectorStateStr = selectorStateToString(s.state)
if s.region != nil {
if s.region.isValid() {
cacheRegionIsValid = "true"
} else {
cacheRegionIsValid = "false"
}
}
for _, replica := range s.replicas {
replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, "+
"attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v",
replica.peer.GetId(),
replica.store.storeID,
replica.isEpochStale(),
replica.attempts,
replica.getEpoch(),
atomic.LoadUint32(&replica.store.epoch),
replica.store.getResolveState(),
replica.store.getLivenessState(),
))
}
}

return fmt.Sprintf("replicaSelector{selectorStateStr: %v, cacheRegionIsValid: %v, replicaStatus: %v}", selectorStateStr, cacheRegionIsValid, replicaStatus)
}

// selectorState is the interface of states of the replicaSelector.
// Here is the main state transition diagram:
//
Expand Down Expand Up @@ -1401,8 +1467,8 @@ func (s *RegionRequestSender) SendReqCtx(
return nil, nil, retryTimes, err
}
if regionErr != nil {
regionErrLabel := regionErrorToLabel(regionErr)
totalErrors[regionErrLabel]++
regionErrLogging := regionErrorToLogging(rpcCtx.Peer.GetId(), regionErr)
totalErrors[regionErrLogging]++
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr)
if err != nil {
msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error())
Expand All @@ -1427,50 +1493,6 @@ func (s *RegionRequestSender) SendReqCtx(
}

func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) {
var replicaStatus []string
replicaSelectorState := "nil"
cacheRegionIsValid := "unknown"
if s.replicaSelector != nil {
switch s.replicaSelector.state.(type) {
case *accessKnownLeader:
replicaSelectorState = "accessKnownLeader"
case *accessFollower:
replicaSelectorState = "accessFollower"
case *accessByKnownProxy:
replicaSelectorState = "accessByKnownProxy"
case *tryFollower:
replicaSelectorState = "tryFollower"
case *tryNewProxy:
replicaSelectorState = "tryNewProxy"
case *invalidLeader:
replicaSelectorState = "invalidLeader"
case *invalidStore:
replicaSelectorState = "invalidStore"
case *stateBase:
replicaSelectorState = "stateBase"
case nil:
replicaSelectorState = "nil"
}
if s.replicaSelector.region != nil {
if s.replicaSelector.region.isValid() {
cacheRegionIsValid = "true"
} else {
cacheRegionIsValid = "false"
}
}
for _, replica := range s.replicaSelector.replicas {
replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v",
replica.peer.GetId(),
replica.store.storeID,
replica.isEpochStale(),
replica.attempts,
replica.epoch,
atomic.LoadUint32(&replica.store.epoch),
replica.store.getResolveState(),
replica.store.getLivenessState(),
))
}
}
var totalErrorStr bytes.Buffer
for err, cnt := range totalErrors {
if totalErrorStr.Len() > 0 {
Expand All @@ -1484,12 +1506,10 @@ func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, r
zap.Uint64("req-ts", req.GetStartTS()),
zap.String("req-type", req.Type.String()),
zap.String("region", regionID.String()),
zap.String("region-is-valid", cacheRegionIsValid),
zap.Int("retry-times", retryTimes),
zap.String("replica-read-type", req.ReplicaReadType.String()),
zap.String("replica-selector-state", replicaSelectorState),
zap.Bool("stale-read", req.StaleRead),
zap.String("replica-status", strings.Join(replicaStatus, "; ")),
zap.Stringer("request-sender", s),
zap.Int("retry-times", retryTimes),
zap.Int("total-backoff-ms", bo.GetTotalSleep()),
zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()),
zap.String("total-region-errors", totalErrorStr.String()))
Expand Down Expand Up @@ -1840,6 +1860,20 @@ func (s *RegionRequestSender) NeedReloadRegion(ctx *RPCContext) (need bool) {
return
}

// regionErrorToLogging constructs the logging content with extra information like returned leader peer id.
func regionErrorToLogging(peerID uint64, e *errorpb.Error) string {
str := regionErrorToLabel(e)
if e.GetNotLeader() != nil {
notLeader := e.GetNotLeader()
if notLeader.GetLeader() != nil {
str = fmt.Sprintf("%v-%v", str, notLeader.GetLeader().GetId())
} else {
str = fmt.Sprintf("%v-nil", str)
}
}
return fmt.Sprintf("%v-%v", peerID, str)
}

func regionErrorToLabel(e *errorpb.Error) string {
if e.GetNotLeader() != nil {
return "not_leader"
Expand Down
28 changes: 28 additions & 0 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,3 +1425,31 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg
// after region error returned, the region should be invalidated.
s.False(region.isValid())
}

func (s *testRegionRequestToThreeStoresSuite) TestLogging() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a manual test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Key: []byte("key"),
})
region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(region)

oc := s.regionRequestSender.client
defer func() {
s.regionRequestSender.client = oc
}()

s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}},
}}
return response, nil
}}

bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
regionErr, _ := resp.GetRegionError()
s.NotNil(regionErr)
}