From 9d404ba94ddf065e0fba477d0bd3cf1232ea93fd Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Mon, 8 Mar 2021 10:18:04 -0500 Subject: [PATCH 1/2] kvserver, allocator: emit actions to handle dead/decommissioning non-voters This commit teaches the allocator to emit actions to repair a range by removing/replacing dead or decommissioning non-voters. Release justification: needed for non-voting replicas Release note: None --- pkg/kv/kvserver/allocator.go | 265 +++++++++++++++++++----------- pkg/kv/kvserver/allocator_test.go | 164 ++++++++++++++++++ 2 files changed, 333 insertions(+), 96 deletions(-) diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 0e0b73598a65..d1d884801283 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -47,25 +47,6 @@ const ( // needed because a weight of zero doesn't work in the current lease scoring // algorithm. minReplicaWeight = 0.001 - - // Priorities for various repair operations. - // - // NB: These priorities only influence the replicateQueue's understanding of - // which ranges are to be dealt with before others. In other words, these - // priorities don't influence the relative order of actions taken on a given - // range. Within a given range, the ordering of the various checks inside - // `Allocator.computeAction` determines which repair/rebalancing actions are - // taken before the others. 
- finalizeAtomicReplicationChangePriority float64 = 12002 - removeLearnerReplicaPriority float64 = 12001 - addDeadReplacementVoterPriority float64 = 12000 - addMissingVoterPriority float64 = 10000 - addDecommissioningReplacementVoterPriority float64 = 5000 - removeDeadVoterPriority float64 = 1000 - removeDecommissioningVoterPriority float64 = 900 - removeExtraVoterPriority float64 = 800 - addMissingNonVoterPriority float64 = 700 - removeExtraNonVoterPriority float64 = 600 ) // MinLeaseTransferStatsDuration configures the minimum amount of time a @@ -113,9 +94,13 @@ const ( AllocatorAddVoter AllocatorAddNonVoter AllocatorReplaceDeadVoter + AllocatorReplaceDeadNonVoter AllocatorRemoveDeadVoter + AllocatorRemoveDeadNonVoter AllocatorReplaceDecommissioningVoter + AllocatorReplaceDecommissioningNonVoter AllocatorRemoveDecommissioningVoter + AllocatorRemoveDecommissioningNonVoter AllocatorRemoveLearner AllocatorConsiderRebalance AllocatorRangeUnavailable @@ -129,9 +114,13 @@ var allocatorActionNames = map[AllocatorAction]string{ AllocatorAddVoter: "add voter", AllocatorAddNonVoter: "add non-voter", AllocatorReplaceDeadVoter: "replace dead voter", + AllocatorReplaceDeadNonVoter: "replace dead non-voter", AllocatorRemoveDeadVoter: "remove dead voter", + AllocatorRemoveDeadNonVoter: "remove dead non-voter", AllocatorReplaceDecommissioningVoter: "replace decommissioning voter", + AllocatorReplaceDecommissioningNonVoter: "replace decommissioning non-voter", AllocatorRemoveDecommissioningVoter: "remove decommissioning voter", + AllocatorRemoveDecommissioningNonVoter: "remove decommissioning non-voter", AllocatorRemoveLearner: "remove learner", AllocatorConsiderRebalance: "consider rebalance", AllocatorRangeUnavailable: "range unavailable", @@ -142,6 +131,51 @@ func (a AllocatorAction) String() string { return allocatorActionNames[a] } +// Priority defines the priorities for various repair operations. 
+// +// NB: These priorities only influence the replicateQueue's understanding of +// which ranges are to be dealt with before others. In other words, these +// priorities don't influence the relative order of actions taken on a given +// range. Within a given range, the ordering of the various checks inside +// `Allocator.computeAction` determines which repair/rebalancing actions are +// taken before the others. +func (a AllocatorAction) Priority() float64 { + switch a { + case AllocatorFinalizeAtomicReplicationChange: + return 12002 + case AllocatorRemoveLearner: + return 12001 + case AllocatorReplaceDeadVoter: + return 12000 + case AllocatorAddVoter: + return 10000 + case AllocatorReplaceDecommissioningVoter: + return 5000 + case AllocatorRemoveDeadVoter: + return 1000 + case AllocatorRemoveDecommissioningVoter: + return 900 + case AllocatorRemoveVoter: + return 800 + case AllocatorReplaceDeadNonVoter: + return 700 + case AllocatorAddNonVoter: + return 600 + case AllocatorReplaceDecommissioningNonVoter: + return 500 + case AllocatorRemoveDeadNonVoter: + return 400 + case AllocatorRemoveDecommissioningNonVoter: + return 300 + case AllocatorRemoveNonVoter: + return 200 + case AllocatorConsiderRebalance, AllocatorRangeUnavailable, AllocatorNoop: + return 0 + default: + panic(fmt.Sprintf("unknown AllocatorAction: %s", a)) + } +} + type targetReplicaType int const ( @@ -376,61 +410,68 @@ func GetNeededNonVoters(numVoters, zoneConfigNonVoterCount, clusterNodes int) in // returns the required action that should be taken and a priority. func (a *Allocator) ComputeAction( ctx context.Context, zone *zonepb.ZoneConfig, desc *roachpb.RangeDescriptor, -) (AllocatorAction, float64) { +) (action AllocatorAction, priority float64) { if a.storePool == nil { // Do nothing if storePool is nil for some unittests. 
- return AllocatorNoop, 0 + action = AllocatorNoop + return action, action.Priority() } if desc.Replicas().InAtomicReplicationChange() { // With a similar reasoning to the learner branch below, if we're in a // joint configuration the top priority is to leave it before we can // even think about doing anything else. - return AllocatorFinalizeAtomicReplicationChange, finalizeAtomicReplicationChangePriority + action = AllocatorFinalizeAtomicReplicationChange + return action, action.Priority() } - // Seeing a learner replica at this point is unexpected because learners are a - // short-lived (ish) transient state in a learner+snapshot+voter cycle, which - // is always done atomically. Only two places could have added a learner: the - // replicate queue or AdminChangeReplicas request. - // - // The replicate queue only operates on leaseholders, which means that only - // one node at a time is operating on a given range except in rare cases (old - // leaseholder could start the operation, and a new leaseholder steps up and - // also starts an overlapping operation). Combined with the above atomicity, - // this means that if the replicate queue sees a learner, either the node that - // was adding it crashed somewhere in the learner+snapshot+voter cycle and - // we're the new leaseholder or we caught a race. - // - // In the first case, we could assume the node that was adding it knew what it - // was doing and finish the addition. Or we could leave it and do higher - // priority operations first if there are any. However, this comes with code - // complexity and concept complexity (computing old vs new quorum sizes - // becomes ambiguous, the learner isn't in the quorum but it likely will be - // soon, so do you count it?). Instead, we do the simplest thing and remove it - // before doing any other operations to the range. We'll revisit this decision - // if and when the complexity becomes necessary. 
- // - // If we get the race where AdminChangeReplicas is adding a replica and the - // queue happens to run during the snapshot, this will remove the learner and - // AdminChangeReplicas will notice either during the snapshot transfer or when - // it tries to promote the learner to a voter. AdminChangeReplicas should - // retry. - // - // On the other hand if we get the race where a leaseholder starts adding a - // replica in the replicate queue and during this loses its lease, it should - // probably not retry. if learners := desc.Replicas().LearnerDescriptors(); len(learners) > 0 { + // Seeing a learner replica at this point is unexpected because learners are + // a short-lived (ish) transient state in a learner+snapshot+voter cycle, + // which is always done atomically. Only two places could have added a + // learner: the replicate queue or AdminChangeReplicas request. + // + // The replicate queue only operates on leaseholders, which means that only + // one node at a time is operating on a given range except in rare cases + // (old leaseholder could start the operation, and a new leaseholder steps + // up and also starts an overlapping operation). Combined with the above + // atomicity, this means that if the replicate queue sees a learner, either + // the node that was adding it crashed somewhere in the + // learner+snapshot+voter cycle and we're the new leaseholder or we caught a + // race. + // + // In the first case, we could assume the node that was adding it knew what + // it was doing and finish the addition. Or we could leave it and do higher + // priority operations first if there are any. However, this comes with code + // complexity and concept complexity (computing old vs new quorum sizes + // becomes ambiguous, the learner isn't in the quorum but it likely will be + // soon, so do you count it?). Instead, we do the simplest thing and remove + // it before doing any other operations to the range. 
We'll revisit this + // decision if and when the complexity becomes necessary. + // + // If we get the race where AdminChangeReplicas is adding a replica and the + // queue happens to run during the snapshot, this will remove the learner + // and AdminChangeReplicas will notice either during the snapshot transfer + // or when it tries to promote the learner to a voter. AdminChangeReplicas + // should retry. + // + // On the other hand if we get the race where a leaseholder starts adding a + // replica in the replicate queue and during this loses its lease, it should + // probably not retry. + // // TODO(dan): Since this goes before anything else, the priority here should // be influenced by whatever operations would happen right after the learner // is removed. In the meantime, we don't want to block something important // from happening (like addDeadReplacementVoterPriority) by queueing this at // a low priority so until this TODO is done, keep // removeLearnerReplicaPriority as the highest priority. - return AllocatorRemoveLearner, removeLearnerReplicaPriority + action = AllocatorRemoveLearner + return action, action.Priority() } + return a.computeAction(ctx, zone, desc.Replicas().VoterDescriptors(), desc.Replicas().NonVoterDescriptors()) + } func (a *Allocator) computeAction( @@ -438,7 +479,7 @@ func (a *Allocator) computeAction( zone *zonepb.ZoneConfig, voterReplicas []roachpb.ReplicaDescriptor, nonVoterReplicas []roachpb.ReplicaDescriptor, -) (AllocatorAction, float64) { +) (action AllocatorAction, adjustedPriority float64) { // NB: The ordering of the checks in this method is intentional. The order in // which these actions are returned by this method determines the relative // priority of the actions taken on a given range. We want this to be @@ -453,8 +494,8 @@ func (a *Allocator) computeAction( // actions. 
haveVoters := len(voterReplicas) decommissioningVoters := a.storePool.decommissioningReplicas(voterReplicas) - // Node count including dead nodes but excluding decommissioning/decommissioned - // nodes. + // Node count including dead nodes but excluding + // decommissioning/decommissioned nodes. clusterNodes := a.storePool.ClusterNodeCount() neededVoters := GetNeededVoters(zone.GetNumVoters(), clusterNodes) desiredQuorum := computeQuorum(neededVoters) @@ -467,11 +508,11 @@ func (a *Allocator) computeAction( // Range is under-replicated, and should add an additional voter. // Priority is adjusted by the difference between the current voter // count and the quorum of the desired voter count. - priority := addMissingVoterPriority + float64(desiredQuorum-haveVoters) - action := AllocatorAddVoter + action = AllocatorAddVoter + adjustedPriority = action.Priority() + float64(desiredQuorum-haveVoters) log.VEventf(ctx, 3, "%s - missing voter need=%d, have=%d, priority=%.2f", - action, neededVoters, haveVoters, priority) - return action, priority + action, neededVoters, haveVoters, adjustedPriority) + return action, adjustedPriority } liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas) @@ -481,9 +522,10 @@ func (a *Allocator) computeAction( // live voters. If we're correctly assessing the unavailable state of the // range, we also won't be able to add replicas as we try above, but hope // springs eternal. + action = AllocatorRangeUnavailable log.VEventf(ctx, 1, "unable to take action - live voters %v don't meet quorum of %d", liveVoters, quorum) - return AllocatorRangeUnavailable, 0 + return action, action.Priority() } if haveVoters == neededVoters && len(deadVoters) > 0 { @@ -491,21 +533,18 @@ func (a *Allocator) computeAction( // before removing the dead one. This can avoid permanent data loss in cases // where the node is only temporarily dead, but we remove it from the range // and lose a second node before we can up-replicate (#25392). 
- // The dead voter(s) will be down-replicated later. - priority := addDeadReplacementVoterPriority - action := AllocatorReplaceDeadVoter + action = AllocatorReplaceDeadVoter log.VEventf(ctx, 3, "%s - replacement for %d dead voters priority=%.2f", - action, len(deadVoters), priority) - return action, priority + action, len(deadVoters), action.Priority()) + return action, action.Priority() } if haveVoters == neededVoters && len(decommissioningVoters) > 0 { // Range has decommissioning voter(s), which should be replaced. - priority := addDecommissioningReplacementVoterPriority - action := AllocatorReplaceDecommissioningVoter + action = AllocatorReplaceDecommissioningVoter log.VEventf(ctx, 3, "%s - replacement for %d decommissioning voters priority=%.2f", - action, len(decommissioningVoters), priority) - return action, priority + action, len(decommissioningVoters), action.Priority()) + return action, action.Priority() } // Voting replica removal actions follow. @@ -515,62 +554,96 @@ func (a *Allocator) computeAction( // remove the dead replica(s) to get down to an odd number of replicas. if len(deadVoters) > 0 { // The range has dead replicas, which should be removed immediately. - priority := removeDeadVoterPriority + float64(quorum-len(liveVoters)) - action := AllocatorRemoveDeadVoter + action = AllocatorRemoveDeadVoter + adjustedPriority = action.Priority() + float64(quorum-len(liveVoters)) log.VEventf(ctx, 3, "%s - dead=%d, live=%d, quorum=%d, priority=%.2f", - action, len(deadVoters), len(liveVoters), quorum, priority) - return action, priority + action, len(deadVoters), len(liveVoters), quorum, adjustedPriority) + return action, adjustedPriority } if len(decommissioningVoters) > 0 { // Range is over-replicated, and has a decommissioning voter which // should be removed. 
- priority := removeDecommissioningVoterPriority - action := AllocatorRemoveDecommissioningVoter + action = AllocatorRemoveDecommissioningVoter log.VEventf(ctx, 3, "%s - need=%d, have=%d, num_decommissioning=%d, priority=%.2f", - action, neededVoters, haveVoters, len(decommissioningVoters), priority) - return action, priority + action, neededVoters, haveVoters, len(decommissioningVoters), action.Priority()) + return action, action.Priority() } if haveVoters > neededVoters { // Range is over-replicated, and should remove a voter. // Ranges with an even number of voters get extra priority because // they have a more fragile quorum. - priority := removeExtraVoterPriority - float64(haveVoters%2) - action := AllocatorRemoveVoter - log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, neededVoters, haveVoters, priority) - return action, priority + action = AllocatorRemoveVoter + adjustedPriority = action.Priority() - float64(haveVoters%2) + log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, neededVoters, + haveVoters, adjustedPriority) + return action, adjustedPriority } // Non-voting replica actions follow. // - // Non-voting replica addition. + // Non-voting replica addition / replacement. haveNonVoters := len(nonVoterReplicas) neededNonVoters := GetNeededNonVoters(haveVoters, int(zone.GetNumNonVoters()), clusterNodes) if haveNonVoters < neededNonVoters { - priority := addMissingNonVoterPriority - action := AllocatorAddNonVoter + action = AllocatorAddNonVoter log.VEventf(ctx, 3, "%s - missing non-voter need=%d, have=%d, priority=%.2f", - action, neededNonVoters, haveNonVoters, priority) - return action, priority + action, neededNonVoters, haveNonVoters, action.Priority()) + return action, action.Priority() + } + + liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(nonVoterReplicas) + if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 { + // The range has non-voter(s) on a dead node that we should replace. 
+ action = AllocatorReplaceDeadNonVoter + log.VEventf(ctx, 3, "%s - replacement for %d dead non-voters priority=%.2f", + action, len(deadNonVoters), action.Priority()) + return action, action.Priority() + } + + decommissioningNonVoters := a.storePool.decommissioningReplicas(nonVoterReplicas) + if haveNonVoters == neededNonVoters && len(decommissioningNonVoters) > 0 { + // The range has non-voter(s) on a decommissioning node that we should + // replace. + action = AllocatorReplaceDecommissioningNonVoter + log.VEventf(ctx, 3, "%s - replacement for %d decommissioning non-voters priority=%.2f", + action, len(decommissioningNonVoters), action.Priority()) + return action, action.Priority() } // Non-voting replica removal. + if len(deadNonVoters) > 0 { + // The range is over-replicated _and_ has non-voter(s) on a dead node. We'll + // just remove these. + action = AllocatorRemoveDeadNonVoter + log.VEventf(ctx, 3, "%s - dead=%d, live=%d, quorum=%d, priority=%.2f", + action, len(deadNonVoters), len(liveNonVoters), quorum, action.Priority()) + return action, action.Priority() + } - // TODO(aayush): Handle removal/replacement of decommissioning/dead non-voting - // replicas. + if len(decommissioningNonVoters) > 0 { + // The range is over-replicated _and_ has non-voter(s) on a decommissioning + // node. We'll just remove these. + action = AllocatorRemoveDecommissioningNonVoter + log.VEventf(ctx, 3, + "%s - need=%d, have=%d, num_decommissioning=%d, priority=%.2f", + action, neededNonVoters, haveNonVoters, len(decommissioningNonVoters), action.Priority()) + return action, action.Priority() + } if haveNonVoters > neededNonVoters { - // Like above, the range is over-replicated but should remove a non-voter. 
- priority := removeExtraNonVoterPriority - action := AllocatorRemoveNonVoter - log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, neededNonVoters, haveNonVoters, priority) - return action, priority + // The range is simply over-replicated and should remove a non-voter. + action = AllocatorRemoveNonVoter + log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, + neededNonVoters, haveNonVoters, action.Priority()) + return action, action.Priority() } // Nothing needs to be done, but we may want to rebalance. - return AllocatorConsiderRebalance, 0 + action = AllocatorConsiderRebalance + return action, action.Priority() } // getReplicasForDiversityCalc returns the set of replica descriptors that diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index cb347dd04c06..6b4992a7c551 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -5083,6 +5083,47 @@ func TestAllocatorComputeAction(t *testing.T) { }, expectedAction: AllocatorRemoveVoter, }, + // Need 2 non-voting replicas, have 2 but one of them is on a dead node. + { + zone: zonepb.ZoneConfig{ + NumReplicas: proto.Int32(5), + NumVoters: proto.Int32(3), + RangeMinBytes: proto.Int64(0), + RangeMaxBytes: proto.Int64(64000), + }, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + Type: roachpb.ReplicaTypeNonVoter(), + }, + { + StoreID: 6, + NodeID: 6, + ReplicaID: 6, + Type: roachpb.ReplicaTypeNonVoter(), + }, + }, + }, + expectedAction: AllocatorReplaceDeadNonVoter, + }, // Need 2 non-voting replicas, have none. 
{ zone: zonepb.ZoneConfig{ @@ -5112,6 +5153,61 @@ func TestAllocatorComputeAction(t *testing.T) { }, expectedAction: AllocatorAddNonVoter, }, + // Need 2 non-voting replicas, have 1 but it's on a dead node. + { + zone: zonepb.ZoneConfig{ + NumReplicas: proto.Int32(3), + NumVoters: proto.Int32(1), + RangeMinBytes: proto.Int64(0), + RangeMaxBytes: proto.Int64(64000), + }, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 6, + NodeID: 6, + ReplicaID: 6, + Type: roachpb.ReplicaTypeNonVoter(), + }, + }, + }, + expectedAction: AllocatorAddNonVoter, + }, + { + zone: zonepb.ZoneConfig{ + NumReplicas: proto.Int32(2), + NumVoters: proto.Int32(1), + RangeMinBytes: proto.Int64(0), + RangeMaxBytes: proto.Int64(64000), + }, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + Type: roachpb.ReplicaTypeNonVoter(), + }, + { + StoreID: 6, + NodeID: 6, + ReplicaID: 6, + Type: roachpb.ReplicaTypeNonVoter(), + }, + }, + }, + expectedAction: AllocatorRemoveDeadNonVoter, + }, // Need 1 non-voting replicas, have 2.
{ zone: zonepb.ZoneConfig{ @@ -5564,6 +5660,74 @@ func TestAllocatorComputeActionDecommission(t *testing.T) { dead: nil, decommissioning: []roachpb.StoreID{1, 2, 3}, }, + { + zone: zonepb.ZoneConfig{ + NumVoters: proto.Int32(1), + NumReplicas: proto.Int32(3), + }, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + Type: roachpb.ReplicaTypeNonVoter(), + }, + { + StoreID: 6, + NodeID: 6, + ReplicaID: 6, + Type: roachpb.ReplicaTypeNonVoter(), + }, + { + StoreID: 7, + NodeID: 7, + ReplicaID: 7, + Type: roachpb.ReplicaTypeNonVoter(), + }, + }, + }, + expectedAction: AllocatorRemoveDecommissioningNonVoter, + live: []roachpb.StoreID{1, 4, 6}, + dead: nil, + decommissioning: []roachpb.StoreID{7}, + }, + { + zone: zonepb.ZoneConfig{ + NumVoters: proto.Int32(1), + NumReplicas: proto.Int32(3), + }, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + Type: roachpb.ReplicaTypeNonVoter(), + }, + { + StoreID: 6, + NodeID: 6, + ReplicaID: 6, + Type: roachpb.ReplicaTypeNonVoter(), + }, + }, + }, + expectedAction: AllocatorReplaceDecommissioningNonVoter, + live: []roachpb.StoreID{1, 2, 3, 4, 6}, + dead: nil, + decommissioning: []roachpb.StoreID{4}, + }, } stopper, _, sp, a, _ := createTestAllocator(10, false /* deterministic */) From 777dadd45dbb568c91df7b711f21b5086387662c Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Mon, 8 Mar 2021 18:11:34 -0500 Subject: [PATCH 2/2] kvserver: teach replicateQueue to replace dead/decommissioning non-voters The commit is very similar to #59403. Previously, non-voters on dead or decommissioning nodes would not get removed or replaced. This commit fixes this behavior.
Release justification: fixes limitation where dead/decommissioning non-voters would not be removed or replaced Release note: None --- pkg/kv/kvserver/allocator.go | 4 +- pkg/kv/kvserver/replicate_queue.go | 247 +++++++++++++++------ pkg/kv/kvserver/replicate_queue_test.go | 278 ++++++++++++++++++++++++ pkg/kv/kvserver/testing_knobs.go | 3 + pkg/server/server.go | 7 +- 5 files changed, 465 insertions(+), 74 deletions(-) diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index d1d884801283..45df126e456d 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -618,8 +618,8 @@ func (a *Allocator) computeAction( // The range is over-replicated _and_ has non-voter(s) on a dead node. We'll // just remove these. action = AllocatorRemoveDeadNonVoter - log.VEventf(ctx, 3, "%s - dead=%d, live=%d, quorum=%d, priority=%.2f", - action, len(deadNonVoters), len(liveNonVoters), quorum, action.Priority()) + log.VEventf(ctx, 3, "%s - dead=%d, live=%d, priority=%.2f", + action, len(deadNonVoters), len(liveNonVoters), action.Priority()) return action, action.Priority() } diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 0d41747a226d..a55c8528377d 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -334,6 +334,13 @@ func (rq *replicateQueue) processOneChange( return false, pErr.GoError() } + // TODO(aayush): The fact that we're calling `repl.DescAndZone()` here once to + // pass to `ComputeAction()` to use for deciding which action to take to + // repair a range, and then calling it again inside methods like + // `addOrReplace{Non}Voters()` or `remove{Dead,Decommissioning}` to execute + // upon that decision is a bit unfortunate. It means that we could + // successfully execute a decision that was based on the state of a stale + // range descriptor. 
desc, zone := repl.DescAndZone() // Avoid taking action if the range has too many dead replicas to make @@ -341,7 +348,7 @@ func (rq *replicateQueue) processOneChange( voterReplicas := desc.Replicas().VoterDescriptors() nonVoterReplicas := desc.Replicas().NonVoterDescriptors() liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(voterReplicas) - liveNonVoterReplicas, _ := rq.allocator.storePool.liveAndDeadReplicas(nonVoterReplicas) + liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(nonVoterReplicas) // NB: the replication layer ensures that the below operations don't cause // unavailability; see: @@ -363,62 +370,92 @@ func (rq *replicateQueue) processOneChange( // lost quorum. Either way, it's not a good idea to make changes right now. // Let the scanner requeue it again later. return false, nil + + // Add replicas. case AllocatorAddVoter: - return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, - -1 /* removeIdx */, dryRun) + return rq.addOrReplaceVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, dryRun) case AllocatorAddNonVoter: - return rq.addNonVoter(ctx, repl, nonVoterReplicas, liveVoterReplicas, liveNonVoterReplicas, dryRun) + return rq.addOrReplaceNonVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, dryRun) + + // Remove replicas. case AllocatorRemoveVoter: return rq.removeVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun) case AllocatorRemoveNonVoter: return rq.removeNonVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun) + + // Replace dead replicas. case AllocatorReplaceDeadVoter: if len(deadVoterReplicas) == 0 { // Nothing to do. 
return false, nil } - removeIdx := -1 // guaranteed to be changed below - for i, rDesc := range voterReplicas { - if rDesc.StoreID == deadVoterReplicas[0].StoreID { - removeIdx = i - break - } - } + removeIdx := getRemoveIdx(voterReplicas, deadVoterReplicas[0]) if removeIdx < 0 { return false, errors.AssertionFailedf( "dead voter %v unexpectedly not found in %v", deadVoterReplicas[0], voterReplicas) } - return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) - case AllocatorReplaceDecommissioningVoter: - decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(voterReplicas) - if len(decommissioningReplicas) == 0 { + return rq.addOrReplaceVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + case AllocatorReplaceDeadNonVoter: + if len(deadNonVoterReplicas) == 0 { // Nothing to do. return false, nil } - removeIdx := -1 // guaranteed to be changed below - for i, rDesc := range voterReplicas { - if rDesc.StoreID == decommissioningReplicas[0].StoreID { - removeIdx = i - break - } + removeIdx := getRemoveIdx(nonVoterReplicas, deadNonVoterReplicas[0]) + if removeIdx < 0 { + return false, errors.AssertionFailedf( + "dead non-voter %v unexpectedly not found in %v", + deadNonVoterReplicas[0], nonVoterReplicas) + } + return rq.addOrReplaceNonVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + + // Replace decommissioning replicas. + case AllocatorReplaceDecommissioningVoter: + decommissioningVoterReplicas := rq.allocator.storePool.decommissioningReplicas(voterReplicas) + if len(decommissioningVoterReplicas) == 0 { + // Nothing to do. 
+ return false, nil } + removeIdx := getRemoveIdx(voterReplicas, decommissioningVoterReplicas[0]) if removeIdx < 0 { return false, errors.AssertionFailedf( "decommissioning voter %v unexpectedly not found in %v", - decommissioningReplicas[0], voterReplicas) + decommissioningVoterReplicas[0], voterReplicas) } - return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + return rq.addOrReplaceVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + case AllocatorReplaceDecommissioningNonVoter: + decommissioningNonVoterReplicas := rq.allocator.storePool.decommissioningReplicas(nonVoterReplicas) + if len(decommissioningNonVoterReplicas) == 0 { + return false, nil + } + removeIdx := getRemoveIdx(nonVoterReplicas, decommissioningNonVoterReplicas[0]) + if removeIdx < 0 { + return false, errors.AssertionFailedf( + "decommissioning non-voter %v unexpectedly not found in %v", + decommissioningNonVoterReplicas[0], nonVoterReplicas) + } + return rq.addOrReplaceNonVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + + // Remove decommissioning replicas. + // + // NB: these two paths will only be hit when the range is over-replicated and + // has decommissioning replicas; in the common case we'll hit + // AllocatorReplaceDecommissioning{Non}Voter above. case AllocatorRemoveDecommissioningVoter: - // NB: this path will only be hit when the range is over-replicated and - // has decommissioning replicas; in the common case we'll hit - // AllocatorReplaceDecommissioningVoter above. - return rq.removeDecommissioning(ctx, repl, dryRun) + return rq.removeDecommissioning(ctx, repl, voterTarget, dryRun) + case AllocatorRemoveDecommissioningNonVoter: + return rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun) + + // Remove dead replicas. 
+ // + // NB: these two paths below will only be hit when the range is + // over-replicated and has dead replicas; in the common case we'll hit + // AllocatorReplaceDead{Non}Voter above. case AllocatorRemoveDeadVoter: - // NB: this path will only be hit when the range is over-replicated and - // has dead replicas; in the common case we'll hit AllocatorReplaceDeadVoter - // above. - return rq.removeDead(ctx, repl, deadVoterReplicas, dryRun) + return rq.removeDead(ctx, repl, deadVoterReplicas, voterTarget, dryRun) + case AllocatorRemoveDeadNonVoter: + return rq.removeDead(ctx, repl, deadNonVoterReplicas, nonVoterTarget, dryRun) + case AllocatorRemoveLearner: return rq.removeLearner(ctx, repl, dryRun) case AllocatorConsiderRebalance: @@ -433,6 +470,18 @@ func (rq *replicateQueue) processOneChange( } } +// getRemoveIdx returns the index in repls of the replica whose StoreID matches deadOrDecommissioningRepl, or -1 if there is none. +func getRemoveIdx( + repls []roachpb.ReplicaDescriptor, deadOrDecommissioningRepl roachpb.ReplicaDescriptor, +) int { + for i, rDesc := range repls { + if rDesc.StoreID == deadOrDecommissioningRepl.StoreID { + return i + } + } + return -1 +} + // addOrReplaceVoters adds or replaces a voting replica. If removeIdx is -1, an // addition is carried out. Otherwise, removeIdx must be a valid index into // existingVoters and specifies which voter to replace with a new one. @@ -444,12 +493,12 @@ func (rq *replicateQueue) processOneChange( func (rq *replicateQueue) addOrReplaceVoters( ctx context.Context, repl *Replica, - existingVoters []roachpb.ReplicaDescriptor, - liveVoterReplicas []roachpb.ReplicaDescriptor, - liveNonVoterReplicas []roachpb.ReplicaDescriptor, - removeIdx int, // -1 for no removal + liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, + removeIdx int, dryRun bool, ) (requeue bool, _ error) { + desc, zone := repl.DescAndZone() + existingVoters := desc.Replicas().VoterDescriptors() if len(existingVoters) == 1 { // If only one replica remains, that replica is the leaseholder and // we won't be able to swap it out.
Ignore the removal and simply add @@ -480,7 +529,6 @@ func (rq *replicateQueue) addOrReplaceVoters( } } - desc, zone := repl.DescAndZone() // The allocator should not try to re-add this replica since there is a reason // we're removing it (i.e. dead or decommissioning). If we left the replica in // the slice, the allocator would not be guaranteed to pick a replica that @@ -492,7 +540,7 @@ func (rq *replicateQueue) addOrReplaceVoters( if removeIdx >= 0 && newStore.StoreID == existingVoters[removeIdx].StoreID { return false, errors.AssertionFailedf("allocator suggested to replace replica on s%d with itself", newStore.StoreID) } - newReplica := roachpb.ReplicationTarget{ + newVoter := roachpb.ReplicationTarget{ NodeID: newStore.Node.NodeID, StoreID: newStore.StoreID, } @@ -534,38 +582,38 @@ func (rq *replicateQueue) addOrReplaceVoters( // Figure out whether we should be promoting an existing non-voting replica to // a voting replica or if we ought to be adding a voter afresh. var ops []roachpb.ReplicationChange - replDesc, found := desc.GetReplicaDescriptor(newReplica.StoreID) + replDesc, found := desc.GetReplicaDescriptor(newVoter.StoreID) if found { if replDesc.GetType() != roachpb.NON_VOTER { return false, errors.AssertionFailedf("allocation target %s for a voter"+ - " already has an unexpected replica: %s", newReplica, replDesc) + " already has an unexpected replica: %s", newVoter, replDesc) } // If the allocation target has a non-voter already, we will promote it to a // voter. 
rq.metrics.NonVoterPromotionsCount.Inc(1) - ops = roachpb.ReplicationChangesForPromotion(newReplica) + ops = roachpb.ReplicationChangesForPromotion(newVoter) } else { - ops = roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newReplica) + ops = roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newVoter) } if removeIdx < 0 { - log.VEventf(ctx, 1, "adding replica %+v: %s", - newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) + log.VEventf(ctx, 1, "adding voter %+v: %s", + newVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) } else { rq.metrics.RemoveReplicaCount.Inc(1) - removeReplica := existingVoters[removeIdx] - log.VEventf(ctx, 1, "replacing replica %s with %+v: %s", - removeReplica, newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) + removeVoter := existingVoters[removeIdx] + log.VEventf(ctx, 1, "replacing voter %s with %+v: %s", + removeVoter, newVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) // NB: We may have performed a promotion of a non-voter above, but we will // not perform a demotion here and instead just remove the existing replica - // entirely. This is because we know that the `removeReplica` is either dead - // or decommissioning (see `Allocator.computeAction`) . This means that - // after this allocation is executed, we could be one non-voter short. This - // will be handled by the replicateQueue's next attempt at this range. + // entirely. This is because we know that the `removeVoter` is either dead + // or decommissioning (see `Allocator.computeAction`). This means that after + // this allocation is executed, we could be one non-voter short. This will + // be handled by the replicateQueue's next attempt at this range. ops = append(ops, roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, roachpb.ReplicationTarget{ - StoreID: removeReplica.StoreID, - NodeID: removeReplica.NodeID, + StoreID: removeVoter.StoreID, + NodeID: removeVoter.NodeID, })...) 
} @@ -585,11 +633,12 @@ func (rq *replicateQueue) addOrReplaceVoters( return true, nil } -// addNonVoter adds a non-voting replica to `repl`s range. -func (rq *replicateQueue) addNonVoter( +// addOrReplaceNonVoters adds a non-voting replica to `repl`s range. +func (rq *replicateQueue) addOrReplaceNonVoters( ctx context.Context, repl *Replica, - existingReplicas, liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, + liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, + removeIdx int, dryRun bool, ) (requeue bool, _ error) { // Non-voter creation is disabled before 21.1. @@ -598,17 +647,34 @@ func (rq *replicateQueue) addNonVoter( } desc, zone := repl.DescAndZone() + existingNonVoters := desc.Replicas().NonVoterDescriptors() newStore, details, err := rq.allocator.AllocateNonVoter(ctx, zone, liveVoterReplicas, liveNonVoterReplicas) if err != nil { return false, err } + rq.metrics.AddReplicaCount.Inc(1) + newNonVoter := roachpb.ReplicationTarget{ NodeID: newStore.Node.NodeID, StoreID: newStore.StoreID, } - ops := roachpb.MakeReplicationChanges(roachpb.ADD_NON_VOTER, newNonVoter) + if removeIdx < 0 { + log.VEventf(ctx, 1, "adding non-voter %+v: %s", + newNonVoter, rangeRaftProgress(repl.RaftStatus(), existingNonVoters)) + } else { + rq.metrics.RemoveReplicaCount.Inc(1) + removeNonVoter := existingNonVoters[removeIdx] + log.VEventf(ctx, 1, "replacing non-voter %s with %+v: %s", + removeNonVoter, newNonVoter, rangeRaftProgress(repl.RaftStatus(), existingNonVoters)) + ops = append(ops, + roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, roachpb.ReplicationTarget{ + StoreID: removeNonVoter.StoreID, + NodeID: removeNonVoter.NodeID, + })...) 
+ } + if err := rq.changeReplicas( ctx, repl, @@ -840,16 +906,33 @@ func (rq *replicateQueue) removeNonVoter( } func (rq *replicateQueue) removeDecommissioning( - ctx context.Context, repl *Replica, dryRun bool, + ctx context.Context, repl *Replica, targetType targetReplicaType, dryRun bool, ) (requeue bool, _ error) { desc, _ := repl.DescAndZone() - decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(desc.Replicas().Descriptors()) + var decommissioningReplicas []roachpb.ReplicaDescriptor + var removeOp roachpb.ReplicaChangeType + switch targetType { + case voterTarget: + decommissioningReplicas = rq.allocator.storePool.decommissioningReplicas( + desc.Replicas().VoterDescriptors(), + ) + removeOp = roachpb.REMOVE_VOTER + case nonVoterTarget: + decommissioningReplicas = rq.allocator.storePool.decommissioningReplicas( + desc.Replicas().NonVoterDescriptors(), + ) + removeOp = roachpb.REMOVE_NON_VOTER + default: + panic(fmt.Sprintf("unknown targetReplicaType: %s", targetType)) + } + if len(decommissioningReplicas) == 0 { - log.VEventf(ctx, 1, "range of replica %s was identified as having decommissioning replicas, "+ - "but no decommissioning replicas were found", repl) + log.VEventf(ctx, 1, "range of %[1]ss %[2]s was identified as having decommissioning %[1]ss, "+ + "but no decommissioning %[1]ss were found", targetType, repl) return true, nil } decommissioningReplica := decommissioningReplicas[0] + done, err := rq.maybeTransferLeaseAway( ctx, repl, decommissioningReplica.StoreID, dryRun, nil /* canTransferLease */) if err != nil { @@ -859,9 +942,10 @@ func (rq *replicateQueue) removeDecommissioning( // Not leaseholder any more. return false, nil } + // Remove the decommissioning replica. 
rq.metrics.RemoveReplicaCount.Inc(1) - log.VEventf(ctx, 1, "removing decommissioning replica %+v from store", decommissioningReplica) + log.VEventf(ctx, 1, "removing decommissioning %s %+v from store", targetType, decommissioningReplica) target := roachpb.ReplicationTarget{ NodeID: decommissioningReplica.NodeID, StoreID: decommissioningReplica.StoreID, @@ -869,7 +953,7 @@ func (rq *replicateQueue) removeDecommissioning( if err := rq.changeReplicas( ctx, repl, - roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, target), + roachpb.MakeReplicationChanges(removeOp, target), desc, SnapshotRequest_UNKNOWN, // unused kvserverpb.ReasonStoreDecommissioning, "", dryRun, @@ -881,27 +965,48 @@ func (rq *replicateQueue) removeDecommissioning( } func (rq *replicateQueue) removeDead( - ctx context.Context, repl *Replica, deadVoterReplicas []roachpb.ReplicaDescriptor, dryRun bool, + ctx context.Context, + repl *Replica, + deadReplicas []roachpb.ReplicaDescriptor, + targetType targetReplicaType, + dryRun bool, ) (requeue bool, _ error) { desc := repl.Desc() - if len(deadVoterReplicas) == 0 { - log.VEventf(ctx, 1, "range of replica %s was identified as having dead replicas, but no dead replicas were found", repl) + if len(deadReplicas) == 0 { + log.VEventf( + ctx, + 1, + "range of %[1]s %[2]s was identified as having dead %[1]ss, but no dead %[1]ss were found", + targetType, + repl, + ) return true, nil } - deadReplica := deadVoterReplicas[0] + deadReplica := deadReplicas[0] rq.metrics.RemoveDeadReplicaCount.Inc(1) - log.VEventf(ctx, 1, "removing dead replica %+v from store", deadReplica) + log.VEventf(ctx, 1, "removing dead %s %+v from store", targetType, deadReplica) target := roachpb.ReplicationTarget{ NodeID: deadReplica.NodeID, StoreID: deadReplica.StoreID, } - // NB: we don't check whether to transfer the lease away because if the removal target - // is dead, it's not us (and if for some reason that happens, the removal is simply - // going to fail). 
+ var removeOp roachpb.ReplicaChangeType + switch targetType { + case voterTarget: + removeOp = roachpb.REMOVE_VOTER + case nonVoterTarget: + removeOp = roachpb.REMOVE_NON_VOTER + default: + panic(fmt.Sprintf("unknown targetReplicaType: %s", targetType)) + } + + // NB: When removing a dead voter, we don't check whether to transfer the + // lease away because if the removal target is dead, it's not us, the replica + // processing the removal (and if for some reason that happens, the removal + // is simply going to fail). if err := rq.changeReplicas( ctx, repl, - roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, target), + roachpb.MakeReplicationChanges(removeOp, target), desc, SnapshotRequest_UNKNOWN, // unused kvserverpb.ReasonStoreDead, diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index cc358acf0f24..bc1df633ac2b 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -19,6 +19,7 @@ import ( "math/rand" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -28,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -368,6 +370,282 @@ func TestReplicateQueueUpAndDownReplicateNonVoters(t *testing.T) { }) } +func checkReplicaCount( + ctx context.Context, + tc *testcluster.TestCluster, + rangeDesc roachpb.RangeDescriptor, + voterCount, nonVoterCount int, +) (bool, error) { + err := forceScanOnAllReplicationQueues(tc) + if err != nil { + log.Infof(ctx, "store.ForceReplicationScanAndProcess() failed with: %s", err) + return false, err + } + rangeDesc, err = tc.LookupRange(rangeDesc.StartKey.AsRawKey()) + if err != nil { + return false, err + } + if
len(rangeDesc.Replicas().VoterDescriptors()) != voterCount { + return false, nil + } + if len(rangeDesc.Replicas().NonVoterDescriptors()) != nonVoterCount { + return false, nil + } + return true, nil +} + +// TestReplicateQueueDecommissioningNonVoters is an end-to-end test ensuring +// that the replicateQueue will replace or remove non-voter(s) on +// decommissioning nodes. +func TestReplicateQueueDecommissioningNonVoters(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // Setup a scratch range on a test cluster with 2 non-voters and 1 voter. + setupFn := func(t *testing.T) (*testcluster.TestCluster, roachpb.RangeDescriptor) { + tc := testcluster.StartTestCluster(t, 5, + base.TestClusterArgs{ReplicationMode: base.ReplicationAuto}, + ) + + scratchKey := tc.ScratchRange(t) + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + _, err := tc.ServerConn(0).Exec( + `ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 3, num_voters = 1`, + ) + require.NoError(t, err) + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + return ok + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + return tc, scratchRange + } + + // Check that non-voters on decommissioning nodes are replaced by + // upreplicating elsewhere. This test is supposed to tickle the + // `AllocatorReplaceDecommissioningNonVoter` code path. + t.Run("replace", func(t *testing.T) { + tc, scratchRange := setupFn(t) + defer tc.Stopper().Stop(ctx) + // Do a fresh look up on the range descriptor. + scratchRange = tc.LookupRangeOrFatal(t, scratchRange.StartKey.AsRawKey()) + beforeNodeIDs := getNonVoterNodeIDs(scratchRange) + // Decommission each of the two nodes that have the non-voters and make sure + // that those non-voters are upreplicated elsewhere. 
+ require.NoError(t, + tc.Server(0).Decommission(ctx, livenesspb.MembershipStatus_DECOMMISSIONING, beforeNodeIDs)) + + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + if !ok { + return false + } + // Ensure that the non-voters have actually been removed from the + // decommissioning nodes and moved to others. + scratchRange = tc.LookupRangeOrFatal(t, scratchRange.StartKey.AsRawKey()) + afterNodeIDs := getNonVoterNodeIDs(scratchRange) + for _, before := range beforeNodeIDs { + for _, after := range afterNodeIDs { + if after == before { + return false + } + } + } + return true + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + }) + + // Check that when we have more non-voters than needed and some of those + // non-voters are on decommissioning nodes, that we simply remove those + // non-voters. This test is supposed to tickle the + // `AllocatorRemoveDecommissioningNonVoter` code path. + t.Run("remove", func(t *testing.T) { + tc, scratchRange := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Turn off the replicateQueue and update the zone configs to remove all + // non-voters. At the same time, also mark all the nodes that have + // non-voters as decommissioning. + tc.ToggleReplicateQueues(false) + _, err := tc.ServerConn(0).Exec( + `ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 1`, + ) + require.NoError(t, err) + + // Do a fresh look up on the range descriptor.
+ scratchRange = tc.LookupRangeOrFatal(t, scratchRange.StartKey.AsRawKey()) + var nonVoterNodeIDs []roachpb.NodeID + for _, repl := range scratchRange.Replicas().NonVoterDescriptors() { + nonVoterNodeIDs = append(nonVoterNodeIDs, repl.NodeID) + } + require.NoError(t, + tc.Server(0).Decommission(ctx, livenesspb.MembershipStatus_DECOMMISSIONING, nonVoterNodeIDs)) + + // At this point, we know that we have an over-replicated range with + // non-voters on nodes that are marked as decommissioning. So turn the + // replicateQueue on and ensure that these redundant non-voters are removed. + tc.ToggleReplicateQueues(true) + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 0 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + return ok + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + }) +} + +// TestReplicateQueueDeadNonVoters is an end to end test ensuring that +// non-voting replicas on dead nodes are replaced or removed. +func TestReplicateQueueDeadNonVoters(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + var livenessTrap atomic.Value + setupFn := func(t *testing.T) (*testcluster.TestCluster, roachpb.RangeDescriptor) { + tc := testcluster.StartTestCluster(t, 5, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + NodeLiveness: kvserver.NodeLivenessTestingKnobs{ + StorePoolNodeLivenessFn: func( + id roachpb.NodeID, now time.Time, duration time.Duration, + ) livenesspb.NodeLivenessStatus { + val := livenessTrap.Load() + if val == nil { + return livenesspb.NodeLivenessStatus_LIVE + } + return val.(func(nodeID roachpb.NodeID) livenesspb.NodeLivenessStatus)(id) + }, + }, + }, + }, + }, + ) + // Setup a scratch range on a test cluster with 2 non-voters and 1 voter. 
+ scratchKey := tc.ScratchRange(t) + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + _, err := tc.ServerConn(0).Exec( + `ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 3, num_voters = 1`, + ) + require.NoError(t, err) + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + return ok + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + return tc, scratchRange + } + + markDead := func(nodeIDs []roachpb.NodeID) { + livenessTrap.Store(func(id roachpb.NodeID) livenesspb.NodeLivenessStatus { + for _, dead := range nodeIDs { + if dead == id { + return livenesspb.NodeLivenessStatus_DEAD + } + } + return livenesspb.NodeLivenessStatus_LIVE + }) + } + + // This subtest checks that non-voters on dead nodes are replaced by + // upreplicating elsewhere. This test is supposed to tickle the + // `AllocatorReplaceDeadNonVoter` code path. It does the following: + // + // 1. On a 5 node cluster, instantiate a range with 1 voter and 2 non-voters. + // 2. Kill the 2 nodes that have the non-voters. + // 3. Check that those non-voters are replaced. + t.Run("replace", func(t *testing.T) { + tc, scratchRange := setupFn(t) + defer tc.Stopper().Stop(ctx) + + beforeNodeIDs := getNonVoterNodeIDs(scratchRange) + markDead(beforeNodeIDs) + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + if !ok { + return false + } + // Ensure that the non-voters have actually been removed from the dead + // nodes and moved to others. 
+ scratchRange = tc.LookupRangeOrFatal(t, scratchRange.StartKey.AsRawKey()) + afterNodeIDs := getNonVoterNodeIDs(scratchRange) + for _, before := range beforeNodeIDs { + for _, after := range afterNodeIDs { + if after == before { + return false + } + } + } + return true + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + }) + + // This subtest checks that when we have more non-voters than needed and some + // existing non-voters are on dead nodes, we will simply remove these + // non-voters. This test is supposed to tickle the + // `AllocatorRemoveDeadNonVoter` code path. The test does the following: + // + // 1. Instantiate a range with 1 voter and 2 non-voters on a 5-node cluster. + // 2. Turn off the replicateQueue + // 3. Change the zone configs such that there should be no non-voters -- + // the two existing non-voters should now be considered "over-replicated" + // by the system. + // 4. Kill the nodes that have non-voters. + // 5. Turn on the replicateQueue + // 6. Make sure that the non-voters are downreplicated from the dead nodes. + t.Run("remove", func(t *testing.T) { + tc, scratchRange := setupFn(t) + defer tc.Stopper().Stop(ctx) + + toggleReplicationQueues(tc, false) + _, err := tc.ServerConn(0).Exec( + // Remove all non-voters.
+ "ALTER RANGE default CONFIGURE ZONE USING num_replicas = 1", + ) + require.NoError(t, err) + beforeNodeIDs := getNonVoterNodeIDs(scratchRange) + markDead(beforeNodeIDs) + + toggleReplicationQueues(tc, true) + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 0 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + return ok + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + }) +} + +func getNonVoterNodeIDs(rangeDesc roachpb.RangeDescriptor) (result []roachpb.NodeID) { + for _, repl := range rangeDesc.Replicas().NonVoterDescriptors() { + result = append(result, repl.NodeID) + } + return result +} + // TestReplicateQueueSwapVoterWithNonVoters tests that voting replicas can // rebalance to stores that already have a non-voter by "swapping" with them. // "Swapping" in this context means simply changing the `ReplicaType` on the diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 40b45a6a7315..1fca168fa22f 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -274,6 +274,9 @@ type NodeLivenessTestingKnobs struct { // RenewalDuration specifies how long before the expiration a record is // heartbeated. If LivenessDuration is set, this should probably be set too. RenewalDuration time.Duration + // StorePoolNodeLivenessFn is the function used by the StorePool to determine + // whether a node is live or not. 
+ StorePoolNodeLivenessFn NodeLivenessFunc } var _ base.ModuleTestingKnobs = NodeLivenessTestingKnobs{} diff --git a/pkg/server/server.go b/pkg/server/server.go index 223d24ea4557..2a7f65b65de4 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -461,13 +461,18 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { }) registry.AddMetricStruct(nodeLiveness.Metrics()) + nodeLivenessFn := kvserver.MakeStorePoolNodeLivenessFunc(nodeLiveness) + if nodeLivenessKnobs, ok := cfg.TestingKnobs.Store.(*kvserver.NodeLivenessTestingKnobs); ok && + nodeLivenessKnobs.StorePoolNodeLivenessFn != nil { + nodeLivenessFn = nodeLivenessKnobs.StorePoolNodeLivenessFn + } storePool := kvserver.NewStorePool( cfg.AmbientCtx, st, g, clock, nodeLiveness.GetNodeCount, - kvserver.MakeStorePoolNodeLivenessFunc(nodeLiveness), + nodeLivenessFn, /* deterministic */ false, )