diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index d1d884801283..45df126e456d 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -618,8 +618,8 @@ func (a *Allocator) computeAction( // The range is over-replicated _and_ has non-voter(s) on a dead node. We'll // just remove these. action = AllocatorRemoveDeadNonVoter - log.VEventf(ctx, 3, "%s - dead=%d, live=%d, quorum=%d, priority=%.2f", - action, len(deadNonVoters), len(liveNonVoters), quorum, action.Priority()) + log.VEventf(ctx, 3, "%s - dead=%d, live=%d, priority=%.2f", + action, len(deadNonVoters), len(liveNonVoters), action.Priority()) return action, action.Priority() } diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 0d41747a226d..a55c8528377d 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -334,6 +334,13 @@ func (rq *replicateQueue) processOneChange( return false, pErr.GoError() } + // TODO(aayush): The fact that we're calling `repl.DescAndZone()` here once to + // pass to `ComputeAction()` to use for deciding which action to take to + // repair a range, and then calling it again inside methods like + // `addOrReplace{Non}Voters()` or `remove{Dead,Decommissioning}` to execute + // upon that decision is a bit unfortunate. It means that we could + // successfully execute a decision that was based on the state of a stale + // range descriptor. desc, zone := repl.DescAndZone() // Avoid taking action if the range has too many dead replicas to make @@ -341,7 +348,7 @@ func (rq *replicateQueue) processOneChange( voterReplicas := desc.Replicas().VoterDescriptors() nonVoterReplicas := desc.Replicas().NonVoterDescriptors() liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(voterReplicas) - liveNonVoterReplicas, _ := rq.allocator.storePool.liveAndDeadReplicas(nonVoterReplicas) + liveNonVoterReplicas, deadNonVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(nonVoterReplicas) // NB: the replication layer ensures that the below operations don't cause // unavailability; see: @@ -363,62 +370,92 @@ func (rq *replicateQueue) processOneChange( // lost quorum. Either way, it's not a good idea to make changes right now. // Let the scanner requeue it again later. return false, nil + + // Add replicas. case AllocatorAddVoter: - return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, - -1 /* removeIdx */, dryRun) + return rq.addOrReplaceVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, dryRun) case AllocatorAddNonVoter: - return rq.addNonVoter(ctx, repl, nonVoterReplicas, liveVoterReplicas, liveNonVoterReplicas, dryRun) + return rq.addOrReplaceNonVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, dryRun) + + // Remove replicas. case AllocatorRemoveVoter: return rq.removeVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun) case AllocatorRemoveNonVoter: return rq.removeNonVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun) + + // Replace dead replicas. case AllocatorReplaceDeadVoter: if len(deadVoterReplicas) == 0 { // Nothing to do. return false, nil } - removeIdx := -1 // guaranteed to be changed below - for i, rDesc := range voterReplicas { - if rDesc.StoreID == deadVoterReplicas[0].StoreID { - removeIdx = i - break - } - } + removeIdx := getRemoveIdx(voterReplicas, deadVoterReplicas[0]) if removeIdx < 0 { return false, errors.AssertionFailedf( "dead voter %v unexpectedly not found in %v", deadVoterReplicas[0], voterReplicas) } - return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) - case AllocatorReplaceDecommissioningVoter: - decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(voterReplicas) - if len(decommissioningReplicas) == 0 { + return rq.addOrReplaceVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + case AllocatorReplaceDeadNonVoter: + if len(deadNonVoterReplicas) == 0 { // Nothing to do. return false, nil } - removeIdx := -1 // guaranteed to be changed below - for i, rDesc := range voterReplicas { - if rDesc.StoreID == decommissioningReplicas[0].StoreID { - removeIdx = i - break - } + removeIdx := getRemoveIdx(nonVoterReplicas, deadNonVoterReplicas[0]) + if removeIdx < 0 { + return false, errors.AssertionFailedf( + "dead non-voter %v unexpectedly not found in %v", + deadNonVoterReplicas[0], nonVoterReplicas) + } + return rq.addOrReplaceNonVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + + // Replace decommissioning replicas. + case AllocatorReplaceDecommissioningVoter: + decommissioningVoterReplicas := rq.allocator.storePool.decommissioningReplicas(voterReplicas) + if len(decommissioningVoterReplicas) == 0 { + // Nothing to do. + return false, nil } + removeIdx := getRemoveIdx(voterReplicas, decommissioningVoterReplicas[0]) if removeIdx < 0 { return false, errors.AssertionFailedf( "decommissioning voter %v unexpectedly not found in %v", - decommissioningReplicas[0], voterReplicas) + decommissioningVoterReplicas[0], voterReplicas) } - return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + return rq.addOrReplaceVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + case AllocatorReplaceDecommissioningNonVoter: + decommissioningNonVoterReplicas := rq.allocator.storePool.decommissioningReplicas(nonVoterReplicas) + if len(decommissioningNonVoterReplicas) == 0 { + return false, nil + } + removeIdx := getRemoveIdx(nonVoterReplicas, decommissioningNonVoterReplicas[0]) + if removeIdx < 0 { + return false, errors.AssertionFailedf( + "decommissioning non-voter %v unexpectedly not found in %v", + decommissioningNonVoterReplicas[0], nonVoterReplicas) + } + return rq.addOrReplaceNonVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + + // Remove decommissioning replicas. + // + // NB: these two paths will only be hit when the range is over-replicated and + // has decommissioning replicas; in the common case we'll hit + // AllocatorReplaceDecommissioning{Non}Voter above. case AllocatorRemoveDecommissioningVoter: - // NB: this path will only be hit when the range is over-replicated and - // has decommissioning replicas; in the common case we'll hit - // AllocatorReplaceDecommissioningVoter above. - return rq.removeDecommissioning(ctx, repl, dryRun) + return rq.removeDecommissioning(ctx, repl, voterTarget, dryRun) + case AllocatorRemoveDecommissioningNonVoter: + return rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun) + + // Remove dead replicas. + // + // NB: these two paths below will only be hit when the range is + // over-replicated and has dead replicas; in the common case we'll hit + // AllocatorReplaceDead{Non}Voter above. case AllocatorRemoveDeadVoter: - // NB: this path will only be hit when the range is over-replicated and - // has dead replicas; in the common case we'll hit AllocatorReplaceDeadVoter - // above. - return rq.removeDead(ctx, repl, deadVoterReplicas, dryRun) + return rq.removeDead(ctx, repl, deadVoterReplicas, voterTarget, dryRun) + case AllocatorRemoveDeadNonVoter: + return rq.removeDead(ctx, repl, deadNonVoterReplicas, nonVoterTarget, dryRun) + case AllocatorRemoveLearner: return rq.removeLearner(ctx, repl, dryRun) case AllocatorConsiderRebalance: @@ -433,6 +470,18 @@ func (rq *replicateQueue) processOneChange( } } +func getRemoveIdx( + repls []roachpb.ReplicaDescriptor, deadOrDecommissioningRepl roachpb.ReplicaDescriptor, +) (removeIdx int) { + for i, rDesc := range repls { + if rDesc.StoreID == deadOrDecommissioningRepl.StoreID { + removeIdx = i + break + } + } + return removeIdx +} + // addOrReplaceVoters adds or replaces a voting replica. If removeIdx is -1, an // addition is carried out. Otherwise, removeIdx must be a valid index into // existingVoters and specifies which voter to replace with a new one. @@ -444,12 +493,12 @@ func (rq *replicateQueue) processOneChange( func (rq *replicateQueue) addOrReplaceVoters( ctx context.Context, repl *Replica, - existingVoters []roachpb.ReplicaDescriptor, - liveVoterReplicas []roachpb.ReplicaDescriptor, - liveNonVoterReplicas []roachpb.ReplicaDescriptor, - removeIdx int, // -1 for no removal + liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, + removeIdx int, dryRun bool, ) (requeue bool, _ error) { + desc, zone := repl.DescAndZone() + existingVoters := desc.Replicas().VoterDescriptors() if len(existingVoters) == 1 { // If only one replica remains, that replica is the leaseholder and // we won't be able to swap it out. Ignore the removal and simply add @@ -480,7 +529,6 @@ func (rq *replicateQueue) addOrReplaceVoters( } } - desc, zone := repl.DescAndZone() // The allocator should not try to re-add this replica since there is a reason // we're removing it (i.e. dead or decommissioning). If we left the replica in // the slice, the allocator would not be guaranteed to pick a replica that @@ -492,7 +540,7 @@ func (rq *replicateQueue) addOrReplaceVoters( if removeIdx >= 0 && newStore.StoreID == existingVoters[removeIdx].StoreID { return false, errors.AssertionFailedf("allocator suggested to replace replica on s%d with itself", newStore.StoreID) } - newReplica := roachpb.ReplicationTarget{ + newVoter := roachpb.ReplicationTarget{ NodeID: newStore.Node.NodeID, StoreID: newStore.StoreID, } @@ -534,38 +582,38 @@ func (rq *replicateQueue) addOrReplaceVoters( // Figure out whether we should be promoting an existing non-voting replica to // a voting replica or if we ought to be adding a voter afresh. var ops []roachpb.ReplicationChange - replDesc, found := desc.GetReplicaDescriptor(newReplica.StoreID) + replDesc, found := desc.GetReplicaDescriptor(newVoter.StoreID) if found { if replDesc.GetType() != roachpb.NON_VOTER { return false, errors.AssertionFailedf("allocation target %s for a voter"+ - " already has an unexpected replica: %s", newReplica, replDesc) + " already has an unexpected replica: %s", newVoter, replDesc) } // If the allocation target has a non-voter already, we will promote it to a // voter. rq.metrics.NonVoterPromotionsCount.Inc(1) - ops = roachpb.ReplicationChangesForPromotion(newReplica) + ops = roachpb.ReplicationChangesForPromotion(newVoter) } else { - ops = roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newReplica) + ops = roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newVoter) } if removeIdx < 0 { - log.VEventf(ctx, 1, "adding replica %+v: %s", - newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) + log.VEventf(ctx, 1, "adding voter %+v: %s", + newVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) } else { rq.metrics.RemoveReplicaCount.Inc(1) - removeReplica := existingVoters[removeIdx] - log.VEventf(ctx, 1, "replacing replica %s with %+v: %s", - removeReplica, newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) + removeVoter := existingVoters[removeIdx] + log.VEventf(ctx, 1, "replacing voter %s with %+v: %s", + removeVoter, newVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) // NB: We may have performed a promotion of a non-voter above, but we will // not perform a demotion here and instead just remove the existing replica - // entirely. This is because we know that the `removeReplica` is either dead - // or decommissioning (see `Allocator.computeAction`) . This means that - // after this allocation is executed, we could be one non-voter short. This - // will be handled by the replicateQueue's next attempt at this range. + // entirely. This is because we know that the `removeVoter` is either dead + // or decommissioning (see `Allocator.computeAction`). This means that after + // this allocation is executed, we could be one non-voter short. This will + // be handled by the replicateQueue's next attempt at this range. ops = append(ops, roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, roachpb.ReplicationTarget{ - StoreID: removeReplica.StoreID, - NodeID: removeReplica.NodeID, + StoreID: removeVoter.StoreID, + NodeID: removeVoter.NodeID, })...) } @@ -585,11 +633,12 @@ func (rq *replicateQueue) addOrReplaceVoters( return true, nil } -// addNonVoter adds a non-voting replica to `repl`s range. -func (rq *replicateQueue) addNonVoter( +// addOrReplaceNonVoters adds a non-voting replica to `repl`s range. +func (rq *replicateQueue) addOrReplaceNonVoters( ctx context.Context, repl *Replica, - existingReplicas, liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, + liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, + removeIdx int, dryRun bool, ) (requeue bool, _ error) { // Non-voter creation is disabled before 21.1. @@ -598,17 +647,34 @@ func (rq *replicateQueue) addNonVoter( } desc, zone := repl.DescAndZone() + existingNonVoters := desc.Replicas().NonVoterDescriptors() newStore, details, err := rq.allocator.AllocateNonVoter(ctx, zone, liveVoterReplicas, liveNonVoterReplicas) if err != nil { return false, err } + rq.metrics.AddReplicaCount.Inc(1) + newNonVoter := roachpb.ReplicationTarget{ NodeID: newStore.Node.NodeID, StoreID: newStore.StoreID, } - ops := roachpb.MakeReplicationChanges(roachpb.ADD_NON_VOTER, newNonVoter) + if removeIdx < 0 { + log.VEventf(ctx, 1, "adding non-voter %+v: %s", + newNonVoter, rangeRaftProgress(repl.RaftStatus(), existingNonVoters)) + } else { + rq.metrics.RemoveReplicaCount.Inc(1) + removeNonVoter := existingNonVoters[removeIdx] + log.VEventf(ctx, 1, "replacing non-voter %s with %+v: %s", + removeNonVoter, newNonVoter, rangeRaftProgress(repl.RaftStatus(), existingNonVoters)) + ops = append(ops, + roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, roachpb.ReplicationTarget{ + StoreID: removeNonVoter.StoreID, + NodeID: removeNonVoter.NodeID, + })...) + } + if err := rq.changeReplicas( ctx, repl, @@ -840,16 +906,33 @@ func (rq *replicateQueue) removeNonVoter( } func (rq *replicateQueue) removeDecommissioning( - ctx context.Context, repl *Replica, dryRun bool, + ctx context.Context, repl *Replica, targetType targetReplicaType, dryRun bool, ) (requeue bool, _ error) { desc, _ := repl.DescAndZone() - decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(desc.Replicas().Descriptors()) + var decommissioningReplicas []roachpb.ReplicaDescriptor + var removeOp roachpb.ReplicaChangeType + switch targetType { + case voterTarget: + decommissioningReplicas = rq.allocator.storePool.decommissioningReplicas( + desc.Replicas().VoterDescriptors(), + ) + removeOp = roachpb.REMOVE_VOTER + case nonVoterTarget: + decommissioningReplicas = rq.allocator.storePool.decommissioningReplicas( + desc.Replicas().NonVoterDescriptors(), + ) + removeOp = roachpb.REMOVE_NON_VOTER + default: + panic(fmt.Sprintf("unknown targetReplicaType: %s", targetType)) + } + if len(decommissioningReplicas) == 0 { - log.VEventf(ctx, 1, "range of replica %s was identified as having decommissioning replicas, "+ - "but no decommissioning replicas were found", repl) + log.VEventf(ctx, 1, "range of %[1]ss %[2]s was identified as having decommissioning %[1]ss, "+ + "but no decommissioning %[1]ss were found", targetType, repl) return true, nil } decommissioningReplica := decommissioningReplicas[0] + done, err := rq.maybeTransferLeaseAway( ctx, repl, decommissioningReplica.StoreID, dryRun, nil /* canTransferLease */) if err != nil { @@ -859,9 +942,10 @@ func (rq *replicateQueue) removeDecommissioning( // Not leaseholder any more. return false, nil } + // Remove the decommissioning replica. rq.metrics.RemoveReplicaCount.Inc(1) - log.VEventf(ctx, 1, "removing decommissioning replica %+v from store", decommissioningReplica) + log.VEventf(ctx, 1, "removing decommissioning %s %+v from store", targetType, decommissioningReplica) target := roachpb.ReplicationTarget{ NodeID: decommissioningReplica.NodeID, StoreID: decommissioningReplica.StoreID, @@ -869,7 +953,7 @@ func (rq *replicateQueue) removeDecommissioning( if err := rq.changeReplicas( ctx, repl, - roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, target), + roachpb.MakeReplicationChanges(removeOp, target), desc, SnapshotRequest_UNKNOWN, // unused kvserverpb.ReasonStoreDecommissioning, "", dryRun, @@ -881,27 +965,48 @@ func (rq *replicateQueue) removeDecommissioning( } func (rq *replicateQueue) removeDead( - ctx context.Context, repl *Replica, deadVoterReplicas []roachpb.ReplicaDescriptor, dryRun bool, + ctx context.Context, + repl *Replica, + deadReplicas []roachpb.ReplicaDescriptor, + targetType targetReplicaType, + dryRun bool, ) (requeue bool, _ error) { desc := repl.Desc() - if len(deadVoterReplicas) == 0 { - log.VEventf(ctx, 1, "range of replica %s was identified as having dead replicas, but no dead replicas were found", repl) + if len(deadReplicas) == 0 { + log.VEventf( + ctx, + 1, + "range of %[1]s %[2]s was identified as having dead %[1]ss, but no dead %[1]ss were found", + targetType, + repl, + ) return true, nil } - deadReplica := deadVoterReplicas[0] + deadReplica := deadReplicas[0] rq.metrics.RemoveDeadReplicaCount.Inc(1) - log.VEventf(ctx, 1, "removing dead replica %+v from store", deadReplica) + log.VEventf(ctx, 1, "removing dead %s %+v from store", targetType, deadReplica) target := roachpb.ReplicationTarget{ NodeID: deadReplica.NodeID, StoreID: deadReplica.StoreID, } - // NB: we don't check whether to transfer the lease away because if the removal target - // is dead, it's not us (and if for some reason that happens, the removal is simply - // going to fail). + var removeOp roachpb.ReplicaChangeType + switch targetType { + case voterTarget: + removeOp = roachpb.REMOVE_VOTER + case nonVoterTarget: + removeOp = roachpb.REMOVE_NON_VOTER + default: + panic(fmt.Sprintf("unknown targetReplicaType: %s", targetType)) + } + + // NB: When removing a dead voter, we don't check whether to transfer the + // lease away because if the removal target is dead, it's not the voter being + // removed (and if for some reason that happens, the removal is simply going + // to fail). if err := rq.changeReplicas( ctx, repl, - roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, target), + roachpb.MakeReplicationChanges(removeOp, target), desc, SnapshotRequest_UNKNOWN, // unused kvserverpb.ReasonStoreDead, diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index cc358acf0f24..bc1df633ac2b 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -19,6 +19,7 @@ import ( "math/rand" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -28,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -368,6 +370,282 @@ func TestReplicateQueueUpAndDownReplicateNonVoters(t *testing.T) { }) } +func checkReplicaCount( + ctx context.Context, + tc *testcluster.TestCluster, + rangeDesc roachpb.RangeDescriptor, + voterCount, nonVoterCount int, +) (bool, error) { + err := forceScanOnAllReplicationQueues(tc) + if err != nil { + log.Infof(ctx, "store.ForceReplicationScanAndProcess() failed with: %s", err) + return false, err + } + rangeDesc, err = tc.LookupRange(rangeDesc.StartKey.AsRawKey()) + if err != nil { + return false, err + } + if len(rangeDesc.Replicas().VoterDescriptors()) != voterCount { + return false, nil + } + if len(rangeDesc.Replicas().NonVoterDescriptors()) != nonVoterCount { + return false, nil + } + return true, nil +} + +// TestReplicateQueueDecommissioningNonVoters is an end-to-end test ensuring +// that the replicateQueue will replace or remove non-voter(s) on +// decommissioning nodes. +func TestReplicateQueueDecommissioningNonVoters(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // Setup a scratch range on a test cluster with 2 non-voters and 1 voter. + setupFn := func(t *testing.T) (*testcluster.TestCluster, roachpb.RangeDescriptor) { + tc := testcluster.StartTestCluster(t, 5, + base.TestClusterArgs{ReplicationMode: base.ReplicationAuto}, + ) + + scratchKey := tc.ScratchRange(t) + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + _, err := tc.ServerConn(0).Exec( + `ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 3, num_voters = 1`, + ) + require.NoError(t, err) + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + return ok + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + return tc, scratchRange + } + + // Check that non-voters on decommissioning nodes are replaced by + // upreplicating elsewhere. This test is supposed to tickle the + // `AllocatorReplaceDecommissioningNonVoter` code path. + t.Run("replace", func(t *testing.T) { + tc, scratchRange := setupFn(t) + defer tc.Stopper().Stop(ctx) + // Do a fresh look up on the range descriptor. + scratchRange = tc.LookupRangeOrFatal(t, scratchRange.StartKey.AsRawKey()) + beforeNodeIDs := getNonVoterNodeIDs(scratchRange) + // Decommission each of the two nodes that have the non-voters and make sure + // that those non-voters are upreplicated elsewhere. + require.NoError(t, + tc.Server(0).Decommission(ctx, livenesspb.MembershipStatus_DECOMMISSIONING, beforeNodeIDs)) + + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + if !ok { + return false + } + // Ensure that the non-voters have actually been removed from the dead + // nodes and moved to others. + scratchRange = tc.LookupRangeOrFatal(t, scratchRange.StartKey.AsRawKey()) + afterNodeIDs := getNonVoterNodeIDs(scratchRange) + for _, before := range beforeNodeIDs { + for _, after := range afterNodeIDs { + if after == before { + return false + } + } + } + return true + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + }) + + // Check that when we have more non-voters than needed and some of those + // non-voters are on decommissioning nodes, that we simply remove those + // non-voters. This test is supposed to tickle the + // `AllocatorRemoveDecommissioningNonVoter` code path. + t.Run("remove", func(t *testing.T) { + tc, scratchRange := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Turn off the replicateQueue and update the zone configs to remove all + // non-voters. At the same time, also mark all the nodes that have + // non-voters as decommissioning. + tc.ToggleReplicateQueues(false) + _, err := tc.ServerConn(0).Exec( + `ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 1`, + ) + require.NoError(t, err) + + // Do a fresh look up on the range descriptor. + scratchRange = tc.LookupRangeOrFatal(t, scratchRange.StartKey.AsRawKey()) + var nonVoterNodeIDs []roachpb.NodeID + for _, repl := range scratchRange.Replicas().NonVoterDescriptors() { + nonVoterNodeIDs = append(nonVoterNodeIDs, repl.NodeID) + } + require.NoError(t, + tc.Server(0).Decommission(ctx, livenesspb.MembershipStatus_DECOMMISSIONING, nonVoterNodeIDs)) + + // At this point, we know that we have an over-replicated range with + // non-voters on nodes that are marked as decommissioning. So turn the + // replicateQueue on and ensure that these redundant non-voters are removed. + tc.ToggleReplicateQueues(true) + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 0 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + return ok + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + }) +} + +// TestReplicateQueueDeadNonVoters is an end to end test ensuring that +// non-voting replicas on dead nodes are replaced or removed. +func TestReplicateQueueDeadNonVoters(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + var livenessTrap atomic.Value + setupFn := func(t *testing.T) (*testcluster.TestCluster, roachpb.RangeDescriptor) { + tc := testcluster.StartTestCluster(t, 5, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + NodeLiveness: kvserver.NodeLivenessTestingKnobs{ + StorePoolNodeLivenessFn: func( + id roachpb.NodeID, now time.Time, duration time.Duration, + ) livenesspb.NodeLivenessStatus { + val := livenessTrap.Load() + if val == nil { + return livenesspb.NodeLivenessStatus_LIVE + } + return val.(func(nodeID roachpb.NodeID) livenesspb.NodeLivenessStatus)(id) + }, + }, + }, + }, + }, + ) + // Setup a scratch range on a test cluster with 2 non-voters and 1 voter. + scratchKey := tc.ScratchRange(t) + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + _, err := tc.ServerConn(0).Exec( + `ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 3, num_voters = 1`, + ) + require.NoError(t, err) + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + return ok + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + return tc, scratchRange + } + + markDead := func(nodeIDs []roachpb.NodeID) { + livenessTrap.Store(func(id roachpb.NodeID) livenesspb.NodeLivenessStatus { + for _, dead := range nodeIDs { + if dead == id { + return livenesspb.NodeLivenessStatus_DEAD + } + } + return livenesspb.NodeLivenessStatus_LIVE + }) + } + + // This subtest checks that non-voters on dead nodes are replaced by + // upreplicating elsewhere. This test is supposed to tickle the + // `AllocatorReplaceDeadNonVoter` code path. It does the following: + // + // 1. On a 5 node cluster, instantiate a range with 1 voter and 2 non-voters. + // 2. Kill the 2 nodes that have the non-voters. + // 3. Check that those non-voters are replaced. + t.Run("replace", func(t *testing.T) { + tc, scratchRange := setupFn(t) + defer tc.Stopper().Stop(ctx) + + beforeNodeIDs := getNonVoterNodeIDs(scratchRange) + markDead(beforeNodeIDs) + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + if !ok { + return false + } + // Ensure that the non-voters have actually been removed from the dead + // nodes and moved to others. + scratchRange = tc.LookupRangeOrFatal(t, scratchRange.StartKey.AsRawKey()) + afterNodeIDs := getNonVoterNodeIDs(scratchRange) + for _, before := range beforeNodeIDs { + for _, after := range afterNodeIDs { + if after == before { + return false + } + } + } + return true + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + }) + + // This subtest checks that when we have more non-voters than needed and some + // existing non-voters are on dead nodes, we will simply remove these + // non-voters. This test is supposed to tickle the + // AllocatorRemoveDeadNonVoter` code path. The test does the following: + // + // 1. Instantiate a range with 1 voter and 2 non-voters on a 5-node cluster. + // 2. Turn off the replicateQueue + // 3. Change the zone configs such that there should be no non-voters -- + // the two existing non-voters should now be considered "over-replicated" + // by the system. + // 4. Kill the nodes that have non-voters. + // 5. Turn on the replicateQueue + // 6. Make sure that the non-voters are downreplicated from the dead nodes. + t.Run("remove", func(t *testing.T) { + tc, scratchRange := setupFn(t) + defer tc.Stopper().Stop(ctx) + + toggleReplicationQueues(tc, false) + _, err := tc.ServerConn(0).Exec( + // Remove all non-voters. + "ALTER RANGE default CONFIGURE ZONE USING num_replicas = 1", + ) + require.NoError(t, err) + beforeNodeIDs := getNonVoterNodeIDs(scratchRange) + markDead(beforeNodeIDs) + + toggleReplicationQueues(tc, true) + require.Eventually(t, func() bool { + ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 0 /* nonVoterCount */) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + return ok + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + }) +} + +func getNonVoterNodeIDs(rangeDesc roachpb.RangeDescriptor) (result []roachpb.NodeID) { + for _, repl := range rangeDesc.Replicas().NonVoterDescriptors() { + result = append(result, repl.NodeID) + } + return result +} + // TestReplicateQueueSwapVoterWithNonVoters tests that voting replicas can // rebalance to stores that already have a non-voter by "swapping" with them. // "Swapping" in this context means simply changing the `ReplicaType` on the diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 40b45a6a7315..1fca168fa22f 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -274,6 +274,9 @@ type NodeLivenessTestingKnobs struct { // RenewalDuration specifies how long before the expiration a record is // heartbeated. If LivenessDuration is set, this should probably be set too. RenewalDuration time.Duration + // StorePoolNodeLivenessFn is the function used by the StorePool to determine + // whether a node is live or not. + StorePoolNodeLivenessFn NodeLivenessFunc } var _ base.ModuleTestingKnobs = NodeLivenessTestingKnobs{} diff --git a/pkg/server/server.go b/pkg/server/server.go index 223d24ea4557..2a7f65b65de4 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -461,13 +461,18 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { }) registry.AddMetricStruct(nodeLiveness.Metrics()) + nodeLivenessFn := kvserver.MakeStorePoolNodeLivenessFunc(nodeLiveness) + if nodeLivenessKnobs, ok := cfg.TestingKnobs.Store.(*kvserver.NodeLivenessTestingKnobs); ok && + nodeLivenessKnobs.StorePoolNodeLivenessFn != nil { + nodeLivenessFn = nodeLivenessKnobs.StorePoolNodeLivenessFn + } storePool := kvserver.NewStorePool( cfg.AmbientCtx, st, g, clock, nodeLiveness.GetNodeCount, - kvserver.MakeStorePoolNodeLivenessFunc(nodeLiveness), + nodeLivenessFn, /* deterministic */ false, )