From fbe88bbe31df19912c6faa5234c7edc863b95a15 Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Thu, 4 Jan 2024 16:45:21 -0500
Subject: [PATCH] kvserver: proxy requests that return a NotLeaseholderError

Previously, a request that required a direct connection to the leaseholder
would fail if the client was unable to initiate that connection. This commit
adds a new field, ProxyRangeInfo, to the BatchRequest header. A client sets
this field when it wants the server to proxy the request on its behalf.

There are three main changes in this commit:

1) In DistSender, if the client needs to send a request to the leaseholder
   but its transport has already moved to the next replica, it adds the
   ProxyRangeInfo header to the request and sends the request to that
   follower.
2) In DistSender, if the request it is sending already has the
   ProxyRangeInfo header set, it short-circuits much of the retry logic and
   only sends the request to the leaseholder the original client wanted to
   reach.
3) On the server side, the node normally evaluates a request even if
   ProxyRangeInfo is set. If local evaluation results in a
   NotLeaseHolderError and the range info matches what the client set in the
   ProxyRangeInfo header, the node attempts to proxy the request to the
   leaseholder and returns the response from that request instead.

The impact of this change is that batch requests can succeed as long as
there is connectivity between the client and at least one replica that has a
direct connection to the leaseholder.

Epic: none
Fixes: #93503

Release note (ops change): This PR adds an additional setting,
kv.dist_sender.proxy.enabled, which defaults to true. When it is enabled,
requests are proxied through a follower replica when the client cannot reach
the leaseholder directly.
---
 pkg/kv/kvclient/kvcoord/dist_sender.go        | 131 ++++++++++++++----
 .../kvcoord/dist_sender_server_test.go        |   4 +-
 pkg/kv/kvpb/api.proto                         |   7 +-
 pkg/server/node.go                            |  94 +++++++++++++
 4 files changed, 208 insertions(+), 28 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index 406b943cd0c7..dea4ba488fba 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -303,6 +303,13 @@ var sortByLocalityFirst = settings.RegisterBoolSetting(
 	true,
 )
 
+var ProxyBatchRequest = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"kv.dist_sender.proxy.enabled",
+	"when true, proxy batch requests that can't be routed directly to the leaseholder",
+	true,
+)
+
 // DistSenderMetrics is the set of metrics for a given distributed sender.
 type DistSenderMetrics struct {
 	BatchCount *metric.Counter
@@ -1969,6 +1976,12 @@ func (ds *DistSender) sendPartialBatch(
 			// Set pErr so that, if we don't perform any more retries, the
 			// deduceRetryEarlyExitError() call below the loop includes this error.
 			pErr = kvpb.NewError(err)
+			// For proxy requests, we are not the originator, so don't endlessly
+			// retry. This is a balance between trying forever and not trying hard enough.
+			if attempts >= 2 && ba.ProxyRangeInfo != nil {
+				log.VEventf(ctx, 1, "failing proxy requests after 2 failed attempts %s", err)
+				break
+			}
 			switch {
 			case IsSendError(err):
 				// We've tried all the replicas without success. Either they're all
@@ -2256,7 +2269,7 @@ const defaultSendClosedTimestampPolicy = roachpb.LEAD_FOR_GLOBAL_READS
 // AmbiguousResultError. Of those two, the latter has to be passed back to the
 // client, while the former should be handled by retrying with an updated range
 // descriptor. This method handles other errors returned from replicas
-// internally by retrying (NotLeaseholderError, RangeNotFoundError), and falls
+// internally by retrying (NotLeaseHolderError, RangeNotFoundError), and falls
 // back to a sendError when it runs out of replicas to try.
 //
 // routing dictates what replicas will be tried (but not necessarily their
@@ -2301,15 +2314,60 @@ func (ds *DistSender) sendToReplicas(
 		log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
 	}
 	desc := routing.Desc()
-	leaseholder := routing.Leaseholder()
-	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter)
+	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, routing.Leaseholder(), replicaFilter)
 	if err != nil {
 		return nil, err
 	}
+	// This client requested that we proxy this request. Only proxy if we can
+	// determine the leaseholder and it agrees with the ProxyRangeInfo from
+	// the client. We don't support a proxy request to a non-leaseholder
+	// replica. If we decide to proxy this request, we will reduce our replica
+	// list to only the requested proxy. If we fail on that request we fail
+	// back to the caller so they can try something else.
+	if ba.ProxyRangeInfo != nil {
+		log.VErrEventf(ctx, 3, "processing a proxy request")
+
+		// Sync our routing information with what the client sent us.
+		if err := routing.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc); err != nil {
+			// Update our routing based on the lookup request. If there was a range
+			// split/merge, and our information is now stale, then we need to retry
+			// with new routing information. This is a retriable error; return a
+			// SendError to retry with a fresh transport.
+			return nil, newSendError(errors.Wrapf(err, "incompatible routing information, reload and try again %v, %v", desc, routing))
+		}
+
+		if routing.Lease().Empty() {
+			log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
+			// The client had stale information; stop processing and update it first.
+			br := kvpb.BatchResponse{}
+			br.Error = kvpb.NewError(kvpb.NewNotLeaseHolderError(roachpb.Lease{}, 0, routing.Desc(), "client requested a proxy but we can't figure out the leaseholder"))
+			return &br, nil
+		}
+		if ba.ProxyRangeInfo.Lease.Sequence != routing.Lease().Sequence ||
+			ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
+			msg := fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
+			log.VEventf(ctx, 2, "proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
+			br := kvpb.BatchResponse{}
+			br.Error = kvpb.NewError(kvpb.NewNotLeaseHolderError(*routing.Lease(), 0, routing.Desc(), msg))
+			return &br, nil
+		}
+
+		// On a proxy request, we only send the request to the leaseholder. If we
+		// are here then the client and server agree on the routing information, so
+		// use the leaseholder as our only replica to send to.
+		idx := replicas.Find(routing.Leaseholder().ReplicaID)
+		// This shouldn't happen. We validated the routing above and the token
+		// is still valid.
+		if idx == -1 {
+			return nil, kvpb.NewReplicaUnavailableError(errors.Newf("proxy requested, but inconsistent routing %v %v", desc, routing.Leaseholder()), desc, *routing.Leaseholder())
+		}
+		replicas = replicas[idx : idx+1]
+		log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
+	}
 
 	// Rearrange the replicas so that they're ordered according to the routing
 	// policy.
-	var leaseholderFirst bool
+	var routeToLeaseholder bool
 	switch ba.RoutingPolicy {
 	case kvpb.RoutingPolicy_LEASEHOLDER:
 		// First order by latency, then move the leaseholder to the front of the
@@ -2319,12 +2377,12 @@
 		}
 
 		idx := -1
-		if leaseholder != nil {
-			idx = replicas.Find(leaseholder.ReplicaID)
+		if routing.Leaseholder() != nil {
+			idx = replicas.Find(routing.Leaseholder().ReplicaID)
 		}
 		if idx != -1 {
 			replicas.MoveToFront(idx)
-			leaseholderFirst = true
+			routeToLeaseholder = true
 		} else {
 			// The leaseholder node's info must have been missing from gossip when we
 			// created replicas.
@@ -2462,6 +2520,41 @@ func (ds *DistSender) sendToReplicas(
 		comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID)
 		ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))
 
+		// Determine whether we should proxy this request through a follower to
+		// the leaseholder. The primary condition for proxying is that we would
+		// like to send the request to the leaseholder, but the transport is
+		// about to send the request through a follower.
+		requestToSend := ba
+		if !ProxyBatchRequest.Get(&ds.st.SV) {
+			// The setting is disabled, so we don't proxy this request.
+		} else if ba.ProxyRangeInfo != nil {
+			// Clear out the proxy information to make it clear that the
+			// recipient does not need to proxy this request further.
+			requestToSend = ba.ShallowCopy()
+			requestToSend.ProxyRangeInfo = nil
+		} else if !routeToLeaseholder {
+			// This request isn't intended for the leaseholder, so we don't proxy it.
+		} else if routing.Leaseholder() == nil {
+			// NB: Normally we don't have both routeToLeaseholder and a nil
+			// leaseholder. This could be changed to an assertion.
+			log.Errorf(ctx, "attempting %v to route to leaseholder, but the leaseholder is unknown %v", ba, routing)
+		} else if ba.Replica.NodeID == routing.Leaseholder().NodeID {
+			// We are sending this request to the leaseholder, so it doesn't
+			// make sense to attempt to proxy it.
+		} else if ds.nodeIDGetter() == ba.Replica.NodeID {
+			// This condition prevents proxying a request if we are the same
+			// node as the final destination. Without this we would pass through
+			// the same DistSender stack again, which is pointless.
+		} else {
+			// We passed all the conditions above and want to attempt to proxy
+			// this request. We need to copy it as we are going to modify the
+			// Header. For other replicas we may not end up setting the header.
+			requestToSend = ba.ShallowCopy()
+			rangeInfo := routing.RangeInfo()
+			requestToSend.ProxyRangeInfo = &rangeInfo
+			log.VEventf(ctx, 1, "attempt proxy request %v using %v", requestToSend, rangeInfo)
+		}
+
 		tBegin := timeutil.Now() // for slow log message
 		sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).
 			Track(ctx, ba, tBegin.UnixNano())
@@ -2470,7 +2563,7 @@
 			err = cbErr
 			transport.SkipReplica()
 		} else {
-			br, err = transport.SendNext(sendCtx, ba)
+			br, err = transport.SendNext(sendCtx, requestToSend)
 			tEnd := timeutil.Now()
 			cbToken.Done(br, err, tEnd.UnixNano())
 
@@ -2559,21 +2652,10 @@
 				// retrying the writes, should it need to be propagated.
 				if withCommit && !grpcutil.RequestDidNotStart(err) {
 					ambiguousError = err
-				} else if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
-					// If we get a gRPC error against the leaseholder, we don't want to
-					// backoff and keep trying the same leaseholder against the leaseholder.
-					// TODO(baptist): This should not be in an else block. Ideally
-					// we set leaseholderUnavailable to true even if there is an
-					// ambiguous error as it should be set independent of an
-					// ambiguous error. TestTransactionUnexpectedlyCommitted test
-					// fails otherwise. That test is expecting us to retry against
-					// the leaseholder after we received a gRPC error to validate
-					// that it now returns a WriteTooOld error. Once the proxy code
-					// is in place, this can be moved back out of the if block. In
-					// practice the only impact of having this in the else block is
-					// that we will retry more times against a leaseholder before
-					// moving on to the other replicas. There is not an easy way to
-					// modify the test without this being in the else block.
+				}
+				// If we get a gRPC error against the leaseholder, we don't want to
+				// backoff and keep trying the request against the same leaseholder.
+				if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
 					leaseholderUnavailable = true
 				}
 			} else {
@@ -2746,6 +2828,7 @@
 					if updatedLeaseholder {
 						leaseholderUnavailable = false
 						transport.Reset()
+						routeToLeaseholder = true
 					}
 					// If the leaseholder is the replica that we've just tried, and
 					// we've tried this replica a bunch of times already, let's move on
@@ -2781,7 +2864,7 @@
 				// have a sufficient closed timestamp. In response, we should
 				// immediately redirect to the leaseholder, without a backoff
 				// period.
-				intentionallySentToFollower := first && !leaseholderFirst
+				intentionallySentToFollower := first && !routeToLeaseholder
 				// See if we want to backoff a little before the next attempt. If
 				// the lease info we got is stale and we were intending to send to
 				// the leaseholder, we backoff because it might be the case that
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
index 22e00269cd84..e15ef7a4a6b6 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -4694,10 +4694,8 @@ func TestPartialPartition(t *testing.T) {
 	for _, test := range testCases {
 		t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers),
 			func(t *testing.T) {
-				if test.useProxy {
-					skip.WithIssue(t, 93503)
-				}
 				st := cluster.MakeTestingClusterSettings()
+				kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, test.useProxy)
 				// With epoch leases this test doesn't work reliably. It passes
 				// in cases where it should fail and fails in cases where it
 				// should pass.
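
The following standalone Go sketch is not part of the patch and uses invented
stand-in types (RangeInfo, BatchRequest, the replica IDs) rather than the real
kvpb/roachpb definitions. It is only meant to illustrate the client-side
decision the dist_sender.go hunks above implement: when a leaseholder-bound
request is about to be sent through a follower, attach the ProxyRangeInfo
header so the follower can forward it; otherwise leave the request untouched.

// Standalone sketch, not part of the patch. Simplified stand-ins for
// kvpb.BatchRequest, roachpb.RangeInfo, and the DistSender transport.
package main

import "fmt"

// RangeInfo is a stand-in for roachpb.RangeInfo (lease + descriptor info).
type RangeInfo struct {
	LeaseSeq       int64
	DescGeneration int64
	LeaseholderID  int32
}

// BatchRequest is a stand-in for the header fields used in this sketch.
type BatchRequest struct {
	TargetReplicaID int32
	ProxyRangeInfo  *RangeInfo // nil unless the sender asks for proxying
}

// maybeAttachProxyHeader mirrors the client-side conditions: proxying is only
// requested when the setting is on, the request is leaseholder-bound, the next
// replica on the transport is not the leaseholder, and the request is not
// already a proxy request.
func maybeAttachProxyHeader(
	ba BatchRequest, ri RangeInfo, nextReplicaID int32, routeToLeaseholder, proxyEnabled bool,
) BatchRequest {
	switch {
	case !proxyEnabled:
	case ba.ProxyRangeInfo != nil: // already a proxy request; don't re-proxy
	case !routeToLeaseholder:
	case nextReplicaID == ri.LeaseholderID: // going to the leaseholder anyway
	default:
		// Copy before mutating, analogous to ShallowCopy in the real code.
		cp := ba
		cp.ProxyRangeInfo = &ri
		return cp
	}
	return ba
}

func main() {
	ri := RangeInfo{LeaseSeq: 7, DescGeneration: 3, LeaseholderID: 1}
	ba := BatchRequest{TargetReplicaID: 1}
	out := maybeAttachProxyHeader(ba, ri, 2 /* follower */, true, true)
	fmt.Printf("proxy requested: %v\n", out.ProxyRangeInfo != nil) // true
}

In the real patch the copy is made with BatchRequest.ShallowCopy so the header
of the original request is never mutated for other replicas.
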
diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto
index 75a5b8031ede..dd5f4bebfab1 100644
--- a/pkg/kv/kvpb/api.proto
+++ b/pkg/kv/kvpb/api.proto
@@ -2929,9 +2929,14 @@
   // in a mixed-version state after a new value got added), DEFAULT is assumed.
   cockroach.rpc.ConnectionClass connection_class = 33;
 
+  // ProxyRangeInfo is the information the client used to determine that it
+  // needs to proxy this request. It is set if the client is unable to send
+  // the request directly to the desired node but wants it to end up there.
+  RangeInfo proxy_range_info = 34;
+
   reserved 7, 10, 12, 14, 20;
 
-  // Next ID: 34
+  // Next ID: 35
 }
 
 // BoundedStalenessHeader contains configuration values pertaining to bounded
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 36db6fc38e07..7e0637057aea 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -58,6 +58,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/future"
+	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -1387,9 +1388,18 @@
 		n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes)
 		writeBytes.Release()
 	}()
+
 	var pErr *kvpb.Error
 	br, writeBytes, pErr = n.stores.SendWithWriteBytes(ctx, args)
 	if pErr != nil {
+		if proxyResponse, wasProxied := n.maybeProxyRequest(ctx, args, pErr); wasProxied {
+			// If the proxy response succeeded, then we want to return the response
+			// immediately; however, if it failed, then we need to run this request
+			// locally to see if we can satisfy it or give the client updated
+			// routing information.
+			return proxyResponse, nil
+		}
+
 		br = &kvpb.BatchResponse{}
 		if pErr.Index != nil && keyvissettings.Enabled.Get(&n.storeCfg.Settings.SV) {
 			// Tell the SpanStatsCollector about the requests in this BatchRequest,
@@ -1443,6 +1453,90 @@
 	return br, nil
 }
 
+// maybeProxyRequest is called after local evaluation returns an error, and
+// it attempts to proxy the request if it can. We attempt to proxy requests
+// if two primary conditions are met:
+// 1) The ProxyRangeInfo header is set on the request, indicating the client
+// would like us to proxy this request if we can't evaluate it.
+// 2) Local evaluation has resulted in a NotLeaseHolderError which matches the
+// ProxyRangeInfo from the client.
+// If these conditions are met, we attempt to send the request through our
+// local DistSender stack and return the proxied response to the client.
+func (n *Node) maybeProxyRequest(
+	ctx context.Context, ba *kvpb.BatchRequest, pErr *kvpb.Error,
+) (*kvpb.BatchResponse, bool) {
+	if ba.ProxyRangeInfo == nil {
+		log.VEvent(ctx, 3, "not a proxy request")
+		return nil, false
+	}
+
+	// The setting is also checked on the client.
+	if !kvcoord.ProxyBatchRequest.Get(&n.execCfg.Settings.SV) {
+		log.VEventf(ctx, 3, "proxy disabled")
+		return nil, false
+	}
+
+	switch tErr := pErr.GetDetail().(type) {
+	case *kvpb.StoreNotFoundError, *kvpb.RangeNotFoundError:
+		// TODO(baptist): We don't expect to hit this branch today except in
+		// cases where we are moving the range off this store and the client has
+		// stale information. If we proxy requests through non-replicas for a
+		// range we could remove this restriction and instead handle this proxy
+		// request.
+		log.VEventf(ctx, 2, "don't proxy NotFound error %v", pErr)
+		return nil, false
+	case *kvpb.NotLeaseHolderError:
+		// If we would normally return a NLHE because we think the client has
+		// stale information, see if our information would update the client's
+		// state. If so, rather than proxying this request, fail back to the
+		// client first.
+		leaseCompatible := tErr.Lease == nil || ba.ProxyRangeInfo.Lease.Sequence >= tErr.Lease.Sequence
+		descCompatible := ba.ProxyRangeInfo.Desc.Generation >= tErr.RangeDesc.Generation
+		if leaseCompatible && descCompatible {
+			log.VEventf(ctx, 2, "proxy request %v after error %v with up-to-date info", ba, pErr)
+		} else {
+			log.VEventf(ctx, 2, "out-of-date client information on proxy request %v != %v", ba.ProxyRangeInfo, pErr)
+			return nil, false
+		}
+	default:
+		log.VEventf(ctx, 2, "local evaluation returned a non-proxyable error %v", pErr)
+		return nil, false
+	}
+
+	// TODO(baptist): The TraceInfo here is what was used for processing this
+	// request locally. It's not clear if this is useful for debugging purposes,
+	// but there isn't a clean place to put this right now. We can't send a
+	// request with non-empty TraceInfo. A fast follow-up PR needs to clean up
+	// how TraceInfo and CollectedSpans are managed for proxy requests.
+	if !ba.TraceInfo.Empty() {
+		ba.TraceInfo.Reset()
+	}
+	// The BatchRequest API requires that the timestamp be empty on all requests
+	// and sets it based on the ReadTimestamp of the transaction. Clear the
+	// timestamp so BatchRequest.SetActiveTimestamp doesn't error.
+	if !ba.Timestamp.IsEmpty() {
+		ba.Timestamp.Reset()
+	}
+	// Send the request through our local DistSender stack.
+	br, pErr := n.proxySender.Send(ctx, ba)
+	log.VEventf(ctx, 2, "proxy response is %v %v", br, pErr)
+
+	if pErr != nil {
+		// For connection errors, we treat it as though the proxy never happened
+		// and fall back to the local error from before the proxy attempt.
+		if grpcutil.IsClosedConnection(pErr.GoError()) {
+			log.VEventf(ctx, 2, "node received network error on proxy request: %s", pErr)
+			return nil, false
+		}
+		// For other errors, put them into a BatchResponse and return that error
+		// to the client instead.
+		br = &kvpb.BatchResponse{}
+		br.Error = pErr
+	}
+
+	return br, true
+}
+
 // getLocalityComparison takes gatewayNodeID as input and returns the locality
 // comparison result between the gateway node and the current node. This result
 // indicates whether the two nodes are located in different regions or zones.
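
As a companion sketch for the server-side change, the following standalone Go
program (again with invented stand-in types rather than the real kvpb API)
illustrates the compatibility check maybeProxyRequest performs before
proxying: the request is forwarded only when the local NotLeaseHolderError
carries no newer lease or descriptor information than the client already has;
otherwise the error is returned so the client can refresh its routing first.

// Standalone sketch, not part of the patch. Simplified stand-ins for the
// fields of kvpb.NotLeaseHolderError and the client-supplied RangeInfo.
package main

import "fmt"

// NotLeaseHolderInfo is a stand-in for the fields of the local error that
// maybeProxyRequest consults.
type NotLeaseHolderInfo struct {
	LeaseSeq       *int64 // nil when the error carries no lease
	DescGeneration int64
}

// ProxyRangeInfo is a stand-in for the client-supplied header.
type ProxyRangeInfo struct {
	LeaseSeq       int64
	DescGeneration int64
}

// shouldProxy mirrors the server-side check: proxy only when the local error
// would not teach the client anything new. If the server's lease or
// descriptor is newer than the client's, return false so the client is
// updated instead of proxied.
func shouldProxy(client ProxyRangeInfo, local NotLeaseHolderInfo) bool {
	leaseCompatible := local.LeaseSeq == nil || client.LeaseSeq >= *local.LeaseSeq
	descCompatible := client.DescGeneration >= local.DescGeneration
	return leaseCompatible && descCompatible
}

func main() {
	seq := int64(9)
	// Client already knows lease sequence 9 and descriptor generation 4: proxy.
	fmt.Println(shouldProxy(ProxyRangeInfo{9, 4}, NotLeaseHolderInfo{&seq, 4})) // true
	// Server has a newer descriptor: return the NLHE instead of proxying.
	fmt.Println(shouldProxy(ProxyRangeInfo{9, 3}, NotLeaseHolderInfo{&seq, 4})) // false
}

This mirrors the design choice in the patch: when the server knows something
newer than the client, answering with the NotLeaseHolderError is more useful
than proxying, because it lets the client fix its cache and route directly on
the next attempt.
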