From 6634b51093d722bc6afab1c92fe4b1ed36a46c62 Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Wed, 3 Apr 2024 12:50:43 +0000
Subject: [PATCH] kvclient: don't drop ambiguous errors on incompatible transport

Previously, in the case of an NLHE with an incompatible transport, we
would exit the retry loop immediately and return the error without
checking whether there were any ambiguous or replica unavailable errors.
This risked losing an ambiguous error if the retry later succeeded,
returning an incorrect result to the caller.

Epic: none

Release note: None
---
 pkg/kv/kvclient/kvcoord/dist_sender.go      |  39 ++--
 pkg/kv/kvclient/kvcoord/dist_sender_test.go | 195 ++++++++++++++++++++
 2 files changed, 215 insertions(+), 19 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index 9f4ab424e9ed..bb6318c71d00 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2429,13 +2429,13 @@ func maybeSetResumeSpan(
 	}
 }
 
-// noMoreReplicasErr produces the error to be returned from sendToReplicas when
+// selectBestError produces the error to be returned from sendToReplicas when
 // the transport is exhausted.
 //
 // ambiguousErr, if not nil, is the error we got from the first attempt when the
 // success of the request cannot be ruled out by the error. lastAttemptErr is
 // the error that the last attempt to execute the request returned.
-func noMoreReplicasErr(ambiguousErr, replicaUnavailableErr, lastAttemptErr error) error {
+func selectBestError(ambiguousErr, replicaUnavailableErr, lastAttemptErr error) error {
 	if ambiguousErr != nil {
 		return kvpb.NewAmbiguousResultErrorf("error=%v [exhausted] (last error: %v)",
 			ambiguousErr, lastAttemptErr)
@@ -2444,17 +2444,7 @@ func noMoreReplicasErr(ambiguousErr, replicaUnavailableErr, lastAttemptErr error
 		return replicaUnavailableErr
 	}
 
-	// Authentication and authorization errors should be propagated up rather than
-	// wrapped in a sendError and retried as they are likely to be fatal if they
-	// are returned from multiple servers.
-	if grpcutil.IsAuthError(lastAttemptErr) {
-		return lastAttemptErr
-	}
-	// TODO(bdarnell): The error from the last attempt is not necessarily the best
-	// one to return; we may want to remember the "best" error we've seen (for
-	// example, a NotLeaseHolderError conveys more information than a
-	// RangeNotFound).
-	return newSendError(errors.Wrap(lastAttemptErr, "sending to all replicas failed; last error"))
+	return lastAttemptErr
 }
 
 // slowDistSenderRangeThreshold is a latency threshold for logging slow
@@ -3034,12 +3024,17 @@ func (ds *DistSender) sendToReplicas(
 				// regress. As such, advancing through each replica on the
 				// transport until it's exhausted is unlikely to achieve much.
 				//
-				// We bail early by returning a sendError. The expectation is
-				// for the client to retry with a fresher eviction token.
+				// We bail early by returning the best error we have
+				// seen so far. The expectation is for the client to
+				// retry with a fresher eviction token if possible.
 				log.VEventf(
 					ctx, 2, "transport incompatible with updated routing; bailing early",
 				)
-				return nil, newSendError(errors.Wrap(tErr, "leaseholder not found in transport; last error"))
+				return nil, selectBestError(
+					ambiguousError,
+					replicaUnavailableError,
+					newSendError(errors.Wrap(tErr, "leaseholder not found in transport; last error")),
+				)
 			}
 		}
 		// Check whether the request was intentionally sent to a follower
@@ -3308,15 +3303,21 @@ func skipStaleReplicas(
 	// RangeKeyMismatchError if there's even a replica. We'll bubble up an
 	// error and try with a new descriptor.
 	if !routing.Valid() {
-		return noMoreReplicasErr(
+		return selectBestError(
 			ambiguousError, nil, // ignore the replicaUnavailableError, retry with new routing info
-			errors.Wrap(lastErr, "routing information detected to be stale"))
+			newSendError(errors.Wrap(lastErr, "routing information detected to be stale")))
 	}
 
 	for {
 		if transport.IsExhausted() {
-			return noMoreReplicasErr(ambiguousError, replicaUnavailableError, lastErr)
+			// Authentication and authorization errors should be propagated up rather than
+			// wrapped in a sendError and retried as they are likely to be fatal if they
+			// are returned from multiple servers.
+			if !grpcutil.IsAuthError(lastErr) {
+				lastErr = newSendError(errors.Wrap(lastErr, "sending to all replicas failed; last error"))
+			}
+			return selectBestError(ambiguousError, replicaUnavailableError, lastErr)
 		}
 
 		if _, ok := routing.Desc().GetReplicaDescriptorByID(transport.NextReplica().ReplicaID); ok {
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
index a45adeb9a44f..e24543a4d2cf 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -3606,6 +3606,201 @@ func TestGatewayNodeID(t *testing.T) {
 	}
 }
 
+// TestReplicaErrorsMerged tests cases where the different replicas return
+// different errors. Specifically it is making sure that more important errors
+// such as ambiguous errors are never dropped.
+func TestReplicaErrorsMerged(t *testing.T) {
+	// Only one descriptor.
+	var initDescriptor = roachpb.RangeDescriptor{
+		Generation: 1,
+		RangeID:    1,
+		StartKey:   roachpb.RKeyMin,
+		EndKey:     roachpb.RKeyMax,
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{
+				NodeID:    1,
+				StoreID:   1,
+				ReplicaID: 1,
+			},
+			{
+				NodeID:    2,
+				StoreID:   2,
+				ReplicaID: 2,
+			},
+		},
+	}
+	var initLease = roachpb.Lease{
+		Sequence: 1,
+		Replica: roachpb.ReplicaDescriptor{
+			NodeID: 1, StoreID: 1, ReplicaID: 1,
+		},
+	}
+	var descriptor2 = roachpb.RangeDescriptor{
+		Generation: 2,
+		RangeID:    1,
+		StartKey:   roachpb.RKeyMin,
+		EndKey:     roachpb.RKeyMax,
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{
+				NodeID:    1,
+				StoreID:   1,
+				ReplicaID: 1,
+			},
+			{
+				NodeID:    3,
+				StoreID:   3,
+				ReplicaID: 2,
+			},
+		},
+	}
+	var lease3 = roachpb.Lease{
+		Sequence: 2,
+		Replica: roachpb.ReplicaDescriptor{
+			NodeID: 3, StoreID: 3, ReplicaID: 2,
+		},
+	}
+
+	notLeaseHolderErr := kvpb.NewError(kvpb.NewNotLeaseHolderError(lease3, 0, &descriptor2, ""))
+	startedRequestError := errors.New("request might have started")
+	unavailableError1 := kvpb.NewError(kvpb.NewReplicaUnavailableError(errors.New("unavailable"), &initDescriptor, initDescriptor.InternalReplicas[0]))
+	unavailableError2 := kvpb.NewError(kvpb.NewReplicaUnavailableError(errors.New("unavailable"), &initDescriptor, initDescriptor.InternalReplicas[1]))
+
+	// withCommit changes the error handling behavior in sendPartialBatch.
+	// Specifically, if the top-level request was sent with a commit, then it
+	// will convert network errors from requests that may have started into
+	// ambiguous errors, and these are returned with higher priority. This
+	// prevents ambiguous errors from being retried incorrectly.
+	// See https://cockroachlabs.com/blog/demonic-nondeterminism/#appendix for
+	// the gory details.
+	testCases := []struct {
+		withCommit         bool
+		sendErr1, sendErr2 error
+		err1, err2         *kvpb.Error
+		expErr             string
+	}{
+		// The ambiguous error is returned with higher priority for withCommit.
+		{
+			withCommit: true,
+			sendErr1:   startedRequestError,
+			err2:       notLeaseHolderErr,
+			expErr:     "result is ambiguous",
+		},
+		// The not leaseholder error is the last error.
+		{
+			withCommit: false,
+			sendErr1:   startedRequestError,
+			err2:       notLeaseHolderErr,
+			expErr:     "leaseholder not found in transport",
+		},
+		// The ambiguous error is returned with higher priority for withCommit.
+		{
+			withCommit: true,
+			sendErr1:   startedRequestError,
+			err2:       unavailableError2,
+			expErr:     "result is ambiguous",
+		},
+		// The unavailable error is the last error.
+		{
+			withCommit: false,
+			sendErr1:   startedRequestError,
+			err2:       unavailableError2,
+			expErr:     "unavailable",
+		},
+		// The unavailable error is returned with higher priority regardless of withCommit.
+		{
+			withCommit: true,
+			err1:       unavailableError1,
+			err2:       notLeaseHolderErr,
+			expErr:     "unavailable",
+		},
+		// The unavailable error is returned with higher priority regardless of withCommit.
+		{
+			withCommit: false,
+			err1:       unavailableError1,
+			err2:       notLeaseHolderErr,
+			expErr:     "unavailable",
+		},
+	}
+	clock := hlc.NewClockForTesting(nil)
+	ns := &mockNodeStore{
+		nodes: []roachpb.NodeDescriptor{
+			{
+				NodeID:  1,
+				Address: util.UnresolvedAddr{},
+			},
+			{
+				NodeID:  2,
+				Address: util.UnresolvedAddr{},
+			},
+		},
+	}
+	ctx := context.Background()
+	for i, tc := range testCases {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			// We run every test case twice, to make sure error merging is commutative.
+			testutils.RunTrueAndFalse(t, "reverse", func(t *testing.T, reverse bool) {
+				stopper := stop.NewStopper()
+				defer stopper.Stop(ctx)
+				st := cluster.MakeTestingClusterSettings()
+				rc := rangecache.NewRangeCache(st, nil /* db */, func() int64 { return 100 }, stopper)
+				rc.Insert(ctx, roachpb.RangeInfo{
+					Desc:  initDescriptor,
+					Lease: initLease,
+				})
+
+				transportFn := func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
+					br := &kvpb.BatchResponse{}
+					switch ba.Replica.NodeID {
+					case 1:
+						if tc.sendErr1 != nil {
+							return nil, tc.sendErr1
+						} else {
+							br.Error = tc.err1
+						}
+						return br, nil
+					case 2:
+						if tc.sendErr2 != nil {
+							return nil, tc.sendErr2
+						} else {
+							br.Error = tc.err2
+						}
+						return br, nil
+					default:
+						assert.Fail(t, "Unexpected replica n%d", ba.Replica.NodeID)
+						return nil, nil
+					}
+				}
+				cfg := DistSenderConfig{
+					AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(),
+					Clock:      clock,
+					NodeDescs:  ns,
+					Stopper:    stopper,
+					RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) (
+						[]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error,
+					) {
+						// These tests only deal with the low-level sendToReplicas(). Nobody
+						// should be reading descriptors from the database, but the DistSender
+						// insists on having a non-nil one.
+						return nil, nil, errors.New("range desc db unexpectedly used")
+					}),
+					TransportFactory: adaptSimpleTransport(transportFn),
+					Settings:         cluster.MakeTestingClusterSettings(),
+				}
+				ds := NewDistSender(cfg)
+
+				ba := &kvpb.BatchRequest{}
+				ba.Add(kvpb.NewGet(roachpb.Key("a")))
+				tok, err := rc.LookupWithEvictionToken(ctx, roachpb.RKeyMin, rangecache.EvictionToken{}, false)
+				require.NoError(t, err)
+				br, err := ds.sendToReplicas(ctx, ba, tok, tc.withCommit)
+				log.Infof(ctx, "Error is %v", err)
+				require.ErrorContains(t, err, tc.expErr)
+				require.Nil(t, br)
+			})
+		})
+	}
+}
+
 // TestMultipleErrorsMerged tests that DistSender prioritizes errors that are
 // returned from concurrent partial batches and returns the "best" one after
 // merging the transaction metadata passed on the errors.
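
Illustration (not part of the patch): the precedence that the renamed selectBestError helper applies once the transport is exhausted can be seen in a minimal, self-contained Go sketch. An ambiguous error wins, then a replica unavailable error, then whatever the last attempt returned. Plain standard-library errors stand in for kvpb.AmbiguousResultError and kvpb.ReplicaUnavailableError, and the selectBest name is illustrative only.

package main

import (
	"errors"
	"fmt"
)

// selectBest mirrors the precedence of the patch's selectBestError with plain
// errors: ambiguity outranks unavailability, which outranks the error from
// the last attempt.
func selectBest(ambiguousErr, replicaUnavailableErr, lastAttemptErr error) error {
	if ambiguousErr != nil {
		// The request may already have been applied, so never downgrade this
		// to a retryable send error.
		return fmt.Errorf("result is ambiguous: %w (last error: %v)", ambiguousErr, lastAttemptErr)
	}
	if replicaUnavailableErr != nil {
		return replicaUnavailableErr
	}
	return lastAttemptErr
}

func main() {
	ambiguous := errors.New("request might have started")
	unavailable := errors.New("replica unavailable")
	nlhe := errors.New("not leaseholder")

	fmt.Println(selectBest(ambiguous, unavailable, nlhe)) // ambiguity outranks everything
	fmt.Println(selectBest(nil, unavailable, nlhe))       // unavailability outranks the last error
	fmt.Println(selectBest(nil, nil, nlhe))               // only the last error is left
}

This matches the ordering that the new TestReplicaErrorsMerged cases assert on.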