Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: ignore leaseholder replica type in DistSender #85140

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2146,7 +2146,7 @@ func (ds *DistSender) sendToReplicas(
// account that the local node can't be down) it won't take long until we
// talk to a replica that tells us who the leaseholder is.
if ctx.Err() == nil {
if lh := routing.Leaseholder(); lh != nil && *lh == curReplica {
if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
routing.EvictLease(ctx)
}
}
Expand Down Expand Up @@ -2228,7 +2228,7 @@ func (ds *DistSender) sendToReplicas(
// (possibly because it hasn't applied its lease yet). Perhaps that
// lease expires and someone else gets a new one, so by moving on we
// get out of possibly infinite loops.
if *lh != curReplica || sameReplicaRetries < sameReplicaRetryLimit {
if !lh.IsSame(curReplica) || sameReplicaRetries < sameReplicaRetryLimit {
transport.MoveToFront(*lh)
}
}
Expand Down
31 changes: 24 additions & 7 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (l *simpleTransportAdapter) SkipReplica() {

func (l *simpleTransportAdapter) MoveToFront(replica roachpb.ReplicaDescriptor) {
for i := range l.replicas {
if l.replicas[i] == replica {
if l.replicas[i].IsSame(replica) {
// If we've already processed the replica, decrement the current
// index before we swap.
if i < l.nextReplicaIdx {
Expand Down Expand Up @@ -584,6 +584,8 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
ctx := context.Background()

recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[1]
recognizedLeaseHolderIncoming := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[2]
recognizedLeaseHolderIncoming.Type = roachpb.ReplicaTypeVoterIncoming()
unrecognizedLeaseHolder := roachpb.ReplicaDescriptor{
NodeID: 99,
StoreID: 999,
Expand Down Expand Up @@ -623,6 +625,15 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
},
expLeaseholder: nil,
},
{
name: "leaseholder in desc with different type",
nlhe: roachpb.NotLeaseHolderError{
RangeID: testUserRangeDescriptor3Replicas.RangeID,
Lease: &roachpb.Lease{Replica: recognizedLeaseHolderIncoming, Sequence: 1},
},
expLeaseholder: &recognizedLeaseHolderIncoming,
expLease: true,
},
{
name: "leaseholder unknown",
nlhe: roachpb.NotLeaseHolderError{
Expand All @@ -647,20 +658,22 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
))
}

first := true
var attempts int
var retryReplica roachpb.ReplicaDescriptor

var testFn simpleSendFn = func(
_ context.Context, args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
attempts++
reply := &roachpb.BatchResponse{}
if first {
if attempts == 1 {
reply.Error = roachpb.NewError(&tc.nlhe)
first = false
return reply, nil
}
// Return an error to avoid activating a code path that would update the
// cache with the leaseholder from the successful response. That's not
// what this test wants to test.
retryReplica = args.Header.Replica
reply.Error = roachpb.NewErrorf("boom")
return reply, nil
}
Expand All @@ -683,9 +696,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
if _, pErr := kv.SendWrapped(ctx, ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("unexpected error: %v", pErr)
}
if first {
t.Fatal("the request did not retry")
}
require.Equal(t, 2, attempts)
rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */)
require.NotNil(t, rng)

Expand All @@ -697,6 +708,12 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
l := rng.Lease()
require.NotNil(t, l)
require.Equal(t, *tc.expLeaseholder, l.Replica)
// The transport retry will use the replica descriptor from the
// initial range descriptor, not the one returned in the NLHE, i.e.
// it won't have the non-nil type.
expRetryReplica := *tc.expLeaseholder
expRetryReplica.Type = nil
require.Equal(t, expRetryReplica, retryReplica)
} else {
require.Nil(t, rng.Lease())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (gt *grpcTransport) SkipReplica() {

func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
for i := range gt.replicas {
if gt.replicas[i] == replica {
if gt.replicas[i].IsSame(replica) {
// If we've already processed the replica, decrement the current
// index before we swap.
if i < gt.nextReplicaIdx {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

Expand All @@ -31,6 +32,8 @@ func TestTransportMoveToFront(t *testing.T) {
rd1 := roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1}
rd2 := roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2}
rd3 := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3}
rd3Incoming := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3,
Type: roachpb.ReplicaTypeVoterIncoming()}
gt := grpcTransport{replicas: []roachpb.ReplicaDescriptor{rd1, rd2, rd3}}

verifyOrder := func(replicas []roachpb.ReplicaDescriptor) {
Expand Down Expand Up @@ -95,6 +98,11 @@ func TestTransportMoveToFront(t *testing.T) {
if gt.nextReplicaIdx != 1 {
t.Fatalf("expected client index 1; got %d", gt.nextReplicaIdx)
}

// Move rd3 to front, even if the replica type differs.
gt.MoveToFront(rd3Incoming)
verifyOrder([]roachpb.ReplicaDescriptor{rd1, rd3, rd2})
require.Equal(t, 1, gt.nextReplicaIdx)
}

// TestSpanImport tests that the gRPC transport ingests trace information that
Expand Down
6 changes: 6 additions & 0 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@ func (r ReplicaDescriptor) String() string {
return redact.StringWithoutMarkers(r)
}

// IsSame returns true if the two replica descriptors refer to the same replica,
// ignoring the replica type.
func (r ReplicaDescriptor) IsSame(o ReplicaDescriptor) bool {
return r.NodeID == o.NodeID && r.StoreID == o.StoreID && r.ReplicaID == o.ReplicaID
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the relations between node, store, and replica?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is ReplicaID local to a pair of (NodeID, StoreID)? Can it be compared without any of the two?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One-to-many all the way down. One node can have many stores (disks), one store can have many replicas.

The node ID must be unique within the cluster. The store ID must be unique per node. The replica ID must be unique within the range. The range ID is sort of implied here, in that the replica descriptor is stored within a range descriptor.

In order to route a request to a replica we need to know all four. A replica ID won't move between nodes/stores -- if we have to move it, we'll create a new replica (with a new ID) on a different node, populate it, and then delete the old one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right in that we technically only need to compare the replica ID here, since we're operating within the context of a single range. We check the node ID and store ID just to make sure we have the right "address" for it, I suppose.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so: (rangeID, replicaID) -> (nodeID, storeID), and rangeID is implied by the caller of IsSame (otherwise, due to locality of replicaID, it would be comparing apples to tables). Hence, is it enough to just compare ReplicaIDs? From what you said it follows that if rangeID and replicaID are the same then nodeID and storeID are the same too.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you answered this already. Were racing with the last 2 comments :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle it should be, yeah. But it's cheap enough to check the node and store too, so I don't see a strong reason not to. If there should be a mismatch for whatever reason, then there's no real point in trying to contact the replica anyway, because the transport will send it to the wrong place.

}

// SafeFormat implements the redact.SafeFormatter interface.
func (r ReplicaDescriptor) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("(n%d,s%d):", r.NodeID, r.StoreID)
Expand Down