kvserver: proxy requests that return a NotLeaseholderError
This handles partial partitions at the DistSender layer.

DistSender has two major changes:
1) If it wants to send a request to the leaseholder but the original
   attempt failed and it has moved on to a different replica, it will
   send the request as a proxy request to the follower instead of a
   normal request. This is indicated by setting the new ProxyRangeInfo
   field.
2) If it receives a request with the ProxyRangeInfo field set, it will
   create a "1-node" transport with only the leaseholder and
   additionally validate that it agrees with the client on the final
   destination.

On the server side, if we receive a request with ProxyRangeInfo set,
we will proxy that request to the intended leaseholder rather than
evaluating it ourselves.

Epic: none

Fixes: cockroachdb#93503

Release note (ops change): This PR adds a new cluster setting,
kv.dist_sender.proxy.enabled, which defaults to true. When it is
enabled, requests will be proxied through a follower replica when
the leaseholder is unavailable.
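
To make the client-side behavior concrete, here is a minimal, self-contained sketch of the proxy decision described above. The shouldProxy helper and the plain integer node IDs are illustrative assumptions, not the actual DistSender code; the real logic lives in DistSender.sendToReplicas in the diff below.

```go
package main

import "fmt"

// shouldProxy sketches when DistSender.sendToReplicas turns an ordinary
// request into a proxy request: proxying is enabled, the routing policy
// targets the leaseholder, the leaseholder is known, and the next replica
// to try is neither the leaseholder nor the local node.
func shouldProxy(proxyEnabled, routeToLeaseholder bool, leaseholderNode, nextNode, selfNode int) bool {
	if !proxyEnabled || !routeToLeaseholder || leaseholderNode == 0 {
		return false // setting off, not targeting the leaseholder, or leaseholder unknown
	}
	if nextNode == leaseholderNode {
		return false // already sending to the leaseholder; nothing to proxy
	}
	if nextNode == selfNode {
		return false // don't proxy through ourselves
	}
	return true
}

func main() {
	fmt.Println(shouldProxy(true, true, 3, 2, 1)) // true: ask follower n2 to forward to leaseholder n3
	fmt.Println(shouldProxy(true, true, 3, 3, 1)) // false: the next attempt already targets n3
}
```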
andrewbaptist committed Mar 7, 2024
1 parent 199586f commit 6a29c47
Showing 4 changed files with 196 additions and 23 deletions.
119 changes: 100 additions & 19 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -303,6 +303,13 @@ var sortByLocalityFirst = settings.RegisterBoolSetting(
true,
)

var ProxyBatchRequest = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.dist_sender.proxy.enabled",
"when true, proxy batch requests that can't be routed directly to the leaseholder",
true,
)

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
@@ -1969,6 +1976,12 @@ func (ds *DistSender) sendPartialBatch(
// Set pErr so that, if we don't perform any more retries, the
// deduceRetryEarlyExitError() call below the loop includes this error.
pErr = kvpb.NewError(err)
// For proxy requests, we are not the originator, so don't retry endlessly.
// This is a balance between trying forever and not trying hard enough.
if attempts >= 2 && ba.ProxyRangeInfo != nil {
log.VEventf(ctx, 1, "failing proxy requests after 2 failed attempts %s", err)
break
}
switch {
case IsSendError(err):
// We've tried all the replicas without success. Either they're all
@@ -2256,7 +2269,7 @@ const defaultSendClosedTimestampPolicy = roachpb.LEAD_FOR_GLOBAL_READS
// AmbiguousResultError. Of those two, the latter has to be passed back to the
// client, while the former should be handled by retrying with an updated range
// descriptor. This method handles other errors returned from replicas
// internally by retrying (NotLeaseholderError, RangeNotFoundError), and falls
// internally by retrying (NotLeaseHolderError, RangeNotFoundError), and falls
// back to a sendError when it runs out of replicas to try.
//
// routing dictates what replicas will be tried (but not necessarily their
@@ -2307,9 +2320,60 @@ func (ds *DistSender) sendToReplicas(
return nil, err
}

var canProxyRequest = ProxyBatchRequest.Get(&ds.st.SV)
// This client requested we proxy this request. Only proxy if we can
// determine the leaseholder and it agrees with the ProxyRangeInfo from
// the client. We don't support a proxy request to a non-leaseholder
// replica. If we decide to proxy this request, we will reduce our replica
// list to just the leaseholder. If we fail on that request, we fail back
// to the caller so they can try something else.
if ba.ProxyRangeInfo != nil {
log.VErrEventf(ctx, 3, "processing a proxy request")
// We don't want to re-proxy this request on failures.
canProxyRequest = false

// Sync our routing information with what the client sent us.
if err := routing.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc); err != nil {
// Update our routing based on the lookup request. If there was a range
// split/merge, and our information is now stale, then we need to retry
// with new routing information. This is a retriable error, return a
// SendError to retry with a fresh transport.
return nil, newSendError(errors.Wrapf(err, "incompatible routing information, reload and try again %v, %v", desc, routing))
}

if routing.Lease().Empty() {
log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
// The client had stale information; stop processing and update them first.
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(kvpb.NewNotLeaseHolderError(roachpb.Lease{}, 0, routing.Desc(), "client requested a proxy but we can't figure out the leaseholder"))
return &br, nil
}
if !ba.ProxyRangeInfo.Lease.Equivalent(*routing.Lease(), true) ||
ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
msg := fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
log.VEventf(ctx, 2, "proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(kvpb.NewNotLeaseHolderError(*routing.Lease(), 0, routing.Desc(), msg))
return &br, nil
}

// On a proxy request, we only send the request to the leaseholder. If we
// are here then the client and server agree on the routing information, so
// use the leaseholder as our only replica to send to.
leaseholder = routing.Leaseholder()
idx := replicas.Find(leaseholder.ReplicaID)
// This shouldn't happen. We validated the routing above and the token
// is still valid.
if idx == -1 {
log.VErrEventf(ctx, 1, "unexpected incompatibility between lease and replica %v, %v", leaseholder, replicas)
return nil, kvpb.NewReplicaUnavailableError(errors.Newf("proxy requested, but leaseholder not in list of replicas"), desc, *leaseholder)
}
replicas = replicas[idx : idx+1]
log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
}
// Rearrange the replicas so that they're ordered according to the routing
// policy.
var leaseholderFirst bool
var routeToLeaseholder bool
switch ba.RoutingPolicy {
case kvpb.RoutingPolicy_LEASEHOLDER:
// First order by latency, then move the leaseholder to the front of the
@@ -2324,7 +2388,7 @@ func (ds *DistSender) sendToReplicas(
}
if idx != -1 {
replicas.MoveToFront(idx)
leaseholderFirst = true
routeToLeaseholder = true
} else {
// The leaseholder node's info must have been missing from gossip when we
// created replicas.
Expand All @@ -2348,6 +2412,7 @@ func (ds *DistSender) sendToReplicas(
metrics: &ds.metrics,
dontConsiderConnHealth: ds.dontConsiderConnHealth,
}
log.VEventf(ctx, 2, "attempting request with sorted replicas %v and lease %s ", replicas, routing.Leaseholder())
transport, err := ds.transportFactory(opts, replicas)
if err != nil {
return nil, err
@@ -2462,6 +2527,32 @@ func (ds *DistSender) sendToReplicas(
comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

// Check whether we should proxy this request through a follower to the
// leaseholder. The condition for proxying is that we are trying to send the
// request to the leaseholder, but the transport is trying to send through a
// follower.
if !canProxyRequest {
log.VEventf(ctx, 3, "skip proxy - not proxyable %v", ba)
} else if !routeToLeaseholder {
log.VEventf(ctx, 3, "skip proxy - request not routed to leaseholder %v", ba)
} else if routing.Leaseholder() == nil {
// NB: Normally we don't have both routeToLeaseholder and a nil leaseholder.
// This can happen if the previous leaseholder returns a NotLeaseHolderError
// without a pointer to a valid lease.
log.VEventf(ctx, 3, "skip proxy - leaseholder unknown %v", ba)
} else if ba.Replica.NodeID == routing.Leaseholder().NodeID {
log.VEventf(ctx, 3, "skip proxy - don't proxy to leaseholder %v", ba)
} else if ds.nodeIDGetter() == ba.Replica.NodeID {
log.VEventf(ctx, 3, "skip proxy - don't proxy through self %v", ba)
} else {
// Copy the request. We don't want to modify what the caller sees or
// automatically proxy on a retry failure.
ba = ba.ShallowCopy()
rangeInfo := routing.ToRangeInfo()
ba.ProxyRangeInfo = &rangeInfo
log.VEventf(ctx, 1, "attempt proxy request %v using %v", ba, rangeInfo)
}

tBegin := timeutil.Now() // for slow log message
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).
Track(ctx, ba, tBegin.UnixNano())
@@ -2559,21 +2650,10 @@ func (ds *DistSender) sendToReplicas(
// retrying the writes, should it need to be propagated.
if withCommit && !grpcutil.RequestDidNotStart(err) {
ambiguousError = err
} else if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
// If we get a gRPC error against the leaseholder, we don't want to
// backoff and keep trying the same leaseholder against the leaseholder.
// TODO(baptist): This should not be in an else block. Ideally
// we set leaseholderUnavailable to true even if there is an
// ambiguous error as it should be set independent of an
// ambiguous error. TestTransactionUnexpectedlyCommitted test
// fails otherwise. That test is expecting us to retry against
// the leaseholder after we received a gRPC error to validate
// that it now returns a WriteTooOld error. Once the proxy code
// is in place, this can be moved back out of the if block. In
// practice the only impact of having this in the else block is
// that we will retry more times against a leaseholder before
// moving on to the other replicas. There is not an easy way to
// modify the test without this being in the else block.
}
// If we get a gRPC error against the leaseholder, we don't want to
// backoff and keep trying the request against the same leaseholder.
if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
leaseholderUnavailable = true
}
} else {
@@ -2741,6 +2821,7 @@ func (ds *DistSender) sendToReplicas(
// to try all the replicas again since we may make
// different decisions.
transport.Reset()
routeToLeaseholder = true
}
// If the leaseholder is the replica that we've just tried, and
// we've tried this replica a bunch of times already, let's move on
@@ -2776,7 +2857,7 @@ func (ds *DistSender) sendToReplicas(
// have a sufficient closed timestamp. In response, we should
// immediately redirect to the leaseholder, without a backoff
// period.
intentionallySentToFollower := first && !leaseholderFirst
intentionallySentToFollower := first && !routeToLeaseholder
// See if we want to backoff a little before the next attempt. If
// the lease info we got is stale and we were intending to send to
// the leaseholder, we backoff because it might be the case that
4 changes: 1 addition & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -4700,10 +4700,8 @@ func TestPartialPartition(t *testing.T) {
for _, test := range testCases {
t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers),
func(t *testing.T) {
if test.useProxy {
t.Skip("proxy not supported")
}
st := cluster.MakeTestingClusterSettings()
kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, test.useProxy)
// With epoch leases this test doesn't work reliably. It passes
// in cases where it should fail and fails in cases where it
// should pass.
7 changes: 6 additions & 1 deletion pkg/kv/kvpb/api.proto
@@ -2929,9 +2929,14 @@ message Header {
// in a mixed-version state after a new value got added), DEFAULT is assumed.
cockroach.rpc.ConnectionClass connection_class = 33;

// ProxyRangeInfo is the information the client used to determine it needed to
// proxy this request. It is set if the client is unable to send directly to
// the desired node but wants the request routed there.
RangeInfo proxy_range_info = 34;

reserved 7, 10, 12, 14, 20;

// Next ID: 34
// Next ID: 35
}

// BoundedStalenessHeader contains configuration values pertaining to bounded
89 changes: 89 additions & 0 deletions pkg/server/node.go
@@ -58,6 +58,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -1387,9 +1388,18 @@ func (n *Node) batchInternal(
n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes)
writeBytes.Release()
}()

var pErr *kvpb.Error
br, writeBytes, pErr = n.stores.SendWithWriteBytes(ctx, args)
if pErr != nil {
if wasProxied, proxyResponse := n.maybeProxyRequest(ctx, args, pErr); wasProxied {
// If the proxy response succeeded, then we want to return the response
// immediately, however if it failed, then we need to run this request
// locally to see if we can satisfy it or give the client updated
// routing information.
return proxyResponse, nil
}

br = &kvpb.BatchResponse{}
if pErr.Index != nil && keyvissettings.Enabled.Get(&n.storeCfg.Settings.SV) {
// Tell the SpanStatsCollector about the requests in this BatchRequest,
@@ -1443,6 +1453,85 @@ func (n *Node) batchInternal(
return br, nil
}

// maybeProxyRequest is called after local evaluation returned an error. It
// attempts to proxy the request to the intended leaseholder if the request
// was sent as a proxy request and the error is one proxying can address;
// otherwise it returns false and the local error goes back to the client.
func (n *Node) maybeProxyRequest(
ctx context.Context, ba *kvpb.BatchRequest, pErr *kvpb.Error,
) (bool, *kvpb.BatchResponse) {
if ba.ProxyRangeInfo == nil {
log.VEvent(ctx, 3, "not a proxy request")
return false, nil
}

// Normally the setting is checked on the client; however, it is safest to
// check it here as well.
if !kvcoord.ProxyBatchRequest.Get(&n.execCfg.Settings.SV) {
log.VEventf(ctx, 3, "proxy disabled")
return false, nil
}

// This is unexpected; the sender should not have sent us this request, but
// log it to be safe.
if ba.GatewayNodeID == n.Descriptor.NodeID {
log.VErrEventf(ctx, 1, "we are already the client, don't self proxy")
return false, nil
}
// The request was proxied to us as the target, but we were unable to process
// it locally; return the local error rather than re-proxying.
if ba.ProxyRangeInfo.Lease.Replica.NodeID == n.Descriptor.NodeID {
log.VEventf(ctx, 2, "we are the proxy target process request locally")
return false, nil
}

switch tErr := pErr.GetDetail().(type) {
case *kvpb.StoreNotFoundError, *kvpb.RangeNotFoundError:
log.VEventf(ctx, 2, "proxy request %v after error %v", ba, pErr)
// TODO(baptist): Should we proxy this request? Today this indicates
// the client has stale information. In the future we could remove this
// return and instead send this as a proxy request.
return false, nil
case *kvpb.NotLeaseHolderError:
// If we would normally return a NLHE because we think the client has
// stale information, see if our information would update the client's
// state. If so, rather than proxying this request, fail back to the
// client first.
leaseCompatible := tErr.Lease == nil || ba.ProxyRangeInfo.Lease.Sequence >= tErr.Lease.Sequence
descCompatible := ba.ProxyRangeInfo.Desc.Generation >= tErr.RangeDesc.Generation
if leaseCompatible && descCompatible {
log.VEventf(ctx, 2, "proxy request %v after error %v with up-tp-date info", ba, pErr)
} else {
log.VEventf(ctx, 2, "out-of-date client information on proxy request %v != %v with up-tp-date info", ba.ProxyRangeInfo, pErr)
return false, nil
}
default:
log.VEventf(ctx, 2, "local returned non-proxyable errors %v", pErr)
return false, nil
}

// TODO(baptist): We need to save the old trace info and re-append after we
// send the request.
if !ba.TraceInfo.Empty() {
ba.TraceInfo.Reset()
}
// Send the request through our local dist_sender.
br, pErr := n.proxySender.Send(ctx, ba)
log.VEventf(ctx, 2, "proxy response is %v %v", br, pErr)

if pErr != nil {
// For connection errors, we treat it as though the proxy attempt never
// happened and return the original local error instead.
if grpcutil.IsClosedConnection(pErr.GoError()) {
log.VEventf(ctx, 2, "node received network error on proxy request: %s", pErr)
return false, nil
}
// For other errors, put them into a BatchResponse with this error and send
// this error to the client instead.
br = &kvpb.BatchResponse{}
br.Error = pErr
}

return true, br
}

// getLocalityComparison takes gatewayNodeID as input and returns the locality
// comparison result between the gateway node and the current node. This result
// indicates whether the two nodes are located in different regions or zones.
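
To summarize the server-side flow in Node.maybeProxyRequest, here is a minimal sketch of the error classification it performs. The shouldForward helper and the plain error/sequence types are illustrative assumptions; the real code switches on *kvpb.NotLeaseHolderError (and friends) and also compares range descriptor generations.

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for the kvpb error details the node inspects.
var (
	errNotLeaseholder = errors.New("not leaseholder")
	errRangeNotFound  = errors.New("range not found")
)

// shouldForward sketches the classification above: only a NotLeaseHolderError
// where the client's lease information is at least as fresh as the local
// node's is forwarded to the leaseholder; anything else goes back to the
// client so it can refresh its routing cache.
func shouldForward(err error, clientLeaseSeq, localLeaseSeq int64) bool {
	switch {
	case errors.Is(err, errNotLeaseholder):
		return clientLeaseSeq >= localLeaseSeq
	case errors.Is(err, errRangeNotFound):
		return false // stale client routing; let the client retry with fresh info
	default:
		return false
	}
}

func main() {
	fmt.Println(shouldForward(errNotLeaseholder, 5, 5)) // true: forward to the leaseholder
	fmt.Println(shouldForward(errNotLeaseholder, 4, 5)) // false: client is stale, return the NLHE
}
```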
