diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 844a0e56096a..abbb0232200d 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -144,6 +144,25 @@ func (a AllocatorAction) String() string { return allocatorActionNames[a] } +type targetReplicaType int + +const ( + _ targetReplicaType = iota + voterTarget + nonVoterTarget +) + +func (t targetReplicaType) String() string { + switch typ := t; typ { + case voterTarget: + return "voters" + case nonVoterTarget: + return "non-voters" + default: + panic(fmt.Sprintf("unknown targetReplicaType %d", typ)) + } +} + type transferDecision int const ( @@ -158,40 +177,50 @@ const ( // can be retried quickly as soon as new stores come online, or additional // space frees up. type allocatorError struct { - constraints []zonepb.ConstraintsConjunction - existingReplicas int - aliveStores int - throttledStores int + constraints []zonepb.ConstraintsConjunction + voterConstraints []zonepb.ConstraintsConjunction + existingVoterCount int + existingNonVoterCount int + aliveStores int + throttledStores int } func (ae *allocatorError) Error() string { - var existingReplsStr string - if ae.existingReplicas == 1 { - existingReplsStr = "1 already has a replica" + var existingVoterStr string + if ae.existingVoterCount == 1 { + existingVoterStr = "1 already has a voter" } else { - existingReplsStr = fmt.Sprintf("%d already have a replica", ae.existingReplicas) + existingVoterStr = fmt.Sprintf("%d already have a voter", ae.existingVoterCount) + } + + var existingNonVoterStr string + if ae.existingNonVoterCount == 1 { + existingNonVoterStr = "1 already has a non-voter" + } else { + existingNonVoterStr = fmt.Sprintf("%d already have a non-voter", ae.existingNonVoterCount) } var baseMsg string if ae.throttledStores != 0 { baseMsg = fmt.Sprintf( - "0 of %d live stores are able to take a new replica for the range (%d throttled, %s)", - ae.aliveStores, ae.throttledStores, existingReplsStr) + "0 of %d live stores are able to take a new replica for the range (%d throttled, %s, %s)", + ae.aliveStores, ae.throttledStores, existingVoterStr, existingNonVoterStr) } else { baseMsg = fmt.Sprintf( - "0 of %d live stores are able to take a new replica for the range (%s)", - ae.aliveStores, existingReplsStr) + "0 of %d live stores are able to take a new replica for the range (%s, %s)", + ae.aliveStores, existingVoterStr, existingNonVoterStr) } - if len(ae.constraints) == 0 { + if len(ae.constraints) == 0 && len(ae.voterConstraints) == 0 { if ae.throttledStores > 0 { return baseMsg } return baseMsg + "; likely not enough nodes in cluster" } + var b strings.Builder b.WriteString(baseMsg) - b.WriteString("; must match constraints [") + b.WriteString("; replicas must match constraints [") for i := range ae.constraints { if i > 0 { b.WriteByte(' ') @@ -201,6 +230,18 @@ func (ae *allocatorError) Error() string { b.WriteByte('}') } b.WriteString("]") + + b.WriteString("; voting replicas must match voter_constraints [") + for i := range ae.voterConstraints { + if i > 0 { + b.WriteByte(' ') + } + b.WriteByte('{') + b.WriteString(ae.voterConstraints[i].String()) + b.WriteByte('}') + } + b.WriteString("]") + return b.String() } @@ -532,21 +573,22 @@ type decisionDetails struct { Existing string `json:",omitempty"` } -// AllocateTarget returns a suitable store for a new allocation with the -// required attributes. Nodes already accommodating existing replicas are ruled -// out as targets. 
The range ID of the replica being allocated for is also -// passed in to ensure that we don't try to replace an existing dead replica on -// a store. -// -// TODO(tbg): AllocateReplacement? -func (a *Allocator) AllocateTarget( - ctx context.Context, zone *zonepb.ZoneConfig, existingReplicas []roachpb.ReplicaDescriptor, +func (a *Allocator) allocateTarget( + ctx context.Context, + zone *zonepb.ZoneConfig, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + targetType targetReplicaType, ) (*roachpb.StoreDescriptor, string, error) { - sl, aliveStoreCount, throttled := a.storePool.getStoreList(storeFilterThrottled) + candidateStoreList, aliveStoreCount, throttled := a.storePool.getStoreList(storeFilterThrottled) target, details := a.allocateTargetFromList( - ctx, sl, zone, existingReplicas, a.scorerOptions()) - + ctx, + candidateStoreList, + zone, + existingVoters, + existingNonVoters, + a.scorerOptions(), + targetType) if target != nil { return target, details, nil } @@ -559,30 +601,102 @@ func (a *Allocator) AllocateTarget( ) } return nil, "", &allocatorError{ - constraints: zone.Constraints, - existingReplicas: len(existingReplicas), - aliveStores: aliveStoreCount, - throttledStores: len(throttled), + voterConstraints: zone.VoterConstraints, + constraints: zone.Constraints, + existingVoterCount: len(existingVoters), + existingNonVoterCount: len(existingNonVoters), + aliveStores: aliveStoreCount, + throttledStores: len(throttled), } } +// AllocateVoter returns a suitable store for a new allocation of a voting +// replica with the required attributes. Nodes already accommodating existing +// replicas are ruled out as targets. +func (a *Allocator) AllocateVoter( + ctx context.Context, + zone *zonepb.ZoneConfig, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, +) (*roachpb.StoreDescriptor, string, error) { + return a.allocateTarget(ctx, zone, existingVoters, existingNonVoters, voterTarget) +} + +// AllocateNonVoter returns a suitable store for a new allocation of a +// non-voting replica with the required attributes. Nodes already accommodating +// _any_ existing replicas are ruled out as targets. +func (a *Allocator) AllocateNonVoter( + ctx context.Context, + zone *zonepb.ZoneConfig, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, +) (*roachpb.StoreDescriptor, string, error) { + return a.allocateTarget(ctx, zone, existingVoters, existingNonVoters, nonVoterTarget) +} + func (a *Allocator) allocateTargetFromList( ctx context.Context, candidateStores StoreList, zone *zonepb.ZoneConfig, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, options scorerOptions, + targetType targetReplicaType, ) (*roachpb.StoreDescriptor, string) { - analyzedConstraints := constraint.AnalyzeConstraints( - ctx, a.storePool.getStoreDescriptor, existingReplicas, zone) - candidates := allocateCandidates( + existingReplicas := append(existingVoters, existingNonVoters...) 
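+ // Note that the overall `constraints` are analyzed against the combined set of existing voters and non-voters (and the zone's total NumReplicas), whereas `voter_constraints` are analyzed against only the existing voters and zone.GetNumVoters().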
+ analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor, + existingReplicas, *zone.NumReplicas, zone.Constraints) + analyzedVoterConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor, + existingVoters, zone.GetNumVoters(), zone.VoterConstraints) + + var constraintsChecker constraintsCheckFn + var replicaSetForDiversityCalc []roachpb.ReplicaDescriptor + switch t := targetType; t { + case voterTarget: + constraintsChecker = voterConstraintsCheckerForAllocation( + analyzedOverallConstraints, + analyzedVoterConstraints, + ) + // When computing the "diversity score" for a given store for a voting + // replica allocation, we consider the localities of only the stores that + // contain a voting replica for the range. + // + // Note that if we were to consider all stores that have any kind of replica + // for the range, voting replica allocation would be disincentivized to pick + // stores that (partially or fully) share locality hierarchies with stores + // that contain a non-voting replica. This is undesirable because this could + // inadvertently reduce the fault-tolerance of the range in cases like the + // following: + // + // Consider 3 regions (A, B, C), each with 2 AZs. Suppose that regions A and + // B have a voting replica each, whereas region C has a non-voting replica. + // In cases like these, we would want region C to be picked over regions A + // and B for allocating a new third voting replica since that improves our + // fault tolerance to the greatest degree. + // In the counterfactual (i.e. if we were to compute diversity scores based + // off of all `existingReplicas`), regions A, B, and C would all be equally + // likely to get a new voting replica. + replicaSetForDiversityCalc = existingVoters + case nonVoterTarget: + constraintsChecker = nonVoterConstraintsCheckerForAllocation(analyzedOverallConstraints) + replicaSetForDiversityCalc = existingReplicas + default: + log.Fatalf(ctx, "unsupported targetReplicaType: %v", t) + } + + candidates := rankedCandidateListForAllocation( ctx, - candidateStores, analyzedConstraints, existingReplicas, - a.storePool.getLocalitiesByStore(existingReplicas), + candidateStores, + constraintsChecker, + // TODO(aayush): We currently rule out all the stores that have any replica + // for the given range. However, we want to get to a place where we'll + // consider the targets that have a non-voter as feasible + // relocation/up-replication targets for existing/new voting replicas, since + // it is cheaper to promote a non-voter than it is to add a new voter via + // the LEARNER->VOTER_FULL path. + existingReplicas, + a.storePool.getLocalitiesByStore(replicaSetForDiversityCalc), a.storePool.isNodeReadyForRoutineReplicaTransfer, - options, - ) - log.VEventf(ctx, 3, "allocate candidates: %s", candidates) + options) + + log.VEventf(ctx, 3, "allocate %s: %s", targetType, candidates) if target := candidates.selectGood(a.randGen); target != nil { log.VEventf(ctx, 3, "add target: %s", target) details := decisionDetails{Target: target.compactString(options)} @@ -596,12 +710,13 @@ func (a *Allocator) allocateTargetFromList( return nil, "" } +// TODO(aayush): Generalize this to work for non-voting replicas as well. 
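+// simulateRemoveTarget determines which voting replica would be removed if the range were up-replicated onto `targetStore`: it temporarily records the addition in the store pool's local stats and then delegates to RemoveVoter.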
func (a Allocator) simulateRemoveTarget( ctx context.Context, targetStore roachpb.StoreID, zone *zonepb.ZoneConfig, candidates []roachpb.ReplicaDescriptor, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, rangeUsageInfo RangeUsageInfo, ) (roachpb.ReplicaDescriptor, string, error) { // Update statistics first @@ -614,41 +729,64 @@ func (a Allocator) simulateRemoveTarget( a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.REMOVE_VOTER) }() log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore) - return a.RemoveTarget(ctx, zone, candidates, existingReplicas) + return a.RemoveVoter(ctx, zone, candidates, existingVoters, nil) } -// RemoveTarget returns a suitable replica to remove from the provided replica -// set. It first attempts to randomly select a target from the set of stores -// that have greater than the average number of replicas. Failing that, it -// falls back to selecting a random target from any of the existing -// replicas. -func (a Allocator) RemoveTarget( +func (a Allocator) removeTarget( ctx context.Context, zone *zonepb.ZoneConfig, candidates []roachpb.ReplicaDescriptor, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, + targetType targetReplicaType, ) (roachpb.ReplicaDescriptor, string, error) { if len(candidates) == 0 { - return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()") + return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one" + + " candidate replica to allocator.removeTarget()") } + existingReplicas := append(existingVoters, existingNonVoters...) // Retrieve store descriptors for the provided candidates from the StorePool. candidateStoreIDs := make(roachpb.StoreIDSlice, len(candidates)) for i, exist := range candidates { candidateStoreIDs[i] = exist.StoreID } candidateStoreList, _, _ := a.storePool.getStoreListFromIDs(candidateStoreIDs, storeFilterNone) - - analyzedConstraints := constraint.AnalyzeConstraints( - ctx, a.storePool.getStoreDescriptor, existingReplicas, zone) + analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor, + existingReplicas, *zone.NumReplicas, zone.Constraints) + analyzedVoterConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor, + existingVoters, zone.GetNumVoters(), zone.VoterConstraints) options := a.scorerOptions() - rankedCandidates := removeCandidates( + + var constraintsChecker constraintsCheckFn + var replicaSetForDiversityCalc []roachpb.ReplicaDescriptor + switch t := targetType; t { + case voterTarget: + // Voting replicas have to abide by both the overall `constraints` (which + // apply to all replicas) and `voter_constraints` which apply only to voting + // replicas. + constraintsChecker = voterConstraintsCheckerForRemoval( + analyzedOverallConstraints, + analyzedVoterConstraints, + ) + // See comment inside `allocateTargetFromList` for why we compute diversity + // scores relative to only the stores of voting replicas. 
+ replicaSetForDiversityCalc = existingVoters + case nonVoterTarget: + constraintsChecker = nonVoterConstraintsCheckerForRemoval(analyzedOverallConstraints) + replicaSetForDiversityCalc = existingReplicas + default: + log.Fatalf(ctx, "unsupported targetReplicaType: %v", t) + } + + rankedCandidates := rankedCandidateListForRemoval( candidateStoreList, - analyzedConstraints, - a.storePool.getLocalitiesByStore(existingReplicas), + constraintsChecker, + a.storePool.getLocalitiesByStore(replicaSetForDiversityCalc), options, ) - log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates) + + log.VEventf(ctx, 3, "remove %s: %s", targetType, rankedCandidates) if bad := rankedCandidates.selectBad(a.randGen); bad != nil { for _, exist := range existingReplicas { if exist.StoreID == bad.store.StoreID { @@ -666,9 +804,38 @@ func (a Allocator) RemoveTarget( return roachpb.ReplicaDescriptor{}, "", errors.New("could not select an appropriate replica to be removed") } +// RemoveVoter returns a suitable replica to remove from the provided replica +// set. It first attempts to randomly select a target from the set of stores +// that have greater than the average number of replicas. Failing that, it falls +// back to selecting a random target from any of the existing voting replicas. +func (a Allocator) RemoveVoter( + ctx context.Context, + zone *zonepb.ZoneConfig, + voterCandidates []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, +) (roachpb.ReplicaDescriptor, string, error) { + return a.removeTarget(ctx, zone, voterCandidates, existingVoters, existingNonVoters, voterTarget) +} + +// RemoveNonVoter returns a suitable non-voting replica to remove from the +// provided set. It first attempts to randomly select a target from the set of +// stores that have greater than the average number of replicas. Failing that, +// it falls back to selecting a random target from any of the existing +// non-voting replicas. +func (a Allocator) RemoveNonVoter( + ctx context.Context, + zone *zonepb.ZoneConfig, + nonVoterCandidates []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, +) (roachpb.ReplicaDescriptor, string, error) { + return a.removeTarget(ctx, zone, nonVoterCandidates, existingVoters, existingNonVoters, nonVoterTarget) +} + // RebalanceTarget returns a suitable store for a rebalance target with // required attributes. Rebalance targets are selected via the same mechanism -// as AllocateTarget(), except the chosen target must follow some additional +// as AllocateVoter(), except the chosen target must follow some additional // criteria. Namely, if chosen, it must further the goal of balancing the // cluster. // @@ -680,7 +847,7 @@ func (a Allocator) RemoveTarget( // replica to the range, then removing the most undesirable replica. // // Simply ignoring a rebalance opportunity in the event that the target chosen -// by AllocateTarget() doesn't fit balancing criteria is perfectly fine, as +// by AllocateVoter() doesn't fit balancing criteria is perfectly fine, as // other stores in the cluster will also be doing their probabilistic best to // rebalance. This helps prevent a stampeding herd targeting an abnormally // under-utilized store. 
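To make the new per-replica-type entry points concrete, here is a minimal illustrative sketch (not part of this diff) of how a caller might exercise them. It assumes it compiles inside package kvserver (where context, zonepb, roachpb, and log are already imported); the helper name and log messages are invented for illustration, while the AllocateVoter, AllocateNonVoter, and RemoveVoter signatures are the ones introduced above.

// sketchReplicaPlacement is an illustrative helper showing how the voter- and
// non-voter-specific allocator methods compose. Hypothetical; not part of this
// change.
func sketchReplicaPlacement(
	ctx context.Context,
	a *Allocator,
	zone *zonepb.ZoneConfig,
	existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
) {
	// Voting replicas must satisfy both `constraints` and `voter_constraints`.
	if store, details, err := a.AllocateVoter(ctx, zone, existingVoters, existingNonVoters); err == nil {
		log.VEventf(ctx, 2, "would add a voter to s%d: %s", store.StoreID, details)
	}
	// Non-voting replicas only need to satisfy the overall `constraints`.
	if store, details, err := a.AllocateNonVoter(ctx, zone, existingVoters, existingNonVoters); err == nil {
		log.VEventf(ctx, 2, "would add a non-voter to s%d: %s", store.StoreID, details)
	}
	// Removals are likewise split by replica type; here every existing voter is
	// treated as a removal candidate.
	if repl, details, err := a.RemoveVoter(ctx, zone, existingVoters, existingVoters, existingNonVoters); err == nil {
		log.VEventf(ctx, 2, "would remove the voter on s%d: %s", repl.StoreID, details)
	}
}

The same pattern applies to RemoveNonVoter, with the existing non-voters as the candidate set.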
@@ -731,9 +898,8 @@ func (a Allocator) RebalanceTarget( return zero, zero, "", false } } - analyzedConstraints := constraint.AnalyzeConstraints( - ctx, a.storePool.getStoreDescriptor, existingReplicas, zone) + ctx, a.storePool.getStoreDescriptor, existingReplicas, *zone.NumReplicas, zone.Constraints) options := a.scorerOptions() results := rebalanceCandidates( ctx, @@ -798,7 +964,7 @@ func (a Allocator) RebalanceTarget( rangeUsageInfo, ) if err != nil { - log.Warningf(ctx, "simulating RemoveTarget failed: %+v", err) + log.Warningf(ctx, "simulating RemoveVoter failed: %+v", err) return zero, zero, "", false } if target.store.StoreID != removeReplica.StoreID { diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go index 3e0789b570da..81f6f92ad2bf 100644 --- a/pkg/kv/kvserver/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator_scorer.go @@ -404,13 +404,13 @@ func (cl candidateList) removeCandidate(c candidate) candidateList { return cl } -// allocateCandidates creates a candidate list of all stores that can be used -// for allocating a new replica ordered from the best to the worst. Only -// stores that meet the criteria are included in the list. -func allocateCandidates( +// rankedCandidateListForAllocation creates a candidate list of all stores that +// can be used for allocating a new replica ordered from the best to the worst. +// Only stores that meet the criteria are included in the list. +func rankedCandidateListForAllocation( ctx context.Context, candidateStores StoreList, - constraints constraint.AnalyzedConstraints, + constraintsCheck constraintsCheckFn, existingReplicas []roachpb.ReplicaDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool, @@ -425,10 +425,11 @@ func allocateCandidates( log.VEventf(ctx, 3, "not considering non-ready node n%d for allocate", s.Node.NodeID) continue } - constraintsOK, necessary := allocateConstraintsCheck(s, constraints) + constraintsOK, necessary := constraintsCheck(s) if !constraintsOK { continue } + if !maxCapacityCheck(s) { continue } @@ -436,11 +437,13 @@ func allocateCandidates( balanceScore := balanceScore(candidateStores, s.Capacity, options) var convergesScore int if options.qpsRebalanceThreshold > 0 { - if s.Capacity.QueriesPerSecond < underfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) { + if s.Capacity.QueriesPerSecond < underfullThreshold( + candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) { convergesScore = 1 } else if s.Capacity.QueriesPerSecond < candidateStores.candidateQueriesPerSecond.mean { convergesScore = 0 - } else if s.Capacity.QueriesPerSecond < overfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) { + } else if s.Capacity.QueriesPerSecond < overfullThreshold( + candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) { convergesScore = -1 } else { convergesScore = -2 @@ -464,18 +467,18 @@ func allocateCandidates( return candidates } -// removeCandidates creates a candidate list of all existing replicas' stores -// ordered from least qualified for removal to most qualified. Stores that are -// marked as not valid, are in violation of a required criteria. -func removeCandidates( +// rankedCandidateListForRemoval creates a candidate list of all existing +// replicas' stores ordered from least qualified for removal to most qualified. 
+// Stores that are marked as not valid, are in violation of a required criteria. +func rankedCandidateListForRemoval( sl StoreList, - constraints constraint.AnalyzedConstraints, + constraintsCheck constraintsCheckFn, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, options scorerOptions, ) candidateList { var candidates candidateList for _, s := range sl.stores { - constraintsOK, necessary := removeConstraintsCheck(s, constraints) + constraintsOK, necessary := constraintsCheck(s) if !constraintsOK { candidates = append(candidates, candidate{ store: s, @@ -707,10 +710,9 @@ func rebalanceCandidates( balanceScore := balanceScore(comparable.sl, existing.store.Capacity, options) var convergesScore int if !rebalanceFromConvergesOnMean(comparable.sl, existing.store.Capacity) { - // Similarly to in removeCandidates, any replica whose removal - // would not converge the range stats to their means is given a - // constraint score boost of 1 to make it less attractive for - // removal. + // Similarly to in rankedCandidateListForRemoval, any replica whose + // removal would not converge the range stats to their means is given a + // constraint score boost of 1 to make it less attractive for removal. convergesScore = 1 } existing.convergesScore = convergesScore @@ -904,6 +906,78 @@ func sameLocalityAndAttrs(s1, s2 roachpb.StoreDescriptor) bool { return true } +// constraintsCheckFn determines whether the given store is a valid and/or +// necessary candidate for an addition of a new replica or a removal of an +// existing one. +type constraintsCheckFn func(roachpb.StoreDescriptor) (valid, necessary bool) + +// voterConstraintsCheckerForAllocation returns a constraintsCheckFn that +// determines whether a candidate for a new voting replica is valid and/or +// necessary as per the `voter_constraints` and `constraints` on the range. +// +// NB: Potential voting replica candidates are "valid" only if they satisfy both +// the `voter_constraints` as well as the overall `constraints`. Additionally, +// candidates are only marked "necessary" if they're required in order to +// satisfy either the `voter_constraints` set or the `constraints` set. +func voterConstraintsCheckerForAllocation( + overallConstraints, voterConstraints constraint.AnalyzedConstraints, +) constraintsCheckFn { + return func(s roachpb.StoreDescriptor) (valid, necessary bool) { + overallConstraintsOK, necessaryOverall := allocateConstraintsCheck(s, overallConstraints) + voterConstraintsOK, necessaryForVoters := allocateConstraintsCheck(s, voterConstraints) + + return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters + } +} + +// nonVoterConstraintsCheckerForAllocation returns a constraintsCheckFn that +// determines whether a candidate for a new non-voting replica is valid and/or +// necessary as per the `constraints` on the range. +// +// NB: Non-voting replicas don't care about `voter_constraints`, so that +// constraint set is entirely disregarded here. +func nonVoterConstraintsCheckerForAllocation( + overallConstraints constraint.AnalyzedConstraints, +) constraintsCheckFn { + return func(s roachpb.StoreDescriptor) (valid, necessary bool) { + return allocateConstraintsCheck(s, overallConstraints) + } +} + +// voterConstraintsCheckerForRemoval returns a constraintsCheckFn that +// determines whether an existing voting replica is valid and/or necessary with +// respect to the `constraints` and `voter_constraints` on the range. 
+// +// NB: Candidates are marked invalid if their removal would result in a +// violation of `voter_constraints` or `constraints` on the range. They are +// marked necessary if constraints conformance (for either `constraints` or +// `voter_constraints`) is not possible without them. +func voterConstraintsCheckerForRemoval( + overallConstraints, voterConstraints constraint.AnalyzedConstraints, +) constraintsCheckFn { + return func(s roachpb.StoreDescriptor) (valid, necessary bool) { + overallConstraintsOK, necessaryOverall := removeConstraintsCheck(s, overallConstraints) + voterConstraintsOK, necessaryForVoters := removeConstraintsCheck(s, voterConstraints) + + return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters + } +} + +// nonVoterConstraintsCheckerForRemoval returns a constraintsCheckFn that +// determines whether an existing non-voting replica is valid and/or necessary +// with respect to the `constraints` on the range. +// +// NB: Candidates are marked invalid if their removal would result in a +// violation of `constraints` on the range. They are marked necessary if +// constraints conformance (for `constraints`) is not possible without them. +func nonVoterConstraintsCheckerForRemoval( + overallConstraints constraint.AnalyzedConstraints, +) constraintsCheckFn { + return func(s roachpb.StoreDescriptor) (valid, necessary bool) { + return removeConstraintsCheck(s, overallConstraints) + } +} + // allocateConstraintsCheck checks the potential allocation target store // against all the constraints. If it matches a constraint at all, it's valid. // If it matches a constraint that is not already fully satisfied by existing @@ -927,6 +1001,9 @@ func allocateConstraintsCheck( ); constraintsOK { valid = true matchingStores := analyzed.SatisfiedBy[i] + // NB: We check for "<" here instead of "<=" because `matchingStores` + // doesn't include `store`. Thus, `store` is only marked necessary if we + // have strictly fewer matchingStores than we need. 
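+ // For example, if a constraints conjunction asks for NumReplicas = 2 and only one existing store satisfies it, a new matching store is still necessary (1 < 2); once two existing stores already match, further matching stores remain valid but are no longer necessary.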
if len(matchingStores) < int(constraints.NumReplicas) { return true, true } diff --git a/pkg/kv/kvserver/allocator_scorer_test.go b/pkg/kv/kvserver/allocator_scorer_test.go index e24d409ec195..80e90100676f 100644 --- a/pkg/kv/kvserver/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator_scorer_test.go @@ -918,7 +918,8 @@ func TestAllocateConstraintsCheck(t *testing.T) { NumReplicas: proto.Int32(tc.zoneNumReplicas), } analyzed := constraint.AnalyzeConstraints( - context.Background(), getTestStoreDesc, testStoreReplicas(tc.existing), zone) + context.Background(), getTestStoreDesc, testStoreReplicas(tc.existing), + *zone.NumReplicas, zone.Constraints) for _, s := range testStores { valid, necessary := allocateConstraintsCheck(s, analyzed) if e, a := tc.expectedValid[s.StoreID], valid; e != a { @@ -1052,7 +1053,7 @@ func TestRemoveConstraintsCheck(t *testing.T) { NumReplicas: proto.Int32(tc.zoneNumReplicas), } analyzed := constraint.AnalyzeConstraints( - context.Background(), getTestStoreDesc, existing, zone) + context.Background(), getTestStoreDesc, existing, *zone.NumReplicas, zone.Constraints) for storeID, expected := range tc.expected { valid, necessary := removeConstraintsCheck(testStores[storeID], analyzed) if e, a := expected.valid, valid; e != a { diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 111c6a7f48d3..3b252743cdd8 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -64,13 +64,39 @@ var simpleZoneConfig = zonepb.ZoneConfig{ }, } -var multiDCConfig = zonepb.ZoneConfig{ +var multiDCConfigSSD = zonepb.ZoneConfig{ NumReplicas: proto.Int32(2), Constraints: []zonepb.ConstraintsConjunction{ {Constraints: []zonepb.Constraint{{Value: "ssd", Type: zonepb.Constraint_REQUIRED}}}, }, } +var multiDCConfigConstrainToA = zonepb.ZoneConfig{ + NumReplicas: proto.Int32(2), + Constraints: []zonepb.ConstraintsConjunction{ + {Constraints: []zonepb.Constraint{{Value: "a", Type: zonepb.Constraint_REQUIRED}}}, + }, +} + +var multiDCConfigUnsatisfiableVoterConstraints = zonepb.ZoneConfig{ + NumReplicas: proto.Int32(2), + VoterConstraints: []zonepb.ConstraintsConjunction{ + {Constraints: []zonepb.Constraint{{Value: "doesNotExist", Type: zonepb.Constraint_REQUIRED}}}, + }, +} + +// multiDCConfigVoterAndNonVoter prescribes that one voting replica be placed in +// DC "a" and one non-voting replica be placed in DC "b". +var multiDCConfigVoterAndNonVoter = zonepb.ZoneConfig{ + NumReplicas: proto.Int32(2), + Constraints: []zonepb.ConstraintsConjunction{ + {Constraints: []zonepb.Constraint{{Value: "a", Type: zonepb.Constraint_REQUIRED}}, NumReplicas: 1}, + }, + VoterConstraints: []zonepb.ConstraintsConjunction{ + {Constraints: []zonepb.Constraint{{Value: "b", Type: zonepb.Constraint_REQUIRED}}}, + }, +} + var singleStore = []*roachpb.StoreDescriptor{ { StoreID: 1, @@ -316,6 +342,18 @@ func createTestAllocator( return stopper, g, storePool, a, manual } +// checkReplExists checks whether the given `repl` exists on any of the +// `stores`. +func checkReplExists(repl roachpb.ReplicaDescriptor, stores []roachpb.StoreID) (found bool) { + for _, storeID := range stores { + if repl.StoreID == storeID { + found = true + break + } + } + return found +} + // mockStorePool sets up a collection of a alive and dead stores in the store // pool for testing purposes. 
func mockStorePool( @@ -389,10 +427,10 @@ func TestAllocatorSimpleRetrieval(t *testing.T) { stopper, g, _, a, _ := createTestAllocator(1, false /* deterministic */) defer stopper.Stop(context.Background()) gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t) - result, _, err := a.AllocateTarget( + result, _, err := a.AllocateVoter( context.Background(), &simpleZoneConfig, - []roachpb.ReplicaDescriptor{}, + nil /* existingVoters */, nil, /* existingNonVoters */ ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -408,10 +446,10 @@ func TestAllocatorNoAvailableDisks(t *testing.T) { stopper, _, _, a, _ := createTestAllocator(1, false /* deterministic */) defer stopper.Stop(context.Background()) - result, _, err := a.AllocateTarget( + result, _, err := a.AllocateVoter( context.Background(), &simpleZoneConfig, - []roachpb.ReplicaDescriptor{}, + nil /* existingVoters */, nil, /* existingNonVoters */ ) if result != nil { t.Errorf("expected nil result: %+v", result) @@ -429,21 +467,21 @@ func TestAllocatorTwoDatacenters(t *testing.T) { defer stopper.Stop(context.Background()) gossiputil.NewStoreGossiper(g).GossipStores(multiDCStores, t) ctx := context.Background() - result1, _, err := a.AllocateTarget( + result1, _, err := a.AllocateVoter( ctx, - &multiDCConfig, - []roachpb.ReplicaDescriptor{}, + &multiDCConfigSSD, + nil /* existingVoters */, nil, /* existingNonVoters */ ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) } - result2, _, err := a.AllocateTarget( + result2, _, err := a.AllocateVoter( ctx, - &multiDCConfig, + &multiDCConfigSSD, []roachpb.ReplicaDescriptor{{ NodeID: result1.Node.NodeID, StoreID: result1.StoreID, - }}, + }}, nil, /* existingNonVoters */ ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -454,9 +492,9 @@ func TestAllocatorTwoDatacenters(t *testing.T) { t.Errorf("Expected nodes %+v: %+v vs %+v", expected, result1.Node, result2.Node) } // Verify that no result is forthcoming if we already have a replica. 
- result3, _, err := a.AllocateTarget( + result3, _, err := a.AllocateVoter( ctx, - &multiDCConfig, + &multiDCConfigSSD, []roachpb.ReplicaDescriptor{ { NodeID: result1.Node.NodeID, @@ -466,7 +504,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { NodeID: result2.Node.NodeID, StoreID: result2.StoreID, }, - }, + }, nil, /* existingNonVoters */ ) if err == nil { t.Errorf("expected error on allocation without available stores: %+v", result3) @@ -480,7 +518,7 @@ func TestAllocatorExistingReplica(t *testing.T) { stopper, g, _, a, _ := createTestAllocator(1, false /* deterministic */) defer stopper.Stop(context.Background()) gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t) - result, _, err := a.AllocateTarget( + result, _, err := a.AllocateVoter( context.Background(), &zonepb.ZoneConfig{ NumReplicas: proto.Int32(0), @@ -498,7 +536,7 @@ func TestAllocatorExistingReplica(t *testing.T) { NodeID: 2, StoreID: 2, }, - }, + }, nil, /* existingNonVoters */ ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -599,13 +637,9 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { for _, tc := range testCases { { - result, _, err := a.AllocateTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - tc.existing, - ) + result, _, err := a.AllocateVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), tc.existing, nil) if e, a := tc.expectTargetAllocate, result != nil; e != a { - t.Errorf("AllocateTarget(%v) got target %v, err %v; expectTarget=%v", + t.Errorf("AllocateVoter(%v) got target %v, err %v; expectTarget=%v", tc.existing, result, err, tc.expectTargetAllocate) } } @@ -835,7 +869,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { // We make 5 stores in this test -- 3 in the same datacenter, and 1 each in // 2 other datacenters. All of our replicas are distributed within these 3 // datacenters. Originally, the stores that are all alone in their datacenter - // are fuller than the other stores. If we didn't simulate RemoveTarget in + // are fuller than the other stores. If we didn't simulate RemoveVoter in // RebalanceTarget, we would try to choose store 2 or 3 as the target store // to make a rebalance. However, we would immediately remove the replica on // store 1 or 2 to retain the locality diversity. @@ -2060,7 +2094,13 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) { } } -func TestAllocatorRemoveTargetLocality(t *testing.T) { +// TestAllocatorRemoveBasedOnDiversity tests that replicas that are removed on +// the basis of diversity are such that the resulting diversity score of the +// range (after their removal) is the highest. Additionally, it also ensures +// that voting replica removals only consider the set of existing voters when +// computing the diversity score, whereas non-voting replica removal considers +// all existing replicas for its diversity calculation. +func TestAllocatorRemoveBasedOnDiversity(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -2072,61 +2112,163 @@ func TestAllocatorRemoveTargetLocality(t *testing.T) { // Given a set of existing replicas for a range, pick out the ones that should // be removed purely on the basis of locality diversity. 
testCases := []struct { - existing []roachpb.StoreID - expected []roachpb.StoreID + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor + expVoterRemovals, expNonVoterRemovals []roachpb.StoreID }{ + // NB: the `existingNonVoters` in these subtests are such that they would be + // expected to alter the diversity scores if they were not disregarded + // during voter removal. { - []roachpb.StoreID{1, 2, 3, 5}, - []roachpb.StoreID{1, 2}, + existingVoters: replicas(1, 2, 3, 5), + existingNonVoters: replicas(6, 7), + // 1 and 2 are in the same datacenter. + expVoterRemovals: []roachpb.StoreID{1, 2}, + expNonVoterRemovals: []roachpb.StoreID{6}, }, { - []roachpb.StoreID{1, 2, 3}, - []roachpb.StoreID{1, 2}, + existingVoters: replicas(1, 2, 3), + existingNonVoters: replicas(4, 6, 7), + expVoterRemovals: []roachpb.StoreID{1, 2}, + expNonVoterRemovals: []roachpb.StoreID{4}, }, { - []roachpb.StoreID{1, 3, 4, 5}, - []roachpb.StoreID{3, 4}, + existingVoters: replicas(1, 3, 4, 5), + existingNonVoters: replicas(2), + expVoterRemovals: []roachpb.StoreID{3, 4}, + expNonVoterRemovals: []roachpb.StoreID{2}, }, { - []roachpb.StoreID{1, 3, 5, 6}, - []roachpb.StoreID{5, 6}, + existingVoters: replicas(1, 3, 5, 6), + existingNonVoters: replicas(2, 7, 8), + expVoterRemovals: []roachpb.StoreID{5, 6}, + expNonVoterRemovals: []roachpb.StoreID{2, 7, 8}, }, { - []roachpb.StoreID{1, 3, 5}, - []roachpb.StoreID{1, 3, 5}, - }, - { - []roachpb.StoreID{1, 3, 4, 6, 7, 8}, - []roachpb.StoreID{3, 4, 7, 8}, + existingVoters: replicas(3, 4, 7, 8), + existingNonVoters: replicas(2, 5, 6), + expVoterRemovals: []roachpb.StoreID{3, 4, 7, 8}, + expNonVoterRemovals: []roachpb.StoreID{5, 6}, }, } for _, c := range testCases { - existingRepls := make([]roachpb.ReplicaDescriptor, len(c.existing)) - for i, storeID := range c.existing { - existingRepls[i] = roachpb.ReplicaDescriptor{ - NodeID: roachpb.NodeID(storeID), - StoreID: storeID, - } - } - targetRepl, details, err := a.RemoveTarget( + targetVoter, details, err := a.RemoveVoter( context.Background(), zonepb.EmptyCompleteZoneConfig(), - existingRepls, - existingRepls, + c.existingVoters, /* voterCandidates */ + c.existingVoters, + c.existingNonVoters, ) - if err != nil { - t.Fatal(err) - } - var found bool - for _, storeID := range c.expected { - if targetRepl.StoreID == storeID { - found = true - break + require.NoError(t, err) + + require.Truef( + t, + checkReplExists(targetVoter, c.expVoterRemovals), + "expected RemoveVoter(%v) in %v, but got %d; details: %s", + c.existingVoters, c.expVoterRemovals, targetVoter.StoreID, details, + ) + // Ensure that we get the same set of results if we didn't have any + // non-voting replicas. If non-voters were to have an impact on voters' + // diversity score calculations, we would fail here. 
+ targetVoter, _, err = a.RemoveVoter( + context.Background(), + zonepb.EmptyCompleteZoneConfig(), + c.existingVoters, /* voterCandidates */ + c.existingVoters, + nil, /* existingNonVoters */ + ) + require.NoError(t, err) + require.Truef(t, checkReplExists(targetVoter, c.expVoterRemovals), + "voter target for removal differs from expectation when non-voters are present;"+ + " expected %v, got %d", c.expVoterRemovals, targetVoter.StoreID) + + targetNonVoter, _, err := a.RemoveNonVoter( + context.Background(), + zonepb.EmptyCompleteZoneConfig(), + c.existingNonVoters, /* nonVoterCandidates */ + c.existingVoters, + c.existingNonVoters, + ) + require.NoError(t, err) + require.True(t, checkReplExists(targetNonVoter, c.expNonVoterRemovals)) + } +} + +// TestAllocatorConstraintsAndVoterConstraints tests that allocation of voting +// replicas respects both the `constraints` and the `voter_constraints` and the +// allocation of non-voting replicas respects just the `constraints`. +func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + name string + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor + stores []*roachpb.StoreDescriptor + zone *zonepb.ZoneConfig + expectedVoters, expectedNonVoters []roachpb.StoreID + shouldVoterAllocFail, shouldNonVoterAllocFail bool + expError string + }{ + { + name: "one store satisfies constraints for each type of replica", + stores: multiDCStores, + zone: &multiDCConfigVoterAndNonVoter, + expectedVoters: []roachpb.StoreID{2}, + expectedNonVoters: []roachpb.StoreID{1}, + }, + { + name: "only voter can satisfy constraints", + stores: multiDCStores, + zone: &multiDCConfigConstrainToA, + expectedVoters: []roachpb.StoreID{1}, + shouldNonVoterAllocFail: true, + }, + { + name: "only non_voter can satisfy constraints", + stores: multiDCStores, + zone: &multiDCConfigUnsatisfiableVoterConstraints, + shouldVoterAllocFail: true, + expectedNonVoters: []roachpb.StoreID{1, 2}, + }, + } + + check := func(target roachpb.StoreID, stores []roachpb.StoreID) bool { + for _, s := range stores { + if s == target { + return true } } - if !found { - t.Errorf("expected RemoveTarget(%v) in %v, but got %d; details: %s", c.existing, c.expected, targetRepl.StoreID, details) - } + return false + } + + for i, test := range testCases { + t.Run(fmt.Sprintf("%d:%s", i+1, test.name), func(t *testing.T) { + ctx := context.Background() + stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(test.stores, t) + + // Allocate the voting replica first, before the non-voter. This is the + // order in which we'd expect the allocator to repair a given range. See + // TestAllocatorComputeAction. + voterTarget, _, err := a.AllocateVoter(ctx, test.zone, test.existingVoters, test.existingNonVoters) + if test.shouldVoterAllocFail { + require.Errorf(t, err, "expected voter allocation to fail; got %v as a valid target instead", voterTarget) + } else { + require.NoError(t, err) + require.True(t, check(voterTarget.StoreID, test.expectedVoters)) + test.existingVoters = append(test.existingVoters, replicas(voterTarget.StoreID)...) 
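+ // The voter allocated above now counts as an existing replica, so the non-voter allocation below must steer clear of its store as well.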
+ } + + nonVoterTarget, _, err := a.AllocateNonVoter(ctx, test.zone, test.existingVoters, test.existingNonVoters) + if test.shouldNonVoterAllocFail { + require.Errorf(t, err, "expected non-voter allocation to fail; got %v as a valid target instead", nonVoterTarget) + } else { + require.True(t, check(nonVoterTarget.StoreID, test.expectedNonVoters)) + require.NoError(t, err) + } + }) } } @@ -2192,11 +2334,7 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) { StoreID: storeID, } } - targetStore, details, err := a.AllocateTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - existingRepls, - ) + targetStore, details, err := a.AllocateVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), existingRepls, nil) if err != nil { t.Fatal(err) } @@ -2208,7 +2346,7 @@ } } if !found { - t.Errorf("expected AllocateTarget(%v) in %v, but got %d; details: %s", c.existing, c.expected, targetStore.StoreID, details) + t.Errorf("expected AllocateVoter(%v) in %v, but got %d; details: %s", c.existing, c.expected, targetStore.StoreID, details) } } } @@ -2524,7 +2662,9 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { // No constraints. zone := &zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: nil} analyzed := constraint.AnalyzeConstraints( - context.Background(), a.storePool.getStoreDescriptor, existingRepls, zone) + context.Background(), a.storePool.getStoreDescriptor, existingRepls, *zone.NumReplicas, + zone.Constraints) + checkFn := voterConstraintsCheckerForAllocation(analyzed, constraint.EmptyAnalyzedConstraints) a.storePool.isNodeReadyForRoutineReplicaTransfer = func(_ context.Context, n roachpb.NodeID) bool { for _, s := range tc.excluded { @@ -2537,18 +2677,16 @@ } t.Run(fmt.Sprintf("%d/allocate", testIdx), func(t *testing.T) { - candidates := allocateCandidates( - context.Background(), + candidates := rankedCandidateListForAllocation(context.Background(), sl, - analyzed, + checkFn, existingRepls, a.storePool.getLocalitiesByStore(existingRepls), a.storePool.isNodeReadyForRoutineReplicaTransfer, - a.scorerOptions(), - ) + a.scorerOptions()) if !expectedStoreIDsMatch(tc.expected, candidates) { - t.Errorf("expected allocateCandidates(%v) = %v, but got %v", + t.Errorf("expected rankedCandidateListForAllocation(%v) = %v, but got %v", tc.existing, tc.expected, candidates) } }) @@ -2576,6 +2714,88 @@ } } } +// TestAllocatorNonVoterAllocationExcludesVoterNodes checks that when allocating +// non-voting replicas, stores that have any existing replica (voting or +// non-voting) are excluded from the list of candidates. +func TestAllocatorNonVoterAllocationExcludesVoterNodes(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + name string + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor + stores []*roachpb.StoreDescriptor + zone *zonepb.ZoneConfig + expected roachpb.StoreID + shouldFail bool + expError string + }{ + { + name: "voters only", + existingVoters: replicas(1, 2, 3, 4), + stores: sameDCStores, + zone: zonepb.EmptyCompleteZoneConfig(), + // Expect that the store that doesn't have any replicas would be + // the one to receive a new non-voter.
+ expected: roachpb.StoreID(5), + }, + { + name: "non-voters only", + existingNonVoters: replicas(1, 2, 3, 4), + stores: sameDCStores, + zone: zonepb.EmptyCompleteZoneConfig(), + expected: roachpb.StoreID(5), + }, + { + name: "mixed", + existingVoters: replicas(1, 2), + existingNonVoters: replicas(3, 4), + stores: sameDCStores, + zone: zonepb.EmptyCompleteZoneConfig(), + expected: roachpb.StoreID(5), + }, + { + name: "only valid store has a voter", + // Place a voter on the only store that would meet the constraints of + // `multiDCConfigConstrainToA`. + existingVoters: replicas(1), + stores: multiDCStores, + zone: &multiDCConfigConstrainToA, + shouldFail: true, + expError: "0 of 2 live stores are able to take a new replica for the range", + }, + { + name: "only valid store has a non_voter", + // Place a non-voter on the only store that would meet the constraints of + // `multiDCConfigConstrainToA`. + existingNonVoters: replicas(1), + stores: multiDCStores, + zone: &multiDCConfigConstrainToA, + shouldFail: true, + expError: "0 of 2 live stores are able to take a new replica for the range", + }, + } + + for i, test := range testCases { + t.Run(fmt.Sprintf("%d:%s", i+1, test.name), func(t *testing.T) { + ctx := context.Background() + stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(test.stores, t) + + result, _, err := a.AllocateNonVoter(ctx, test.zone, test.existingVoters, test.existingNonVoters) + if test.shouldFail { + require.Error(t, err) + require.Regexp(t, test.expError, err) + } else { + require.NoError(t, err) + require.Equal(t, test.expected, result.StoreID) + } + }) + } +} + func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -2786,16 +3006,17 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) { } zone := &zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: tc.constraints} analyzed := constraint.AnalyzeConstraints( - context.Background(), a.storePool.getStoreDescriptor, existingRepls, zone) - candidates := allocateCandidates( - context.Background(), + context.Background(), a.storePool.getStoreDescriptor, existingRepls, *zone.NumReplicas, + zone.Constraints) + checkFn := voterConstraintsCheckerForAllocation(analyzed, constraint.EmptyAnalyzedConstraints) + + candidates := rankedCandidateListForAllocation(context.Background(), sl, - analyzed, + checkFn, existingRepls, a.storePool.getLocalitiesByStore(existingRepls), - func(context.Context, roachpb.NodeID) bool { return true }, /* isNodeValidForRoutineReplicaTransfer */ - a.scorerOptions(), - ) + func(context.Context, roachpb.NodeID) bool { return true }, + a.scorerOptions()) best := candidates.best() match := true if len(tc.expected) != len(best) { @@ -2812,7 +3033,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) { } } if !match { - t.Errorf("%d: expected allocateCandidates(%v) = %v, but got %v", + t.Errorf("%d: expected rankedCandidateListForAllocation(%v) = %v, but got %v", testIdx, tc.existing, tc.expected, candidates) } } @@ -3010,18 +3231,33 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) { StoreID: storeID, } } - zone := &zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: tc.constraints} - analyzed := constraint.AnalyzeConstraints( - context.Background(), a.storePool.getStoreDescriptor, existingRepls, zone) - candidates := removeCandidates( - sl, - analyzed, + ctx := 
context.Background() + analyzed := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor, existingRepls, + 0 /* numReplicas */, tc.constraints) + + // Check behavior in a zone config where `voter_constraints` are empty. + checkFn := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints) + candidates := rankedCandidateListForRemoval(sl, + checkFn, a.storePool.getLocalitiesByStore(existingRepls), - a.scorerOptions(), - ) + a.scorerOptions()) if !expectedStoreIDsMatch(tc.expected, candidates.worst()) { - t.Errorf("%d: expected removeCandidates(%v) = %v, but got %v", - testIdx, tc.existing, tc.expected, candidates) + t.Errorf("%d (with `constraints`): expected rankedCandidateListForRemoval(%v)"+ + " = %v, but got %v\n for candidates %v", testIdx, tc.existing, tc.expected, + candidates.worst(), candidates) + } + + // Check that we'd see the same result if the same constraints were + // specified as `voter_constraints`. + checkFn = voterConstraintsCheckerForRemoval(constraint.EmptyAnalyzedConstraints, analyzed) + candidates = rankedCandidateListForRemoval(sl, + checkFn, + a.storePool.getLocalitiesByStore(existingRepls), + a.scorerOptions()) + if !expectedStoreIDsMatch(tc.expected, candidates.worst()) { + t.Errorf("%d (with `voter_constraints`): expected rankedCandidateListForRemoval(%v)"+ + " = %v, but got %v\n for candidates %v", testIdx, tc.existing, tc.expected, + candidates.worst(), candidates) } } } @@ -3807,7 +4043,8 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { NumReplicas: proto.Int32(tc.zoneNumReplicas), } analyzed := constraint.AnalyzeConstraints( - context.Background(), a.storePool.getStoreDescriptor, existingRepls, zone) + context.Background(), a.storePool.getStoreDescriptor, existingRepls, + *zone.NumReplicas, zone.Constraints) results := rebalanceCandidates( context.Background(), sl, @@ -4138,13 +4375,13 @@ func TestLoadBasedLeaseRebalanceScore(t *testing.T) { } } -// TestAllocatorRemoveTarget verifies that the replica chosen by RemoveTarget is -// the one with the lowest capacity. -func TestAllocatorRemoveTarget(t *testing.T) { +// TestAllocatorRemoveTargetBasedOnCapacity verifies that the replica chosen by +// RemoveVoter is the one with the lowest capacity. +func TestAllocatorRemoveTargetBasedOnCapacity(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // List of replicas that will be passed to RemoveTarget + // List of replicas that will be passed to RemoveVoter replicas := []roachpb.ReplicaDescriptor{ { StoreID: 1, @@ -4211,17 +4448,13 @@ func TestAllocatorRemoveTarget(t *testing.T) { // Repeat this test 10 times, it should always be either store 2 or 3. 
for i := 0; i < 10; i++ { - targetRepl, _, err := a.RemoveTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - replicas, - replicas, - ) + targetRepl, _, err := a.RemoveVoter(ctx, zonepb.EmptyCompleteZoneConfig(), replicas, replicas, + nil) if err != nil { t.Fatal(err) } if a, e1, e2 := targetRepl, replicas[1], replicas[2]; a != e1 && a != e2 { - t.Fatalf("%d: RemoveTarget did not select either expected replica; expected %v or %v, got %v", + t.Fatalf("%d: RemoveVoter did not select either expected replica; expected %v or %v, got %v", i, e1, e2, a) } } @@ -5531,25 +5764,40 @@ func TestAllocatorError(t *testing.T) { ae allocatorError expected string }{ - {allocatorError{constraints: nil, existingReplicas: 1, aliveStores: 1}, - "0 of 1 live stores are able to take a new replica for the range (1 already has a replica); likely not enough nodes in cluster"}, - {allocatorError{constraints: nil, existingReplicas: 1, aliveStores: 2, throttledStores: 1}, - "0 of 2 live stores are able to take a new replica for the range (1 throttled, 1 already has a replica)"}, - {allocatorError{constraints: constraint, existingReplicas: 1, aliveStores: 1}, - `0 of 1 live stores are able to take a new replica for the range (1 already has a replica); ` + - `must match constraints [{+one}]`}, - {allocatorError{constraints: constraint, existingReplicas: 1, aliveStores: 2}, - `0 of 2 live stores are able to take a new replica for the range (1 already has a replica); ` + - `must match constraints [{+one}]`}, - {allocatorError{constraints: constraints, existingReplicas: 1, aliveStores: 1}, - `0 of 1 live stores are able to take a new replica for the range (1 already has a replica); ` + - `must match constraints [{+one,+two}]`}, - {allocatorError{constraints: constraints, existingReplicas: 1, aliveStores: 2}, - `0 of 2 live stores are able to take a new replica for the range (1 already has a replica); ` + - `must match constraints [{+one,+two}]`}, - {allocatorError{constraints: constraint, existingReplicas: 1, aliveStores: 2, throttledStores: 1}, - `0 of 2 live stores are able to take a new replica for the range (1 throttled, 1 already has a replica); ` + - `must match constraints [{+one}]`}, + {allocatorError{constraints: nil, existingVoterCount: 1, aliveStores: 1}, + "0 of 1 live stores are able to take a new replica for the range" + + " (1 already has a voter, 0 already have a non-voter); likely not enough nodes in cluster", + }, + {allocatorError{constraints: nil, existingVoterCount: 1, aliveStores: 2, throttledStores: 1}, + "0 of 2 live stores are able to take a new replica for the range" + + " (1 throttled, 1 already has a voter, 0 already have a non-voter)"}, + {allocatorError{constraints: constraint, existingVoterCount: 1, aliveStores: 1}, + "0 of 1 live stores are able to take a new replica for the range" + + " (1 already has a voter, 0 already have a non-voter);" + + " replicas must match constraints [{+one}];" + + " voting replicas must match voter_constraints []", + }, + {allocatorError{constraints: constraint, existingVoterCount: 1, aliveStores: 2}, + "0 of 2 live stores are able to take a new replica for the range" + + " (1 already has a voter, 0 already have a non-voter);" + + " replicas must match constraints [{+one}];" + + " voting replicas must match voter_constraints []"}, + {allocatorError{constraints: constraints, existingVoterCount: 1, aliveStores: 1}, + "0 of 1 live stores are able to take a new replica for the range" + + " (1 already has a voter, 0 already have a non-voter);" + + " replicas must 
match constraints [{+one,+two}];" + + " voting replicas must match voter_constraints []"}, + {allocatorError{constraints: constraints, existingVoterCount: 1, aliveStores: 2}, + "0 of 2 live stores are able to take a new replica for the range" + + " (1 already has a voter, 0 already have a non-voter);" + + " replicas must match constraints [{+one,+two}];" + + " voting replicas must match voter_constraints []"}, + {allocatorError{constraints: constraint, existingVoterCount: 1, aliveStores: 2, throttledStores: 1}, + "0 of 2 live stores are able to take a new replica for the range" + + " (1 throttled, 1 already has a voter, 0 already have a non-voter);" + + " replicas must match constraints [{+one}];" + + " voting replicas must match voter_constraints []", + }, } for i, testCase := range testCases { @@ -5568,22 +5816,14 @@ func TestAllocatorThrottled(t *testing.T) { defer stopper.Stop(ctx) // First test to make sure we would send the replica to purgatory. - _, _, err := a.AllocateTarget( - ctx, - &simpleZoneConfig, - []roachpb.ReplicaDescriptor{}, - ) + _, _, err := a.AllocateVoter(ctx, &simpleZoneConfig, []roachpb.ReplicaDescriptor{}, nil) if !errors.HasInterface(err, (*purgatoryError)(nil)) { t.Fatalf("expected a purgatory error, got: %+v", err) } // Second, test the normal case in which we can allocate to the store. gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t) - result, _, err := a.AllocateTarget( - ctx, - &simpleZoneConfig, - []roachpb.ReplicaDescriptor{}, - ) + result, _, err := a.AllocateVoter(ctx, &simpleZoneConfig, []roachpb.ReplicaDescriptor{}, nil) if err != nil { t.Fatalf("unable to perform allocation: %+v", err) } @@ -5600,11 +5840,7 @@ func TestAllocatorThrottled(t *testing.T) { } storeDetail.throttledUntil = timeutil.Now().Add(24 * time.Hour) a.storePool.detailsMu.Unlock() - _, _, err = a.AllocateTarget( - ctx, - &simpleZoneConfig, - []roachpb.ReplicaDescriptor{}, - ) + _, _, err = a.AllocateVoter(ctx, &simpleZoneConfig, []roachpb.ReplicaDescriptor{}, nil) if errors.HasInterface(err, (*purgatoryError)(nil)) { t.Fatalf("expected a non purgatory error, got: %+v", err) } diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index f2bfb7231e7d..5b911ae03179 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -3496,6 +3496,9 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) { func verifyMerged(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey) { t.Helper() repl := store.LookupReplica(rhsStartKey) + if repl == nil { + t.Fatal("replica doesn't exist") + } if !repl.Desc().StartKey.Equal(lhsStartKey) { t.Fatalf("ranges unexpectedly unmerged expected startKey %s, but got %s", lhsStartKey, repl.Desc().StartKey) } @@ -3504,6 +3507,9 @@ func verifyMerged(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey func verifyUnmerged(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey) { t.Helper() repl := store.LookupReplica(rhsStartKey) + if repl == nil { + t.Fatal("replica doesn't exist") + } if repl.Desc().StartKey.Equal(lhsStartKey) { t.Fatalf("ranges unexpectedly merged") } diff --git a/pkg/kv/kvserver/constraint/analyzer.go b/pkg/kv/kvserver/constraint/analyzer.go index e5e191ec5e0f..b037be2c678b 100644 --- a/pkg/kv/kvserver/constraint/analyzer.go +++ b/pkg/kv/kvserver/constraint/analyzer.go @@ -36,6 +36,10 @@ type AnalyzedConstraints struct { Satisfies map[roachpb.StoreID][]int } +// EmptyAnalyzedConstraints represents an empty set of 
constraints that are +// satisfied by any given configuration of replicas. +var EmptyAnalyzedConstraints = AnalyzedConstraints{} + // AnalyzeConstraints processes the zone config constraints that apply to a // range along with the current replicas for a range, spitting back out // information about which constraints are satisfied by which replicas and @@ -44,19 +48,20 @@ func AnalyzeConstraints( ctx context.Context, getStoreDescFn func(roachpb.StoreID) (roachpb.StoreDescriptor, bool), existing []roachpb.ReplicaDescriptor, - zone *zonepb.ZoneConfig, + numReplicas int32, + constraints []zonepb.ConstraintsConjunction, ) AnalyzedConstraints { result := AnalyzedConstraints{ - Constraints: zone.Constraints, + Constraints: constraints, } - if len(zone.Constraints) > 0 { - result.SatisfiedBy = make([][]roachpb.StoreID, len(zone.Constraints)) + if len(constraints) > 0 { + result.SatisfiedBy = make([][]roachpb.StoreID, len(constraints)) result.Satisfies = make(map[roachpb.StoreID][]int) } var constrainedReplicas int32 - for i, subConstraints := range zone.Constraints { + for i, subConstraints := range constraints { constrainedReplicas += subConstraints.NumReplicas for _, repl := range existing { // If for some reason we don't have the store descriptor (which shouldn't @@ -70,7 +75,7 @@ func AnalyzeConstraints( } } } - if constrainedReplicas > 0 && constrainedReplicas < *zone.NumReplicas { + if constrainedReplicas > 0 && constrainedReplicas < numReplicas { result.UnconstrainedReplicas = true } return result diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index b3e2870fd6f0..3676a1c656c3 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2494,6 +2494,13 @@ func (s *Store) relocateReplicas( } } +type relocationArgs struct { + targetsToAdd, targetsToRemove []roachpb.ReplicaDescriptor + addOp, removeOp roachpb.ReplicaChangeType + relocationTargets []roachpb.ReplicationTarget + targetType targetReplicaType +} + func (s *Store) relocateOne( ctx context.Context, desc *roachpb.RangeDescriptor, @@ -2518,17 +2525,34 @@ func (s *Store) relocateOne( storeList, _, _ := s.allocator.storePool.getStoreList(storeFilterNone) storeMap := storeListToMap(storeList) - getTargetsToRelocate := func() (targetsToAdd, targetsToRemove []roachpb.ReplicaDescriptor, - addOp, removeOp roachpb.ReplicaChangeType, votersRelocated bool) { + getRelocationArgs := func() relocationArgs { votersToAdd := subtractTargets(voterTargets, desc.Replicas().Voters().ReplicationTargets()) votersToRemove := subtractTargets(desc.Replicas().Voters().ReplicationTargets(), voterTargets) // If there are no voters to relocate, we relocate the non-voters. + // + // NB: This means that non-voters are handled after all voters have been + // relocated since relocateOne is expected to be called repeatedly until + // there are no more replicas to relocate. 
if len(votersToAdd) == 0 && len(votersToRemove) == 0 { nonVotersToAdd := subtractTargets(nonVoterTargets, desc.Replicas().NonVoters().ReplicationTargets()) nonVotersToRemove := subtractTargets(desc.Replicas().NonVoters().ReplicationTargets(), nonVoterTargets) - return nonVotersToAdd, nonVotersToRemove, roachpb.ADD_NON_VOTER, roachpb.REMOVE_NON_VOTER, true + return relocationArgs{ + targetsToAdd: nonVotersToAdd, + targetsToRemove: nonVotersToRemove, + addOp: roachpb.ADD_NON_VOTER, + removeOp: roachpb.REMOVE_NON_VOTER, + relocationTargets: nonVoterTargets, + targetType: nonVoterTarget, + } + } + return relocationArgs{ + targetsToAdd: votersToAdd, + targetsToRemove: votersToRemove, + addOp: roachpb.ADD_VOTER, + removeOp: roachpb.REMOVE_VOTER, + relocationTargets: voterTargets, + targetType: voterTarget, } - return votersToAdd, votersToRemove, roachpb.ADD_VOTER, roachpb.REMOVE_VOTER, false } // Compute which replica to add and/or remove, respectively. We then ask the @@ -2540,24 +2564,22 @@ func (s *Store) relocateOne( // same node, and this code doesn't do anything to specifically avoid that // case (although the allocator will avoid even trying to send snapshots to // such stores), so it could cause some failures. - targetsToAdd, targetsToRemove, addOp, removeOp, votersRelocated := getTargetsToRelocate() - relocationTargets := voterTargets - existingReplicas := desc.Replicas().VoterDescriptors() - if votersRelocated { - relocationTargets = nonVoterTargets - existingReplicas = desc.Replicas().NonVoterDescriptors() - } + args := getRelocationArgs() + existingVoters := desc.Replicas().VoterDescriptors() + existingNonVoters := desc.Replicas().NonVoterDescriptors() + existingReplicas := desc.Replicas().Descriptors() var ops roachpb.ReplicationChanges - if len(targetsToAdd) > 0 { + if len(args.targetsToAdd) > 0 { // Each iteration, pick the most desirable replica to add. However, // prefer the first target because it's the one that should hold the // lease in the end; it helps to add it early so that the lease doesn't // have to move too much. - candidateTargets := targetsToAdd - if !votersRelocated && storeHasReplica(relocationTargets[0].StoreID, candidateTargets) { + candidateTargets := args.targetsToAdd + if args.targetType == voterTarget && + storeHasReplica(args.relocationTargets[0].StoreID, candidateTargets) { candidateTargets = []roachpb.ReplicaDescriptor{ - {NodeID: relocationTargets[0].NodeID, StoreID: relocationTargets[0].StoreID}, + {NodeID: args.relocationTargets[0].NodeID, StoreID: args.relocationTargets[0].StoreID}, } } @@ -2579,42 +2601,51 @@ func (s *Store) relocateOne( ctx, candidateStoreList, zone, - existingReplicas, - s.allocator.scorerOptions()) - if targetStore == nil { - return nil, nil, fmt.Errorf("none of the remaining relocationTargets %v are legal additions to %v", - targetsToAdd, desc.Replicas()) - } + existingVoters, + existingNonVoters, + s.allocator.scorerOptions(), + args.targetType) target := roachpb.ReplicationTarget{ NodeID: targetStore.Node.NodeID, StoreID: targetStore.StoreID, } - ops = append(ops, roachpb.MakeReplicationChanges(addOp, target)...) + ops = append(ops, roachpb.MakeReplicationChanges(args.addOp, target)...) + // Pretend the replica is already there so that the removal logic below will // take it into account when deciding which replica to remove. 
- existingReplicas = append(existingReplicas, roachpb.ReplicaDescriptor{ - NodeID: target.NodeID, - StoreID: target.StoreID, - ReplicaID: desc.NextReplicaID, - Type: roachpb.ReplicaTypeVoterFull(), - }) + if args.targetType == nonVoterTarget { + existingNonVoters = append(existingNonVoters, roachpb.ReplicaDescriptor{ + NodeID: target.NodeID, + StoreID: target.StoreID, + ReplicaID: desc.NextReplicaID, + Type: roachpb.ReplicaTypeNonVoter(), + }) + } else { + existingVoters = append(existingVoters, roachpb.ReplicaDescriptor{ + NodeID: target.NodeID, + StoreID: target.StoreID, + ReplicaID: desc.NextReplicaID, + Type: roachpb.ReplicaTypeVoterFull(), + }) + } } var transferTarget *roachpb.ReplicationTarget - if len(targetsToRemove) > 0 { - // Pick a replica to remove. Note that existingReplicas may already reflect - // a replica we're adding in the current round. This is the right thing - // to do. For example, consider relocating from (s1,s2,s3) to (s1,s2,s4) - // where targetsToAdd will be (s4) and targetsToRemove is (s3). In this code, - // we'll want the allocator to see if s3 can be removed from + if len(args.targetsToRemove) > 0 { + // Pick a replica to remove. Note that existingVoters/existingNonVoters may + // already reflect a replica we're adding in the current round. This is the + // right thing to do. For example, consider relocating from (s1,s2,s3) to + // (s1,s2,s4) where targetsToAdd will be (s4) and targetsToRemove is (s3). + // In this code, we'll want the allocator to see if s3 can be removed from // (s1,s2,s3,s4) which is a reasonable request; that replica set is - // overreplicated. If we asked it instead to remove s3 from (s1,s2,s3) - // it may not want to do that due to constraints. - targetStore, _, err := s.allocator.RemoveTarget(ctx, zone, targetsToRemove, existingReplicas) + // overreplicated. If we asked it instead to remove s3 from (s1,s2,s3) it + // may not want to do that due to constraints. + targetStore, _, err := s.allocator.removeTarget(ctx, zone, args.targetsToRemove, existingVoters, + existingNonVoters, args.targetType) if err != nil { return nil, nil, errors.Wrapf(err, "unable to select removal target from %v; current replicas %v", - targetsToRemove, existingReplicas) + args.targetsToRemove, existingReplicas) } removalTarget := roachpb.ReplicationTarget{ NodeID: targetStore.NodeID, @@ -2634,17 +2665,16 @@ func (s *Store) relocateOne( } curLeaseholder := b.RawResponse().Responses[0].GetLeaseInfo().Lease.Replica ok := curLeaseholder.StoreID != removalTarget.StoreID - if !ok { - // Pick a replica that we can give the lease to. We sort the first - // target to the beginning (if it's there) because that's where the - // lease needs to be in the end. We also exclude the last replica if - // it was added by the add branch above (in which case it doesn't - // exist yet). - sortedTargetReplicas := append([]roachpb.ReplicaDescriptor(nil), existingReplicas[:len(existingReplicas)-len(ops)]...) + if !ok && args.targetType == voterTarget { + // Pick a voting replica that we can give the lease to. We sort the first + // target to the beginning (if it's there) because that's where the lease + // needs to be in the end. We also exclude the last replica if it was + // added by the add branch above (in which case it doesn't exist yet). + sortedTargetReplicas := append([]roachpb.ReplicaDescriptor(nil), existingVoters[:len(existingVoters)-len(ops)]...) 
sort.Slice(sortedTargetReplicas, func(i, j int) bool { sl := sortedTargetReplicas // relocationTargets[0] goes to the front (if it's present). - return sl[i].StoreID == relocationTargets[0].StoreID + return sl[i].StoreID == args.relocationTargets[0].StoreID }) for _, rDesc := range sortedTargetReplicas { if rDesc.StoreID != curLeaseholder.StoreID { @@ -2665,7 +2695,7 @@ func (s *Store) relocateOne( // illegal). if ok { ops = append(ops, roachpb.MakeReplicationChanges( - removeOp, + args.removeOp, removalTarget)...) } } diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 9494e1cfc9cc..dd75766ff382 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -120,6 +120,7 @@ func (e *quorumError) Error() string { func (*quorumError) purgatoryErrorMarker() {} // ReplicateQueueMetrics is the set of metrics for the replicate queue. +// TODO(aayush): Track metrics for non-voting replicas separately here. type ReplicateQueueMetrics struct { AddReplicaCount *metric.Counter RemoveReplicaCount *metric.Counter @@ -322,7 +323,9 @@ func (rq *replicateQueue) processOneChange( // Avoid taking action if the range has too many dead replicas to make // quorum. voterReplicas := desc.Replicas().VoterDescriptors() + nonVoterReplicas := desc.Replicas().NonVoterDescriptors() liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(voterReplicas) + liveNonVoterReplicas, _ := rq.allocator.storePool.liveAndDeadReplicas(nonVoterReplicas) // NB: the replication layer ensures that the below operations don't cause // unavailability; see: @@ -345,9 +348,14 @@ func (rq *replicateQueue) processOneChange( // Let the scanner requeue it again later. return false, nil case AllocatorAddVoter: - return rq.addOrReplace(ctx, repl, voterReplicas, liveVoterReplicas, -1 /* removeIdx */, dryRun) + return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, + -1 /* removeIdx */, dryRun) + case AllocatorAddNonVoter: + return rq.addNonVoter(ctx, repl, nonVoterReplicas, liveVoterReplicas, liveNonVoterReplicas, dryRun) case AllocatorRemoveVoter: - return rq.remove(ctx, repl, voterReplicas, dryRun) + return rq.removeVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun) + case AllocatorRemoveNonVoter: + return rq.removeNonVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun) case AllocatorReplaceDeadVoter: if len(deadVoterReplicas) == 0 { // Nothing to do. 
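// Illustration of the replace-dead-voter path (hypothetical store IDs): with
// voters on s1, s2, s3 and s3's store marked dead, liveVoterReplicas is
// {s1, s2} and deadVoterReplicas is {s3}. The unchanged code elided between
// these hunks then locates the dead voter's index so that the call below
// performs a replacement rather than a plain addition; a minimal sketch of
// that lookup (details may differ from the real code):
//
//	removeIdx := -1
//	for i, r := range voterReplicas {
//		if r.ReplicaID == deadVoterReplicas[0].ReplicaID {
//			removeIdx = i
//			break
//		}
//	}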
@@ -365,7 +373,7 @@ func (rq *replicateQueue) processOneChange( "dead voter %v unexpectedly not found in %v", deadVoterReplicas[0], voterReplicas) } - return rq.addOrReplace(ctx, repl, voterReplicas, liveVoterReplicas, removeIdx, dryRun) + return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) case AllocatorReplaceDecommissioningVoter: decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(voterReplicas) if len(decommissioningReplicas) == 0 { @@ -384,7 +392,7 @@ func (rq *replicateQueue) processOneChange( "decommissioning voter %v unexpectedly not found in %v", decommissioningReplicas[0], voterReplicas) } - return rq.addOrReplace(ctx, repl, voterReplicas, liveVoterReplicas, removeIdx, dryRun) + return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) case AllocatorRemoveDecommissioningVoter: // NB: this path will only be hit when the range is over-replicated and // has decommissioning replicas; in the common case we'll hit @@ -404,51 +412,50 @@ func (rq *replicateQueue) processOneChange( // Requeue because either we failed to transition out of a joint state // (bad) or we did and there might be more to do for that range. return true, err - case AllocatorAddNonVoter, AllocatorRemoveNonVoter: - return false, errors.Errorf("allocator actions pertaining to non-voters are"+ - " currently not supported: %v", action) default: return false, errors.Errorf("unknown allocator action %v", action) } } -// addOrReplace adds or replaces a replica. If removeIdx is -1, an addition is -// carried out. Otherwise, removeIdx must be a valid index into existingReplicas -// and specifies which replica to replace with a new one. +// addOrReplaceVoters adds or replaces a voting replica. If removeIdx is -1, an +// addition is carried out. Otherwise, removeIdx must be a valid index into +// existingVoters and specifies which voter to replace with a new one. // // The method preferably issues an atomic replica swap, but may not be able to // do this in all cases, such as when atomic replication changes are not // available, or when the range consists of a single replica. As a fall back, // only the addition is carried out; the removal is then a follow-up step for // the next scanner cycle. -func (rq *replicateQueue) addOrReplace( +func (rq *replicateQueue) addOrReplaceVoters( ctx context.Context, repl *Replica, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, liveVoterReplicas []roachpb.ReplicaDescriptor, + liveNonVoterReplicas []roachpb.ReplicaDescriptor, removeIdx int, // -1 for no removal dryRun bool, ) (requeue bool, _ error) { - if len(existingReplicas) == 1 { + if len(existingVoters) == 1 { // If only one replica remains, that replica is the leaseholder and // we won't be able to swap it out. Ignore the removal and simply add // a replica. removeIdx = -1 } - remainingLiveReplicas := liveVoterReplicas + remainingLiveVoters := liveVoterReplicas + remainingLiveNonVoters := liveNonVoterReplicas if removeIdx >= 0 { - replToRemove := existingReplicas[removeIdx] + replToRemove := existingVoters[removeIdx] for i, r := range liveVoterReplicas { if r.ReplicaID == replToRemove.ReplicaID { - remainingLiveReplicas = append(liveVoterReplicas[:i:i], liveVoterReplicas[i+1:]...) + remainingLiveVoters = append(liveVoterReplicas[:i:i], liveVoterReplicas[i+1:]...) 
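// NB: the three-index slice expression liveVoterReplicas[:i:i] above caps the
// result's capacity at i, so the append is forced to allocate a fresh backing
// array instead of clobbering liveVoterReplicas[i+1]. A self-contained toy
// example of the idiom (made-up data, not from this change):
//
//	s := []int{10, 20, 30, 40}
//	i := 1
//	out := append(s[:i:i], s[i+1:]...)
//	// out is [10 30 40]; s is still [10 20 30 40]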
break } } // See about transferring the lease away if we're about to remove the // leaseholder. done, err := rq.maybeTransferLeaseAway( - ctx, repl, existingReplicas[removeIdx].StoreID, dryRun, nil /* canTransferLease */) + ctx, repl, existingVoters[removeIdx].StoreID, dryRun, nil /* canTransferLease */) if err != nil { return false, err } @@ -464,15 +471,11 @@ func (rq *replicateQueue) addOrReplace( // there is a reason we're removing it (i.e. dead or decommissioning). If we // left the replica in the slice, the allocator would not be guaranteed to // pick a replica that fills the gap removeRepl leaves once it's gone. - newStore, details, err := rq.allocator.AllocateTarget( - ctx, - zone, - remainingLiveReplicas, - ) + newStore, details, err := rq.allocator.AllocateVoter(ctx, zone, remainingLiveVoters, remainingLiveNonVoters) if err != nil { return false, err } - if removeIdx >= 0 && newStore.StoreID == existingReplicas[removeIdx].StoreID { + if removeIdx >= 0 && newStore.StoreID == existingVoters[removeIdx].StoreID { return false, errors.AssertionFailedf("allocator suggested to replace replica on s%d with itself", newStore.StoreID) } newReplica := roachpb.ReplicationTarget{ @@ -481,7 +484,7 @@ func (rq *replicateQueue) addOrReplace( } clusterNodes := rq.allocator.storePool.ClusterNodeCount() - need := GetNeededVoters(*zone.NumReplicas, clusterNodes) + neededVoters := GetNeededVoters(zone.GetNumVoters(), clusterNodes) // Only up-replicate if there are suitable allocation targets such that, // either the replication goal is met, or it is possible to get to the next @@ -490,28 +493,24 @@ func (rq *replicateQueue) addOrReplace( // quorum. For example, up-replicating from 1 to 2 replicas only makes sense // if it is possible to be able to go to 3 replicas. // - // NB: If willHave > need, then always allow up-replicating as that + // NB: If willHave > neededVoters, then always allow up-replicating as that // will be the case when up-replicating a range with a decommissioning // replica. // // We skip this check if we're swapping a replica, since that does not // change the quorum size. - if willHave := len(existingReplicas) + 1; removeIdx < 0 && willHave < need && willHave%2 == 0 { + if willHave := len(existingVoters) + 1; removeIdx < 0 && willHave < neededVoters && willHave%2 == 0 { // This means we are going to up-replicate to an even replica state. // Check if it is possible to go to an odd replica state beyond it. - oldPlusNewReplicas := append([]roachpb.ReplicaDescriptor(nil), existingReplicas...) + oldPlusNewReplicas := append([]roachpb.ReplicaDescriptor(nil), existingVoters...) oldPlusNewReplicas = append(oldPlusNewReplicas, roachpb.ReplicaDescriptor{ NodeID: newStore.Node.NodeID, StoreID: newStore.StoreID, }) - _, _, err := rq.allocator.AllocateTarget( - ctx, - zone, - oldPlusNewReplicas, - ) + _, _, err := rq.allocator.AllocateVoter(ctx, zone, oldPlusNewReplicas, remainingLiveNonVoters) if err != nil { // It does not seem possible to go to the next odd replica state. Note - // that AllocateTarget returns an allocatorError (a purgatoryError) + // that AllocateVoter returns an allocatorError (a purgatoryError) // when purgatory is requested. 
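// Worked example of the check above (hypothetical counts): with
// neededVoters = 5 and three existing voters, willHave is 4, which is even
// and below the target, so the allocator is first probed for a 5th target on
// oldPlusNewReplicas. If no such target exists, the range is left at 3
// voters: a 4-voter range needs 3 of 4 replicas for quorum, so it tolerates
// no more failures than a 3-voter range while having one more replica that
// can fail.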
return false, errors.Wrap(err, "avoid up-replicating to fragile quorum") } @@ -520,12 +519,12 @@ func (rq *replicateQueue) addOrReplace( ops := roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newReplica) if removeIdx < 0 { log.VEventf(ctx, 1, "adding replica %+v: %s", - newReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas)) + newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) } else { rq.metrics.RemoveReplicaCount.Inc(1) - removeReplica := existingReplicas[removeIdx] + removeReplica := existingVoters[removeIdx] log.VEventf(ctx, 1, "replacing replica %s with %+v: %s", - removeReplica, newReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas)) + removeReplica, newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) ops = append(ops, roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, roachpb.ReplicationTarget{ StoreID: removeReplica.StoreID, @@ -549,17 +548,57 @@ func (rq *replicateQueue) addOrReplace( return true, nil } -// findRemoveTarget takes a list of replicas and picks one to remove, making -// sure to not remove a newly added replica or to violate the zone configs in -// the progress. -func (rq *replicateQueue) findRemoveTarget( +// addNonVoter adds a non-voting replica to `repl`s range. +func (rq *replicateQueue) addNonVoter( + ctx context.Context, + repl *Replica, + existingReplicas, liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, + dryRun bool, +) (requeue bool, _ error) { + desc, zone := repl.DescAndZone() + + newStore, details, err := rq.allocator.AllocateNonVoter(ctx, zone, liveVoterReplicas, liveNonVoterReplicas) + if err != nil { + return false, err + } + newNonVoter := roachpb.ReplicationTarget{ + NodeID: newStore.Node.NodeID, + StoreID: newStore.StoreID, + } + + ops := roachpb.MakeReplicationChanges(roachpb.ADD_NON_VOTER, newNonVoter) + if err := rq.changeReplicas( + ctx, + repl, + ops, + desc, + SnapshotRequest_RECOVERY, + kvserverpb.ReasonRangeUnderReplicated, + details, + dryRun, + ); err != nil { + return false, err + } + // Always requeue to see if more work needs to be done. + return true, nil +} + +// findRemoveVoter takes a list of voting replicas and picks one to remove, +// making sure to not remove a newly added voter or to violate the zone configs +// in the process. +// +// TODO(aayush): The structure around replica removal is not great. The entire +// logic of this method should probably live inside Allocator.RemoveVoter. Doing +// so also makes the flow of adding new replicas and removing replicas more +// symmetric. +func (rq *replicateQueue) findRemoveVoter( ctx context.Context, repl interface { DescAndZone() (*roachpb.RangeDescriptor, *zonepb.ZoneConfig) LastReplicaAdded() (roachpb.ReplicaID, time.Time) RaftStatus() *raft.Status }, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, ) (roachpb.ReplicaDescriptor, string, error) { _, zone := repl.DescAndZone() // This retry loop involves quick operations on local state, so a @@ -587,9 +626,9 @@ func (rq *replicateQueue) findRemoveTarget( // If we've lost raft leadership, we're unlikely to regain it so give up immediately. 
return roachpb.ReplicaDescriptor{}, "", &benignError{errors.Errorf("not raft leader while range needs removal")} } - candidates = filterUnremovableReplicas(ctx, raftStatus, existingReplicas, lastReplAdded) + candidates = filterUnremovableReplicas(ctx, raftStatus, existingVoters, lastReplAdded) log.VEventf(ctx, 3, "filtered unremovable replicas from %v to get %v as candidates for removal: %s", - existingReplicas, candidates, rangeRaftProgress(raftStatus, existingReplicas)) + existingVoters, candidates, rangeRaftProgress(raftStatus, existingVoters)) if len(candidates) > 0 { break } @@ -617,10 +656,10 @@ func (rq *replicateQueue) findRemoveTarget( if len(candidates) == 0 { // If we timed out and still don't have any valid candidates, give up. return roachpb.ReplicaDescriptor{}, "", &benignError{errors.Errorf("no removable replicas from range that needs a removal: %s", - rangeRaftProgress(repl.RaftStatus(), existingReplicas))} + rangeRaftProgress(repl.RaftStatus(), existingVoters))} } - return rq.allocator.RemoveTarget(ctx, zone, candidates, existingReplicas) + return rq.allocator.RemoveVoter(ctx, zone, candidates, existingVoters, existingNonVoters) } // maybeTransferLeaseAway is called whenever a replica on a given store is @@ -668,15 +707,18 @@ func (rq *replicateQueue) maybeTransferLeaseAway( return transferred == transferOK, err } -func (rq *replicateQueue) remove( - ctx context.Context, repl *Replica, existingReplicas []roachpb.ReplicaDescriptor, dryRun bool, +func (rq *replicateQueue) removeVoter( + ctx context.Context, + repl *Replica, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + dryRun bool, ) (requeue bool, _ error) { - removeReplica, details, err := rq.findRemoveTarget(ctx, repl, existingReplicas) + removeVoter, details, err := rq.findRemoveVoter(ctx, repl, existingVoters, existingNonVoters) if err != nil { return false, err } done, err := rq.maybeTransferLeaseAway( - ctx, repl, removeReplica.StoreID, dryRun, nil /* canTransferLease */) + ctx, repl, removeVoter.StoreID, dryRun, nil /* canTransferLease */) if err != nil { return false, err } @@ -687,11 +729,11 @@ func (rq *replicateQueue) remove( // Remove a replica. 
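// Roughly, the flow above: if the voter picked for removal sits on the store
// that currently holds the lease, the lease is transferred away first and
// this pass returns without removing anything; a later pass of the queue,
// run from the new leaseholder, is expected to carry out the removal. Only
// when no transfer is needed does the code below proceed immediately.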
rq.metrics.RemoveReplicaCount.Inc(1) - log.VEventf(ctx, 1, "removing replica %+v due to over-replication: %s", - removeReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas)) + log.VEventf(ctx, 1, "removing voting replica %+v due to over-replication: %s", + removeVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) target := roachpb.ReplicationTarget{ - NodeID: removeReplica.NodeID, - StoreID: removeReplica.StoreID, + NodeID: removeVoter.NodeID, + StoreID: removeVoter.StoreID, } desc, _ := repl.DescAndZone() if err := rq.changeReplicas( @@ -709,6 +751,47 @@ func (rq *replicateQueue) remove( return true, nil } +func (rq *replicateQueue) removeNonVoter( + ctx context.Context, + repl *Replica, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + dryRun bool, +) (requeue bool, _ error) { + rq.metrics.RemoveReplicaCount.Inc(1) + + desc, zone := repl.DescAndZone() + removeNonVoter, details, err := rq.allocator.RemoveNonVoter( + ctx, + zone, + existingNonVoters, + existingVoters, existingNonVoters, + ) + if err != nil { + return false, err + } + + log.VEventf(ctx, 1, "removing non-voting replica %+v due to over-replication: %s", + removeNonVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) + target := roachpb.ReplicationTarget{ + NodeID: removeNonVoter.NodeID, + StoreID: removeNonVoter.StoreID, + } + + if err := rq.changeReplicas( + ctx, + repl, + roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, target), + desc, + SnapshotRequest_UNKNOWN, + kvserverpb.ReasonRangeOverReplicated, + details, + dryRun, + ); err != nil { + return false, err + } + return true, nil +} + func (rq *replicateQueue) removeDecommissioning( ctx context.Context, repl *Replica, dryRun bool, ) (requeue bool, _ error) { diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 914e02349d72..a159d7b76cc9 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -161,11 +161,12 @@ func testReplicateQueueRebalanceInner(t *testing.T, atomic bool) { } } -// Test that up-replication only proceeds if there are a good number of -// candidates to up-replicate to. Specifically, we won't up-replicate to an -// even number of replicas unless there is an additional candidate that will -// allow a subsequent up-replication to an odd number. -func TestReplicateQueueUpReplicate(t *testing.T) { +// TestReplicateQueueUpReplicateOddVoters tests that up-replication only +// proceeds if there are a good number of candidates to up-replicate to. +// Specifically, we won't up-replicate to an even number of replicas unless +// there is an additional candidate that will allow a subsequent up-replication +// to an odd number. +func TestReplicateQueueUpReplicateOddVoters(t *testing.T) { defer leaktest.AfterTest(t)() skip.UnderRaceWithIssue(t, 57144, "flaky under race") defer log.Scope(t).Close(t) @@ -300,6 +301,71 @@ func TestReplicateQueueDownReplicate(t *testing.T) { } } +// TestReplicateQueueUpAndDownReplicateNonVoters is an end-to-end test ensuring +// that the replicateQueue will add or remove non-voter(s) to a range based on +// updates to its zone configuration. 
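// The zone config arithmetic the test below relies on: non-voters are the
// replicas left over once voters are accounted for, i.e.
// num_replicas - num_voters. So
//
//	ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 3, num_voters = 1
//
// should converge to 1 voter plus 2 non-voters, and dropping back to
//
//	ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 1
//
// should shed both non-voters, leaving the single voter.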
+func TestReplicateQueueUpAndDownReplicateNonVoters(t *testing.T) { + defer leaktest.AfterTest(t)() + skip.UnderRace(t) + defer log.Scope(t).Close(t) + + tc := testcluster.StartTestCluster(t, 1, + base.TestClusterArgs{ReplicationMode: base.ReplicationAuto}, + ) + defer tc.Stopper().Stop(context.Background()) + + scratchKey := tc.ScratchRange(t) + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + + // Since we started the TestCluster with 1 node, that first node should have + // 1 voting replica. + require.Len(t, scratchRange.Replicas().VoterDescriptors(), 1) + // Set up the default zone configs such that every range should have 1 voting + // replica and 2 non-voting replicas. + _, err := tc.ServerConn(0).Exec( + `ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 3, num_voters = 1`, + ) + require.NoError(t, err) + + // Add two new servers and expect that 2 non-voters are added to the range. + tc.AddAndStartServer(t, base.TestServerArgs{}) + tc.AddAndStartServer(t, base.TestServerArgs{}) + store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore( + tc.Server(0).GetFirstStoreID()) + require.NoError(t, err) + + get := func() int { + // Nudge the replicateQueue to up/down-replicate our scratch range. + if err := store.ForceReplicationScanAndProcess(); err != nil { + t.Fatal(err) + } + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + return len(scratchRange.Replicas().NonVoterDescriptors()) + } + + var expectedNonVoterCount = 2 + testutils.SucceedsSoon(t, func() error { + if found := get(); found != expectedNonVoterCount { + return errors.Errorf("expected upreplication to %d non-voters; found %d", + expectedNonVoterCount, found) + } + return nil + }) + + // Now remove all non-voting replicas and expect that the range will + // down-replicate to having just 1 voting replica. + _, err = tc.ServerConn(0).Exec(`ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 1`) + require.NoError(t, err) + expectedNonVoterCount = 0 + testutils.SucceedsSoon(t, func() error { + if found := get(); found != expectedNonVoterCount { + return errors.Errorf("expected downreplication to %d non-voters; found %d", + expectedNonVoterCount, found) + } + return nil + }) +} + // queryRangeLog queries the range log. The query must be of type: // `SELECT info from system.rangelog ...`. func queryRangeLog( diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index f14c3a9200a9..be49480c4918 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -504,40 +504,41 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance( desc.RangeID, replWithStats.qps) clusterNodes := sr.rq.allocator.storePool.ClusterNodeCount() - desiredReplicas := GetNeededVoters(*zone.NumReplicas, clusterNodes) - targets := make([]roachpb.ReplicationTarget, 0, desiredReplicas) - targetReplicas := make([]roachpb.ReplicaDescriptor, 0, desiredReplicas) - currentReplicas := desc.Replicas().Descriptors() + desiredVoters := GetNeededVoters(zone.GetNumVoters(), clusterNodes) + targets := make([]roachpb.ReplicationTarget, 0, desiredVoters) + targetVoters := make([]roachpb.ReplicaDescriptor, 0, desiredVoters) + currentVoters := desc.Replicas().VoterDescriptors() + currentNonVoters := desc.Replicas().NonVoterDescriptors() // Check the range's existing diversity score, since we want to ensure we // don't hurt locality diversity just to improve QPS. 
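// As a rough mental model (not the exact implementation): the diversity score
// of a replica set can be thought of as the average, over all pairs of its
// stores, of how dissimilar their localities are, with 0 meaning identical
// localities and 1 meaning completely disjoint ones. A toy sketch with a
// hypothetical Store type and distance function:
//
//	func diversity(stores []Store, dist func(a, b Store) float64) float64 {
//		var sum float64
//		var pairs int
//		for i := range stores {
//			for j := i + 1; j < len(stores); j++ {
//				sum += dist(stores[i], stores[j])
//				pairs++
//			}
//		}
//		if pairs == 0 {
//			return 0
//		}
//		return sum / float64(pairs)
//	}
//
// The rebalance below is abandoned if the proposed voter set would lower this
// score relative to the current one.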
curDiversity := rangeDiversityScore( - sr.rq.allocator.storePool.getLocalitiesByStore(currentReplicas)) + sr.rq.allocator.storePool.getLocalitiesByStore(currentVoters)) // Check the existing replicas, keeping around those that aren't overloaded. - for i := range currentReplicas { - if currentReplicas[i].StoreID == localDesc.StoreID { + for i := range currentVoters { + if currentVoters[i].StoreID == localDesc.StoreID { continue } // Keep the replica in the range if we don't know its QPS or if its QPS // is below the upper threshold. Punishing stores not in our store map // could cause mass evictions if the storePool gets out of sync. - storeDesc, ok := storeMap[currentReplicas[i].StoreID] + storeDesc, ok := storeMap[currentVoters[i].StoreID] if !ok || storeDesc.Capacity.QueriesPerSecond < maxQPS { if log.V(3) { var reason redact.RedactableString if ok { reason = redact.Sprintf(" (qps %.2f vs max %.2f)", storeDesc.Capacity.QueriesPerSecond, maxQPS) } - log.VEventf(ctx, 3, "keeping r%d/%d on s%d%s", desc.RangeID, currentReplicas[i].ReplicaID, currentReplicas[i].StoreID, reason) + log.VEventf(ctx, 3, "keeping r%d/%d on s%d%s", desc.RangeID, currentVoters[i].ReplicaID, currentVoters[i].StoreID, reason) } targets = append(targets, roachpb.ReplicationTarget{ - NodeID: currentReplicas[i].NodeID, - StoreID: currentReplicas[i].StoreID, + NodeID: currentVoters[i].NodeID, + StoreID: currentVoters[i].StoreID, }) - targetReplicas = append(targetReplicas, roachpb.ReplicaDescriptor{ - NodeID: currentReplicas[i].NodeID, - StoreID: currentReplicas[i].StoreID, + targetVoters = append(targetVoters, roachpb.ReplicaDescriptor{ + NodeID: currentVoters[i].NodeID, + StoreID: currentVoters[i].StoreID, }) } } @@ -545,16 +546,20 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance( // Then pick out which new stores to add the remaining replicas to. options := sr.rq.allocator.scorerOptions() options.qpsRebalanceThreshold = qpsRebalanceThreshold.Get(&sr.st.SV) - for len(targets) < desiredReplicas { - // Use the preexisting AllocateTarget logic to ensure that considerations + for len(targets) < desiredVoters { + // Use the preexisting AllocateVoter logic to ensure that considerations // such as zone constraints, locality diversity, and full disk come // into play. target, _ := sr.rq.allocator.allocateTargetFromList( ctx, storeList, zone, - targetReplicas, + targetVoters, + currentNonVoters, options, + // TODO(aayush): For now, we're not going to let the StoreRebalancer + // rebalance non-voting replicas. Fix this. + voterTarget, ) if target == nil { log.VEventf(ctx, 3, "no rebalance targets found to replace the current store for r%d", @@ -571,7 +576,7 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance( NodeID: target.Node.NodeID, StoreID: target.StoreID, }) - targetReplicas = append(targetReplicas, roachpb.ReplicaDescriptor{ + targetVoters = append(targetVoters, roachpb.ReplicaDescriptor{ NodeID: target.Node.NodeID, StoreID: target.StoreID, }) @@ -584,12 +589,12 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance( // moving one of the other existing replicas that's on a store with less // qps than the max threshold but above the mean would help in certain // locality configurations. 
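// Concretely (made-up numbers): if desiredVoters is 3 but only 2 entries made
// it into targets, say because one existing voter sat on a store above the
// QPS ceiling and allocateTargetFromList could not find an acceptable
// replacement store, the check below gives up on this range and moves on to
// the next candidate rather than rebalancing it onto too few voters.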
- if len(targets) < desiredReplicas { + if len(targets) < desiredVoters { log.VEventf(ctx, 3, "couldn't find enough rebalance targets for r%d (%d/%d)", - desc.RangeID, len(targets), desiredReplicas) + desc.RangeID, len(targets), desiredVoters) continue } - newDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalitiesByStore(targetReplicas)) + newDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalitiesByStore(targetVoters)) if newDiversity < curDiversity { log.VEventf(ctx, 3, "new diversity %.2f for r%d worse than current diversity %.2f; not rebalancing",