From 1512a141b76baecc825d9d11615bb4a4d3f573e7 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Tue, 2 Mar 2021 17:55:22 -0500 Subject: [PATCH 1/2] kvserver, timeutil: fix some Timer user-after-Stops Two guys were continuing to use a Timer after Stop()ing it, which is illegal. Release note: None Release justification: Bug fix. --- pkg/kv/kvserver/closedts/provider/provider.go | 6 ++++-- pkg/kv/kvserver/closedts/sidetransport/sender.go | 5 +++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/closedts/provider/provider.go b/pkg/kv/kvserver/closedts/provider/provider.go index eeee070eec5b..d97476089fe6 100644 --- a/pkg/kv/kvserver/closedts/provider/provider.go +++ b/pkg/kv/kvserver/closedts/provider/provider.go @@ -126,7 +126,7 @@ func (p *Provider) runCloser(ctx context.Context) { // Track whether we've ever been live to avoid logging warnings about not // being live during node startup. var everBeenLive bool - var t timeutil.Timer + t := timeutil.NewTimer() defer t.Stop() for { closeFraction := closedts.CloseFraction.Get(&p.cfg.Settings.SV) @@ -134,7 +134,9 @@ func (p *Provider) runCloser(ctx context.Context) { if targetDuration > 0 { t.Reset(time.Duration(closeFraction * targetDuration)) } else { - t.Stop() // disable closing when the target duration is non-positive + // Disable closing when the target duration is non-positive. + t.Stop() + t = timeutil.NewTimer() } select { case <-p.cfg.Stopper.ShouldQuiesce(): diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender.go b/pkg/kv/kvserver/closedts/sidetransport/sender.go index e9a97c2c2e6f..4d5d9f721658 100644 --- a/pkg/kv/kvserver/closedts/sidetransport/sender.go +++ b/pkg/kv/kvserver/closedts/sidetransport/sender.go @@ -178,15 +178,16 @@ func (s *Sender) Run(ctx context.Context, nodeID roachpb.NodeID) { s.buf.Close() }() - var timer timeutil.Timer + timer := timeutil.NewTimer() defer timer.Stop() for { interval := closedts.SideTransportCloseInterval.Get(&s.st.SV) if interval > 0 { - timer.Reset(closedts.SideTransportCloseInterval.Get(&s.st.SV)) + timer.Reset(interval) } else { // Disable the side-transport. timer.Stop() + timer = timeutil.NewTimer() } select { case <-timer.C: From a75a28767d0e305879a477070fe849b65a91e974 Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Fri, 26 Feb 2021 01:45:58 -0500 Subject: [PATCH 2/2] kvserver: handle rebalancing of non-voting replicas This commit adds the necessary machinery within the allocator and the plumbing within the `replicateQueue` to be able to rebalance non-voting replicas. A few things to note are: 1. Voting replicas are allowed to rebalance to nodes that already have a non-voting replica. This will trigger essentially what is a metadata operation: flipping the replica type of the corresponding non-voter to a voter and the type of the voter to a non_voter. Notably, this PR changes voter allocation code to also be able to do this. 2. Computation of diversity scores works slightly differently for voting replicas and non-voting replicas. Non-voting replicas compute candidate diversity scores based off of all existing replicas, whereas voting replicas compute candidate diversity scores off of just the set of voting replicas. We don't yet support removal/replacement of dead and decommissioning non-voters, that is coming in a follow-up PR. 
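To picture point 2 above, here is a minimal, self-contained Go sketch (illustrative types and scoring only, not the allocator's actual code): a candidate's diversity score is computed against the existing voters alone when placing a voter, but against all existing replicas when placing a non-voter.

package main

import "fmt"

type locality []string // ordered tiers, e.g. {"region-a", "az-1"}

// tierDistance is 1 when two localities differ at the most significant tier,
// 0 when they are identical, and in between otherwise.
func tierDistance(a, b locality) float64 {
	n := len(a)
	if len(b) > n {
		n = len(b)
	}
	if n == 0 {
		return 0
	}
	for i := 0; i < n; i++ {
		if i >= len(a) || i >= len(b) || a[i] != b[i] {
			return float64(n-i) / float64(n)
		}
	}
	return 0
}

// diversityScore is the mean distance between a candidate's locality and the
// localities in the comparison set; higher means more diverse.
func diversityScore(candidate locality, comparisonSet []locality) float64 {
	if len(comparisonSet) == 0 {
		return 0
	}
	var sum float64
	for _, l := range comparisonSet {
		sum += tierDistance(candidate, l)
	}
	return sum / float64(len(comparisonSet))
}

func main() {
	voters := []locality{{"region-a", "az-1"}, {"region-b", "az-1"}}
	nonVoters := []locality{{"region-c", "az-1"}}
	candidate := locality{"region-c", "az-2"}

	// Voter placement compares against voters only, so a store in region-c
	// still looks maximally diverse even though a non-voter already lives there.
	fmt.Println(diversityScore(candidate, voters)) // 1.0

	// Non-voter placement compares against all existing replicas, so the
	// non-voter already in region-c pulls the candidate's score down.
	all := append(append([]locality(nil), voters...), nonVoters...)
	fmt.Println(diversityScore(candidate, all)) // ~0.83
}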
Release justification: fix major limitation of non-voting replicas Release note: None --- pkg/kv/kvserver/allocator.go | 311 +++++++++++----- pkg/kv/kvserver/allocator_scorer.go | 86 +++-- pkg/kv/kvserver/allocator_scorer_test.go | 36 +- pkg/kv/kvserver/allocator_test.go | 447 +++++++++++++++-------- pkg/kv/kvserver/client_replica_test.go | 8 +- pkg/kv/kvserver/replicate_queue.go | 245 ++++++++++--- pkg/kv/kvserver/replicate_queue_test.go | 176 ++++++++- pkg/kv/kvserver/store_pool.go | 6 +- pkg/roachpb/api.go | 16 + pkg/ts/catalog/chart_catalog.go | 10 +- 10 files changed, 994 insertions(+), 347 deletions(-) diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index f516a6ce4d79..0e0b73598a65 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -123,18 +123,16 @@ const ( ) var allocatorActionNames = map[AllocatorAction]string{ - AllocatorNoop: "noop", - AllocatorRemoveVoter: "remove voter", - AllocatorRemoveNonVoter: "remove non-voter", - AllocatorAddVoter: "add voter", - AllocatorAddNonVoter: "add non-voter", - AllocatorReplaceDeadVoter: "replace dead voter", - AllocatorRemoveDeadVoter: "remove dead voter", - AllocatorReplaceDecommissioningVoter: "replace decommissioning voter", - AllocatorRemoveDecommissioningVoter: "remove decommissioning voter", - AllocatorRemoveLearner: "remove learner", - // TODO(aayush): Rationalize whether or not rebalancing of non-voters needs to - // be dictated by a distinct allocator action. + AllocatorNoop: "noop", + AllocatorRemoveVoter: "remove voter", + AllocatorRemoveNonVoter: "remove non-voter", + AllocatorAddVoter: "add voter", + AllocatorAddNonVoter: "add non-voter", + AllocatorReplaceDeadVoter: "replace dead voter", + AllocatorRemoveDeadVoter: "remove dead voter", + AllocatorReplaceDecommissioningVoter: "replace decommissioning voter", + AllocatorRemoveDecommissioningVoter: "remove decommissioning voter", + AllocatorRemoveLearner: "remove learner", AllocatorConsiderRebalance: "consider rebalance", AllocatorRangeUnavailable: "range unavailable", AllocatorFinalizeAtomicReplicationChange: "finalize conf change", @@ -155,9 +153,9 @@ const ( func (t targetReplicaType) String() string { switch typ := t; typ { case voterTarget: - return "voters" + return "voter" case nonVoterTarget: - return "non-voters" + return "non-voter" default: panic(fmt.Sprintf("unknown targetReplicaType %d", typ)) } @@ -559,6 +557,10 @@ func (a *Allocator) computeAction( } // Non-voting replica removal. + + // TODO(aayush): Handle removal/replacement of decommissioning/dead non-voting + // replicas. + if haveNonVoters > neededNonVoters { // Like above, the range is over-replicated but should remove a non-voter. priority := removeExtraNonVoterPriority @@ -571,6 +573,41 @@ func (a *Allocator) computeAction( return AllocatorConsiderRebalance, 0 } +// getReplicasForDiversityCalc returns the set of replica descriptors that +// should be used for computing the diversity scores for a target when +// allocating/removing/rebalancing a replica of `targetType`. +func getReplicasForDiversityCalc( + targetType targetReplicaType, existingVoters, allExistingReplicas []roachpb.ReplicaDescriptor, +) []roachpb.ReplicaDescriptor { + switch t := targetType; t { + case voterTarget: + // When computing the "diversity score" for a given store for a voting + // replica allocation/rebalance/removal, we consider the localities of only + // the stores that contain a voting replica for the range. 
+ // + // Note that if we were to consider all stores that have any kind of replica + // for the range, voting replica allocation would be disincentivized to pick + // stores that (partially or fully) share locality hierarchies with stores + // that contain a non-voting replica. This is undesirable because this could + // inadvertently reduce the fault-tolerance of the range in cases like the + // following: + // + // Consider 3 regions (A, B, C), each with 2 AZs. Suppose that regions A and + // B have a voting replica each, whereas region C has a non-voting replica. + // In cases like these, we would want region C to be picked over regions A + // and B for allocating a new third voting replica since that improves our + // fault tolerance to the greatest degree. + // In the counterfactual (i.e. if we were to compute diversity scores based + // off of all `existingReplicas`), regions A, B, and C would all be equally + // likely to get a new voting replica. + return existingVoters + case nonVoterTarget: + return allExistingReplicas + default: + panic(fmt.Sprintf("unsupported targetReplicaType: %v", t)) + } +} + type decisionDetails struct { Target string Existing string `json:",omitempty"` @@ -650,52 +687,31 @@ func (a *Allocator) allocateTargetFromList( existingVoters, zone.GetNumVoters(), zone.VoterConstraints) var constraintsChecker constraintsCheckFn - var replicaSetForDiversityCalc []roachpb.ReplicaDescriptor switch t := targetType; t { case voterTarget: constraintsChecker = voterConstraintsCheckerForAllocation( analyzedOverallConstraints, analyzedVoterConstraints, ) - // When computing the "diversity score" for a given store for a voting - // replica allocation, we consider the localities of only the stores that - // contain a voting replica for the range. - // - // Note that if we were to consider all stores that have any kind of replica - // for the range, voting replica allocation would be disincentivized to pick - // stores that (partially or fully) share locality hierarchies with stores - // that contain a non-voting replica. This is undesirable because this could - // inadvertently reduce the fault-tolerance of the range in cases like the - // following: - // - // Consider 3 regions (A, B, C), each with 2 AZs. Suppose that regions A and - // B have a voting replica each, whereas region C has a non-voting replica. - // In cases like these, we would want region C to be picked over regions A - // and B for allocating a new third voting replica since that improves our - // fault tolerance to the greatest degree. - // In the counterfactual (i.e. if we were to compute diversity scores based - // off of all `existingReplicas`), regions A, B, and C would all be equally - // likely to get a new voting replica. - replicaSetForDiversityCalc = existingVoters case nonVoterTarget: constraintsChecker = nonVoterConstraintsCheckerForAllocation(analyzedOverallConstraints) - replicaSetForDiversityCalc = existingReplicas default: log.Fatalf(ctx, "unsupported targetReplicaType: %v", t) } + // We'll consider the targets that have a non-voter as feasible + // relocation/up-replication targets for existing/new voting replicas, since + // we always want voter constraint conformance to take precedence over + // non-voters. For instance, in cases where we can only satisfy constraints + // for either 1 voter or 1 non-voter, we want the voter to be able to displace + // the non-voter. 
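A toy sketch of the displacement described above, following the model in this commit's message (illustrative types, not the real ChangeReplicas plumbing): when the chosen voter target already holds a non-voter, the change amounts to flipping replica types rather than copying data to a brand-new replica.

package main

import "fmt"

type storeID int

type replicaChange struct {
	kind  string // "add-voter", "remove-voter", "promote-non-voter", "demote-voter"
	store storeID
}

// voterRebalanceChanges models rebalancing a voter from `outgoing` to `target`.
// If the target store already holds a non-voter, the result is a metadata-only
// swap: promote that non-voter and demote the outgoing voter to a non-voter.
// Otherwise a new voter is added (via the learner path) and the outgoing voter
// is removed.
func voterRebalanceChanges(target, outgoing storeID, nonVoterStores map[storeID]bool) []replicaChange {
	if nonVoterStores[target] {
		return []replicaChange{
			{kind: "promote-non-voter", store: target},
			{kind: "demote-voter", store: outgoing},
		}
	}
	return []replicaChange{
		{kind: "add-voter", store: target},
		{kind: "remove-voter", store: outgoing},
	}
}

func main() {
	nonVoterStores := map[storeID]bool{3: true}
	fmt.Println(voterRebalanceChanges(3, 1, nonVoterStores)) // swap with the non-voter on s3
	fmt.Println(voterRebalanceChanges(4, 1, nonVoterStores)) // ordinary add/remove rebalance
}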
+ existingReplicaSet := getReplicasForDiversityCalc(targetType, existingVoters, existingReplicas) candidates := rankedCandidateListForAllocation( ctx, candidateStores, constraintsChecker, - // TODO(aayush): We currently rule out all the stores that have any replica - // for the given range. However, we want to get to a place where we'll - // consider the targets that have a non-voter as feasible - // relocation/up-replication targets for existing/new voting replicas, since - // it is cheaper to promote a non-voter than it is to add a new voter via - // the LEARNER->VOTER_FULL path. - existingReplicas, - a.storePool.getLocalitiesByStore(replicaSetForDiversityCalc), + existingReplicaSet, + a.storePool.getLocalitiesByStore(existingReplicaSet), a.storePool.isNodeReadyForRoutineReplicaTransfer, options) @@ -713,26 +729,45 @@ func (a *Allocator) allocateTargetFromList( return nil, "" } -// TODO(aayush): Generalize this to work for non-voting replicas as well. func (a Allocator) simulateRemoveTarget( ctx context.Context, targetStore roachpb.StoreID, zone *zonepb.ZoneConfig, candidates []roachpb.ReplicaDescriptor, existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, rangeUsageInfo RangeUsageInfo, + targetType targetReplicaType, ) (roachpb.ReplicaDescriptor, string, error) { // Update statistics first // TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines, // but as of October 2017 calls to the Allocator are mostly serialized by the ReplicateQueue // (with the main exceptions being Scatter and the status server's allocator debug endpoint). // Try to make this interfere less with other callers. - a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_VOTER) - defer func() { - a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.REMOVE_VOTER) - }() - log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore) - return a.RemoveVoter(ctx, zone, candidates, existingVoters, nil) + switch t := targetType; t { + case voterTarget: + a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_VOTER) + defer a.storePool.updateLocalStoreAfterRebalance( + targetStore, + rangeUsageInfo, + roachpb.REMOVE_VOTER, + ) + log.VEventf(ctx, 3, "simulating which voter would be removed after adding s%d", + targetStore) + return a.RemoveVoter(ctx, zone, candidates, existingVoters, existingNonVoters) + case nonVoterTarget: + a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_NON_VOTER) + defer a.storePool.updateLocalStoreAfterRebalance( + targetStore, + rangeUsageInfo, + roachpb.REMOVE_NON_VOTER, + ) + log.VEventf(ctx, 3, "simulating which non-voter would be removed after adding s%d", + targetStore) + return a.RemoveNonVoter(ctx, zone, candidates, existingVoters, existingNonVoters) + default: + panic(fmt.Sprintf("unknown targetReplicaType: %s", t)) + } } func (a Allocator) removeTarget( @@ -762,7 +797,6 @@ func (a Allocator) removeTarget( options := a.scorerOptions() var constraintsChecker constraintsCheckFn - var replicaSetForDiversityCalc []roachpb.ReplicaDescriptor switch t := targetType; t { case voterTarget: // Voting replicas have to abide by both the overall `constraints` (which @@ -772,16 +806,13 @@ func (a Allocator) removeTarget( analyzedOverallConstraints, analyzedVoterConstraints, ) - // See comment inside `allocateTargetFromList` for why we compute diversity - // scores relative to only 
the stores of voting replicas. - replicaSetForDiversityCalc = existingVoters case nonVoterTarget: constraintsChecker = nonVoterConstraintsCheckerForRemoval(analyzedOverallConstraints) - replicaSetForDiversityCalc = existingReplicas default: log.Fatalf(ctx, "unsupported targetReplicaType: %v", t) } + replicaSetForDiversityCalc := getReplicasForDiversityCalc(targetType, existingVoters, existingReplicas) rankedCandidates := rankedCandidateListForRemoval( candidateStoreList, constraintsChecker, @@ -850,54 +881,67 @@ func (a Allocator) RemoveNonVoter( ) } -// RebalanceTarget returns a suitable store for a rebalance target with -// required attributes. Rebalance targets are selected via the same mechanism -// as AllocateVoter(), except the chosen target must follow some additional -// criteria. Namely, if chosen, it must further the goal of balancing the -// cluster. -// -// The supplied parameters are the required attributes for the range and -// information about the range being considered for rebalancing. -// -// The existing replicas modulo any store with dead replicas are candidates for -// rebalancing. Note that rebalancing is accomplished by first adding a new -// replica to the range, then removing the most undesirable replica. -// -// Simply ignoring a rebalance opportunity in the event that the target chosen -// by AllocateVoter() doesn't fit balancing criteria is perfectly fine, as -// other stores in the cluster will also be doing their probabilistic best to -// rebalance. This helps prevent a stampeding herd targeting an abnormally -// under-utilized store. -// -// The return values are, in order: -// -// 1. The target on which to add a new replica, -// 2. An existing replica to remove, -// 3. a JSON string for use in the range log, and -// 4. a boolean indicationg whether 1-3 were populated (i.e. whether a rebalance -// opportunity was found). -func (a Allocator) RebalanceTarget( +func (a Allocator) rebalanceTarget( ctx context.Context, zone *zonepb.ZoneConfig, raftStatus *raft.Status, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, rangeUsageInfo RangeUsageInfo, filter storeFilter, + targetType targetReplicaType, ) (add roachpb.ReplicationTarget, remove roachpb.ReplicationTarget, details string, ok bool) { sl, _, _ := a.storePool.getStoreList(filter) + existingReplicas := append(existingVoters, existingNonVoters...) 
zero := roachpb.ReplicationTarget{} - analyzedConstraints := constraint.AnalyzeConstraints( + analyzedOverallConstraints := constraint.AnalyzeConstraints( ctx, a.storePool.getStoreDescriptor, existingReplicas, *zone.NumReplicas, zone.Constraints) + analyzedVoterConstraints := constraint.AnalyzeConstraints( + ctx, a.storePool.getStoreDescriptor, existingVoters, zone.GetNumVoters(), zone.VoterConstraints) + var removalConstraintsChecker constraintsCheckFn + var rebalanceConstraintsChecker rebalanceConstraintsCheckFn + var replicaSetToRebalance, replicasWithExcludedStores []roachpb.ReplicaDescriptor + var otherReplicaSet []roachpb.ReplicaDescriptor + + switch t := targetType; t { + case voterTarget: + removalConstraintsChecker = voterConstraintsCheckerForRemoval( + analyzedOverallConstraints, + analyzedVoterConstraints, + ) + rebalanceConstraintsChecker = voterConstraintsCheckerForRebalance( + analyzedOverallConstraints, + analyzedVoterConstraints, + ) + replicaSetToRebalance = existingVoters + otherReplicaSet = existingNonVoters + case nonVoterTarget: + removalConstraintsChecker = nonVoterConstraintsCheckerForRemoval(analyzedOverallConstraints) + rebalanceConstraintsChecker = nonVoterConstraintsCheckerForRebalance(analyzedOverallConstraints) + replicaSetToRebalance = existingNonVoters + // When rebalancing non-voting replicas, we don't consider stores that + // already have voting replicas as possible candidates. Voting replicas are + // supposed to be rebalanced before non-voting replicas, and they do + // consider the non-voters' stores as possible candidates. + replicasWithExcludedStores = existingVoters + otherReplicaSet = existingVoters + default: + log.Fatalf(ctx, "unsupported targetReplicaType: %v", t) + } + options := a.scorerOptions() - results := rebalanceCandidates( + replicaSetForDiversityCalc := getReplicasForDiversityCalc(targetType, existingVoters, existingReplicas) + results := rankedCandidateListForRebalancing( ctx, sl, - analyzedConstraints, - existingReplicas, - a.storePool.getLocalitiesByStore(existingReplicas), + removalConstraintsChecker, + rebalanceConstraintsChecker, + replicaSetToRebalance, + replicasWithExcludedStores, + a.storePool.getLocalitiesByStore(replicaSetForDiversityCalc), a.storePool.isNodeReadyForRoutineReplicaTransfer, options, + targetType, ) if len(results) == 0 { @@ -924,21 +968,21 @@ func (a Allocator) RebalanceTarget( ReplicaID: maxReplicaID(existingReplicas) + 1, } // Deep-copy the Replicas slice since we'll mutate it below. - existingPlusOneNew := append([]roachpb.ReplicaDescriptor(nil), existingReplicas...) + existingPlusOneNew := append([]roachpb.ReplicaDescriptor(nil), replicaSetToRebalance...) existingPlusOneNew = append(existingPlusOneNew, newReplica) replicaCandidates := existingPlusOneNew // If we can, filter replicas as we would if we were actually removing one. // If we can't (e.g. because we're the leaseholder but not the raft leader), // it's better to simulate the removal with the info that we do have than to // assume that the rebalance is ok (#20241). - if raftStatus != nil && raftStatus.Progress != nil { + if targetType == voterTarget && raftStatus != nil && raftStatus.Progress != nil { replicaCandidates = simulateFilterUnremovableReplicas( ctx, raftStatus, replicaCandidates, newReplica.ReplicaID) } if len(replicaCandidates) == 0 { // No existing replicas are suitable to remove. 
- log.VEventf(ctx, 2, "not rebalancing to s%d because there are no existing "+ - "replicas that can be removed", target.store.StoreID) + log.VEventf(ctx, 2, "not rebalancing %s to s%d because there are no existing "+ + "replicas that can be removed", targetType, target.store.StoreID) return zero, zero, "", false } @@ -950,10 +994,12 @@ func (a Allocator) RebalanceTarget( zone, replicaCandidates, existingPlusOneNew, + otherReplicaSet, rangeUsageInfo, + targetType, ) if err != nil { - log.Warningf(ctx, "simulating RemoveVoter failed: %+v", err) + log.Warningf(ctx, "simulating removal of %s failed: %+v", targetType, err) return zero, zero, "", false } if target.store.StoreID != removeReplica.StoreID { @@ -988,6 +1034,83 @@ func (a Allocator) RebalanceTarget( return addTarget, removeTarget, string(detailsBytes), true } +// RebalanceVoter returns a suitable store for a rebalance target with required +// attributes. Rebalance targets are selected via the same mechanism as +// AllocateVoter(), except the chosen target must follow some additional +// criteria. Namely, if chosen, it must further the goal of balancing the +// cluster. +// +// The supplied parameters are the required attributes for the range and +// information about the range being considered for rebalancing. +// +// The existing voting replicas modulo any store with dead replicas are +// candidates for rebalancing. +// +// Simply ignoring a rebalance opportunity in the event that the target chosen +// by rankedCandidateListForRebalancing() doesn't fit balancing criteria is +// perfectly fine, as other stores in the cluster will also be doing their +// probabilistic best to rebalance. This helps prevent a stampeding herd +// targeting an abnormally under-utilized store. +// +// The return values are, in order: +// +// 1. The target on which to add a new replica, +// 2. An existing replica to remove, +// 3. a JSON string for use in the range log, and +// 4. a boolean indicationg whether 1-3 were populated (i.e. whether a rebalance +// opportunity was found). +func (a Allocator) RebalanceVoter( + ctx context.Context, + zone *zonepb.ZoneConfig, + raftStatus *raft.Status, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + rangeUsageInfo RangeUsageInfo, + filter storeFilter, +) (add roachpb.ReplicationTarget, remove roachpb.ReplicationTarget, details string, ok bool) { + return a.rebalanceTarget( + ctx, + zone, + raftStatus, + existingVoters, + existingNonVoters, + rangeUsageInfo, + filter, + voterTarget, + ) +} + +// RebalanceNonVoter returns a suitable pair of rebalance candidates for a +// non-voting replica. This behaves very similarly to `RebalanceVoter` as +// explained above. The key differences are the following: +// +// 1. Non-voting replicas only adhere to the overall `constraints` and not the +// `voter_constraints`. +// 2. We do not consider stores that have voters as valid candidates for +// rebalancing. +// 3. Diversity score calculation for non-voters is relative to all existing +// replicas. This is in contrast to how we compute the diversity scores for +// voting replicas, which are computed relative to just the set of voting +// replicas. 
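For reference, a zone-configuration sketch of the shape these rules apply to, using the same zonepb types the tests in this diff construct (the import paths are assumptions based on the rest of the tree): every replica must satisfy `constraints`, while only the voters must additionally satisfy `voter_constraints`.

package example

import (
	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
	"github.com/gogo/protobuf/proto"
)

// Five replicas, three of them voters. The non-voters only need to satisfy the
// overall cloud constraint; the voter_constraints below pin two of the voters
// to a specific region without affecting where the non-voters may live.
var exampleZone = zonepb.ZoneConfig{
	NumReplicas: proto.Int32(5),
	NumVoters:   proto.Int32(3),
	Constraints: []zonepb.ConstraintsConjunction{
		{Constraints: []zonepb.Constraint{
			{Type: zonepb.Constraint_REQUIRED, Key: "cloud", Value: "gce"},
		}},
	},
	VoterConstraints: []zonepb.ConstraintsConjunction{
		{
			NumReplicas: 2,
			Constraints: []zonepb.Constraint{
				{Type: zonepb.Constraint_REQUIRED, Key: "region", Value: "us-east"},
			},
		},
	},
}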
+func (a Allocator) RebalanceNonVoter( + ctx context.Context, + zone *zonepb.ZoneConfig, + raftStatus *raft.Status, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + rangeUsageInfo RangeUsageInfo, + filter storeFilter, +) (add roachpb.ReplicationTarget, remove roachpb.ReplicationTarget, details string, ok bool) { + return a.rebalanceTarget( + ctx, + zone, + raftStatus, + existingVoters, + existingNonVoters, + rangeUsageInfo, + filter, + nonVoterTarget, + ) +} + func (a *Allocator) scorerOptions() scorerOptions { return scorerOptions{ deterministic: a.storePool.deterministic, diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go index 0842aab29fb1..c7568a8d1a44 100644 --- a/pkg/kv/kvserver/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator_scorer.go @@ -523,18 +523,20 @@ type rebalanceOptions struct { candidates candidateList } -// rebalanceCandidates creates two candidate lists. The first contains all -// existing replica's stores, ordered from least qualified for rebalancing to -// most qualified. The second list is of all potential stores that could be -// used as rebalancing receivers, ordered from best to worst. -func rebalanceCandidates( +// rankedCandidateListForRebalancing creates two candidate lists. The first +// contains all existing replica's stores, ordered from least qualified for +// rebalancing to most qualified. The second list is of all potential stores +// that could be used as rebalancing receivers, ordered from best to worst. +func rankedCandidateListForRebalancing( ctx context.Context, allStores StoreList, - constraints constraint.AnalyzedConstraints, - existingReplicas []roachpb.ReplicaDescriptor, + removalConstraintsChecker constraintsCheckFn, + rebalanceConstraintsChecker rebalanceConstraintsCheckFn, + existingReplicasForType, replicasWithExcludedStores []roachpb.ReplicaDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool, options scorerOptions, + replicaType targetReplicaType, ) []rebalanceOptions { // 1. Determine whether existing replicas are valid and/or necessary. existingStores := make(map[roachpb.StoreID]candidate) @@ -545,11 +547,11 @@ func rebalanceCandidates( log.VEventf(ctx, 3, "not considering non-ready node n%d for rebalance", store.Node.NodeID) continue } - for _, repl := range existingReplicas { + for _, repl := range existingReplicasForType { if store.StoreID != repl.StoreID { continue } - valid, necessary := removeConstraintsCheck(store, constraints) + valid, necessary := removalConstraintsChecker(store) fullDisk := !maxCapacityCheck(store) if !valid { if !needRebalanceFrom { @@ -617,8 +619,15 @@ func rebalanceCandidates( } var comparableCands candidateList for _, store := range allStores.stores { - constraintsOK, necessary := rebalanceFromConstraintsCheck( - store, existing.store.StoreID, constraints) + // Ignore any stores that contain any of the replicas within + // `replicasWithExcludedStores`. 
+ for _, excluded := range replicasWithExcludedStores { + if store.StoreID == excluded.StoreID { + continue + } + } + + constraintsOK, necessary := rebalanceConstraintsChecker(store, existing.store) maxCapacityOK := maxCapacityCheck(store) diversityScore := diversityRebalanceFromScore( store, existing.store.StoreID, existingStoreLocalities) @@ -630,6 +639,7 @@ func rebalanceCandidates( diversityScore: diversityScore, } if !cand.less(existing) { + // If `cand` is not worse than `existing`, add it to the list. comparableCands = append(comparableCands, cand) if !needRebalanceFrom && !needRebalanceTo && existing.less(cand) { needRebalanceTo = true @@ -676,9 +686,12 @@ func rebalanceCandidates( } } } - // TODO(a-robinson): Some moderate refactoring could extract this logic out - // into the loop below, avoiding duplicate balanceScore calculations. - if shouldRebalance(ctx, existing.store, sl, options) { + // NB: Due to step 2 from above, we're guaranteed to have a non-empty `sl` + // at this point. + // + // TODO(a-robinson): Some moderate refactoring could extract this logic + // out into the loop below, avoiding duplicate balanceScore calculations. + if shouldRebalanceBasedOnRangeCount(ctx, existing.store, sl, options) { shouldRebalanceCheck = true break } @@ -742,7 +755,7 @@ func rebalanceCandidates( // Only consider this candidate if we must rebalance due to constraint, // disk fullness, or diversity reasons. log.VEventf(ctx, 3, "not considering %+v as a candidate for range %+v: score=%s storeList=%+v", - s, existingReplicas, cand.balanceScore, comparable.sl) + s, existingReplicasForType, cand.balanceScore, comparable.sl) continue } cand.rangeCount = int(s.Capacity.RangeCount) @@ -840,9 +853,10 @@ func betterRebalanceTarget(target1, existing1, target2, existing2 *candidate) *c return target1 } -// shouldRebalance returns whether the specified store is a candidate for -// having a replica removed from it given the candidate store list. -func shouldRebalance( +// shouldRebalanceBasedOnRangeCount returns whether the specified store is a +// candidate for having a replica removed from it given the candidate store +// list. +func shouldRebalanceBasedOnRangeCount( ctx context.Context, store roachpb.StoreDescriptor, sl StoreList, options scorerOptions, ) bool { overfullThreshold := int32(math.Ceil(overfullRangeThreshold(options, sl.candidateRanges.mean))) @@ -911,6 +925,11 @@ func sameLocalityAndAttrs(s1, s2 roachpb.StoreDescriptor) bool { // existing one. type constraintsCheckFn func(roachpb.StoreDescriptor) (valid, necessary bool) +// rebalanceConstraintsCheckFn determines whether `toStore` is a valid and/or +// necessary replacement candidate for `fromStore` (which must contain an +// existing replica). +type rebalanceConstraintsCheckFn func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool) + // voterConstraintsCheckerForAllocation returns a constraintsCheckFn that // determines whether a candidate for a new voting replica is valid and/or // necessary as per the `voter_constraints` and `constraints` on the range. @@ -978,6 +997,31 @@ func nonVoterConstraintsCheckerForRemoval( } } +// voterConstraintsCheckerForRebalance returns a rebalanceConstraintsCheckFn +// that determines whether a given store is a valid and/or necessary rebalance +// candidate from a given store of an existing voting replica. 
+func voterConstraintsCheckerForRebalance( + overallConstraints, voterConstraints constraint.AnalyzedConstraints, +) rebalanceConstraintsCheckFn { + return func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool) { + overallConstraintsOK, necessaryOverall := rebalanceFromConstraintsCheck(toStore, fromStore, overallConstraints) + voterConstraintsOK, necessaryForVoters := rebalanceFromConstraintsCheck(toStore, fromStore, voterConstraints) + + return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters + } +} + +// nonVoterConstraintsCheckerForRebalance returns a rebalanceConstraintsCheckFn +// that determines whether a given store is a valid and/or necessary rebalance +// candidate from a given store of an existing non-voting replica. +func nonVoterConstraintsCheckerForRebalance( + overallConstraints constraint.AnalyzedConstraints, +) rebalanceConstraintsCheckFn { + return func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool) { + return rebalanceFromConstraintsCheck(toStore, fromStore, overallConstraints) + } +} + // allocateConstraintsCheck checks the potential allocation target store // against all the constraints. If it matches a constraint at all, it's valid. // If it matches a constraint that is not already fully satisfied by existing @@ -1057,9 +1101,7 @@ func removeConstraintsCheck( // will be necessary if fromStoreID (an existing replica) is removed from the // range. func rebalanceFromConstraintsCheck( - store roachpb.StoreDescriptor, - fromStoreID roachpb.StoreID, - analyzed constraint.AnalyzedConstraints, + store, fromStoreID roachpb.StoreDescriptor, analyzed constraint.AnalyzedConstraints, ) (valid bool, necessary bool) { // All stores are valid when there are no constraints. if len(analyzed.Constraints) == 0 { @@ -1083,7 +1125,7 @@ func rebalanceFromConstraintsCheck( matchingStores := analyzed.SatisfiedBy[i] if len(matchingStores) < int(constraints.NumReplicas) || (len(matchingStores) == int(constraints.NumReplicas) && - containsStore(analyzed.SatisfiedBy[i], fromStoreID)) { + containsStore(analyzed.SatisfiedBy[i], fromStoreID.StoreID)) { return true, true } } diff --git a/pkg/kv/kvserver/allocator_scorer_test.go b/pkg/kv/kvserver/allocator_scorer_test.go index fe4c214b95f5..5110c844ab8f 100644 --- a/pkg/kv/kvserver/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator_scorer_test.go @@ -315,9 +315,9 @@ func TestBetterThan(t *testing.T) { } // TestBestRebalanceTarget constructs a hypothetical output of -// rebalanceCandidates and verifies that bestRebalanceTarget properly returns -// the candidates in the ideal order of preference and omits any that aren't -// desirable. +// rankedCandidateListForRebalancing and verifies that bestRebalanceTarget +// properly returns the candidates in the ideal order of preference and omits +// any that aren't desirable. 
func TestBestRebalanceTarget(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1209,18 +1209,36 @@ func TestShouldRebalanceDiversity(t *testing.T) { } } - targets := rebalanceCandidates( + removalConstraintsChecker := voterConstraintsCheckerForRemoval( + constraint.EmptyAnalyzedConstraints, + constraint.EmptyAnalyzedConstraints, + ) + rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance( + constraint.EmptyAnalyzedConstraints, + constraint.EmptyAnalyzedConstraints, + ) + targets := rankedCandidateListForRebalancing( context.Background(), filteredSL, - constraint.AnalyzedConstraints{}, + removalConstraintsChecker, + rebalanceConstraintsChecker, replicas, + nil, existingStoreLocalities, - func(context.Context, roachpb.NodeID) bool { return true }, /* isNodeValidForRoutineReplicaTransfer */ - options) + func(context.Context, roachpb.NodeID) bool { return true }, + options, + voterTarget, + ) actual := len(targets) > 0 if actual != tc.expected { - t.Errorf("%d: shouldRebalance on s%d with replicas on %v got %t, expected %t", - i, tc.s.StoreID, tc.existingNodeIDs, actual, tc.expected) + t.Errorf( + "%d: shouldRebalanceBasedOnRangeCount on s%d with replicas on %v got %t, expected %t", + i, + tc.s.StoreID, + tc.existingNodeIDs, + actual, + tc.expected, + ) } } } diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index c111d4c9cbb7..cb347dd04c06 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -86,13 +86,15 @@ var multiDCConfigUnsatisfiableVoterConstraints = zonepb.ZoneConfig{ } // multiDCConfigVoterAndNonVoter prescribes that one voting replica be placed in -// DC "a" and one non-voting replica be placed in DC "b". +// DC "b" and one non-voting replica be placed in DC "a". var multiDCConfigVoterAndNonVoter = zonepb.ZoneConfig{ NumReplicas: proto.Int32(2), Constraints: []zonepb.ConstraintsConjunction{ + // Constrain the non-voter to "a". {Constraints: []zonepb.Constraint{{Value: "a", Type: zonepb.Constraint_REQUIRED}}, NumReplicas: 1}, }, VoterConstraints: []zonepb.ConstraintsConjunction{ + // Constrain the voter to "b". 
{Constraints: []zonepb.Constraint{{Value: "b", Type: zonepb.Constraint_REQUIRED}}}, }, } @@ -317,6 +319,42 @@ var multiDiversityDCStores = []*roachpb.StoreDescriptor{ }, } +var oneStoreWithFullDisk = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 5, RangeCount: 600}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 600}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 600}, + }, +} + +var oneStoreWithTooManyRanges = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 5, RangeCount: 600}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 200}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 200}, + }, +} + func replicas(storeIDs ...roachpb.StoreID) []roachpb.ReplicaDescriptor { res := make([]roachpb.ReplicaDescriptor, len(storeIDs)) for i, storeID := range storeIDs { @@ -646,16 +684,9 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { { var rangeUsageInfo RangeUsageInfo - target, _, details, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - nil, /* raftStatus */ - tc.existing, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, details, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, tc.existing, nil, rangeUsageInfo, storeFilterThrottled) if e, a := tc.expectTargetRebalance, ok; e != a { - t.Errorf("RebalanceTarget(%v) got target %v, details %v; expectTarget=%v", + t.Errorf("RebalanceVoter(%v) got target %v, details %v; expectTarget=%v", tc.existing, target, details, tc.expectTargetRebalance) } } @@ -715,14 +746,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { // After that we should not be seeing replicas move. var rangeUsageInfo RangeUsageInfo for i := 1; i < 40; i++ { - add, remove, _, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - nil, /* raftStatus */ - ranges[i].InternalReplicas, - rangeUsageInfo, - storeFilterThrottled, - ) + add, remove, _, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, ranges[i].InternalReplicas, nil, rangeUsageInfo, storeFilterThrottled) if ok { // Update the descriptor. newReplicas := make([]roachpb.ReplicaDescriptor, 0, len(ranges[i].InternalReplicas)) @@ -754,14 +778,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { // We dont expect any range wanting to move since the system should have // reached a stable state at this point. for i := 1; i < 40; i++ { - _, _, _, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - nil, /* raftStatus */ - ranges[i].InternalReplicas, - rangeUsageInfo, - storeFilterThrottled, - ) + _, _, _, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, ranges[i].InternalReplicas, nil, rangeUsageInfo, storeFilterThrottled) require.False(t, ok) } } @@ -824,14 +841,7 @@ func TestAllocatorRebalance(t *testing.T) { // Every rebalance target must be either store 1 or 2. 
for i := 0; i < 10; i++ { var rangeUsageInfo RangeUsageInfo - target, _, _, ok := a.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, - []roachpb.ReplicaDescriptor{{NodeID: 3, StoreID: 3}}, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, _, ok := a.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: 3, StoreID: 3}}, nil, rangeUsageInfo, storeFilterThrottled) if !ok { i-- // loop until we find 10 candidates continue @@ -843,14 +853,14 @@ func TestAllocatorRebalance(t *testing.T) { } } - // Verify shouldRebalance results. + // Verify shouldRebalanceBasedOnRangeCount results. for i, store := range stores { desc, ok := a.storePool.getStoreDescriptor(store.StoreID) if !ok { t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID) } sl, _, _ := a.storePool.getStoreList(storeFilterThrottled) - result := shouldRebalance(ctx, desc, sl, a.scorerOptions()) + result := shouldRebalanceBasedOnRangeCount(ctx, desc, sl, a.scorerOptions()) if expResult := (i >= 2); expResult != result { t.Errorf("%d: expected rebalance %t; got %t; desc %+v; sl: %+v", i, expResult, result, desc, sl) } @@ -870,7 +880,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { // 2 other datacenters. All of our replicas are distributed within these 3 // datacenters. Originally, the stores that are all alone in their datacenter // are fuller than the other stores. If we didn't simulate RemoveVoter in - // RebalanceTarget, we would try to choose store 2 or 3 as the target store + // RebalanceVoter, we would try to choose store 2 or 3 as the target store // to make a rebalance. However, we would immediately remove the replica on // store 1 or 2 to retain the locality diversity. stores := []*roachpb.StoreDescriptor{ @@ -975,14 +985,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { } } for i := 0; i < 10; i++ { - result, _, details, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - status, - replicas, - rangeUsageInfo, - storeFilterThrottled, - ) + result, _, details, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), status, replicas, nil, rangeUsageInfo, storeFilterThrottled) if ok { t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) } @@ -995,14 +998,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { stores[2].Capacity.RangeCount = 46 sg.GossipStores(stores, t) for i := 0; i < 10; i++ { - target, _, details, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - status, - replicas, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, details, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), status, replicas, nil, rangeUsageInfo, storeFilterThrottled) if ok { t.Fatalf("expected no rebalance, but got target s%d; details: %s", target.StoreID, details) } @@ -1012,14 +1008,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { stores[1].Capacity.RangeCount = 44 sg.GossipStores(stores, t) for i := 0; i < 10; i++ { - target, origin, details, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - status, - replicas, - rangeUsageInfo, - storeFilterThrottled, - ) + target, origin, details, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), status, replicas, nil, rangeUsageInfo, storeFilterThrottled) expTo := stores[1].StoreID expFrom := stores[0].StoreID if !ok || target.StoreID != expTo || origin.StoreID != expFrom { @@ 
-1088,13 +1077,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { var rangeUsageInfo RangeUsageInfo - target, _, _, ok := a.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, - c.existing, - rangeUsageInfo, - storeFilterThrottled) + target, _, _, ok := a.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, c.existing, nil, rangeUsageInfo, storeFilterThrottled) if c.expected > 0 { if !ok { t.Fatalf("expected %d, but found nil", c.expected) @@ -1230,14 +1213,14 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { }) sl, _, _ := a.storePool.getStoreList(storeFilterThrottled) - // Verify shouldRebalance returns the expected value. + // Verify shouldRebalanceBasedOnRangeCount returns the expected value. for j, store := range stores { desc, ok := a.storePool.getStoreDescriptor(store.StoreID) if !ok { t.Fatalf("[store %d]: unable to get store %d descriptor", j, store.StoreID) } - if a, e := shouldRebalance(context.Background(), desc, sl, a.scorerOptions()), cluster[j].shouldRebalanceFrom; a != e { - t.Errorf("[store %d]: shouldRebalance %t != expected %t", store.StoreID, a, e) + if a, e := shouldRebalanceBasedOnRangeCount(context.Background(), desc, sl, a.scorerOptions()), cluster[j].shouldRebalanceFrom; a != e { + t.Errorf("[store %d]: shouldRebalanceBasedOnRangeCount %t != expected %t", store.StoreID, a, e) } } }) @@ -1284,27 +1267,20 @@ func TestAllocatorRebalanceByCount(t *testing.T) { // Every rebalance target must be store 4 (or nil for case of missing the only option). for i := 0; i < 10; i++ { var rangeUsageInfo RangeUsageInfo - result, _, _, ok := a.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, - []roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}}, - rangeUsageInfo, - storeFilterThrottled, - ) + result, _, _, ok := a.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, []roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}}, nil, rangeUsageInfo, storeFilterThrottled) if ok && result.StoreID != 4 { t.Errorf("expected store 4; got %d", result.StoreID) } } - // Verify shouldRebalance results. + // Verify shouldRebalanceBasedOnRangeCount results. 
for i, store := range stores { desc, ok := a.storePool.getStoreDescriptor(store.StoreID) if !ok { t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID) } sl, _, _ := a.storePool.getStoreList(storeFilterThrottled) - result := shouldRebalance(ctx, desc, sl, a.scorerOptions()) + result := shouldRebalanceBasedOnRangeCount(ctx, desc, sl, a.scorerOptions()) if expResult := (i < 3); expResult != result { t.Errorf("%d: expected rebalance %t; got %t", i, expResult, result) } @@ -1568,20 +1544,13 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { for i, tc := range testCases { var rangeUsageInfo RangeUsageInfo - result, _, details, ok := a.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, /* raftStatus */ - tc.existing, - rangeUsageInfo, - storeFilterThrottled, - ) + result, _, details, ok := a.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, tc.existing, nil, rangeUsageInfo, storeFilterThrottled) var resultID roachpb.StoreID if ok { resultID = result.StoreID } if resultID != tc.expected { - t.Errorf("%d: RebalanceTarget(%v) expected s%d; got %v: %s", i, tc.existing, tc.expected, result, details) + t.Errorf("%d: RebalanceVoter(%v) expected s%d; got %v: %s", i, tc.existing, tc.expected, result, details) } } @@ -1638,14 +1607,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { for i, tc := range testCases2 { log.Infof(ctx, "case #%d", i) var rangeUsageInfo RangeUsageInfo - result, _, details, ok := a.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, /* raftStatus */ - tc.existing, - rangeUsageInfo, - storeFilterThrottled, - ) + result, _, details, ok := a.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, tc.existing, nil, rangeUsageInfo, storeFilterThrottled) var gotExpected bool if !ok { gotExpected = (tc.expected == nil) @@ -1658,7 +1620,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { } } if !gotExpected { - t.Errorf("%d: RebalanceTarget(%v) expected store in %v; got %v: %s", + t.Errorf("%d: RebalanceVoter(%v) expected store in %v; got %v: %s", i, tc.existing, tc.expected, result, details) } } @@ -2448,16 +2410,9 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { } } var rangeUsageInfo RangeUsageInfo - target, _, details, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - nil, - existingRepls, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, details, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, existingRepls, nil, rangeUsageInfo, storeFilterThrottled) if !ok { - t.Fatalf("%d: RebalanceTarget(%v) returned no target store; details: %s", i, c.existing, details) + t.Fatalf("%d: RebalanceVoter(%v) returned no target store; details: %s", i, c.existing, details) } var found bool for _, storeID := range c.expected { @@ -2467,7 +2422,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { } } if !found { - t.Errorf("%d: expected RebalanceTarget(%v) in %v, but got %d; details: %s", + t.Errorf("%d: expected RebalanceVoter(%v) in %v, but got %d; details: %s", i, c.existing, c.expected, target.StoreID, details) } } @@ -2660,7 +2615,9 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { analyzed := constraint.AnalyzeConstraints( context.Background(), a.storePool.getStoreDescriptor, existingRepls, *zone.NumReplicas, zone.Constraints) - checkFn := voterConstraintsCheckerForAllocation(analyzed, constraint.EmptyAnalyzedConstraints) + allocationConstraintsChecker := 
voterConstraintsCheckerForAllocation(analyzed, constraint.EmptyAnalyzedConstraints) + removalConstraintsChecker := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints) + rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance(analyzed, constraint.EmptyAnalyzedConstraints) a.storePool.isNodeReadyForRoutineReplicaTransfer = func(_ context.Context, n roachpb.NodeID) bool { for _, s := range tc.excluded { @@ -2675,7 +2632,7 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { t.Run(fmt.Sprintf("%d/allocate", testIdx), func(t *testing.T) { candidates := rankedCandidateListForAllocation(context.Background(), sl, - checkFn, + allocationConstraintsChecker, existingRepls, a.storePool.getLocalitiesByStore(existingRepls), a.storePool.isNodeReadyForRoutineReplicaTransfer, @@ -2688,22 +2645,35 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { }) t.Run(fmt.Sprintf("%d/rebalance", testIdx), func(t *testing.T) { - results := rebalanceCandidates( + results := rankedCandidateListForRebalancing( context.Background(), sl, - analyzed, + removalConstraintsChecker, + rebalanceConstraintsChecker, existingRepls, + nil, a.storePool.getLocalitiesByStore(existingRepls), a.storePool.isNodeReadyForRoutineReplicaTransfer, a.scorerOptions(), + voterTarget, ) for i := range results { if !expectedStoreIDsMatch(tc.existing, results[i].existingCandidates) { - t.Errorf("results[%d]: expected existing candidates %v, got %v", i, tc.existing, results[i].existingCandidates) + t.Errorf( + "results[%d]: expected existing candidates %v, got %v", + i, + tc.existing, + results[i].existingCandidates, + ) } if !expectedStoreIDsMatch(tc.expected, results[i].candidates) { - t.Errorf("results[%d]: expected candidates %v, got %v", i, tc.expected, results[i].candidates) + t.Errorf( + "results[%d]: expected candidates %v, got %v", + i, + tc.expected, + results[i].candidates, + ) } } }) @@ -3273,6 +3243,199 @@ func expectedStoreIDsMatch(expected []roachpb.StoreID, results candidateList) bo return true } +// TestAllocatorRebalanceNonVoters tests that non-voting replicas rebalance "as +// expected". In particular, it checks the following things: +// +// 1. Non-voter rebalancing obeys the allocator's capacity based heuristics. +// 2. Non-voter rebalancing tries to ensure constraints conformance. +func TestAllocatorRebalanceNonVoters(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + type testCase struct { + name string + stores []*roachpb.StoreDescriptor + zone *zonepb.ZoneConfig + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor + expectNoAction bool + expectedRemoveTargets, expectedAddTargets []roachpb.StoreID + } + tests := []testCase{ + { + name: "no-op", + stores: multiDiversityDCStores, + zone: zonepb.EmptyCompleteZoneConfig(), + existingVoters: replicas(1), + existingNonVoters: replicas(3), + expectNoAction: true, + }, + // Test that rebalancing based on just the diversity scores works as + // expected. In particular, we expect non-voter rebalancing to compute + // diversity scores based on the entire existing replica set, and not just + // the set of non-voting replicas. 
+ { + name: "diversity among non-voters", + stores: multiDiversityDCStores, + zone: zonepb.EmptyCompleteZoneConfig(), + existingVoters: replicas(1, 2), + existingNonVoters: replicas(3, 4, 6), + expectedRemoveTargets: []roachpb.StoreID{3, 4}, + expectedAddTargets: []roachpb.StoreID{7, 8}, + }, + { + name: "diversity among all existing replicas", + stores: multiDiversityDCStores, + zone: zonepb.EmptyCompleteZoneConfig(), + existingVoters: replicas(1), + existingNonVoters: replicas(2, 4, 6), + expectedRemoveTargets: []roachpb.StoreID{2}, + expectedAddTargets: []roachpb.StoreID{7, 8}, + }, + // Test that non-voting replicas obey the capacity / load based heuristics + // for rebalancing. + { + name: "move off of nodes with full disk", + // NB: Store 1 has a 97.5% full disk. + stores: oneStoreWithFullDisk, + zone: zonepb.EmptyCompleteZoneConfig(), + existingVoters: replicas(3), + existingNonVoters: replicas(1), + expectedRemoveTargets: []roachpb.StoreID{1}, + expectedAddTargets: []roachpb.StoreID{2}, + }, + { + name: "move off of nodes with too many ranges", + // NB: Store 1 has 3x the number of ranges as the other stores. + stores: oneStoreWithTooManyRanges, + zone: zonepb.EmptyCompleteZoneConfig(), + existingVoters: replicas(3), + existingNonVoters: replicas(1), + expectedRemoveTargets: []roachpb.StoreID{1}, + expectedAddTargets: []roachpb.StoreID{2}, + }, + // Test that `constraints` cause non-voters to move around in order to + // sustain constraints conformance. + { + name: "already on a store that satisfies constraints for non_voters", + stores: multiDCStores, + // Constrain a voter to store 2 and a non_voter to store 1. + zone: &multiDCConfigVoterAndNonVoter, + existingVoters: replicas(2), + existingNonVoters: replicas(1), + expectNoAction: true, + }, + { + name: "need to rebalance to conform to constraints", + stores: multiDCStores, + // Constrain a non_voter to store 1. + zone: &multiDCConfigVoterAndNonVoter, + existingVoters: nil, + existingNonVoters: replicas(2), + expectedRemoveTargets: []roachpb.StoreID{2}, + expectedAddTargets: []roachpb.StoreID{1}, + }, + { + // Test that non-voting replica rebalancing does not consider stores that + // have voters as valid candidates, even if those stores satisfy + // constraints. 
+ name: "need to rebalance, but cannot because a voter already exists", + stores: multiDCStores, + zone: &multiDCConfigVoterAndNonVoter, + existingVoters: replicas(1), + existingNonVoters: replicas(2), + expectNoAction: true, + }, + } + + var rangeUsageInfo RangeUsageInfo + chk := func(target roachpb.ReplicationTarget, expectedCandidates []roachpb.StoreID) bool { + for _, candidate := range expectedCandidates { + if target.StoreID == candidate { + return true + } + } + return false + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) { + stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(test.stores, t) + add, remove, _, ok := a.RebalanceNonVoter(ctx, + test.zone, + nil, + test.existingVoters, + test.existingNonVoters, + rangeUsageInfo, + storeFilterThrottled) + if test.expectNoAction { + require.True(t, !ok) + } else { + require.Truef(t, ok, "no action taken on range") + require.Truef(t, + chk(add, test.expectedAddTargets), + "the addition target %+v from RebalanceNonVoter doesn't match expectation", + add) + require.Truef(t, + chk(remove, test.expectedRemoveTargets), + "the removal target %+v from RebalanceNonVoter doesn't match expectation", + remove) + } + }) + } +} + +// TestVotersCanRebalanceToNonVoterStores ensures that rebalancing of voting +// replicas considers stores that have non-voters as feasible candidates. +func TestVotersCanRebalanceToNonVoterStores(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + defer stopper.Stop(context.Background()) + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(multiDiversityDCStores, t) + + zone := zonepb.ZoneConfig{ + NumReplicas: proto.Int32(4), + NumVoters: proto.Int32(2), + // We constrain 2 voting replicas to datacenter "a" (stores 1 and 2) but + // place non voting replicas there. In order to achieve constraints + // conformance, each of the voters must want to move to one of these stores. 
+ VoterConstraints: []zonepb.ConstraintsConjunction{ + { + NumReplicas: 2, + Constraints: []zonepb.Constraint{ + {Type: zonepb.Constraint_REQUIRED, Key: "datacenter", Value: "a"}, + }, + }, + }, + } + + var rangeUsageInfo RangeUsageInfo + existingNonVoters := replicas(1, 2) + existingVoters := replicas(3, 4) + add, remove, _, ok := a.RebalanceVoter( + ctx, + &zone, + nil, + existingVoters, + existingNonVoters, + rangeUsageInfo, + storeFilterThrottled, + ) + + require.Truef(t, ok, "no action taken") + if !(add.StoreID == roachpb.StoreID(1) || add.StoreID == roachpb.StoreID(2)) { + t.Fatalf("received unexpected addition target %s from RebalanceVoter", add) + } + if !(remove.StoreID == roachpb.StoreID(3) || remove.StoreID == roachpb.StoreID(4)) { + t.Fatalf("received unexpected removal target %s from RebalanceVoter", remove) + } +} + func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -4041,14 +4204,26 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { analyzed := constraint.AnalyzeConstraints( context.Background(), a.storePool.getStoreDescriptor, existingRepls, *zone.NumReplicas, zone.Constraints) - results := rebalanceCandidates( + removalConstraintsChecker := voterConstraintsCheckerForRemoval( + analyzed, + constraint.EmptyAnalyzedConstraints, + ) + rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance( + analyzed, + constraint.EmptyAnalyzedConstraints, + ) + + results := rankedCandidateListForRebalancing( context.Background(), sl, - analyzed, + removalConstraintsChecker, + rebalanceConstraintsChecker, existingRepls, + nil, a.storePool.getLocalitiesByStore(existingRepls), - func(context.Context, roachpb.NodeID) bool { return true }, /* isNodeValidForRoutineReplicaTransfer */ + func(context.Context, roachpb.NodeID) bool { return true }, a.scorerOptions(), + voterTarget, ) match := true if len(tc.expected) != len(results) { @@ -4066,13 +4241,12 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { } } if !match { - t.Errorf("%d: expected rebalanceCandidates(%v) = %v, but got %v", + t.Errorf("%d: expected rankedCandidateListForRebalancing(%v) = %v, but got %v", testIdx, tc.existing, tc.expected, results) } else { - // Also verify that RebalanceTarget picks out one of the best options as + // Also verify that RebalanceVoter picks out one of the best options as // the final rebalance choice. 
- target, _, details, ok := a.RebalanceTarget( - context.Background(), zone, nil, existingRepls, rangeUsageInfo, storeFilterThrottled) + target, _, details, ok := a.RebalanceVoter(context.Background(), zone, nil, existingRepls, nil, rangeUsageInfo, storeFilterThrottled) var found bool if !ok && len(tc.validTargets) == 0 { found = true @@ -4084,7 +4258,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { } } if !found { - t.Errorf("%d: expected RebalanceTarget(%v) to be in %v, but got %v; details: %s", + t.Errorf("%d: expected RebalanceVoter(%v) to be in %v, but got %v; details: %s", testIdx, tc.existing, tc.validTargets, target, details) } } @@ -6155,14 +6329,7 @@ func TestAllocatorRebalanceAway(t *testing.T) { } var rangeUsageInfo RangeUsageInfo - actual, _, _, ok := a.RebalanceTarget( - ctx, - &zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: []zonepb.ConstraintsConjunction{constraints}}, - nil, - existingReplicas, - rangeUsageInfo, - storeFilterThrottled, - ) + actual, _, _, ok := a.RebalanceVoter(ctx, &zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: []zonepb.ConstraintsConjunction{constraints}}, nil, existingReplicas, nil, rangeUsageInfo, storeFilterThrottled) if tc.expected == nil && ok { t.Errorf("rebalancing to the incorrect store, expected nil, got %d", actual.StoreID) @@ -6322,14 +6489,7 @@ func TestAllocatorFullDisks(t *testing.T) { // Rebalance until there's no more rebalancing to do. if ts.Capacity.RangeCount > 0 { var rangeUsageInfo RangeUsageInfo - target, _, details, ok := alloc.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, - []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, details, ok := alloc.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, nil, rangeUsageInfo, storeFilterThrottled) if ok { if log.V(1) { log.Infof(ctx, "rebalancing to %v; details: %s", target, details) @@ -6451,14 +6611,7 @@ func Example_rebalancing() { for j := 0; j < len(testStores); j++ { ts := &testStores[j] var rangeUsageInfo RangeUsageInfo - target, _, details, ok := alloc.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - nil, - []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, details, ok := alloc.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, nil, rangeUsageInfo, storeFilterThrottled) if ok { log.Infof(context.Background(), "rebalancing to %v; details: %s", target, details) testStores[j].rebalance(&testStores[int(target.StoreID)], alloc.randGen.Int63n(1<<20)) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 7497b3554e61..8e1bfcf78f5c 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2312,18 +2312,20 @@ func TestChangeReplicasSwapVoterWithNonVoter(t *testing.T) { firstStore, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID()) require.NoError(t, err) firstRepl := firstStore.LookupReplica(roachpb.RKey(key)) - require.NotNil(t, firstRepl, `the first node in the TestCluster must have a replica for the ScratchRange`) + require.NotNil(t, firstRepl, "the first node in the TestCluster must have a"+ + " replica for the 
ScratchRange") + tc.AddNonVotersOrFatal(t, key, nonVoter) // TODO(aayush): Trying to swap the last voting replica with a non-voter hits // the safeguard inside Replica.propose() as the last voting replica is always // the leaseholder. There are a bunch of subtleties around getting a // leaseholder to remove itself without another voter to immediately transfer - // the lease to. Determine if/how this needs to be fixed. - tc.AddNonVotersOrFatal(t, key, nonVoter) + // the lease to. See #40333. _, err = tc.SwapVoterWithNonVoter(key, firstVoter, nonVoter) require.Regexp(t, "received invalid ChangeReplicasTrigger", err) tc.AddVotersOrFatal(t, key, secondVoter) + tc.SwapVoterWithNonVoterOrFatal(t, key, secondVoter, nonVoter) } diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index b7679d4b4d6a..0d41747a226d 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -99,6 +99,18 @@ var ( Measurement: "Lease Transfers", Unit: metric.Unit_COUNT, } + metaReplicateQueueNonVoterPromotionsCount = metric.Metadata{ + Name: "queue.replicate.nonvoterpromotions", + Help: "Number of non-voters promoted to voters by the replicate queue", + Measurement: "Promotions of Non Voters to Voters", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueVoterDemotionsCount = metric.Metadata{ + Name: "queue.replicate.voterdemotions", + Help: "Number of voters demoted to non-voters by the replicate queue", + Measurement: "Demotions of Voters to Non Voters", + Unit: metric.Unit_COUNT, + } ) // quorumError indicates a retryable error condition which sends replicas being @@ -129,6 +141,8 @@ type ReplicateQueueMetrics struct { RemoveLearnerReplicaCount *metric.Counter RebalanceReplicaCount *metric.Counter TransferLeaseCount *metric.Counter + NonVoterPromotionsCount *metric.Counter + VoterDemotionsCount *metric.Counter } func makeReplicateQueueMetrics() ReplicateQueueMetrics { @@ -139,6 +153,8 @@ func makeReplicateQueueMetrics() ReplicateQueueMetrics { RemoveLearnerReplicaCount: metric.NewCounter(metaReplicateQueueRemoveLearnerReplicaCount), RebalanceReplicaCount: metric.NewCounter(metaReplicateQueueRebalanceReplicaCount), TransferLeaseCount: metric.NewCounter(metaReplicateQueueTransferLeaseCount), + NonVoterPromotionsCount: metric.NewCounter(metaReplicateQueueNonVoterPromotionsCount), + VoterDemotionsCount: metric.NewCounter(metaReplicateQueueVoterDemotionsCount), } } @@ -236,8 +252,7 @@ func (rq *replicateQueue) shouldQueue( if !rq.store.TestingKnobs().DisableReplicaRebalancing { rangeUsageInfo := rangeUsageInfoForRepl(repl) - _, _, _, ok := rq.allocator.RebalanceTarget( - ctx, zone, repl.RaftStatus(), voterReplicas, rangeUsageInfo, storeFilterThrottled) + _, _, _, ok := rq.allocator.RebalanceVoter(ctx, zone, repl.RaftStatus(), voterReplicas, nil, rangeUsageInfo, storeFilterThrottled) if ok { log.VEventf(ctx, 2, "rebalance target found, enqueuing") return true, 0 @@ -407,7 +422,7 @@ func (rq *replicateQueue) processOneChange( case AllocatorRemoveLearner: return rq.removeLearner(ctx, repl, dryRun) case AllocatorConsiderRebalance: - return rq.considerRebalance(ctx, repl, voterReplicas, canTransferLease, dryRun) + return rq.considerRebalance(ctx, repl, voterReplicas, nonVoterReplicas, canTransferLease, dryRun) case AllocatorFinalizeAtomicReplicationChange: _, err := maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, repl.store, repl.Desc()) // Requeue because either we failed to transition out of a joint state @@ -423,10 +438,9 @@ func (rq *replicateQueue) 
processOneChange( // existingVoters and specifies which voter to replace with a new one. // // The method preferably issues an atomic replica swap, but may not be able to -// do this in all cases, such as when atomic replication changes are not -// available, or when the range consists of a single replica. As a fall back, -// only the addition is carried out; the removal is then a follow-up step for -// the next scanner cycle. +// do this in all cases, such as when the range consists of a single replica. As +// a fall back, only the addition is carried out; the removal is then a +// follow-up step for the next scanner cycle. func (rq *replicateQueue) addOrReplaceVoters( ctx context.Context, repl *Replica, @@ -467,11 +481,10 @@ func (rq *replicateQueue) addOrReplaceVoters( } desc, zone := repl.DescAndZone() - // Allocate a target assuming that the replica we're replacing (if any) is - // already gone. The allocator should not try to re-add this replica since - // there is a reason we're removing it (i.e. dead or decommissioning). If we - // left the replica in the slice, the allocator would not be guaranteed to - // pick a replica that fills the gap removeRepl leaves once it's gone. + // The allocator should not try to re-add this replica since there is a reason + // we're removing it (i.e. dead or decommissioning). If we left the replica in + // the slice, the allocator would not be guaranteed to pick a replica that + // fills the gap removeRepl leaves once it's gone. newStore, details, err := rq.allocator.AllocateVoter(ctx, zone, remainingLiveVoters, remainingLiveNonVoters) if err != nil { return false, err @@ -517,7 +530,24 @@ func (rq *replicateQueue) addOrReplaceVoters( } } rq.metrics.AddReplicaCount.Inc(1) - ops := roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newReplica) + + // Figure out whether we should be promoting an existing non-voting replica to + // a voting replica or if we ought to be adding a voter afresh. + var ops []roachpb.ReplicationChange + replDesc, found := desc.GetReplicaDescriptor(newReplica.StoreID) + if found { + if replDesc.GetType() != roachpb.NON_VOTER { + return false, errors.AssertionFailedf("allocation target %s for a voter"+ + " already has an unexpected replica: %s", newReplica, replDesc) + } + // If the allocation target has a non-voter already, we will promote it to a + // voter. + rq.metrics.NonVoterPromotionsCount.Inc(1) + ops = roachpb.ReplicationChangesForPromotion(newReplica) + } else { + ops = roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newReplica) + } + if removeIdx < 0 { log.VEventf(ctx, 1, "adding replica %+v: %s", newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) @@ -526,6 +556,12 @@ func (rq *replicateQueue) addOrReplaceVoters( removeReplica := existingVoters[removeIdx] log.VEventf(ctx, 1, "replacing replica %s with %+v: %s", removeReplica, newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) + // NB: We may have performed a promotion of a non-voter above, but we will + // not perform a demotion here and instead just remove the existing replica + // entirely. This is because we know that the `removeReplica` is either dead + // or decommissioning (see `Allocator.computeAction`) . This means that + // after this allocation is executed, we could be one non-voter short. This + // will be handled by the replicateQueue's next attempt at this range. 
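The promotion-versus-fresh-add decision above, together with the plain removal of the replaced voter, is easiest to see as data. Below is a minimal standalone sketch under simplified assumptions: `change`, `changeType` and `voterAdditionOps` are invented stand-ins for illustration, not the real roachpb.ReplicationChange machinery.

// Standalone sketch of the change lists addOrReplaceVoters can hand to
// changeReplicas, using simplified stand-in types.
package main

import "fmt"

type changeType string

const (
	addVoter       changeType = "ADD_VOTER"
	removeVoter    changeType = "REMOVE_VOTER"
	removeNonVoter changeType = "REMOVE_NON_VOTER"
)

type change struct {
	typ   changeType
	store int
}

// voterAdditionOps mirrors the branch above: if the allocation target already
// holds a non-voter, promote it (an atomic add-voter/remove-non-voter pair on
// the same store); otherwise add a voter afresh. When a dead or
// decommissioning voter is being replaced, its removal is appended and any
// resulting non-voter shortfall is left for the next replicateQueue pass.
func voterAdditionOps(target int, targetHasNonVoter bool, replace bool, removeStore int) []change {
	var ops []change
	if targetHasNonVoter {
		ops = []change{{addVoter, target}, {removeNonVoter, target}}
	} else {
		ops = []change{{addVoter, target}}
	}
	if replace {
		ops = append(ops, change{removeVoter, removeStore})
	}
	return ops
}

func main() {
	fmt.Println(voterAdditionOps(4, true, true, 2))
	// [{ADD_VOTER 4} {REMOVE_NON_VOTER 4} {REMOVE_VOTER 2}]
}

The point of the pairing is that a promotion is expressed as an add and a remove against the same store, which is what allows it to be applied as a single atomic change.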
ops = append(ops, roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, roachpb.ReplicationTarget{ StoreID: removeReplica.StoreID, @@ -742,6 +778,11 @@ func (rq *replicateQueue) removeVoter( StoreID: removeVoter.StoreID, } desc, _ := repl.DescAndZone() + // TODO(aayush): Directly removing the voter here is a bit of a missed + // opportunity since we could potentially be 1 non-voter short and the + // `target` could be a valid store for a non-voter. In such a scenario, we + // could save a bunch of work by just performing an atomic demotion of a + // voter. if err := rq.changeReplicas( ctx, repl, @@ -910,20 +951,41 @@ func (rq *replicateQueue) removeLearner( func (rq *replicateQueue) considerRebalance( ctx context.Context, repl *Replica, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, canTransferLease func() bool, dryRun bool, ) (requeue bool, _ error) { desc, zone := repl.DescAndZone() - // The Noop case will result if this replica was queued in order to - // rebalance. Attempt to find a rebalancing target. + rebalanceTargetType := voterTarget if !rq.store.TestingKnobs().DisableReplicaRebalancing { rangeUsageInfo := rangeUsageInfoForRepl(repl) - addTarget, removeTarget, details, ok := rq.allocator.RebalanceTarget( - ctx, zone, repl.RaftStatus(), existingReplicas, rangeUsageInfo, - storeFilterThrottled) + addTarget, removeTarget, details, ok := rq.allocator.RebalanceVoter( + ctx, + zone, + repl.RaftStatus(), + existingVoters, + existingNonVoters, + rangeUsageInfo, + storeFilterThrottled, + ) + if !ok { + // If there was nothing to do for the set of voting replicas on this + // range, attempt to rebalance non-voters. + log.VEventf(ctx, 1, "no suitable rebalance target for voters") + addTarget, removeTarget, details, ok = rq.allocator.RebalanceNonVoter( + ctx, + zone, + repl.RaftStatus(), + existingVoters, + existingNonVoters, + rangeUsageInfo, + storeFilterThrottled, + ) + rebalanceTargetType = nonVoterTarget + } + if !ok { - log.VEventf(ctx, 1, "no suitable rebalance target") + log.VEventf(ctx, 1, "no suitable rebalance target for non-voters") } else if done, err := rq.maybeTransferLeaseAway( ctx, repl, removeTarget.StoreID, dryRun, canTransferLease, ); err != nil { @@ -932,46 +994,21 @@ func (rq *replicateQueue) considerRebalance( // Lease is now elsewhere, so we're not in charge any more. return false, nil } else { - // We have a replica to remove and one we can add, so let's swap them - // out. - chgs := []roachpb.ReplicationChange{ - // NB: we place the addition first because in the case of - // atomic replication changes being turned off, the changes - // will be executed individually in the order in which they - // appear. - {Target: addTarget, ChangeType: roachpb.ADD_VOTER}, - {Target: removeTarget, ChangeType: roachpb.REMOVE_VOTER}, - } - - if len(existingReplicas) == 1 { - // If there's only one replica, the removal target is the - // leaseholder and this is unsupported and will fail. However, - // this is also the only way to rebalance in a single-replica - // range. If we try the atomic swap here, we'll fail doing - // nothing, and so we stay locked into the current distribution - // of replicas. (Note that maybeTransferLeaseAway above will not - // have found a target, and so will have returned (false, nil). 
- // - // Do the best thing we can, which is carry out the addition - // only, which should succeed, and the next time we touch this - // range, we will have one more replica and hopefully it will - // take the lease and remove the current leaseholder. - // - // It's possible that "rebalancing deadlock" can occur in other - // scenarios, it's really impossible to tell from the code given - // the constraints we support. However, the lease transfer often - // does not happen spuriously, and we can't enter dangerous - // configurations sporadically, so this code path is only hit - // when we know it's necessary, picking the smaller of two evils. - // - // See https://github.com/cockroachdb/cockroach/issues/40333. - chgs = chgs[:1] - log.VEventf(ctx, 1, "can't swap replica due to lease; falling back to add") + // If we have a valid rebalance action (ok == true) and we haven't + // transferred our lease away, execute the rebalance. + chgs, err := rq.replicationChangesForRebalance(ctx, desc, len(existingVoters), addTarget, + removeTarget, rebalanceTargetType) + if err != nil { + return false, err } - rq.metrics.RebalanceReplicaCount.Inc(1) - log.VEventf(ctx, 1, "rebalancing %+v to %+v: %s", - removeTarget, addTarget, rangeRaftProgress(repl.RaftStatus(), existingReplicas)) + log.VEventf(ctx, + 1, + "rebalancing %s %+v to %+v: %s", + rebalanceTargetType, + removeTarget, + addTarget, + rangeRaftProgress(repl.RaftStatus(), existingVoters)) if err := rq.changeReplicas( ctx, @@ -1009,6 +1046,98 @@ func (rq *replicateQueue) considerRebalance( }, ) return false, err + +} + +// replicationChangesForRebalance returns a list of ReplicationChanges to +// execute for a rebalancing decision made by the allocator. +func (rq *replicateQueue) replicationChangesForRebalance( + ctx context.Context, + desc *roachpb.RangeDescriptor, + numExistingVoters int, + addTarget roachpb.ReplicationTarget, + removeTarget roachpb.ReplicationTarget, + rebalanceTargetType targetReplicaType, +) (chgs []roachpb.ReplicationChange, err error) { + if rebalanceTargetType == voterTarget && numExistingVoters == 1 { + // If there's only one replica, the removal target is the + // leaseholder and this is unsupported and will fail. However, + // this is also the only way to rebalance in a single-replica + // range. If we try the atomic swap here, we'll fail doing + // nothing, and so we stay locked into the current distribution + // of replicas. (Note that maybeTransferLeaseAway above will not + // have found a target, and so will have returned (false, nil). + // + // Do the best thing we can, which is carry out the addition + // only, which should succeed, and the next time we touch this + // range, we will have one more replica and hopefully it will + // take the lease and remove the current leaseholder. + // + // It's possible that "rebalancing deadlock" can occur in other + // scenarios, it's really impossible to tell from the code given + // the constraints we support. However, the lease transfer often + // does not happen spuriously, and we can't enter dangerous + // configurations sporadically, so this code path is only hit + // when we know it's necessary, picking the smaller of two evils. + // + // See https://github.com/cockroachdb/cockroach/issues/40333. 
+ chgs = []roachpb.ReplicationChange{ + {ChangeType: roachpb.ADD_VOTER, Target: addTarget}, + } + log.VEventf(ctx, 1, "can't swap replica due to lease; falling back to add") + return chgs, err + } + + rdesc, found := desc.GetReplicaDescriptor(addTarget.StoreID) + switch rebalanceTargetType { + case voterTarget: + // Check if the target being added already has a non-voting replica. + if found && rdesc.GetType() == roachpb.NON_VOTER { + // If the receiving store already has a non-voting replica, we *must* + // execute a swap between that non-voting replica and the voting replica + // we're trying to move to it. This swap is executed atomically via + // joint-consensus. + // + // NB: Since voting replicas abide by both the overall `constraints` and + // the `voter_constraints`, it is copacetic to make this swap since: + // + // 1. `addTarget` must already be a valid target for a voting replica + // (i.e. it must already satisfy both *constraints fields) since + // `Allocator.RebalanceVoter` just handed it to us. + // 2. `removeTarget` may or may not be a valid target for a non-voting + // replica, but `considerRebalance` takes care to `requeue` the current + // replica into the replicateQueue. So we expect the replicateQueue's next + // attempt at rebalancing this range to rebalance the non-voter if it ends + // up being in violation of the range's constraints. + rq.metrics.NonVoterPromotionsCount.Inc(1) + rq.metrics.VoterDemotionsCount.Inc(1) + promo := roachpb.ReplicationChangesForPromotion(addTarget) + demo := roachpb.ReplicationChangesForDemotion(removeTarget) + chgs = append(promo, demo...) + } else if found { + return nil, errors.AssertionFailedf("programming error:"+ + " store being rebalanced to(%s) already has a voting replica", addTarget.StoreID) + } else { + // We have a replica to remove and one we can add, so let's swap them out. + chgs = []roachpb.ReplicationChange{ + {ChangeType: roachpb.ADD_VOTER, Target: addTarget}, + {ChangeType: roachpb.REMOVE_VOTER, Target: removeTarget}, + } + } + case nonVoterTarget: + if found { + // Non-voters should not consider any of the range's existing stores as + // valid candidates. If we get here, we must have raced with another + // rebalancing decision. 
+ return nil, errors.AssertionFailedf("invalid rebalancing decision: trying to"+ + " move non-voter to a store that already has a replica %s for the range", rdesc) + } + chgs = []roachpb.ReplicationChange{ + {ChangeType: roachpb.ADD_NON_VOTER, Target: addTarget}, + {ChangeType: roachpb.REMOVE_NON_VOTER, Target: removeTarget}, + } + } + return chgs, nil } type transferLeaseOptions struct { diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index c941307eb90c..e46de78df5a4 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -16,6 +16,8 @@ import ( "encoding/json" "fmt" "math" + "math/rand" + "strconv" "strings" "testing" "time" @@ -32,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -287,6 +290,17 @@ func TestReplicateQueueDownReplicate(t *testing.T) { } } +func scanAndGetNumNonVoters( + t *testing.T, tc *testcluster.TestCluster, store *kvserver.Store, scratchKey roachpb.Key, +) int { + // Nudge the replicateQueue to up/down-replicate our scratch range. + if err := store.ForceReplicationScanAndProcess(); err != nil { + t.Fatal(err) + } + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + return len(scratchRange.Replicas().NonVoterDescriptors()) +} + // TestReplicateQueueUpAndDownReplicateNonVoters is an end-to-end test ensuring // that the replicateQueue will add or remove non-voter(s) to a range based on // updates to its zone configuration. @@ -320,18 +334,9 @@ func TestReplicateQueueUpAndDownReplicateNonVoters(t *testing.T) { tc.Server(0).GetFirstStoreID()) require.NoError(t, err) - get := func() int { - // Nudge the replicateQueue to up/down-replicate our scratch range. - if err := store.ForceReplicationScanAndProcess(); err != nil { - t.Fatal(err) - } - scratchRange := tc.LookupRangeOrFatal(t, scratchKey) - return len(scratchRange.Replicas().NonVoterDescriptors()) - } - var expectedNonVoterCount = 2 testutils.SucceedsSoon(t, func() error { - if found := get(); found != expectedNonVoterCount { + if found := scanAndGetNumNonVoters(t, tc, store, scratchKey); found != expectedNonVoterCount { return errors.Errorf("expected upreplication to %d non-voters; found %d", expectedNonVoterCount, found) } @@ -344,7 +349,7 @@ func TestReplicateQueueUpAndDownReplicateNonVoters(t *testing.T) { require.NoError(t, err) expectedNonVoterCount = 0 testutils.SucceedsSoon(t, func() error { - if found := get(); found != expectedNonVoterCount { + if found := scanAndGetNumNonVoters(t, tc, store, scratchKey); found != expectedNonVoterCount { return errors.Errorf("expected downreplication to %d non-voters; found %d", expectedNonVoterCount, found) } @@ -352,6 +357,146 @@ func TestReplicateQueueUpAndDownReplicateNonVoters(t *testing.T) { }) } +// TestReplicateQueueSwapVoterWithNonVoters tests that voting replicas can +// rebalance to stores that already have a non-voter by "swapping" with them. +// "Swapping" in this context means simply changing the `ReplicaType` on the +// receiving store from non-voter to voter and changing it on the other side +// from voter to non-voter. 
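For the rebalance path handled by replicationChangesForRebalance above, the same idea can be sketched end to end. The helper names (`voterRebalanceOps`, `nonVoterRebalanceOps`), the stand-in types, and the store numbers below are illustrative only, and the single-voter fallback that adds without removing is omitted.

// Standalone sketch of the change lists built for the two rebalance flavors,
// with simplified stand-in types rather than the real roachpb definitions.
package main

import "fmt"

type changeType string

const (
	addVoter       changeType = "ADD_VOTER"
	removeVoter    changeType = "REMOVE_VOTER"
	addNonVoter    changeType = "ADD_NON_VOTER"
	removeNonVoter changeType = "REMOVE_NON_VOTER"
)

type change struct {
	typ   changeType
	store int
}

// voterRebalanceOps: if the receiving store already holds a non-voter, the
// move must be expressed as an atomic swap (promote there, demote the
// outgoing voter); otherwise it is a plain add/remove pair.
func voterRebalanceOps(addStore, removeStore int, addStoreHasNonVoter bool) []change {
	if addStoreHasNonVoter {
		return []change{
			{addVoter, addStore}, {removeNonVoter, addStore}, // promotion
			{addNonVoter, removeStore}, {removeVoter, removeStore}, // demotion
		}
	}
	return []change{{addVoter, addStore}, {removeVoter, removeStore}}
}

// nonVoterRebalanceOps: non-voters never land on a store that already has a
// replica of the range, so this is always a plain add/remove pair.
func nonVoterRebalanceOps(addStore, removeStore int) []change {
	return []change{{addNonVoter, addStore}, {removeNonVoter, removeStore}}
}

func main() {
	fmt.Println(voterRebalanceOps(1, 3, true))
	// [{ADD_VOTER 1} {REMOVE_NON_VOTER 1} {ADD_NON_VOTER 3} {REMOVE_VOTER 3}]
	fmt.Println(nonVoterRebalanceOps(5, 2))
	// [{ADD_NON_VOTER 5} {REMOVE_NON_VOTER 2}]
}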
+func TestReplicateQueueSwapVotersWithNonVoters(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + serverArgs := make(map[int]base.TestServerArgs) + // Assign each store a rack number so we can constrain individual voting and + // non-voting replicas to them. + for i := 1; i <= 5; i++ { + serverArgs[i-1] = base.TestServerArgs{ + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + { + Key: "rack", Value: strconv.Itoa(i), + }, + }, + }, + } + } + clusterArgs := base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + ServerArgsPerNode: serverArgs, + } + + synthesizeRandomConstraints := func() ( + constraints string, voterStores, nonVoterStores []roachpb.StoreID, + ) { + storeList := []roachpb.StoreID{1, 2, 3, 4, 5} + // Shuffle the list of stores and designate the first 3 as voters and the + // rest as non-voters. + rand.Shuffle(5, func(i, j int) { + storeList[i], storeList[j] = storeList[j], storeList[i] + }) + voterStores = storeList[:3] + nonVoterStores = storeList[3:5] + + var overallConstraints, voterConstraints []string + for _, store := range nonVoterStores { + overallConstraints = append(overallConstraints, fmt.Sprintf(`"+rack=%d": 1`, store)) + } + for _, store := range voterStores { + voterConstraints = append(voterConstraints, fmt.Sprintf(`"+rack=%d": 1`, store)) + } + return fmt.Sprintf( + "ALTER RANGE default CONFIGURE ZONE USING num_replicas = 5, num_voters = 3,"+ + " constraints = '{%s}', voter_constraints = '{%s}'", + strings.Join(overallConstraints, ","), strings.Join(voterConstraints, ","), + ), voterStores, nonVoterStores + } + + tc := testcluster.StartTestCluster(t, 5, clusterArgs) + defer tc.Stopper().Stop(context.Background()) + + scratchKey := tc.ScratchRange(t) + // Start with 1 voter and 4 non-voters. This ensures that we also exercise the + // swapping behavior during voting replica allocation when we upreplicate to 3 + // voters after calling `synthesizeRandomConstraints` below. See comment + // inside `allocateTargetFromList`. + _, err := tc.ServerConn(0).Exec("ALTER RANGE default CONFIGURE ZONE USING" + + " num_replicas=5, num_voters=1") + require.NoError(t, err) + testutils.SucceedsSoon(t, func() error { + if err := forceScanOnAllReplicationQueues(tc); err != nil { + return err + } + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + if voters := scratchRange.Replicas().VoterDescriptors(); len(voters) != 1 { + return errors.Newf("expected 1 voter; got %v", voters) + } + if nonVoters := scratchRange.Replicas().NonVoterDescriptors(); len(nonVoters) != 4 { + return errors.Newf("expected 4 non-voters; got %v", nonVoters) + } + return nil + }) + + checkRelocated := func(t *testing.T, voterStores, nonVoterStores []roachpb.StoreID) { + testutils.SucceedsSoon(t, func() error { + if err := forceScanOnAllReplicationQueues(tc); err != nil { + return err + } + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + if n := len(scratchRange.Replicas().VoterDescriptors()); n != 3 { + return errors.Newf("number of voters %d does not match expectation", n) + } + if n := len(scratchRange.Replicas().NonVoterDescriptors()); n != 2 { + return errors.Newf("number of non-voters %d does not match expectation", n) + } + + // Check that each replica set is present on the stores designated by + // synthesizeRandomConstraints. 
+ for _, store := range voterStores {
+ replDesc, ok := scratchRange.GetReplicaDescriptor(store)
+ if !ok {
+ return errors.Newf("no replica found on store %d", store)
+ }
+ if typ := replDesc.GetType(); typ != roachpb.VOTER_FULL {
+ return errors.Newf("replica on store %d does not match expectation;"+
+ " expected VOTER_FULL, got %s", store, typ)
+ }
+ }
+ for _, store := range nonVoterStores {
+ replDesc, ok := scratchRange.GetReplicaDescriptor(store)
+ if !ok {
+ return errors.Newf("no replica found on store %d", store)
+ }
+ if typ := replDesc.GetType(); typ != roachpb.NON_VOTER {
+ return errors.Newf("replica on store %d does not match expectation;"+
+ " expected NON_VOTER, got %s", store, typ)
+ }
+ }
+ return nil
+ })
+ }
+
+ var numIterations = 10
+ if util.RaceEnabled {
+ numIterations = 1
+ }
+ for i := 0; i < numIterations; i++ {
+ // Generate random (but valid) constraints for the 3 voters and 2 non-voters
+ // and check that the replicate queue achieves conformance.
+ //
+ // NB: `synthesizeRandomConstraints` sets up the default zone configs such
+ // that every range should have 3 voting replicas and 2 non-voting replicas.
+ // The crucial thing to note here is that we have 5 stores and 5 replicas,
+ // and since we never allow a single store to have >1 replica for a range at
+ // any given point, any change in the configuration of these 5 replicas
+ // _must_ go through atomic non-voter promotions and voter demotions.
+ alterStatement, voterStores, nonVoterStores := synthesizeRandomConstraints()
+ log.Infof(ctx, "applying: %s", alterStatement)
+ _, err := tc.ServerConn(0).Exec(alterStatement)
+ require.NoError(t, err)
+ checkRelocated(t, voterStores, nonVoterStores)
+ }
+}
+
 // queryRangeLog queries the range log. The query must be of type:
 // `SELECT info from system.rangelog ...`. 
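For a concrete sense of what the randomized constraints in the test above look like, here is a small standalone sketch of the statement shape synthesizeRandomConstraints builds; the particular voter/non-voter split of the five stores is an arbitrary example, not something the test fixes.

// Standalone sketch of one possible zone-config statement produced by the
// test's constraint synthesis; stores 2, 4, 5 as voters and 1, 3 as
// non-voters is just one arbitrary shuffle.
package main

import (
	"fmt"
	"strings"
)

func main() {
	voters := []int{2, 4, 5}  // first three stores after the shuffle
	nonVoters := []int{1, 3} // remaining two stores
	var overall, voterOnly []string
	for _, s := range nonVoters {
		overall = append(overall, fmt.Sprintf(`"+rack=%d": 1`, s))
	}
	for _, s := range voters {
		voterOnly = append(voterOnly, fmt.Sprintf(`"+rack=%d": 1`, s))
	}
	fmt.Printf(
		"ALTER RANGE default CONFIGURE ZONE USING num_replicas = 5, num_voters = 3,"+
			" constraints = '{%s}', voter_constraints = '{%s}'\n",
		strings.Join(overall, ","), strings.Join(voterOnly, ","),
	)
	// Prints (one line, wrapped here for readability):
	// ALTER RANGE default CONFIGURE ZONE USING num_replicas = 5, num_voters = 3,
	//   constraints = '{"+rack=1": 1,"+rack=3": 1}',
	//   voter_constraints = '{"+rack=2": 1,"+rack=4": 1,"+rack=5": 1}'
}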
func queryRangeLog( @@ -410,6 +555,15 @@ func toggleReplicationQueues(tc *testcluster.TestCluster, active bool) { } } +func forceScanOnAllReplicationQueues(tc *testcluster.TestCluster) (err error) { + for _, s := range tc.Servers { + err = s.Stores().VisitStores(func(store *kvserver.Store) error { + return store.ForceReplicationScanAndProcess() + }) + } + return err +} + func toggleSplitQueues(tc *testcluster.TestCluster, active bool) { for _, s := range tc.Servers { _ = s.Stores().VisitStores(func(store *kvserver.Store) error { diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go index 6d061a5145da..62cc30a12155 100644 --- a/pkg/kv/kvserver/store_pool.go +++ b/pkg/kv/kvserver/store_pool.go @@ -384,11 +384,11 @@ func (sp *StorePool) updateLocalStoreAfterRebalance( return } switch changeType { - case roachpb.ADD_VOTER: + case roachpb.ADD_VOTER, roachpb.ADD_NON_VOTER: detail.desc.Capacity.RangeCount++ detail.desc.Capacity.LogicalBytes += rangeUsageInfo.LogicalBytes detail.desc.Capacity.WritesPerSecond += rangeUsageInfo.WritesPerSecond - case roachpb.REMOVE_VOTER: + case roachpb.REMOVE_VOTER, roachpb.REMOVE_NON_VOTER: detail.desc.Capacity.RangeCount-- if detail.desc.Capacity.LogicalBytes <= rangeUsageInfo.LogicalBytes { detail.desc.Capacity.LogicalBytes = 0 @@ -400,6 +400,8 @@ func (sp *StorePool) updateLocalStoreAfterRebalance( } else { detail.desc.Capacity.WritesPerSecond -= rangeUsageInfo.WritesPerSecond } + default: + return } sp.detailsMu.storeDetails[storeID] = &detail } diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index e0a054c8c551..baa2d8d6b164 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1397,6 +1397,22 @@ func MakeReplicationChanges( return chgs } +// ReplicationChangesForPromotion returns the replication changes that +// correspond to the promotion of a non-voter to a voter. +func ReplicationChangesForPromotion(target ReplicationTarget) []ReplicationChange { + return []ReplicationChange{ + {ChangeType: ADD_VOTER, Target: target}, {ChangeType: REMOVE_NON_VOTER, Target: target}, + } +} + +// ReplicationChangesForDemotion returns the replication changes that correspond +// to the demotion of a voter to a non-voter. +func ReplicationChangesForDemotion(target ReplicationTarget) []ReplicationChange { + return []ReplicationChange{ + {ChangeType: ADD_NON_VOTER, Target: target}, {ChangeType: REMOVE_VOTER, Target: target}, + } +} + // AddChanges adds a batch of changes to the request in a backwards-compatible // way. func (acrr *AdminChangeReplicasRequest) AddChanges(chgs ...ReplicationChange) { diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 4c0366fa47b6..6fd3baacec13 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1466,9 +1466,17 @@ var charts = []sectionDescription{ Metrics: []string{"queue.replicate.purgatory"}, }, { - Title: "Reblance Count", + Title: "Rebalance Count", Metrics: []string{"queue.replicate.rebalancereplica"}, }, + { + Title: "Demotions of Voters to Non Voters", + Metrics: []string{"queue.replicate.voterdemotions"}, + }, + { + Title: "Promotions of Non Voters to Voters", + Metrics: []string{"queue.replicate.nonvoterpromotions"}, + }, { Title: "Remove Replica Count", Metrics: []string{