From 25d08d33608b9cd16a2d67b0cb24f29495ca885e Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 7 Aug 2023 17:03:15 +0800 Subject: [PATCH 1/2] group replica selector logging and split not leader errors Signed-off-by: cfzjywxk --- internal/locate/region_request.go | 130 +++++++++++++++--------- internal/locate/region_request3_test.go | 28 +++++ 2 files changed, 108 insertions(+), 50 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 3bf19cb98..9f7a8b0ff 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -112,6 +112,10 @@ type RegionRequestSender struct { RegionRequestRuntimeStats } +func (s *RegionRequestSender) String() string { + return fmt.Sprintf("{replicaSelector: %v}", s.replicaSelector.String()) +} + // RegionRequestRuntimeStats records the runtime stats of send region requests. type RegionRequestRuntimeStats struct { Stats map[tikvrpc.CmdType]*RPCRuntimeStats @@ -273,6 +277,64 @@ type replicaSelector struct { busyThreshold time.Duration } +func selectorStateToString(state selectorState) string { + replicaSelectorState := "nil" + if state != nil { + switch state.(type) { + case *accessKnownLeader: + replicaSelectorState = "accessKnownLeader" + case *accessFollower: + replicaSelectorState = "accessFollower" + case *accessByKnownProxy: + replicaSelectorState = "accessByKnownProxy" + case *tryFollower: + replicaSelectorState = "tryFollower" + case *tryNewProxy: + replicaSelectorState = "tryNewProxy" + case *invalidLeader: + replicaSelectorState = "invalidLeader" + case *invalidStore: + replicaSelectorState = "invalidStore" + case *stateBase: + replicaSelectorState = "stateBase" + case nil: + replicaSelectorState = "nil" + } + } + return replicaSelectorState +} + +func (s *replicaSelector) String() string { + var replicaStatus []string + cacheRegionIsValid := "unknown" + selectorStateStr := "nil" + if s != nil { + selectorStateStr = selectorStateToString(s.state) + if s.region != nil { + if s.region.isValid() { + cacheRegionIsValid = "true" + } else { + cacheRegionIsValid = "false" + } + } + for _, replica := range s.replicas { + replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, "+ + "attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v", + replica.peer.GetId(), + replica.store.storeID, + replica.isEpochStale(), + replica.attempts, + replica.epoch, + atomic.LoadUint32(&replica.store.epoch), + replica.store.getResolveState(), + replica.store.getLivenessState(), + )) + } + } + + return fmt.Sprintf("replicaSelector{selectorStateStr: %v, cacheRegionIsValid: %v, replicaStatus: %v}", selectorStateStr, cacheRegionIsValid, replicaStatus) +} + // selectorState is the interface of states of the replicaSelector. // Here is the main state transition diagram: // @@ -1401,8 +1463,8 @@ func (s *RegionRequestSender) SendReqCtx( return nil, nil, retryTimes, err } if regionErr != nil { - regionErrLabel := regionErrorToLabel(regionErr) - totalErrors[regionErrLabel]++ + regionErrLogging := regionErrorToLogging(rpcCtx.Peer.GetId(), regionErr) + totalErrors[regionErrLogging]++ retry, err = s.onRegionError(bo, rpcCtx, req, regionErr) if err != nil { msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error()) @@ -1427,50 +1489,6 @@ func (s *RegionRequestSender) SendReqCtx( } func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) { - var replicaStatus []string - replicaSelectorState := "nil" - cacheRegionIsValid := "unknown" - if s.replicaSelector != nil { - switch s.replicaSelector.state.(type) { - case *accessKnownLeader: - replicaSelectorState = "accessKnownLeader" - case *accessFollower: - replicaSelectorState = "accessFollower" - case *accessByKnownProxy: - replicaSelectorState = "accessByKnownProxy" - case *tryFollower: - replicaSelectorState = "tryFollower" - case *tryNewProxy: - replicaSelectorState = "tryNewProxy" - case *invalidLeader: - replicaSelectorState = "invalidLeader" - case *invalidStore: - replicaSelectorState = "invalidStore" - case *stateBase: - replicaSelectorState = "stateBase" - case nil: - replicaSelectorState = "nil" - } - if s.replicaSelector.region != nil { - if s.replicaSelector.region.isValid() { - cacheRegionIsValid = "true" - } else { - cacheRegionIsValid = "false" - } - } - for _, replica := range s.replicaSelector.replicas { - replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v", - replica.peer.GetId(), - replica.store.storeID, - replica.isEpochStale(), - replica.attempts, - replica.epoch, - atomic.LoadUint32(&replica.store.epoch), - replica.store.getResolveState(), - replica.store.getLivenessState(), - )) - } - } var totalErrorStr bytes.Buffer for err, cnt := range totalErrors { if totalErrorStr.Len() > 0 { @@ -1484,12 +1502,10 @@ func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, r zap.Uint64("req-ts", req.GetStartTS()), zap.String("req-type", req.Type.String()), zap.String("region", regionID.String()), - zap.String("region-is-valid", cacheRegionIsValid), - zap.Int("retry-times", retryTimes), zap.String("replica-read-type", req.ReplicaReadType.String()), - zap.String("replica-selector-state", replicaSelectorState), zap.Bool("stale-read", req.StaleRead), - zap.String("replica-status", strings.Join(replicaStatus, "; ")), + zap.Stringer("request-sender", s), + zap.Int("retry-times", retryTimes), zap.Int("total-backoff-ms", bo.GetTotalSleep()), zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()), zap.String("total-region-errors", totalErrorStr.String())) @@ -1840,6 +1856,20 @@ func (s *RegionRequestSender) NeedReloadRegion(ctx *RPCContext) (need bool) { return } +// regionErrorToLogging constructs the logging content with extra information like returned leader peer id. +func regionErrorToLogging(peerID uint64, e *errorpb.Error) string { + str := regionErrorToLabel(e) + if e.GetNotLeader() != nil { + notLeader := e.GetNotLeader() + if notLeader.GetLeader() != nil { + str = fmt.Sprintf("%v-%v", str, notLeader.GetLeader().GetId()) + } else { + str = fmt.Sprintf("%v-nil", str) + } + } + return fmt.Sprintf("%v-%v", peerID, str) +} + func regionErrorToLabel(e *errorpb.Error) string { if e.GetNotLeader() != nil { return "not_leader" diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 28bc43e87..d1549b946 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1425,3 +1425,31 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg // after region error returned, the region should be invalidated. s.False(region.isValid()) } + +func (s *testRegionRequestToThreeStoresSuite) TestLogging() { + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ + Key: []byte("key"), + }) + region, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + s.NotNil(region) + + oc := s.regionRequestSender.client + defer func() { + s.regionRequestSender.client = oc + }() + + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ + RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}, + }} + return response, nil + }} + + bo := retry.NewBackofferWithVars(context.Background(), 5, nil) + resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second) + s.Nil(err) + s.NotNil(resp) + regionErr, _ := resp.GetRegionError() + s.NotNil(regionErr) +} From 9f88a2c6ad35910d7a23794d0acb4cc7d2226a6f Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Fri, 11 Aug 2023 16:59:56 +0800 Subject: [PATCH 2/2] use atomic to read epoch Signed-off-by: cfzjywxk --- internal/locate/region_request.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 9f7a8b0ff..c22f782e1 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -253,6 +253,10 @@ type replica struct { deadlineErrUsingConfTimeout bool } +func (r *replica) getEpoch() uint32 { + return atomic.LoadUint32(&r.epoch) +} + func (r *replica) isEpochStale() bool { return r.epoch != atomic.LoadUint32(&r.store.epoch) } @@ -324,7 +328,7 @@ func (s *replicaSelector) String() string { replica.store.storeID, replica.isEpochStale(), replica.attempts, - replica.epoch, + replica.getEpoch(), atomic.LoadUint32(&replica.store.epoch), replica.store.getResolveState(), replica.store.getLivenessState(),