From ef6db0c342992ddc6f1ed877c6cfaeec6cde50d4 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Tue, 15 Nov 2022 16:04:57 -0500 Subject: [PATCH] allocator: refactor allocator to accept StorePool arguments This change adds methods to evaluate allocator actions and targets utilizing a passed-in `StorePool` object, allowing for the allocator to consider potential scenarios rather than those simply based on the current liveness. Depends on #91461, #91965. Part of #91570. Release note: None --- .../allocator/allocatorimpl/allocator.go | 179 +++--- .../allocator/allocatorimpl/allocator_test.go | 526 +++++++++++++++++- .../storepool/override_store_pool.go | 15 + .../allocator/storepool/store_pool.go | 26 + pkg/kv/kvserver/allocator_impl_test.go | 9 +- pkg/kv/kvserver/asim/queue/replicate_queue.go | 5 +- pkg/kv/kvserver/replica_command.go | 9 +- pkg/kv/kvserver/replicate_queue.go | 19 +- pkg/kv/kvserver/store_rebalancer.go | 5 + 9 files changed, 695 insertions(+), 98 deletions(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index a1abf47f546c..7ff9616a2f0a 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -478,6 +478,7 @@ type AllocatorMetrics struct { // in the cluster. type Allocator struct { st *cluster.Settings + deterministic bool StorePool storepool.AllocatorStorePool nodeLatencyFn func(addr string) (time.Duration, bool) // TODO(aayush): Let's replace this with a *rand.Rand that has a rand.Source @@ -516,17 +517,19 @@ func MakeAllocator( knobs *allocator.TestingKnobs, ) Allocator { var randSource rand.Source + var deterministic bool // There are number of test cases that make a test store but don't add // gossip or a store pool. So we can't rely on the existence of the // store pool in those cases. if storePool != nil && storePool.IsDeterministic() { randSource = rand.NewSource(777) + deterministic = true } else { - randSource = rand.NewSource(rand.Int63()) } allocator := Allocator{ st: st, + deterministic: deterministic, StorePool: storePool, nodeLatencyFn: nodeLatencyFn, randGen: makeAllocatorRand(randSource), @@ -597,9 +600,12 @@ func GetNeededNonVoters(numVoters, zoneConfigNonVoterCount, clusterNodes int) in // supplied range, as governed by the supplied zone configuration. It // returns the required action that should be taken and a priority. func (a *Allocator) ComputeAction( - ctx context.Context, conf roachpb.SpanConfig, desc *roachpb.RangeDescriptor, + ctx context.Context, + storePool storepool.AllocatorStorePool, + conf roachpb.SpanConfig, + desc *roachpb.RangeDescriptor, ) (action AllocatorAction, priority float64) { - if a.StorePool == nil { + if storePool == nil { // Do nothing if storePool is nil for some unittests. action = AllocatorNoop return action, action.Priority() @@ -657,13 +663,13 @@ func (a *Allocator) ComputeAction( return action, action.Priority() } - return a.computeAction(ctx, conf, desc.Replicas().VoterDescriptors(), + return a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(), desc.Replicas().NonVoterDescriptors()) - } func (a *Allocator) computeAction( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, voterReplicas []roachpb.ReplicaDescriptor, nonVoterReplicas []roachpb.ReplicaDescriptor, @@ -681,10 +687,10 @@ func (a *Allocator) computeAction( // After that we handle rebalancing related actions, followed by removal // actions. 
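// Illustrative sketch, not part of the patch: with the new storePool
// argument, a caller can ask "what would the allocator do if n3 were
// decommissioning?" without waiting for real liveness to change, by wrapping
// the current pool in an OverrideStorePool (the same pattern the new tests in
// this patch use). Here `a`, `sp`, `ctx`, `conf`, and `desc` are assumed to be
// an existing Allocator, its *StorePool, a context, a span config, and a
// *roachpb.RangeDescriptor already in scope.
overridePool := storepool.NewOverrideStorePool(sp, func(
	nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration,
) livenesspb.NodeLivenessStatus {
	if nid == roachpb.NodeID(3) {
		return livenesspb.NodeLivenessStatus_DECOMMISSIONING
	}
	return sp.NodeLivenessFn(nid, now, timeUntilStoreDead)
})
// Hypothetical "what-if" evaluation against the override pool.
whatIfAction, _ := a.ComputeAction(ctx, overridePool, conf, desc)
// The current-liveness evaluation is unchanged: pass the allocator's own pool.
currentAction, _ := a.ComputeAction(ctx, a.StorePool, conf, desc)
_, _ = whatIfAction, currentAction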
haveVoters := len(voterReplicas) - decommissioningVoters := a.StorePool.DecommissioningReplicas(voterReplicas) + decommissioningVoters := storePool.DecommissioningReplicas(voterReplicas) // Node count including dead nodes but excluding // decommissioning/decommissioned nodes. - clusterNodes := a.StorePool.ClusterNodeCount() + clusterNodes := storePool.ClusterNodeCount() neededVoters := GetNeededVoters(conf.GetNumVoters(), clusterNodes) desiredQuorum := computeQuorum(neededVoters) quorum := computeQuorum(haveVoters) @@ -710,7 +716,7 @@ func (a *Allocator) computeAction( // heartbeat in the recent past. This means we won't move those replicas // elsewhere (for a regular rebalance or for decommissioning). const includeSuspectAndDrainingStores = true - liveVoters, deadVoters := a.StorePool.LiveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores) + liveVoters, deadVoters := storePool.LiveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores) if len(liveVoters) < quorum { // Do not take any replacement/removal action if we do not have a quorum of @@ -789,7 +795,7 @@ func (a *Allocator) computeAction( return action, action.Priority() } - liveNonVoters, deadNonVoters := a.StorePool.LiveAndDeadReplicas( + liveNonVoters, deadNonVoters := storePool.LiveAndDeadReplicas( nonVoterReplicas, includeSuspectAndDrainingStores, ) if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 { @@ -800,7 +806,7 @@ func (a *Allocator) computeAction( return action, action.Priority() } - decommissioningNonVoters := a.StorePool.DecommissioningReplicas(nonVoterReplicas) + decommissioningNonVoters := storePool.DecommissioningReplicas(nonVoterReplicas) if haveNonVoters == neededNonVoters && len(decommissioningNonVoters) > 0 { // The range has non-voter(s) on a decommissioning node that we should // replace. @@ -922,12 +928,13 @@ func (s *GoodCandidateSelector) selectOne(cl candidateList) *candidate { func (a *Allocator) allocateTarget( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, replicaStatus ReplicaStatus, targetType TargetReplicaType, ) (roachpb.ReplicationTarget, string, error) { - candidateStoreList, aliveStoreCount, throttled := a.StorePool.GetStoreList(storepool.StoreFilterThrottled) + candidateStoreList, aliveStoreCount, throttled := storePool.GetStoreList(storepool.StoreFilterThrottled) // If the replica is alive we are upreplicating, and in that case we want to // allocate new replicas on the best possible store. Otherwise, the replica is @@ -941,8 +948,9 @@ func (a *Allocator) allocateTarget( selector = a.NewGoodCandidateSelector() } - target, details := a.AllocateTargetFromList( + target, details := a.allocateTargetFromList( ctx, + storePool, candidateStoreList, conf, existingVoters, @@ -984,11 +992,12 @@ func (a *Allocator) allocateTarget( // voting replicas are ruled out as targets. 
func (a *Allocator) AllocateVoter( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, replicaStatus ReplicaStatus, ) (roachpb.ReplicationTarget, string, error) { - return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget) + return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget) } // AllocateNonVoter returns a suitable store for a new allocation of a @@ -996,11 +1005,12 @@ func (a *Allocator) AllocateVoter( // _any_ existing replicas are ruled out as targets. func (a *Allocator) AllocateNonVoter( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, replicaStatus ReplicaStatus, ) (roachpb.ReplicationTarget, string, error) { - return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget) + return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget) } // AllocateTargetFromList returns a suitable store for a new allocation of a @@ -1008,6 +1018,22 @@ func (a *Allocator) AllocateNonVoter( // existing set of voters and non-voters.. func (a *Allocator) AllocateTargetFromList( ctx context.Context, + storePool storepool.AllocatorStorePool, + candidateStores storepool.StoreList, + conf roachpb.SpanConfig, + existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + options ScorerOptions, + selector CandidateSelector, + allowMultipleReplsPerNode bool, + targetType TargetReplicaType, +) (roachpb.ReplicationTarget, string) { + return a.allocateTargetFromList(ctx, storePool, candidateStores, conf, existingVoters, + existingNonVoters, options, selector, allowMultipleReplsPerNode, targetType) +} + +func (a *Allocator) allocateTargetFromList( + ctx context.Context, + storePool storepool.AllocatorStorePool, candidateStores storepool.StoreList, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, @@ -1018,13 +1044,13 @@ func (a *Allocator) AllocateTargetFromList( ) (roachpb.ReplicationTarget, string) { existingReplicas := append(existingVoters, existingNonVoters...) 
analyzedOverallConstraints := constraint.AnalyzeConstraints( - a.StorePool, + storePool, existingReplicas, conf.NumReplicas, conf.Constraints, ) analyzedVoterConstraints := constraint.AnalyzeConstraints( - a.StorePool, + storePool, existingVoters, conf.GetNumVoters(), conf.VoterConstraints, @@ -1056,8 +1082,8 @@ func (a *Allocator) AllocateTargetFromList( constraintsChecker, existingReplicaSet, existingNonVoters, - a.StorePool.GetLocalitiesByStore(existingReplicaSet), - a.StorePool.IsStoreReadyForRoutineReplicaTransfer, + storePool.GetLocalitiesByStore(existingReplicaSet), + storePool.IsStoreReadyForRoutineReplicaTransfer, allowMultipleReplsPerNode, options, targetType, @@ -1081,6 +1107,7 @@ func (a *Allocator) AllocateTargetFromList( func (a Allocator) simulateRemoveTarget( ctx context.Context, + storePool storepool.AllocatorStorePool, targetStore roachpb.StoreID, conf roachpb.SpanConfig, candidates []roachpb.ReplicaDescriptor, @@ -1103,8 +1130,8 @@ func (a Allocator) simulateRemoveTarget( // Update statistics first switch t := targetType; t { case VoterTarget: - a.StorePool.UpdateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_VOTER) - defer a.StorePool.UpdateLocalStoreAfterRebalance( + storePool.UpdateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_VOTER) + defer storePool.UpdateLocalStoreAfterRebalance( targetStore, rangeUsageInfo, roachpb.REMOVE_VOTER, @@ -1113,12 +1140,12 @@ func (a Allocator) simulateRemoveTarget( targetStore) return a.RemoveTarget( - ctx, conf, storepool.MakeStoreList(candidateStores), + ctx, storePool, conf, storepool.MakeStoreList(candidateStores), existingVoters, existingNonVoters, VoterTarget, options, ) case NonVoterTarget: - a.StorePool.UpdateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_NON_VOTER) - defer a.StorePool.UpdateLocalStoreAfterRebalance( + storePool.UpdateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.ADD_NON_VOTER) + defer storePool.UpdateLocalStoreAfterRebalance( targetStore, rangeUsageInfo, roachpb.REMOVE_NON_VOTER, @@ -1126,7 +1153,7 @@ func (a Allocator) simulateRemoveTarget( log.KvDistribution.VEventf(ctx, 3, "simulating which non-voter would be removed after adding s%d", targetStore) return a.RemoveTarget( - ctx, conf, storepool.MakeStoreList(candidateStores), + ctx, storePool, conf, storepool.MakeStoreList(candidateStores), existingVoters, existingNonVoters, NonVoterTarget, options, ) default: @@ -1134,24 +1161,11 @@ func (a Allocator) simulateRemoveTarget( } } -// StoreListForTargets converts the set of replica targets to a StoreList. -func (a Allocator) StoreListForTargets(candidates []roachpb.ReplicationTarget) storepool.StoreList { - result := make([]roachpb.StoreDescriptor, 0, len(candidates)) - sl, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterNone) - for _, cand := range candidates { - for _, store := range sl.Stores { - if cand.StoreID == store.StoreID { - result = append(result, store) - } - } - } - return storepool.MakeStoreList(result) -} - // RemoveTarget returns a suitable replica (of the given type) to remove from // the provided set of replicas. func (a Allocator) RemoveTarget( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, candidateStoreList storepool.StoreList, existingVoters []roachpb.ReplicaDescriptor, @@ -1168,13 +1182,13 @@ func (a Allocator) RemoveTarget( existingReplicas := append(existingVoters, existingNonVoters...) 
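// Sketch, not part of the patch: the Allocator.StoreListForTargets helper
// removed above has a counterpart added to the store pool later in this patch
// (GetStoreListForTargets), so converting replication targets to a StoreList
// now runs against whichever pool the caller supplies. Assuming `storePool`
// is an AllocatorStorePool and `targets` is a []roachpb.ReplicationTarget:
candidateStores, _, _ := storePool.GetStoreListForTargets(targets, storepool.StoreFilterNone)
_ = candidateStores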
analyzedOverallConstraints := constraint.AnalyzeConstraints( - a.StorePool, + storePool, existingReplicas, conf.NumReplicas, conf.Constraints, ) analyzedVoterConstraints := constraint.AnalyzeConstraints( - a.StorePool, + storePool, existingVoters, conf.GetNumVoters(), conf.VoterConstraints, @@ -1201,7 +1215,7 @@ func (a Allocator) RemoveTarget( ctx, candidateStoreList, constraintsChecker, - a.StorePool.GetLocalitiesByStore(replicaSetForDiversityCalc), + storePool.GetLocalitiesByStore(replicaSetForDiversityCalc), options, ) @@ -1231,6 +1245,7 @@ func (a Allocator) RemoveTarget( // back to selecting a random target from any of the existing voting replicas. func (a Allocator) RemoveVoter( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, voterCandidates []roachpb.ReplicaDescriptor, existingVoters []roachpb.ReplicaDescriptor, @@ -1242,10 +1257,11 @@ func (a Allocator) RemoveVoter( for i, exist := range voterCandidates { candidateStoreIDs[i] = exist.StoreID } - candidateStoreList, _, _ := a.StorePool.GetStoreListFromIDs(candidateStoreIDs, storepool.StoreFilterNone) + candidateStoreList, _, _ := storePool.GetStoreListFromIDs(candidateStoreIDs, storepool.StoreFilterNone) return a.RemoveTarget( ctx, + storePool, conf, candidateStoreList, existingVoters, @@ -1262,6 +1278,7 @@ func (a Allocator) RemoveVoter( // non-voting replicas. func (a Allocator) RemoveNonVoter( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, nonVoterCandidates []roachpb.ReplicaDescriptor, existingVoters []roachpb.ReplicaDescriptor, @@ -1273,10 +1290,11 @@ func (a Allocator) RemoveNonVoter( for i, exist := range nonVoterCandidates { candidateStoreIDs[i] = exist.StoreID } - candidateStoreList, _, _ := a.StorePool.GetStoreListFromIDs(candidateStoreIDs, storepool.StoreFilterNone) + candidateStoreList, _, _ := storePool.GetStoreListFromIDs(candidateStoreIDs, storepool.StoreFilterNone) return a.RemoveTarget( ctx, + storePool, conf, candidateStoreList, existingVoters, @@ -1290,6 +1308,7 @@ func (a Allocator) RemoveNonVoter( // type) with required attributes. 
func (a Allocator) RebalanceTarget( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, raftStatus *raft.Status, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, @@ -1298,7 +1317,7 @@ func (a Allocator) RebalanceTarget( targetType TargetReplicaType, options ScorerOptions, ) (add, remove roachpb.ReplicationTarget, details string, ok bool) { - sl, _, _ := a.StorePool.GetStoreList(filter) + sl, _, _ := storePool.GetStoreList(filter) // If we're considering a rebalance due to an `AdminScatterRequest`, we'd like // to ensure that we're returning a random rebalance target to a new store @@ -1310,13 +1329,13 @@ func (a Allocator) RebalanceTarget( zero := roachpb.ReplicationTarget{} analyzedOverallConstraints := constraint.AnalyzeConstraints( - a.StorePool, + storePool, existingReplicas, conf.NumReplicas, conf.Constraints, ) analyzedVoterConstraints := constraint.AnalyzeConstraints( - a.StorePool, + storePool, existingVoters, conf.GetNumVoters(), conf.VoterConstraints, @@ -1360,8 +1379,8 @@ func (a Allocator) RebalanceTarget( rebalanceConstraintsChecker, replicaSetToRebalance, replicasWithExcludedStores, - a.StorePool.GetLocalitiesByStore(replicaSetForDiversityCalc), - a.StorePool.IsStoreReadyForRoutineReplicaTransfer, + storePool.GetLocalitiesByStore(replicaSetForDiversityCalc), + storePool.IsStoreReadyForRoutineReplicaTransfer, options, a.Metrics, ) @@ -1411,6 +1430,7 @@ func (a Allocator) RebalanceTarget( var err error removeReplica, removeDetails, err = a.simulateRemoveTarget( ctx, + storePool, target.store.StoreID, conf, replicaCandidates, @@ -1484,6 +1504,7 @@ func (a Allocator) RebalanceTarget( // opportunity was found). func (a Allocator) RebalanceVoter( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, raftStatus *raft.Status, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, @@ -1493,6 +1514,7 @@ func (a Allocator) RebalanceVoter( ) (add, remove roachpb.ReplicationTarget, details string, ok bool) { return a.RebalanceTarget( ctx, + storePool, conf, raftStatus, existingVoters, @@ -1518,6 +1540,7 @@ func (a Allocator) RebalanceVoter( // replicas. 
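// Design note with a sketch, not part of the patch: RebalanceTarget threads
// the supplied pool into simulateRemoveTarget, which applies
// UpdateLocalStoreAfterRebalance to that same pool and defers the reversal,
// so the "which replica would be removed after adding this store?" simulation
// runs against the caller-provided view rather than a.StorePool. A call that
// mirrors the test call sites elsewhere in this patch; `conf`,
// `existingVoters`, and `rangeUsageInfo` are assumed to be in scope, and the
// trailing arguments follow the call sites as shown there:
add, remove, _, ok := a.RebalanceVoter(
	ctx,
	a.StorePool, // or an OverrideStorePool for a what-if evaluation
	conf,
	nil, /* raftStatus */
	existingVoters,
	nil, /* existingNonVoters */
	rangeUsageInfo,
	storepool.StoreFilterThrottled,
	a.ScorerOptions(ctx),
)
_, _, _ = add, remove, ok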
func (a Allocator) RebalanceNonVoter( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, raftStatus *raft.Status, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, @@ -1527,6 +1550,7 @@ func (a Allocator) RebalanceNonVoter( ) (add, remove roachpb.ReplicationTarget, details string, ok bool) { return a.RebalanceTarget( ctx, + storePool, conf, raftStatus, existingVoters, @@ -1543,7 +1567,7 @@ func (a Allocator) RebalanceNonVoter( func (a *Allocator) ScorerOptions(ctx context.Context) *RangeCountScorerOptions { return &RangeCountScorerOptions{ StoreHealthOptions: a.StoreHealthOptions(ctx), - deterministic: a.StorePool.IsDeterministic(), + deterministic: a.deterministic, rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.st.SV), } } @@ -1553,7 +1577,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO return &ScatterScorerOptions{ RangeCountScorerOptions: RangeCountScorerOptions{ StoreHealthOptions: a.StoreHealthOptions(ctx), - deterministic: a.StorePool.IsDeterministic(), + deterministic: a.deterministic, rangeRebalanceThreshold: 0, }, // We set jitter to be equal to the padding around replica-count rebalancing @@ -1578,6 +1602,7 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO // replicas need a snapshot or not), produces no results. func (a *Allocator) ValidLeaseTargets( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor, leaseRepl interface { @@ -1601,7 +1626,7 @@ func (a *Allocator) ValidLeaseTargets( } candidates = append(candidates, existing[i]) } - candidates, _ = a.StorePool.LiveAndDeadReplicas( + candidates, _ = storePool.LiveAndDeadReplicas( candidates, false, /* includeSuspectAndDrainingStores */ ) @@ -1653,7 +1678,7 @@ func (a *Allocator) ValidLeaseTargets( // Determine which store(s) is preferred based on user-specified preferences. // If any stores match, only consider those stores as candidates. - preferred := a.PreferredLeaseholders(conf, candidates) + preferred := a.PreferredLeaseholders(storePool, conf, candidates) if len(preferred) > 0 { candidates = preferred } @@ -1666,6 +1691,7 @@ func (a *Allocator) ValidLeaseTargets( // some existing replica. func (a *Allocator) leaseholderShouldMoveDueToPreferences( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, leaseRepl interface { StoreID() roachpb.StoreID @@ -1693,12 +1719,12 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences( } // Exclude suspect/draining/dead stores. - candidates, _ := a.StorePool.LiveAndDeadReplicas( + candidates, _ := storePool.LiveAndDeadReplicas( allExistingReplicas, false, /* includeSuspectAndDrainingStores */ ) // If there are any replicas that do match lease preferences, then we check if // the existing leaseholder is one of them. - preferred := a.PreferredLeaseholders(conf, candidates) + preferred := a.PreferredLeaseholders(storePool, conf, candidates) preferred = excludeReplicasInNeedOfSnapshots( ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), preferred) if len(preferred) == 0 { @@ -1742,6 +1768,7 @@ func (a *Allocator) StoreHealthOptions(_ context.Context) StoreHealthOptions { // to a learner. 
func (a *Allocator) TransferLeaseTarget( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor, leaseRepl interface { @@ -1756,26 +1783,26 @@ func (a *Allocator) TransferLeaseTarget( opts allocator.TransferLeaseOptions, ) roachpb.ReplicaDescriptor { excludeLeaseRepl := opts.ExcludeLeaseRepl - if a.leaseholderShouldMoveDueToPreferences(ctx, conf, leaseRepl, existing) { + if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) { // Explicitly exclude the current leaseholder from the result set if it is // in violation of lease preferences that can be satisfied by some other // replica. excludeLeaseRepl = true } - allStoresList, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterNone) + allStoresList, _, _ := storePool.GetStoreList(storepool.StoreFilterNone) storeDescMap := allStoresList.ToMap() - sl, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterSuspect) + sl, _, _ := storePool.GetStoreList(storepool.StoreFilterSuspect) sl = sl.ExcludeInvalid(conf.Constraints) sl = sl.ExcludeInvalid(conf.VoterConstraints) candidateLeasesMean := sl.CandidateLeases.Mean - source, ok := a.StorePool.GetStoreDescriptor(leaseRepl.StoreID()) + source, ok := storePool.GetStoreDescriptor(leaseRepl.StoreID()) if !ok { return roachpb.ReplicaDescriptor{} } - existing = a.ValidLeaseTargets(ctx, conf, existing, leaseRepl, opts) + existing = a.ValidLeaseTargets(ctx, storePool, conf, existing, leaseRepl, opts) // Short-circuit if there are no valid targets out there. if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseRepl.StoreID()) { log.KvDistribution.VEventf(ctx, 2, "no lease transfer target found for r%d", leaseRepl.GetRangeID()) @@ -1793,7 +1820,7 @@ func (a *Allocator) TransferLeaseTarget( // falls back to `leaseCountConvergence`. Rationalize this or refactor this // logic to be more clear. transferDec, repl := a.shouldTransferLeaseForAccessLocality( - ctx, source, existing, statSummary, nil, candidateLeasesMean, + ctx, storePool, source, existing, statSummary, nil, candidateLeasesMean, ) if !excludeLeaseRepl { switch transferDec { @@ -1803,7 +1830,7 @@ func (a *Allocator) TransferLeaseTarget( } fallthrough case decideWithoutStats: - if !a.shouldTransferLeaseForLeaseCountConvergence(ctx, sl, source, existing) { + if !a.shouldTransferLeaseForLeaseCountConvergence(ctx, storePool, sl, source, existing) { return roachpb.ReplicaDescriptor{} } case shouldTransfer: @@ -1839,7 +1866,7 @@ func (a *Allocator) TransferLeaseTarget( if leaseRepl.StoreID() == repl.StoreID { continue } - storeDesc, ok := a.StorePool.GetStoreDescriptor(repl.StoreID) + storeDesc, ok := storePool.GetStoreDescriptor(repl.StoreID) if !ok { continue } @@ -1996,6 +2023,7 @@ func getQPSDelta(storeQPSMap map[roachpb.StoreID]float64, domain []roachpb.Store // attributes. 
func (a *Allocator) ShouldTransferLease( ctx context.Context, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor, leaseRepl interface { @@ -2006,11 +2034,12 @@ func (a *Allocator) ShouldTransferLease( }, statSummary *replicastats.RatedSummary, ) bool { - if a.leaseholderShouldMoveDueToPreferences(ctx, conf, leaseRepl, existing) { + if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) { return true } existing = a.ValidLeaseTargets( ctx, + storePool, conf, existing, leaseRepl, @@ -2021,18 +2050,19 @@ func (a *Allocator) ShouldTransferLease( if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseRepl.StoreID()) { return false } - source, ok := a.StorePool.GetStoreDescriptor(leaseRepl.StoreID()) + source, ok := storePool.GetStoreDescriptor(leaseRepl.StoreID()) if !ok { return false } - sl, _, _ := a.StorePool.GetStoreList(storepool.StoreFilterSuspect) + sl, _, _ := storePool.GetStoreList(storepool.StoreFilterSuspect) sl = sl.ExcludeInvalid(conf.Constraints) sl = sl.ExcludeInvalid(conf.VoterConstraints) log.KvDistribution.VEventf(ctx, 3, "ShouldTransferLease (lease-holder=s%d):\n%s", leaseRepl.StoreID(), sl) transferDec, _ := a.shouldTransferLeaseForAccessLocality( ctx, + storePool, source, existing, statSummary, @@ -2046,7 +2076,7 @@ func (a *Allocator) ShouldTransferLease( case shouldTransfer: result = true case decideWithoutStats: - result = a.shouldTransferLeaseForLeaseCountConvergence(ctx, sl, source, existing) + result = a.shouldTransferLeaseForLeaseCountConvergence(ctx, storePool, sl, source, existing) default: log.KvDistribution.Fatalf(ctx, "unexpected transfer decision %d", transferDec) } @@ -2061,6 +2091,7 @@ func (a *Allocator) ShouldTransferLease( // follow-the-workload strategy would still prefer selecting the local store. 
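// Sketch, not part of the patch: the lease paths follow the same convention
// as the replica-allocation paths above; callers that want the current
// cluster view pass the allocator's own pool, and a what-if caller can
// substitute an OverrideStorePool. `conf`, `existing`, `repl`, and
// `statSummary` are assumed to be the span config, existing replicas,
// leaseholder handle, and load summary the caller already has.
shouldTransfer := a.ShouldTransferLease(ctx, a.StorePool, conf, existing, repl, statSummary)
_ = shouldTransfer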
func (a Allocator) FollowTheWorkloadPrefersLocal( ctx context.Context, + storePool storepool.AllocatorStorePool, sl storepool.StoreList, source roachpb.StoreDescriptor, candidate roachpb.StoreID, @@ -2068,7 +2099,7 @@ func (a Allocator) FollowTheWorkloadPrefersLocal( statSummary *replicastats.RatedSummary, ) bool { adjustments := make(map[roachpb.StoreID]float64) - decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, source, existing, statSummary, adjustments, sl.CandidateLeases.Mean) + decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, storePool, source, existing, statSummary, adjustments, sl.CandidateLeases.Mean) if decision == decideWithoutStats { return false } @@ -2084,6 +2115,7 @@ func (a Allocator) FollowTheWorkloadPrefersLocal( func (a Allocator) shouldTransferLeaseForAccessLocality( ctx context.Context, + storePool storepool.AllocatorStorePool, source roachpb.StoreDescriptor, existing []roachpb.ReplicaDescriptor, statSummary *replicastats.RatedSummary, @@ -2097,7 +2129,7 @@ func (a Allocator) shouldTransferLeaseForAccessLocality( !EnableLoadBasedLeaseRebalancing.Get(&a.st.SV) { return decideWithoutStats, roachpb.ReplicaDescriptor{} } - replicaLocalities := a.StorePool.GetLocalitiesByNode(existing) + replicaLocalities := storePool.GetLocalitiesByNode(existing) for _, locality := range replicaLocalities { if len(locality.Tiers) == 0 { return decideWithoutStats, roachpb.ReplicaDescriptor{} @@ -2152,11 +2184,11 @@ func (a Allocator) shouldTransferLeaseForAccessLocality( if repl.NodeID == source.Node.NodeID { continue } - storeDesc, ok := a.StorePool.GetStoreDescriptor(repl.StoreID) + storeDesc, ok := storePool.GetStoreDescriptor(repl.StoreID) if !ok { continue } - addr, err := a.StorePool.GossipNodeIDAddress(repl.NodeID) + addr, err := storePool.GossipNodeIDAddress(repl.NodeID) if err != nil { log.KvDistribution.Errorf(ctx, "missing address for n%d: %+v", repl.NodeID, err) continue @@ -2273,6 +2305,7 @@ func loadBasedLeaseRebalanceScore( func (a Allocator) shouldTransferLeaseForLeaseCountConvergence( ctx context.Context, + storePool storepool.AllocatorStorePool, sl storepool.StoreList, source roachpb.StoreDescriptor, existing []roachpb.ReplicaDescriptor, @@ -2301,7 +2334,7 @@ func (a Allocator) shouldTransferLeaseForLeaseCountConvergence( } for _, repl := range existing { - storeDesc, ok := a.StorePool.GetStoreDescriptor(repl.StoreID) + storeDesc, ok := storePool.GetStoreDescriptor(repl.StoreID) if !ok { continue } @@ -2316,7 +2349,7 @@ func (a Allocator) shouldTransferLeaseForLeaseCountConvergence( // PreferredLeaseholders returns a slice of replica descriptors corresponding to // replicas that meet lease preferences (among the `existing` replicas). func (a Allocator) PreferredLeaseholders( - conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor, + storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor, ) []roachpb.ReplicaDescriptor { // Go one preference at a time. As soon as we've found replicas that match a // preference, we don't need to look at the later preferences, because @@ -2327,7 +2360,7 @@ func (a Allocator) PreferredLeaseholders( // TODO(a-robinson): Do all these lookups at once, up front? We could // easily be passing a slice of StoreDescriptors around all the Allocator // functions instead of ReplicaDescriptors. 
- storeDesc, ok := a.StorePool.GetStoreDescriptor(repl.StoreID) + storeDesc, ok := storePool.GetStoreDescriptor(repl.StoreID) if !ok { continue } diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 3f1c1f0658a2..1a2ef399fc18 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -558,6 +558,7 @@ func TestAllocatorSimpleRetrieval(t *testing.T) { gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t) result, _, err := a.AllocateVoter( ctx, + a.StorePool, simpleSpanConfig, nil /* existingVoters */, nil, /* existingNonVoters */ Dead, @@ -579,6 +580,7 @@ func TestAllocatorNoAvailableDisks(t *testing.T) { defer stopper.Stop(ctx) result, _, err := a.AllocateVoter( ctx, + a.StorePool, simpleSpanConfig, nil /* existingVoters */, nil, /* existingNonVoters */ Dead, @@ -695,6 +697,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) { // Allocate a voter where all replicas are alive (e.g. up-replicating a valid range). add, _, err := a.AllocateVoter( ctx, + a.StorePool, test.conf, nil, nil, @@ -709,6 +712,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) { // Allocate a voter where we have a dead (or decommissioning) replica. add, _, err = a.AllocateVoter( ctx, + a.StorePool, test.conf, nil, nil, @@ -739,6 +743,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { gossiputil.NewStoreGossiper(g).GossipStores(multiDCStores, t) result1, _, err := a.AllocateVoter( ctx, + a.StorePool, multiDCConfigSSD, nil /* existingVoters */, nil, /* existingNonVoters */ Dead, @@ -748,6 +753,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { } result2, _, err := a.AllocateVoter( ctx, + a.StorePool, multiDCConfigSSD, []roachpb.ReplicaDescriptor{{ NodeID: result1.NodeID, @@ -766,6 +772,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { // Verify that no result is forthcoming if we already have a replica. result3, _, err := a.AllocateVoter( ctx, + a.StorePool, multiDCConfigSSD, []roachpb.ReplicaDescriptor{ { @@ -794,6 +801,7 @@ func TestAllocatorExistingReplica(t *testing.T) { gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t) result, _, err := a.AllocateVoter( ctx, + a.StorePool, roachpb.SpanConfig{ NumReplicas: 0, Constraints: []roachpb.ConstraintsConjunction{ @@ -821,6 +829,59 @@ func TestAllocatorExistingReplica(t *testing.T) { } } +func TestAllocatorReplaceDecommissioningReplica(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper, g, sp, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */) + defer stopper.Stop(ctx) + gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t) + + // Override liveness of n3 to decommissioning so the only available target is s4. 
+ oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if nid == roachpb.NodeID(3) { + return livenesspb.NodeLivenessStatus_DECOMMISSIONING + } + + return sp.NodeLivenessFn(nid, now, timeUntilStoreDead) + }) + + result, _, err := a.AllocateVoter( + ctx, + oSp, + roachpb.SpanConfig{ + NumReplicas: 3, + Constraints: []roachpb.ConstraintsConjunction{ + { + Constraints: []roachpb.Constraint{ + {Value: "mem", Type: roachpb.Constraint_PROHIBITED}, + }, + }, + }, + }, + []roachpb.ReplicaDescriptor{ + { + NodeID: 1, + StoreID: 1, + ReplicaID: 1, + }, + { + NodeID: 2, + StoreID: 2, + ReplicaID: 2, + }, + }, nil, /* existingNonVoters */ + Decommissioning, + ) + if err != nil { + t.Fatalf("Unable to perform allocation: %+v", err) + } + if !(result.StoreID == 4) { + t.Errorf("expected result to have store ID 4: %+v", result) + } +} + func TestAllocatorMultipleStoresPerNode(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -914,7 +975,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { for _, tc := range testCases { { result, _, err := a.AllocateVoter( - ctx, emptySpanConfig(), tc.existing, nil, + ctx, a.StorePool, emptySpanConfig(), tc.existing, nil, Dead, ) if e, a := tc.expectTargetAllocate, !roachpb.Empty(result); e != a { @@ -929,6 +990,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { var rangeUsageInfo allocator.RangeUsageInfo target, _, details, ok := a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, tc.existing, @@ -1003,6 +1065,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { for i := 1; i < 40; i++ { add, remove, _, ok := a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, ranges[i].InternalReplicas, @@ -1044,6 +1107,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { for i := 1; i < 40; i++ { _, _, _, ok := a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, ranges[i].InternalReplicas, @@ -1116,6 +1180,7 @@ func TestAllocatorRebalanceBasedOnRangeCount(t *testing.T) { var rangeUsageInfo allocator.RangeUsageInfo target, _, _, ok := a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: 3, StoreID: 3}}, @@ -1219,6 +1284,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { var rangeUsageInfo allocator.RangeUsageInfo target, _, _, ok := a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, c.existing, @@ -1501,6 +1567,7 @@ func TestAllocatorRebalanceByQPS(t *testing.T) { } add, remove, _, ok := a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, []roachpb.ReplicaDescriptor{{StoreID: subtest.testStores[0].StoreID}}, @@ -1614,6 +1681,7 @@ func TestAllocatorRemoveBasedOnQPS(t *testing.T) { } remove, _, err := a.RemoveVoter( ctx, + a.StorePool, emptySpanConfig(), subtest.existingRepls, subtest.existingRepls, @@ -1667,6 +1735,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) { var rangeUsageInfo allocator.RangeUsageInfo result, _, _, ok := a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, []roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}}, @@ -1796,6 +1865,7 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) { t.Run("", func(t *testing.T) { target := a.TransferLeaseTarget( ctx, + a.StorePool, emptySpanConfig(), c.existing, &mockRepl{ @@ -1912,6 +1982,7 @@ func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) { t.Run("", func(t *testing.T) { target := a.TransferLeaseTarget( ctx, + 
a.StorePool, emptySpanConfig(), c.existing, repl, @@ -2003,6 +2074,7 @@ func TestAllocatorTransferLeaseTargetConstraints(t *testing.T) { t.Run("", func(t *testing.T) { target := a.TransferLeaseTarget( context.Background(), + a.StorePool, c.conf, c.existing, &mockRepl{ @@ -2114,6 +2186,7 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) { t.Run("", func(t *testing.T) { target := a.TransferLeaseTarget( ctx, + a.StorePool, c.conf, c.existing, &mockRepl{ @@ -2254,6 +2327,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { var rangeUsageInfo allocator.RangeUsageInfo result, _, details, ok := a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, tc.existing, @@ -2326,6 +2400,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { var rangeUsageInfo allocator.RangeUsageInfo result, _, details, ok := a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, tc.existing, @@ -2395,6 +2470,7 @@ func TestAllocatorShouldTransferLease(t *testing.T) { t.Run("", func(t *testing.T) { result := a.ShouldTransferLease( ctx, + a.StorePool, emptySpanConfig(), c.existing, &mockRepl{ @@ -2462,6 +2538,7 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) { t.Run("", func(t *testing.T) { result := a.ShouldTransferLease( ctx, + a.StorePool, emptySpanConfig(), c.existing, &mockRepl{ @@ -2508,6 +2585,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) { t.Helper() result := a.ShouldTransferLease( ctx, + a.StorePool, emptySpanConfig(), replicas(1, 2, 3), &mockRepl{storeID: 2, replicationFactor: 3}, @@ -2648,6 +2726,7 @@ func TestAllocatorLeasePreferences(t *testing.T) { conf := roachpb.SpanConfig{LeasePreferences: c.preferences} result := a.ShouldTransferLease( ctx, + a.StorePool, conf, c.existing, &mockRepl{ @@ -2662,6 +2741,7 @@ func TestAllocatorLeasePreferences(t *testing.T) { } target := a.TransferLeaseTarget( ctx, + a.StorePool, conf, c.existing, &mockRepl{ @@ -2680,6 +2760,7 @@ func TestAllocatorLeasePreferences(t *testing.T) { } target = a.TransferLeaseTarget( ctx, + a.StorePool, conf, c.existing, &mockRepl{ @@ -2772,6 +2853,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) { conf := roachpb.SpanConfig{LeasePreferences: c.preferences} target := a.TransferLeaseTarget( ctx, + a.StorePool, conf, c.existing, &mockRepl{ @@ -2791,6 +2873,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) { target = a.TransferLeaseTarget( ctx, + a.StorePool, conf, c.existing, &mockRepl{ @@ -2871,6 +2954,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) { for _, c := range testCases { targetVoter, details, err := a.RemoveVoter( ctx, + a.StorePool, emptySpanConfig(), c.existingVoters, /* voterCandidates */ c.existingVoters, @@ -2890,6 +2974,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) { // diversity score calculations, we would fail here. targetVoter, _, err = a.RemoveVoter( ctx, + a.StorePool, emptySpanConfig(), c.existingVoters, c.existingVoters, @@ -2903,6 +2988,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) { targetNonVoter, _, err := a.RemoveNonVoter( ctx, + a.StorePool, emptySpanConfig(), c.existingNonVoters, /* nonVoterCandidates */ c.existingVoters, @@ -2973,7 +3059,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) { // Allocate the voting replica first, before the non-voter. This is the // order in which we'd expect the allocator to repair a given range. See // TestAllocatorComputeAction. 
- voterTarget, _, err := a.AllocateVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters, Dead) + voterTarget, _, err := a.AllocateVoter(ctx, a.StorePool, test.conf, test.existingVoters, test.existingNonVoters, Dead) if test.shouldVoterAllocFail { require.Errorf(t, err, "expected voter allocation to fail; got %v as a valid target instead", voterTarget) } else { @@ -2982,7 +3068,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) { test.existingVoters = append(test.existingVoters, replicas(voterTarget.StoreID)...) } - nonVoterTarget, _, err := a.AllocateNonVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters, Dead) + nonVoterTarget, _, err := a.AllocateNonVoter(ctx, a.StorePool, test.conf, test.existingVoters, test.existingNonVoters, Dead) if test.shouldNonVoterAllocFail { require.Errorf(t, err, "expected non-voter allocation to fail; got %v as a valid target instead", nonVoterTarget) } else { @@ -3056,7 +3142,7 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) { StoreID: storeID, } } - targetStore, details, err := a.AllocateVoter(ctx, emptySpanConfig(), existingRepls, nil, Dead) + targetStore, details, err := a.AllocateVoter(ctx, a.StorePool, emptySpanConfig(), existingRepls, nil, Dead) if err != nil { t.Fatal(err) } @@ -3177,6 +3263,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { var rangeUsageInfo allocator.RangeUsageInfo target, _, details, ok := a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, existingRepls, @@ -3523,7 +3610,7 @@ func TestAllocatorNonVoterAllocationExcludesVoterNodes(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(test.stores, t) - result, _, err := a.AllocateNonVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters, Dead) + result, _, err := a.AllocateNonVoter(ctx, a.StorePool, test.conf, test.existingVoters, test.existingNonVoters, Dead) if test.shouldFail { require.Error(t, err) require.Regexp(t, test.expError, err) @@ -4144,6 +4231,7 @@ func TestAllocatorRebalanceNonVoters(t *testing.T) { // Enable read disk health checking in candidate exclusion. add, remove, _, ok := a.RebalanceNonVoter( ctx, + a.StorePool, test.conf, nil, test.existingVoters, @@ -4277,6 +4365,7 @@ func TestAllocatorRebalanceReadAmpCheck(t *testing.T) { options.StoreHealthOptions = StoreHealthOptions{EnforcementLevel: test.enforcement, L0SublevelThreshold: 20} add, remove, _, ok := a.RebalanceVoter( ctx, + a.StorePool, test.conf, nil, test.existingVoters, @@ -4334,6 +4423,7 @@ func TestVotersCanRebalanceToNonVoterStores(t *testing.T) { existingVoters := replicas(3, 4) add, remove, _, ok := a.RebalanceVoter( ctx, + a.StorePool, conf, nil, existingVoters, @@ -4393,6 +4483,7 @@ func TestNonVotersCannotRebalanceToVoterStores(t *testing.T) { var rangeUsageInfo allocator.RangeUsageInfo add, remove, _, ok := a.RebalanceNonVoter( ctx, + a.StorePool, emptySpanConfig(), nil, existingVoters, @@ -5226,6 +5317,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { // the final rebalance choice. 
target, _, details, ok := a.RebalanceVoter( ctx, + a.StorePool, conf, nil, existingRepls, @@ -5410,6 +5502,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { }, nil) target := a.TransferLeaseTarget( ctx, + a.StorePool, emptySpanConfig(), existing, &mockRepl{ @@ -5615,6 +5708,7 @@ func TestAllocatorRemoveTargetBasedOnCapacity(t *testing.T) { for i := 0; i < 10; i++ { targetRepl, _, err := a.RemoveVoter( ctx, + a.StorePool, emptySpanConfig(), replicas, replicas, @@ -6313,7 +6407,7 @@ func TestAllocatorComputeAction(t *testing.T) { lastPriority := float64(999999999) for i, tcase := range testCases { - action, priority := a.ComputeAction(ctx, tcase.conf, &tcase.desc) + action, priority := a.ComputeAction(ctx, a.StorePool, tcase.conf, &tcase.desc) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %q, got action %q", i, allocatorActionNames[tcase.expectedAction], allocatorActionNames[action]) @@ -6410,7 +6504,109 @@ func TestAllocatorComputeActionRemoveDead(t *testing.T) { for i, tcase := range testCases { mockStorePool(sp, tcase.live, nil, tcase.dead, nil, nil, nil) - action, _ := a.ComputeAction(ctx, conf, &tcase.desc) + action, _ := a.ComputeAction(ctx, a.StorePool, conf, &tcase.desc) + if tcase.expectedAction != action { + t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action) + } + } +} + +func TestAllocatorComputeActionWithStorePoolRemoveDead(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + conf := roachpb.SpanConfig{NumReplicas: 3} + threeReplDesc := roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + }, + } + fourReplDesc := threeReplDesc + fourReplDesc.InternalReplicas = append(fourReplDesc.InternalReplicas, roachpb.ReplicaDescriptor{ + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + }) + + // Each test case should describe a repair situation which has a lower + // priority than the previous test case. + testCases := []struct { + desc roachpb.RangeDescriptor + live []roachpb.StoreID + dead []roachpb.StoreID + expectedAction AllocatorAction + }{ + // Needs three replicas, one is dead, and there's no replacement. Since + // there's no replacement we can't do anything, but an action is still + // emitted. + { + desc: threeReplDesc, + live: []roachpb.StoreID{1, 2}, + dead: []roachpb.StoreID{3}, + expectedAction: AllocatorReplaceDeadVoter, + }, + // Needs three replicas, one is dead, but there is a replacement. + { + desc: threeReplDesc, + live: []roachpb.StoreID{1, 2, 4}, + dead: []roachpb.StoreID{3}, + expectedAction: AllocatorReplaceDeadVoter, + }, + // Needs three replicas, two are dead (i.e. the range lacks a quorum). + { + desc: threeReplDesc, + live: []roachpb.StoreID{1, 4}, + dead: []roachpb.StoreID{2, 3}, + expectedAction: AllocatorRangeUnavailable, + }, + // Needs three replicas, has four, one is dead. + { + desc: fourReplDesc, + live: []roachpb.StoreID{1, 2, 4}, + dead: []roachpb.StoreID{3}, + expectedAction: AllocatorRemoveDeadVoter, + }, + // Needs three replicas, has four, two are dead (i.e. the range lacks a quorum). 
+ { + desc: fourReplDesc, + live: []roachpb.StoreID{1, 4}, + dead: []roachpb.StoreID{2, 3}, + expectedAction: AllocatorRangeUnavailable, + }, + } + + ctx := context.Background() + stopper, _, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(ctx) + + for i, tcase := range testCases { + // Mark all dead nodes as alive, so we can override later. + all := append(tcase.live, tcase.dead...) + mockStorePool(sp, all, nil, nil, nil, nil, nil) + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + for _, deadStoreID := range tcase.dead { + if nid == roachpb.NodeID(deadStoreID) { + return livenesspb.NodeLivenessStatus_DEAD + } + } + + return sp.NodeLivenessFn(nid, now, timeUntilStoreDead) + }) + action, _ := a.ComputeAction(ctx, oSp, conf, &tcase.desc) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action) } @@ -6483,7 +6679,7 @@ func TestAllocatorComputeActionSuspect(t *testing.T) { for i, tcase := range testCases { mockStorePool(sp, tcase.live, nil, nil, nil, nil, tcase.suspect) - action, _ := a.ComputeAction(ctx, conf, &tcase.desc) + action, _ := a.ComputeAction(ctx, a.StorePool, conf, &tcase.desc) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action) } @@ -6762,7 +6958,304 @@ func TestAllocatorComputeActionDecommission(t *testing.T) { for i, tcase := range testCases { mockStorePool(sp, tcase.live, nil, tcase.dead, tcase.decommissioning, tcase.decommissioned, nil) - action, _ := a.ComputeAction(ctx, tcase.conf, &tcase.desc) + action, _ := a.ComputeAction(ctx, a.StorePool, tcase.conf, &tcase.desc) + if tcase.expectedAction != action { + t.Errorf("Test case %d expected action %s, got action %s", i, tcase.expectedAction, action) + continue + } + } +} + +func TestAllocatorComputeActionWithStorePoolDecommission(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + conf roachpb.SpanConfig + desc roachpb.RangeDescriptor + expectedAction AllocatorAction + live []roachpb.StoreID + dead []roachpb.StoreID + decommissioning []roachpb.StoreID + decommissioned []roachpb.StoreID + }{ + // Has three replicas, but one is in decommissioning status. We can't + // replace it (nor add a new replica) since there isn't a live target, + // but that's still the action being emitted. + { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + }, + }, + expectedAction: AllocatorReplaceDecommissioningVoter, + live: []roachpb.StoreID{1, 2}, + dead: nil, + decommissioning: []roachpb.StoreID{3}, + }, + // Has three replicas, one is in decommissioning status, and one is on a + // dead node. Replacing the dead replica is more important. 
+ { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + }, + }, + expectedAction: AllocatorReplaceDeadVoter, + live: []roachpb.StoreID{1}, + dead: []roachpb.StoreID{2}, + decommissioning: []roachpb.StoreID{3}, + }, + // Needs three replicas, has four, where one is decommissioning and one is + // dead. + { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + }, + }, + }, + expectedAction: AllocatorRemoveDeadVoter, + live: []roachpb.StoreID{1, 4}, + dead: []roachpb.StoreID{2}, + decommissioning: []roachpb.StoreID{3}, + }, + // Needs three replicas, has four, where one is decommissioning and one is + // decommissioned. + { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + }, + }, + }, + expectedAction: AllocatorRemoveDeadVoter, + live: []roachpb.StoreID{1, 4}, + dead: nil, + decommissioning: []roachpb.StoreID{3}, + decommissioned: []roachpb.StoreID{2}, + }, + // Needs three replicas, has three, all decommissioning + { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + }, + }, + expectedAction: AllocatorReplaceDecommissioningVoter, + live: nil, + dead: nil, + decommissioning: []roachpb.StoreID{1, 2, 3}, + }, + // Needs 3. Has 1 live, 3 decommissioning. 
+ { + conf: roachpb.SpanConfig{NumReplicas: 3}, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + }, + }, + }, + expectedAction: AllocatorRemoveDecommissioningVoter, + live: []roachpb.StoreID{4}, + dead: nil, + decommissioning: []roachpb.StoreID{1, 2, 3}, + }, + { + conf: roachpb.SpanConfig{ + NumVoters: 1, + NumReplicas: 3, + }, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + Type: roachpb.NON_VOTER, + }, + { + StoreID: 6, + NodeID: 6, + ReplicaID: 6, + Type: roachpb.NON_VOTER, + }, + { + StoreID: 7, + NodeID: 7, + ReplicaID: 7, + Type: roachpb.NON_VOTER, + }, + }, + }, + expectedAction: AllocatorRemoveDecommissioningNonVoter, + live: []roachpb.StoreID{1, 4, 6}, + dead: nil, + decommissioning: []roachpb.StoreID{7}, + }, + { + conf: roachpb.SpanConfig{ + NumVoters: 1, + NumReplicas: 3, + }, + desc: roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 4, + NodeID: 4, + ReplicaID: 4, + Type: roachpb.NON_VOTER, + }, + { + StoreID: 6, + NodeID: 6, + ReplicaID: 6, + Type: roachpb.NON_VOTER, + }, + }, + }, + expectedAction: AllocatorReplaceDecommissioningNonVoter, + live: []roachpb.StoreID{1, 2, 3, 4, 6}, + dead: nil, + decommissioning: []roachpb.StoreID{4}, + }, + } + + ctx := context.Background() + stopper, _, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(ctx) + + for i, tcase := range testCases { + // Mark all decommissioning and decommissioned nodes as alive, so we can override later. + all := append(tcase.live, tcase.decommissioning...) + all = append(all, tcase.decommissioned...) 
+ overrideLivenessMap := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) + for _, sID := range tcase.decommissioned { + overrideLivenessMap[roachpb.NodeID(sID)] = livenesspb.NodeLivenessStatus_DECOMMISSIONED + } + for _, sID := range tcase.decommissioning { + overrideLivenessMap[roachpb.NodeID(sID)] = livenesspb.NodeLivenessStatus_DECOMMISSIONING + } + mockStorePool(sp, all, nil, tcase.dead, nil, nil, nil) + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if liveness, ok := overrideLivenessMap[nid]; ok { + return liveness + } + + return sp.NodeLivenessFn(nid, now, timeUntilStoreDead) + }) + action, _ := a.ComputeAction(ctx, oSp, tcase.conf, &tcase.desc) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %s, got action %s", i, tcase.expectedAction, action) continue @@ -6798,7 +7291,7 @@ func TestAllocatorRemoveLearner(t *testing.T) { defer stopper.Stop(ctx) live, dead := []roachpb.StoreID{1, 2}, []roachpb.StoreID{3} mockStorePool(sp, live, nil, dead, nil, nil, nil) - action, _ := a.ComputeAction(ctx, conf, &rangeWithLearnerDesc) + action, _ := a.ComputeAction(ctx, a.StorePool, conf, &rangeWithLearnerDesc) require.Equal(t, AllocatorRemoveLearner, action) } @@ -7005,7 +7498,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { effectiveNumReplicas := GetNeededVoters(conf.NumReplicas, clusterNodes) require.Equal(t, c.expectedNumReplicas, effectiveNumReplicas, "clusterNodes=%d", clusterNodes) - action, _ := a.ComputeAction(ctx, conf, &desc) + action, _ := a.ComputeAction(ctx, a.StorePool, conf, &desc) require.Equal(t, c.expectedAction.String(), action.String()) }) } @@ -7089,7 +7582,7 @@ func TestAllocatorComputeActionNoStorePool(t *testing.T) { defer log.Scope(t).Close(t) a := MakeAllocator(nil, nil, nil, nil) - action, priority := a.ComputeAction(context.Background(), roachpb.SpanConfig{}, nil) + action, priority := a.ComputeAction(context.Background(), a.StorePool, roachpb.SpanConfig{}, nil) if action != AllocatorNoop { t.Errorf("expected AllocatorNoop, but got %v", action) } @@ -7421,6 +7914,7 @@ func TestAllocatorRebalanceDeterminism(t *testing.T) { // replica count. add, remove, _, _ := a.RebalanceVoter( ctx, + a.StorePool, roachpb.TestingDefaultSpanConfig(), nil, replicas(1, 2, 5), @@ -7491,6 +7985,7 @@ func TestAllocatorRebalanceWithScatter(t *testing.T) { // replica count. _, _, _, ok := a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, replicas(1), @@ -7504,6 +7999,7 @@ func TestAllocatorRebalanceWithScatter(t *testing.T) { // Ensure that we would produce a rebalance target when running with scatter. 
_, _, _, ok = a.RebalanceVoter( ctx, + a.StorePool, emptySpanConfig(), nil, replicas(1), @@ -7618,6 +8114,7 @@ func TestAllocatorRebalanceAway(t *testing.T) { var rangeUsageInfo allocator.RangeUsageInfo actual, _, _, ok := a.RebalanceVoter( ctx, + a.StorePool, roachpb.SpanConfig{Constraints: []roachpb.ConstraintsConjunction{constraints}}, nil, existingReplicas, @@ -7795,6 +8292,7 @@ func TestAllocatorFullDisks(t *testing.T) { var rangeUsageInfo allocator.RangeUsageInfo target, _, details, ok := alloc.RebalanceVoter( ctx, + alloc.StorePool, emptySpanConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, @@ -7841,6 +8339,7 @@ func Example_rangeCountRebalancing() { var rangeUsageInfo allocator.RangeUsageInfo target, _, details, ok := alloc.RebalanceVoter( ctx, + alloc.StorePool, emptySpanConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, @@ -7948,6 +8447,7 @@ func qpsBasedRebalanceFn( var rangeUsageInfo allocator.RangeUsageInfo add, remove, details, ok := alloc.RebalanceVoter( ctx, + alloc.StorePool, emptySpanConfig(), nil, []roachpb.ReplicaDescriptor{{NodeID: candidate.Node.NodeID, StoreID: candidate.StoreID}}, @@ -8368,7 +8868,7 @@ func TestNonVoterPrioritizationInVoterAdditions(t *testing.T) { } for i, tc := range testCases { - result, _, _ := a.AllocateVoter(ctx, tc.spanConfig, tc.existingVoters, tc.existingNonVoters, Alive) + result, _, _ := a.AllocateVoter(ctx, a.StorePool, tc.spanConfig, tc.existingVoters, tc.existingNonVoters, Alive) assert.Equal(t, tc.expectedTargetAllocate, result, "Unexpected replication target returned by allocate voter in test %d", i) } } diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go index cdd3d2f2a07f..670abfc2ae56 100644 --- a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go @@ -89,6 +89,21 @@ func (o *OverrideStorePool) GetStoreListFromIDs( return o.sp.getStoreListFromIDsLocked(storeIDs, o.overrideNodeLivenessFn, filter) } +// GetStoreListForTargets implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetStoreListForTargets( + candidates []roachpb.ReplicationTarget, filter StoreFilter, +) (StoreList, int, ThrottledStoreReasons) { + o.sp.DetailsMu.Lock() + defer o.sp.DetailsMu.Unlock() + + storeIDs := make(roachpb.StoreIDSlice, 0, len(candidates)) + for _, tgt := range candidates { + storeIDs = append(storeIDs, tgt.StoreID) + } + + return o.sp.getStoreListFromIDsLocked(storeIDs, o.overrideNodeLivenessFn, filter) +} + // LiveAndDeadReplicas implements the AllocatorStorePool interface. func (o *OverrideStorePool) LiveAndDeadReplicas( repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool, diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 7ff75c231574..6b99c90b3972 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -385,6 +385,14 @@ type AllocatorStorePool interface { filter StoreFilter, ) (StoreList, int, ThrottledStoreReasons) + // GetStoreListForTargets is the same as GetStoreList, but only returns stores + // from the subset of stores present in the passed in replication targets, + // converting to a StoreList. 
+	GetStoreListForTargets(
+		candidates []roachpb.ReplicationTarget,
+		filter StoreFilter,
+	) (StoreList, int, ThrottledStoreReasons)
+
 	// GossipNodeIDAddress looks up the RPC address for the given node via gossip.
 	GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error)
 
@@ -1072,6 +1080,24 @@ func (sp *StorePool) GetStoreListFromIDs(
 	return sp.getStoreListFromIDsLocked(storeIDs, sp.NodeLivenessFn, filter)
 }
 
+// GetStoreListForTargets is the same as GetStoreList, but only returns stores
+// from the subset of stores present in the passed-in replication targets,
+// converted to a StoreList.
+func (sp *StorePool) GetStoreListForTargets(
+	candidates []roachpb.ReplicationTarget,
+	filter StoreFilter,
+) (StoreList, int, ThrottledStoreReasons) {
+	sp.DetailsMu.Lock()
+	defer sp.DetailsMu.Unlock()
+
+	storeIDs := make(roachpb.StoreIDSlice, 0, len(candidates))
+	for _, tgt := range candidates {
+		storeIDs = append(storeIDs, tgt.StoreID)
+	}
+
+	return sp.getStoreListFromIDsLocked(storeIDs, sp.NodeLivenessFn, filter)
+}
+
 // getStoreListFromIDsRLocked is the same function as GetStoreList but requires
 // that the detailsMU read lock is held.
 func (sp *StorePool) getStoreListFromIDsLocked(
diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go
index 16472a13da0e..69ae59877e46 100644
--- a/pkg/kv/kvserver/allocator_impl_test.go
+++ b/pkg/kv/kvserver/allocator_impl_test.go
@@ -180,6 +180,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		result, _, details, ok := a.RebalanceVoter(
 			ctx,
+			a.StorePool,
 			roachpb.SpanConfig{},
 			status,
 			replicas,
@@ -205,6 +206,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		target, _, details, ok := a.RebalanceVoter(
 			ctx,
+			a.StorePool,
 			roachpb.SpanConfig{},
 			status,
 			replicas,
@@ -224,6 +226,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		target, origin, details, ok := a.RebalanceVoter(
 			ctx,
+			a.StorePool,
 			roachpb.SpanConfig{},
 			status,
 			replicas,
@@ -252,14 +255,14 @@ func TestAllocatorThrottled(t *testing.T) {
 	defer stopper.Stop(ctx)
 
 	// First test to make sure we would send the replica to purgatory.
-	_, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
+	_, _, err := a.AllocateVoter(ctx, a.StorePool, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
 	if _, ok := IsPurgatoryError(err); !ok {
 		t.Fatalf("expected a purgatory error, got: %+v", err)
 	}
 
 	// Second, test the normal case in which we can allocate to the store.
 	gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t)
-	result, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
+	result, _, err := a.AllocateVoter(ctx, a.StorePool, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
 	if err != nil {
 		t.Fatalf("unable to perform allocation: %+v", err)
 	}
@@ -277,7 +280,7 @@ func TestAllocatorThrottled(t *testing.T) {
 	}
 	storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour)
 	storePool.DetailsMu.Unlock()
-	_, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
+	_, _, err = a.AllocateVoter(ctx, a.StorePool, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
 	if _, ok := IsPurgatoryError(err); ok {
 		t.Fatalf("expected a non purgatory error, got: %+v", err)
 	}
diff --git a/pkg/kv/kvserver/asim/queue/replicate_queue.go b/pkg/kv/kvserver/asim/queue/replicate_queue.go
index e61a4f1edf71..db9b9a700bd3 100644
--- a/pkg/kv/kvserver/asim/queue/replicate_queue.go
+++ b/pkg/kv/kvserver/asim/queue/replicate_queue.go
@@ -59,7 +59,7 @@ func (rq *replicateQueue) MaybeAdd(
 		return false
 	}
 
-	action, priority := rq.allocator.ComputeAction(ctx, rng.SpanConfig(), rng.Descriptor())
+	action, priority := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, rng.SpanConfig(), rng.Descriptor())
 	if action == allocatorimpl.AllocatorNoop {
 		return false
 	}
@@ -98,7 +98,7 @@ func (rq *replicateQueue) Tick(ctx context.Context, tick time.Time, s state.Stat
 		return
 	}
 
-	action, _ := rq.allocator.ComputeAction(ctx, rng.SpanConfig(), rng.Descriptor())
+	action, _ := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, rng.SpanConfig(), rng.Descriptor())
 
 	switch action {
 	case allocatorimpl.AllocatorConsiderRebalance:
@@ -125,6 +125,7 @@ func (rq *replicateQueue) considerRebalance(
 ) {
 	add, remove, _, ok := rq.allocator.RebalanceVoter(
 		ctx,
+		rq.allocator.StorePool,
 		rng.SpanConfig(),
 		nil, /* raftStatus */
 		rng.Descriptor().Replicas().VoterDescriptors(),
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 4d78f82f54e2..9760248e6104 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -3339,6 +3339,7 @@ func RelocateOne(
 
 		additionTarget, _ = allocator.AllocateTargetFromList(
 			ctx,
+			allocator.StorePool,
 			candidateStoreList,
 			conf,
 			existingVoters,
@@ -3407,10 +3408,14 @@ func RelocateOne(
 		// (s1,s2,s3,s4) which is a reasonable request; that replica set is
 		// overreplicated. If we asked it instead to remove s3 from (s1,s2,s3) it
 		// may not want to do that due to constraints.
+		candidatesStoreList, _, _ := allocator.StorePool.GetStoreListForTargets(
+			args.targetsToRemove(), storepool.StoreFilterNone,
+		)
 		targetStore, _, err := allocator.RemoveTarget(
 			ctx,
+			allocator.StorePool,
 			conf,
-			allocator.StoreListForTargets(args.targetsToRemove()),
+			candidatesStoreList,
 			existingVoters,
 			existingNonVoters,
 			args.targetType,
@@ -3678,7 +3683,7 @@ func (r *Replica) adminScatter(
 	if args.RandomizeLeases && r.OwnsValidLease(ctx, r.store.Clock().NowAsClockTimestamp()) {
 		desc := r.Desc()
 		potentialLeaseTargets := r.store.allocator.ValidLeaseTargets(
-			ctx, r.SpanConfig(), desc.Replicas().VoterDescriptors(), r, allocator.TransferLeaseOptions{})
+			ctx, r.store.allocator.StorePool, r.SpanConfig(), desc.Replicas().VoterDescriptors(), r, allocator.TransferLeaseOptions{})
 		if len(potentialLeaseTargets) > 0 {
 			newLeaseholderIdx := rand.Intn(len(potentialLeaseTargets))
 			targetStoreID := potentialLeaseTargets[newLeaseholderIdx].StoreID
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index c5755ded92d7..424191721fd9 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -611,7 +611,7 @@ func (rq *replicateQueue) shouldQueue(
 	ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
 ) (shouldQueue bool, priority float64) {
 	desc, conf := repl.DescAndSpanConfig()
-	action, priority := rq.allocator.ComputeAction(ctx, conf, desc)
+	action, priority := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, conf, desc)
 
 	if action == allocatorimpl.AllocatorNoop {
 		log.KvDistribution.VEventf(ctx, 2, "no action to take")
@@ -627,6 +627,7 @@ func (rq *replicateQueue) shouldQueue(
 		rangeUsageInfo := rangeUsageInfoForRepl(repl)
 		_, _, _, ok := rq.allocator.RebalanceVoter(
 			ctx,
+			rq.allocator.StorePool,
 			conf,
 			repl.RaftStatus(),
 			voterReplicas,
@@ -641,6 +642,7 @@ func (rq *replicateQueue) shouldQueue(
 		}
 		_, _, _, ok = rq.allocator.RebalanceNonVoter(
 			ctx,
+			rq.allocator.StorePool,
 			conf,
 			repl.RaftStatus(),
 			voterReplicas,
@@ -660,6 +662,7 @@ func (rq *replicateQueue) shouldQueue(
 	if rq.canTransferLeaseFrom(ctx, repl) &&
 		rq.allocator.ShouldTransferLease(
 			ctx,
+			rq.allocator.StorePool,
 			conf,
 			voterReplicas,
 			repl,
@@ -1036,7 +1039,7 @@ func (rq *replicateQueue) PlanOneChange(
 	// unavailability; see:
 	_ = execChangeReplicasTxn
 
-	action, allocatorPrio := rq.allocator.ComputeAction(ctx, conf, desc)
+	action, allocatorPrio := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, conf, desc)
 	log.KvDistribution.VEventf(ctx, 1, "next replica action: %s", action)
 
 	var err error
@@ -1249,7 +1252,7 @@ func (rq *replicateQueue) addOrReplaceVoters(
 	// we're removing it (i.e. dead or decommissioning). If we left the replica in
 	// the slice, the allocator would not be guaranteed to pick a replica that
 	// fills the gap removeRepl leaves once it's gone.
-	newVoter, details, err := rq.allocator.AllocateVoter(ctx, conf, remainingLiveVoters, remainingLiveNonVoters, replicaStatus)
+	newVoter, details, err := rq.allocator.AllocateVoter(ctx, rq.allocator.StorePool, conf, remainingLiveVoters, remainingLiveNonVoters, replicaStatus)
 	if err != nil {
 		return nil, err
 	}
@@ -1281,7 +1284,7 @@ func (rq *replicateQueue) addOrReplaceVoters(
 			oldPlusNewReplicas,
 			roachpb.ReplicaDescriptor{NodeID: newVoter.NodeID, StoreID: newVoter.StoreID},
 		)
-		_, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus)
+		_, _, err := rq.allocator.AllocateVoter(ctx, rq.allocator.StorePool, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus)
 		if err != nil {
 			// It does not seem possible to go to the next odd replica state. Note
 			// that AllocateVoter returns an allocatorError (a PurgatoryError)
@@ -1362,7 +1365,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters(
 	existingNonVoters := desc.Replicas().NonVoterDescriptors()
 	effects := effectBuilder{}
 
-	newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, conf, liveVoterReplicas, liveNonVoterReplicas, replicaStatus)
+	newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, rq.allocator.StorePool, conf, liveVoterReplicas, liveNonVoterReplicas, replicaStatus)
 	if err != nil {
 		return nil, err
 	}
@@ -1485,6 +1488,7 @@ func (rq *replicateQueue) findRemoveVoter(
 
 	return rq.allocator.RemoveVoter(
 		ctx,
+		rq.allocator.StorePool,
 		zone,
 		candidates,
 		existingVoters,
@@ -1527,6 +1531,7 @@ func (rq *replicateQueue) maybeTransferLeaseAwayTarget(
 	// a replica needs to be removed for constraint violations.
 	target := rq.allocator.TransferLeaseTarget(
 		ctx,
+		rq.allocator.StorePool,
 		conf,
 		desc.Replicas().VoterDescriptors(),
 		repl,
@@ -1606,6 +1611,7 @@ func (rq *replicateQueue) removeNonVoter(
 	_, conf := repl.DescAndSpanConfig()
 	removeNonVoter, details, err := rq.allocator.RemoveNonVoter(
 		ctx,
+		rq.allocator.StorePool,
 		conf,
 		existingNonVoters,
 		existingVoters,
@@ -1765,6 +1771,7 @@ func (rq *replicateQueue) considerRebalance(
 		rangeUsageInfo := rangeUsageInfoForRepl(repl)
 		addTarget, removeTarget, details, ok := rq.allocator.RebalanceVoter(
 			ctx,
+			rq.allocator.StorePool,
 			conf,
 			repl.RaftStatus(),
 			existingVoters,
@@ -1779,6 +1786,7 @@ func (rq *replicateQueue) considerRebalance(
 			log.KvDistribution.Infof(ctx, "no suitable rebalance target for voters")
 			addTarget, removeTarget, details, ok = rq.allocator.RebalanceNonVoter(
 				ctx,
+				rq.allocator.StorePool,
 				conf,
 				repl.RaftStatus(),
 				existingVoters,
@@ -1962,6 +1970,7 @@ func (rq *replicateQueue) shedLease(
 	// so only consider the `VoterDescriptors` replicas.
 	target := rq.allocator.TransferLeaseTarget(
 		ctx,
+		rq.allocator.StorePool,
 		conf,
 		desc.Replicas().VoterDescriptors(),
 		repl,
diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go
index 51b7d628fe41..65dba708ec21 100644
--- a/pkg/kv/kvserver/store_rebalancer.go
+++ b/pkg/kv/kvserver/store_rebalancer.go
@@ -669,6 +669,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
 
 		candidate := sr.allocator.TransferLeaseTarget(
 			ctx,
+			sr.allocator.StorePool,
 			conf,
 			candidates,
 			candidateReplica,
@@ -694,6 +695,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
 		filteredStoreList := rctx.allStoresList.ExcludeInvalid(conf.VoterConstraints)
 		if sr.allocator.FollowTheWorkloadPrefersLocal(
 			ctx,
+			sr.allocator.StorePool,
 			filteredStoreList,
 			rctx.LocalDesc,
 			candidate.StoreID,
@@ -864,6 +866,7 @@ func (sr *StoreRebalancer) chooseRangeToRebalance(
 	// misconfiguration.
 	validTargets := sr.allocator.ValidLeaseTargets(
 		ctx,
+		sr.allocator.StorePool,
 		rebalanceCtx.conf,
 		targetVoterRepls,
 		rebalanceCtx.candidateReplica,
@@ -935,6 +938,7 @@ func (sr *StoreRebalancer) getRebalanceTargetsBasedOnQPS(
 		// `AdminRelocateRange` so that these decisions show up in system.rangelog
 		add, remove, _, shouldRebalance := sr.allocator.RebalanceTarget(
 			ctx,
+			sr.allocator.StorePool,
 			rbCtx.conf,
 			rbCtx.candidateReplica.RaftStatus(),
 			finalVoterTargets,
@@ -999,6 +1003,7 @@ func (sr *StoreRebalancer) getRebalanceTargetsBasedOnQPS(
 	for i := 0; i < len(finalNonVoterTargets); i++ {
 		add, remove, _, shouldRebalance := sr.allocator.RebalanceTarget(
 			ctx,
+			sr.allocator.StorePool,
 			rbCtx.conf,
 			rbCtx.candidateReplica.RaftStatus(),
 			finalVoterTargets,
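
Illustrative sketch (not part of the diff above): the snippet below shows how the new StorePool-argument form of ComputeAction can be combined with an OverrideStorePool to evaluate a hypothetical liveness scenario, mirroring the test added in this patch. The helper name, its parameter types, and the omitted imports are assumptions made for illustration; only the NewOverrideStorePool wiring and the shape of the ComputeAction call come from the patch itself.

// computeActionWithDecommissioningOverride is a hypothetical helper (name and
// signature are assumptions, not part of the patch). It builds an
// OverrideStorePool that reports node 3 as decommissioning and then asks the
// allocator what action it would take under that simulated scenario.
func computeActionWithDecommissioningOverride(
	ctx context.Context,
	a allocatorimpl.Allocator,
	sp *storepool.StorePool,
	conf roachpb.SpanConfig,
	desc *roachpb.RangeDescriptor,
) (allocatorimpl.AllocatorAction, float64) {
	overrideLiveness := map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
		3: livenesspb.NodeLivenessStatus_DECOMMISSIONING,
	}
	oSp := storepool.NewOverrideStorePool(sp, func(
		nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration,
	) livenesspb.NodeLivenessStatus {
		if status, ok := overrideLiveness[nid]; ok {
			return status
		}
		// Every other node keeps its real liveness view.
		return sp.NodeLivenessFn(nid, now, timeUntilStoreDead)
	})
	// ComputeAction consults the passed-in (override) store pool rather than
	// the allocator's own StorePool field, so the result reflects the
	// simulated decommission rather than current liveness.
	return a.ComputeAction(ctx, oSp, conf, desc)
}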