From 2b143cb42288c11a405f4bf67260503774f8544d Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 27 Jul 2022 15:31:36 +0000 Subject: [PATCH] kvserver: ignore leaseholder replica type in DistSender The DistSender could fail to prioritize a newly discovered leaseholder from a `NotLeaseHolderError` if the leaseholder had a non-`VOTER` replica type. Instead, it would continue to try replicas in order until possibly exhausting the transport and backing off, leading to increased tail latencies. This applies in particular to 22.1, where we allowed `VOTER_INCOMING` replicas to acquire the lease (see 22b4fb51). The primary reason is that `grpcTransport.MoveToFront()` would fail to compare the new leaseholder replica descriptor with the one in its range descriptor. There are two reasons why this can happen: 1. `ReplicaDescriptor.ReplicaType` is a pointer, where the zero-value `nil` is equivalent to `VOTER`. The equality comparison used in `MoveToFront()` is `==`, but pointer equality compares the memory address rather than the value. 2. The transport will keep using the initial range descriptor when it was created, and not updating it as we receive updated range descriptors. This means that the transport may e.g. have a `nil` replica type while the leaseholder has an `VOTER_INCOMING` replica type. This patch fixes both issues by adding `ReplicaDescriptor.IsSame()` which compares replica identities while ignoring the type. Release note (bug fix): Fixed a bug where new leaseholders (with a `VOTER_INCOMING` type) would not always be detected properly during query execution, leading to occasional increased tail latencies due to unnecessary internal retries. --- pkg/kv/kvclient/kvcoord/dist_sender.go | 4 +-- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 31 ++++++++++++++++----- pkg/kv/kvclient/kvcoord/transport.go | 2 +- pkg/kv/kvclient/kvcoord/transport_test.go | 8 ++++++ pkg/roachpb/metadata.go | 6 ++++ 5 files changed, 41 insertions(+), 10 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index cb7e58940c57..43fd461579fe 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2127,7 +2127,7 @@ func (ds *DistSender) sendToReplicas( // account that the local node can't be down) it won't take long until we // talk to a replica that tells us who the leaseholder is. if ctx.Err() == nil { - if lh := routing.Leaseholder(); lh != nil && *lh == curReplica { + if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) { routing.EvictLease(ctx) } } @@ -2209,7 +2209,7 @@ func (ds *DistSender) sendToReplicas( // (possibly because it hasn't applied its lease yet). Perhaps that // lease expires and someone else gets a new one, so by moving on we // get out of possibly infinite loops. - if *lh != curReplica || sameReplicaRetries < sameReplicaRetryLimit { + if !lh.IsSame(curReplica) || sameReplicaRetries < sameReplicaRetryLimit { transport.MoveToFront(*lh) } } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 2c2066499ecd..e27c690d8f82 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -189,7 +189,7 @@ func (l *simpleTransportAdapter) SkipReplica() { func (l *simpleTransportAdapter) MoveToFront(replica roachpb.ReplicaDescriptor) { for i := range l.replicas { - if l.replicas[i] == replica { + if l.replicas[i].IsSame(replica) { // If we've already processed the replica, decrement the current // index before we swap. if i < l.nextReplicaIdx { @@ -584,6 +584,8 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { ctx := context.Background() recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[1] + recognizedLeaseHolderIncoming := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[2] + recognizedLeaseHolderIncoming.Type = roachpb.ReplicaTypeVoterIncoming() unrecognizedLeaseHolder := roachpb.ReplicaDescriptor{ NodeID: 99, StoreID: 999, @@ -623,6 +625,15 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { }, expLeaseholder: nil, }, + { + name: "leaseholder in desc with different type", + nlhe: roachpb.NotLeaseHolderError{ + RangeID: testUserRangeDescriptor3Replicas.RangeID, + Lease: &roachpb.Lease{Replica: recognizedLeaseHolderIncoming, Sequence: 1}, + }, + expLeaseholder: &recognizedLeaseHolderIncoming, + expLease: true, + }, { name: "leaseholder unknown", nlhe: roachpb.NotLeaseHolderError{ @@ -647,20 +658,22 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { )) } - first := true + var attempts int + var retryReplica roachpb.ReplicaDescriptor var testFn simpleSendFn = func( _ context.Context, args roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { + attempts++ reply := &roachpb.BatchResponse{} - if first { + if attempts == 1 { reply.Error = roachpb.NewError(&tc.nlhe) - first = false return reply, nil } // Return an error to avoid activating a code path that would update the // cache with the leaseholder from the successful response. That's not // what this test wants to test. + retryReplica = args.Header.Replica reply.Error = roachpb.NewErrorf("boom") return reply, nil } @@ -683,9 +696,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { if _, pErr := kv.SendWrapped(ctx, ds, put); !testutils.IsPError(pErr, "boom") { t.Fatalf("unexpected error: %v", pErr) } - if first { - t.Fatal("the request did not retry") - } + require.Equal(t, 2, attempts) rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */) require.NotNil(t, rng) @@ -697,6 +708,12 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { l := rng.Lease() require.NotNil(t, l) require.Equal(t, *tc.expLeaseholder, l.Replica) + // The transport retry will use the replica descriptor from the + // initial range descriptor, not the one returned in the NLHE, i.e. + // it won't have the non-nil type. + expRetryReplica := *tc.expLeaseholder + expRetryReplica.Type = nil + require.Equal(t, expRetryReplica, retryReplica) } else { require.Nil(t, rng.Lease()) } diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index dd9d8c526318..83cfbffa0d34 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -255,7 +255,7 @@ func (gt *grpcTransport) SkipReplica() { func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { for i := range gt.replicas { - if gt.replicas[i] == replica { + if gt.replicas[i].IsSame(replica) { // If we've already processed the replica, decrement the current // index before we swap. if i < gt.nextReplicaIdx { diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 2f051b2a84bd..d749345d1a0d 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/stretchr/testify/require" "google.golang.org/grpc" ) @@ -30,6 +31,8 @@ func TestTransportMoveToFront(t *testing.T) { rd1 := roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1} rd2 := roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2} rd3 := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3} + rd3Incoming := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3, + Type: roachpb.ReplicaTypeVoterIncoming()} gt := grpcTransport{replicas: []roachpb.ReplicaDescriptor{rd1, rd2, rd3}} verifyOrder := func(replicas []roachpb.ReplicaDescriptor) { @@ -94,6 +97,11 @@ func TestTransportMoveToFront(t *testing.T) { if gt.nextReplicaIdx != 1 { t.Fatalf("expected client index 1; got %d", gt.nextReplicaIdx) } + + // Move rd3 to front, even if the replica type differs. + gt.MoveToFront(rd3Incoming) + verifyOrder([]roachpb.ReplicaDescriptor{rd1, rd3, rd2}) + require.Equal(t, 1, gt.nextReplicaIdx) } // TestSpanImport tests that the gRPC transport ingests trace information that diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index c85f0cdc97b7..2413f276d57b 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -416,6 +416,12 @@ func (r ReplicaDescriptor) String() string { return redact.StringWithoutMarkers(r) } +// IsSame returns true if the two replica descriptors refer to the same replica, +// ignoring the replica type. +func (r ReplicaDescriptor) IsSame(o ReplicaDescriptor) bool { + return r.NodeID == o.NodeID && r.StoreID == o.StoreID && r.ReplicaID == o.ReplicaID +} + // SafeFormat implements the redact.SafeFormatter interface. func (r ReplicaDescriptor) SafeFormat(w redact.SafePrinter, _ rune) { w.Printf("(n%d,s%d):", r.NodeID, r.StoreID)