Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more log for diagnose (#915) #931

Merged
merged 1 commit into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2392,6 +2392,24 @@ const (
tombstone
)

// String implements fmt.Stringer interface.
func (s resolveState) String() string {
switch s {
case unresolved:
return "unresolved"
case resolved:
return "resolved"
case needCheck:
return "needCheck"
case deleted:
return "deleted"
case tombstone:
return "tombstone"
default:
return fmt.Sprintf("unknown-%v", uint64(s))
}
}

// IsTiFlash returns true if the storeType is TiFlash
func (s *Store) IsTiFlash() bool {
return s.storeType == tikvrpc.TiFlash
Expand Down Expand Up @@ -2531,6 +2549,12 @@ func (s *Store) changeResolveStateTo(from, to resolveState) bool {
return false
}
if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
logutil.BgLogger().Info("change store resolve state",
zap.Uint64("store", s.storeID),
zap.String("addr", s.addr),
zap.String("from", from.String()),
zap.String("to", to.String()),
zap.String("liveness-state", s.getLivenessState().String()))
return true
}
}
Expand Down Expand Up @@ -2631,6 +2655,20 @@ const (
unknown
)

// String implements fmt.Stringer interface.
func (s livenessState) String() string {
switch s {
case unreachable:
return "unreachable"
case reachable:
return "reachable"
case unknown:
return "unknown"
default:
return fmt.Sprintf("unknown-%v", uint32(s))
}
}

func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
// This mechanism doesn't support non-TiKV stores currently.
if s.storeType != tikvrpc.TiKV {
Expand Down
96 changes: 86 additions & 10 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
package locate

import (
"bytes"
"context"
"fmt"
"math"
Expand Down Expand Up @@ -587,14 +588,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
leader := selector.replicas[state.leaderIdx]
leaderInvalid := leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1))
if len(state.option.labels) > 0 {
logutil.BgLogger().Warn(
"unable to find stores with given labels",
zap.Any("labels", state.option.labels),
)
logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels",
zap.Uint64("region", selector.region.GetID()),
zap.Bool("leader-invalid", leaderInvalid),
zap.Any("labels", state.option.labels))
}
leader := selector.replicas[state.leaderIdx]
if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) {
if leaderInvalid {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
Expand Down Expand Up @@ -1168,6 +1170,7 @@ func (s *RegionRequestSender) SendReqCtx(
}()
}

totalErrors := make(map[string]int)
for {
if retryTimes > 0 {
req.IsRetryRequest = true
Expand Down Expand Up @@ -1200,10 +1203,7 @@ func (s *RegionRequestSender) SendReqCtx(

// TODO: Change the returned error to something like "region missing in cache",
// and handle this error like EpochNotMatch, which means to re-split the request and retry.
logutil.Logger(bo.GetCtx()).Debug(
"throwing pseudo region error due to region not found in cache",
zap.Stringer("region", &regionID),
)
s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, totalErrors)
resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
return resp, nil, retryTimes, err
}
Expand All @@ -1229,6 +1229,8 @@ func (s *RegionRequestSender) SendReqCtx(
var retry bool
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
if err != nil {
msg := fmt.Sprintf("send request failed, err: %v", err.Error())
s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors)
return nil, nil, retryTimes, err
}

Expand Down Expand Up @@ -1260,14 +1262,19 @@ func (s *RegionRequestSender) SendReqCtx(
return nil, nil, retryTimes, err
}
if regionErr != nil {
regionErrLabel := regionErrorToLabel(regionErr)
totalErrors[regionErrLabel]++
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr)
if err != nil {
msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error())
s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors)
return nil, nil, retryTimes, err
}
if retry {
retryTimes++
continue
}
s.logSendReqError(bo, "send request meet region error without retry", regionID, retryTimes, req, totalErrors)
} else {
if s.replicaSelector != nil {
s.replicaSelector.onSendSuccess()
Expand All @@ -1280,6 +1287,75 @@ func (s *RegionRequestSender) SendReqCtx(
}
}

func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) {
var replicaStatus []string
replicaSelectorState := "nil"
cacheRegionIsValid := "unknown"
if s.replicaSelector != nil {
switch s.replicaSelector.state.(type) {
case *accessKnownLeader:
replicaSelectorState = "accessKnownLeader"
case *accessFollower:
replicaSelectorState = "accessFollower"
case *accessByKnownProxy:
replicaSelectorState = "accessByKnownProxy"
case *tryFollower:
replicaSelectorState = "tryFollower"
case *tryNewProxy:
replicaSelectorState = "tryNewProxy"
case *invalidLeader:
replicaSelectorState = "invalidLeader"
case *invalidStore:
replicaSelectorState = "invalidStore"
case *stateBase:
replicaSelectorState = "stateBase"
case nil:
replicaSelectorState = "nil"
}
if s.replicaSelector.region != nil {
if s.replicaSelector.region.isValid() {
cacheRegionIsValid = "true"
} else {
cacheRegionIsValid = "false"
}
}
for _, replica := range s.replicaSelector.replicas {
replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v",
replica.peer.GetId(),
replica.store.storeID,
replica.isEpochStale(),
replica.attempts,
replica.epoch,
atomic.LoadUint32(&replica.store.epoch),
replica.store.getResolveState(),
replica.store.getLivenessState(),
))
}
}
var totalErrorStr bytes.Buffer
for err, cnt := range totalErrors {
if totalErrorStr.Len() > 0 {
totalErrorStr.WriteString(", ")
}
totalErrorStr.WriteString(err)
totalErrorStr.WriteString(":")
totalErrorStr.WriteString(strconv.Itoa(cnt))
}
logutil.Logger(bo.GetCtx()).Info(msg,
zap.Uint64("req-ts", req.GetStartTS()),
zap.String("req-type", req.Type.String()),
zap.String("region", regionID.String()),
zap.String("region-is-valid", cacheRegionIsValid),
zap.Int("retry-times", retryTimes),
zap.String("replica-read-type", req.ReplicaReadType.String()),
zap.String("replica-selector-state", replicaSelectorState),
zap.Bool("stale-read", req.StaleRead),
zap.String("replica-status", strings.Join(replicaStatus, "; ")),
zap.Int("total-backoff-ms", bo.GetTotalSleep()),
zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()),
zap.String("total-region-errors", totalErrorStr.String()))
}

// RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx.
type RPCCancellerCtxKey struct{}

Expand Down
14 changes: 14 additions & 0 deletions internal/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
package retry

import (
"bytes"
"context"
"fmt"
"math"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -150,6 +152,18 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
errMsg += "\n" + err.Error()
}
}
var backoffDetail bytes.Buffer
totalTimes := 0
for name, times := range b.backoffTimes {
totalTimes += times
if backoffDetail.Len() > 0 {
backoffDetail.WriteString(", ")
}
backoffDetail.WriteString(name)
backoffDetail.WriteString(":")
backoffDetail.WriteString(strconv.Itoa(times))
}
errMsg += fmt.Sprintf("\ntotal-backoff-times: %v, backoff-detail: %v", totalTimes, backoffDetail.String())
returnedErr := err
if longestSleepCfg != nil {
errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime)
Expand Down
20 changes: 20 additions & 0 deletions kv/store_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
package kv

import (
"fmt"

"go.uber.org/atomic"
)

Expand Down Expand Up @@ -72,3 +74,21 @@ const (
func (r ReplicaReadType) IsFollowerRead() bool {
return r != ReplicaReadLeader
}

// String implements fmt.Stringer interface.
func (r ReplicaReadType) String() string {
switch r {
case ReplicaReadLeader:
return "leader"
case ReplicaReadFollower:
return "follower"
case ReplicaReadMixed:
return "mixed"
case ReplicaReadLearner:
return "learner"
case ReplicaReadPreferLeader:
return "prefer-leader"
default:
return fmt.Sprintf("unknown-%v", byte(r))
}
}