From 5634c5338d06f59383259cde5551d9e1ad8b269c Mon Sep 17 00:00:00 2001
From: Aayush Shah
Date: Tue, 19 Jan 2021 04:51:35 -0500
Subject: [PATCH] kvserver: teach replicateQueue to handle non-voter addition/removal

This commit primarily teaches the allocator to rank non-voting replica
candidates for addition and removal. This lets the replicateQueue act on the
allocator's decisions to add non-voting replicas to, or remove them from, a
range.

Note that this commit does not deal with _rebalancing_ of non-voting replicas,
just simple additions and removals when a range is over- or under-replicated.

Release note: None
---
 pkg/kv/kvserver/allocator.go             | 223 +++++++++++++++++------
 pkg/kv/kvserver/allocator_scorer.go      | 109 +++++++++--
 pkg/kv/kvserver/allocator_scorer_test.go |   5 +-
 pkg/kv/kvserver/allocator_test.go        | 141 +++++++-------
 pkg/kv/kvserver/client_merge_test.go     |   6 +
 pkg/kv/kvserver/constraint/analyzer.go   |  19 +-
 pkg/kv/kvserver/replica_command.go       | 125 +++++++------
 pkg/kv/kvserver/replicate_queue.go       | 182 ++++++++++++------
 pkg/kv/kvserver/replicate_queue_test.go  |  76 +++++++-
 pkg/kv/kvserver/store_rebalancer.go      |  48 ++---
 10 files changed, 646 insertions(+), 288 deletions(-)

diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index 3fb8ee9a2711..0a1d4f28cfcb 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -158,18 +158,21 @@ const (
 // can be retried quickly as soon as new stores come online, or additional
 // space frees up.
 type allocatorError struct {
-	constraints      []zonepb.ConstraintsConjunction
-	existingReplicas int
-	aliveStores      int
-	throttledStores  int
+	constraints           []zonepb.ConstraintsConjunction
+	voterConstraints      []zonepb.ConstraintsConjunction
+	existingVoterCount    int
+	existingNonVoterCount int
+	aliveStores           int
+	throttledStores       int
 }
 
+// TODO(aayush): !!! Add the appropriate string for non-voters.
 func (ae *allocatorError) Error() string {
 	var existingReplsStr string
-	if ae.existingReplicas == 1 {
-		existingReplsStr = "1 already has a replica"
+	if ae.existingVoterCount == 1 {
+		existingReplsStr = "1 already has a voter"
 	} else {
-		existingReplsStr = fmt.Sprintf("%d already have a replica", ae.existingReplicas)
+		existingReplsStr = fmt.Sprintf("%d already have a voter", ae.existingVoterCount)
 	}
 
 	var baseMsg string
@@ -538,21 +541,23 @@ type decisionDetails struct {
 	Existing string `json:",omitempty"`
 }
 
-// AllocateTarget returns a suitable store for a new allocation with the
-// required attributes. Nodes already accommodating existing replicas are ruled
-// out as targets. The range ID of the replica being allocated for is also
-// passed in to ensure that we don't try to replace an existing dead replica on
-// a store.
-//
-// TODO(tbg): AllocateReplacement?
-func (a *Allocator) AllocateTarget( - ctx context.Context, zone *zonepb.ZoneConfig, existingReplicas []roachpb.ReplicaDescriptor, -) (*roachpb.StoreDescriptor, string, error) { - sl, aliveStoreCount, throttled := a.storePool.getStoreList(storeFilterThrottled) +type chooseReplicaToAddFn func( + ctx context.Context, + candidateStores StoreList, + zone *zonepb.ZoneConfig, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + options scorerOptions) (*roachpb.StoreDescriptor, string) - target, details := a.allocateTargetFromList( - ctx, sl, zone, existingReplicas, a.scorerOptions()) +func (a *Allocator) allocateTarget( + ctx context.Context, + zone *zonepb.ZoneConfig, + allocateFromList chooseReplicaToAddFn, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, +) (*roachpb.StoreDescriptor, string, error) { + candidateStoreList, aliveStoreCount, throttled := a.storePool.getStoreList(storeFilterThrottled) + target, details := allocateFromList(ctx, candidateStoreList, zone, existingVoters, + existingNonVoters, a.scorerOptions()) if target != nil { return target, details, nil } @@ -565,30 +570,100 @@ func (a *Allocator) AllocateTarget( ) } return nil, "", &allocatorError{ - constraints: zone.Constraints, - existingReplicas: len(existingReplicas), - aliveStores: aliveStoreCount, - throttledStores: len(throttled), + voterConstraints: zone.VoterConstraints, + constraints: zone.Constraints, + existingVoterCount: len(existingVoters), + existingNonVoterCount: len(existingNonVoters), + aliveStores: aliveStoreCount, + throttledStores: len(throttled), } } -func (a *Allocator) allocateTargetFromList( +// AllocateVoterTarget returns a suitable store for a new allocation of a voting +// replica with the required attributes. Nodes already accommodating existing +// replicas are ruled out as targets. +// +// TODO(aayush): This method currently rules out all the stores that have any +// replica for the given range. However, we want to get to a place where we'll +// consider the targets that have a non-voter as feasible +// relocation/up-replication targets for existing/new voting replicas, since it +// is cheaper to promote a non-voter than it is to add a new voter via the +// LEARNER->VOTER_FULL path. +func (a *Allocator) AllocateVoterTarget( + ctx context.Context, + zone *zonepb.ZoneConfig, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, +) (*roachpb.StoreDescriptor, string, error) { + return a.allocateTarget(ctx, zone, a.allocateVotersFromList, existingVoters, existingNonVoters) +} + +// AllocateNonVoterTarget returns a suitable store for a new allocation of a +// non-voting replica with the required attributes. Nodes already accommodating +// _any_ existing replicas are ruled out as targets. +func (a *Allocator) AllocateNonVoterTarget( + ctx context.Context, + zone *zonepb.ZoneConfig, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, +) (*roachpb.StoreDescriptor, string, error) { + return a.allocateTarget(ctx, zone, a.allocateNonVotersFromList, existingVoters, existingNonVoters) +} + +func (a *Allocator) allocateVotersFromList( ctx context.Context, candidateStores StoreList, zone *zonepb.ZoneConfig, - existingReplicas []roachpb.ReplicaDescriptor, + // TODO(aayush): This method needs to be taught to make decisions to swap a + // voter with a non-voter when "appropriate" to do so. We would want to prefer + // promoting an existing non-voter to a voter over adding a new voting + // replica. 
+ existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, options scorerOptions, ) (*roachpb.StoreDescriptor, string) { + existingReplicas := append(existingVoters, existingNonVoters...) + // Voting replicas have to abide by both the overall `constraints` (which + // apply to all replicas) and `voter_constraints` which apply only to voting + // replicas. analyzedConstraints := constraint.AnalyzeConstraints( - ctx, a.storePool.getStoreDescriptor, existingReplicas, zone) - candidates := allocateCandidates( - ctx, - candidateStores, analyzedConstraints, existingReplicas, + ctx, a.storePool.getStoreDescriptor, existingReplicas, int(*zone.NumReplicas), zone.Constraints) + analyzedVoterConstraints := constraint.AnalyzeConstraints( + ctx, a.storePool.getStoreDescriptor, existingVoters, int(zone.GetNumVoters()), zone.VoterConstraints) + + candidates := allocateVoterCandidates( + ctx, candidateStores, analyzedConstraints, analyzedVoterConstraints, existingReplicas, a.storePool.getLocalitiesByStore(existingReplicas), - a.storePool.isNodeReadyForRoutineReplicaTransfer, - options, + a.storePool.isNodeReadyForRoutineReplicaTransfer, options) + log.VEventf(ctx, 3, "allocate voter candidates: %s", candidates) + if target := candidates.selectGood(a.randGen); target != nil { + log.VEventf(ctx, 3, "add target: %s", target) + details := decisionDetails{Target: target.compactString(options)} + detailsBytes, err := json.Marshal(details) + if err != nil { + log.Warningf(ctx, "failed to marshal details for choosing allocate target: %+v", err) + } + return &target.store, string(detailsBytes) + } + + return nil, "" +} + +func (a *Allocator) allocateNonVotersFromList( + ctx context.Context, + candidateStores StoreList, + zone *zonepb.ZoneConfig, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + options scorerOptions, +) (*roachpb.StoreDescriptor, string) { + existingReplicas := append(existingVoters, existingNonVoters...) + analyzedConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor, + existingReplicas, int(*zone.NumReplicas), nil) + + candidates := allocateNonVoterCandidates( + ctx, candidateStores, analyzedConstraints, constraint.EmptyAnalyzedConstraints, existingReplicas, + a.storePool.getLocalitiesByStore(existingReplicas), + a.storePool.isNodeReadyForRoutineReplicaTransfer, options, ) - log.VEventf(ctx, 3, "allocate candidates: %s", candidates) + + log.VEventf(ctx, 3, "allocate non-voter candidates: %s", candidates) if target := candidates.selectGood(a.randGen); target != nil { log.VEventf(ctx, 3, "add target: %s", target) details := decisionDetails{Target: target.compactString(options)} @@ -602,12 +677,13 @@ func (a *Allocator) allocateTargetFromList( return nil, "" } +// TODO(aayush): Generalize this to work for non-voting replicas as well. 
func (a Allocator) simulateRemoveTarget( ctx context.Context, targetStore roachpb.StoreID, zone *zonepb.ZoneConfig, candidates []roachpb.ReplicaDescriptor, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, rangeUsageInfo RangeUsageInfo, ) (roachpb.ReplicaDescriptor, string, error) { // Update statistics first @@ -620,40 +696,41 @@ func (a Allocator) simulateRemoveTarget( a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.REMOVE_VOTER) }() log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore) - return a.RemoveTarget(ctx, zone, candidates, existingReplicas) + return a.RemoveVoter(ctx, zone, candidates, existingVoters, nil) } -// RemoveTarget returns a suitable replica to remove from the provided replica -// set. It first attempts to randomly select a target from the set of stores -// that have greater than the average number of replicas. Failing that, it -// falls back to selecting a random target from any of the existing -// replicas. -func (a Allocator) RemoveTarget( +func (a Allocator) removeTarget( ctx context.Context, zone *zonepb.ZoneConfig, candidates []roachpb.ReplicaDescriptor, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, + constraintsCheck constraintsCheckFn, ) (roachpb.ReplicaDescriptor, string, error) { if len(candidates) == 0 { - return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()") + return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveVoter()") } + existingReplicas := append(existingVoters, existingNonVoters...) // Retrieve store descriptors for the provided candidates from the StorePool. 
candidateStoreIDs := make(roachpb.StoreIDSlice, len(candidates)) for i, exist := range candidates { candidateStoreIDs[i] = exist.StoreID } candidateStoreList, _, _ := a.storePool.getStoreListFromIDs(candidateStoreIDs, storeFilterNone) - - analyzedConstraints := constraint.AnalyzeConstraints( - ctx, a.storePool.getStoreDescriptor, existingReplicas, zone) + analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor, + existingReplicas, int(*zone.NumReplicas), zone.Constraints) + analyzedVoterConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor, + existingVoters, int(zone.GetNumVoters()), zone.VoterConstraints) options := a.scorerOptions() - rankedCandidates := removeCandidates( + rankedCandidates := rankedCandidateListForRemoval( candidateStoreList, - analyzedConstraints, - a.storePool.getLocalitiesByStore(existingReplicas), + analyzedOverallConstraints, analyzedVoterConstraints, + constraintsCheck, + a.storePool.getLocalitiesByStore(existingVoters), options, ) + log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates) if bad := rankedCandidates.selectBad(a.randGen); bad != nil { for _, exist := range existingReplicas { @@ -672,9 +749,48 @@ func (a Allocator) RemoveTarget( return roachpb.ReplicaDescriptor{}, "", errors.New("could not select an appropriate replica to be removed") } +type chooseReplicaToRemoveFn func( + ctx context.Context, + zone *zonepb.ZoneConfig, + nonVoterCandidates []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, +) (roachpb.ReplicaDescriptor, string, error) + +// RemoveVoter returns a suitable replica to remove from the provided replica +// set. It first attempts to randomly select a target from the set of stores +// that have greater than the average number of replicas. Failing that, it +// falls back to selecting a random target from any of the existing +// replicas. +func (a Allocator) RemoveVoter( + ctx context.Context, + zone *zonepb.ZoneConfig, + voterCandidates []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, +) (roachpb.ReplicaDescriptor, string, error) { + return a.removeTarget(ctx, zone, voterCandidates, existingVoters, existingNonVoters, + checkVoterConstraintsForRemoval) +} + +// RemoveNonVoter returns a suitable non-voting replica to remove from the +// provided set. It first attempts to randomly select a target from the set of +// stores that have greater than the average number of replicas. Failing that, +// it falls back to selecting a random target from any of the existing replicas. +func (a Allocator) RemoveNonVoter( + ctx context.Context, + zone *zonepb.ZoneConfig, + nonVoterCandidates []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, + existingNonVoters []roachpb.ReplicaDescriptor, +) (roachpb.ReplicaDescriptor, string, error) { + return a.removeTarget(ctx, zone, nonVoterCandidates, existingVoters, existingNonVoters, + checkNonVoterConstraintsForRemoval) +} + // RebalanceTarget returns a suitable store for a rebalance target with // required attributes. Rebalance targets are selected via the same mechanism -// as AllocateTarget(), except the chosen target must follow some additional +// as AllocateVoterTarget(), except the chosen target must follow some additional // criteria. Namely, if chosen, it must further the goal of balancing the // cluster. 
// @@ -686,7 +802,7 @@ func (a Allocator) RemoveTarget( // replica to the range, then removing the most undesirable replica. // // Simply ignoring a rebalance opportunity in the event that the target chosen -// by AllocateTarget() doesn't fit balancing criteria is perfectly fine, as +// by AllocateVoterTarget() doesn't fit balancing criteria is perfectly fine, as // other stores in the cluster will also be doing their probabilistic best to // rebalance. This helps prevent a stampeding herd targeting an abnormally // under-utilized store. @@ -737,9 +853,8 @@ func (a Allocator) RebalanceTarget( return zero, zero, "", false } } - analyzedConstraints := constraint.AnalyzeConstraints( - ctx, a.storePool.getStoreDescriptor, existingReplicas, zone) + ctx, a.storePool.getStoreDescriptor, existingReplicas, int(*zone.NumReplicas), zone.Constraints) options := a.scorerOptions() results := rebalanceCandidates( ctx, @@ -804,7 +919,7 @@ func (a Allocator) RebalanceTarget( rangeUsageInfo, ) if err != nil { - log.Warningf(ctx, "simulating RemoveTarget failed: %+v", err) + log.Warningf(ctx, "simulating RemoveVoter failed: %+v", err) return zero, zero, "", false } if target.store.StoreID != removeReplica.StoreID { diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go index 3e0789b570da..7981e6ed11f9 100644 --- a/pkg/kv/kvserver/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator_scorer.go @@ -404,13 +404,30 @@ func (cl candidateList) removeCandidate(c candidate) candidateList { return cl } -// allocateCandidates creates a candidate list of all stores that can be used -// for allocating a new replica ordered from the best to the worst. Only -// stores that meet the criteria are included in the list. +type constraintsCheckFn func(s roachpb.StoreDescriptor, + overallConstraints, voterConstraints constraint.AnalyzedConstraints) (valid, necessary bool) + +func checkVoterConstraintsForAllocation( + s roachpb.StoreDescriptor, overallConstraints, voterConstraints constraint.AnalyzedConstraints, +) (valid, necessary bool) { + overallConstraintsOK, necessaryOverall := allocateConstraintsCheck(s, overallConstraints) + voterConstraintsOK, necessaryForVoters := allocateConstraintsCheck(s, voterConstraints) + + return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters +} + +func checkNonVoterConstraintsForAllocation( + s roachpb.StoreDescriptor, overallConstraints, _ constraint.AnalyzedConstraints, +) (valid, necessary bool) { + return allocateConstraintsCheck(s, overallConstraints) +} + func allocateCandidates( ctx context.Context, candidateStores StoreList, - constraints constraint.AnalyzedConstraints, + overallConstraints constraint.AnalyzedConstraints, + voterConstraints constraint.AnalyzedConstraints, + constraintsCheck constraintsCheckFn, existingReplicas []roachpb.ReplicaDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool, @@ -425,10 +442,11 @@ func allocateCandidates( log.VEventf(ctx, 3, "not considering non-ready node n%d for allocate", s.Node.NodeID) continue } - constraintsOK, necessary := allocateConstraintsCheck(s, constraints) + constraintsOK, necessary := constraintsCheck(s, overallConstraints, voterConstraints) if !constraintsOK { continue } + if !maxCapacityCheck(s) { continue } @@ -436,11 +454,13 @@ func allocateCandidates( balanceScore := balanceScore(candidateStores, s.Capacity, options) var convergesScore int if 
options.qpsRebalanceThreshold > 0 {
-			if s.Capacity.QueriesPerSecond < underfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
+			if s.Capacity.QueriesPerSecond < underfullThreshold(
+				candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
 				convergesScore = 1
 			} else if s.Capacity.QueriesPerSecond < candidateStores.candidateQueriesPerSecond.mean {
 				convergesScore = 0
-			} else if s.Capacity.QueriesPerSecond < overfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
+			} else if s.Capacity.QueriesPerSecond < overfullThreshold(
+				candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
 				convergesScore = -1
 			} else {
 				convergesScore = -2
@@ -464,18 +484,72 @@ func allocateCandidates(
 	return candidates
 }
 
-// removeCandidates creates a candidate list of all existing replicas' stores
-// ordered from least qualified for removal to most qualified. Stores that are
-// marked as not valid, are in violation of a required criteria.
-func removeCandidates(
+// allocateVoterCandidates creates a candidateList of all stores that can be
+// used for allocating a new voting replica ordered from the best to the worst.
+// Only stores that meet the criteria are included in the list.
+func allocateVoterCandidates(
+	ctx context.Context,
+	candidateStores StoreList,
+	overallConstraints, voterConstraints constraint.AnalyzedConstraints,
+	allExistingReplicas []roachpb.ReplicaDescriptor,
+	existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
+	isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
+	options scorerOptions,
+) candidateList {
+	return allocateCandidates(
+		ctx, candidateStores, overallConstraints, voterConstraints,
+		checkVoterConstraintsForAllocation, allExistingReplicas, existingStoreLocalities,
+		isNodeValidForRoutineReplicaTransfer, options,
+	)
+}
+
+// allocateNonVoterCandidates creates a candidateList of all stores that can be
+// used for allocating a new non-voting replica ordered from the best to the
+// worst. Only stores that meet the criteria are included in the list.
+func allocateNonVoterCandidates( + ctx context.Context, + candidateStores StoreList, + overallConstraints, voterConstraints constraint.AnalyzedConstraints, + allExistingReplicas []roachpb.ReplicaDescriptor, + existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, + isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool, + options scorerOptions, +) candidateList { + return allocateCandidates( + ctx, candidateStores, overallConstraints, voterConstraints, + checkNonVoterConstraintsForAllocation, allExistingReplicas, existingStoreLocalities, + isNodeValidForRoutineReplicaTransfer, options, + ) +} + +func checkVoterConstraintsForRemoval( + s roachpb.StoreDescriptor, overallConstraints, voterConstraints constraint.AnalyzedConstraints, +) (valid, necessary bool) { + overallConstraintsOK, necessaryOverall := removeConstraintsCheck(s, overallConstraints) + voterConstraintsOK, necessaryForVoters := removeConstraintsCheck(s, voterConstraints) + + return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters +} + +func checkNonVoterConstraintsForRemoval( + s roachpb.StoreDescriptor, overallConstraints, _ constraint.AnalyzedConstraints, +) (valid, necessary bool) { + return removeConstraintsCheck(s, overallConstraints) +} + +// rankedCandidateListForRemoval creates a candidate list of all existing +// replicas' stores ordered from least qualified for removal to most qualified. +// Stores that are marked as not valid, are in violation of a required criteria. +func rankedCandidateListForRemoval( sl StoreList, - constraints constraint.AnalyzedConstraints, + overallConstraints, voterConstraints constraint.AnalyzedConstraints, + constraintsCheck constraintsCheckFn, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, options scorerOptions, ) candidateList { var candidates candidateList for _, s := range sl.stores { - constraintsOK, necessary := removeConstraintsCheck(s, constraints) + constraintsOK, necessary := constraintsCheck(s, overallConstraints, voterConstraints) if !constraintsOK { candidates = append(candidates, candidate{ store: s, @@ -707,10 +781,9 @@ func rebalanceCandidates( balanceScore := balanceScore(comparable.sl, existing.store.Capacity, options) var convergesScore int if !rebalanceFromConvergesOnMean(comparable.sl, existing.store.Capacity) { - // Similarly to in removeCandidates, any replica whose removal - // would not converge the range stats to their means is given a - // constraint score boost of 1 to make it less attractive for - // removal. + // Similarly to in rankedCandidateListForRemoval, any replica whose + // removal would not converge the range stats to their means is given a + // constraint score boost of 1 to make it less attractive for removal. convergesScore = 1 } existing.convergesScore = convergesScore @@ -927,6 +1000,8 @@ func allocateConstraintsCheck( ); constraintsOK { valid = true matchingStores := analyzed.SatisfiedBy[i] + // TODO DURING REVIEW: Not directly related to this patch, but shouldn't + // this be <= ?! 
if len(matchingStores) < int(constraints.NumReplicas) { return true, true } diff --git a/pkg/kv/kvserver/allocator_scorer_test.go b/pkg/kv/kvserver/allocator_scorer_test.go index e24d409ec195..88a188424722 100644 --- a/pkg/kv/kvserver/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator_scorer_test.go @@ -918,7 +918,8 @@ func TestAllocateConstraintsCheck(t *testing.T) { NumReplicas: proto.Int32(tc.zoneNumReplicas), } analyzed := constraint.AnalyzeConstraints( - context.Background(), getTestStoreDesc, testStoreReplicas(tc.existing), zone) + context.Background(), getTestStoreDesc, testStoreReplicas(tc.existing), + int(*zone.NumReplicas), zone.Constraints) for _, s := range testStores { valid, necessary := allocateConstraintsCheck(s, analyzed) if e, a := tc.expectedValid[s.StoreID], valid; e != a { @@ -1052,7 +1053,7 @@ func TestRemoveConstraintsCheck(t *testing.T) { NumReplicas: proto.Int32(tc.zoneNumReplicas), } analyzed := constraint.AnalyzeConstraints( - context.Background(), getTestStoreDesc, existing, zone) + context.Background(), getTestStoreDesc, existing, int(*zone.NumReplicas), zone.Constraints) for storeID, expected := range tc.expected { valid, necessary := removeConstraintsCheck(testStores[storeID], analyzed) if e, a := expected.valid, valid; e != a { diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 111c6a7f48d3..00abc8159087 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -389,10 +389,10 @@ func TestAllocatorSimpleRetrieval(t *testing.T) { stopper, g, _, a, _ := createTestAllocator(1, false /* deterministic */) defer stopper.Stop(context.Background()) gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t) - result, _, err := a.AllocateTarget( + result, _, err := a.AllocateVoterTarget( context.Background(), &simpleZoneConfig, - []roachpb.ReplicaDescriptor{}, + nil /* existingVoters */, nil, /* existingNonVoters */ ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -408,10 +408,10 @@ func TestAllocatorNoAvailableDisks(t *testing.T) { stopper, _, _, a, _ := createTestAllocator(1, false /* deterministic */) defer stopper.Stop(context.Background()) - result, _, err := a.AllocateTarget( + result, _, err := a.AllocateVoterTarget( context.Background(), &simpleZoneConfig, - []roachpb.ReplicaDescriptor{}, + nil /* existingVoters */, nil, /* existingNonVoters */ ) if result != nil { t.Errorf("expected nil result: %+v", result) @@ -429,21 +429,21 @@ func TestAllocatorTwoDatacenters(t *testing.T) { defer stopper.Stop(context.Background()) gossiputil.NewStoreGossiper(g).GossipStores(multiDCStores, t) ctx := context.Background() - result1, _, err := a.AllocateTarget( + result1, _, err := a.AllocateVoterTarget( ctx, &multiDCConfig, - []roachpb.ReplicaDescriptor{}, + nil /* existingVoters */, nil, /* existingNonVoters */ ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) } - result2, _, err := a.AllocateTarget( + result2, _, err := a.AllocateVoterTarget( ctx, &multiDCConfig, []roachpb.ReplicaDescriptor{{ NodeID: result1.Node.NodeID, StoreID: result1.StoreID, - }}, + }}, nil, /* existingNonVoters */ ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -454,7 +454,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { t.Errorf("Expected nodes %+v: %+v vs %+v", expected, result1.Node, result2.Node) } // Verify that no result is forthcoming if we already have a replica. 
- result3, _, err := a.AllocateTarget( + result3, _, err := a.AllocateVoterTarget( ctx, &multiDCConfig, []roachpb.ReplicaDescriptor{ @@ -466,7 +466,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { NodeID: result2.Node.NodeID, StoreID: result2.StoreID, }, - }, + }, nil, /* existingNonVoters */ ) if err == nil { t.Errorf("expected error on allocation without available stores: %+v", result3) @@ -480,7 +480,7 @@ func TestAllocatorExistingReplica(t *testing.T) { stopper, g, _, a, _ := createTestAllocator(1, false /* deterministic */) defer stopper.Stop(context.Background()) gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t) - result, _, err := a.AllocateTarget( + result, _, err := a.AllocateVoterTarget( context.Background(), &zonepb.ZoneConfig{ NumReplicas: proto.Int32(0), @@ -498,7 +498,7 @@ func TestAllocatorExistingReplica(t *testing.T) { NodeID: 2, StoreID: 2, }, - }, + }, nil, /* existingNonVoters */ ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -599,13 +599,9 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { for _, tc := range testCases { { - result, _, err := a.AllocateTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - tc.existing, - ) + result, _, err := a.AllocateVoterTarget(context.Background(), zonepb.EmptyCompleteZoneConfig(), tc.existing, nil) if e, a := tc.expectTargetAllocate, result != nil; e != a { - t.Errorf("AllocateTarget(%v) got target %v, err %v; expectTarget=%v", + t.Errorf("AllocateVoterTarget(%v) got target %v, err %v; expectTarget=%v", tc.existing, result, err, tc.expectTargetAllocate) } } @@ -835,7 +831,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { // We make 5 stores in this test -- 3 in the same datacenter, and 1 each in // 2 other datacenters. All of our replicas are distributed within these 3 // datacenters. Originally, the stores that are all alone in their datacenter - // are fuller than the other stores. If we didn't simulate RemoveTarget in + // are fuller than the other stores. If we didn't simulate RemoveVoter in // RebalanceTarget, we would try to choose store 2 or 3 as the target store // to make a rebalance. However, we would immediately remove the replica on // store 1 or 2 to retain the locality diversity. @@ -2060,6 +2056,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) { } } +// TODO(aayush): These tests should be renamed. 
func TestAllocatorRemoveTargetLocality(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -2108,11 +2105,12 @@ func TestAllocatorRemoveTargetLocality(t *testing.T) { StoreID: storeID, } } - targetRepl, details, err := a.RemoveTarget( + targetRepl, details, err := a.RemoveVoter( context.Background(), zonepb.EmptyCompleteZoneConfig(), existingRepls, existingRepls, + nil, /* existingNonVoters */ ) if err != nil { t.Fatal(err) @@ -2125,7 +2123,7 @@ func TestAllocatorRemoveTargetLocality(t *testing.T) { } } if !found { - t.Errorf("expected RemoveTarget(%v) in %v, but got %d; details: %s", c.existing, c.expected, targetRepl.StoreID, details) + t.Errorf("expected RemoveVoter(%v) in %v, but got %d; details: %s", c.existing, c.expected, targetRepl.StoreID, details) } } } @@ -2192,11 +2190,7 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) { StoreID: storeID, } } - targetStore, details, err := a.AllocateTarget( - context.Background(), - zonepb.EmptyCompleteZoneConfig(), - existingRepls, - ) + targetStore, details, err := a.AllocateVoterTarget(context.Background(), zonepb.EmptyCompleteZoneConfig(), existingRepls, nil) if err != nil { t.Fatal(err) } @@ -2208,7 +2202,7 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) { } } if !found { - t.Errorf("expected AllocateTarget(%v) in %v, but got %d; details: %s", c.existing, c.expected, targetStore.StoreID, details) + t.Errorf("expected AllocateVoterTarget(%v) in %v, but got %d; details: %s", c.existing, c.expected, targetStore.StoreID, details) } } } @@ -2524,7 +2518,8 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { // No constraints. zone := &zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: nil} analyzed := constraint.AnalyzeConstraints( - context.Background(), a.storePool.getStoreDescriptor, existingRepls, zone) + context.Background(), a.storePool.getStoreDescriptor, existingRepls, int(*zone.NumReplicas), + zone.Constraints) a.storePool.isNodeReadyForRoutineReplicaTransfer = func(_ context.Context, n roachpb.NodeID) bool { for _, s := range tc.excluded { @@ -2541,6 +2536,8 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { context.Background(), sl, analyzed, + constraint.EmptyAnalyzedConstraints, + checkVoterConstraintsForAllocation, existingRepls, a.storePool.getLocalitiesByStore(existingRepls), a.storePool.isNodeReadyForRoutineReplicaTransfer, @@ -2786,11 +2783,14 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) { } zone := &zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: tc.constraints} analyzed := constraint.AnalyzeConstraints( - context.Background(), a.storePool.getStoreDescriptor, existingRepls, zone) + context.Background(), a.storePool.getStoreDescriptor, existingRepls, int(*zone.NumReplicas), + zone.Constraints) candidates := allocateCandidates( context.Background(), sl, analyzed, + constraint.EmptyAnalyzedConstraints, + checkVoterConstraintsForAllocation, existingRepls, a.storePool.getLocalitiesByStore(existingRepls), func(context.Context, roachpb.NodeID) bool { return true }, /* isNodeValidForRoutineReplicaTransfer */ @@ -3010,18 +3010,20 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) { StoreID: storeID, } } + ctx := context.Background() zone := &zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: tc.constraints} analyzed := constraint.AnalyzeConstraints( - context.Background(), a.storePool.getStoreDescriptor, existingRepls, zone) - candidates := removeCandidates( - sl, - analyzed, - 
a.storePool.getLocalitiesByStore(existingRepls), - a.scorerOptions(), - ) + ctx, a.storePool.getStoreDescriptor, existingRepls, int(*zone.NumReplicas), + zone.Constraints) + existingVoters := roachpb.MakeReplicaSet(existingRepls).VoterDescriptors() + analyzedVoterConstraints := constraint.AnalyzeConstraints( + ctx, a.storePool.getStoreDescriptor, existingVoters, int(zone.GetNumVoters()), + zone.VoterConstraints) + candidates := rankedCandidateListForRemoval(sl, analyzed, analyzedVoterConstraints, + checkVoterConstraintsForRemoval, a.storePool.getLocalitiesByStore(existingRepls), a.scorerOptions()) if !expectedStoreIDsMatch(tc.expected, candidates.worst()) { - t.Errorf("%d: expected removeCandidates(%v) = %v, but got %v", - testIdx, tc.existing, tc.expected, candidates) + t.Errorf("%d: expected rankedCandidateListForRemoval(%v) = %v, but got %v\n for candidates %v", + testIdx, tc.existing, tc.expected, candidates.worst(), candidates) } } } @@ -3807,7 +3809,8 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { NumReplicas: proto.Int32(tc.zoneNumReplicas), } analyzed := constraint.AnalyzeConstraints( - context.Background(), a.storePool.getStoreDescriptor, existingRepls, zone) + context.Background(), a.storePool.getStoreDescriptor, existingRepls, + int(*zone.NumReplicas), zone.Constraints) results := rebalanceCandidates( context.Background(), sl, @@ -4138,13 +4141,13 @@ func TestLoadBasedLeaseRebalanceScore(t *testing.T) { } } -// TestAllocatorRemoveTarget verifies that the replica chosen by RemoveTarget is +// TestAllocatorRemoveTarget verifies that the replica chosen by RemoveVoter is // the one with the lowest capacity. func TestAllocatorRemoveTarget(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // List of replicas that will be passed to RemoveTarget + // List of replicas that will be passed to RemoveVoter replicas := []roachpb.ReplicaDescriptor{ { StoreID: 1, @@ -4211,17 +4214,13 @@ func TestAllocatorRemoveTarget(t *testing.T) { // Repeat this test 10 times, it should always be either store 2 or 3. 
for i := 0; i < 10; i++ { - targetRepl, _, err := a.RemoveTarget( - ctx, - zonepb.EmptyCompleteZoneConfig(), - replicas, - replicas, - ) + targetRepl, _, err := a.RemoveVoter(ctx, zonepb.EmptyCompleteZoneConfig(), replicas, replicas, + nil) if err != nil { t.Fatal(err) } if a, e1, e2 := targetRepl, replicas[1], replicas[2]; a != e1 && a != e2 { - t.Fatalf("%d: RemoveTarget did not select either expected replica; expected %v or %v, got %v", + t.Fatalf("%d: RemoveVoter did not select either expected replica; expected %v or %v, got %v", i, e1, e2, a) } } @@ -5531,24 +5530,24 @@ func TestAllocatorError(t *testing.T) { ae allocatorError expected string }{ - {allocatorError{constraints: nil, existingReplicas: 1, aliveStores: 1}, - "0 of 1 live stores are able to take a new replica for the range (1 already has a replica); likely not enough nodes in cluster"}, - {allocatorError{constraints: nil, existingReplicas: 1, aliveStores: 2, throttledStores: 1}, - "0 of 2 live stores are able to take a new replica for the range (1 throttled, 1 already has a replica)"}, - {allocatorError{constraints: constraint, existingReplicas: 1, aliveStores: 1}, - `0 of 1 live stores are able to take a new replica for the range (1 already has a replica); ` + + {allocatorError{constraints: nil, existingVoterCount: 1, aliveStores: 1}, + "0 of 1 live stores are able to take a new replica for the range (1 already has a voter); likely not enough nodes in cluster"}, + {allocatorError{constraints: nil, existingVoterCount: 1, aliveStores: 2, throttledStores: 1}, + "0 of 2 live stores are able to take a new replica for the range (1 throttled, 1 already has a voter)"}, + {allocatorError{constraints: constraint, existingVoterCount: 1, aliveStores: 1}, + `0 of 1 live stores are able to take a new replica for the range (1 already has a voter); ` + `must match constraints [{+one}]`}, - {allocatorError{constraints: constraint, existingReplicas: 1, aliveStores: 2}, - `0 of 2 live stores are able to take a new replica for the range (1 already has a replica); ` + + {allocatorError{constraints: constraint, existingVoterCount: 1, aliveStores: 2}, + `0 of 2 live stores are able to take a new replica for the range (1 already has a voter); ` + `must match constraints [{+one}]`}, - {allocatorError{constraints: constraints, existingReplicas: 1, aliveStores: 1}, - `0 of 1 live stores are able to take a new replica for the range (1 already has a replica); ` + + {allocatorError{constraints: constraints, existingVoterCount: 1, aliveStores: 1}, + `0 of 1 live stores are able to take a new replica for the range (1 already has a voter); ` + `must match constraints [{+one,+two}]`}, - {allocatorError{constraints: constraints, existingReplicas: 1, aliveStores: 2}, - `0 of 2 live stores are able to take a new replica for the range (1 already has a replica); ` + + {allocatorError{constraints: constraints, existingVoterCount: 1, aliveStores: 2}, + `0 of 2 live stores are able to take a new replica for the range (1 already has a voter); ` + `must match constraints [{+one,+two}]`}, - {allocatorError{constraints: constraint, existingReplicas: 1, aliveStores: 2, throttledStores: 1}, - `0 of 2 live stores are able to take a new replica for the range (1 throttled, 1 already has a replica); ` + + {allocatorError{constraints: constraint, existingVoterCount: 1, aliveStores: 2, throttledStores: 1}, + `0 of 2 live stores are able to take a new replica for the range (1 throttled, 1 already has a voter); ` + `must match constraints [{+one}]`}, } @@ -5568,22 +5567,14 @@ 
func TestAllocatorThrottled(t *testing.T) { defer stopper.Stop(ctx) // First test to make sure we would send the replica to purgatory. - _, _, err := a.AllocateTarget( - ctx, - &simpleZoneConfig, - []roachpb.ReplicaDescriptor{}, - ) + _, _, err := a.AllocateVoterTarget(ctx, &simpleZoneConfig, []roachpb.ReplicaDescriptor{}, nil) if !errors.HasInterface(err, (*purgatoryError)(nil)) { t.Fatalf("expected a purgatory error, got: %+v", err) } // Second, test the normal case in which we can allocate to the store. gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t) - result, _, err := a.AllocateTarget( - ctx, - &simpleZoneConfig, - []roachpb.ReplicaDescriptor{}, - ) + result, _, err := a.AllocateVoterTarget(ctx, &simpleZoneConfig, []roachpb.ReplicaDescriptor{}, nil) if err != nil { t.Fatalf("unable to perform allocation: %+v", err) } @@ -5600,11 +5591,7 @@ func TestAllocatorThrottled(t *testing.T) { } storeDetail.throttledUntil = timeutil.Now().Add(24 * time.Hour) a.storePool.detailsMu.Unlock() - _, _, err = a.AllocateTarget( - ctx, - &simpleZoneConfig, - []roachpb.ReplicaDescriptor{}, - ) + _, _, err = a.AllocateVoterTarget(ctx, &simpleZoneConfig, []roachpb.ReplicaDescriptor{}, nil) if errors.HasInterface(err, (*purgatoryError)(nil)) { t.Fatalf("expected a non purgatory error, got: %+v", err) } diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 1150a6cafa6e..b9e29eccf489 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -3495,6 +3495,9 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) { func verifyMerged(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey) { t.Helper() repl := store.LookupReplica(rhsStartKey) + if repl == nil { + t.Fatal("replica doesn't exist") + } if !repl.Desc().StartKey.Equal(lhsStartKey) { t.Fatalf("ranges unexpectedly unmerged expected startKey %s, but got %s", lhsStartKey, repl.Desc().StartKey) } @@ -3503,6 +3506,9 @@ func verifyMerged(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey func verifyUnmerged(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey) { t.Helper() repl := store.LookupReplica(rhsStartKey) + if repl == nil { + t.Fatal("replica doesn't exist") + } if repl.Desc().StartKey.Equal(lhsStartKey) { t.Fatalf("ranges unexpectedly merged") } diff --git a/pkg/kv/kvserver/constraint/analyzer.go b/pkg/kv/kvserver/constraint/analyzer.go index e5e191ec5e0f..30c896e2334d 100644 --- a/pkg/kv/kvserver/constraint/analyzer.go +++ b/pkg/kv/kvserver/constraint/analyzer.go @@ -36,6 +36,10 @@ type AnalyzedConstraints struct { Satisfies map[roachpb.StoreID][]int } +// EmptyAnalyzedConstraints represents an empty set of constraints that are +// satisfied by any given configuration of replicas. +var EmptyAnalyzedConstraints = AnalyzedConstraints{} + // AnalyzeConstraints processes the zone config constraints that apply to a // range along with the current replicas for a range, spitting back out // information about which constraints are satisfied by which replicas and @@ -44,19 +48,22 @@ func AnalyzeConstraints( ctx context.Context, getStoreDescFn func(roachpb.StoreID) (roachpb.StoreDescriptor, bool), existing []roachpb.ReplicaDescriptor, - zone *zonepb.ZoneConfig, + // TODO(aayush): !!! Just change this to an int32 to avoid all the casting the + // callers have to do. 
+ numReplicas int, + constraints []zonepb.ConstraintsConjunction, ) AnalyzedConstraints { result := AnalyzedConstraints{ - Constraints: zone.Constraints, + Constraints: constraints, } - if len(zone.Constraints) > 0 { - result.SatisfiedBy = make([][]roachpb.StoreID, len(zone.Constraints)) + if len(constraints) > 0 { + result.SatisfiedBy = make([][]roachpb.StoreID, len(constraints)) result.Satisfies = make(map[roachpb.StoreID][]int) } var constrainedReplicas int32 - for i, subConstraints := range zone.Constraints { + for i, subConstraints := range constraints { constrainedReplicas += subConstraints.NumReplicas for _, repl := range existing { // If for some reason we don't have the store descriptor (which shouldn't @@ -70,7 +77,7 @@ func AnalyzeConstraints( } } } - if constrainedReplicas > 0 && constrainedReplicas < *zone.NumReplicas { + if constrainedReplicas > 0 && constrainedReplicas < int32(numReplicas) { result.UnconstrainedReplicas = true } return result diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index b3e2870fd6f0..c762c8a9ec5f 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2494,6 +2494,15 @@ func (s *Store) relocateReplicas( } } +type relocationArgs struct { + targetsToAdd, targetsToRemove []roachpb.ReplicaDescriptor + addOp, removeOp roachpb.ReplicaChangeType + allocateFn chooseReplicaToAddFn + removeFn chooseReplicaToRemoveFn + relocationTargets []roachpb.ReplicationTarget + noVotersToRelocate bool +} + func (s *Store) relocateOne( ctx context.Context, desc *roachpb.RangeDescriptor, @@ -2518,17 +2527,34 @@ func (s *Store) relocateOne( storeList, _, _ := s.allocator.storePool.getStoreList(storeFilterNone) storeMap := storeListToMap(storeList) - getTargetsToRelocate := func() (targetsToAdd, targetsToRemove []roachpb.ReplicaDescriptor, - addOp, removeOp roachpb.ReplicaChangeType, votersRelocated bool) { + getRelocationArgs := func() relocationArgs { votersToAdd := subtractTargets(voterTargets, desc.Replicas().Voters().ReplicationTargets()) votersToRemove := subtractTargets(desc.Replicas().Voters().ReplicationTargets(), voterTargets) // If there are no voters to relocate, we relocate the non-voters. if len(votersToAdd) == 0 && len(votersToRemove) == 0 { nonVotersToAdd := subtractTargets(nonVoterTargets, desc.Replicas().NonVoters().ReplicationTargets()) nonVotersToRemove := subtractTargets(desc.Replicas().NonVoters().ReplicationTargets(), nonVoterTargets) - return nonVotersToAdd, nonVotersToRemove, roachpb.ADD_NON_VOTER, roachpb.REMOVE_NON_VOTER, true + return relocationArgs{ + targetsToAdd: nonVotersToAdd, + targetsToRemove: nonVotersToRemove, + addOp: roachpb.ADD_NON_VOTER, + removeOp: roachpb.REMOVE_NON_VOTER, + allocateFn: s.allocator.allocateNonVotersFromList, + removeFn: s.allocator.RemoveNonVoter, + relocationTargets: nonVoterTargets, + noVotersToRelocate: true, + } + } + return relocationArgs{ + targetsToAdd: votersToAdd, + targetsToRemove: votersToRemove, + addOp: roachpb.ADD_VOTER, + removeOp: roachpb.REMOVE_VOTER, + allocateFn: s.allocator.allocateVotersFromList, + removeFn: s.allocator.RemoveVoter, + relocationTargets: voterTargets, + noVotersToRelocate: false, } - return votersToAdd, votersToRemove, roachpb.ADD_VOTER, roachpb.REMOVE_VOTER, false } // Compute which replica to add and/or remove, respectively. 
We then ask the @@ -2540,24 +2566,20 @@ func (s *Store) relocateOne( // same node, and this code doesn't do anything to specifically avoid that // case (although the allocator will avoid even trying to send snapshots to // such stores), so it could cause some failures. - targetsToAdd, targetsToRemove, addOp, removeOp, votersRelocated := getTargetsToRelocate() - relocationTargets := voterTargets - existingReplicas := desc.Replicas().VoterDescriptors() - if votersRelocated { - relocationTargets = nonVoterTargets - existingReplicas = desc.Replicas().NonVoterDescriptors() - } + args := getRelocationArgs() + existingVoters, existingNonVoters := desc.Replicas().VoterDescriptors(), desc.Replicas().NonVoterDescriptors() + existingReplicas := desc.Replicas().Descriptors() var ops roachpb.ReplicationChanges - if len(targetsToAdd) > 0 { + if len(args.targetsToAdd) > 0 { // Each iteration, pick the most desirable replica to add. However, // prefer the first target because it's the one that should hold the // lease in the end; it helps to add it early so that the lease doesn't // have to move too much. - candidateTargets := targetsToAdd - if !votersRelocated && storeHasReplica(relocationTargets[0].StoreID, candidateTargets) { + candidateTargets := args.targetsToAdd + if !args.noVotersToRelocate && storeHasReplica(args.relocationTargets[0].StoreID, candidateTargets) { candidateTargets = []roachpb.ReplicaDescriptor{ - {NodeID: relocationTargets[0].NodeID, StoreID: relocationTargets[0].StoreID}, + {NodeID: args.relocationTargets[0].NodeID, StoreID: args.relocationTargets[0].StoreID}, } } @@ -2575,46 +2597,48 @@ func (s *Store) relocateOne( } candidateStoreList := makeStoreList(candidateDescs) - targetStore, _ := s.allocator.allocateTargetFromList( - ctx, - candidateStoreList, - zone, - existingReplicas, - s.allocator.scorerOptions()) - if targetStore == nil { - return nil, nil, fmt.Errorf("none of the remaining relocationTargets %v are legal additions to %v", - targetsToAdd, desc.Replicas()) - } + targetStore, _ := args.allocateFn(ctx, candidateStoreList, zone, existingVoters, existingNonVoters, s.allocator.scorerOptions()) target := roachpb.ReplicationTarget{ NodeID: targetStore.Node.NodeID, StoreID: targetStore.StoreID, } - ops = append(ops, roachpb.MakeReplicationChanges(addOp, target)...) + ops = append(ops, roachpb.MakeReplicationChanges(args.addOp, target)...) + // Pretend the replica is already there so that the removal logic below will // take it into account when deciding which replica to remove. - existingReplicas = append(existingReplicas, roachpb.ReplicaDescriptor{ - NodeID: target.NodeID, - StoreID: target.StoreID, - ReplicaID: desc.NextReplicaID, - Type: roachpb.ReplicaTypeVoterFull(), - }) + if args.noVotersToRelocate { + existingNonVoters = append(existingNonVoters, roachpb.ReplicaDescriptor{ + NodeID: target.NodeID, + StoreID: target.StoreID, + ReplicaID: desc.NextReplicaID, + Type: roachpb.ReplicaTypeNonVoter(), + }) + } else { + existingVoters = append(existingVoters, roachpb.ReplicaDescriptor{ + NodeID: target.NodeID, + StoreID: target.StoreID, + ReplicaID: desc.NextReplicaID, + Type: roachpb.ReplicaTypeVoterFull(), + }) + } } var transferTarget *roachpb.ReplicationTarget - if len(targetsToRemove) > 0 { - // Pick a replica to remove. Note that existingReplicas may already reflect - // a replica we're adding in the current round. This is the right thing - // to do. 
For example, consider relocating from (s1,s2,s3) to (s1,s2,s4) - // where targetsToAdd will be (s4) and targetsToRemove is (s3). In this code, - // we'll want the allocator to see if s3 can be removed from + if len(args.targetsToRemove) > 0 { + // Pick a replica to remove. Note that existingVoters/existingNonVoters may + // already reflect a replica we're adding in the current round. This is the + // right thing to do. For example, consider relocating from (s1,s2,s3) to + // (s1,s2,s4) where targetsToAdd will be (s4) and targetsToRemove is (s3). + // In this code, we'll want the allocator to see if s3 can be removed from // (s1,s2,s3,s4) which is a reasonable request; that replica set is - // overreplicated. If we asked it instead to remove s3 from (s1,s2,s3) - // it may not want to do that due to constraints. - targetStore, _, err := s.allocator.RemoveTarget(ctx, zone, targetsToRemove, existingReplicas) + // overreplicated. If we asked it instead to remove s3 from (s1,s2,s3) it + // may not want to do that due to constraints. + targetStore, _, err := args.removeFn(ctx, zone, args.targetsToRemove, existingVoters, + existingNonVoters) if err != nil { return nil, nil, errors.Wrapf(err, "unable to select removal target from %v; current replicas %v", - targetsToRemove, existingReplicas) + args.targetsToRemove, existingReplicas) } removalTarget := roachpb.ReplicationTarget{ NodeID: targetStore.NodeID, @@ -2634,17 +2658,16 @@ func (s *Store) relocateOne( } curLeaseholder := b.RawResponse().Responses[0].GetLeaseInfo().Lease.Replica ok := curLeaseholder.StoreID != removalTarget.StoreID - if !ok { - // Pick a replica that we can give the lease to. We sort the first - // target to the beginning (if it's there) because that's where the - // lease needs to be in the end. We also exclude the last replica if - // it was added by the add branch above (in which case it doesn't - // exist yet). - sortedTargetReplicas := append([]roachpb.ReplicaDescriptor(nil), existingReplicas[:len(existingReplicas)-len(ops)]...) + if !ok && !args.noVotersToRelocate { + // Pick a voting replica that we can give the lease to. We sort the first + // target to the beginning (if it's there) because that's where the lease + // needs to be in the end. We also exclude the last replica if it was + // added by the add branch above (in which case it doesn't exist yet). + sortedTargetReplicas := append([]roachpb.ReplicaDescriptor(nil), existingVoters[:len(existingVoters)-len(ops)]...) sort.Slice(sortedTargetReplicas, func(i, j int) bool { sl := sortedTargetReplicas // relocationTargets[0] goes to the front (if it's present). - return sl[i].StoreID == relocationTargets[0].StoreID + return sl[i].StoreID == args.relocationTargets[0].StoreID }) for _, rDesc := range sortedTargetReplicas { if rDesc.StoreID != curLeaseholder.StoreID { @@ -2665,7 +2688,7 @@ func (s *Store) relocateOne( // illegal). if ok { ops = append(ops, roachpb.MakeReplicationChanges( - removeOp, + args.removeOp, removalTarget)...) } } diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 9494e1cfc9cc..dd73ee318bbe 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -120,6 +120,7 @@ func (e *quorumError) Error() string { func (*quorumError) purgatoryErrorMarker() {} // ReplicateQueueMetrics is the set of metrics for the replicate queue. +// TODO(aayush): Track metrics for non-voting replicas separately here. 
type ReplicateQueueMetrics struct { AddReplicaCount *metric.Counter RemoveReplicaCount *metric.Counter @@ -322,7 +323,9 @@ func (rq *replicateQueue) processOneChange( // Avoid taking action if the range has too many dead replicas to make // quorum. voterReplicas := desc.Replicas().VoterDescriptors() + nonVoterReplicas := desc.Replicas().NonVoterDescriptors() liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(voterReplicas) + liveNonVoterReplicas, _ := rq.allocator.storePool.liveAndDeadReplicas(nonVoterReplicas) // NB: the replication layer ensures that the below operations don't cause // unavailability; see: @@ -345,9 +348,13 @@ func (rq *replicateQueue) processOneChange( // Let the scanner requeue it again later. return false, nil case AllocatorAddVoter: - return rq.addOrReplace(ctx, repl, voterReplicas, liveVoterReplicas, -1 /* removeIdx */, dryRun) + return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, -1, dryRun) + case AllocatorAddNonVoter: + return rq.addNonVoter(ctx, repl, nonVoterReplicas, liveVoterReplicas, liveNonVoterReplicas, dryRun) case AllocatorRemoveVoter: - return rq.remove(ctx, repl, voterReplicas, dryRun) + return rq.removeVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun) + case AllocatorRemoveNonVoter: + return rq.removeNonVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun) case AllocatorReplaceDeadVoter: if len(deadVoterReplicas) == 0 { // Nothing to do. @@ -365,7 +372,7 @@ func (rq *replicateQueue) processOneChange( "dead voter %v unexpectedly not found in %v", deadVoterReplicas[0], voterReplicas) } - return rq.addOrReplace(ctx, repl, voterReplicas, liveVoterReplicas, removeIdx, dryRun) + return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) case AllocatorReplaceDecommissioningVoter: decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(voterReplicas) if len(decommissioningReplicas) == 0 { @@ -384,7 +391,7 @@ func (rq *replicateQueue) processOneChange( "decommissioning voter %v unexpectedly not found in %v", decommissioningReplicas[0], voterReplicas) } - return rq.addOrReplace(ctx, repl, voterReplicas, liveVoterReplicas, removeIdx, dryRun) + return rq.addOrReplaceVoters(ctx, repl, voterReplicas, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) case AllocatorRemoveDecommissioningVoter: // NB: this path will only be hit when the range is over-replicated and // has decommissioning replicas; in the common case we'll hit @@ -404,51 +411,50 @@ func (rq *replicateQueue) processOneChange( // Requeue because either we failed to transition out of a joint state // (bad) or we did and there might be more to do for that range. return true, err - case AllocatorAddNonVoter, AllocatorRemoveNonVoter: - return false, errors.Errorf("allocator actions pertaining to non-voters are"+ - " currently not supported: %v", action) default: return false, errors.Errorf("unknown allocator action %v", action) } } -// addOrReplace adds or replaces a replica. If removeIdx is -1, an addition is -// carried out. Otherwise, removeIdx must be a valid index into existingReplicas -// and specifies which replica to replace with a new one. +// addOrReplaceVoters adds or replaces a voting replica. If removeIdx is -1, an +// addition is carried out. Otherwise, removeIdx must be a valid index into +// existingVoters and specifies which voter to replace with a new one. 
// // The method preferably issues an atomic replica swap, but may not be able to // do this in all cases, such as when atomic replication changes are not // available, or when the range consists of a single replica. As a fall back, // only the addition is carried out; the removal is then a follow-up step for // the next scanner cycle. -func (rq *replicateQueue) addOrReplace( +func (rq *replicateQueue) addOrReplaceVoters( ctx context.Context, repl *Replica, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters []roachpb.ReplicaDescriptor, liveVoterReplicas []roachpb.ReplicaDescriptor, - removeIdx int, // -1 for no removal + liveNonVoterReplicas []roachpb.ReplicaDescriptor, + removeIdx int, dryRun bool, ) (requeue bool, _ error) { - if len(existingReplicas) == 1 { + if len(existingVoters) == 1 { // If only one replica remains, that replica is the leaseholder and // we won't be able to swap it out. Ignore the removal and simply add // a replica. removeIdx = -1 } - remainingLiveReplicas := liveVoterReplicas + remainingLiveVoters := liveVoterReplicas + remainingLiveNonVoters := liveNonVoterReplicas if removeIdx >= 0 { - replToRemove := existingReplicas[removeIdx] + replToRemove := existingVoters[removeIdx] for i, r := range liveVoterReplicas { if r.ReplicaID == replToRemove.ReplicaID { - remainingLiveReplicas = append(liveVoterReplicas[:i:i], liveVoterReplicas[i+1:]...) + remainingLiveVoters = append(liveVoterReplicas[:i:i], liveVoterReplicas[i+1:]...) break } } // See about transferring the lease away if we're about to remove the // leaseholder. done, err := rq.maybeTransferLeaseAway( - ctx, repl, existingReplicas[removeIdx].StoreID, dryRun, nil /* canTransferLease */) + ctx, repl, existingVoters[removeIdx].StoreID, dryRun, nil /* canTransferLease */) if err != nil { return false, err } @@ -464,15 +470,11 @@ func (rq *replicateQueue) addOrReplace( // there is a reason we're removing it (i.e. dead or decommissioning). If we // left the replica in the slice, the allocator would not be guaranteed to // pick a replica that fills the gap removeRepl leaves once it's gone. - newStore, details, err := rq.allocator.AllocateTarget( - ctx, - zone, - remainingLiveReplicas, - ) + newStore, details, err := rq.allocator.AllocateVoterTarget(ctx, zone, remainingLiveVoters, remainingLiveNonVoters) if err != nil { return false, err } - if removeIdx >= 0 && newStore.StoreID == existingReplicas[removeIdx].StoreID { + if removeIdx >= 0 && newStore.StoreID == existingVoters[removeIdx].StoreID { return false, errors.AssertionFailedf("allocator suggested to replace replica on s%d with itself", newStore.StoreID) } newReplica := roachpb.ReplicationTarget{ @@ -481,7 +483,7 @@ func (rq *replicateQueue) addOrReplace( } clusterNodes := rq.allocator.storePool.ClusterNodeCount() - need := GetNeededVoters(*zone.NumReplicas, clusterNodes) + neededVoters := GetNeededVoters(zone.GetNumVoters(), clusterNodes) // Only up-replicate if there are suitable allocation targets such that, // either the replication goal is met, or it is possible to get to the next @@ -490,28 +492,24 @@ func (rq *replicateQueue) addOrReplace( // quorum. For example, up-replicating from 1 to 2 replicas only makes sense // if it is possible to be able to go to 3 replicas. // - // NB: If willHave > need, then always allow up-replicating as that + // NB: If willHave > neededVoters, then always allow up-replicating as that // will be the case when up-replicating a range with a decommissioning // replica. 
// // We skip this check if we're swapping a replica, since that does not // change the quorum size. - if willHave := len(existingReplicas) + 1; removeIdx < 0 && willHave < need && willHave%2 == 0 { + if willHave := len(existingVoters) + 1; removeIdx < 0 && willHave < neededVoters && willHave%2 == 0 { // This means we are going to up-replicate to an even replica state. // Check if it is possible to go to an odd replica state beyond it. - oldPlusNewReplicas := append([]roachpb.ReplicaDescriptor(nil), existingReplicas...) + oldPlusNewReplicas := append([]roachpb.ReplicaDescriptor(nil), existingVoters...) oldPlusNewReplicas = append(oldPlusNewReplicas, roachpb.ReplicaDescriptor{ NodeID: newStore.Node.NodeID, StoreID: newStore.StoreID, }) - _, _, err := rq.allocator.AllocateTarget( - ctx, - zone, - oldPlusNewReplicas, - ) + _, _, err := rq.allocator.AllocateVoterTarget(ctx, zone, oldPlusNewReplicas, remainingLiveNonVoters) if err != nil { // It does not seem possible to go to the next odd replica state. Note - // that AllocateTarget returns an allocatorError (a purgatoryError) + // that AllocateVoterTarget returns an allocatorError (a purgatoryError) // when purgatory is requested. return false, errors.Wrap(err, "avoid up-replicating to fragile quorum") } @@ -520,12 +518,12 @@ func (rq *replicateQueue) addOrReplace( ops := roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newReplica) if removeIdx < 0 { log.VEventf(ctx, 1, "adding replica %+v: %s", - newReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas)) + newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) } else { rq.metrics.RemoveReplicaCount.Inc(1) - removeReplica := existingReplicas[removeIdx] + removeReplica := existingVoters[removeIdx] log.VEventf(ctx, 1, "replacing replica %s with %+v: %s", - removeReplica, newReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas)) + removeReplica, newReplica, rangeRaftProgress(repl.RaftStatus(), existingVoters)) ops = append(ops, roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, roachpb.ReplicationTarget{ StoreID: removeReplica.StoreID, @@ -549,17 +547,56 @@ func (rq *replicateQueue) addOrReplace( return true, nil } -// findRemoveTarget takes a list of replicas and picks one to remove, making -// sure to not remove a newly added replica or to violate the zone configs in -// the progress. -func (rq *replicateQueue) findRemoveTarget( +func (rq *replicateQueue) addNonVoter( + ctx context.Context, + repl *Replica, + existingReplicas, liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, + dryRun bool, +) (requeue bool, _ error) { + desc, zone := repl.DescAndZone() + + newStore, details, err := rq.allocator.AllocateNonVoterTarget(ctx, zone, liveVoterReplicas, liveNonVoterReplicas) + if err != nil { + return false, err + } + newNonVoter := roachpb.ReplicationTarget{ + NodeID: newStore.Node.NodeID, + StoreID: newStore.StoreID, + } + + ops := roachpb.MakeReplicationChanges(roachpb.ADD_NON_VOTER, newNonVoter) + if err := rq.changeReplicas( + ctx, + repl, + ops, + desc, + SnapshotRequest_RECOVERY, + kvserverpb.ReasonRangeUnderReplicated, + details, + dryRun, + ); err != nil { + return false, err + } + // Always requeue to see if more work needs to be done. + return true, nil +} + +// findRemoveVoter takes a list of voting replicas and picks one to remove, +// making sure to not remove a newly added voter or to violate the zone configs +// in the process. +// +// TODO(aayush): The structure around replica removal is not great. 
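// A hedged sketch of the "fragile quorum" arithmetic behind the check above:
// quorum for n voters is n/2+1, so stepping from 1 to 2 voters raises the
// quorum size without adding any fault tolerance. The helper only mirrors the
// shape of the check; names are illustrative, not the kvserver API.
package main

import "fmt"

func quorum(voters int) int { return voters/2 + 1 }

// blockUpreplication reports whether an addition should be refused because it
// would land on an even voter count below the target and no further addition
// to the next odd count is known to be possible.
func blockUpreplication(existingVoters, neededVoters int, nextOddReachable bool) bool {
	willHave := existingVoters + 1
	return willHave < neededVoters && willHave%2 == 0 && !nextOddReachable
}

func main() {
	fmt.Println(quorum(1), quorum(2), quorum(3)) // 1 2 2
	fmt.Println(blockUpreplication(1, 3, false)) // true: stopping at 2 voters is fragile
	fmt.Println(blockUpreplication(1, 3, true))  // false: 3 voters is reachable next
}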
The entire +// logic of this method should probably live inside Allocator.RemoveVoter. Doing +// so also makes the flow of adding new replicas and removing replicas more +// symmetric. +func (rq *replicateQueue) findRemoveVoter( ctx context.Context, repl interface { DescAndZone() (*roachpb.RangeDescriptor, *zonepb.ZoneConfig) LastReplicaAdded() (roachpb.ReplicaID, time.Time) RaftStatus() *raft.Status }, - existingReplicas []roachpb.ReplicaDescriptor, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, ) (roachpb.ReplicaDescriptor, string, error) { _, zone := repl.DescAndZone() // This retry loop involves quick operations on local state, so a @@ -587,9 +624,9 @@ func (rq *replicateQueue) findRemoveTarget( // If we've lost raft leadership, we're unlikely to regain it so give up immediately. return roachpb.ReplicaDescriptor{}, "", &benignError{errors.Errorf("not raft leader while range needs removal")} } - candidates = filterUnremovableReplicas(ctx, raftStatus, existingReplicas, lastReplAdded) + candidates = filterUnremovableReplicas(ctx, raftStatus, existingVoters, lastReplAdded) log.VEventf(ctx, 3, "filtered unremovable replicas from %v to get %v as candidates for removal: %s", - existingReplicas, candidates, rangeRaftProgress(raftStatus, existingReplicas)) + existingVoters, candidates, rangeRaftProgress(raftStatus, existingVoters)) if len(candidates) > 0 { break } @@ -617,10 +654,10 @@ func (rq *replicateQueue) findRemoveTarget( if len(candidates) == 0 { // If we timed out and still don't have any valid candidates, give up. return roachpb.ReplicaDescriptor{}, "", &benignError{errors.Errorf("no removable replicas from range that needs a removal: %s", - rangeRaftProgress(repl.RaftStatus(), existingReplicas))} + rangeRaftProgress(repl.RaftStatus(), existingVoters))} } - return rq.allocator.RemoveTarget(ctx, zone, candidates, existingReplicas) + return rq.allocator.RemoveVoter(ctx, zone, candidates, existingVoters, existingNonVoters) } // maybeTransferLeaseAway is called whenever a replica on a given store is @@ -668,15 +705,18 @@ func (rq *replicateQueue) maybeTransferLeaseAway( return transferred == transferOK, err } -func (rq *replicateQueue) remove( - ctx context.Context, repl *Replica, existingReplicas []roachpb.ReplicaDescriptor, dryRun bool, +func (rq *replicateQueue) removeVoter( + ctx context.Context, + repl *Replica, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + dryRun bool, ) (requeue bool, _ error) { - removeReplica, details, err := rq.findRemoveTarget(ctx, repl, existingReplicas) + removeVoter, details, err := rq.findRemoveVoter(ctx, repl, existingVoters, existingNonVoters) if err != nil { return false, err } done, err := rq.maybeTransferLeaseAway( - ctx, repl, removeReplica.StoreID, dryRun, nil /* canTransferLease */) + ctx, repl, removeVoter.StoreID, dryRun, nil /* canTransferLease */) if err != nil { return false, err } @@ -687,11 +727,11 @@ func (rq *replicateQueue) remove( // Remove a replica. 
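// A simplified sketch (hypothetical types, not the real queue plumbing) of
// the removal flow above: if the victim picked by findRemoveVoter happens to
// hold the lease, the queue transfers the lease away and requeues rather than
// removing it in place; the actual removal then happens on a later pass.
package main

import "fmt"

type storeID int

type rangeState struct {
	leaseholder storeID
	voters      []storeID
}

func removeVoterSketch(r *rangeState, victim, transferTo storeID) (requeue bool) {
	if victim == r.leaseholder {
		fmt.Printf("transferring lease from s%d to s%d before removal\n", victim, transferTo)
		r.leaseholder = transferTo
		return true // requeue; remove the voter on a later pass
	}
	for i, v := range r.voters {
		if v == victim {
			r.voters = append(r.voters[:i:i], r.voters[i+1:]...)
			break
		}
	}
	fmt.Printf("removed voter on s%d, remaining voters: %v\n", victim, r.voters)
	return true
}

func main() {
	r := &rangeState{leaseholder: 1, voters: []storeID{1, 2, 3}}
	removeVoterSketch(r, 1, 2) // leaseholder: only the lease moves
	removeVoterSketch(r, 1, 2) // no longer leaseholder: now safe to remove
}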
rq.metrics.RemoveReplicaCount.Inc(1) - log.VEventf(ctx, 1, "removing replica %+v due to over-replication: %s", - removeReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas)) + log.VEventf(ctx, 1, "removing voting replica %+v due to over-replication: %s", + removeVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) target := roachpb.ReplicationTarget{ - NodeID: removeReplica.NodeID, - StoreID: removeReplica.StoreID, + NodeID: removeVoter.NodeID, + StoreID: removeVoter.StoreID, } desc, _ := repl.DescAndZone() if err := rq.changeReplicas( @@ -709,6 +749,40 @@ func (rq *replicateQueue) remove( return true, nil } +func (rq *replicateQueue) removeNonVoter( + ctx context.Context, + repl *Replica, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + dryRun bool, +) (requeue bool, _ error) { + rq.metrics.RemoveReplicaCount.Inc(1) + + desc, zone := repl.DescAndZone() + removeNonVoter, details, err := rq.allocator.RemoveNonVoter( + ctx, + zone, + existingNonVoters, + existingVoters, existingNonVoters, + ) + if err != nil { + return false, err + } + + log.VEventf(ctx, 1, "removing non-voting replica %+v due to over-replication: %s", + removeNonVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) + target := roachpb.ReplicationTarget{ + NodeID: removeNonVoter.NodeID, + StoreID: removeNonVoter.StoreID, + } + + if err := rq.changeReplicas(ctx, repl, + roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, target), desc, SnapshotRequest_UNKNOWN, + kvserverpb.ReasonRangeOverReplicated, details, dryRun); err != nil { + return false, err + } + return true, nil +} + func (rq *replicateQueue) removeDecommissioning( ctx context.Context, repl *Replica, dryRun bool, ) (requeue bool, _ error) { diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 914e02349d72..a159d7b76cc9 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -161,11 +161,12 @@ func testReplicateQueueRebalanceInner(t *testing.T, atomic bool) { } } -// Test that up-replication only proceeds if there are a good number of -// candidates to up-replicate to. Specifically, we won't up-replicate to an -// even number of replicas unless there is an additional candidate that will -// allow a subsequent up-replication to an odd number. -func TestReplicateQueueUpReplicate(t *testing.T) { +// TestReplicateQueueUpReplicateOddVoters tests that up-replication only +// proceeds if there are a good number of candidates to up-replicate to. +// Specifically, we won't up-replicate to an even number of replicas unless +// there is an additional candidate that will allow a subsequent up-replication +// to an odd number. +func TestReplicateQueueUpReplicateOddVoters(t *testing.T) { defer leaktest.AfterTest(t)() skip.UnderRaceWithIssue(t, 57144, "flaky under race") defer log.Scope(t).Close(t) @@ -300,6 +301,71 @@ func TestReplicateQueueDownReplicate(t *testing.T) { } } +// TestReplicateQueueUpAndDownReplicateNonVoters is an end-to-end test ensuring +// that the replicateQueue will add or remove non-voter(s) to a range based on +// updates to its zone configuration. 
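// A hedged sketch of the zone-config arithmetic the test below leans on:
// num_voters caps the voting replicas and the remainder of num_replicas is
// expected to materialize as non-voters (when num_voters is unset, all
// replicas are treated as voters here). With num_replicas = 3 and
// num_voters = 1 the range should gain 2 non-voters; dropping num_replicas
// back to 1 should remove both. The helper is illustrative, not the real
// zonepb logic.
package main

import "fmt"

func expectedNonVoters(numReplicas, numVoters int) int {
	if numVoters <= 0 || numVoters > numReplicas {
		numVoters = numReplicas
	}
	return numReplicas - numVoters
}

func main() {
	fmt.Println(expectedNonVoters(3, 1)) // 2
	fmt.Println(expectedNonVoters(1, 1)) // 0
	fmt.Println(expectedNonVoters(3, 0)) // 0: num_voters unset => all voters
}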
+func TestReplicateQueueUpAndDownReplicateNonVoters(t *testing.T) { + defer leaktest.AfterTest(t)() + skip.UnderRace(t) + defer log.Scope(t).Close(t) + + tc := testcluster.StartTestCluster(t, 1, + base.TestClusterArgs{ReplicationMode: base.ReplicationAuto}, + ) + defer tc.Stopper().Stop(context.Background()) + + scratchKey := tc.ScratchRange(t) + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + + // Since we started the TestCluster with 1 node, that first node should have + // 1 voting replica. + require.Len(t, scratchRange.Replicas().VoterDescriptors(), 1) + // Set up the default zone configs such that every range should have 1 voting + // replica and 2 non-voting replicas. + _, err := tc.ServerConn(0).Exec( + `ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 3, num_voters = 1`, + ) + require.NoError(t, err) + + // Add two new servers and expect that 2 non-voters are added to the range. + tc.AddAndStartServer(t, base.TestServerArgs{}) + tc.AddAndStartServer(t, base.TestServerArgs{}) + store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore( + tc.Server(0).GetFirstStoreID()) + require.NoError(t, err) + + get := func() int { + // Nudge the replicateQueue to up/down-replicate our scratch range. + if err := store.ForceReplicationScanAndProcess(); err != nil { + t.Fatal(err) + } + scratchRange := tc.LookupRangeOrFatal(t, scratchKey) + return len(scratchRange.Replicas().NonVoterDescriptors()) + } + + var expectedNonVoterCount = 2 + testutils.SucceedsSoon(t, func() error { + if found := get(); found != expectedNonVoterCount { + return errors.Errorf("expected upreplication to %d non-voters; found %d", + expectedNonVoterCount, found) + } + return nil + }) + + // Now remove all non-voting replicas and expect that the range will + // down-replicate to having just 1 voting replica. + _, err = tc.ServerConn(0).Exec(`ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 1`) + require.NoError(t, err) + expectedNonVoterCount = 0 + testutils.SucceedsSoon(t, func() error { + if found := get(); found != expectedNonVoterCount { + return errors.Errorf("expected downreplication to %d non-voters; found %d", + expectedNonVoterCount, found) + } + return nil + }) +} + // queryRangeLog queries the range log. The query must be of type: // `SELECT info from system.rangelog ...`. func queryRangeLog( diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index f14c3a9200a9..1e06a2c5eaf9 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -504,40 +504,41 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance( desc.RangeID, replWithStats.qps) clusterNodes := sr.rq.allocator.storePool.ClusterNodeCount() - desiredReplicas := GetNeededVoters(*zone.NumReplicas, clusterNodes) - targets := make([]roachpb.ReplicationTarget, 0, desiredReplicas) - targetReplicas := make([]roachpb.ReplicaDescriptor, 0, desiredReplicas) - currentReplicas := desc.Replicas().Descriptors() + desiredVoters := GetNeededVoters(zone.GetNumVoters(), clusterNodes) + targets := make([]roachpb.ReplicationTarget, 0, desiredVoters) + targetVoters := make([]roachpb.ReplicaDescriptor, 0, desiredVoters) + currentVoters := desc.Replicas().VoterDescriptors() + currentNonVoters := desc.Replicas().NonVoterDescriptors() // Check the range's existing diversity score, since we want to ensure we // don't hurt locality diversity just to improve QPS. 
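// A simplified, hypothetical take on the diversity comparison used above:
// score a placement by the average pairwise "distance" between its stores'
// locality tiers, and only accept a rebalance whose score is at least as good
// as the current one. This is not the real rangeDiversityScore, which weights
// tiers differently; it only illustrates why a more spread-out placement
// scores higher.
package main

import "fmt"

func localityDistance(a, b []string) float64 {
	longest := len(a)
	if len(b) > longest {
		longest = len(b)
	}
	if longest == 0 {
		return 0
	}
	shared := 0
	for i := 0; i < len(a) && i < len(b) && a[i] == b[i]; i++ {
		shared++
	}
	return float64(longest-shared) / float64(longest)
}

func diversityScore(localities [][]string) float64 {
	var sum float64
	pairs := 0
	for i := range localities {
		for j := i + 1; j < len(localities); j++ {
			sum += localityDistance(localities[i], localities[j])
			pairs++
		}
	}
	if pairs == 0 {
		return 0
	}
	return sum / float64(pairs)
}

func main() {
	spread := [][]string{{"us-east", "a"}, {"us-west", "b"}, {"eu-west", "c"}}
	packed := [][]string{{"us-east", "a"}, {"us-east", "a"}, {"us-east", "b"}}
	fmt.Printf("spread=%.2f packed=%.2f\n", diversityScore(spread), diversityScore(packed))
}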
curDiversity := rangeDiversityScore( - sr.rq.allocator.storePool.getLocalitiesByStore(currentReplicas)) + sr.rq.allocator.storePool.getLocalitiesByStore(currentVoters)) // Check the existing replicas, keeping around those that aren't overloaded. - for i := range currentReplicas { - if currentReplicas[i].StoreID == localDesc.StoreID { + for i := range currentVoters { + if currentVoters[i].StoreID == localDesc.StoreID { continue } // Keep the replica in the range if we don't know its QPS or if its QPS // is below the upper threshold. Punishing stores not in our store map // could cause mass evictions if the storePool gets out of sync. - storeDesc, ok := storeMap[currentReplicas[i].StoreID] + storeDesc, ok := storeMap[currentVoters[i].StoreID] if !ok || storeDesc.Capacity.QueriesPerSecond < maxQPS { if log.V(3) { var reason redact.RedactableString if ok { reason = redact.Sprintf(" (qps %.2f vs max %.2f)", storeDesc.Capacity.QueriesPerSecond, maxQPS) } - log.VEventf(ctx, 3, "keeping r%d/%d on s%d%s", desc.RangeID, currentReplicas[i].ReplicaID, currentReplicas[i].StoreID, reason) + log.VEventf(ctx, 3, "keeping r%d/%d on s%d%s", desc.RangeID, currentVoters[i].ReplicaID, currentVoters[i].StoreID, reason) } targets = append(targets, roachpb.ReplicationTarget{ - NodeID: currentReplicas[i].NodeID, - StoreID: currentReplicas[i].StoreID, + NodeID: currentVoters[i].NodeID, + StoreID: currentVoters[i].StoreID, }) - targetReplicas = append(targetReplicas, roachpb.ReplicaDescriptor{ - NodeID: currentReplicas[i].NodeID, - StoreID: currentReplicas[i].StoreID, + targetVoters = append(targetVoters, roachpb.ReplicaDescriptor{ + NodeID: currentVoters[i].NodeID, + StoreID: currentVoters[i].StoreID, }) } } @@ -545,15 +546,18 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance( // Then pick out which new stores to add the remaining replicas to. options := sr.rq.allocator.scorerOptions() options.qpsRebalanceThreshold = qpsRebalanceThreshold.Get(&sr.st.SV) - for len(targets) < desiredReplicas { - // Use the preexisting AllocateTarget logic to ensure that considerations + for len(targets) < desiredVoters { + // Use the preexisting AllocateVoterTarget logic to ensure that considerations // such as zone constraints, locality diversity, and full disk come // into play. - target, _ := sr.rq.allocator.allocateTargetFromList( + target, _ := sr.rq.allocator.allocateVotersFromList( ctx, storeList, zone, - targetReplicas, + targetVoters, + // TODO(aayush): For now, we're not going to let the StoreRebalancer + // rebalance non-voting replicas. Fix this. + currentNonVoters, options, ) if target == nil { @@ -571,7 +575,7 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance( NodeID: target.Node.NodeID, StoreID: target.StoreID, }) - targetReplicas = append(targetReplicas, roachpb.ReplicaDescriptor{ + targetVoters = append(targetVoters, roachpb.ReplicaDescriptor{ NodeID: target.Node.NodeID, StoreID: target.StoreID, }) @@ -584,12 +588,12 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance( // moving one of the other existing replicas that's on a store with less // qps than the max threshold but above the mean would help in certain // locality configurations. 
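// A small sketch of the keep-or-replace rule applied to existing voters
// above, with hypothetical types: a voter is kept if its store's QPS is
// unknown or below the ceiling; unknown stores are kept on purpose so that a
// stale store map cannot trigger mass evictions.
package main

import "fmt"

func keepVoter(qpsByStore map[int]float64, storeID int, maxQPS float64) bool {
	qps, ok := qpsByStore[storeID]
	return !ok || qps < maxQPS
}

func main() {
	qps := map[int]float64{1: 500, 2: 2500} // s3 intentionally absent from the store map
	for _, s := range []int{1, 2, 3} {
		fmt.Printf("s%d keep=%t\n", s, keepVoter(qps, s, 1000))
	}
}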
- if len(targets) < desiredReplicas { + if len(targets) < desiredVoters { log.VEventf(ctx, 3, "couldn't find enough rebalance targets for r%d (%d/%d)", - desc.RangeID, len(targets), desiredReplicas) + desc.RangeID, len(targets), desiredVoters) continue } - newDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalitiesByStore(targetReplicas)) + newDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalitiesByStore(targetVoters)) if newDiversity < curDiversity { log.VEventf(ctx, 3, "new diversity %.2f for r%d worse than current diversity %.2f; not rebalancing",