117340: kvserver: proxy requests that return a NotLeaseholderError r=erikgrinaker a=andrewbaptist

Previously, a request that required a direct connection to the
leaseholder would fail if the client was unable to initiate that
connection. This commit adds a new field, ProxyRangeInfo, to the
BatchRequest header. A client sets this field to request that the
server proxy the request on its behalf.

There are three main changes in this commit:
1) In DistSender, if the client needs to send a request to the leaseholder
   but its transport has already moved on to the next replica, it adds the
   ProxyRangeInfo header to the request and sends it to the follower.
2) In DistSender, if it is attempting to send a request that already has
   the ProxyRangeInfo header set, it short-circuits much of the retry
   logic and instead sends the request only to the leaseholder the
   original client intended.
3) On the server side, the node normally evaluates a request even if
   ProxyRangeInfo is set. If its local evaluation results in a
   NotLeaseHolderError and the range info matches what the client set in
   the ProxyRangeInfo header, it attempts to proxy the request to the
   leaseholder and returns the response from that request instead.

The impact of this change is to allow BatchRequests to succeed as long
as there is connectivity between the client and at least one replica
that has a direct connection to the leaseholder.
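
To make change 1 concrete, here is a minimal, self-contained sketch of
the client-side decision. The types and helper below (RangeInfo,
BatchRequest, shouldProxy) are simplified stand-ins for illustration,
not the real kvpb/roachpb definitions; the actual conditions live in
sendToReplicas in pkg/kv/kvclient/kvcoord/dist_sender.go.

package main

import "fmt"

// RangeInfo is a stand-in for the routing state a client attaches to a
// proxy request: the lease sequence and descriptor generation let the
// server validate that both sides agree before proxying.
type RangeInfo struct {
	LeaseSeq   int64
	Generation int64
	LeaseNode  int32
}

// BatchRequest is a stand-in for the kvpb.BatchRequest header fields
// relevant here.
type BatchRequest struct {
	ProxyRangeInfo *RangeInfo // set when asking a follower to proxy
	TargetNode     int32      // node the transport will contact next
}

// shouldProxy mirrors the chain of conditions checked before attaching
// ProxyRangeInfo: the setting is enabled, the request is not already a
// proxy request (no chaining), we intended to reach the leaseholder, the
// leaseholder is known, the next replica is not the leaseholder, and we
// are not the next replica ourselves.
func shouldProxy(
	settingEnabled bool, ba *BatchRequest, routeToLeaseholder bool,
	leaseholder *RangeInfo, selfNode int32,
) bool {
	switch {
	case !settingEnabled:
		return false
	case ba.ProxyRangeInfo != nil: // never chain proxies
		return false
	case !routeToLeaseholder:
		return false
	case leaseholder == nil:
		return false
	case ba.TargetNode == leaseholder.LeaseNode: // already headed there
		return false
	case selfNode == ba.TargetNode: // would loop through ourselves
		return false
	}
	return true
}

func main() {
	lh := &RangeInfo{LeaseSeq: 7, Generation: 42, LeaseNode: 2}
	ba := &BatchRequest{TargetNode: 3} // transport moved past the leaseholder
	if shouldProxy(true, ba, true, lh, 1) {
		ba.ProxyRangeInfo = lh // ask n3 to proxy to the leaseholder on n2
	}
	fmt.Printf("proxy attached: %v\n", ba.ProxyRangeInfo != nil)
}

Note how, in the real code below, the recipient clears ProxyRangeInfo on
the copy it forwards (via ShallowCopy), which is what rules out proxy
chains.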

Epic: none

Fixes: cockroachdb#93503

Release note (ops change): This PR adds a new cluster setting,
kv.dist_sender.proxy.enabled, which defaults to true. When it is
enabled, requests are proxied through a follower replica when the
client cannot reach the leaseholder directly.
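
In tests, the setting can be pinned through its registered handle; the
sketch below mirrors the override added to TestPartialPartition in this
diff, with a hypothetical test name:

package kvcoord_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// TestProxyDisabled (hypothetical) runs with proxying forced off.
func TestProxyDisabled(t *testing.T) {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()
	kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, false)
	// Pass st to the server (e.g. base.TestServerArgs{Settings: st})
	// when starting the test cluster.
	_ = st
}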

Co-authored-by: Andrew Baptist <[email protected]>
craig[bot] and andrewbaptist committed Mar 23, 2024
2 parents c950a9e + b5caa16 commit 794a129
Showing 10 changed files with 357 additions and 36 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
@@ -905,6 +905,7 @@
<tr><td>APPLICATION</td><td>distsender.rpc.err.notleaseholdererrtype</td><td>Number of NotLeaseHolderErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.err.oprequirestxnerrtype</td><td>Number of OpRequiresTxnErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.err.optimisticevalconflictserrtype</td><td>Number of OptimisticEvalConflictsErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.err.proxyfailederrtype</td><td>Number of ProxyFailedErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.err.raftgroupdeletederrtype</td><td>Number of RaftGroupDeletedErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.err.rangefeedretryerrtype</td><td>Number of RangeFeedRetryErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.err.rangekeymismatcherrtype</td><td>Number of RangeKeyMismatchErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
170 changes: 146 additions & 24 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -352,6 +352,13 @@ var sortByLocalityFirst = settings.RegisterBoolSetting(
true,
)

var ProxyBatchRequest = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.dist_sender.proxy.enabled",
"when true, proxy batch requests that can't be routed directly to the leaseholder",
true,
)

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
Expand Down Expand Up @@ -1974,6 +1981,18 @@ func (ds *DistSender) sendPartialBatch(
attempts++
pErr = nil
// If we've invalidated the descriptor on a send failure, re-lookup.

// On a proxy request, update our routing information with what the
// client sent us if the client had newer information. We have already
// validated the client request against our local replica state in
// node.go and reject requests with stale information. Here we ensure
// our RangeCache has the same information as both the client request
// and our local replica before attempting the request. If the sync
// makes our token invalid, we handle it similarly to a RangeNotFound or
// NotLeaseHolderError from a remote server.
if ba.ProxyRangeInfo != nil {
routingTok.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc)
}
if !routingTok.Valid() {
var descKey roachpb.RKey
if isReverse {
@@ -2047,6 +2066,11 @@
// Set pErr so that, if we don't perform any more retries, the
// deduceRetryEarlyExitError() call below the loop includes this error.
pErr = kvpb.NewError(err)
// Proxy requests are not retried since we are not the originator.
if ba.ProxyRangeInfo != nil {
log.VEventf(ctx, 1, "failing proxy request after error %s", err)
break
}
switch {
case IsSendError(err):
// We've tried all the replicas without success. Either they're all
@@ -2340,7 +2364,7 @@ const defaultSendClosedTimestampPolicy = roachpb.LEAD_FOR_GLOBAL_READS
// AmbiguousResultError. Of those two, the latter has to be passed back to the
// client, while the former should be handled by retrying with an updated range
// descriptor. This method handles other errors returned from replicas
// internally by retrying (NotLeaseholderError, RangeNotFoundError), and falls
// internally by retrying (NotLeaseHolderError, RangeNotFoundError), and falls
// back to a sendError when it runs out of replicas to try.
//
// routing dictates what replicas will be tried (but not necessarily their
@@ -2385,15 +2409,62 @@ func (ds *DistSender) sendToReplicas(
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
}
desc := routing.Desc()
leaseholder := routing.Leaseholder()
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter)
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, routing.Leaseholder(), replicaFilter)
if err != nil {
return nil, err
}

// This client requested we proxy this request. Only proxy if we can
// determine the leaseholder and it agrees with the ProxyRangeInfo from
// the client. We don't support a proxy request to a non-leaseholder
// replica. If we decide to proxy this request, we will reduce our replica
// list to only the requested replica. If we fail on that request we fail back
// to the caller so they can try something else.
if ba.ProxyRangeInfo != nil {
log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo)
// We don't know who the leaseholder is, and it is likely that the
// client had stale information. Return our information to them through
// a NLHE and let them retry.
if routing.Lease().Empty() {
log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(roachpb.Lease{},
0, /* proposerStoreID */
routing.Desc(),
"client requested a proxy but we can't figure out the leaseholder"),
)
return &br, nil
}
if ba.ProxyRangeInfo.Lease.Sequence != routing.Lease().Sequence ||
ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
log.VEventf(ctx, 2, "proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(
*routing.Lease(),
0, /* proposerStoreID */
routing.Desc(),
fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)),
)
return &br, nil
}

// On a proxy request, we only send the request to the leaseholder. If we
// are here then the client and server agree on the routing information, so
// use the leaseholder as our only replica to send to.
idx := replicas.Find(routing.Leaseholder().ReplicaID)
// This should never happen. We validated the routing above and the token
// is still valid.
if idx == -1 {
return nil, errors.AssertionFailedf("inconsistent routing %v %v", desc, *routing.Leaseholder())
}
replicas = replicas[idx : idx+1]
log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
}
// Rearrange the replicas so that they're ordered according to the routing
// policy.
var leaseholderFirst bool
var routeToLeaseholder bool
switch ba.RoutingPolicy {
case kvpb.RoutingPolicy_LEASEHOLDER:
// First order by latency, then move the leaseholder to the front of the
@@ -2403,12 +2474,12 @@
}

idx := -1
if leaseholder != nil {
idx = replicas.Find(leaseholder.ReplicaID)
if routing.Leaseholder() != nil {
idx = replicas.Find(routing.Leaseholder().ReplicaID)
}
if idx != -1 {
replicas.MoveToFront(idx)
leaseholderFirst = true
routeToLeaseholder = true
} else {
// The leaseholder node's info must have been missing from gossip when we
// created replicas.
@@ -2546,6 +2617,44 @@
comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

// Determine whether we should proxy this request through a follower to
// the leaseholder. The primary condition for proxying is that we would
// like to send the request to the leaseholder, but the transport is
// about to send the request through a follower.
requestToSend := ba
if !ProxyBatchRequest.Get(&ds.st.SV) {
// The setting is disabled, so we don't proxy this request.
} else if ba.ProxyRangeInfo != nil {
// Clear out the proxy information to prevent the recipient from
// sending this request onwards. This is necessary to prevent proxy
// chaining. We want the recipient to process the request or fail
// immediately. This is an extra safety measure to prevent any types
// of routing loops.
requestToSend = ba.ShallowCopy()
requestToSend.ProxyRangeInfo = nil
} else if !routeToLeaseholder {
// This request isn't intended for the leaseholder so we don't proxy it.
} else if routing.Leaseholder() == nil {
// NB: Normally we don't have both routeToLeaseholder and a nil
// leaseholder. This could be changed to an assertion.
log.Errorf(ctx, "attempting %v to route to leaseholder, but the leaseholder is unknown %v", ba, routing)
} else if ba.Replica.NodeID == routing.Leaseholder().NodeID {
// We are sending this request to the leaseholder, so it doesn't
// make sense to attempt to proxy it.
} else if ds.nodeIDGetter() == ba.Replica.NodeID {
// This condition prevents proxying a request if we are the same
// node as the final destination. Without this we would pass through
// the same DistSender stack again which is pointless.
} else {
// We passed all the conditions above and want to attempt to proxy
// this request. We need to copy it as we are going to modify the
// Header. For other replicas we may not end up setting the header.
requestToSend = ba.ShallowCopy()
rangeInfo := routing.RangeInfo()
requestToSend.ProxyRangeInfo = &rangeInfo
log.VEventf(ctx, 1, "attempt proxy request %v using %v", requestToSend, rangeInfo)
}

tBegin := timeutil.Now() // for slow log message
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).
Track(ctx, ba, tBegin.UnixNano())
@@ -2554,7 +2663,7 @@
err = cbErr
transport.SkipReplica()
} else {
br, err = transport.SendNext(sendCtx, ba)
br, err = transport.SendNext(sendCtx, requestToSend)
tEnd := timeutil.Now()
cbToken.Done(br, err, tEnd.UnixNano())

@@ -2570,6 +2679,18 @@
}
}
}
if err == nil {
if proxyErr, ok := br.Error.GetDetail().(*kvpb.ProxyFailedError); ok {
// The server proxy attempt resulted in a non-BatchRequest error, likely
// a communication error. Convert the wrapped error into an error on our
// side. Depending on the type of request and what the error is, we may
// treat the error as an ambiguous error. Clear out the BatchResponse as
// the only information it contained was this error.
err = proxyErr.Unwrap()
br = nil
log.VEventf(ctx, 2, "proxy send error: %s", err)
}
}

ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)
@@ -2635,21 +2756,10 @@ func (ds *DistSender) sendToReplicas(
// retrying the writes, should it need to be propagated.
if withCommit && !grpcutil.RequestDidNotStart(err) {
ambiguousError = err
} else if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
// If we get a gRPC error against the leaseholder, we don't want to
// backoff and keep trying the same leaseholder against the leaseholder.
// TODO(baptist): This should not be in an else block. Ideally
// we set leaseholderUnavailable to true even if there is an
// ambiguous error as it should be set independent of an
// ambiguous error. TestTransactionUnexpectedlyCommitted test
// fails otherwise. That test is expecting us to retry against
// the leaseholder after we received a gRPC error to validate
// that it now returns a WriteTooOld error. Once the proxy code
// is in place, this can be moved back out of the if block. In
// practice the only impact of having this in the else block is
// that we will retry more times against a leaseholder before
// moving on to the other replicas. There is not an easy way to
// modify the test without this being in the else block.
}
// If we get a gRPC error against the leaseholder, we don't want to
// backoff and keep trying the request against the same leaseholder.
if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
leaseholderUnavailable = true
}
} else {
@@ -2816,6 +2926,18 @@
// error too aggressively.
if updatedLeaseholder {
leaseholderUnavailable = false
routeToLeaseholder = true
// If we changed the leaseholder, reset the transport to try all the
// replicas in order again. After a leaseholder change, requests to
// followers will be marked as potential proxy requests and point to
// the new leaseholder. We need to try all the replicas again before
// giving up.
// NB: We reset and retry here because if we release a SendError to
// the caller, it will call Evict and evict the leaseholder
// information we just learned from this error.
// TODO(baptist): If sendPartialBatch didn't evict valid range
// information we would not need to reset the transport here.
transport.Reset()
}
// If the leaseholder is the replica that we've just tried, and
// we've tried this replica a bunch of times already, let's move on
@@ -2851,7 +2973,7 @@
// have a sufficient closed timestamp. In response, we should
// immediately redirect to the leaseholder, without a backoff
// period.
intentionallySentToFollower := first && !leaseholderFirst
intentionallySentToFollower := first && !routeToLeaseholder
// See if we want to backoff a little before the next attempt. If
// the lease info we got is stale and we were intending to send to
// the leaseholder, we backoff because it might be the case that
4 changes: 1 addition & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -4694,10 +4694,8 @@ func TestPartialPartition(t *testing.T) {
for _, test := range testCases {
t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers),
func(t *testing.T) {
if test.useProxy {
skip.WithIssue(t, 93503)
}
st := cluster.MakeTestingClusterSettings()
kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, test.useProxy)
// With epoch leases this test doesn't work reliably. It passes
// in cases where it should fail and fails in cases where it
// should pass.
25 changes: 18 additions & 7 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -5683,21 +5683,32 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) {

// We'll send a request that first gets a NLHE, and then a RangeNotFoundError. We
// then expect an updated descriptor to be used and return success.
// Initially the routing is (*1, 2,) - no LH
// 1) Send to n1 -> NLHE with LH=2 (updated - reset), transport -> (*2, 1,) - LH=2
// 2) Send to n2 -> not found, transport -> (2, *1,) - LH=2
// 3) Send to n1 -> NLHE with LH=2 (not updated - backoff), transport -> (1, *2,) - LH=2
// 4) Send to n2 -> not found, transport -> (1, 2, *) - LH=2
// Evict/Refresh transport is now (*3) - no LH
// 5) Send to n3 - success
call := 0
var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
br := &kvpb.BatchResponse{}
switch call {
case 0:
case 0, 2:
expRepl := desc1.Replicas().Descriptors()[0]
require.Equal(t, expRepl, ba.Replica)
br.Error = kvpb.NewError(&kvpb.NotLeaseHolderError{
Lease: &roachpb.Lease{Replica: desc1.Replicas().Descriptors()[1]},
})
case 1:
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(
roachpb.Lease{Replica: desc1.Replicas().Descriptors()[1]},
1,
&desc1,
"store not leaseholder",
))
case 1, 3:
expRep := desc1.Replicas().Descriptors()[1]
require.Equal(t, ba.Replica, expRep)
br.Error = kvpb.NewError(kvpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID))
case 2:
case 4:
expRep := desc2.Replicas().Descriptors()[0]
require.Equal(t, ba.Replica, expRep)
br = ba.CreateReply()
@@ -5745,7 +5756,7 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) {

_, err := ds.Send(ctx, ba)
require.NoError(t, err.GoError())
require.Equal(t, call, 3)
require.Equal(t, call, 5)
require.Equal(t, rangeLookups, 2)
}
