From 74012c086324e4ec13f3c629d0b83376c7195859 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 26 Jul 2023 16:02:20 +0800 Subject: [PATCH 1/9] add more log for diagnose Signed-off-by: crazycs520 --- internal/locate/region_cache.go | 36 +++++++++++++++++++++++++++++++ internal/locate/region_request.go | 19 ++++++++++++---- internal/retry/backoff.go | 14 ++++++++++++ kv/store_vars.go | 13 +++++++++++ 4 files changed, 78 insertions(+), 4 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 8482c81d0..4d36c364a 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2322,6 +2322,23 @@ const ( tombstone ) +func (s resolveState) String() string { + switch s { + case unresolved: + return "unresolved" + case resolved: + return "resolved" + case needCheck: + return "needCheck" + case deleted: + return "deleted" + case tombstone: + return "tombstone" + default: + return fmt.Sprintf("unknown-%v", s) + } +} + // IsTiFlash returns true if the storeType is TiFlash func (s *Store) IsTiFlash() bool { return s.storeType == tikvrpc.TiFlash @@ -2456,6 +2473,12 @@ func (s *Store) changeResolveStateTo(from, to resolveState) bool { return false } if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) { + logutil.BgLogger().Info("change store resolve state", + zap.Uint64("store", s.storeID), + zap.String("addr", s.addr), + zap.String("from", from.String()), + zap.String("to", to.String()), + zap.String("liveness-state", s.getLivenessState().String())) return true } } @@ -2543,6 +2566,19 @@ const ( unknown ) +func (s livenessState) String() string { + switch s { + case unreachable: + return "unreachable" + case reachable: + return "reachable" + case unknown: + return "unknown" + default: + return fmt.Sprintf("unknown-%v", s) + } +} + func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) { // This mechanism doesn't support non-TiKV stores currently. if s.storeType != tikvrpc.TiKV { diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 1edad08e8..75d0bbd13 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -570,11 +570,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector } // If there is no candidate, fallback to the leader. if selector.targetIdx < 0 { + leader := selector.replicas[state.leaderIdx] + leaderInvalid := leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) if len(state.option.labels) > 0 { - logutil.BgLogger().Warn("unable to find stores with given labels") + logutil.BgLogger().Warn("unable to find stores with given labels", + zap.Uint64("region", selector.region.GetID()), + zap.Bool("leader-invalid", leaderInvalid)) + } - leader := selector.replicas[state.leaderIdx] - if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) { + if leaderInvalid { metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() selector.invalidateRegion() return nil, nil @@ -1041,7 +1045,14 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. 
-			logutil.Logger(bo.GetCtx()).Debug("throwing pseudo region error due to region not found in cache", zap.Stringer("region", &regionID))
+			logutil.Logger(bo.GetCtx()).Info("throwing pseudo region error due to no replica available",
+				zap.String("region", regionID.String()),
+				zap.Int("try-times", tryTimes),
+				zap.String("replica-read-type", req.ReplicaReadType.String()),
+				zap.Bool("stale-read", req.StaleRead),
+				zap.Int("total-backoff-ms", bo.GetTotalSleep()),
+				zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()),
+			)
 			resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
 			return resp, nil, err
 		}
diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go
index 6a27d0593..bdefc7993 100644
--- a/internal/retry/backoff.go
+++ b/internal/retry/backoff.go
@@ -35,9 +35,11 @@ package retry

 import (
+	"bytes"
 	"context"
 	"fmt"
 	"math"
+	"strconv"
 	"strings"
 	"sync/atomic"
 	"time"

@@ -150,6 +152,18 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
 			errMsg += "\n" + err.Error()
 		}
 	}
+	var backoffDetail bytes.Buffer
+	totalTimes := 0
+	for name, times := range b.backoffTimes {
+		totalTimes += times
+		if backoffDetail.Len() > 0 {
+			backoffDetail.WriteString(", ")
+		}
+		backoffDetail.WriteString(name)
+		backoffDetail.WriteString(":")
+		backoffDetail.WriteString(strconv.Itoa(times))
+	}
+	errMsg += fmt.Sprintf("\ntotal-backoff-times: %v, backoff-detail: %v", totalTimes, backoffDetail.String())
 	returnedErr := err
 	if longestSleepCfg != nil {
 		errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime)
diff --git a/kv/store_vars.go b/kv/store_vars.go
index d0abff21d..7f7ba2156 100644
--- a/kv/store_vars.go
+++ b/kv/store_vars.go
@@ -68,3 +68,16 @@ const (
 func (r ReplicaReadType) IsFollowerRead() bool {
 	return r != ReplicaReadLeader
 }
+
+func (r ReplicaReadType) String() string {
+	switch r {
+	case ReplicaReadLeader:
+		return "leader"
+	case ReplicaReadFollower:
+		return "follower"
+	case ReplicaReadMixed:
+		return "mixed"
+	default:
+		return "unknown"
+	}
+}

From eaf2176e35e76edb8bc3ef2912a29c6f1295b390 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Wed, 26 Jul 2023 16:15:33 +0800
Subject: [PATCH 2/9] add comment

Signed-off-by: crazycs520
---
 internal/locate/region_cache.go | 2 ++
 kv/store_vars.go                | 1 +
 2 files changed, 3 insertions(+)

diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 4d36c364a..5c3b49cad 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -2322,6 +2322,7 @@ const (
 	tombstone
 )

+// String implements fmt.Stringer interface.
 func (s resolveState) String() string {
 	switch s {
 	case unresolved:
@@ -2566,6 +2567,7 @@ const (
 	unknown
 )

+// String implements fmt.Stringer interface.
 func (s livenessState) String() string {
 	switch s {
 	case unreachable:
diff --git a/kv/store_vars.go b/kv/store_vars.go
index 7f7ba2156..db5c661d0 100644
--- a/kv/store_vars.go
+++ b/kv/store_vars.go
@@ -69,6 +69,7 @@ func (r ReplicaReadType) IsFollowerRead() bool {
 	return r != ReplicaReadLeader
 }

+// String implements fmt.Stringer interface.
func (r ReplicaReadType) String() string { switch r { case ReplicaReadLeader: From dd2655f10b266248a90c5c62ab2f20160bdc1a16 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 26 Jul 2023 16:28:32 +0800 Subject: [PATCH 3/9] refine Signed-off-by: crazycs520 --- kv/store_vars.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kv/store_vars.go b/kv/store_vars.go index db5c661d0..2c8befd30 100644 --- a/kv/store_vars.go +++ b/kv/store_vars.go @@ -35,6 +35,8 @@ package kv import ( + "fmt" + "go.uber.org/atomic" ) @@ -79,6 +81,6 @@ func (r ReplicaReadType) String() string { case ReplicaReadMixed: return "mixed" default: - return "unknown" + return fmt.Sprintf("unknown-%v", r) } } From 7aab48410c9ecc20b1933b6970e26f63911ccf8e Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 26 Jul 2023 16:53:40 +0800 Subject: [PATCH 4/9] add log Signed-off-by: crazycs520 --- internal/locate/region_request.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 75d0bbd13..e568c6bae 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1045,8 +1045,17 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. + cacheRegionIsValid := "unknown" + if s.replicaSelector.region != nil { + if s.replicaSelector.region.isValid() { + cacheRegionIsValid = "valid" + } else { + cacheRegionIsValid = "invalid" + } + } logutil.Logger(bo.GetCtx()).Info("throwing pseudo region error due to no replica available", zap.String("region", regionID.String()), + zap.String("region-valid", cacheRegionIsValid), zap.Int("try-times", tryTimes), zap.String("replica-read-type", req.ReplicaReadType.String()), zap.Bool("stale-read", req.StaleRead), From 8344ba8afa5ba59edece6be87189e82b543725cd Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 26 Jul 2023 16:57:44 +0800 Subject: [PATCH 5/9] update Signed-off-by: crazycs520 --- internal/locate/region_request.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index e568c6bae..c4325aae7 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1048,14 +1048,14 @@ func (s *RegionRequestSender) SendReqCtx( cacheRegionIsValid := "unknown" if s.replicaSelector.region != nil { if s.replicaSelector.region.isValid() { - cacheRegionIsValid = "valid" + cacheRegionIsValid = "true" } else { - cacheRegionIsValid = "invalid" + cacheRegionIsValid = "false" } } logutil.Logger(bo.GetCtx()).Info("throwing pseudo region error due to no replica available", zap.String("region", regionID.String()), - zap.String("region-valid", cacheRegionIsValid), + zap.String("region-is-valid", cacheRegionIsValid), zap.Int("try-times", tryTimes), zap.String("replica-read-type", req.ReplicaReadType.String()), zap.Bool("stale-read", req.StaleRead), From 4b2a4996fecaacaf4b93aa71b988eac73a306b62 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 26 Jul 2023 17:04:54 +0800 Subject: [PATCH 6/9] fix Signed-off-by: crazycs520 --- internal/locate/region_cache.go | 4 ++-- kv/store_vars.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 5c3b49cad..2d0d76e84 100644 --- a/internal/locate/region_cache.go +++ 
b/internal/locate/region_cache.go @@ -2336,7 +2336,7 @@ func (s resolveState) String() string { case tombstone: return "tombstone" default: - return fmt.Sprintf("unknown-%v", s) + return fmt.Sprintf("unknown-%v", uint64(s)) } } @@ -2577,7 +2577,7 @@ func (s livenessState) String() string { case unknown: return "unknown" default: - return fmt.Sprintf("unknown-%v", s) + return fmt.Sprintf("unknown-%v", uint32(s)) } } diff --git a/kv/store_vars.go b/kv/store_vars.go index 2c8befd30..f66cbaff7 100644 --- a/kv/store_vars.go +++ b/kv/store_vars.go @@ -81,6 +81,6 @@ func (r ReplicaReadType) String() string { case ReplicaReadMixed: return "mixed" default: - return fmt.Sprintf("unknown-%v", r) + return fmt.Sprintf("unknown-%v", byte(r)) } } From fea5760317c33251f5980a3be6d5ba2497367128 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 26 Jul 2023 17:35:10 +0800 Subject: [PATCH 7/9] refine Signed-off-by: crazycs520 --- internal/locate/region_request.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index c4325aae7..4f7ab51e0 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -575,8 +575,8 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if len(state.option.labels) > 0 { logutil.BgLogger().Warn("unable to find stores with given labels", zap.Uint64("region", selector.region.GetID()), - zap.Bool("leader-invalid", leaderInvalid)) - + zap.Bool("leader-invalid", leaderInvalid), + zap.Any("labels", state.option.labels)) } if leaderInvalid { metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() From 04da8398f4009f20cfdfcb0507998c700a9eaa7e Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 26 Jul 2023 18:01:04 +0800 Subject: [PATCH 8/9] fix Signed-off-by: crazycs520 --- internal/locate/region_request.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index d02ed94c0..8cb586e94 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1059,7 +1059,7 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. 
cacheRegionIsValid := "unknown" - if s.replicaSelector.region != nil { + if s.replicaSelector != nil && s.replicaSelector.region != nil { if s.replicaSelector.region.isValid() { cacheRegionIsValid = "true" } else { From a030a54001bc5f41308d5256eae1afaf70d27971 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 31 Jul 2023 12:03:08 +0800 Subject: [PATCH 9/9] update Signed-off-by: crazycs520 --- internal/locate/region_request.go | 96 +++++++++++++++++++++++++------ tikvrpc/tikvrpc.go | 48 ++++++++++++++++ 2 files changed, 127 insertions(+), 17 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index e2a839963..6feb7c884 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -35,6 +35,7 @@ package locate import ( + "bytes" "context" "fmt" "math/rand" @@ -1088,6 +1089,7 @@ func (s *RegionRequestSender) SendReqCtx( }() } + totalErrors := make(map[string]int) for { if tryTimes > 0 { req.IsRetryRequest = true @@ -1116,23 +1118,7 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. - cacheRegionIsValid := "unknown" - if s.replicaSelector != nil && s.replicaSelector.region != nil { - if s.replicaSelector.region.isValid() { - cacheRegionIsValid = "true" - } else { - cacheRegionIsValid = "false" - } - } - logutil.Logger(bo.GetCtx()).Info("throwing pseudo region error due to no replica available", - zap.String("region", regionID.String()), - zap.String("region-is-valid", cacheRegionIsValid), - zap.Int("try-times", tryTimes), - zap.String("replica-read-type", req.ReplicaReadType.String()), - zap.Bool("stale-read", req.StaleRead), - zap.Int("total-backoff-ms", bo.GetTotalSleep()), - zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()), - ) + s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, tryTimes, req, totalErrors) resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) return resp, nil, err } @@ -1158,6 +1144,8 @@ func (s *RegionRequestSender) SendReqCtx( var retry bool resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) if err != nil { + msg := fmt.Sprintf("send request failed, err: %v", err.Error()) + s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors) return nil, nil, err } @@ -1182,14 +1170,19 @@ func (s *RegionRequestSender) SendReqCtx( return nil, nil, err } if regionErr != nil { + regionErrLabel := regionErrorToLabel(regionErr) + totalErrors[regionErrLabel]++ retry, err = s.onRegionError(bo, rpcCtx, req, regionErr) if err != nil { + msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error()) + s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors) return nil, nil, err } if retry { tryTimes++ continue } + s.logSendReqError(bo, "send request meet region error without retry", regionID, tryTimes, req, totalErrors) } else { if s.replicaSelector != nil { s.replicaSelector.onSendSuccess() @@ -1202,6 +1195,75 @@ func (s *RegionRequestSender) SendReqCtx( } } +func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) { + var replicaStatus []string + replicaSelectorState := "nil" + cacheRegionIsValid := "unknown" + if s.replicaSelector != nil { + switch s.replicaSelector.state.(type) { + case *accessKnownLeader: 
+ replicaSelectorState = "accessKnownLeader" + case *accessFollower: + replicaSelectorState = "accessFollower" + case *accessByKnownProxy: + replicaSelectorState = "accessByKnownProxy" + case *tryFollower: + replicaSelectorState = "tryFollower" + case *tryNewProxy: + replicaSelectorState = "tryNewProxy" + case *invalidLeader: + replicaSelectorState = "invalidLeader" + case *invalidStore: + replicaSelectorState = "invalidStore" + case *stateBase: + replicaSelectorState = "stateBase" + case nil: + replicaSelectorState = "nil" + } + if s.replicaSelector.region != nil { + if s.replicaSelector.region.isValid() { + cacheRegionIsValid = "true" + } else { + cacheRegionIsValid = "false" + } + } + for _, replica := range s.replicaSelector.replicas { + replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v", + replica.peer.GetId(), + replica.store.storeID, + replica.isEpochStale(), + replica.attempts, + replica.epoch, + atomic.LoadUint32(&replica.store.epoch), + replica.store.getResolveState(), + replica.store.getLivenessState(), + )) + } + } + var totalErrorStr bytes.Buffer + for err, cnt := range totalErrors { + if totalErrorStr.Len() > 0 { + totalErrorStr.WriteString(", ") + } + totalErrorStr.WriteString(err) + totalErrorStr.WriteString(":") + totalErrorStr.WriteString(strconv.Itoa(cnt)) + } + logutil.Logger(bo.GetCtx()).Info(msg, + zap.Uint64("req-ts", req.GetStartTS()), + zap.String("req-type", req.Type.String()), + zap.String("region", regionID.String()), + zap.String("region-is-valid", cacheRegionIsValid), + zap.Int("retry-times", retryTimes), + zap.String("replica-read-type", req.ReplicaReadType.String()), + zap.String("replica-selector-state", replicaSelectorState), + zap.Bool("stale-read", req.StaleRead), + zap.String("replica-status", strings.Join(replicaStatus, "; ")), + zap.Int("total-backoff-ms", bo.GetTotalSleep()), + zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()), + zap.String("total-region-errors", totalErrorStr.String())) +} + // RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx. type RPCCancellerCtxKey struct{} diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index bc78d8b65..33a4e05dd 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -1271,3 +1271,51 @@ func (req *Request) IsTxnWriteRequest() bool { // ResourceGroupTagger is used to fill the ResourceGroupTag in the kvrpcpb.Context. type ResourceGroupTagger func(req *Request) + +// GetStartTS returns the `start_ts` of the request. 
+func (req *Request) GetStartTS() uint64 {
+	switch req.Type {
+	case CmdGet:
+		return req.Get().GetVersion()
+	case CmdScan:
+		return req.Scan().GetVersion()
+	case CmdPrewrite:
+		return req.Prewrite().GetStartVersion()
+	case CmdCommit:
+		return req.Commit().GetStartVersion()
+	case CmdCleanup:
+		return req.Cleanup().GetStartVersion()
+	case CmdBatchGet:
+		return req.BatchGet().GetVersion()
+	case CmdBatchRollback:
+		return req.BatchRollback().GetStartVersion()
+	case CmdScanLock:
+		return req.ScanLock().GetMaxVersion()
+	case CmdResolveLock:
+		return req.ResolveLock().GetStartVersion()
+	case CmdPessimisticLock:
+		return req.PessimisticLock().GetStartVersion()
+	case CmdPessimisticRollback:
+		return req.PessimisticRollback().GetStartVersion()
+	case CmdTxnHeartBeat:
+		return req.TxnHeartBeat().GetStartVersion()
+	case CmdCheckTxnStatus:
+		return req.CheckTxnStatus().GetLockTs()
+	case CmdCheckSecondaryLocks:
+		return req.CheckSecondaryLocks().GetStartVersion()
+	case CmdFlashbackToVersion:
+		return req.FlashbackToVersion().GetStartTs()
+	case CmdPrepareFlashbackToVersion:
+		return req.PrepareFlashbackToVersion().GetStartTs()
+	case CmdCop:
+		return req.Cop().GetStartTs()
+	case CmdCopStream:
+		return req.Cop().GetStartTs()
+	case CmdBatchCop:
+		return req.BatchCop().GetStartTs()
+	case CmdMvccGetByStartTs:
+		return req.MvccGetByStartTs().GetStartTs()
+	default:
+	}
+	return 0
+}
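
Reviewer note (not part of the patch series): a minimal sketch of how the new Request.GetStartTS accessor could be exercised in a unit test, assuming the existing tikvrpc.NewRequest constructor; the test name and the timestamp values below are hypothetical.

package tikvrpc

import (
	"testing"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// Hypothetical sketch: GetStartTS should return the start timestamp carried
// by the concrete request, and 0 for request types it does not recognize
// (e.g. raw KV requests, which carry no start_ts).
func TestRequestGetStartTS(t *testing.T) {
	get := NewRequest(CmdGet, &kvrpcpb.GetRequest{Version: 42})
	if ts := get.GetStartTS(); ts != 42 {
		t.Fatalf("CmdGet: expected start_ts 42, got %d", ts)
	}
	prewrite := NewRequest(CmdPrewrite, &kvrpcpb.PrewriteRequest{StartVersion: 7})
	if ts := prewrite.GetStartTS(); ts != 7 {
		t.Fatalf("CmdPrewrite: expected start_ts 7, got %d", ts)
	}
	raw := NewRequest(CmdRawGet, &kvrpcpb.RawGetRequest{})
	if ts := raw.GetStartTS(); ts != 0 {
		t.Fatalf("CmdRawGet: expected start_ts 0, got %d", ts)
	}
}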