Skip to content

Commit

Permalink
kv: log slow requests on replica level in addition to range level
Browse files Browse the repository at this point in the history
Previously, slow requests were only logged at the range level, but
the logs did not indicate which replica is slow. Moreover, the
SlowRPC metric attempted to represent the number of requests
currently being retried, however it was done on the range level
and therefore missed a second level of replica-level retries being
done underneath.

This PR adds logging on the replica level, removes a confusing log
line, and changes the metric to count the number of slow requests
in a simpler manner.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-33510
Fixes: #114431
  • Loading branch information
shralex committed Dec 27, 2023
1 parent 53e4efd commit ecc1a99
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 22 deletions.
55 changes: 35 additions & 20 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ var (
}
metaDistSenderSlowRPCs = metric.Metadata{
Name: "requests.slow.distsender",
Help: `Number of replica-bound RPCs currently stuck or retrying for a long time.
Help: `Number of slow replica-bound RPCs.
Note that this is not a good signal for KV health. The remote side of the
RPCs tracked here may experience contention, so an end user can easily
Expand Down Expand Up @@ -309,7 +309,7 @@ type DistSenderMetrics struct {
NotLeaseHolderErrCount *metric.Counter
InLeaseTransferBackoffs *metric.Counter
RangeLookups *metric.Counter
SlowRPCs *metric.Gauge
SlowRPCs *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
DistSenderRangeFeedMetrics
Expand Down Expand Up @@ -341,7 +341,7 @@ func makeDistSenderMetrics() DistSenderMetrics {
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
RangeLookups: metric.NewCounter(metaDistSenderRangeLookups),
SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs),
SlowRPCs: metric.NewCounter(metaDistSenderSlowRPCs),
DistSenderRangeFeedMetrics: makeDistSenderRangeFeedMetrics(),
}
for i := range m.MethodCounts {
Expand Down Expand Up @@ -1810,8 +1810,20 @@ func slowRangeRPCWarningStr(
dur.Seconds(), attempts, ba, desc, resp)
}

func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, attempts int64) {
s.Printf("slow RPC finished after %.2fs (%d attempts)", dur.Seconds(), attempts)
func slowReplicaRPCWarningStr(
s *redact.StringBuilder,
ba *kvpb.BatchRequest,
dur time.Duration,
attempts int64,
err error,
br *kvpb.BatchResponse,
) {
resp := interface{}(err)
if resp == nil {
resp = br
}
s.Printf("have been waiting %.2fs (%d attempts) for RPC %s to replica %s; resp: %s",
dur.Seconds(), attempts, ba, ba.Replica, resp)
}

// sendPartialBatch sends the supplied batch to the range specified by the
Expand Down Expand Up @@ -1906,25 +1918,12 @@ func (ds *DistSender) sendPartialBatch(

prevTok = routingTok
reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit)

const slowDistSenderThreshold = time.Minute
if dur := timeutil.Since(tBegin); dur > slowDistSenderThreshold && !tBegin.IsZero() {
if dur := timeutil.Since(tBegin); dur > slowDistSenderRangeThreshold && !tBegin.IsZero() {
{
var s redact.StringBuilder
slowRangeRPCWarningStr(&s, ba, dur, attempts, routingTok.Desc(), err, reply)
log.Warningf(ctx, "slow range RPC: %v", &s)
}
// If the RPC wasn't successful, defer the logging of a message once the
// RPC is not retried any more.
if err != nil || reply.Error != nil {
ds.metrics.SlowRPCs.Inc(1)
defer func(tBegin time.Time, attempts int64) {
ds.metrics.SlowRPCs.Dec(1)
var s redact.StringBuilder
slowRangeRPCReturnWarningStr(&s, timeutil.Since(tBegin), attempts)
log.Warningf(ctx, "slow RPC response: %v", &s)
}(tBegin, attempts)
}
tBegin = time.Time{} // prevent reentering branch for this RPC
}

Expand Down Expand Up @@ -2185,6 +2184,13 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
return newSendError(errors.Wrap(lastAttemptErr, "sending to all replicas failed; last error"))
}

// slowDistSenderRangeThreshold is a latency threshold for logging slow requests to a range,
// potentially involving RPCs to multiple replicas of the range.
const slowDistSenderRangeThreshold = time.Minute

// slowDistSenderReplicaThreshold is a latency threshold for logging a slow RPC to a single replica.
const slowDistSenderReplicaThreshold = 3 * time.Second

// defaultSendClosedTimestampPolicy is used when the closed timestamp policy
// is not known by the range cache. This choice prevents sending batch requests
// to only voters when a perfectly good non-voter may exist in the local
Expand Down Expand Up @@ -2318,7 +2324,8 @@ func (ds *DistSender) sendToReplicas(
// per-replica state and may succeed on other replicas.
var ambiguousError error
var br *kvpb.BatchResponse
for first := true; ; first = false {
attempts := int64(0)
for first := true; ; first, attempts = false, attempts+1 {
if !first {
ds.metrics.NextReplicaErrCount.Inc(1)
}
Expand Down Expand Up @@ -2404,7 +2411,15 @@ func (ds *DistSender) sendToReplicas(
comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

tBegin := timeutil.Now() // for slow log message
br, err = transport.SendNext(ctx, ba)
if dur := timeutil.Since(tBegin); dur > slowDistSenderReplicaThreshold && !tBegin.IsZero() {
var s redact.StringBuilder
// Note that these RPC may or may not have succeeded. Errors are counted separately below.
ds.metrics.SlowRPCs.Inc(1)
slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
log.Warningf(ctx, "slow replica RPC: %v", &s)
}
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)

Expand Down
10 changes: 8 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4435,6 +4435,11 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
get.KeyLockingStrength = lock.Shared
get.KeyLockingDurability = lock.Unreplicated
ba.Add(get)
ba.Replica = roachpb.ReplicaDescriptor{
ReplicaID: 1,
NodeID: 2,
StoreID: 3,
}
br := &kvpb.BatchResponse{}
br.Error = kvpb.NewError(errors.New("boom"))
desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")}
Expand All @@ -4448,9 +4453,10 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
}

{
exp := `slow RPC finished after 8.16s (120 attempts)`
exp := `have been waiting 8.16s (120 attempts) for RPC Get(Shared,Unreplicated) [‹"a"›] to` +
` replica (n2,s3):1; resp: ‹(err: boom)›`
var s redact.StringBuilder
slowRangeRPCReturnWarningStr(&s, dur, attempts)
slowReplicaRPCWarningStr(&s, ba, dur, attempts, nil /* err */, br)
act := s.RedactableString()
require.EqualValues(t, exp, act)
}
Expand Down

0 comments on commit ecc1a99

Please sign in to comment.