From 0fe4c02a198a3f0f43665ca2c1678fbfa5582ccc Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Thu, 4 Jan 2024 16:45:21 -0500
Subject: [PATCH] kvserver: proxy requests that return a NotLeaseholderError

Previously, a request that required a direct connection to the leaseholder
would fail if the client was unable to initiate that connection. This PR adds
the client and server sides of proxy handling to work around partial
partitions.

There are three main changes in this PR:

1) In DistSender, if the client needs to send a request to the leaseholder
   but the transport has already moved on to the next replica, it adds the
   ProxyRangeInfo header to the request and sends it to each follower.

2) In DistSender, if it is attempting to send a request that already has the
   ProxyRangeInfo header set, it short-circuits much of the retry logic and
   sends the request only to the leaseholder named in the ProxyRangeInfo
   header.

3) On the server side, the node normally evaluates a request even if
   ProxyRangeInfo is set. If local evaluation results in a NotLeaseHolderError
   and the range info matches what the client set in the ProxyRangeInfo
   header, it attempts to proxy the request to the leaseholder and returns the
   proxied response rather than its local error.

The impact of this change is to allow a BatchRequest to succeed as long as
there is connectivity between the client and at least one replica that has a
direct connection to the leaseholder.

Epic: none
Fixes: #93503

Release note (ops change): This PR adds an additional setting
kv.dist_sender.proxy.enabled, which defaults to true. When it is enabled,
requests that cannot be routed directly to the leaseholder are proxied
through a follower replica.
---
 pkg/kv/kvclient/kvcoord/dist_sender.go        | 159 +++++++++++++++---
 .../kvcoord/dist_sender_server_test.go        |   4 +-
 pkg/kv/kvpb/errordetailtype_string.go         |   3 +
 pkg/server/node.go                            |  77 +++++++++
 4 files changed, 216 insertions(+), 27 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index 719331ff4197..94bdbcdfcfc8 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -303,6 +303,13 @@ var sortByLocalityFirst = settings.RegisterBoolSetting(
 	true,
 )
 
+var ProxyBatchRequest = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"kv.dist_sender.proxy.enabled",
+	"when true, proxy batch requests that can't be routed directly to the leaseholder",
+	true,
+)
+
 // DistSenderMetrics is the set of metrics for a given distributed sender.
 type DistSenderMetrics struct {
 	BatchCount *metric.Counter
@@ -1896,6 +1903,18 @@ func (ds *DistSender) sendPartialBatch(
 		attempts++
 		pErr = nil
 		// If we've invalidated the descriptor on a send failure, re-lookup.
+
+		// On a proxy request, update our routing information with what the
+		// client sent us if the client had newer information. We have already
+		// validated the client request against our local replica state in
+		// node.go and rejected requests with stale information. Here we ensure
+		// our RangeCache has the same information as both the client request
+		// and our local replica before attempting the request. If the sync
+		// makes our token invalid, we handle it similarly to a RangeNotFound or
+		// NotLeaseHolderError from a remote server.
+		if ba.ProxyRangeInfo != nil {
+			routingTok.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc)
+		}
 		if !routingTok.Valid() {
 			var descKey roachpb.RKey
 			if isReverse {
@@ -1969,6 +1988,11 @@ func (ds *DistSender) sendPartialBatch(
 			// Set pErr so that, if we don't perform any more retries, the
 			// deduceRetryEarlyExitError() call below the loop includes this error.
 			pErr = kvpb.NewError(err)
+			// Proxy requests are not retried since we are not the originator.
+			if ba.ProxyRangeInfo != nil {
+				log.VEventf(ctx, 1, "failing proxy request after error %s", err)
+				break
+			}
 			switch {
 			case IsSendError(err):
 				// We've tried all the replicas without success. Either they're all
@@ -2262,7 +2286,7 @@ const defaultSendClosedTimestampPolicy = roachpb.LEAD_FOR_GLOBAL_READS
 // AmbiguousResultError. Of those two, the latter has to be passed back to the
 // client, while the former should be handled by retrying with an updated range
 // descriptor. This method handles other errors returned from replicas
-// internally by retrying (NotLeaseholderError, RangeNotFoundError), and falls
+// internally by retrying (NotLeaseHolderError, RangeNotFoundError), and falls
 // back to a sendError when it runs out of replicas to try.
 //
 // routing dictates what replicas will be tried (but not necessarily their
@@ -2307,15 +2331,62 @@ func (ds *DistSender) sendToReplicas(
 		log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
 	}
 	desc := routing.Desc()
-	leaseholder := routing.Leaseholder()
-	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter)
+	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, routing.Leaseholder(), replicaFilter)
 	if err != nil {
 		return nil, err
 	}
+	// The client requested that we proxy this request. Only proxy if we can
+	// determine the leaseholder and it agrees with the ProxyRangeInfo from
+	// the client. We don't support a proxy request to a non-leaseholder
+	// replica. If we decide to proxy this request, we reduce our replica
+	// list to only the requested replica. If that request fails, we return
+	// the failure to the caller so they can try something else.
+	if ba.ProxyRangeInfo != nil {
+		log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo)
+		// We don't know who the leaseholder is, and it is likely that the
+		// client had stale information. Return our information to them through
+		// a NLHE and let them retry.
+		if routing.Lease().Empty() {
+			log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
+			br := kvpb.BatchResponse{}
+			br.Error = kvpb.NewError(
+				kvpb.NewNotLeaseHolderError(roachpb.Lease{},
+					0, /* proposerStoreID */
+					routing.Desc(),
+					"client requested a proxy but we can't figure out the leaseholder"),
+			)
+			return &br, nil
+		}
+		if ba.ProxyRangeInfo.Lease.Sequence != routing.Lease().Sequence ||
+			ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
+			log.VEventf(ctx, 2, "proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
+			br := kvpb.BatchResponse{}
+			br.Error = kvpb.NewError(
+				kvpb.NewNotLeaseHolderError(
+					*routing.Lease(),
+					0, /* proposerStoreID */
+					routing.Desc(),
+					fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)),
+			)
+			return &br, nil
+		}
+
+		// On a proxy request, we only send the request to the leaseholder. If we
+		// are here, then the client and server agree on the routing information, so
+		// use the leaseholder as our only replica to send to.
+		idx := replicas.Find(routing.Leaseholder().ReplicaID)
+		// This should never happen. We validated the routing above and the token
+		// is still valid.
+		if idx == -1 {
+			return nil, errors.AssertionFailedf("inconsistent routing %v %v", desc, *routing.Leaseholder())
+		}
+		replicas = replicas[idx : idx+1]
+		log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
+	}
 	// Rearrange the replicas so that they're ordered according to the routing
 	// policy.
-	var leaseholderFirst bool
+	var routeToLeaseholder bool
 	switch ba.RoutingPolicy {
 	case kvpb.RoutingPolicy_LEASEHOLDER:
 		// First order by latency, then move the leaseholder to the front of the
 		// list, if it is known.
@@ -2325,12 +2396,12 @@
 		}
 
 		idx := -1
-		if leaseholder != nil {
-			idx = replicas.Find(leaseholder.ReplicaID)
+		if routing.Leaseholder() != nil {
+			idx = replicas.Find(routing.Leaseholder().ReplicaID)
 		}
 		if idx != -1 {
 			replicas.MoveToFront(idx)
-			leaseholderFirst = true
+			routeToLeaseholder = true
 		} else {
 			// The leaseholder node's info must have been missing from gossip when we
 			// created replicas.
@@ -2468,6 +2539,44 @@
 		comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID)
 		ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))
 
+		// Determine whether we should proxy this request through a follower to
+		// the leaseholder. The primary condition for proxying is that we would
+		// like to send the request to the leaseholder, but the transport is
+		// about to send the request to a follower.
+		requestToSend := ba
+		if !ProxyBatchRequest.Get(&ds.st.SV) {
+			// The setting is disabled, so we don't proxy this request.
+		} else if ba.ProxyRangeInfo != nil {
+			// Clear out the proxy information to prevent the recipient from
+			// sending this request onwards. This is necessary to prevent proxy
+			// chaining. We want the recipient to process the request or fail
+			// immediately. This is an extra safety measure to prevent any kind
+			// of routing loop.
+			requestToSend = ba.ShallowCopy()
+			requestToSend.ProxyRangeInfo = nil
+		} else if !routeToLeaseholder {
+			// This request isn't intended for the leaseholder so we don't proxy it.
+		} else if routing.Leaseholder() == nil {
+			// NB: Normally we don't have both routeToLeaseholder and a nil
+			// leaseholder. This could be changed to an assertion.
+			log.Errorf(ctx, "attempting %v to route to leaseholder, but the leaseholder is unknown %v", ba, routing)
+		} else if ba.Replica.NodeID == routing.Leaseholder().NodeID {
+			// We are sending this request to the leaseholder, so it doesn't
+			// make sense to attempt to proxy it.
+		} else if ds.nodeIDGetter() == ba.Replica.NodeID {
+			// This condition prevents proxying a request if we are the same
+			// node as the final destination. Without this we would pass through
+			// the same DistSender stack again, which is pointless.
+		} else {
+			// We passed all the conditions above and want to attempt to proxy
+			// this request. We need to copy it because we are going to modify
+			// the Header, and attempts to other replicas may not set it.
+			requestToSend = ba.ShallowCopy()
+			rangeInfo := routing.RangeInfo()
+			requestToSend.ProxyRangeInfo = &rangeInfo
+			log.VEventf(ctx, 1, "attempt proxy request %v using %v", requestToSend, rangeInfo)
+		}
+
 		tBegin := timeutil.Now() // for slow log message
 		sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).
			Track(ctx, ba, tBegin.UnixNano())
@@ -2476,7 +2585,7 @@
 			err = cbErr
 			transport.SkipReplica()
 		} else {
-			br, err = transport.SendNext(sendCtx, ba)
+			br, err = transport.SendNext(sendCtx, requestToSend)
 			tEnd := timeutil.Now()
 			cbToken.Done(br, err, tEnd.UnixNano())
 
@@ -2492,6 +2601,18 @@
 				}
 			}
 		}
+		if err == nil {
+			if proxyErr, ok := br.Error.GetDetail().(*kvpb.ProxyFailedError); ok {
+				// The server proxy attempt resulted in a non-BatchRequest error, likely
+				// a communication error. Convert the wrapped error into an error on our
+				// side. Depending on the type of request and what the error is, we may
+				// treat the error as an ambiguous error. Clear out the BatchResponse as
+				// the only information it contained was this error.
+				err = proxyErr.Unwrap()
+				br = nil
+				log.VEventf(ctx, 2, "proxy send error: %s", err)
+			}
+		}
 
 		ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
 		ds.maybeIncrementErrCounters(br, err)
@@ -2557,21 +2678,10 @@
 			// retrying the writes, should it need to be propagated.
 			if withCommit && !grpcutil.RequestDidNotStart(err) {
 				ambiguousError = err
-			} else if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
-				// If we get a gRPC error against the leaseholder, we don't want to
-				// backoff and keep trying the same leaseholder against the leaseholder.
-				// TODO(baptist): This should not be in an else block. Ideally
-				// we set leaseholderUnavailable to true even if there is an
-				// ambiguous error as it should be set independent of an
-				// ambiguous error. TestTransactionUnexpectedlyCommitted test
-				// fails otherwise. That test is expecting us to retry against
-				// the leaseholder after we received a gRPC error to validate
-				// that it now returns a WriteTooOld error. Once the proxy code
-				// is in place, this can be moved back out of the if block. In
-				// practice the only impact of having this in the else block is
-				// that we will retry more times against a leaseholder before
-				// moving on to the other replicas. There is not an easy way to
-				// modify the test without this being in the else block.
+			}
+			// If we get a gRPC error against the leaseholder, we don't want to
+			// backoff and keep trying the request against the same leaseholder.
+			if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
 				leaseholderUnavailable = true
 			}
 		} else {
@@ -2738,6 +2848,7 @@
 				// error too aggressively.
 				if updatedLeaseholder {
 					leaseholderUnavailable = false
+					routeToLeaseholder = true
 				}
 				// If the leaseholder is the replica that we've just tried, and
 				// we've tried this replica a bunch of times already, let's move on
@@ -2773,7 +2884,7 @@
 				// have a sufficient closed timestamp. In response, we should
 				// immediately redirect to the leaseholder, without a backoff
 				// period.
-				intentionallySentToFollower := first && !leaseholderFirst
+				intentionallySentToFollower := first && !routeToLeaseholder
 				// See if we want to backoff a little before the next attempt. If
				// the lease info we got is stale and we were intending to send to
				// the leaseholder, we backoff because it might be the case that
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
index ddf0ed94e0b1..cbfabd0fff64 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -4694,10 +4694,8 @@ func TestPartialPartition(t *testing.T) {
 	for _, test := range testCases {
 		t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers), func(t *testing.T) {
-			if test.useProxy {
-				skip.WithIssue(t, 93503)
-			}
 			st := cluster.MakeTestingClusterSettings()
+			kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, test.useProxy)
 			// With epoch leases this test doesn't work reliably. It passes
 			// in cases where it should fail and fails in cases where it
 			// should pass.
diff --git a/pkg/kv/kvpb/errordetailtype_string.go b/pkg/kv/kvpb/errordetailtype_string.go
index 6abcea8fb453..f0d593236b49 100644
--- a/pkg/kv/kvpb/errordetailtype_string.go
+++ b/pkg/kv/kvpb/errordetailtype_string.go
@@ -43,6 +43,7 @@ func _() {
 	_ = x[MVCCHistoryMutationErrType-44]
 	_ = x[LockConflictErrType-45]
 	_ = x[ReplicaUnavailableErrType-46]
+	_ = x[ProxyFailedErrType-47]
 	_ = x[CommunicationErrType-22]
 	_ = x[InternalErrType-25]
 }
@@ -119,6 +120,8 @@ func (i ErrorDetailType) String() string {
 		return "LockConflictErrType"
 	case ReplicaUnavailableErrType:
 		return "ReplicaUnavailableErrType"
+	case ProxyFailedErrType:
+		return "ProxyFailedErrType"
 	case CommunicationErrType:
 		return "CommunicationErrType"
 	case InternalErrType:
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 8c008d4b63c4..0a1037e11822 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -1410,9 +1410,34 @@ func (n *Node) batchInternal(
 		n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes)
 		writeBytes.Release()
 	}()
+
+	// If a proxy attempt is requested, we copy the request to prevent evaluation
+	// from modifying the request. There are places on the server that can modify
+	// the request, and we can't keep these modifications if we later proxy it.
+	// Note we only ShallowCopy, so care must be taken with internal changes.
+	// For reference some of the places are:
+	// SetActiveTimestamp - sets the Header.Timestamp
+	// maybeStripInFlightWrites - can modify internal EndTxn requests
+	// tryBumpBatchTimestamp - can modify the txn.ReadTimestamp
+	// TODO(baptist): Other code copies the BatchRequest, in some cases
+	// unnecessarily, to prevent modifying the passed in request. We should clean
+	// up the contract of the Send method to allow modifying the request or more
+	// strictly enforce that the callee is not allowed to change it.
+	var originalRequest *kvpb.BatchRequest
+	if args.ProxyRangeInfo != nil {
+		originalRequest = args.ShallowCopy()
+	}
 	var pErr *kvpb.Error
 	br, writeBytes, pErr = n.stores.SendWithWriteBytes(ctx, args)
 	if pErr != nil {
+		if originalRequest != nil {
+			if proxyResponse := n.maybeProxyRequest(ctx, originalRequest, pErr); proxyResponse != nil {
+				// If the proxy request succeeded then return its result instead of
+				// our error. If not, use our original error.
+				return proxyResponse, nil
+			}
+		}
+
 		br = &kvpb.BatchResponse{}
 		if pErr.Index != nil && keyvissettings.Enabled.Get(&n.storeCfg.Settings.SV) {
 			// Tell the SpanStatsCollector about the requests in this BatchRequest,
@@ -1466,6 +1491,58 @@ func (n *Node) batchInternal(
 	return br, nil
 }
 
+// maybeProxyRequest is called after the server returns an error and it
+// attempts to proxy the request if it can. We attempt to proxy requests if two
+// primary conditions are met:
+// 1) The ProxyRangeInfo header is set on the request indicating the client
+//    would like us to proxy this request if we can't evaluate it.
+// 2) Local evaluation has resulted in a NotLeaseHolderError which matches the
+//    ProxyRangeInfo from the client.
+// If these conditions are met, attempt to send the request through our local
+// DistSender stack and use that result instead of our error.
+func (n *Node) maybeProxyRequest(
+	ctx context.Context, ba *kvpb.BatchRequest, pErr *kvpb.Error,
+) *kvpb.BatchResponse {
+	// NB: We don't handle StoreNotFound or RangeNotFound errors. If we want to
+	// support proxy requests through non-replicas we could proxy those errors
+	// as well.
+	var nlhe *kvpb.NotLeaseHolderError
+	if ok := errors.As(pErr.GetDetail(), &nlhe); !ok {
+		log.VEventf(ctx, 2, "non-proxyable errors %v", pErr)
+		return nil
+	}
+	// If the NotLeaseHolderError was returned because we think the client has
+	// stale information, see if our information would update the client's
+	// state. If so, rather than proxying this request, fail back to the
+	// client first.
+	leaseCompatible := nlhe.Lease != nil && ba.ProxyRangeInfo.Lease.Sequence >= nlhe.Lease.Sequence
+	descCompatible := ba.ProxyRangeInfo.Desc.Generation >= nlhe.RangeDesc.Generation
+	if !leaseCompatible || !descCompatible {
+		log.VEventf(
+			ctx,
+			2,
+			"out-of-date client information on proxy request %v != %v",
+			ba.ProxyRangeInfo,
+			pErr,
+		)
+		return nil
+	}
+
+	log.VEventf(ctx, 2, "proxy request for %v after local error %v", ba, pErr)
+	// TODO(baptist): Correctly set up the span / tracing.
+	br, pErr := n.proxySender.Send(ctx, ba)
+	if pErr == nil {
+		log.VEvent(ctx, 2, "proxy succeeded")
+		return br
+	}
+	// Wrap the error in a ProxyFailedError. It is unwrapped on the client side
+	// and handled there.
+	log.VEventf(ctx, 2, "proxy attempt resulted in error %v", pErr)
+	br = &kvpb.BatchResponse{}
+	br.Error = kvpb.NewError(kvpb.NewProxyFailedError(pErr.GoError()))
+	return br
+}
+
 // getLocalityComparison takes gatewayNodeID as input and returns the locality
 // comparison result between the gateway node and the current node. This result
 // indicates whether the two nodes are located in different regions or zones.
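
Reviewer note (illustrative, not part of the diff): the standalone sketch below distills the client-side decision that the new sendToReplicas code makes before attaching a ProxyRangeInfo header. The types replicaInfo and rangeRouting and the helper shouldProxyThroughFollower are hypothetical simplifications; the real code works on BatchRequest, ReplicaDescriptor, and the routing EvictionToken.

    package main

    import "fmt"

    // replicaInfo and rangeRouting are hypothetical stand-ins for the
    // ReplicaDescriptor and EvictionToken types that DistSender really uses.
    type replicaInfo struct {
        NodeID int32
    }

    type rangeRouting struct {
        leaseholder *replicaInfo // nil when the lease is unknown
    }

    // shouldProxyThroughFollower mirrors the chain of conditions added to
    // sendToReplicas: proxy only when the setting is enabled, the request is
    // not itself already a proxy request, the routing policy wanted the
    // leaseholder, the leaseholder is known, and the transport is about to
    // try a replica that is neither the leaseholder nor this node.
    func shouldProxyThroughFollower(
        settingEnabled, alreadyProxy, routeToLeaseholder bool,
        routing rangeRouting, target replicaInfo, selfNodeID int32,
    ) bool {
        switch {
        case !settingEnabled:
            return false
        case alreadyProxy:
            // Never chain proxies; the recipient must evaluate or fail.
            return false
        case !routeToLeaseholder:
            return false
        case routing.leaseholder == nil:
            return false
        case target.NodeID == routing.leaseholder.NodeID:
            // The transport is already headed to the leaseholder.
            return false
        case target.NodeID == selfNodeID:
            // Proxying through ourselves would just re-enter the same stack.
            return false
        default:
            return true
        }
    }

    func main() {
        routing := rangeRouting{leaseholder: &replicaInfo{NodeID: 1}}
        // Node 1 holds the lease, we are node 2, and the transport is about
        // to try node 3: attach ProxyRangeInfo so node 3 can forward it.
        fmt.Println(shouldProxyThroughFollower(true, false, true, routing, replicaInfo{NodeID: 3}, 2))
    }

On the server side, node.go then honors the header only when its own evaluation yields a NotLeaseHolderError whose lease sequence and descriptor generation are covered by the client's ProxyRangeInfo; anything else is returned to the client unchanged.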