Skip to content

Commit

Permalink
kvclient: allow DistSender to consider non-voters when routing requests
Browse files Browse the repository at this point in the history
Before this patch, the DistSender would only route BatchRequests to
replicas of types `VOTER_FULL` and `VOTER_INCOMING`. This patch changes
that to let the DistSender route requests (for instance, follower read
requests) to `NON_VOTER` replicas as well.

Makes progress on cockroachdb#51943

Release note: None
  • Loading branch information
aayushshah15 committed Jan 28, 2021
1 parent b907fd0 commit 2944a0d
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 109 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timest

// canSendToFollower implements the logic for checking whether a batch request
// may be sent to a follower.
// TODO(aayush): We should try to bind clusterID to the function here, rather
// than having callers plumb it in every time.
func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool {
return batchCanBeEvaluatedOnFollower(ba) &&
txnCanPerformFollowerRead(ba.Txn) &&
Expand Down
10 changes: 8 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,14 @@ func (ds *DistSender) sendToReplicas(
desc := routing.Desc()
ba.RangeID = desc.RangeID
leaseholder := routing.Leaseholder()
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder)
canFollowerRead := (ds.clusterID != nil) && CanSendToFollower(ds.clusterID.Get(), ds.st, ba)
var replicas ReplicaSlice
var err error
if canFollowerRead {
replicas, err = NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, AllExtantReplicas)
} else {
replicas, err = NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, OnlyPotentialLeaseholders)
}
if err != nil {
return nil, err
}
Expand All @@ -1784,7 +1791,6 @@ func (ds *DistSender) sendToReplicas(

// Try the leaseholder first, if the request wants it.
{
canFollowerRead := (ds.clusterID != nil) && CanSendToFollower(ds.clusterID.Get(), ds.st, ba)
sendToLeaseholder := (leaseholder != nil) && !canFollowerRead && ba.RequiresLeaseHolder()
if sendToLeaseholder {
idx := replicas.Find(leaseholder.ReplicaID)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ func (ds *DistSender) singleRangeFeed(
if ds.rpcContext != nil {
latencyFn = ds.rpcContext.RemoteClocks.Latency
}
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil /* leaseholder */)
// TODO(aayush): We should enable creating RangeFeeds on non-voting replicas.
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, OnlyPotentialLeaseholders)
if err != nil {
return args.Timestamp, err
}
Expand Down
63 changes: 46 additions & 17 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,32 @@ func (i ReplicaInfo) addr() string {
// A ReplicaSlice is a slice of ReplicaInfo.
type ReplicaSlice []ReplicaInfo

// ReplicaSliceFilter controls which kinds of replicas are to be included in
// the slice for routing BatchRequests to.
type ReplicaSliceFilter int

const (
// OnlyPotentialLeaseholders prescribes that the ReplicaSlice should include
// only replicas that are allowed to be leaseholders (i.e. replicas of type
// VOTER_FULL).
OnlyPotentialLeaseholders ReplicaSliceFilter = iota
// AllExtantReplicas prescribes that the ReplicaSlice should include all
// replicas that are not LEARNERs, VOTER_OUTGOING, or
// VOTER_DEMOTING_{LEARNER/NON_VOTER}.
AllExtantReplicas
)

// NewReplicaSlice creates a ReplicaSlice from the replicas listed in the range
// descriptor and using gossip to lookup node descriptors. Replicas on nodes
// that are not gossiped are omitted from the result.
//
// Generally, only voting replicas are returned. However, if a non-nil
// leaseholder is passed in, it will be included in the result even if the
// descriptor has it as a learner (we assert that the leaseholder is part of the
// descriptor). The idea is that the descriptor might be stale and list the
// leaseholder as a learner erroneously, and lease info is a strong signal in
// that direction. Note that the returned ReplicaSlice might still not include
// the leaseholder if info for the respective node is missing from the
// NodeDescStore.
// Generally, learners are not returned. However, if a non-nil leaseholder is
// passed in, it will be included in the result even if the descriptor has it as
// a learner (we assert that the leaseholder is part of the descriptor). The
// idea is that the descriptor might be stale and list the leaseholder as a
// learner erroneously, and lease info is a strong signal in that direction.
// Note that the returned ReplicaSlice might still not include the leaseholder
// if info for the respective node is missing from the NodeDescStore.
//
// If there's no info in gossip for any of the nodes in the descriptor, a
// sendError is returned.
Expand All @@ -59,33 +73,48 @@ func NewReplicaSlice(
nodeDescs NodeDescStore,
desc *roachpb.RangeDescriptor,
leaseholder *roachpb.ReplicaDescriptor,
filter ReplicaSliceFilter,
) (ReplicaSlice, error) {
if leaseholder != nil {
if _, ok := desc.GetReplicaDescriptorByID(leaseholder.ReplicaID); !ok {
log.Fatalf(ctx, "leaseholder not in descriptor; leaseholder: %s, desc: %s", leaseholder, desc)
}
}
canReceiveLease := func(rDesc roachpb.ReplicaDescriptor) bool {
if err := roachpb.CheckCanReceiveLease(rDesc, desc); err != nil {
return false
}
return true
}

// Learner replicas won't serve reads/writes, so we'll send only to the
// `VoterDescriptors` replicas. This is just an optimization to save a network hop,
// everything would still work if we had `All` here.
voters := desc.Replicas().VoterDescriptors()
// Learner replicas won't serve reads/writes, so we'll send only to the voters
// and non-voting replicas. This is just an optimization to save a network
// hop, everything would still work if we had `All` here.
var replicas []roachpb.ReplicaDescriptor
switch filter {
case OnlyPotentialLeaseholders:
replicas = desc.Replicas().Filter(canReceiveLease).Descriptors()
case AllExtantReplicas:
replicas = desc.Replicas().VoterFullAndNonVoterDescriptors()
default:
log.Fatalf(ctx, "unknown ReplicaSliceFilter %s", filter)
}
// If we know a leaseholder, though, let's make sure we include it.
if leaseholder != nil && len(voters) < len(desc.Replicas().Descriptors()) {
if leaseholder != nil && len(replicas) < len(desc.Replicas().Descriptors()) {
found := false
for _, v := range voters {
for _, v := range replicas {
if v == *leaseholder {
found = true
break
}
}
if !found {
log.Eventf(ctx, "the descriptor has the leaseholder as a learner; including it anyway")
voters = append(voters, *leaseholder)
replicas = append(replicas, *leaseholder)
}
}
rs := make(ReplicaSlice, 0, len(voters))
for _, r := range voters {
rs := make(ReplicaSlice, 0, len(replicas))
for _, r := range replicas {
nd, err := nodeDescs.GetNodeDescriptor(r.NodeID)
if err != nil {
if log.V(1) {
Expand Down
21 changes: 17 additions & 4 deletions pkg/kv/kvclient/kvcoord/replica_slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,34 @@ func TestNewReplicaSlice(t *testing.T) {
},
},
}
rs, err := NewReplicaSlice(ctx, ns, rd, nil /* leaseholder */)
rs, err := NewReplicaSlice(ctx, ns, rd, nil, OnlyPotentialLeaseholders)
require.NoError(t, err)
require.Equal(t, 3, rs.Len())

// Check that learners are not included.
typLearner := roachpb.LEARNER
rd.InternalReplicas[2].Type = &typLearner
rs, err = NewReplicaSlice(ctx, ns, rd, nil /* leaseholder */)
rs, err = NewReplicaSlice(ctx, ns, rd, nil, OnlyPotentialLeaseholders)
require.NoError(t, err)
require.Equal(t, 2, rs.Len())
rs, err = NewReplicaSlice(ctx, ns, rd, nil, AllExtantReplicas)
require.NoError(t, err)
require.Equal(t, 2, rs.Len())

// Check that non-voters are included iff we ask for them to be.
typNonVoter := roachpb.NON_VOTER
rd.InternalReplicas[2].Type = &typNonVoter
rs, err = NewReplicaSlice(ctx, ns, rd, nil, AllExtantReplicas)
require.NoError(t, err)
require.Equal(t, 3, rs.Len())
rs, err = NewReplicaSlice(ctx, ns, rd, nil, OnlyPotentialLeaseholders)
require.NoError(t, err)
require.Equal(t, 2, rs.Len())

// Check that, if the leasehoder points to a learner, that learner is
// Check that, if the leaseholder points to a learner, that learner is
// included.
leaseholder := &roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3}
rs, err = NewReplicaSlice(ctx, ns, rd, leaseholder)
rs, err = NewReplicaSlice(ctx, ns, rd, leaseholder, OnlyPotentialLeaseholders)
require.NoError(t, err)
require.Equal(t, 3, rs.Len())
}
Expand Down
Loading

0 comments on commit 2944a0d

Please sign in to comment.