diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index f516a6ce4d79..0e0b73598a65 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -123,18 +123,16 @@ const ( ) var allocatorActionNames = map[AllocatorAction]string{ - AllocatorNoop: "noop", - AllocatorRemoveVoter: "remove voter", - AllocatorRemoveNonVoter: "remove non-voter", - AllocatorAddVoter: "add voter", - AllocatorAddNonVoter: "add non-voter", - AllocatorReplaceDeadVoter: "replace dead voter", - AllocatorRemoveDeadVoter: "remove dead voter", - AllocatorReplaceDecommissioningVoter: "replace decommissioning voter", - AllocatorRemoveDecommissioningVoter: "remove decommissioning voter", - AllocatorRemoveLearner: "remove learner", - // TODO(aayush): Rationalize whether or not rebalancing of non-voters needs to - // be dictated by a distinct allocator action. + AllocatorNoop: "noop", + AllocatorRemoveVoter: "remove voter", + AllocatorRemoveNonVoter: "remove non-voter", + AllocatorAddVoter: "add voter", + AllocatorAddNonVoter: "add non-voter", + AllocatorReplaceDeadVoter: "replace dead voter", + AllocatorRemoveDeadVoter: "remove dead voter", + AllocatorReplaceDecommissioningVoter: "replace decommissioning voter", + AllocatorRemoveDecommissioningVoter: "remove decommissioning voter", + AllocatorRemoveLearner: "remove learner", AllocatorConsiderRebalance: "consider rebalance", AllocatorRangeUnavailable: "range unavailable", AllocatorFinalizeAtomicReplicationChange: "finalize conf change", @@ -155,9 +153,9 @@ const ( func (t targetReplicaType) String() string { switch typ := t; typ { case voterTarget: - return "voters" + return "voter" case nonVoterTarget: - return "non-voters" + return "non-voter" default: panic(fmt.Sprintf("unknown targetReplicaType %d", typ)) } @@ -559,6 +557,10 @@ func (a *Allocator) computeAction( } // Non-voting replica removal. + + // TODO(aayush): Handle removal/replacement of decommissioning/dead non-voting + // replicas. + if haveNonVoters > neededNonVoters { // Like above, the range is over-replicated but should remove a non-voter. priority := removeExtraNonVoterPriority @@ -571,6 +573,41 @@ func (a *Allocator) computeAction( return AllocatorConsiderRebalance, 0 } +// getReplicasForDiversityCalc returns the set of replica descriptors that +// should be used for computing the diversity scores for a target when +// allocating/removing/rebalancing a replica of `targetType`. +func getReplicasForDiversityCalc( + targetType targetReplicaType, existingVoters, allExistingReplicas []roachpb.ReplicaDescriptor, +) []roachpb.ReplicaDescriptor { + switch t := targetType; t { + case voterTarget: + // When computing the "diversity score" for a given store for a voting + // replica allocation/rebalance/removal, we consider the localities of only + // the stores that contain a voting replica for the range. + // + // Note that if we were to consider all stores that have any kind of replica + // for the range, voting replica allocation would be disincentivized to pick + // stores that (partially or fully) share locality hierarchies with stores + // that contain a non-voting replica. This is undesirable because this could + // inadvertently reduce the fault-tolerance of the range in cases like the + // following: + // + // Consider 3 regions (A, B, C), each with 2 AZs. Suppose that regions A and + // B have a voting replica each, whereas region C has a non-voting replica. 
+ // In cases like these, we would want region C to be picked over regions A + // and B for allocating a new third voting replica since that improves our + // fault tolerance to the greatest degree. + // In the counterfactual (i.e. if we were to compute diversity scores based + // off of all `existingReplicas`), regions A, B, and C would all be equally + // likely to get a new voting replica. + return existingVoters + case nonVoterTarget: + return allExistingReplicas + default: + panic(fmt.Sprintf("unsupported targetReplicaType: %v", t)) + } +} + type decisionDetails struct { Target string Existing string `json:",omitempty"` @@ -650,52 +687,31 @@ func (a *Allocator) allocateTargetFromList( existingVoters, zone.GetNumVoters(), zone.VoterConstraints) var constraintsChecker constraintsCheckFn - var replicaSetForDiversityCalc []roachpb.ReplicaDescriptor switch t := targetType; t { case voterTarget: constraintsChecker = voterConstraintsCheckerForAllocation( analyzedOverallConstraints, analyzedVoterConstraints, ) - // When computing the "diversity score" for a given store for a voting - // replica allocation, we consider the localities of only the stores that - // contain a voting replica for the range. - // - // Note that if we were to consider all stores that have any kind of replica - // for the range, voting replica allocation would be disincentivized to pick - // stores that (partially or fully) share locality hierarchies with stores - // that contain a non-voting replica. This is undesirable because this could - // inadvertently reduce the fault-tolerance of the range in cases like the - // following: - // - // Consider 3 regions (A, B, C), each with 2 AZs. Suppose that regions A and - // B have a voting replica each, whereas region C has a non-voting replica. - // In cases like these, we would want region C to be picked over regions A - // and B for allocating a new third voting replica since that improves our - // fault tolerance to the greatest degree. - // In the counterfactual (i.e. if we were to compute diversity scores based - // off of all `existingReplicas`), regions A, B, and C would all be equally - // likely to get a new voting replica. - replicaSetForDiversityCalc = existingVoters case nonVoterTarget: constraintsChecker = nonVoterConstraintsCheckerForAllocation(analyzedOverallConstraints) - replicaSetForDiversityCalc = existingReplicas default: log.Fatalf(ctx, "unsupported targetReplicaType: %v", t) } + // We'll consider the targets that have a non-voter as feasible + // relocation/up-replication targets for existing/new voting replicas, since + // we always want voter constraint conformance to take precedence over + // non-voters. For instance, in cases where we can only satisfy constraints + // for either 1 voter or 1 non-voter, we want the voter to be able to displace + // the non-voter. + existingReplicaSet := getReplicasForDiversityCalc(targetType, existingVoters, existingReplicas) candidates := rankedCandidateListForAllocation( ctx, candidateStores, constraintsChecker, - // TODO(aayush): We currently rule out all the stores that have any replica - // for the given range. However, we want to get to a place where we'll - // consider the targets that have a non-voter as feasible - // relocation/up-replication targets for existing/new voting replicas, since - // it is cheaper to promote a non-voter than it is to add a new voter via - // the LEARNER->VOTER_FULL path. 
- existingReplicas, - a.storePool.getLocalitiesByStore(replicaSetForDiversityCalc), + existingReplicaSet, + a.storePool.getLocalitiesByStore(existingReplicaSet), a.storePool.isNodeReadyForRoutineReplicaTransfer, options) @@ -713,26 +729,45 @@ func (a *Allocator) allocateTargetFromList( return nil, "" } -// TODO(aayush): Generalize this to work for non-voting replicas as well. func (a Allocator) simulateRemoveTarget( ctx context.Context, targetStore roachpb.StoreID, zone *zonepb.ZoneConfig, candidates []roachpb.ReplicaDescriptor, existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, rangeUsageInfo RangeUsageInfo, + targetType targetReplicaType, ) (roachpb.ReplicaDescriptor, string, error) { // Update statistics first // TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines, // but as of October 2017 calls to the Allocator are mostly serialized by the ReplicateQueue // (with the main exceptions being Scatter and the status server's allocator debug endpoint). // Try to make this interfere less with other callers. - a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_VOTER) - defer func() { - a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.REMOVE_VOTER) - }() - log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore) - return a.RemoveVoter(ctx, zone, candidates, existingVoters, nil) + switch t := targetType; t { + case voterTarget: + a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_VOTER) + defer a.storePool.updateLocalStoreAfterRebalance( + targetStore, + rangeUsageInfo, + roachpb.REMOVE_VOTER, + ) + log.VEventf(ctx, 3, "simulating which voter would be removed after adding s%d", + targetStore) + return a.RemoveVoter(ctx, zone, candidates, existingVoters, existingNonVoters) + case nonVoterTarget: + a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_NON_VOTER) + defer a.storePool.updateLocalStoreAfterRebalance( + targetStore, + rangeUsageInfo, + roachpb.REMOVE_NON_VOTER, + ) + log.VEventf(ctx, 3, "simulating which non-voter would be removed after adding s%d", + targetStore) + return a.RemoveNonVoter(ctx, zone, candidates, existingVoters, existingNonVoters) + default: + panic(fmt.Sprintf("unknown targetReplicaType: %s", t)) + } } func (a Allocator) removeTarget( @@ -762,7 +797,6 @@ func (a Allocator) removeTarget( options := a.scorerOptions() var constraintsChecker constraintsCheckFn - var replicaSetForDiversityCalc []roachpb.ReplicaDescriptor switch t := targetType; t { case voterTarget: // Voting replicas have to abide by both the overall `constraints` (which @@ -772,16 +806,13 @@ func (a Allocator) removeTarget( analyzedOverallConstraints, analyzedVoterConstraints, ) - // See comment inside `allocateTargetFromList` for why we compute diversity - // scores relative to only the stores of voting replicas. 
- replicaSetForDiversityCalc = existingVoters case nonVoterTarget: constraintsChecker = nonVoterConstraintsCheckerForRemoval(analyzedOverallConstraints) - replicaSetForDiversityCalc = existingReplicas default: log.Fatalf(ctx, "unsupported targetReplicaType: %v", t) } + replicaSetForDiversityCalc := getReplicasForDiversityCalc(targetType, existingVoters, existingReplicas) rankedCandidates := rankedCandidateListForRemoval( candidateStoreList, constraintsChecker, @@ -850,54 +881,67 @@ func (a Allocator) RemoveNonVoter( ) } -// RebalanceTarget returns a suitable store for a rebalance target with -// required attributes. Rebalance targets are selected via the same mechanism -// as AllocateVoter(), except the chosen target must follow some additional -// criteria. Namely, if chosen, it must further the goal of balancing the -// cluster. -// -// The supplied parameters are the required attributes for the range and -// information about the range being considered for rebalancing. -// -// The existing replicas modulo any store with dead replicas are candidates for -// rebalancing. Note that rebalancing is accomplished by first adding a new -// replica to the range, then removing the most undesirable replica. -// -// Simply ignoring a rebalance opportunity in the event that the target chosen -// by AllocateVoter() doesn't fit balancing criteria is perfectly fine, as -// other stores in the cluster will also be doing their probabilistic best to -// rebalance. This helps prevent a stampeding herd targeting an abnormally -// under-utilized store. -// -// The return values are, in order: -// -// 1. The target on which to add a new replica, -// 2. An existing replica to remove, -// 3. a JSON string for use in the range log, and -// 4. a boolean indicationg whether 1-3 were populated (i.e. whether a rebalance -// opportunity was found). -func (a Allocator) RebalanceTarget( +func (a Allocator) rebalanceTarget( ctx context.Context, zone *zonepb.ZoneConfig, raftStatus *raft.Status, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, rangeUsageInfo RangeUsageInfo, filter storeFilter, + targetType targetReplicaType, ) (add roachpb.ReplicationTarget, remove roachpb.ReplicationTarget, details string, ok bool) { sl, _, _ := a.storePool.getStoreList(filter) + existingReplicas := append(existingVoters, existingNonVoters...) 
zero := roachpb.ReplicationTarget{} - analyzedConstraints := constraint.AnalyzeConstraints( + analyzedOverallConstraints := constraint.AnalyzeConstraints( ctx, a.storePool.getStoreDescriptor, existingReplicas, *zone.NumReplicas, zone.Constraints) + analyzedVoterConstraints := constraint.AnalyzeConstraints( + ctx, a.storePool.getStoreDescriptor, existingVoters, zone.GetNumVoters(), zone.VoterConstraints) + var removalConstraintsChecker constraintsCheckFn + var rebalanceConstraintsChecker rebalanceConstraintsCheckFn + var replicaSetToRebalance, replicasWithExcludedStores []roachpb.ReplicaDescriptor + var otherReplicaSet []roachpb.ReplicaDescriptor + + switch t := targetType; t { + case voterTarget: + removalConstraintsChecker = voterConstraintsCheckerForRemoval( + analyzedOverallConstraints, + analyzedVoterConstraints, + ) + rebalanceConstraintsChecker = voterConstraintsCheckerForRebalance( + analyzedOverallConstraints, + analyzedVoterConstraints, + ) + replicaSetToRebalance = existingVoters + otherReplicaSet = existingNonVoters + case nonVoterTarget: + removalConstraintsChecker = nonVoterConstraintsCheckerForRemoval(analyzedOverallConstraints) + rebalanceConstraintsChecker = nonVoterConstraintsCheckerForRebalance(analyzedOverallConstraints) + replicaSetToRebalance = existingNonVoters + // When rebalancing non-voting replicas, we don't consider stores that + // already have voting replicas as possible candidates. Voting replicas are + // supposed to be rebalanced before non-voting replicas, and they do + // consider the non-voters' stores as possible candidates. + replicasWithExcludedStores = existingVoters + otherReplicaSet = existingVoters + default: + log.Fatalf(ctx, "unsupported targetReplicaType: %v", t) + } + options := a.scorerOptions() - results := rebalanceCandidates( + replicaSetForDiversityCalc := getReplicasForDiversityCalc(targetType, existingVoters, existingReplicas) + results := rankedCandidateListForRebalancing( ctx, sl, - analyzedConstraints, - existingReplicas, - a.storePool.getLocalitiesByStore(existingReplicas), + removalConstraintsChecker, + rebalanceConstraintsChecker, + replicaSetToRebalance, + replicasWithExcludedStores, + a.storePool.getLocalitiesByStore(replicaSetForDiversityCalc), a.storePool.isNodeReadyForRoutineReplicaTransfer, options, + targetType, ) if len(results) == 0 { @@ -924,21 +968,21 @@ func (a Allocator) RebalanceTarget( ReplicaID: maxReplicaID(existingReplicas) + 1, } // Deep-copy the Replicas slice since we'll mutate it below. - existingPlusOneNew := append([]roachpb.ReplicaDescriptor(nil), existingReplicas...) + existingPlusOneNew := append([]roachpb.ReplicaDescriptor(nil), replicaSetToRebalance...) existingPlusOneNew = append(existingPlusOneNew, newReplica) replicaCandidates := existingPlusOneNew // If we can, filter replicas as we would if we were actually removing one. // If we can't (e.g. because we're the leaseholder but not the raft leader), // it's better to simulate the removal with the info that we do have than to // assume that the rebalance is ok (#20241). - if raftStatus != nil && raftStatus.Progress != nil { + if targetType == voterTarget && raftStatus != nil && raftStatus.Progress != nil { replicaCandidates = simulateFilterUnremovableReplicas( ctx, raftStatus, replicaCandidates, newReplica.ReplicaID) } if len(replicaCandidates) == 0 { // No existing replicas are suitable to remove. 
- log.VEventf(ctx, 2, "not rebalancing to s%d because there are no existing "+ - "replicas that can be removed", target.store.StoreID) + log.VEventf(ctx, 2, "not rebalancing %s to s%d because there are no existing "+ + "replicas that can be removed", targetType, target.store.StoreID) return zero, zero, "", false } @@ -950,10 +994,12 @@ func (a Allocator) RebalanceTarget( zone, replicaCandidates, existingPlusOneNew, + otherReplicaSet, rangeUsageInfo, + targetType, ) if err != nil { - log.Warningf(ctx, "simulating RemoveVoter failed: %+v", err) + log.Warningf(ctx, "simulating removal of %s failed: %+v", targetType, err) return zero, zero, "", false } if target.store.StoreID != removeReplica.StoreID { @@ -988,6 +1034,83 @@ func (a Allocator) RebalanceTarget( return addTarget, removeTarget, string(detailsBytes), true } +// RebalanceVoter returns a suitable store for a rebalance target with required +// attributes. Rebalance targets are selected via the same mechanism as +// AllocateVoter(), except the chosen target must follow some additional +// criteria. Namely, if chosen, it must further the goal of balancing the +// cluster. +// +// The supplied parameters are the required attributes for the range and +// information about the range being considered for rebalancing. +// +// The existing voting replicas modulo any store with dead replicas are +// candidates for rebalancing. +// +// Simply ignoring a rebalance opportunity in the event that the target chosen +// by rankedCandidateListForRebalancing() doesn't fit balancing criteria is +// perfectly fine, as other stores in the cluster will also be doing their +// probabilistic best to rebalance. This helps prevent a stampeding herd +// targeting an abnormally under-utilized store. +// +// The return values are, in order: +// +// 1. The target on which to add a new replica, +// 2. An existing replica to remove, +// 3. a JSON string for use in the range log, and +// 4. a boolean indicating whether 1-3 were populated (i.e. whether a rebalance +// opportunity was found). +func (a Allocator) RebalanceVoter( + ctx context.Context, + zone *zonepb.ZoneConfig, + raftStatus *raft.Status, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + rangeUsageInfo RangeUsageInfo, + filter storeFilter, +) (add roachpb.ReplicationTarget, remove roachpb.ReplicationTarget, details string, ok bool) { + return a.rebalanceTarget( + ctx, + zone, + raftStatus, + existingVoters, + existingNonVoters, + rangeUsageInfo, + filter, + voterTarget, + ) +} + +// RebalanceNonVoter returns a suitable pair of rebalance candidates for a +// non-voting replica. This behaves very similarly to `RebalanceVoter` as +// explained above. The key differences are the following: +// +// 1. Non-voting replicas only adhere to the overall `constraints` and not the +// `voter_constraints`. +// 2. We do not consider stores that have voters as valid candidates for +// rebalancing. +// 3. Diversity score calculation for non-voters is relative to all existing +// replicas. This is in contrast to how we compute the diversity scores for +// voting replicas, which are computed relative to just the set of voting +// replicas. 
+func (a Allocator) RebalanceNonVoter( + ctx context.Context, + zone *zonepb.ZoneConfig, + raftStatus *raft.Status, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + rangeUsageInfo RangeUsageInfo, + filter storeFilter, +) (add roachpb.ReplicationTarget, remove roachpb.ReplicationTarget, details string, ok bool) { + return a.rebalanceTarget( + ctx, + zone, + raftStatus, + existingVoters, + existingNonVoters, + rangeUsageInfo, + filter, + nonVoterTarget, + ) +} + func (a *Allocator) scorerOptions() scorerOptions { return scorerOptions{ deterministic: a.storePool.deterministic, diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go index 0842aab29fb1..c7568a8d1a44 100644 --- a/pkg/kv/kvserver/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator_scorer.go @@ -523,18 +523,20 @@ type rebalanceOptions struct { candidates candidateList } -// rebalanceCandidates creates two candidate lists. The first contains all -// existing replica's stores, ordered from least qualified for rebalancing to -// most qualified. The second list is of all potential stores that could be -// used as rebalancing receivers, ordered from best to worst. -func rebalanceCandidates( +// rankedCandidateListForRebalancing creates two candidate lists. The first +// contains all existing replica's stores, ordered from least qualified for +// rebalancing to most qualified. The second list is of all potential stores +// that could be used as rebalancing receivers, ordered from best to worst. +func rankedCandidateListForRebalancing( ctx context.Context, allStores StoreList, - constraints constraint.AnalyzedConstraints, - existingReplicas []roachpb.ReplicaDescriptor, + removalConstraintsChecker constraintsCheckFn, + rebalanceConstraintsChecker rebalanceConstraintsCheckFn, + existingReplicasForType, replicasWithExcludedStores []roachpb.ReplicaDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool, options scorerOptions, + replicaType targetReplicaType, ) []rebalanceOptions { // 1. Determine whether existing replicas are valid and/or necessary. existingStores := make(map[roachpb.StoreID]candidate) @@ -545,11 +547,11 @@ func rebalanceCandidates( log.VEventf(ctx, 3, "not considering non-ready node n%d for rebalance", store.Node.NodeID) continue } - for _, repl := range existingReplicas { + for _, repl := range existingReplicasForType { if store.StoreID != repl.StoreID { continue } - valid, necessary := removeConstraintsCheck(store, constraints) + valid, necessary := removalConstraintsChecker(store) fullDisk := !maxCapacityCheck(store) if !valid { if !needRebalanceFrom { @@ -617,8 +619,15 @@ func rebalanceCandidates( } var comparableCands candidateList for _, store := range allStores.stores { - constraintsOK, necessary := rebalanceFromConstraintsCheck( - store, existing.store.StoreID, constraints) + // Ignore any stores that contain any of the replicas within + // `replicasWithExcludedStores`. 
+ var exempted bool + for _, excluded := range replicasWithExcludedStores { + if store.StoreID == excluded.StoreID { + exempted = true + break + } + } + if exempted { + continue + } + + constraintsOK, necessary := rebalanceConstraintsChecker(store, existing.store) maxCapacityOK := maxCapacityCheck(store) diversityScore := diversityRebalanceFromScore( store, existing.store.StoreID, existingStoreLocalities) cand := candidate{ store: store, valid: constraintsOK, necessary: necessary, fullDisk: !maxCapacityOK, diversityScore: diversityScore, } if !cand.less(existing) { + // If `cand` is not worse than `existing`, add it to the list. comparableCands = append(comparableCands, cand) if !needRebalanceFrom && !needRebalanceTo && existing.less(cand) { needRebalanceTo = true @@ -676,9 +686,12 @@ } } } - // TODO(a-robinson): Some moderate refactoring could extract this logic out - // into the loop below, avoiding duplicate balanceScore calculations. - if shouldRebalance(ctx, existing.store, sl, options) { + // NB: Due to step 2 from above, we're guaranteed to have a non-empty `sl` + // at this point. + // + // TODO(a-robinson): Some moderate refactoring could extract this logic + // out into the loop below, avoiding duplicate balanceScore calculations. + if shouldRebalanceBasedOnRangeCount(ctx, existing.store, sl, options) { shouldRebalanceCheck = true break } @@ -742,7 +755,7 @@ // Only consider this candidate if we must rebalance due to constraint, // disk fullness, or diversity reasons. log.VEventf(ctx, 3, "not considering %+v as a candidate for range %+v: score=%s storeList=%+v", - s, existingReplicas, cand.balanceScore, comparable.sl) + s, existingReplicasForType, cand.balanceScore, comparable.sl) continue } cand.rangeCount = int(s.Capacity.RangeCount) @@ -840,9 +853,10 @@ func betterRebalanceTarget(target1, existing1, target2, existing2 *candidate) *c return target1 } -// shouldRebalance returns whether the specified store is a candidate for -// having a replica removed from it given the candidate store list. -func shouldRebalance( +// shouldRebalanceBasedOnRangeCount returns whether the specified store is a +// candidate for having a replica removed from it given the candidate store +// list. +func shouldRebalanceBasedOnRangeCount( ctx context.Context, store roachpb.StoreDescriptor, sl StoreList, options scorerOptions, ) bool { overfullThreshold := int32(math.Ceil(overfullRangeThreshold(options, sl.candidateRanges.mean))) @@ -911,6 +925,11 @@ func sameLocalityAndAttrs(s1, s2 roachpb.StoreDescriptor) bool { // existing one. type constraintsCheckFn func(roachpb.StoreDescriptor) (valid, necessary bool) +// rebalanceConstraintsCheckFn determines whether `toStore` is a valid and/or +// necessary replacement candidate for `fromStore` (which must contain an +// existing replica). +type rebalanceConstraintsCheckFn func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool) + // voterConstraintsCheckerForAllocation returns a constraintsCheckFn that // determines whether a candidate for a new voting replica is valid and/or // necessary as per the `voter_constraints` and `constraints` on the range. @@ -978,6 +997,31 @@ func nonVoterConstraintsCheckerForRemoval( } } +// voterConstraintsCheckerForRebalance returns a rebalanceConstraintsCheckFn +// that determines whether a given store is a valid and/or necessary rebalance +// candidate from a given store of an existing voting replica. 
+func voterConstraintsCheckerForRebalance( + overallConstraints, voterConstraints constraint.AnalyzedConstraints, +) rebalanceConstraintsCheckFn { + return func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool) { + overallConstraintsOK, necessaryOverall := rebalanceFromConstraintsCheck(toStore, fromStore, overallConstraints) + voterConstraintsOK, necessaryForVoters := rebalanceFromConstraintsCheck(toStore, fromStore, voterConstraints) + + return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters + } +} + +// nonVoterConstraintsCheckerForRebalance returns a rebalanceConstraintsCheckFn +// that determines whether a given store is a valid and/or necessary rebalance +// candidate from a given store of an existing non-voting replica. +func nonVoterConstraintsCheckerForRebalance( + overallConstraints constraint.AnalyzedConstraints, +) rebalanceConstraintsCheckFn { + return func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool) { + return rebalanceFromConstraintsCheck(toStore, fromStore, overallConstraints) + } +} + // allocateConstraintsCheck checks the potential allocation target store // against all the constraints. If it matches a constraint at all, it's valid. // If it matches a constraint that is not already fully satisfied by existing @@ -1057,9 +1101,7 @@ func removeConstraintsCheck( // will be necessary if fromStoreID (an existing replica) is removed from the // range. func rebalanceFromConstraintsCheck( - store roachpb.StoreDescriptor, - fromStoreID roachpb.StoreID, - analyzed constraint.AnalyzedConstraints, + store, fromStoreID roachpb.StoreDescriptor, analyzed constraint.AnalyzedConstraints, ) (valid bool, necessary bool) { // All stores are valid when there are no constraints. if len(analyzed.Constraints) == 0 { @@ -1083,7 +1125,7 @@ func rebalanceFromConstraintsCheck( matchingStores := analyzed.SatisfiedBy[i] if len(matchingStores) < int(constraints.NumReplicas) || (len(matchingStores) == int(constraints.NumReplicas) && - containsStore(analyzed.SatisfiedBy[i], fromStoreID)) { + containsStore(analyzed.SatisfiedBy[i], fromStoreID.StoreID)) { return true, true } } diff --git a/pkg/kv/kvserver/allocator_scorer_test.go b/pkg/kv/kvserver/allocator_scorer_test.go index fe4c214b95f5..5110c844ab8f 100644 --- a/pkg/kv/kvserver/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator_scorer_test.go @@ -315,9 +315,9 @@ func TestBetterThan(t *testing.T) { } // TestBestRebalanceTarget constructs a hypothetical output of -// rebalanceCandidates and verifies that bestRebalanceTarget properly returns -// the candidates in the ideal order of preference and omits any that aren't -// desirable. +// rankedCandidateListForRebalancing and verifies that bestRebalanceTarget +// properly returns the candidates in the ideal order of preference and omits +// any that aren't desirable. 
func TestBestRebalanceTarget(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1209,18 +1209,36 @@ func TestShouldRebalanceDiversity(t *testing.T) { } } - targets := rebalanceCandidates( + removalConstraintsChecker := voterConstraintsCheckerForRemoval( + constraint.EmptyAnalyzedConstraints, + constraint.EmptyAnalyzedConstraints, + ) + rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance( + constraint.EmptyAnalyzedConstraints, + constraint.EmptyAnalyzedConstraints, + ) + targets := rankedCandidateListForRebalancing( context.Background(), filteredSL, - constraint.AnalyzedConstraints{}, + removalConstraintsChecker, + rebalanceConstraintsChecker, replicas, + nil, existingStoreLocalities, - func(context.Context, roachpb.NodeID) bool { return true }, /* isNodeValidForRoutineReplicaTransfer */ - options) + func(context.Context, roachpb.NodeID) bool { return true }, + options, + voterTarget, + ) actual := len(targets) > 0 if actual != tc.expected { - t.Errorf("%d: shouldRebalance on s%d with replicas on %v got %t, expected %t", - i, tc.s.StoreID, tc.existingNodeIDs, actual, tc.expected) + t.Errorf( + "%d: shouldRebalanceBasedOnRangeCount on s%d with replicas on %v got %t, expected %t", + i, + tc.s.StoreID, + tc.existingNodeIDs, + actual, + tc.expected, + ) } } } diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index c111d4c9cbb7..cb347dd04c06 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -86,13 +86,15 @@ var multiDCConfigUnsatisfiableVoterConstraints = zonepb.ZoneConfig{ } // multiDCConfigVoterAndNonVoter prescribes that one voting replica be placed in -// DC "a" and one non-voting replica be placed in DC "b". +// DC "b" and one non-voting replica be placed in DC "a". var multiDCConfigVoterAndNonVoter = zonepb.ZoneConfig{ NumReplicas: proto.Int32(2), Constraints: []zonepb.ConstraintsConjunction{ + // Constrain the non-voter to "a". {Constraints: []zonepb.Constraint{{Value: "a", Type: zonepb.Constraint_REQUIRED}}, NumReplicas: 1}, }, VoterConstraints: []zonepb.ConstraintsConjunction{ + // Constrain the voter to "b". 
{Constraints: []zonepb.Constraint{{Value: "b", Type: zonepb.Constraint_REQUIRED}}}, }, } @@ -317,6 +319,42 @@ var multiDiversityDCStores = []*roachpb.StoreDescriptor{ }, } +var oneStoreWithFullDisk = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 5, RangeCount: 600}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 600}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 600}, + }, +} + +var oneStoreWithTooManyRanges = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 5, RangeCount: 600}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 200}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 200}, + }, +} + func replicas(storeIDs ...roachpb.StoreID) []roachpb.ReplicaDescriptor { res := make([]roachpb.ReplicaDescriptor, len(storeIDs)) for i, storeID := range storeIDs { @@ -646,16 +684,9 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { { var rangeUsageInfo RangeUsageInfo - target, _, details, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - nil, /* raftStatus */ - tc.existing, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, details, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, tc.existing, nil, rangeUsageInfo, storeFilterThrottled) if e, a := tc.expectTargetRebalance, ok; e != a { - t.Errorf("RebalanceTarget(%v) got target %v, details %v; expectTarget=%v", + t.Errorf("RebalanceVoter(%v) got target %v, details %v; expectTarget=%v", tc.existing, target, details, tc.expectTargetRebalance) } } @@ -715,14 +746,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { // After that we should not be seeing replicas move. var rangeUsageInfo RangeUsageInfo for i := 1; i < 40; i++ { - add, remove, _, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - nil, /* raftStatus */ - ranges[i].InternalReplicas, - rangeUsageInfo, - storeFilterThrottled, - ) + add, remove, _, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, ranges[i].InternalReplicas, nil, rangeUsageInfo, storeFilterThrottled) if ok { // Update the descriptor. newReplicas := make([]roachpb.ReplicaDescriptor, 0, len(ranges[i].InternalReplicas)) @@ -754,14 +778,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { // We dont expect any range wanting to move since the system should have // reached a stable state at this point. for i := 1; i < 40; i++ { - _, _, _, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - nil, /* raftStatus */ - ranges[i].InternalReplicas, - rangeUsageInfo, - storeFilterThrottled, - ) + _, _, _, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, ranges[i].InternalReplicas, nil, rangeUsageInfo, storeFilterThrottled) require.False(t, ok) } } @@ -824,14 +841,7 @@ func TestAllocatorRebalance(t *testing.T) { // Every rebalance target must be either store 1 or 2. 
for i := 0; i < 10; i++ { var rangeUsageInfo RangeUsageInfo - target, _, _, ok := a.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, - []roachpb.ReplicaDescriptor{{NodeID: 3, StoreID: 3}}, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, _, ok := a.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: 3, StoreID: 3}}, nil, rangeUsageInfo, storeFilterThrottled) if !ok { i-- // loop until we find 10 candidates continue @@ -843,14 +853,14 @@ func TestAllocatorRebalance(t *testing.T) { } } - // Verify shouldRebalance results. + // Verify shouldRebalanceBasedOnRangeCount results. for i, store := range stores { desc, ok := a.storePool.getStoreDescriptor(store.StoreID) if !ok { t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID) } sl, _, _ := a.storePool.getStoreList(storeFilterThrottled) - result := shouldRebalance(ctx, desc, sl, a.scorerOptions()) + result := shouldRebalanceBasedOnRangeCount(ctx, desc, sl, a.scorerOptions()) if expResult := (i >= 2); expResult != result { t.Errorf("%d: expected rebalance %t; got %t; desc %+v; sl: %+v", i, expResult, result, desc, sl) } @@ -870,7 +880,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { // 2 other datacenters. All of our replicas are distributed within these 3 // datacenters. Originally, the stores that are all alone in their datacenter // are fuller than the other stores. If we didn't simulate RemoveVoter in - // RebalanceTarget, we would try to choose store 2 or 3 as the target store + // RebalanceVoter, we would try to choose store 2 or 3 as the target store // to make a rebalance. However, we would immediately remove the replica on // store 1 or 2 to retain the locality diversity. stores := []*roachpb.StoreDescriptor{ @@ -975,14 +985,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { } } for i := 0; i < 10; i++ { - result, _, details, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - status, - replicas, - rangeUsageInfo, - storeFilterThrottled, - ) + result, _, details, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), status, replicas, nil, rangeUsageInfo, storeFilterThrottled) if ok { t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) } @@ -995,14 +998,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { stores[2].Capacity.RangeCount = 46 sg.GossipStores(stores, t) for i := 0; i < 10; i++ { - target, _, details, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - status, - replicas, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, details, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), status, replicas, nil, rangeUsageInfo, storeFilterThrottled) if ok { t.Fatalf("expected no rebalance, but got target s%d; details: %s", target.StoreID, details) } @@ -1012,14 +1008,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { stores[1].Capacity.RangeCount = 44 sg.GossipStores(stores, t) for i := 0; i < 10; i++ { - target, origin, details, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - status, - replicas, - rangeUsageInfo, - storeFilterThrottled, - ) + target, origin, details, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), status, replicas, nil, rangeUsageInfo, storeFilterThrottled) expTo := stores[1].StoreID expFrom := stores[0].StoreID if !ok || target.StoreID != expTo || origin.StoreID != expFrom { @@ 
-1088,13 +1077,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { for _, c := range testCases { t.Run("", func(t *testing.T) { var rangeUsageInfo RangeUsageInfo - target, _, _, ok := a.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, - c.existing, - rangeUsageInfo, - storeFilterThrottled) + target, _, _, ok := a.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, c.existing, nil, rangeUsageInfo, storeFilterThrottled) if c.expected > 0 { if !ok { t.Fatalf("expected %d, but found nil", c.expected) @@ -1230,14 +1213,14 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { }) sl, _, _ := a.storePool.getStoreList(storeFilterThrottled) - // Verify shouldRebalance returns the expected value. + // Verify shouldRebalanceBasedOnRangeCount returns the expected value. for j, store := range stores { desc, ok := a.storePool.getStoreDescriptor(store.StoreID) if !ok { t.Fatalf("[store %d]: unable to get store %d descriptor", j, store.StoreID) } - if a, e := shouldRebalance(context.Background(), desc, sl, a.scorerOptions()), cluster[j].shouldRebalanceFrom; a != e { - t.Errorf("[store %d]: shouldRebalance %t != expected %t", store.StoreID, a, e) + if a, e := shouldRebalanceBasedOnRangeCount(context.Background(), desc, sl, a.scorerOptions()), cluster[j].shouldRebalanceFrom; a != e { + t.Errorf("[store %d]: shouldRebalanceBasedOnRangeCount %t != expected %t", store.StoreID, a, e) } } }) @@ -1284,27 +1267,20 @@ func TestAllocatorRebalanceByCount(t *testing.T) { // Every rebalance target must be store 4 (or nil for case of missing the only option). for i := 0; i < 10; i++ { var rangeUsageInfo RangeUsageInfo - result, _, _, ok := a.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, - []roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}}, - rangeUsageInfo, - storeFilterThrottled, - ) + result, _, _, ok := a.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, []roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}}, nil, rangeUsageInfo, storeFilterThrottled) if ok && result.StoreID != 4 { t.Errorf("expected store 4; got %d", result.StoreID) } } - // Verify shouldRebalance results. + // Verify shouldRebalanceBasedOnRangeCount results. 
for i, store := range stores { desc, ok := a.storePool.getStoreDescriptor(store.StoreID) if !ok { t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID) } sl, _, _ := a.storePool.getStoreList(storeFilterThrottled) - result := shouldRebalance(ctx, desc, sl, a.scorerOptions()) + result := shouldRebalanceBasedOnRangeCount(ctx, desc, sl, a.scorerOptions()) if expResult := (i < 3); expResult != result { t.Errorf("%d: expected rebalance %t; got %t", i, expResult, result) } @@ -1568,20 +1544,13 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { for i, tc := range testCases { var rangeUsageInfo RangeUsageInfo - result, _, details, ok := a.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, /* raftStatus */ - tc.existing, - rangeUsageInfo, - storeFilterThrottled, - ) + result, _, details, ok := a.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, tc.existing, nil, rangeUsageInfo, storeFilterThrottled) var resultID roachpb.StoreID if ok { resultID = result.StoreID } if resultID != tc.expected { - t.Errorf("%d: RebalanceTarget(%v) expected s%d; got %v: %s", i, tc.existing, tc.expected, result, details) + t.Errorf("%d: RebalanceVoter(%v) expected s%d; got %v: %s", i, tc.existing, tc.expected, result, details) } } @@ -1638,14 +1607,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { for i, tc := range testCases2 { log.Infof(ctx, "case #%d", i) var rangeUsageInfo RangeUsageInfo - result, _, details, ok := a.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, /* raftStatus */ - tc.existing, - rangeUsageInfo, - storeFilterThrottled, - ) + result, _, details, ok := a.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, tc.existing, nil, rangeUsageInfo, storeFilterThrottled) var gotExpected bool if !ok { gotExpected = (tc.expected == nil) @@ -1658,7 +1620,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { } } if !gotExpected { - t.Errorf("%d: RebalanceTarget(%v) expected store in %v; got %v: %s", + t.Errorf("%d: RebalanceVoter(%v) expected store in %v; got %v: %s", i, tc.existing, tc.expected, result, details) } } @@ -2448,16 +2410,9 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { } } var rangeUsageInfo RangeUsageInfo - target, _, details, ok := a.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - nil, - existingRepls, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, details, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, existingRepls, nil, rangeUsageInfo, storeFilterThrottled) if !ok { - t.Fatalf("%d: RebalanceTarget(%v) returned no target store; details: %s", i, c.existing, details) + t.Fatalf("%d: RebalanceVoter(%v) returned no target store; details: %s", i, c.existing, details) } var found bool for _, storeID := range c.expected { @@ -2467,7 +2422,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { } } if !found { - t.Errorf("%d: expected RebalanceTarget(%v) in %v, but got %d; details: %s", + t.Errorf("%d: expected RebalanceVoter(%v) in %v, but got %d; details: %s", i, c.existing, c.expected, target.StoreID, details) } } @@ -2660,7 +2615,9 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { analyzed := constraint.AnalyzeConstraints( context.Background(), a.storePool.getStoreDescriptor, existingRepls, *zone.NumReplicas, zone.Constraints) - checkFn := voterConstraintsCheckerForAllocation(analyzed, constraint.EmptyAnalyzedConstraints) + allocationConstraintsChecker := 
voterConstraintsCheckerForAllocation(analyzed, constraint.EmptyAnalyzedConstraints) + removalConstraintsChecker := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints) + rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance(analyzed, constraint.EmptyAnalyzedConstraints) a.storePool.isNodeReadyForRoutineReplicaTransfer = func(_ context.Context, n roachpb.NodeID) bool { for _, s := range tc.excluded { @@ -2675,7 +2632,7 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { t.Run(fmt.Sprintf("%d/allocate", testIdx), func(t *testing.T) { candidates := rankedCandidateListForAllocation(context.Background(), sl, - checkFn, + allocationConstraintsChecker, existingRepls, a.storePool.getLocalitiesByStore(existingRepls), a.storePool.isNodeReadyForRoutineReplicaTransfer, @@ -2688,22 +2645,35 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { }) t.Run(fmt.Sprintf("%d/rebalance", testIdx), func(t *testing.T) { - results := rebalanceCandidates( + results := rankedCandidateListForRebalancing( context.Background(), sl, - analyzed, + removalConstraintsChecker, + rebalanceConstraintsChecker, existingRepls, + nil, a.storePool.getLocalitiesByStore(existingRepls), a.storePool.isNodeReadyForRoutineReplicaTransfer, a.scorerOptions(), + voterTarget, ) for i := range results { if !expectedStoreIDsMatch(tc.existing, results[i].existingCandidates) { - t.Errorf("results[%d]: expected existing candidates %v, got %v", i, tc.existing, results[i].existingCandidates) + t.Errorf( + "results[%d]: expected existing candidates %v, got %v", + i, + tc.existing, + results[i].existingCandidates, + ) } if !expectedStoreIDsMatch(tc.expected, results[i].candidates) { - t.Errorf("results[%d]: expected candidates %v, got %v", i, tc.expected, results[i].candidates) + t.Errorf( + "results[%d]: expected candidates %v, got %v", + i, + tc.expected, + results[i].candidates, + ) } } }) @@ -3273,6 +3243,199 @@ func expectedStoreIDsMatch(expected []roachpb.StoreID, results candidateList) bo return true } +// TestAllocatorRebalanceNonVoters tests that non-voting replicas rebalance "as +// expected". In particular, it checks the following things: +// +// 1. Non-voter rebalancing obeys the allocator's capacity based heuristics. +// 2. Non-voter rebalancing tries to ensure constraints conformance. +func TestAllocatorRebalanceNonVoters(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + type testCase struct { + name string + stores []*roachpb.StoreDescriptor + zone *zonepb.ZoneConfig + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor + expectNoAction bool + expectedRemoveTargets, expectedAddTargets []roachpb.StoreID + } + tests := []testCase{ + { + name: "no-op", + stores: multiDiversityDCStores, + zone: zonepb.EmptyCompleteZoneConfig(), + existingVoters: replicas(1), + existingNonVoters: replicas(3), + expectNoAction: true, + }, + // Test that rebalancing based on just the diversity scores works as + // expected. In particular, we expect non-voter rebalancing to compute + // diversity scores based on the entire existing replica set, and not just + // the set of non-voting replicas. 
+ { + name: "diversity among non-voters", + stores: multiDiversityDCStores, + zone: zonepb.EmptyCompleteZoneConfig(), + existingVoters: replicas(1, 2), + existingNonVoters: replicas(3, 4, 6), + expectedRemoveTargets: []roachpb.StoreID{3, 4}, + expectedAddTargets: []roachpb.StoreID{7, 8}, + }, + { + name: "diversity among all existing replicas", + stores: multiDiversityDCStores, + zone: zonepb.EmptyCompleteZoneConfig(), + existingVoters: replicas(1), + existingNonVoters: replicas(2, 4, 6), + expectedRemoveTargets: []roachpb.StoreID{2}, + expectedAddTargets: []roachpb.StoreID{7, 8}, + }, + // Test that non-voting replicas obey the capacity / load based heuristics + // for rebalancing. + { + name: "move off of nodes with full disk", + // NB: Store 1 has a 97.5% full disk. + stores: oneStoreWithFullDisk, + zone: zonepb.EmptyCompleteZoneConfig(), + existingVoters: replicas(3), + existingNonVoters: replicas(1), + expectedRemoveTargets: []roachpb.StoreID{1}, + expectedAddTargets: []roachpb.StoreID{2}, + }, + { + name: "move off of nodes with too many ranges", + // NB: Store 1 has 3x the number of ranges as the other stores. + stores: oneStoreWithTooManyRanges, + zone: zonepb.EmptyCompleteZoneConfig(), + existingVoters: replicas(3), + existingNonVoters: replicas(1), + expectedRemoveTargets: []roachpb.StoreID{1}, + expectedAddTargets: []roachpb.StoreID{2}, + }, + // Test that `constraints` cause non-voters to move around in order to + // sustain constraints conformance. + { + name: "already on a store that satisfies constraints for non_voters", + stores: multiDCStores, + // Constrain a voter to store 2 and a non_voter to store 1. + zone: &multiDCConfigVoterAndNonVoter, + existingVoters: replicas(2), + existingNonVoters: replicas(1), + expectNoAction: true, + }, + { + name: "need to rebalance to conform to constraints", + stores: multiDCStores, + // Constrain a non_voter to store 1. + zone: &multiDCConfigVoterAndNonVoter, + existingVoters: nil, + existingNonVoters: replicas(2), + expectedRemoveTargets: []roachpb.StoreID{2}, + expectedAddTargets: []roachpb.StoreID{1}, + }, + { + // Test that non-voting replica rebalancing does not consider stores that + // have voters as valid candidates, even if those stores satisfy + // constraints. 
+ name: "need to rebalance, but cannot because a voter already exists", + stores: multiDCStores, + zone: &multiDCConfigVoterAndNonVoter, + existingVoters: replicas(1), + existingNonVoters: replicas(2), + expectNoAction: true, + }, + } + + var rangeUsageInfo RangeUsageInfo + chk := func(target roachpb.ReplicationTarget, expectedCandidates []roachpb.StoreID) bool { + for _, candidate := range expectedCandidates { + if target.StoreID == candidate { + return true + } + } + return false + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) { + stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(test.stores, t) + add, remove, _, ok := a.RebalanceNonVoter(ctx, + test.zone, + nil, + test.existingVoters, + test.existingNonVoters, + rangeUsageInfo, + storeFilterThrottled) + if test.expectNoAction { + require.True(t, !ok) + } else { + require.Truef(t, ok, "no action taken on range") + require.Truef(t, + chk(add, test.expectedAddTargets), + "the addition target %+v from RebalanceNonVoter doesn't match expectation", + add) + require.Truef(t, + chk(remove, test.expectedRemoveTargets), + "the removal target %+v from RebalanceNonVoter doesn't match expectation", + remove) + } + }) + } +} + +// TestVotersCanRebalanceToNonVoterStores ensures that rebalancing of voting +// replicas considers stores that have non-voters as feasible candidates. +func TestVotersCanRebalanceToNonVoterStores(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + defer stopper.Stop(context.Background()) + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(multiDiversityDCStores, t) + + zone := zonepb.ZoneConfig{ + NumReplicas: proto.Int32(4), + NumVoters: proto.Int32(2), + // We constrain 2 voting replicas to datacenter "a" (stores 1 and 2) but + // place non voting replicas there. In order to achieve constraints + // conformance, each of the voters must want to move to one of these stores. 
+ VoterConstraints: []zonepb.ConstraintsConjunction{ + { + NumReplicas: 2, + Constraints: []zonepb.Constraint{ + {Type: zonepb.Constraint_REQUIRED, Key: "datacenter", Value: "a"}, + }, + }, + }, + } + + var rangeUsageInfo RangeUsageInfo + existingNonVoters := replicas(1, 2) + existingVoters := replicas(3, 4) + add, remove, _, ok := a.RebalanceVoter( + ctx, + &zone, + nil, + existingVoters, + existingNonVoters, + rangeUsageInfo, + storeFilterThrottled, + ) + + require.Truef(t, ok, "no action taken") + if !(add.StoreID == roachpb.StoreID(1) || add.StoreID == roachpb.StoreID(2)) { + t.Fatalf("received unexpected addition target %s from RebalanceVoter", add) + } + if !(remove.StoreID == roachpb.StoreID(3) || remove.StoreID == roachpb.StoreID(4)) { + t.Fatalf("received unexpected removal target %s from RebalanceVoter", remove) + } +} + func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -4041,14 +4204,26 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { analyzed := constraint.AnalyzeConstraints( context.Background(), a.storePool.getStoreDescriptor, existingRepls, *zone.NumReplicas, zone.Constraints) - results := rebalanceCandidates( + removalConstraintsChecker := voterConstraintsCheckerForRemoval( + analyzed, + constraint.EmptyAnalyzedConstraints, + ) + rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance( + analyzed, + constraint.EmptyAnalyzedConstraints, + ) + + results := rankedCandidateListForRebalancing( context.Background(), sl, - analyzed, + removalConstraintsChecker, + rebalanceConstraintsChecker, existingRepls, + nil, a.storePool.getLocalitiesByStore(existingRepls), - func(context.Context, roachpb.NodeID) bool { return true }, /* isNodeValidForRoutineReplicaTransfer */ + func(context.Context, roachpb.NodeID) bool { return true }, a.scorerOptions(), + voterTarget, ) match := true if len(tc.expected) != len(results) { @@ -4066,13 +4241,12 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { } } if !match { - t.Errorf("%d: expected rebalanceCandidates(%v) = %v, but got %v", + t.Errorf("%d: expected rankedCandidateListForRebalancing(%v) = %v, but got %v", testIdx, tc.existing, tc.expected, results) } else { - // Also verify that RebalanceTarget picks out one of the best options as + // Also verify that RebalanceVoter picks out one of the best options as // the final rebalance choice. 
- target, _, details, ok := a.RebalanceTarget( - context.Background(), zone, nil, existingRepls, rangeUsageInfo, storeFilterThrottled) + target, _, details, ok := a.RebalanceVoter(context.Background(), zone, nil, existingRepls, nil, rangeUsageInfo, storeFilterThrottled) var found bool if !ok && len(tc.validTargets) == 0 { found = true @@ -4084,7 +4258,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { } } if !found { - t.Errorf("%d: expected RebalanceTarget(%v) to be in %v, but got %v; details: %s", + t.Errorf("%d: expected RebalanceVoter(%v) to be in %v, but got %v; details: %s", testIdx, tc.existing, tc.validTargets, target, details) } } @@ -6155,14 +6329,7 @@ func TestAllocatorRebalanceAway(t *testing.T) { } var rangeUsageInfo RangeUsageInfo - actual, _, _, ok := a.RebalanceTarget( - ctx, - &zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: []zonepb.ConstraintsConjunction{constraints}}, - nil, - existingReplicas, - rangeUsageInfo, - storeFilterThrottled, - ) + actual, _, _, ok := a.RebalanceVoter(ctx, &zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: []zonepb.ConstraintsConjunction{constraints}}, nil, existingReplicas, nil, rangeUsageInfo, storeFilterThrottled) if tc.expected == nil && ok { t.Errorf("rebalancing to the incorrect store, expected nil, got %d", actual.StoreID) @@ -6322,14 +6489,7 @@ func TestAllocatorFullDisks(t *testing.T) { // Rebalance until there's no more rebalancing to do. if ts.Capacity.RangeCount > 0 { var rangeUsageInfo RangeUsageInfo - target, _, details, ok := alloc.RebalanceTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - nil, - []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, details, ok := alloc.RebalanceVoter(ctx, zonepb.EmptyCompleteZoneConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, nil, rangeUsageInfo, storeFilterThrottled) if ok { if log.V(1) { log.Infof(ctx, "rebalancing to %v; details: %s", target, details) @@ -6451,14 +6611,7 @@ func Example_rebalancing() { for j := 0; j < len(testStores); j++ { ts := &testStores[j] var rangeUsageInfo RangeUsageInfo - target, _, details, ok := alloc.RebalanceTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - nil, - []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, - rangeUsageInfo, - storeFilterThrottled, - ) + target, _, details, ok := alloc.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, nil, rangeUsageInfo, storeFilterThrottled) if ok { log.Infof(context.Background(), "rebalancing to %v; details: %s", target, details) testStores[j].rebalance(&testStores[int(target.StoreID)], alloc.randGen.Int63n(1<<20)) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 7497b3554e61..8e1bfcf78f5c 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2312,18 +2312,20 @@ func TestChangeReplicasSwapVoterWithNonVoter(t *testing.T) { firstStore, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID()) require.NoError(t, err) firstRepl := firstStore.LookupReplica(roachpb.RKey(key)) - require.NotNil(t, firstRepl, `the first node in the TestCluster must have a replica for the ScratchRange`) + require.NotNil(t, firstRepl, "the first node in the TestCluster must have a"+ + " replica for the 
ScratchRange") + tc.AddNonVotersOrFatal(t, key, nonVoter) // TODO(aayush): Trying to swap the last voting replica with a non-voter hits // the safeguard inside Replica.propose() as the last voting replica is always // the leaseholder. There are a bunch of subtleties around getting a // leaseholder to remove itself without another voter to immediately transfer - // the lease to. Determine if/how this needs to be fixed. - tc.AddNonVotersOrFatal(t, key, nonVoter) + // the lease to. See #40333. _, err = tc.SwapVoterWithNonVoter(key, firstVoter, nonVoter) require.Regexp(t, "received invalid ChangeReplicasTrigger", err) tc.AddVotersOrFatal(t, key, secondVoter) + tc.SwapVoterWithNonVoterOrFatal(t, key, secondVoter, nonVoter) } diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index b7679d4b4d6a..0d41747a226d 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -99,6 +99,18 @@ var ( Measurement: "Lease Transfers", Unit: metric.Unit_COUNT, } + metaReplicateQueueNonVoterPromotionsCount = metric.Metadata{ + Name: "queue.replicate.nonvoterpromotions", + Help: "Number of non-voters promoted to voters by the replicate queue", + Measurement: "Promotions of Non Voters to Voters", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueVoterDemotionsCount = metric.Metadata{ + Name: "queue.replicate.voterdemotions", + Help: "Number of voters demoted to non-voters by the replicate queue", + Measurement: "Demotions of Voters to Non Voters", + Unit: metric.Unit_COUNT, + } ) // quorumError indicates a retryable error condition which sends replicas being @@ -129,6 +141,8 @@ type ReplicateQueueMetrics struct { RemoveLearnerReplicaCount *metric.Counter RebalanceReplicaCount *metric.Counter TransferLeaseCount *metric.Counter + NonVoterPromotionsCount *metric.Counter + VoterDemotionsCount *metric.Counter } func makeReplicateQueueMetrics() ReplicateQueueMetrics { @@ -139,6 +153,8 @@ func makeReplicateQueueMetrics() ReplicateQueueMetrics { RemoveLearnerReplicaCount: metric.NewCounter(metaReplicateQueueRemoveLearnerReplicaCount), RebalanceReplicaCount: metric.NewCounter(metaReplicateQueueRebalanceReplicaCount), TransferLeaseCount: metric.NewCounter(metaReplicateQueueTransferLeaseCount), + NonVoterPromotionsCount: metric.NewCounter(metaReplicateQueueNonVoterPromotionsCount), + VoterDemotionsCount: metric.NewCounter(metaReplicateQueueVoterDemotionsCount), } } @@ -236,8 +252,7 @@ func (rq *replicateQueue) shouldQueue( if !rq.store.TestingKnobs().DisableReplicaRebalancing { rangeUsageInfo := rangeUsageInfoForRepl(repl) - _, _, _, ok := rq.allocator.RebalanceTarget( - ctx, zone, repl.RaftStatus(), voterReplicas, rangeUsageInfo, storeFilterThrottled) + _, _, _, ok := rq.allocator.RebalanceVoter(ctx, zone, repl.RaftStatus(), voterReplicas, nil, rangeUsageInfo, storeFilterThrottled) if ok { log.VEventf(ctx, 2, "rebalance target found, enqueuing") return true, 0 @@ -407,7 +422,7 @@ func (rq *replicateQueue) processOneChange( case AllocatorRemoveLearner: return rq.removeLearner(ctx, repl, dryRun) case AllocatorConsiderRebalance: - return rq.considerRebalance(ctx, repl, voterReplicas, canTransferLease, dryRun) + return rq.considerRebalance(ctx, repl, voterReplicas, nonVoterReplicas, canTransferLease, dryRun) case AllocatorFinalizeAtomicReplicationChange: _, err := maybeLeaveAtomicChangeReplicasAndRemoveLearners(ctx, repl.store, repl.Desc()) // Requeue because either we failed to transition out of a joint state @@ -423,10 +438,9 @@ func (rq *replicateQueue) 
processOneChange( // existingVoters and specifies which voter to replace with a new one. // // The method preferably issues an atomic replica swap, but may not be able to -// do this in all cases, such as when atomic replication changes are not -// available, or when the range consists of a single replica. As a fall back, -// only the addition is carried out; the removal is then a follow-up step for -// the next scanner cycle. +// do this in all cases, such as when the range consists of a single replica. As +// a fall back, only the addition is carried out; the removal is then a +// follow-up step for the next scanner cycle. func (rq *replicateQueue) addOrReplaceVoters( ctx context.Context, repl *Replica, @@ -467,11 +481,10 @@ func (rq *replicateQueue) addOrReplaceVoters( } desc, zone := repl.DescAndZone() - // Allocate a target assuming that the replica we're replacing (if any) is - // already gone. The allocator should not try to re-add this replica since - // there is a reason we're removing it (i.e. dead or decommissioning). If we - // left the replica in the slice, the allocator would not be guaranteed to - // pick a replica that fills the gap removeRepl leaves once it's gone. + // The allocator should not try to re-add this replica since there is a reason + // we're removing it (i.e. dead or decommissioning). If we left the replica in + // the slice, the allocator would not be guaranteed to pick a replica that + // fills the gap removeRepl leaves once it's gone. newStore, details, err := rq.allocator.AllocateVoter(ctx, zone, remainingLiveVoters, remainingLiveNonVoters) if err != nil { return false, err @@ -517,7 +530,24 @@ func (rq *replicateQueue) addOrReplaceVoters( } } rq.metrics.AddReplicaCount.Inc(1) - ops := roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newReplica) + + // Figure out whether we should be promoting an existing non-voting replica to + // a voting replica or if we ought to be adding a voter afresh. + var ops []roachpb.ReplicationChange + replDesc, found := desc.GetReplicaDescriptor(newReplica.StoreID) + if found { + if replDesc.GetType() != roachpb.NON_VOTER { + return false, errors.AssertionFailedf("allocation target %s for a voter"+ + " already has an unexpected replica: %s", newReplica, replDesc) + } + // If the allocation target has a non-voter already, we will promote it to a + // voter. + rq.metrics.NonVoterPromotionsCount.Inc(1) + ops = roachpb.ReplicationChangesForPromotion(newReplica) + } else { + ops = roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newReplica) + } + if removeIdx < 0 { log.VEventf(ctx, 1, "adding replica %+v: %s", newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) @@ -526,6 +556,12 @@ func (rq *replicateQueue) addOrReplaceVoters( removeReplica := existingVoters[removeIdx] log.VEventf(ctx, 1, "replacing replica %s with %+v: %s", removeReplica, newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) + // NB: We may have performed a promotion of a non-voter above, but we will + // not perform a demotion here and instead just remove the existing replica + // entirely. This is because we know that the `removeReplica` is either dead + // or decommissioning (see `Allocator.computeAction`) . This means that + // after this allocation is executed, we could be one non-voter short. This + // will be handled by the replicateQueue's next attempt at this range. 
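As an aside on the promotion path just described: the two shapes `ops` can take are easiest to see side by side. A minimal sketch (not part of the patch; `voterAdditionOps` and the `hasNonVoter` flag are hypothetical stand-ins for the `desc.GetReplicaDescriptor` check above), using the `roachpb` helpers added later in this diff:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// voterAdditionOps mirrors the branch above: if the allocation target already
// holds a NON_VOTER, promote it in place (ADD_VOTER plus REMOVE_NON_VOTER on
// the same target, applied atomically); otherwise add a brand-new voter.
func voterAdditionOps(target roachpb.ReplicationTarget, hasNonVoter bool) []roachpb.ReplicationChange {
	if hasNonVoter {
		return roachpb.ReplicationChangesForPromotion(target)
	}
	return roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, target)
}

func main() {
	target := roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}
	// Promotion: ADD_VOTER and REMOVE_NON_VOTER on n4/s4.
	fmt.Println(voterAdditionOps(target, true))
	// Fresh addition: ADD_VOTER on n4/s4 only.
	fmt.Println(voterAdditionOps(target, false))
	// In the replacement case (removeIdx >= 0), a REMOVE_VOTER for the dead or
	// decommissioning replica is appended to either list, as the code below does.
}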
ops = append(ops, roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, roachpb.ReplicationTarget{ StoreID: removeReplica.StoreID, @@ -742,6 +778,11 @@ func (rq *replicateQueue) removeVoter( StoreID: removeVoter.StoreID, } desc, _ := repl.DescAndZone() + // TODO(aayush): Directly removing the voter here is a bit of a missed + // opportunity since we could potentially be 1 non-voter short and the + // `target` could be a valid store for a non-voter. In such a scenario, we + // could save a bunch of work by just performing an atomic demotion of a + // voter. if err := rq.changeReplicas( ctx, repl, @@ -910,20 +951,41 @@ func (rq *replicateQueue) removeLearner( func (rq *replicateQueue) considerRebalance( ctx context.Context, repl *Replica, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, canTransferLease func() bool, dryRun bool, ) (requeue bool, _ error) { desc, zone := repl.DescAndZone() - // The Noop case will result if this replica was queued in order to - // rebalance. Attempt to find a rebalancing target. + rebalanceTargetType := voterTarget if !rq.store.TestingKnobs().DisableReplicaRebalancing { rangeUsageInfo := rangeUsageInfoForRepl(repl) - addTarget, removeTarget, details, ok := rq.allocator.RebalanceTarget( - ctx, zone, repl.RaftStatus(), existingReplicas, rangeUsageInfo, - storeFilterThrottled) + addTarget, removeTarget, details, ok := rq.allocator.RebalanceVoter( + ctx, + zone, + repl.RaftStatus(), + existingVoters, + existingNonVoters, + rangeUsageInfo, + storeFilterThrottled, + ) + if !ok { + // If there was nothing to do for the set of voting replicas on this + // range, attempt to rebalance non-voters. + log.VEventf(ctx, 1, "no suitable rebalance target for voters") + addTarget, removeTarget, details, ok = rq.allocator.RebalanceNonVoter( + ctx, + zone, + repl.RaftStatus(), + existingVoters, + existingNonVoters, + rangeUsageInfo, + storeFilterThrottled, + ) + rebalanceTargetType = nonVoterTarget + } + if !ok { - log.VEventf(ctx, 1, "no suitable rebalance target") + log.VEventf(ctx, 1, "no suitable rebalance target for non-voters") } else if done, err := rq.maybeTransferLeaseAway( ctx, repl, removeTarget.StoreID, dryRun, canTransferLease, ); err != nil { @@ -932,46 +994,21 @@ func (rq *replicateQueue) considerRebalance( // Lease is now elsewhere, so we're not in charge any more. return false, nil } else { - // We have a replica to remove and one we can add, so let's swap them - // out. - chgs := []roachpb.ReplicationChange{ - // NB: we place the addition first because in the case of - // atomic replication changes being turned off, the changes - // will be executed individually in the order in which they - // appear. - {Target: addTarget, ChangeType: roachpb.ADD_VOTER}, - {Target: removeTarget, ChangeType: roachpb.REMOVE_VOTER}, - } - - if len(existingReplicas) == 1 { - // If there's only one replica, the removal target is the - // leaseholder and this is unsupported and will fail. However, - // this is also the only way to rebalance in a single-replica - // range. If we try the atomic swap here, we'll fail doing - // nothing, and so we stay locked into the current distribution - // of replicas. (Note that maybeTransferLeaseAway above will not - // have found a target, and so will have returned (false, nil). 
- // - // Do the best thing we can, which is carry out the addition - // only, which should succeed, and the next time we touch this - // range, we will have one more replica and hopefully it will - // take the lease and remove the current leaseholder. - // - // It's possible that "rebalancing deadlock" can occur in other - // scenarios, it's really impossible to tell from the code given - // the constraints we support. However, the lease transfer often - // does not happen spuriously, and we can't enter dangerous - // configurations sporadically, so this code path is only hit - // when we know it's necessary, picking the smaller of two evils. - // - // See https://github.com/cockroachdb/cockroach/issues/40333. - chgs = chgs[:1] - log.VEventf(ctx, 1, "can't swap replica due to lease; falling back to add") + // If we have a valid rebalance action (ok == true) and we haven't + // transferred our lease away, execute the rebalance. + chgs, err := rq.replicationChangesForRebalance(ctx, desc, len(existingVoters), addTarget, + removeTarget, rebalanceTargetType) + if err != nil { + return false, err } - rq.metrics.RebalanceReplicaCount.Inc(1) - log.VEventf(ctx, 1, "rebalancing %+v to %+v: %s", - removeTarget, addTarget, rangeRaftProgress(repl.RaftStatus(), existingReplicas)) + log.VEventf(ctx, + 1, + "rebalancing %s %+v to %+v: %s", + rebalanceTargetType, + removeTarget, + addTarget, + rangeRaftProgress(repl.RaftStatus(), existingVoters)) if err := rq.changeReplicas( ctx, @@ -1009,6 +1046,98 @@ func (rq *replicateQueue) considerRebalance( }, ) return false, err + +} + +// replicationChangesForRebalance returns a list of ReplicationChanges to +// execute for a rebalancing decision made by the allocator. +func (rq *replicateQueue) replicationChangesForRebalance( + ctx context.Context, + desc *roachpb.RangeDescriptor, + numExistingVoters int, + addTarget roachpb.ReplicationTarget, + removeTarget roachpb.ReplicationTarget, + rebalanceTargetType targetReplicaType, +) (chgs []roachpb.ReplicationChange, err error) { + if rebalanceTargetType == voterTarget && numExistingVoters == 1 { + // If there's only one replica, the removal target is the + // leaseholder and this is unsupported and will fail. However, + // this is also the only way to rebalance in a single-replica + // range. If we try the atomic swap here, we'll fail doing + // nothing, and so we stay locked into the current distribution + // of replicas. (Note that maybeTransferLeaseAway above will not + // have found a target, and so will have returned (false, nil). + // + // Do the best thing we can, which is carry out the addition + // only, which should succeed, and the next time we touch this + // range, we will have one more replica and hopefully it will + // take the lease and remove the current leaseholder. + // + // It's possible that "rebalancing deadlock" can occur in other + // scenarios, it's really impossible to tell from the code given + // the constraints we support. However, the lease transfer often + // does not happen spuriously, and we can't enter dangerous + // configurations sporadically, so this code path is only hit + // when we know it's necessary, picking the smaller of two evils. + // + // See https://github.com/cockroachdb/cockroach/issues/40333. 
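For contrast with the regular swap, a minimal sketch (illustrative only; `singleVoterFallback` is a hypothetical helper) of how the single-voter case degenerates to an addition-only change, matching the old `chgs = chgs[:1]` fallback being replaced here:

package rebalancesketch

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// singleVoterFallback contrasts the usual voter swap with what is actually
// issued when the range has only one voter: the removal target is the
// leaseholder, so only the addition goes out and the removal is left to a
// later replicateQueue pass.
func singleVoterFallback(addTarget, removeTarget roachpb.ReplicationTarget) (fullSwap, issued []roachpb.ReplicationChange) {
	fullSwap = []roachpb.ReplicationChange{
		{ChangeType: roachpb.ADD_VOTER, Target: addTarget},
		{ChangeType: roachpb.REMOVE_VOTER, Target: removeTarget},
	}
	issued = fullSwap[:1] // addition only; see issue #40333
	return fullSwap, issued
}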
+ chgs = []roachpb.ReplicationChange{ + {ChangeType: roachpb.ADD_VOTER, Target: addTarget}, + } + log.VEventf(ctx, 1, "can't swap replica due to lease; falling back to add") + return chgs, err + } + + rdesc, found := desc.GetReplicaDescriptor(addTarget.StoreID) + switch rebalanceTargetType { + case voterTarget: + // Check if the target being added already has a non-voting replica. + if found && rdesc.GetType() == roachpb.NON_VOTER { + // If the receiving store already has a non-voting replica, we *must* + // execute a swap between that non-voting replica and the voting replica + // we're trying to move to it. This swap is executed atomically via + // joint-consensus. + // + // NB: Since voting replicas abide by both the overall `constraints` and + // the `voter_constraints`, it is copacetic to make this swap since: + // + // 1. `addTarget` must already be a valid target for a voting replica + // (i.e. it must already satisfy both *constraints fields) since + // `Allocator.RebalanceVoter` just handed it to us. + // 2. `removeTarget` may or may not be a valid target for a non-voting + // replica, but `considerRebalance` takes care to `requeue` the current + // replica into the replicateQueue. So we expect the replicateQueue's next + // attempt at rebalancing this range to rebalance the non-voter if it ends + // up being in violation of the range's constraints. + rq.metrics.NonVoterPromotionsCount.Inc(1) + rq.metrics.VoterDemotionsCount.Inc(1) + promo := roachpb.ReplicationChangesForPromotion(addTarget) + demo := roachpb.ReplicationChangesForDemotion(removeTarget) + chgs = append(promo, demo...) + } else if found { + return nil, errors.AssertionFailedf("programming error:"+ + " store being rebalanced to(%s) already has a voting replica", addTarget.StoreID) + } else { + // We have a replica to remove and one we can add, so let's swap them out. + chgs = []roachpb.ReplicationChange{ + {ChangeType: roachpb.ADD_VOTER, Target: addTarget}, + {ChangeType: roachpb.REMOVE_VOTER, Target: removeTarget}, + } + } + case nonVoterTarget: + if found { + // Non-voters should not consider any of the range's existing stores as + // valid candidates. If we get here, we must have raced with another + // rebalancing decision. 
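The promotion/demotion pair built above expands to four `ReplicationChange`s that are applied as a single joint-consensus transition. A minimal sketch (the store numbers are arbitrary examples), using the helpers this diff adds to `pkg/roachpb/api.go`:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	// A voter currently on s1 is rebalanced onto s4, which already holds a
	// non-voter: promote on s4, demote on s1, all in one atomic change.
	addTarget := roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}
	removeTarget := roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}
	chgs := append(
		roachpb.ReplicationChangesForPromotion(addTarget),
		roachpb.ReplicationChangesForDemotion(removeTarget)...,
	)
	// Logically: ADD_VOTER s4, REMOVE_NON_VOTER s4, ADD_NON_VOTER s1, REMOVE_VOTER s1.
	fmt.Println(chgs)
}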
+ return nil, errors.AssertionFailedf("invalid rebalancing decision: trying to"+ + " move non-voter to a store that already has a replica %s for the range", rdesc) + } + chgs = []roachpb.ReplicationChange{ + {ChangeType: roachpb.ADD_NON_VOTER, Target: addTarget}, + {ChangeType: roachpb.REMOVE_NON_VOTER, Target: removeTarget}, + } + } + return chgs, nil } type transferLeaseOptions struct { diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index c941307eb90c..e46de78df5a4 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -16,6 +16,8 @@ import ( "encoding/json" "fmt" "math" + "math/rand" + "strconv" "strings" "testing" "time" @@ -32,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -287,6 +290,17 @@ func TestReplicateQueueDownReplicate(t *testing.T) { } } +func scanAndGetNumNonVoters( + t *testing.T, tc *testcluster.TestCluster, store *kvserver.Store, scratchKey roachpb.Key, +) int { + // Nudge the replicateQueue to up/down-replicate our scratch range. + if err := store.ForceReplicationScanAndProcess(); err != nil { + t.Fatal(err) + } + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + return len(scratchRange.Replicas().NonVoterDescriptors()) +} + // TestReplicateQueueUpAndDownReplicateNonVoters is an end-to-end test ensuring // that the replicateQueue will add or remove non-voter(s) to a range based on // updates to its zone configuration. @@ -320,18 +334,9 @@ func TestReplicateQueueUpAndDownReplicateNonVoters(t *testing.T) { tc.Server(0).GetFirstStoreID()) require.NoError(t, err) - get := func() int { - // Nudge the replicateQueue to up/down-replicate our scratch range. - if err := store.ForceReplicationScanAndProcess(); err != nil { - t.Fatal(err) - } - scratchRange := tc.LookupRangeOrFatal(t, scratchKey) - return len(scratchRange.Replicas().NonVoterDescriptors()) - } - var expectedNonVoterCount = 2 testutils.SucceedsSoon(t, func() error { - if found := get(); found != expectedNonVoterCount { + if found := scanAndGetNumNonVoters(t, tc, store, scratchKey); found != expectedNonVoterCount { return errors.Errorf("expected upreplication to %d non-voters; found %d", expectedNonVoterCount, found) } @@ -344,7 +349,7 @@ func TestReplicateQueueUpAndDownReplicateNonVoters(t *testing.T) { require.NoError(t, err) expectedNonVoterCount = 0 testutils.SucceedsSoon(t, func() error { - if found := get(); found != expectedNonVoterCount { + if found := scanAndGetNumNonVoters(t, tc, store, scratchKey); found != expectedNonVoterCount { return errors.Errorf("expected downreplication to %d non-voters; found %d", expectedNonVoterCount, found) } @@ -352,6 +357,146 @@ func TestReplicateQueueUpAndDownReplicateNonVoters(t *testing.T) { }) } +// TestReplicateQueueSwapVoterWithNonVoters tests that voting replicas can +// rebalance to stores that already have a non-voter by "swapping" with them. +// "Swapping" in this context means simply changing the `ReplicaType` on the +// receiving store from non-voter to voter and changing it on the other side +// from voter to non-voter. 
+func TestReplicateQueueSwapVotersWithNonVoters(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + serverArgs := make(map[int]base.TestServerArgs) + // Assign each store a rack number so we can constrain individual voting and + // non-voting replicas to them. + for i := 1; i <= 5; i++ { + serverArgs[i-1] = base.TestServerArgs{ + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + { + Key: "rack", Value: strconv.Itoa(i), + }, + }, + }, + } + } + clusterArgs := base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + ServerArgsPerNode: serverArgs, + } + + synthesizeRandomConstraints := func() ( + constraints string, voterStores, nonVoterStores []roachpb.StoreID, + ) { + storeList := []roachpb.StoreID{1, 2, 3, 4, 5} + // Shuffle the list of stores and designate the first 3 as voters and the + // rest as non-voters. + rand.Shuffle(5, func(i, j int) { + storeList[i], storeList[j] = storeList[j], storeList[i] + }) + voterStores = storeList[:3] + nonVoterStores = storeList[3:5] + + var overallConstraints, voterConstraints []string + for _, store := range nonVoterStores { + overallConstraints = append(overallConstraints, fmt.Sprintf(`"+rack=%d": 1`, store)) + } + for _, store := range voterStores { + voterConstraints = append(voterConstraints, fmt.Sprintf(`"+rack=%d": 1`, store)) + } + return fmt.Sprintf( + "ALTER RANGE default CONFIGURE ZONE USING num_replicas = 5, num_voters = 3,"+ + " constraints = '{%s}', voter_constraints = '{%s}'", + strings.Join(overallConstraints, ","), strings.Join(voterConstraints, ","), + ), voterStores, nonVoterStores + } + + tc := testcluster.StartTestCluster(t, 5, clusterArgs) + defer tc.Stopper().Stop(context.Background()) + + scratchKey := tc.ScratchRange(t) + // Start with 1 voter and 4 non-voters. This ensures that we also exercise the + // swapping behavior during voting replica allocation when we upreplicate to 3 + // voters after calling `synthesizeRandomConstraints` below. See comment + // inside `allocateTargetFromList`. + _, err := tc.ServerConn(0).Exec("ALTER RANGE default CONFIGURE ZONE USING" + + " num_replicas=5, num_voters=1") + require.NoError(t, err) + testutils.SucceedsSoon(t, func() error { + if err := forceScanOnAllReplicationQueues(tc); err != nil { + return err + } + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + if voters := scratchRange.Replicas().VoterDescriptors(); len(voters) != 1 { + return errors.Newf("expected 1 voter; got %v", voters) + } + if nonVoters := scratchRange.Replicas().NonVoterDescriptors(); len(nonVoters) != 4 { + return errors.Newf("expected 4 non-voters; got %v", nonVoters) + } + return nil + }) + + checkRelocated := func(t *testing.T, voterStores, nonVoterStores []roachpb.StoreID) { + testutils.SucceedsSoon(t, func() error { + if err := forceScanOnAllReplicationQueues(tc); err != nil { + return err + } + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + if n := len(scratchRange.Replicas().VoterDescriptors()); n != 3 { + return errors.Newf("number of voters %d does not match expectation", n) + } + if n := len(scratchRange.Replicas().NonVoterDescriptors()); n != 2 { + return errors.Newf("number of non-voters %d does not match expectation", n) + } + + // Check that each replica set is present on the stores designated by + // synthesizeRandomConstraints. 
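To make the constraint synthesis above concrete, here is a sketch of the statement it produces for one arbitrary shuffle (store/rack numbers are illustrative; the test re-randomizes them on every iteration). Note that the designated non-voter stores end up in `constraints` while the voter stores go into `voter_constraints`:

package zcfgsketch

// Example shuffle: voterStores = [2 4 5], nonVoterStores = [1 3].
// synthesizeRandomConstraints would then return a statement equivalent to:
const exampleAlter = `ALTER RANGE default CONFIGURE ZONE USING num_replicas = 5, num_voters = 3,` +
	` constraints = '{"+rack=1": 1,"+rack=3": 1}', voter_constraints = '{"+rack=2": 1,"+rack=4": 1,"+rack=5": 1}'`

Since all five stores already hold a replica, moving from one such assignment to the next forces the atomic promotions and demotions this test is meant to exercise.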
+ for _, store := range voterStores { + replDesc, ok := scratchRange.GetReplicaDescriptor(store) + if !ok { + return errors.Newf("no replica found on store %d", store) + } + if typ := replDesc.GetType(); typ != roachpb.VOTER_FULL { + return errors.Newf("replica on store %d does not match expectation;"+ + " expected VOTER_FULL, got %s", typ) + } + } + for _, store := range nonVoterStores { + replDesc, ok := scratchRange.GetReplicaDescriptor(store) + if !ok { + return errors.Newf("no replica found on store %d", store) + } + if typ := replDesc.GetType(); typ != roachpb.NON_VOTER { + return errors.Newf("replica on store %d does not match expectation;"+ + " expected NON_VOTER, got %s", typ) + } + } + return nil + }) + } + + var numIterations = 10 + if util.RaceEnabled { + numIterations = 1 + } + for i := 0; i < numIterations; i++ { + // Generate random (but valid) constraints for the 3 voters and 2 non_voters + // and check that the replicate queue achieves conformance. + // + // NB: `synthesizeRandomConstraints` sets up the default zone configs such + // that every range should have 3 voting replica and 2 non-voting replicas. + // The crucial thing to note here is that we have 5 stores and 5 replicas, + // and since we never allow a single store to have >1 replica for a range at + // any given point, any change in the configuration of these 5 replicas + // _must_ go through atomic non-voter promotions and voter demotions. + alterStatement, voterStores, nonVoterStores := synthesizeRandomConstraints() + log.Infof(ctx, "applying: %s", alterStatement) + _, err := tc.ServerConn(0).Exec(alterStatement) + require.NoError(t, err) + checkRelocated(t, voterStores, nonVoterStores) + } +} + // queryRangeLog queries the range log. The query must be of type: // `SELECT info from system.rangelog ...`. 
func queryRangeLog( @@ -410,6 +555,15 @@ func toggleReplicationQueues(tc *testcluster.TestCluster, active bool) { } } +func forceScanOnAllReplicationQueues(tc *testcluster.TestCluster) (err error) { + for _, s := range tc.Servers { + err = s.Stores().VisitStores(func(store *kvserver.Store) error { + return store.ForceReplicationScanAndProcess() + }) + } + return err +} + func toggleSplitQueues(tc *testcluster.TestCluster, active bool) { for _, s := range tc.Servers { _ = s.Stores().VisitStores(func(store *kvserver.Store) error { diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go index 6d061a5145da..62cc30a12155 100644 --- a/pkg/kv/kvserver/store_pool.go +++ b/pkg/kv/kvserver/store_pool.go @@ -384,11 +384,11 @@ func (sp *StorePool) updateLocalStoreAfterRebalance( return } switch changeType { - case roachpb.ADD_VOTER: + case roachpb.ADD_VOTER, roachpb.ADD_NON_VOTER: detail.desc.Capacity.RangeCount++ detail.desc.Capacity.LogicalBytes += rangeUsageInfo.LogicalBytes detail.desc.Capacity.WritesPerSecond += rangeUsageInfo.WritesPerSecond - case roachpb.REMOVE_VOTER: + case roachpb.REMOVE_VOTER, roachpb.REMOVE_NON_VOTER: detail.desc.Capacity.RangeCount-- if detail.desc.Capacity.LogicalBytes <= rangeUsageInfo.LogicalBytes { detail.desc.Capacity.LogicalBytes = 0 @@ -400,6 +400,8 @@ func (sp *StorePool) updateLocalStoreAfterRebalance( } else { detail.desc.Capacity.WritesPerSecond -= rangeUsageInfo.WritesPerSecond } + default: + return } sp.detailsMu.storeDetails[storeID] = &detail } diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index e0a054c8c551..baa2d8d6b164 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1397,6 +1397,22 @@ func MakeReplicationChanges( return chgs } +// ReplicationChangesForPromotion returns the replication changes that +// correspond to the promotion of a non-voter to a voter. +func ReplicationChangesForPromotion(target ReplicationTarget) []ReplicationChange { + return []ReplicationChange{ + {ChangeType: ADD_VOTER, Target: target}, {ChangeType: REMOVE_NON_VOTER, Target: target}, + } +} + +// ReplicationChangesForDemotion returns the replication changes that correspond +// to the demotion of a voter to a non-voter. +func ReplicationChangesForDemotion(target ReplicationTarget) []ReplicationChange { + return []ReplicationChange{ + {ChangeType: ADD_NON_VOTER, Target: target}, {ChangeType: REMOVE_VOTER, Target: target}, + } +} + // AddChanges adds a batch of changes to the request in a backwards-compatible // way. func (acrr *AdminChangeReplicasRequest) AddChanges(chgs ...ReplicationChange) { diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 4c0366fa47b6..6fd3baacec13 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1466,9 +1466,17 @@ var charts = []sectionDescription{ Metrics: []string{"queue.replicate.purgatory"}, }, { - Title: "Reblance Count", + Title: "Rebalance Count", Metrics: []string{"queue.replicate.rebalancereplica"}, }, + { + Title: "Demotions of Voters to Non Voters", + Metrics: []string{"queue.replicate.voterdemotions"}, + }, + { + Title: "Promotions of Non Voters to Voters", + Metrics: []string{"queue.replicate.nonvoterpromotions"}, + }, { Title: "Remove Replica Count", Metrics: []string{