kvserver: proxy requests that return a NotLeaseholderError

Previously, a request that required a direct connection to the
leaseholder would fail if the client was unable to initiate that
connection. This change adds a new field, ProxyRangeInfo, to the
BatchRequest header. A client sets this field to request that the
server proxy the request on its behalf.

There are three main changes in this commit; a condensed sketch follows
the list.
1) In DistSender, if the client needs to send a request to the
   leaseholder but its transport has already moved on to the next
   replica, it adds the ProxyRangeInfo header to the request and sends
   it to the follower.
2) In DistSender, if it is attempting to send a request that already
   has the ProxyRangeInfo header set, it short-circuits much of the
   retry logic and instead sends the request only to the leaseholder
   the original client wanted to reach.
3) On the server side, a node normally evaluates a request even if
   ProxyRangeInfo is set. If its local evaluation results in a
   NotLeaseHolderError and its range info matches what the client set
   in the ProxyRangeInfo header, it attempts to proxy the request to
   the leaseholder and returns the response from that request instead.
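
A minimal, self-contained Go sketch of this flow. All types and helpers
below are simplified stand-ins for the real DistSender/kvpb machinery;
only the branching logic mirrors the change:

package main

import "fmt"

// rangeInfo stands in for the lease/descriptor information carried in
// the ProxyRangeInfo header.
type rangeInfo struct{ leaseSeq, descGen int64 }

type request struct {
	proxyRangeInfo *rangeInfo // nil unless the client asked for a proxy
}

// clientSide mirrors change 1: the client wants the leaseholder, but the
// transport has moved past it, so mark the request for proxying.
func clientSide(req *request, transportAtLeaseholder bool, routing rangeInfo) {
	if !transportAtLeaseholder && req.proxyRangeInfo == nil {
		req.proxyRangeInfo = &routing
	}
}

// serverSide mirrors change 3: evaluate locally, and only when that
// yields a NotLeaseHolderError and the client's routing info matches
// our own do we forward the request to the leaseholder.
func serverSide(req *request, gotNLHE bool, local rangeInfo) string {
	if gotNLHE && req.proxyRangeInfo != nil && *req.proxyRangeInfo == local {
		return "proxied to leaseholder; its response is returned"
	}
	return "local response (or error) returned"
}

func main() {
	ri := rangeInfo{leaseSeq: 5, descGen: 3}
	req := &request{}
	clientSide(req, false, ri)
	fmt.Println(serverSide(req, true, ri))
}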

The impact of this change is to allow BatchRequests to succeed as long
as there is connectivity between the client and at least one replica
that has a direct connection to the leaseholder.

Epic: none

Fixes: cockroachdb#93503

Release note (ops change): This PR adds a new cluster setting,
kv.dist_sender.proxy.enabled, which defaults to true. When it is
enabled, requests to the leaseholder are proxied through a follower
replica if the client cannot connect to the leaseholder directly.
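
A hedged illustration of the knob (the package and test name here are
illustrative; the Override call mirrors the TestPartialPartition change
below, and the SQL line is the standard cluster-setting syntax):

package kvcoord_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

func TestProxyDisabled(t *testing.T) {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()
	// Disable proxying for any test cluster built from these settings;
	// the SQL equivalent is:
	//   SET CLUSTER SETTING kv.dist_sender.proxy.enabled = false;
	kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, false)
}
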
andrewbaptist committed Mar 12, 2024
1 parent 7fe9e1b commit fbe88bb
Showing 4 changed files with 208 additions and 28 deletions.
131 changes: 107 additions & 24 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -303,6 +303,13 @@ var sortByLocalityFirst = settings.RegisterBoolSetting(
true,
)

var ProxyBatchRequest = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.dist_sender.proxy.enabled",
"when true, proxy batch requests that can't be routed directly to the leaseholder",
true,
)

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
@@ -1969,6 +1976,12 @@ func (ds *DistSender) sendPartialBatch(
// Set pErr so that, if we don't perform any more retries, the
// deduceRetryEarlyExitError() call below the loop includes this error.
pErr = kvpb.NewError(err)
// For proxy requests, we are not the originator, so don't endlessly retry.
// This is a balance between trying forever and not trying hard enough.
if attempts >= 2 && ba.ProxyRangeInfo != nil {
log.VEventf(ctx, 1, "failing proxy requests after 2 failed attempts %s", err)
break
}
switch {
case IsSendError(err):
// We've tried all the replicas without success. Either they're all
@@ -2256,7 +2269,7 @@ const defaultSendClosedTimestampPolicy = roachpb.LEAD_FOR_GLOBAL_READS
// AmbiguousResultError. Of those two, the latter has to be passed back to the
// client, while the former should be handled by retrying with an updated range
// descriptor. This method handles other errors returned from replicas
// internally by retrying (NotLeaseholderError, RangeNotFoundError), and falls
// internally by retrying (NotLeaseHolderError, RangeNotFoundError), and falls
// back to a sendError when it runs out of replicas to try.
//
// routing dictates what replicas will be tried (but not necessarily their
@@ -2301,15 +2314,60 @@ func (ds *DistSender) sendToReplicas(
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
}
desc := routing.Desc()
leaseholder := routing.Leaseholder()
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter)
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, routing.Leaseholder(), replicaFilter)
if err != nil {
return nil, err
}

// This client requested we proxy this request. Only proxy if we can
// determine the leaseholder and it agrees with the ProxyRangeInfo from
// the client. We don't support a proxy request to a non-leaseholder
// replica. If we decide to proxy this request, we will reduce our replica
// list to only the requested proxy. If we fail on that request we fail back
// to the caller so they can try something else.
if ba.ProxyRangeInfo != nil {
log.VErrEventf(ctx, 3, "processing a proxy request")

// Sync our routing information with what the client sent us.
if err := routing.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc); err != nil {
// Update our routing based on the lookup request. If there was a range
// split/merge, and our information is now stale, then we need to retry
// with new routing information. This is a retriable error, return a
// SendError to retry with a fresh transport.
return nil, newSendError(errors.Wrapf(err, "incompatible routing information, reload and try again %v, %v", desc, routing))
}

if routing.Lease().Empty() {
log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
// The client had stale information; stop processing and update them first.
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(kvpb.NewNotLeaseHolderError(roachpb.Lease{}, 0, routing.Desc(), "client requested a proxy but we can't figure out the leaseholder"))
return &br, nil
}
if ba.ProxyRangeInfo.Lease.Sequence != routing.Lease().Sequence ||
ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
msg := fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
log.VEventf(ctx, 2, "%s", msg)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(kvpb.NewNotLeaseHolderError(*routing.Lease(), 0, routing.Desc(), msg))
return &br, nil
}

// On a proxy request, we only send the request to the leaseholder. If we
// are here then the client and server agree on the routing information, so
// use the leaseholder as our only replica to send to.
idx := replicas.Find(routing.Leaseholder().ReplicaID)
// This shouldn't happen. We validated the routing above and the token
// is still valid.
if idx == -1 {
return nil, kvpb.NewReplicaUnavailableError(errors.Newf("proxy requested, but inconsistent routing %v %v", desc, *routing.Leaseholder()), desc, *routing.Leaseholder())
}
replicas = replicas[idx : idx+1]
log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
}
// Rearrange the replicas so that they're ordered according to the routing
// policy.
var leaseholderFirst bool
var routeToLeaseholder bool
switch ba.RoutingPolicy {
case kvpb.RoutingPolicy_LEASEHOLDER:
// First order by latency, then move the leaseholder to the front of the
@@ -2319,12 +2377,12 @@ }
}

idx := -1
if leaseholder != nil {
idx = replicas.Find(leaseholder.ReplicaID)
if routing.Leaseholder() != nil {
idx = replicas.Find(routing.Leaseholder().ReplicaID)
}
if idx != -1 {
replicas.MoveToFront(idx)
leaseholderFirst = true
routeToLeaseholder = true
} else {
// The leaseholder node's info must have been missing from gossip when we
// created replicas.
@@ -2462,6 +2520,41 @@ func (ds *DistSender) sendToReplicas(
comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

// Determine whether we should proxy this request through a follower to
// the leaseholder. The primary condition for proxying is that we would
// like to send the request to the leaseholder, but the transport is
// about to send the request through a follower.
requestToSend := ba
if !ProxyBatchRequest.Get(&ds.st.SV) {
// The setting is disabled, so we don't proxy this request.
} else if ba.ProxyRangeInfo != nil {
// Clear out the proxy information to make it clear that the
// recipient does not need to proxy this request further.
requestToSend = ba.ShallowCopy()
requestToSend.ProxyRangeInfo = nil
} else if !routeToLeaseholder {
// This request isn't intended for the leaseholder so we don't proxy it.
} else if routing.Leaseholder() == nil {
// NB: Normally we don't have both routeToLeaseholder and a nil
// leaseholder. This could be changed to an assertion.
log.Errorf(ctx, "attempting %v to route to leaseholder, but the leaseholder is unknown %v", ba, routing)
} else if ba.Replica.NodeID == routing.Leaseholder().NodeID {
// We are sending this request to the leaseholder, so it doesn't
// make sense to attempt to proxy it.
} else if ds.nodeIDGetter() == ba.Replica.NodeID {
// This condition prevents proxying a request if we are the same
// node as the final destination. Without this we would pass through
// the same DistSender stack again which is pointless.
} else {
// We passed all the conditions above and want to attempt to proxy
// this request. We need to copy it as we are going to modify the
// Header. For other replicas we may not end up setting the header.
requestToSend = ba.ShallowCopy()
rangeInfo := routing.RangeInfo()
requestToSend.ProxyRangeInfo = &rangeInfo
log.VEventf(ctx, 1, "attempt proxy request %v using %v", requestToSend, rangeInfo)
}

tBegin := timeutil.Now() // for slow log message
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).
Track(ctx, ba, tBegin.UnixNano())
@@ -2470,7 +2563,7 @@
err = cbErr
transport.SkipReplica()
} else {
br, err = transport.SendNext(sendCtx, ba)
br, err = transport.SendNext(sendCtx, requestToSend)
tEnd := timeutil.Now()
cbToken.Done(br, err, tEnd.UnixNano())

@@ -2559,21 +2652,10 @@
// retrying the writes, should it need to be propagated.
if withCommit && !grpcutil.RequestDidNotStart(err) {
ambiguousError = err
} else if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
// If we get a gRPC error against the leaseholder, we don't want to
// backoff and keep trying the request against the same leaseholder.
// TODO(baptist): This should not be in an else block. Ideally
// we set leaseholderUnavailable to true even if there is an
// ambiguous error as it should be set independent of an
// ambiguous error. TestTransactionUnexpectedlyCommitted test
// fails otherwise. That test is expecting us to retry against
// the leaseholder after we received a gRPC error to validate
// that it now returns a WriteTooOld error. Once the proxy code
// is in place, this can be moved back out of the if block. In
// practice the only impact of having this in the else block is
// that we will retry more times against a leaseholder before
// moving on to the other replicas. There is not an easy way to
// modify the test without this being in the else block.
}
// If we get a gRPC error against the leaseholder, we don't want to
// backoff and keep trying the request against the same leaseholder.
if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
leaseholderUnavailable = true
}
} else {
@@ -2746,6 +2828,7 @@ func (ds *DistSender) sendToReplicas(
if updatedLeaseholder {
leaseholderUnavailable = false
transport.Reset()
routeToLeaseholder = true
}
// If the leaseholder is the replica that we've just tried, and
// we've tried this replica a bunch of times already, let's move on
@@ -2781,7 +2864,7 @@
// have a sufficient closed timestamp. In response, we should
// immediately redirect to the leaseholder, without a backoff
// period.
intentionallySentToFollower := first && !leaseholderFirst
intentionallySentToFollower := first && !routeToLeaseholder
// See if we want to backoff a little before the next attempt. If
// the lease info we got is stale and we were intending to send to
// the leaseholder, we backoff because it might be the case that
4 changes: 1 addition & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -4694,10 +4694,8 @@ func TestPartialPartition(t *testing.T) {
for _, test := range testCases {
t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers),
func(t *testing.T) {
if test.useProxy {
skip.WithIssue(t, 93503)
}
st := cluster.MakeTestingClusterSettings()
kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, test.useProxy)
// With epoch leases this test doesn't work reliably. It passes
// in cases where it should fail and fails in cases where it
// should pass.
7 changes: 6 additions & 1 deletion pkg/kv/kvpb/api.proto
@@ -2929,9 +2929,14 @@ message Header {
// in a mixed-version state after a new value got added), DEFAULT is assumed.
cockroach.rpc.ConnectionClass connection_class = 33;

// ProxyRangeInfo is the information the client used to determine it needed to
// proxy this request. It is set when the client is unable to send directly to
// the desired node but still wants the request to be routed there.
RangeInfo proxy_range_info = 34;

reserved 7, 10, 12, 14, 20;

// Next ID: 34
// Next ID: 35
}

// BoundedStalenessHeader contains configuration values pertaining to bounded
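
A hedged sketch of populating the new field (the field values are made
up; in practice only the DistSender sets this, from its routing token,
as shown in the dist_sender.go change above):

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	ba := &kvpb.BatchRequest{}
	// The client attaches its view of the range (descriptor and lease)
	// so the follower can validate it against its own before proxying.
	ba.ProxyRangeInfo = &roachpb.RangeInfo{
		Desc:  roachpb.RangeDescriptor{RangeID: 42},
		Lease: roachpb.Lease{Sequence: 5},
	}
	fmt.Println(ba.ProxyRangeInfo)
}
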
94 changes: 94 additions & 0 deletions pkg/server/node.go
@@ -58,6 +58,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -1387,9 +1388,18 @@ func (n *Node) batchInternal(
n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes)
writeBytes.Release()
}()

var pErr *kvpb.Error
br, writeBytes, pErr = n.stores.SendWithWriteBytes(ctx, args)
if pErr != nil {
if proxyResponse, wasProxied := n.maybeProxyRequest(ctx, args, pErr); wasProxied {
// If the proxy attempt succeeded, we want to return the response
// immediately; however, if it failed, we need to run this request
// locally to see if we can satisfy it or give the client updated
// routing information.
return proxyResponse, nil
}

br = &kvpb.BatchResponse{}
if pErr.Index != nil && keyvissettings.Enabled.Get(&n.storeCfg.Settings.SV) {
// Tell the SpanStatsCollector about the requests in this BatchRequest,
@@ -1443,6 +1453,90 @@
return br, nil
}

// maybeProxyRequest is called after the server returns an error, and it
// attempts to proxy the request if it can. We attempt to proxy requests if two
// primary conditions are met:
// 1) The ProxyRangeInfo header is set on the request, indicating the client
// would like us to proxy this request if we can't evaluate it.
// 2) Local evaluation has resulted in a NotLeaseHolderError which matches the
// ProxyRangeInfo from the client.
// If these conditions are met, we attempt to send the request through our
// local DistSender stack and return the response from that request instead.
func (n *Node) maybeProxyRequest(
ctx context.Context, ba *kvpb.BatchRequest, pErr *kvpb.Error,
) (*kvpb.BatchResponse, bool) {
if ba.ProxyRangeInfo == nil {
log.VEvent(ctx, 3, "not a proxy request")
return nil, false
}

// The setting is also checked on the client.
if !kvcoord.ProxyBatchRequest.Get(&n.execCfg.Settings.SV) {
log.VEventf(ctx, 3, "proxy disabled")
return nil, false
}

switch tErr := pErr.GetDetail().(type) {
case *kvpb.StoreNotFoundError, *kvpb.RangeNotFoundError:
// TODO(baptist): We don't expect to hit this branch today except in
// cases where we are moving the range off this store and the client has
// stale information. If we proxy requests through non-replicas for a
// range we could remove this restriction and instead handle this proxy
// request.
log.VEventf(ctx, 2, "don't proxy NotFound error %v", pErr)
return nil, false
case *kvpb.NotLeaseHolderError:
// If we would normally return a NLHE because we think the client has
// stale information, see if our information would update the client's
// state. If so, rather than proxying this request, fail back to the
// client first.
leaseCompatible := tErr.Lease == nil || ba.ProxyRangeInfo.Lease.Sequence >= tErr.Lease.Sequence
descCompatible := ba.ProxyRangeInfo.Desc.Generation >= tErr.RangeDesc.Generation
if leaseCompatible && descCompatible {
log.VEventf(ctx, 2, "proxy request %v after error %v with up-tp-date info", ba, pErr)
} else {
log.VEventf(ctx, 2, "out-of-date client information on proxy request %v != %v with up-tp-date info", ba.ProxyRangeInfo, pErr)
return nil, false
}
default:
log.VEventf(ctx, 2, "local returned non-proxyable errors %v", pErr)
return nil, false
}

// TODO(baptist): The TraceInfo here is what was used for processing this
// request locally. It's not clear if this is useful for debugging purposes,
// but there isn't a clean place to put this right now. We can't send a
// request with non-empty TraceInfo. A fast follow-up PR needs to clean up
// how TraceInfo and CollectedSpans are managed for proxy requests.
if !ba.TraceInfo.Empty() {
ba.TraceInfo.Reset()
}
// The BatchRequest API requires the timestamp is empty on all requests and
// sets it based on the ReadTimestamp of the transaction. Clear the
// timestamp so BatchRequest.SetActiveTimestamp doesn't error.
if !ba.Timestamp.IsEmpty() {
ba.Timestamp.Reset()
}
// Send the request through our local dist_sender.
br, pErr := n.proxySender.Send(ctx, ba)
log.VEventf(ctx, 2, "proxy response is %v %v", br, pErr)

if pErr != nil {
// For connection errors, we treat it as though the proxy never happened
// and fall back to the local error from before the proxy attempt.
if grpcutil.IsClosedConnection(pErr.GoError()) {
log.VEventf(ctx, 2, "node received network error on proxy request: %s", pErr)
return nil, false
}
// For other errors, put them into a BatchResponse with this error and send
// this error to the client instead.
br = &kvpb.BatchResponse{}
br.Error = pErr
}

return br, true
}

// getLocalityComparison takes gatewayNodeID as input and returns the locality
// comparison result between the gateway node and the current node. This result
// indicates whether the two nodes are located in different regions or zones.
