Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: kvserver: ignore leaseholder replica type in DistSender #85315

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2127,7 +2127,7 @@ func (ds *DistSender) sendToReplicas(
// account that the local node can't be down) it won't take long until we
// talk to a replica that tells us who the leaseholder is.
if ctx.Err() == nil {
if lh := routing.Leaseholder(); lh != nil && *lh == curReplica {
if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
routing.EvictLease(ctx)
}
}
Expand Down Expand Up @@ -2209,7 +2209,7 @@ func (ds *DistSender) sendToReplicas(
// (possibly because it hasn't applied its lease yet). Perhaps that
// lease expires and someone else gets a new one, so by moving on we
// get out of possibly infinite loops.
if *lh != curReplica || sameReplicaRetries < sameReplicaRetryLimit {
if !lh.IsSame(curReplica) || sameReplicaRetries < sameReplicaRetryLimit {
transport.MoveToFront(*lh)
}
}
Expand Down
31 changes: 24 additions & 7 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (l *simpleTransportAdapter) SkipReplica() {

func (l *simpleTransportAdapter) MoveToFront(replica roachpb.ReplicaDescriptor) {
for i := range l.replicas {
if l.replicas[i] == replica {
if l.replicas[i].IsSame(replica) {
// If we've already processed the replica, decrement the current
// index before we swap.
if i < l.nextReplicaIdx {
Expand Down Expand Up @@ -584,6 +584,8 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
ctx := context.Background()

recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[1]
recognizedLeaseHolderIncoming := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[2]
recognizedLeaseHolderIncoming.Type = roachpb.ReplicaTypeVoterIncoming()
unrecognizedLeaseHolder := roachpb.ReplicaDescriptor{
NodeID: 99,
StoreID: 999,
Expand Down Expand Up @@ -623,6 +625,15 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
},
expLeaseholder: nil,
},
{
name: "leaseholder in desc with different type",
nlhe: roachpb.NotLeaseHolderError{
RangeID: testUserRangeDescriptor3Replicas.RangeID,
Lease: &roachpb.Lease{Replica: recognizedLeaseHolderIncoming, Sequence: 1},
},
expLeaseholder: &recognizedLeaseHolderIncoming,
expLease: true,
},
{
name: "leaseholder unknown",
nlhe: roachpb.NotLeaseHolderError{
Expand All @@ -647,20 +658,22 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
))
}

first := true
var attempts int
var retryReplica roachpb.ReplicaDescriptor

var testFn simpleSendFn = func(
_ context.Context, args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
attempts++
reply := &roachpb.BatchResponse{}
if first {
if attempts == 1 {
reply.Error = roachpb.NewError(&tc.nlhe)
first = false
return reply, nil
}
// Return an error to avoid activating a code path that would update the
// cache with the leaseholder from the successful response. That's not
// what this test wants to test.
retryReplica = args.Header.Replica
reply.Error = roachpb.NewErrorf("boom")
return reply, nil
}
Expand All @@ -683,9 +696,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
if _, pErr := kv.SendWrapped(ctx, ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("unexpected error: %v", pErr)
}
if first {
t.Fatal("the request did not retry")
}
require.Equal(t, 2, attempts)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.NotNil(t, rng)

Expand All @@ -697,6 +708,12 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
l := rng.Lease()
require.NotNil(t, l)
require.Equal(t, *tc.expLeaseholder, l.Replica)
// The transport retry will use the replica descriptor from the
// initial range descriptor, not the one returned in the NLHE, i.e.
// it won't have the non-nil type.
expRetryReplica := *tc.expLeaseholder
expRetryReplica.Type = nil
require.Equal(t, expRetryReplica, retryReplica)
} else {
require.Nil(t, rng.Lease())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (gt *grpcTransport) SkipReplica() {

func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
for i := range gt.replicas {
if gt.replicas[i] == replica {
if gt.replicas[i].IsSame(replica) {
// If we've already processed the replica, decrement the current
// index before we swap.
if i < gt.nextReplicaIdx {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

Expand All @@ -30,6 +31,8 @@ func TestTransportMoveToFront(t *testing.T) {
rd1 := roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1}
rd2 := roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2}
rd3 := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3}
rd3Incoming := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3,
Type: roachpb.ReplicaTypeVoterIncoming()}
gt := grpcTransport{replicas: []roachpb.ReplicaDescriptor{rd1, rd2, rd3}}

verifyOrder := func(replicas []roachpb.ReplicaDescriptor) {
Expand Down Expand Up @@ -94,6 +97,11 @@ func TestTransportMoveToFront(t *testing.T) {
if gt.nextReplicaIdx != 1 {
t.Fatalf("expected client index 1; got %d", gt.nextReplicaIdx)
}

// Move rd3 to front, even if the replica type differs.
gt.MoveToFront(rd3Incoming)
verifyOrder([]roachpb.ReplicaDescriptor{rd1, rd3, rd2})
require.Equal(t, 1, gt.nextReplicaIdx)
}

// TestSpanImport tests that the gRPC transport ingests trace information that
Expand Down
6 changes: 6 additions & 0 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@ func (r ReplicaDescriptor) String() string {
return redact.StringWithoutMarkers(r)
}

// IsSame returns true if the two replica descriptors refer to the same replica,
// ignoring the replica type.
func (r ReplicaDescriptor) IsSame(o ReplicaDescriptor) bool {
return r.NodeID == o.NodeID && r.StoreID == o.StoreID && r.ReplicaID == o.ReplicaID
}

// SafeFormat implements the redact.SafeFormatter interface.
func (r ReplicaDescriptor) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("(n%d,s%d):", r.NodeID, r.StoreID)
Expand Down