diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index cb7e58940c57..43fd461579fe 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2127,7 +2127,7 @@ func (ds *DistSender) sendToReplicas( // account that the local node can't be down) it won't take long until we // talk to a replica that tells us who the leaseholder is. if ctx.Err() == nil { - if lh := routing.Leaseholder(); lh != nil && *lh == curReplica { + if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) { routing.EvictLease(ctx) } } @@ -2209,7 +2209,7 @@ func (ds *DistSender) sendToReplicas( // (possibly because it hasn't applied its lease yet). Perhaps that // lease expires and someone else gets a new one, so by moving on we // get out of possibly infinite loops. - if *lh != curReplica || sameReplicaRetries < sameReplicaRetryLimit { + if !lh.IsSame(curReplica) || sameReplicaRetries < sameReplicaRetryLimit { transport.MoveToFront(*lh) } } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 2c2066499ecd..e27c690d8f82 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -189,7 +189,7 @@ func (l *simpleTransportAdapter) SkipReplica() { func (l *simpleTransportAdapter) MoveToFront(replica roachpb.ReplicaDescriptor) { for i := range l.replicas { - if l.replicas[i] == replica { + if l.replicas[i].IsSame(replica) { // If we've already processed the replica, decrement the current // index before we swap. if i < l.nextReplicaIdx { @@ -584,6 +584,8 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { ctx := context.Background() recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[1] + recognizedLeaseHolderIncoming := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[2] + recognizedLeaseHolderIncoming.Type = roachpb.ReplicaTypeVoterIncoming() unrecognizedLeaseHolder := roachpb.ReplicaDescriptor{ NodeID: 99, StoreID: 999, @@ -623,6 +625,15 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { }, expLeaseholder: nil, }, + { + name: "leaseholder in desc with different type", + nlhe: roachpb.NotLeaseHolderError{ + RangeID: testUserRangeDescriptor3Replicas.RangeID, + Lease: &roachpb.Lease{Replica: recognizedLeaseHolderIncoming, Sequence: 1}, + }, + expLeaseholder: &recognizedLeaseHolderIncoming, + expLease: true, + }, { name: "leaseholder unknown", nlhe: roachpb.NotLeaseHolderError{ @@ -647,20 +658,22 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { )) } - first := true + var attempts int + var retryReplica roachpb.ReplicaDescriptor var testFn simpleSendFn = func( _ context.Context, args roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { + attempts++ reply := &roachpb.BatchResponse{} - if first { + if attempts == 1 { reply.Error = roachpb.NewError(&tc.nlhe) - first = false return reply, nil } // Return an error to avoid activating a code path that would update the // cache with the leaseholder from the successful response. That's not // what this test wants to test. + retryReplica = args.Header.Replica reply.Error = roachpb.NewErrorf("boom") return reply, nil } @@ -683,9 +696,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { if _, pErr := kv.SendWrapped(ctx, ds, put); !testutils.IsPError(pErr, "boom") { t.Fatalf("unexpected error: %v", pErr) } - if first { - t.Fatal("the request did not retry") - } + require.Equal(t, 2, attempts) rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */) require.NotNil(t, rng) @@ -697,6 +708,12 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { l := rng.Lease() require.NotNil(t, l) require.Equal(t, *tc.expLeaseholder, l.Replica) + // The transport retry will use the replica descriptor from the + // initial range descriptor, not the one returned in the NLHE, i.e. + // it won't have the non-nil type. + expRetryReplica := *tc.expLeaseholder + expRetryReplica.Type = nil + require.Equal(t, expRetryReplica, retryReplica) } else { require.Nil(t, rng.Lease()) } diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index dd9d8c526318..83cfbffa0d34 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -255,7 +255,7 @@ func (gt *grpcTransport) SkipReplica() { func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { for i := range gt.replicas { - if gt.replicas[i] == replica { + if gt.replicas[i].IsSame(replica) { // If we've already processed the replica, decrement the current // index before we swap. if i < gt.nextReplicaIdx { diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 2f051b2a84bd..d749345d1a0d 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/stretchr/testify/require" "google.golang.org/grpc" ) @@ -30,6 +31,8 @@ func TestTransportMoveToFront(t *testing.T) { rd1 := roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1} rd2 := roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2} rd3 := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3} + rd3Incoming := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3, + Type: roachpb.ReplicaTypeVoterIncoming()} gt := grpcTransport{replicas: []roachpb.ReplicaDescriptor{rd1, rd2, rd3}} verifyOrder := func(replicas []roachpb.ReplicaDescriptor) { @@ -94,6 +97,11 @@ func TestTransportMoveToFront(t *testing.T) { if gt.nextReplicaIdx != 1 { t.Fatalf("expected client index 1; got %d", gt.nextReplicaIdx) } + + // Move rd3 to front, even if the replica type differs. + gt.MoveToFront(rd3Incoming) + verifyOrder([]roachpb.ReplicaDescriptor{rd1, rd3, rd2}) + require.Equal(t, 1, gt.nextReplicaIdx) } // TestSpanImport tests that the gRPC transport ingests trace information that diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index c85f0cdc97b7..2413f276d57b 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -416,6 +416,12 @@ func (r ReplicaDescriptor) String() string { return redact.StringWithoutMarkers(r) } +// IsSame returns true if the two replica descriptors refer to the same replica, +// ignoring the replica type. +func (r ReplicaDescriptor) IsSame(o ReplicaDescriptor) bool { + return r.NodeID == o.NodeID && r.StoreID == o.StoreID && r.ReplicaID == o.ReplicaID +} + // SafeFormat implements the redact.SafeFormatter interface. func (r ReplicaDescriptor) SafeFormat(w redact.SafePrinter, _ rune) { w.Printf("(n%d,s%d):", r.NodeID, r.StoreID)