Skip to content

Commit

Permalink
Merge pull request #85315 from erikgrinaker/backport22.1-85140
Browse files Browse the repository at this point in the history
release-22.1: kvserver: ignore leaseholder replica type in DistSender
  • Loading branch information
erikgrinaker authored Jul 29, 2022
2 parents 5fffafe + 2b143cb commit 60293f9
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 10 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2127,7 +2127,7 @@ func (ds *DistSender) sendToReplicas(
// account that the local node can't be down) it won't take long until we
// talk to a replica that tells us who the leaseholder is.
if ctx.Err() == nil {
if lh := routing.Leaseholder(); lh != nil && *lh == curReplica {
if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
routing.EvictLease(ctx)
}
}
Expand Down Expand Up @@ -2209,7 +2209,7 @@ func (ds *DistSender) sendToReplicas(
// (possibly because it hasn't applied its lease yet). Perhaps that
// lease expires and someone else gets a new one, so by moving on we
// get out of possibly infinite loops.
if *lh != curReplica || sameReplicaRetries < sameReplicaRetryLimit {
if !lh.IsSame(curReplica) || sameReplicaRetries < sameReplicaRetryLimit {
transport.MoveToFront(*lh)
}
}
Expand Down
31 changes: 24 additions & 7 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (l *simpleTransportAdapter) SkipReplica() {

func (l *simpleTransportAdapter) MoveToFront(replica roachpb.ReplicaDescriptor) {
for i := range l.replicas {
if l.replicas[i] == replica {
if l.replicas[i].IsSame(replica) {
// If we've already processed the replica, decrement the current
// index before we swap.
if i < l.nextReplicaIdx {
Expand Down Expand Up @@ -584,6 +584,8 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
ctx := context.Background()

recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[1]
recognizedLeaseHolderIncoming := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[2]
recognizedLeaseHolderIncoming.Type = roachpb.ReplicaTypeVoterIncoming()
unrecognizedLeaseHolder := roachpb.ReplicaDescriptor{
NodeID: 99,
StoreID: 999,
Expand Down Expand Up @@ -623,6 +625,15 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
},
expLeaseholder: nil,
},
{
name: "leaseholder in desc with different type",
nlhe: roachpb.NotLeaseHolderError{
RangeID: testUserRangeDescriptor3Replicas.RangeID,
Lease: &roachpb.Lease{Replica: recognizedLeaseHolderIncoming, Sequence: 1},
},
expLeaseholder: &recognizedLeaseHolderIncoming,
expLease: true,
},
{
name: "leaseholder unknown",
nlhe: roachpb.NotLeaseHolderError{
Expand All @@ -647,20 +658,22 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
))
}

first := true
var attempts int
var retryReplica roachpb.ReplicaDescriptor

var testFn simpleSendFn = func(
_ context.Context, args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
attempts++
reply := &roachpb.BatchResponse{}
if first {
if attempts == 1 {
reply.Error = roachpb.NewError(&tc.nlhe)
first = false
return reply, nil
}
// Return an error to avoid activating a code path that would update the
// cache with the leaseholder from the successful response. That's not
// what this test wants to test.
retryReplica = args.Header.Replica
reply.Error = roachpb.NewErrorf("boom")
return reply, nil
}
Expand All @@ -683,9 +696,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
if _, pErr := kv.SendWrapped(ctx, ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("unexpected error: %v", pErr)
}
if first {
t.Fatal("the request did not retry")
}
require.Equal(t, 2, attempts)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.NotNil(t, rng)

Expand All @@ -697,6 +708,12 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
l := rng.Lease()
require.NotNil(t, l)
require.Equal(t, *tc.expLeaseholder, l.Replica)
// The transport retry will use the replica descriptor from the
// initial range descriptor, not the one returned in the NLHE, i.e.
// it won't have the non-nil type.
expRetryReplica := *tc.expLeaseholder
expRetryReplica.Type = nil
require.Equal(t, expRetryReplica, retryReplica)
} else {
require.Nil(t, rng.Lease())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (gt *grpcTransport) SkipReplica() {

func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
for i := range gt.replicas {
if gt.replicas[i] == replica {
if gt.replicas[i].IsSame(replica) {
// If we've already processed the replica, decrement the current
// index before we swap.
if i < gt.nextReplicaIdx {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

Expand All @@ -30,6 +31,8 @@ func TestTransportMoveToFront(t *testing.T) {
rd1 := roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1}
rd2 := roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2}
rd3 := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3}
rd3Incoming := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3,
Type: roachpb.ReplicaTypeVoterIncoming()}
gt := grpcTransport{replicas: []roachpb.ReplicaDescriptor{rd1, rd2, rd3}}

verifyOrder := func(replicas []roachpb.ReplicaDescriptor) {
Expand Down Expand Up @@ -94,6 +97,11 @@ func TestTransportMoveToFront(t *testing.T) {
if gt.nextReplicaIdx != 1 {
t.Fatalf("expected client index 1; got %d", gt.nextReplicaIdx)
}

// Move rd3 to front, even if the replica type differs.
gt.MoveToFront(rd3Incoming)
verifyOrder([]roachpb.ReplicaDescriptor{rd1, rd3, rd2})
require.Equal(t, 1, gt.nextReplicaIdx)
}

// TestSpanImport tests that the gRPC transport ingests trace information that
Expand Down
6 changes: 6 additions & 0 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@ func (r ReplicaDescriptor) String() string {
return redact.StringWithoutMarkers(r)
}

// IsSame reports whether r and o identify the same replica. Only the NodeID,
// StoreID and ReplicaID fields are compared; the replica type is deliberately
// left out of the comparison.
func (r ReplicaDescriptor) IsSame(o ReplicaDescriptor) bool {
	if r.NodeID != o.NodeID || r.StoreID != o.StoreID {
		return false
	}
	return r.ReplicaID == o.ReplicaID
}

// SafeFormat implements the redact.SafeFormatter interface.
func (r ReplicaDescriptor) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("(n%d,s%d):", r.NodeID, r.StoreID)
Expand Down

0 comments on commit 60293f9

Please sign in to comment.