Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
56197: kvserver: make AdminRelocateRange work with non-voting replicas r=aayushshah15 a=aayushshah15

This commit teaches `RelocateRange` to work with non-voters. This lets
the merge queue rebalance a range that has non-voters so the merge can
actually proceed, as range merges require that replica sets of the LHS
and RHS ranges must be aligned.

Makes progress on cockroachdb#51943

Release note: None

Co-authored-by: Aayush Shah <[email protected]>
  • Loading branch information
craig[bot] and aayushshah15 committed Jan 7, 2021
2 parents 82448b8 + bfe698a commit c59186a
Show file tree
Hide file tree
Showing 66 changed files with 1,845 additions and 1,240 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
}, entry.Desc().Replicas().All())
}, entry.Desc().Replicas().Descriptors())

// Relocate the follower. n2 will no longer have a replica.
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,3], 1)`)
Expand All @@ -296,7 +296,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
}, entry.Desc().Replicas().All())
}, entry.Desc().Replicas().Descriptors())

// Make a note of the follower reads metric on n3. We'll check that it was
// incremented.
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ func removeDeadReplicas(
err = kvserver.IterateRangeDescriptors(ctx, db, func(desc roachpb.RangeDescriptor) error {
hasSelf := false
numDeadPeers := 0
allReplicas := desc.Replicas().All()
allReplicas := desc.Replicas().Descriptors()
maxLivePeer := roachpb.StoreID(-1)
for _, rep := range allReplicas {
if rep.StoreID == storeIdent.StoreID {
Expand Down Expand Up @@ -998,7 +998,7 @@ func removeDeadReplicas(
StoreID: storeIdent.StoreID,
ReplicaID: desc.NextReplicaID,
}}
newDesc.SetReplicas(roachpb.MakeReplicaDescriptors(replicas))
newDesc.SetReplicas(roachpb.MakeReplicaSet(replicas))
newDesc.NextReplicaID++
fmt.Printf("Replica %s -> %s\n", &desc, &newDesc)
newDescs = append(newDescs, newDesc)
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,9 @@ func (b *Batch) adminChangeReplicas(

// adminRelocateRange is only exported on DB. It is here for symmetry with the
// other operations.
func (b *Batch) adminRelocateRange(key interface{}, targets []roachpb.ReplicationTarget) {
func (b *Batch) adminRelocateRange(
key interface{}, voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -725,7 +727,8 @@ func (b *Batch) adminRelocateRange(key interface{}, targets []roachpb.Replicatio
RequestHeader: roachpb.RequestHeader{
Key: k,
},
Targets: targets,
VoterTargets: voterTargets,
NonVoterTargets: nonVoterTargets,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,10 @@ func (db *DB) AdminChangeReplicas(
// AdminRelocateRange relocates the replicas for a range onto the specified
// list of stores.
func (db *DB) AdminRelocateRange(
ctx context.Context, key interface{}, targets []roachpb.ReplicationTarget,
ctx context.Context, key interface{}, voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
) error {
b := &Batch{}
b.adminRelocateRange(key, targets)
b.adminRelocateRange(key, voterTargets, nonVoterTargets)
return getOneErr(db.Run(ctx, b), b)
}

Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()

recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().Voters()[1]
recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[1]
unrecognizedLeaseHolder := roachpb.ReplicaDescriptor{
NodeID: 99,
StoreID: 999,
Expand Down Expand Up @@ -642,7 +642,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
g := makeGossip(t, stopper, rpcContext)
for _, n := range testUserRangeDescriptor3Replicas.Replicas().Voters() {
for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(n.NodeID),
newNodeDesc(n.NodeID),
Expand Down Expand Up @@ -823,7 +823,7 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
g := makeGossip(t, stopper, rpcContext)
for _, n := range testUserRangeDescriptor3Replicas.Replicas().Voters() {
for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(n.NodeID),
newNodeDesc(n.NodeID),
Expand Down Expand Up @@ -4329,17 +4329,17 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) {
br := &roachpb.BatchResponse{}
switch call {
case 0:
expRepl := desc1.Replicas().All()[0]
expRepl := desc1.Replicas().Descriptors()[0]
require.Equal(t, expRepl, ba.Replica)
br.Error = roachpb.NewError(&roachpb.NotLeaseHolderError{
Lease: &roachpb.Lease{Replica: desc1.Replicas().All()[1]},
Lease: &roachpb.Lease{Replica: desc1.Replicas().Descriptors()[1]},
})
case 1:
expRep := desc1.Replicas().All()[1]
expRep := desc1.Replicas().Descriptors()[1]
require.Equal(t, ba.Replica, expRep)
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID))
case 2:
expRep := desc2.Replicas().All()[0]
expRep := desc2.Replicas().Descriptors()[0]
require.Equal(t, ba.Replica, expRep)
br = ba.CreateReply()
default:
Expand Down Expand Up @@ -4420,7 +4420,7 @@ func TestDistSenderRPCMetrics(t *testing.T) {
br := &roachpb.BatchResponse{}
if call == 0 {
br.Error = roachpb.NewError(&roachpb.NotLeaseHolderError{
Lease: &roachpb.Lease{Replica: desc.Replicas().All()[1]},
Lease: &roachpb.Lease{Replica: desc.Replicas().Descriptors()[1]},
})
} else {
br.Error = roachpb.NewError(&roachpb.ConditionFailedError{})
Expand Down Expand Up @@ -4449,7 +4449,7 @@ func TestDistSenderRPCMetrics(t *testing.T) {
ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: roachpb.Lease{
Replica: desc.Replicas().All()[0],
Replica: desc.Replicas().Descriptors()[0],
},
})
var ba roachpb.BatchRequest
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ func NewReplicaSlice(
}

// Learner replicas won't serve reads/writes, so we'll send only to the
// `Voters` replicas. This is just an optimization to save a network hop,
// `VoterDescriptors` replicas. This is just an optimization to save a network hop,
// everything would still work if we had `All` here.
voters := desc.Replicas().Voters()
voters := desc.Replicas().VoterDescriptors()
// If we know a leaseholder, though, let's make sure we include it.
if leaseholder != nil && len(voters) < len(desc.Replicas().All()) {
if leaseholder != nil && len(voters) < len(desc.Replicas().Descriptors()) {
found := false
for _, v := range voters {
if v == *leaseholder {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func newGetReplicasFn(dbs ...*kv.DB) GetReplicasFn {
ctx := context.Background()
return func(key roachpb.Key) []roachpb.ReplicationTarget {
desc := getRangeDesc(ctx, key, dbs...)
replicas := desc.Replicas().All()
replicas := desc.Replicas().Descriptors()
targets := make([]roachpb.ReplicationTarget, len(replicas))
for i, replica := range replicas {
targets[i] = roachpb.ReplicationTarget{
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/build",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/config/zonepb",
Expand Down
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (a *Allocator) ComputeAction(
// On the other hand if we get the race where a leaseholder starts adding a
// replica in the replicate queue and during this loses its lease, it should
// probably not retry.
if learners := desc.Replicas().Learners(); len(learners) > 0 {
if learners := desc.Replicas().LearnerDescriptors(); len(learners) > 0 {
// TODO(dan): Since this goes before anything else, the priority here should
// be influenced by whatever operations would happen right after the learner
// is removed. In the meantime, we don't want to block something important
Expand All @@ -356,7 +356,7 @@ func (a *Allocator) ComputeAction(
return AllocatorRemoveLearner, removeLearnerReplicaPriority
}
// computeAction expects to operate only on voters.
return a.computeAction(ctx, zone, desc.Replicas().Voters())
return a.computeAction(ctx, zone, desc.Replicas().VoterDescriptors())
}

func (a *Allocator) computeAction(
Expand Down Expand Up @@ -495,17 +495,17 @@ func (a *Allocator) AllocateTarget(

func (a *Allocator) allocateTargetFromList(
ctx context.Context,
sl StoreList,
candidateStores StoreList,
zone *zonepb.ZoneConfig,
candidateReplicas []roachpb.ReplicaDescriptor,
existingReplicas []roachpb.ReplicaDescriptor,
options scorerOptions,
) (*roachpb.StoreDescriptor, string) {
analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, candidateReplicas, zone)
ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
candidates := allocateCandidates(
ctx,
sl, analyzedConstraints, candidateReplicas,
a.storePool.getLocalitiesByStore(candidateReplicas),
candidateStores, analyzedConstraints, existingReplicas,
a.storePool.getLocalitiesByStore(existingReplicas),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
options,
)
Expand Down Expand Up @@ -560,17 +560,17 @@ func (a Allocator) RemoveTarget(
}

// Retrieve store descriptors for the provided candidates from the StorePool.
existingStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
candidateStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
for i, exist := range candidates {
existingStoreIDs[i] = exist.StoreID
candidateStoreIDs[i] = exist.StoreID
}
sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, storeFilterNone)
candidateStoreList, _, _ := a.storePool.getStoreListFromIDs(candidateStoreIDs, storeFilterNone)

analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
options := a.scorerOptions()
rankedCandidates := removeCandidates(
sl,
candidateStoreList,
analyzedConstraints,
a.storePool.getLocalitiesByStore(existingReplicas),
options,
Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,16 +409,16 @@ func (cl candidateList) removeCandidate(c candidate) candidateList {
// stores that meet the criteria are included in the list.
func allocateCandidates(
ctx context.Context,
sl StoreList,
candidateStores StoreList,
constraints constraint.AnalyzedConstraints,
existing []roachpb.ReplicaDescriptor,
existingReplicas []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
options scorerOptions,
) candidateList {
var candidates candidateList
for _, s := range sl.stores {
if nodeHasReplica(s.Node.NodeID, existing) {
for _, s := range candidateStores.stores {
if nodeHasReplica(s.Node.NodeID, existingReplicas) {
continue
}
if !isNodeValidForRoutineReplicaTransfer(ctx, s.Node.NodeID) {
Expand All @@ -433,14 +433,14 @@ func allocateCandidates(
continue
}
diversityScore := diversityAllocateScore(s, existingStoreLocalities)
balanceScore := balanceScore(sl, s.Capacity, options)
balanceScore := balanceScore(candidateStores, s.Capacity, options)
var convergesScore int
if options.qpsRebalanceThreshold > 0 {
if s.Capacity.QueriesPerSecond < underfullThreshold(sl.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
if s.Capacity.QueriesPerSecond < underfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
convergesScore = 1
} else if s.Capacity.QueriesPerSecond < sl.candidateQueriesPerSecond.mean {
} else if s.Capacity.QueriesPerSecond < candidateStores.candidateQueriesPerSecond.mean {
convergesScore = 0
} else if s.Capacity.QueriesPerSecond < overfullThreshold(sl.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
} else if s.Capacity.QueriesPerSecond < overfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
convergesScore = -1
} else {
convergesScore = -2
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestLeaseCommandLearnerReplica(t *testing.T) {
{NodeID: 2, StoreID: learnerStoreID, Type: roachpb.ReplicaTypeLearner(), ReplicaID: 2},
}
desc := roachpb.RangeDescriptor{}
desc.SetReplicas(roachpb.MakeReplicaDescriptors(replicas))
desc.SetReplicas(roachpb.MakeReplicaSet(replicas))
cArgs := CommandArgs{
EvalCtx: (&MockEvalCtx{
StoreID: voterStoreID,
Expand Down
Loading

0 comments on commit c59186a

Please sign in to comment.