Skip to content

Commit

Permalink
kvserver: ignore leaseholder replica type in DistSender
Browse files Browse the repository at this point in the history
The DistSender could fail to prioritize a newly discovered leaseholder
from a `NotLeaseHolderError` if the leaseholder had a non-`VOTER`
replica type. Instead, it would continue to try replicas in order until
possibly exhausting the transport and backing off, leading to increased
tail latencies. This applies in particular to 22.1, where we allowed
`VOTER_INCOMING` replicas to acquire the lease (see 22b4fb5).

The primary reason is that `grpcTransport.MoveToFront()` would fail to
compare the new leaseholder replica descriptor with the one in its range
descriptor. There are two reasons why this can happen:

1. `ReplicaDescriptor.ReplicaType` is a pointer, where the zero-value
   `nil` is equivalent to `VOTER`. The equality comparison used in
   `MoveToFront()` is `==`, but pointer equality compares the memory
   address rather than the value.

2. The transport will keep using the initial range descriptor when it
   was created, and not updating it as we receive updated range
   descriptors. This means that the transport may e.g. have a `nil`
   replica type while the leaseholder has an `VOTER_INCOMING` replica
   type.

This patch fixes both issues by adding `ReplicaDescriptor.IsSame()`
which compares replica identities while ignoring the type.

Release note (bug fix): Fixed a bug where new leaseholders (with a
`VOTER_INCOMING` type) would not always be detected properly during
query execution, leading to occasional increased tail latencies due
to unnecessary internal retries.
  • Loading branch information
erikgrinaker committed Jul 27, 2022
1 parent 811ca1f commit 0251570
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 10 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2146,7 +2146,7 @@ func (ds *DistSender) sendToReplicas(
// account that the local node can't be down) it won't take long until we
// talk to a replica that tells us who the leaseholder is.
if ctx.Err() == nil {
if lh := routing.Leaseholder(); lh != nil && *lh == curReplica {
if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
routing.EvictLease(ctx)
}
}
Expand Down Expand Up @@ -2228,7 +2228,7 @@ func (ds *DistSender) sendToReplicas(
// (possibly because it hasn't applied its lease yet). Perhaps that
// lease expires and someone else gets a new one, so by moving on we
// get out of possibly infinite loops.
if *lh != curReplica || sameReplicaRetries < sameReplicaRetryLimit {
if !lh.IsSame(curReplica) || sameReplicaRetries < sameReplicaRetryLimit {
transport.MoveToFront(*lh)
}
}
Expand Down
31 changes: 24 additions & 7 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (l *simpleTransportAdapter) SkipReplica() {

func (l *simpleTransportAdapter) MoveToFront(replica roachpb.ReplicaDescriptor) {
for i := range l.replicas {
if l.replicas[i] == replica {
if l.replicas[i].IsSame(replica) {
// If we've already processed the replica, decrement the current
// index before we swap.
if i < l.nextReplicaIdx {
Expand Down Expand Up @@ -584,6 +584,8 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
ctx := context.Background()

recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[1]
recognizedLeaseHolderIncoming := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[2]
recognizedLeaseHolderIncoming.Type = roachpb.ReplicaTypeVoterIncoming()
unrecognizedLeaseHolder := roachpb.ReplicaDescriptor{
NodeID: 99,
StoreID: 999,
Expand Down Expand Up @@ -623,6 +625,15 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
},
expLeaseholder: nil,
},
{
name: "leaseholder in desc with different type",
nlhe: roachpb.NotLeaseHolderError{
RangeID: testUserRangeDescriptor3Replicas.RangeID,
Lease: &roachpb.Lease{Replica: recognizedLeaseHolderIncoming, Sequence: 1},
},
expLeaseholder: &recognizedLeaseHolderIncoming,
expLease: true,
},
{
name: "leaseholder unknown",
nlhe: roachpb.NotLeaseHolderError{
Expand All @@ -647,20 +658,22 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
))
}

first := true
var attempts int
var retryReplica roachpb.ReplicaDescriptor

var testFn simpleSendFn = func(
_ context.Context, args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
attempts++
reply := &roachpb.BatchResponse{}
if first {
if attempts == 1 {
reply.Error = roachpb.NewError(&tc.nlhe)
first = false
return reply, nil
}
// Return an error to avoid activating a code path that would update the
// cache with the leaseholder from the successful response. That's not
// what this test wants to test.
retryReplica = args.Header.Replica
reply.Error = roachpb.NewErrorf("boom")
return reply, nil
}
Expand All @@ -683,9 +696,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
if _, pErr := kv.SendWrapped(ctx, ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("unexpected error: %v", pErr)
}
if first {
t.Fatal("the request did not retry")
}
require.Equal(t, 2, attempts)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.NotNil(t, rng)

Expand All @@ -697,6 +708,12 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
l := rng.Lease()
require.NotNil(t, l)
require.Equal(t, *tc.expLeaseholder, l.Replica)
// The transport retry will use the replica descriptor from the
// initial range descriptor, not the one returned in the NLHE, i.e.
// it won't have the non-nil type.
expRetryReplica := *tc.expLeaseholder
expRetryReplica.Type = nil
require.Equal(t, expRetryReplica, retryReplica)
} else {
require.Nil(t, rng.Lease())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (gt *grpcTransport) SkipReplica() {

func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
for i := range gt.replicas {
if gt.replicas[i] == replica {
if gt.replicas[i].IsSame(replica) {
// If we've already processed the replica, decrement the current
// index before we swap.
if i < gt.nextReplicaIdx {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

Expand All @@ -31,6 +32,8 @@ func TestTransportMoveToFront(t *testing.T) {
rd1 := roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1}
rd2 := roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2}
rd3 := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3}
rd3Incoming := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3,
Type: roachpb.ReplicaTypeVoterIncoming()}
gt := grpcTransport{replicas: []roachpb.ReplicaDescriptor{rd1, rd2, rd3}}

verifyOrder := func(replicas []roachpb.ReplicaDescriptor) {
Expand Down Expand Up @@ -95,6 +98,11 @@ func TestTransportMoveToFront(t *testing.T) {
if gt.nextReplicaIdx != 1 {
t.Fatalf("expected client index 1; got %d", gt.nextReplicaIdx)
}

// Move rd3 to front, even if the replica type differs.
gt.MoveToFront(rd3Incoming)
verifyOrder([]roachpb.ReplicaDescriptor{rd1, rd3, rd2})
require.Equal(t, 1, gt.nextReplicaIdx)
}

// TestSpanImport tests that the gRPC transport ingests trace information that
Expand Down
6 changes: 6 additions & 0 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@ func (r ReplicaDescriptor) String() string {
return redact.StringWithoutMarkers(r)
}

// IsSame returns true if the two replica descriptors refer to the same replica,
// ignoring the replica type.
func (r ReplicaDescriptor) IsSame(o ReplicaDescriptor) bool {
return r.NodeID == o.NodeID && r.StoreID == o.StoreID && r.ReplicaID == o.ReplicaID
}

// SafeFormat implements the redact.SafeFormatter interface.
func (r ReplicaDescriptor) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("(n%d,s%d):", r.NodeID, r.StoreID)
Expand Down

0 comments on commit 0251570

Please sign in to comment.