kvclient: remove retries within sendToReplicas
This change removes a retry loop within sendToReplicas that occurred when
we hit a NotLeaseHolderError (NLHE). We still retry within the outer loop
of sendPartialBatch, and removing the inner loop also removes the only
remaining reordering of the replicas retrieved from the RangeCache.

The metric distsender.errors.inleasetransferbackoffs, which tracked the
number of retries within the now-removed loop, is removed as well.

Epic: none

Release note: none
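As a rough illustration of the new shape (a sketch only, not the committed code; errAmbiguous and the attempts slice stand in for kvpb.AmbiguousResultError and the per-descriptor routing plumbing), the outer loop in sendPartialBatch now remembers an ambiguous error across retries and surfaces it whenever it cannot safely keep retrying:

```go
package main

import (
	"errors"
	"fmt"
)

// errAmbiguous stands in for kvpb.AmbiguousResultError.
var errAmbiguous = errors.New("ambiguous result")

// sendPartialBatch sketches the retained outer retry loop: sendToReplicas
// no longer retries internally, so an ambiguous error it returns is
// remembered here, the routing info is refreshed, and the send is retried.
func sendPartialBatch(attempts []func() error) error {
	var ambiguousErr error
	for _, send := range attempts {
		err := send()
		if err == nil {
			return nil
		}
		if errors.Is(err, errAmbiguous) {
			// Remember the ambiguity and retry with fresh routing; a later
			// clean success resolves it, but a later failure must not hide it.
			ambiguousErr = err
			continue
		}
		// Any other terminal error: prefer reporting the ambiguity, since
		// the earlier attempt may in fact have committed.
		if ambiguousErr != nil {
			return ambiguousErr
		}
		return err
	}
	if ambiguousErr != nil {
		return ambiguousErr
	}
	return errors.New("retries exhausted")
}

func main() {
	// One ambiguous attempt followed by a hard failure: the ambiguous
	// error wins, mirroring the new sendPartialBatch behavior.
	err := sendPartialBatch([]func() error{
		func() error { return fmt.Errorf("rpc: %w", errAmbiguous) },
		func() error { return errors.New("range moved") },
	})
	fmt.Println(err) // rpc: ambiguous result
}
```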
andrewbaptist committed Jan 19, 2024
1 parent 7e463b6 commit fa990c1
Showing 4 changed files with 78 additions and 149 deletions.
1 change: 0 additions & 1 deletion pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -206,7 +206,6 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/caller",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/grpcutil",
200 changes: 67 additions & 133 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Expand Up @@ -240,9 +240,6 @@ const (
// RangeLookupPrefetchCount is the maximum number of range descriptors to prefetch
// during range lookups.
RangeLookupPrefetchCount = 8
// The maximum number of times a replica is retried when it repeatedly returns
// stale lease info.
sameReplicaRetryLimit = 10
)

var rangeDescriptorCacheSize = settings.RegisterIntSetting(
@@ -1839,6 +1836,8 @@ func (ds *DistSender) sendPartialBatch(
var reply *kvpb.BatchResponse
var pErr *kvpb.Error
var err error
var ambiguousErr error
ba = ba.ShallowCopy()

// Start a retry loop for sending the batch to the range. Each iteration of
// this loop uses a new descriptor. Attempts to send to multiple replicas in
@@ -1887,9 +1886,17 @@
// larger response slice with unknown mapping to our truncated reply).
intersection, err := rs.Intersect(routingTok.Desc().RSpan())
if err != nil {
return response{pErr: kvpb.NewError(err)}
if ambiguousErr != nil {
return response{pErr: kvpb.NewError(ambiguousErr)}
} else {
return response{pErr: kvpb.NewError(err)}
}
}
if !intersection.Equal(rs) {
if ambiguousErr != nil {
log.Eventf(ctx, "range shrunk after an ambiguous error, fail the request")
return response{pErr: kvpb.NewError(ambiguousErr)}
}
log.Eventf(ctx, "range shrunk; sub-dividing the request")
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
return response{reply: reply, pErr: pErr}
@@ -1944,6 +1951,11 @@ func (ds *DistSender) sendPartialBatch(
log.VEventf(ctx, 1, "evicting range desc %s after %s", routingTok, err)
routingTok.Evict(ctx)
continue
case errors.HasType(err, &kvpb.AmbiguousResultError{}):
ambiguousErr = err
log.VEventf(ctx, 1, "retry after ambiguous error %s after %s", routingTok, err)
routingTok.Evict(ctx)
continue
}
break
}
@@ -1992,6 +2004,11 @@
// batch here would give a potentially larger response slice
// with unknown mapping to our truncated reply).
log.VEventf(ctx, 1, "likely split; will resend. Got new descriptors: %s", tErr.Ranges)
// We don't track ambiguous errors across routing changes. Failing here
// may be premature, but better than losing an ambiguous error.
if ambiguousErr != nil {
return response{pErr: kvpb.NewError(ambiguousErr)}
}
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
return response{reply: reply, pErr: pErr}
}
@@ -2008,6 +2025,9 @@
}
}

if ambiguousErr != nil {
pErr = kvpb.NewError(ambiguousErr)
}
return response{pErr: pErr}
}

@@ -2164,12 +2184,7 @@ func maybeSetResumeSpan(
// ambiguousErr, if not nil, is the error we got from the first attempt when the
// success of the request cannot be ruled out by the error. lastAttemptErr is
// the error that the last attempt to execute the request returned.
func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
if ambiguousErr != nil {
return kvpb.NewAmbiguousResultErrorf("error=%v [exhausted] (last error: %v)",
ambiguousErr, lastAttemptErr)
}

func noMoreReplicasErr(lastAttemptErr error) error {
// TODO(bdarnell): The error from the last attempt is not necessarily the best
// one to return; we may want to remember the "best" error we've seen (for
// example, a NotLeaseHolderError conveys more information than a
@@ -2210,6 +2225,9 @@ const defaultSendClosedTimestampPolicy = roachpb.LEAD_FOR_GLOBAL_READS
// latter because aborting is idempotent). If withCommit is true, any errors
// that do not definitively rule out the possibility that the batch could have
// succeeded are transformed into AmbiguousResultErrors.
//
// This method always works on a mutable copy of the BatchRequest. It is
// expected that it will change values on the request as it runs.
func (ds *DistSender) sendToReplicas(
ctx context.Context, ba *kvpb.BatchRequest, routing rangecache.EvictionToken, withCommit bool,
) (*kvpb.BatchResponse, error) {
@@ -2224,7 +2242,6 @@ func (ds *DistSender) sendToReplicas(
ds.st, ds.clock,
routing.ClosedTimestampPolicy(defaultSendClosedTimestampPolicy), ba,
) {
ba = ba.ShallowCopy()
ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST
}
// Filter the replicas to only those that are relevant to the routing policy.
@@ -2248,7 +2265,6 @@

// Rearrange the replicas so that they're ordered according to the routing
// policy.
var leaseholderFirst bool
switch ba.RoutingPolicy {
case kvpb.RoutingPolicy_LEASEHOLDER:
// First order by latency, then move the leaseholder to the front of the
@@ -2263,7 +2279,6 @@
}
if idx != -1 {
replicas.MoveToFront(idx)
leaseholderFirst = true
} else {
// The leaseholder node's info must have been missing from gossip when we
// created replicas.
@@ -2290,13 +2305,6 @@
}
defer transport.Release()

// inTransferRetry is used to slow down retries in cases where an ongoing
// lease transfer is suspected.
// TODO(andrei): now that requests wait on lease transfers to complete on
// outgoing leaseholders instead of immediately redirecting, we should
// rethink this backoff policy.
inTransferRetry := retry.StartWithCtx(ctx, ds.rpcRetryOptions)
inTransferRetry.Next() // The first call to Next does not block.
var sameReplicaRetries int
var prevReplica roachpb.ReplicaDescriptor

@@ -2308,7 +2316,6 @@

// This loop will retry operations that fail with errors that reflect
// per-replica state and may succeed on other replicas.
var ambiguousError error
var br *kvpb.BatchResponse
for first := true; ; first = false {
if !first {
@@ -2323,7 +2330,7 @@
if lastErr == nil && br != nil {
lastErr = br.Error.GoError()
}
err = skipStaleReplicas(transport, routing, ambiguousError, lastErr)
err = skipStaleReplicas(transport, routing, lastErr)
if err != nil {
return nil, err
}
@@ -2342,31 +2349,9 @@
}
prevReplica = curReplica

ba = ba.ShallowCopy()
ba.Replica = curReplica
ba.RangeID = desc.RangeID

// When a sub-batch from a batch containing a commit experiences an
// ambiguous error, it is critical to ensure subsequent replay attempts
// do not permit changing the write timestamp, as the transaction may
// already have been considered implicitly committed.
ba.AmbiguousReplayProtection = ambiguousError != nil

// In the case that the batch has already seen an ambiguous error, in
// addition to enabling ambiguous replay protection, we also need to
// disable the ability for the server to forward the read timestamp, as
// the transaction may have been implicitly committed. If the intents for
// the implicitly committed transaction were already resolved, on a replay
// attempt encountering committed values above the read timestamp the
// server will attempt to handle what seems to be a write-write conflict by
// throwing a WriteTooOld, which could be refreshed away on the server if
// the read timestamp can be moved. Disabling this ability protects against
// refreshing away the error when retrying the ambiguous operation, instead
// returning to the DistSender so the ambiguous error can be propagated.
if ambiguousError != nil && ba.CanForwardReadTimestamp {
ba.CanForwardReadTimestamp = false
}

// Communicate to the server the information our cache has about the
// range. If it's stale, the server will return an update.
ba.ClientRangeInfo = roachpb.ClientRangeInfo{
@@ -2396,7 +2381,11 @@
comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

br, err = transport.SendNext(ctx, ba)
// TODO(baptist): This extra copy prevents a race condition if the batch is
// modified after it is sent by the client. This shouldn't be necessary, but
// the transport_race test will fail without this copy. Figure out how to
// remove it.
br, err = transport.SendNext(ctx, ba.ShallowCopy())
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)

Expand All @@ -2405,10 +2394,6 @@ func (ds *DistSender) sendToReplicas(

if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
if ambiguousError != nil {
return nil, kvpb.NewAmbiguousResultErrorf("error=%v [propagate] (last error: %v)",
ambiguousError, err)
}
return nil, err
}

@@ -2465,7 +2450,26 @@
// batch contains a commit, the ambiguous error should be caught on
// retrying the writes, should it need to be propagated.
if withCommit && !grpcutil.RequestDidNotStart(err) {
ambiguousError = err
// When a sub-batch from a batch containing a commit experiences an
// ambiguous error, it is critical to ensure subsequent replay attempts
// do not permit changing the write timestamp, as the transaction may
// already have been considered implicitly committed.
ba.AmbiguousReplayProtection = true

// In the case that the batch has already seen an ambiguous error, in
// addition to enabling ambiguous replay protection, we also need to
// disable the ability for the server to forward the read timestamp, as
// the transaction may have been implicitly committed. If the intents for
// the implicitly committed transaction were already resolved, on a replay
// attempt encountering committed values above the read timestamp the
// server will attempt to handle what seems to be a write-write conflict by
// throwing a WriteTooOld, which could be refreshed away on the server if
// the read timestamp can be moved. Disabling this ability protects against
// refreshing away the error when retrying the ambiguous operation, instead
// returning to the DistSender so the ambiguous error can be propagated.
ba.CanForwardReadTimestamp = false

return nil, kvpb.NewAmbiguousResultErrorf("error=%v [propagate]", err)
}

// If the error wasn't just a context cancellation and the down replica
@@ -2542,85 +2546,23 @@
// leaseholder in the range cache.
case *kvpb.NotLeaseHolderError:
ds.metrics.NotLeaseHolderErrCount.Inc(1)
// If we got some lease information, we use it. If not, we loop around
// and try the next replica.
// If we got some lease information, and it is more up-to-date, then
// apply the information and exit. If not, we loop around and try the
// next replica.
if tErr.Lease != nil {
// Update the leaseholder in the range cache. Naively this would also
// happen when the next RPC comes back, but we don't want to wait out
// the additional RPC latency.

var updatedLeaseholder bool
if tErr.Lease != nil {
updatedLeaseholder = routing.SyncTokenAndMaybeUpdateCache(ctx, tErr.Lease, &tErr.RangeDesc)
}
// Move the new leaseholder to the head of the queue for the next
// retry. Note that the leaseholder might not be the one indicated by
// the NLHE we just received, in case that error carried stale info.
if lh := routing.Leaseholder(); lh != nil {
// If the leaseholder is the replica that we've just tried, and
// we've tried this replica a bunch of times already, let's move on
// and not try it again. This prevents us getting stuck on a replica
// that we think has the lease but keeps returning redirects to us
// (possibly because it hasn't applied its lease yet). Perhaps that
// lease expires and someone else gets a new one, so by moving on we
// get out of possibly infinite loops.
if !lh.IsSame(curReplica) || sameReplicaRetries < sameReplicaRetryLimit {
moved := transport.MoveToFront(*lh)
if !moved {
// The transport always includes the client's view of the
// leaseholder when it's constructed. If the leaseholder can't
// be found on the transport then it must be the case that the
// routing was updated with lease information that is not
// compatible with the range descriptor that was used to
// construct the transport. A client may have an arbitrarily
// stale view of the leaseholder, but it is never expected to
// regress. As such, advancing through each replica on the
// transport until it's exhausted is unlikely to achieve much.
//
// We bail early by returning a SendError. The expectation is
// for the client to retry with a fresher eviction token.
log.VEventf(
ctx, 2, "transport incompatible with updated routing; bailing early",
)
return nil, newSendError(errors.Wrap(tErr, "leaseholder not found in transport; last error"))
}
}
log.VEventf(ctx, 2, "retry with updated routing info: lease: %s, rangeDesc: %s", tErr.Lease, tErr.RangeDesc)
if routing.SyncTokenAndMaybeUpdateCache(ctx, tErr.Lease, &tErr.RangeDesc) {
// We updated our cache with newer information. Exit from this
// method to allow a retry at a higher level with a new replica
// list.
return nil, newSendError(errors.Wrap(tErr, "routing changed, retry with updated cache"))
}
// Check whether the request was intentionally sent to a follower
// replica to perform a follower read. In such cases, the follower
// may reject the request with a NotLeaseHolderError if it does not
// have a sufficient closed timestamp. In response, we should
// immediately redirect to the leaseholder, without a backoff
// period.
intentionallySentToFollower := first && !leaseholderFirst
// See if we want to backoff a little before the next attempt. If
// the lease info we got is stale and we were intending to send to
// the leaseholder, we backoff because it might be the case that
// there's a lease transfer in progress and the would-be leaseholder
// has not yet applied the new lease.
//
// TODO(arul): The idea here is for the client to not keep sending
// the would-be leaseholder multiple requests and backoff a bit to let
it apply its lease. Instead of deriving this information like
// we do above, we could instead check if we're retrying the same
// leaseholder (i.e, if the leaseholder on the routing is the same as
// the replica we just tried), in which case we should backoff. With
// this scheme we'd no longer have to track "updatedLeaseholder" state
// when syncing the NLHE with the range cache.
shouldBackoff := !updatedLeaseholder && !intentionallySentToFollower
if shouldBackoff {
log.VErrEventf(ctx, 2, "backing off due to NotLeaseHolderErr with stale info")
} else {
inTransferRetry.Reset() // The following Next() call will not block.
}
inTransferRetry.Next()
}
default:
if ambiguousError != nil {
return nil, kvpb.NewAmbiguousResultErrorf("error=%v [propagate] (last error: %v)",
ambiguousError, br.Error.GoError())
}

// The error received is likely not specific to this
// replica, so we should return it instead of trying other
// replicas.
@@ -2637,11 +2579,7 @@
// cause range cache evictions. Context cancellations just mean the
// sender changed its mind or the request timed out.

if ambiguousError != nil {
err = kvpb.NewAmbiguousResultError(errors.Wrapf(ambiguousError, "context done during DistSender.Send"))
} else {
err = errors.Wrap(ctx.Err(), "aborted during DistSender.Send")
}
err = errors.Wrap(ctx.Err(), "aborted during DistSender.Send")
log.Eventf(ctx, "%v", err)
return nil, err
}
@@ -2835,23 +2773,19 @@ func (ds *DistSender) AllRangeSpans(
// considered to be exhausted.
//
// Returns an error if the transport is exhausted.
func skipStaleReplicas(
transport Transport, routing rangecache.EvictionToken, ambiguousError error, lastErr error,
) error {
func skipStaleReplicas(transport Transport, routing rangecache.EvictionToken, lastErr error) error {
// Check whether the range cache told us that the routing info we had is
// very out-of-date. If so, there's not much point in trying the other
// replicas in the transport; they'll likely all return
// RangeKeyMismatchError if there's even a replica. We'll bubble up an
// error and try with a new descriptor.
if !routing.Valid() {
return noMoreReplicasErr(
ambiguousError,
errors.Wrap(lastErr, "routing information detected to be stale"))
return noMoreReplicasErr(errors.Wrap(lastErr, "routing information detected to be stale"))
}

for {
if transport.IsExhausted() {
return noMoreReplicasErr(ambiguousError, lastErr)
return noMoreReplicasErr(lastErr)
}

if _, ok := routing.Desc().GetReplicaDescriptorByID(transport.NextReplica().ReplicaID); ok {
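The heart of the dist_sender.go change above is the new fast-fail path in sendToReplicas: when a commit-carrying batch hits an RPC error that may have started executing, the flags guarding against replays are armed and the ambiguous error is returned immediately, instead of being carried through an internal retry loop. A self-contained sketch under those assumptions (batch models only the two kvpb.BatchRequest flags involved; everything else is hypothetical scaffolding):

```go
package main

import (
	"errors"
	"fmt"
)

// batch is a stand-in for kvpb.BatchRequest; only the two flags that the
// new fast-fail path arms are modeled here.
type batch struct {
	AmbiguousReplayProtection bool
	CanForwardReadTimestamp   bool
}

var errAmbiguous = errors.New("ambiguous result")

// handleRPCError sketches the new sendToReplicas behavior: an RPC error on
// a commit-carrying batch that may have started executing is surfaced as an
// ambiguous error immediately, rather than retried on the next replica.
func handleRPCError(ba *batch, withCommit, requestDidNotStart bool, rpcErr error) error {
	if withCommit && !requestDidNotStart {
		// A replay must not move the write timestamp; the transaction may
		// already be implicitly committed.
		ba.AmbiguousReplayProtection = true
		// Nor may the server refresh away a WriteTooOld raised by the
		// replay; the ambiguity has to make it back to the DistSender.
		ba.CanForwardReadTimestamp = false
		return fmt.Errorf("%w [propagate]: %v", errAmbiguous, rpcErr)
	}
	// Otherwise nothing can have committed; the caller is free to try
	// another replica.
	return nil
}

func main() {
	ba := &batch{CanForwardReadTimestamp: true}
	err := handleRPCError(ba, true /* withCommit */, false /* requestDidNotStart */,
		errors.New("connection reset"))
	// Prints the ambiguous error and the armed flags: true, false.
	fmt.Println(err, ba.AmbiguousReplayProtection, ba.CanForwardReadTimestamp)
}
```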
2 changes: 0 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
@@ -256,8 +256,6 @@ func validateTxnCommitAmbiguousError(t *testing.T, err error, reason string) {
"did not expect incorrect TransactionRetryWithProtoRefreshError due to failed refresh")
require.Falsef(t, errors.HasAssertionFailure(err),
"expected no AssertionFailedError due to sanity check on transaction already committed")
require.ErrorContainsf(t, aErr, reason,
"expected AmbiguousResultError to include message \"%s\"", reason)
}

// TestTransactionUnexpectedlyCommitted validates the handling of the case where
