kvserver: proxy requests that return a NotLeaseholderError
Previously, a request that required a direct connection to the leaseholder
would fail if the client was unable to initiate that connection. This PR
adds the client and server sides of proxy handling to work around partial
partitions.

There are three main changes as part of this request.
1) In DistSender, if the client needs to send a request to the leaseholder
   but the transport has moved on to the next replica, it will add the
   ProxyRangeInfo header to the request and send it to each follower.
2) In DistSender, if it is attempting to send a request that already has
   the ProxyRangeInfo header set, it will short-circuit much of the retry
   logic and only send the request to the leaseholder named in the
   ProxyRangeInfo header.
3) On the server side, the node will normally evaluate a request even if
   ProxyRangeInfo is set. If local evaluation results in a
   NotLeaseHolderError and the range info matches what the client set in
   the ProxyRangeInfo header, the node will attempt to proxy the request to
   the leaseholder and return the proxied response rather than its local
   error.

The impact of this change is to allow a BatchRequest to succeed as long
as there is connectivity between the client and at least one replica
that has a direct connection to the leaseholder.
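
As a rough illustration of the mechanism (a minimal sketch only, using
simplified stand-in types rather than the real kvpb/roachpb APIs, and not
the code added in this commit), the client-side decision to mark a request
for proxying reduces to something like:

    package main

    import "fmt"

    // RangeInfo and BatchRequest are simplified stand-ins for the real
    // roachpb.RangeInfo and kvpb.BatchRequest types.
    type RangeInfo struct {
        LeaseholderNodeID int
    }

    type BatchRequest struct {
        ProxyRangeInfo *RangeInfo // set when the client asks a follower to proxy
    }

    // maybeMarkForProxy mirrors, at a high level, the DistSender behavior:
    // if the request needs the leaseholder but the transport is about to try
    // a different replica, attach ProxyRangeInfo so that replica can forward
    // the request to the leaseholder on our behalf.
    func maybeMarkForProxy(ba *BatchRequest, routing RangeInfo, nextReplicaNodeID, selfNodeID int) *BatchRequest {
        switch {
        case ba.ProxyRangeInfo != nil:
            return ba // already a proxy request; never chain proxies
        case nextReplicaNodeID == routing.LeaseholderNodeID:
            return ba // going to the leaseholder directly; no proxy needed
        case nextReplicaNodeID == selfNodeID:
            return ba // proxying through ourselves is pointless
        default:
            cp := *ba // shallow copy, as the real code does with ShallowCopy()
            info := routing
            cp.ProxyRangeInfo = &info
            return &cp
        }
    }

    func main() {
        ba := &BatchRequest{}
        routing := RangeInfo{LeaseholderNodeID: 1}
        out := maybeMarkForProxy(ba, routing, 2 /* follower */, 3 /* self */)
        fmt.Println(out.ProxyRangeInfo != nil) // true: the follower forwards to n1
    }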

Epic: none

Fixes: #93503

Release note (ops change): This PR adds a new cluster setting,
kv.dist_sender.proxy.enabled, which defaults to true. When it is enabled,
requests that cannot reach the leaseholder directly are proxied through a
follower replica.
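
For reference, operators can toggle the setting with
SET CLUSTER SETTING kv.dist_sender.proxy.enabled = false;. The following
hypothetical Go test (not part of this commit) sketches how the exported
kvcoord.ProxyBatchRequest setting can be overridden programmatically,
mirroring the override added to TestPartialPartition below:

    package kvcoord_test

    import (
        "context"
        "testing"

        "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
        "github.com/cockroachdb/cockroach/pkg/settings/cluster"
    )

    // TestProxySettingOverride is a hypothetical example: it disables
    // kv.dist_sender.proxy.enabled on a testing settings instance and
    // verifies the value took effect.
    func TestProxySettingOverride(t *testing.T) {
        ctx := context.Background()
        st := cluster.MakeTestingClusterSettings()
        kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, false)
        if kvcoord.ProxyBatchRequest.Get(&st.SV) {
            t.Fatal("expected kv.dist_sender.proxy.enabled to be false")
        }
    }
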
andrewbaptist committed Mar 22, 2024
1 parent a9c3a59 commit 0fe4c02
Showing 4 changed files with 216 additions and 27 deletions.
159 changes: 135 additions & 24 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -303,6 +303,13 @@ var sortByLocalityFirst = settings.RegisterBoolSetting(
true,
)

var ProxyBatchRequest = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.dist_sender.proxy.enabled",
"when true, proxy batch requests that can't be routed directly to the leaseholder",
true,
)

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
@@ -1896,6 +1903,18 @@ func (ds *DistSender) sendPartialBatch(
attempts++
pErr = nil
// If we've invalidated the descriptor on a send failure, re-lookup.

// On a proxy request, update our routing information with what the
// client sent us if the client had newer information. We have already
// validated the client request against our local replica state in
// node.go and rejected requests with stale information. Here we ensure
// our RangeCache has the same information as both the client request
// and our local replica before attempting the request. If the sync
// makes our token invalid, we handle it similarly to a RangeNotFound or
// NotLeaseHolderError from a remote server.
if ba.ProxyRangeInfo != nil {
routingTok.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc)
}
if !routingTok.Valid() {
var descKey roachpb.RKey
if isReverse {
@@ -1969,6 +1988,11 @@
// Set pErr so that, if we don't perform any more retries, the
// deduceRetryEarlyExitError() call below the loop includes this error.
pErr = kvpb.NewError(err)
// Proxy requests are not retried since we are not the originator.
if ba.ProxyRangeInfo != nil {
log.VEventf(ctx, 1, "failing proxy request after error %s", err)
break
}
switch {
case IsSendError(err):
// We've tried all the replicas without success. Either they're all
@@ -2262,7 +2286,7 @@ const defaultSendClosedTimestampPolicy = roachpb.LEAD_FOR_GLOBAL_READS
// AmbiguousResultError. Of those two, the latter has to be passed back to the
// client, while the former should be handled by retrying with an updated range
// descriptor. This method handles other errors returned from replicas
// internally by retrying (NotLeaseholderError, RangeNotFoundError), and falls
// internally by retrying (NotLeaseHolderError, RangeNotFoundError), and falls
// back to a sendError when it runs out of replicas to try.
//
// routing dictates what replicas will be tried (but not necessarily their
@@ -2307,15 +2331,62 @@ func (ds *DistSender) sendToReplicas(
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
}
desc := routing.Desc()
leaseholder := routing.Leaseholder()
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter)
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, routing.Leaseholder(), replicaFilter)
if err != nil {
return nil, err
}

// The client requested that we proxy this request. Only proxy if we can
// determine the leaseholder and it agrees with the ProxyRangeInfo from
// the client. We don't support proxying to a non-leaseholder replica. If
// we decide to proxy this request, we reduce our replica list to only the
// requested replica. If that request fails, we fail back to the caller so
// they can try something else.
if ba.ProxyRangeInfo != nil {
log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo)
// We don't know who the leaseholder is, and it is likely that the
// client had stale information. Return our information to them through
// a NLHE and let them retry.
if routing.Lease().Empty() {
log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(roachpb.Lease{},
0, /* proposerStoreID */
routing.Desc(),
"client requested a proxy but we can't figure out the leaseholder"),
)
return &br, nil
}
if ba.ProxyRangeInfo.Lease.Sequence != routing.Lease().Sequence ||
ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
log.VEventf(ctx, 2, "proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(
*routing.Lease(),
0, /* proposerStoreID */
routing.Desc(),
fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)),
)
return &br, nil
}

// On a proxy request, we only send the request to the leaseholder. If we
// are here then the client and server agree on the routing information, so
// use the leaseholder as our only replica to send to.
idx := replicas.Find(routing.Leaseholder().ReplicaID)
// This should never happen. We validated the routing above and the token
// is still valid.
if idx == -1 {
return nil, errors.AssertionFailedf("inconsistent routing %v %v", desc, *routing.Leaseholder())
}
replicas = replicas[idx : idx+1]
log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
}
// Rearrange the replicas so that they're ordered according to the routing
// policy.
var leaseholderFirst bool
var routeToLeaseholder bool
switch ba.RoutingPolicy {
case kvpb.RoutingPolicy_LEASEHOLDER:
// First order by latency, then move the leaseholder to the front of the
@@ -2325,12 +2396,12 @@
}

idx := -1
if leaseholder != nil {
idx = replicas.Find(leaseholder.ReplicaID)
if routing.Leaseholder() != nil {
idx = replicas.Find(routing.Leaseholder().ReplicaID)
}
if idx != -1 {
replicas.MoveToFront(idx)
leaseholderFirst = true
routeToLeaseholder = true
} else {
// The leaseholder node's info must have been missing from gossip when we
// created replicas.
@@ -2468,6 +2539,44 @@ func (ds *DistSender) sendToReplicas(
comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

// Determine whether we should proxy this request through a follower to
// the leaseholder. The primary condition for proxying is that we would
// like to send the request to the leaseholder, but the transport is
// about to send the request through a follower.
requestToSend := ba
if !ProxyBatchRequest.Get(&ds.st.SV) {
// The setting is disabled, so we don't proxy this request.
} else if ba.ProxyRangeInfo != nil {
// Clear out the proxy information to prevent the recipient from
// sending this request onwards. This is necessary to prevent proxy
// chaining. We want the recipient to process the request or fail
// immediately. This is an extra safety measure to prevent any type
// of routing loop.
requestToSend = ba.ShallowCopy()
requestToSend.ProxyRangeInfo = nil
} else if !routeToLeaseholder {
// This request isn't intended for the leaseholder so we don't proxy it.
} else if routing.Leaseholder() == nil {
// NB: Normally we don't have both routeToLeaseholder and a nil
// leaseholder. This could be changed to an assertion.
log.Errorf(ctx, "attempting %v to route to leaseholder, but the leaseholder is unknown %v", ba, routing)
} else if ba.Replica.NodeID == routing.Leaseholder().NodeID {
// We are sending this request to the leaseholder, so it doesn't
// make sense to attempt to proxy it.
} else if ds.nodeIDGetter() == ba.Replica.NodeID {
// This condition prevents proxying a request if we are the same
// node as the final destination. Without this we would pass through
// the same DistSender stack again which is pointless.
} else {
// We passed all the conditions above and want to attempt to proxy
// this request. We need to copy it as we are going to modify the
// Header. For other replicas we may not end up setting the header.
requestToSend = ba.ShallowCopy()
rangeInfo := routing.RangeInfo()
requestToSend.ProxyRangeInfo = &rangeInfo
log.VEventf(ctx, 1, "attempt proxy request %v using %v", requestToSend, rangeInfo)
}

tBegin := timeutil.Now() // for slow log message
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).
Track(ctx, ba, tBegin.UnixNano())
@@ -2476,7 +2585,7 @@
err = cbErr
transport.SkipReplica()
} else {
br, err = transport.SendNext(sendCtx, ba)
br, err = transport.SendNext(sendCtx, requestToSend)
tEnd := timeutil.Now()
cbToken.Done(br, err, tEnd.UnixNano())

@@ -2492,6 +2601,18 @@
}
}
}
if err == nil {
if proxyErr, ok := br.Error.GetDetail().(*kvpb.ProxyFailedError); ok {
// The server proxy attempt resulted in a non-BatchRequest error, likely
// a communication error. Convert the wrapped error into an error on our
// side. Depending on the type of request and what the error is, we may
// treat the error as an ambiguous error. Clear out the BatchResponse as
// the only information it contained was this error.
err = proxyErr.Unwrap()
br = nil
log.VEventf(ctx, 2, "proxy send error: %s", err)
}
}

ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)
@@ -2557,21 +2678,10 @@ func (ds *DistSender) sendToReplicas(
// retrying the writes, should it need to be propagated.
if withCommit && !grpcutil.RequestDidNotStart(err) {
ambiguousError = err
} else if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
// If we get a gRPC error against the leaseholder, we don't want to
// backoff and keep trying the same leaseholder against the leaseholder.
// TODO(baptist): This should not be in an else block. Ideally
// we set leaseholderUnavailable to true even if there is an
// ambiguous error as it should be set independent of an
// ambiguous error. TestTransactionUnexpectedlyCommitted test
// fails otherwise. That test is expecting us to retry against
// the leaseholder after we received a gRPC error to validate
// that it now returns a WriteTooOld error. Once the proxy code
// is in place, this can be moved back out of the if block. In
// practice the only impact of having this in the else block is
// that we will retry more times against a leaseholder before
// moving on to the other replicas. There is not an easy way to
// modify the test without this being in the else block.
}
// If we get a gRPC error against the leaseholder, we don't want to
// backoff and keep trying the request against the same leaseholder.
if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
leaseholderUnavailable = true
}
} else {
@@ -2738,6 +2848,7 @@ func (ds *DistSender) sendToReplicas(
// error too aggressively.
if updatedLeaseholder {
leaseholderUnavailable = false
routeToLeaseholder = true
}
// If the leaseholder is the replica that we've just tried, and
// we've tried this replica a bunch of times already, let's move on
@@ -2773,7 +2884,7 @@
// have a sufficient closed timestamp. In response, we should
// immediately redirect to the leaseholder, without a backoff
// period.
intentionallySentToFollower := first && !leaseholderFirst
intentionallySentToFollower := first && !routeToLeaseholder
// See if we want to backoff a little before the next attempt. If
// the lease info we got is stale and we were intending to send to
// the leaseholder, we backoff because it might be the case that
4 changes: 1 addition & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -4694,10 +4694,8 @@ func TestPartialPartition(t *testing.T) {
for _, test := range testCases {
t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers),
func(t *testing.T) {
if test.useProxy {
skip.WithIssue(t, 93503)
}
st := cluster.MakeTestingClusterSettings()
kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, test.useProxy)
// With epoch leases this test doesn't work reliably. It passes
// in cases where it should fail and fails in cases where it
// should pass.
3 changes: 3 additions & 0 deletions pkg/kv/kvpb/errordetailtype_string.go

(Generated file; diff not shown.)

77 changes: 77 additions & 0 deletions pkg/server/node.go
@@ -1410,9 +1410,34 @@ func (n *Node) batchInternal(
n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes)
writeBytes.Release()
}()

// If a proxy attempt is requested, we copy the request to prevent evaluation
// from modifying the request. There are places on the server that can modify
// the request, and we can't keep these modifications if we later proxy it.
// Note we only ShallowCopy, so care must be taken with internal changes.
// For reference some of the places are:
// SetActiveTimestamp - sets the Header.Timestamp
// maybeStripInFlightWrites - can modify internal EndTxn requests
// tryBumpBatchTimestamp - can modify the txn.ReadTimestamp
// TODO(baptist): Other code copies the BatchRequest, in some cases
// unnecessarily, to prevent modifying the passed in request. We should clean
// up the contract of the Send method to allow modifying the request or more
// strictly enforce that the callee is not allowed to change it.
var originalRequest *kvpb.BatchRequest
if args.ProxyRangeInfo != nil {
originalRequest = args.ShallowCopy()
}
var pErr *kvpb.Error
br, writeBytes, pErr = n.stores.SendWithWriteBytes(ctx, args)
if pErr != nil {
if originalRequest != nil {
if proxyResponse := n.maybeProxyRequest(ctx, originalRequest, pErr); proxyResponse != nil {
// If the proxy request succeeded then return its result instead of
// our error. If not, use our original error.
return proxyResponse, nil
}
}

br = &kvpb.BatchResponse{}
if pErr.Index != nil && keyvissettings.Enabled.Get(&n.storeCfg.Settings.SV) {
// Tell the SpanStatsCollector about the requests in this BatchRequest,
@@ -1466,6 +1491,58 @@
return br, nil
}

// maybeProxyRequest is called after the server returned an error and it
// attempts to proxy the request if it can. We attempt to proxy requests if two
// primary conditions are met:
// 1) The ProxyRangeInfo header is set on the request indicating the client
// would like us to proxy this request if we can't evaluate it.
// 2) Local evaluation has resulted in a NotLeaseHolderError which matches the
// ProxyRangeInfo from the client.
// If these conditions are met, attempt to send the request through our local
// DistSender stack and use that result instead of our error.
func (n *Node) maybeProxyRequest(
ctx context.Context, ba *kvpb.BatchRequest, pErr *kvpb.Error,
) *kvpb.BatchResponse {
// NB: We don't handle StoreNotFound or RangeNotFound errors. If we want to
// support proxy requests through non-replicas we could proxy those errors
// as well.
var nlhe *kvpb.NotLeaseHolderError
if ok := errors.As(pErr.GetDetail(), &nlhe); !ok {
log.VEventf(ctx, 2, "non-proxyable errors %v", pErr)
return nil
}
// If the error is because we think the client has stale information,
// see if our information would update the client's state. If so,
// rather than proxying this request, fail back to the client first.
leaseCompatible := nlhe.Lease != nil && ba.ProxyRangeInfo.Lease.Sequence >= nlhe.Lease.Sequence
descCompatible := ba.ProxyRangeInfo.Desc.Generation >= nlhe.RangeDesc.Generation
if !leaseCompatible || !descCompatible {
log.VEventf(
ctx,
2,
"out-of-date client information on proxy request %v != %v",
ba.ProxyRangeInfo,
pErr,
)
return nil
}

log.VEventf(ctx, 2, "proxy request for %v after local error %v", ba, pErr)
// TODO(baptist): Correctly set up the span / tracing.
br, pErr := n.proxySender.Send(ctx, ba)
if pErr == nil {
log.VEvent(ctx, 2, "proxy succeeded")
return br
}
// Wrap the error in a ProxyFailedError. It is unwrapped on the client side
// and handled there.
log.VEventf(ctx, 2, "proxy attempt resulted in error %v", pErr)
br = &kvpb.BatchResponse{}
br.Error = kvpb.NewError(kvpb.NewProxyFailedError(pErr.GoError()))
return br
}

// getLocalityComparison takes gatewayNodeID as input and returns the locality
// comparison result between the gateway node and the current node. This result
// indicates whether the two nodes are located in different regions or zones.
