From b04d70365d809fb67f5e24f9c59b3f14a1785281 Mon Sep 17 00:00:00 2001
From: Aayush Shah
Date: Mon, 3 May 2021 05:21:23 -0400
Subject: [PATCH] kvserver: actuate load-based replica rebalancing under
 heterogeneous localities

This commit teaches the `StoreRebalancer` to make load-based rebalancing
decisions that are meaningful within the context of the replication
constraints placed on the ranges being relocated and the set of stores that
can legally receive replicas for such ranges.

Previously, the `StoreRebalancer` would compute the QPS underfull and
overfull thresholds based on the overall average QPS being served by all
stores in the cluster. Notably, this included stores that were in
replication zones that would not satisfy required constraints for the range
being considered for rebalancing. This meant that the store rebalancer would
effectively never be able to rebalance ranges within the stores inside
heavily loaded replication zones (since all the _valid_ stores would be
above the overfull thresholds).

This patch is a move away from the bespoke relocation logic in the
`StoreRebalancer`. Instead, we have the `StoreRebalancer` rely on the
rebalancing logic used by the `replicateQueue`, which already has the
machinery to compute load-based signals for candidates _relative to other
comparable stores_. The main difference here is that the `StoreRebalancer`
uses this machinery to promote convergence of QPS across stores, whereas the
`replicateQueue` uses it to promote convergence of range counts. A series of
preceding commits in this patchset generalize the existing replica
rebalancing logic, and this commit teaches the `StoreRebalancer` to use it.

This generalization also addresses another key limitation (see #62922) of
the `StoreRebalancer` regarding its inability to make partial improvements
to a range. Previously, if the `StoreRebalancer` couldn't move a range
_entirely_ off of overfull stores, it would give up and not even move the
subset of replicas it could. This is no longer the case.

Resolves #61883
Resolves #62992

/cc @cockroachdb/kv

Release note (performance improvement): QPS-based replica rebalancing is now
aware of different constraints placed on different replication zones. This
means that heterogeneously loaded replication zones (for instance, regions)
will achieve a more even distribution of QPS within the stores inside each
such zone.
---
 pkg/kv/kvserver/allocator.go             |   2 +-
 pkg/kv/kvserver/allocator_scorer.go      |  43 +-
 pkg/kv/kvserver/store_pool.go            |   5 +-
 pkg/kv/kvserver/store_rebalancer.go      | 520 +++++---------
 pkg/kv/kvserver/store_rebalancer_test.go | 847 ++++++++++++++---------
 5 files changed, 730 insertions(+), 687 deletions(-)

diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index 5843a71920c0..f7af510ed0a0 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -1088,7 +1088,7 @@ func (a Allocator) rebalanceTarget(
 		return zero, zero, "", false
 	}
 
-	// Add a fake new replica to our copy of the range descriptor so that we can
+	// Add a fake new replica to our copy of the replica descriptor so that we can
 	// simulate the removal logic. If we decide not to go with this target, note
 	// that this needs to be removed from desc before we try any other target.
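To make the problem described in the commit message concrete, here is a minimal, self-contained Go sketch (an editorial illustration, not code from this patch) that reuses the QPS figures of the `multiRegionStores` fixture added to `store_rebalancer_test.go` further down: stores s1-s3 serve region "a" at 3000/2800/2600 QPS, and the cluster-wide mean works out to 2200 QPS. For a range constrained to region "a", s3 sits above the cluster-wide mean, so cluster-wide thresholding would pass on it as a rebalance target; compared only against the stores that can legally hold the range (what the `replicateQueue` machinery does), s3 is the coolest legal candidate. The real scoring additionally applies threshold bands around these means via `overfullQPSThreshold`/`underfullQPSThreshold` and `qpsRebalanceThreshold`, which this sketch omits; the names `storeQPS`, `comparableStores`, and `mean` below are illustration-only.

package main

import "fmt"

// QPS figures mirroring the multiRegionStores test fixture in this patch:
// stores 1-3 are in region "a", 4-6 in region "b", 7-9 in region "c".
var storeQPS = []float64{3000, 2800, 2600, 2400, 2200, 2000, 1800, 1600, 1400}

// Indexes of the stores that can legally hold a range constrained to
// region "a" (s1, s2, s3).
var comparableStores = []int{0, 1, 2}

func mean(xs []float64) float64 {
	var sum float64
	for _, x := range xs {
		sum += x
	}
	return sum / float64(len(xs))
}

func main() {
	clusterMean := mean(storeQPS) // 2200

	var comparableQPS []float64
	for _, i := range comparableStores {
		comparableQPS = append(comparableQPS, storeQPS[i])
	}
	comparableMean := mean(comparableQPS) // 2800

	s3 := storeQPS[2] // 2600
	fmt.Printf("s3 vs cluster-wide mean: %.0f > %.0f (looks over-utilized)\n", s3, clusterMean)
	fmt.Printf("s3 vs comparable stores: %.0f < %.0f (coolest legal target in region a)\n", s3, comparableMean)
}

With these relative signals, moving the hot replica from s1 to s3 is exactly the kind of intra-region improvement the new code can make, and it is what the "rebalance one replica within heavy region" test case below expects.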
newReplica := roachpb.ReplicaDescriptor{ diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go index 4fd8300e6577..a85817e85247 100644 --- a/pkg/kv/kvserver/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator_scorer.go @@ -87,10 +87,14 @@ var rangeRebalanceThreshold = func() *settings.FloatSetting { return s }() +// TODO(aayush): Maybe turn this into an interface with one implementation +// that cares about range count and another that cares about QPS, so its +// impossible to misuse. type scorerOptions struct { deterministic bool rangeRebalanceThreshold float64 - qpsRebalanceThreshold float64 // only considered if non-zero + // Only used if `rangeRebalanceThreshold` is not set. + qpsRebalanceThreshold float64 } // candidate store for allocation. @@ -443,26 +447,11 @@ func rankedCandidateListForAllocation( } diversityScore := diversityAllocateScore(s, existingStoreLocalities) balanceScore := balanceScore(candidateStores, s.Capacity, options) - var convergesScore int - if options.qpsRebalanceThreshold > 0 { - if s.Capacity.QueriesPerSecond < underfullQPSThreshold( - options, candidateStores.candidateQueriesPerSecond.mean) { - convergesScore = 1 - } else if s.Capacity.QueriesPerSecond < candidateStores.candidateQueriesPerSecond.mean { - convergesScore = 0 - } else if s.Capacity.QueriesPerSecond < overfullQPSThreshold( - options, candidateStores.candidateQueriesPerSecond.mean) { - convergesScore = -1 - } else { - convergesScore = -2 - } - } candidates = append(candidates, candidate{ store: s, valid: constraintsOK, necessary: necessary, diversityScore: diversityScore, - convergesScore: convergesScore, balanceScore: balanceScore, rangeCount: int(s.Capacity.RangeCount), }) @@ -793,11 +782,11 @@ func rankedCandidateListForRebalancing( existingCandidates = append(existingCandidates, existing) continue } - balanceScore := balanceScore(comparable.sl, existing.store.Capacity, options) // Similarly to in candidateListForRemoval, any replica whose // removal would not converge the range stats to their mean is given a // constraint score boost of 1 to make it less attractive for removal. convergesScore := rebalanceFromConvergesScore(comparable.sl, existing.store.Capacity, options) + balanceScore := balanceScore(comparable.sl, existing.store.Capacity, options) existing.convergesScore = convergesScore existing.balanceScore = balanceScore existing.rangeCount = int(existing.store.Capacity.RangeCount) @@ -1315,7 +1304,7 @@ func diversityAllocateScore( } // diversityRemovalScore returns a value between 0 and 1 based on how desirable -// it would be to remove a node's replica of a range. A higher score indicates +// it would be to remove a store's replica of a range. A higher score indicates // that the node is a better fit (i.e. keeping it around is good for diversity). func diversityRemovalScore( storeID roachpb.StoreID, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, @@ -1323,8 +1312,9 @@ func diversityRemovalScore( var sumScore float64 var numSamples int locality := existingStoreLocalities[storeID] - // We don't need to calculate the overall diversityScore for the range, because the original overall diversityScore - // of this range is always the same. + // We don't need to calculate the overall diversityScore for the range, + // because the original overall diversityScore of this range is always the + // same. 
for otherStoreID, otherLocality := range existingStoreLocalities { if otherStoreID == storeID { continue @@ -1340,11 +1330,10 @@ func diversityRemovalScore( } // diversityRebalanceScore returns a value between 0 and 1 based on how -// desirable it would be to rebalance away from an existing store to the target -// store. Because the store to be removed isn't specified, this assumes that +// desirable it would be to rebalance to `store` from any of the existing +// stores. Because the store to be removed isn't specified, this assumes that // the worst-fitting store from a diversity perspective will be removed. A -// higher score indicates that the provided store is a better fit for the -// range. +// higher score indicates that the provided store is a better fit for the range. func diversityRebalanceScore( store roachpb.StoreDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, ) float64 { @@ -1372,20 +1361,20 @@ func diversityRebalanceScore( func diversityRebalanceFromScore( store roachpb.StoreDescriptor, fromStoreID roachpb.StoreID, - existingNodeLocalities map[roachpb.StoreID]roachpb.Locality, + existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, ) float64 { // Compute the pairwise diversity score of all replicas that will exist // after adding store and removing fromNodeID. var sumScore float64 var numSamples int - for storeID, locality := range existingNodeLocalities { + for storeID, locality := range existingStoreLocalities { if storeID == fromStoreID { continue } newScore := store.Locality().DiversityScore(locality) sumScore += newScore numSamples++ - for otherStoreID, otherLocality := range existingNodeLocalities { + for otherStoreID, otherLocality := range existingStoreLocalities { // Only compare pairs of replicas where otherNodeID > nodeID to avoid // computing the diversity score between each pair of localities twice. if otherStoreID <= storeID || otherStoreID == fromStoreID { diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go index 55bcfdf3a1cf..35420e91b1d7 100644 --- a/pkg/kv/kvserver/store_pool.go +++ b/pkg/kv/kvserver/store_pool.go @@ -784,8 +784,9 @@ func (sl StoreList) String() string { return buf.String() } -// filter takes a store list and filters it using the passed in constraints. It -// maintains the original order of the passed in store list. +// filter takes a store list and removes stores that would be explicitly invalid +// under the given set of constraints. It maintains the original order of the +// passed in store list. func (sl StoreList) filter(constraints []zonepb.ConstraintsConjunction) StoreList { if len(constraints) == 0 { return sl diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 77fd54b3cc91..565b619c96cd 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -12,7 +12,6 @@ package kvserver import ( "context" - "fmt" "math" "math/rand" "sort" @@ -28,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/redact" "go.etcd.io/etcd/raft/v3" ) @@ -206,20 +204,43 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { }) } +// NB: The StoreRebalancer only cares about the convergence of QPS across +// stores, not the convergence of range count. So, we don't use the allocator's +// `scorerOptions` here, which sets the range count rebalance threshold. 
+// Instead, we use our own implementation of `scorerOptions` that promotes QPS +// balance. +func (sr *StoreRebalancer) scorerOptions() scorerOptions { + return scorerOptions{ + deterministic: sr.rq.allocator.storePool.deterministic, + qpsRebalanceThreshold: qpsRebalanceThreshold.Get(&sr.st.SV), + } +} + +// rebalanceStore iterates through the top K hottest ranges on this store and +// for each such range, performs a lease transfer if it determines that that +// will improve QPS balance across the stores in the cluster. After it runs out +// of leases to transfer away (i.e. because it couldn't find better +// replacements), it considers these ranges for replica rebalancing. +// +// TODO(aayush): We don't try to move replicas or leases away from the local +// store unless it is fielding more than the overfull threshold of QPS based off +// of all the stores in the cluster. Is this desirable? Should we be more +// aggressive? func (sr *StoreRebalancer) rebalanceStore( - ctx context.Context, mode LBRebalancingMode, storeList StoreList, + ctx context.Context, mode LBRebalancingMode, allStoresList StoreList, ) { // First check if we should transfer leases away to better balance QPS. options := scorerOptions{ qpsRebalanceThreshold: qpsRebalanceThreshold.Get(&sr.st.SV), } - qpsMinThreshold := underfullQPSThreshold(options, storeList.candidateQueriesPerSecond.mean) - qpsMaxThreshold := overfullQPSThreshold(options, storeList.candidateQueriesPerSecond.mean) + qpsMinThreshold := underfullQPSThreshold(options, allStoresList.candidateQueriesPerSecond.mean) + qpsMaxThreshold := overfullQPSThreshold(options, allStoresList.candidateQueriesPerSecond.mean) var localDesc *roachpb.StoreDescriptor - for i := range storeList.stores { - if storeList.stores[i].StoreID == sr.rq.store.StoreID() { - localDesc = &storeList.stores[i] + for i := range allStoresList.stores { + if allStoresList.stores[i].StoreID == sr.rq.store.StoreID() { + localDesc = &allStoresList.stores[i] + break } } if localDesc == nil { @@ -229,21 +250,21 @@ func (sr *StoreRebalancer) rebalanceStore( if !(localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold) { log.VEventf(ctx, 1, "local QPS %.2f is below max threshold %.2f (mean=%.2f); no rebalancing needed", - localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold, storeList.candidateQueriesPerSecond.mean) + localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold, allStoresList.candidateQueriesPerSecond.mean) return } var replicasToMaybeRebalance []replicaWithStats - storeMap := storeListToMap(storeList) + storeMap := storeListToMap(allStoresList) log.Infof(ctx, "considering load-based lease transfers for s%d with %.2f qps (mean=%.2f, upperThreshold=%.2f)", - localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold) + localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, allStoresList.candidateQueriesPerSecond.mean, qpsMaxThreshold) hottestRanges := sr.replRankings.topQPS() for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold { replWithStats, target, considerForRebalance := sr.chooseLeaseToTransfer( - ctx, &hottestRanges, localDesc, storeList, storeMap, qpsMinThreshold, qpsMaxThreshold) + ctx, &hottestRanges, localDesc, allStoresList, storeMap, qpsMinThreshold, qpsMaxThreshold) replicasToMaybeRebalance = append(replicasToMaybeRebalance, considerForRebalance...) 
if replWithStats.repl == nil { break @@ -274,7 +295,7 @@ func (sr *StoreRebalancer) rebalanceStore( if !(localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold) { log.Infof(ctx, "load-based lease transfers successfully brought s%d down to %.2f qps (mean=%.2f, upperThreshold=%.2f)", - localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold) + localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, allStoresList.candidateQueriesPerSecond.mean, qpsMaxThreshold) return } @@ -297,10 +318,8 @@ func (sr *StoreRebalancer) rebalanceStore( ctx, &replicasToMaybeRebalance, localDesc, - storeList, - storeMap, - qpsMinThreshold, - qpsMaxThreshold) + allStoresList, + ) if replWithStats.repl == nil { log.Infof(ctx, "ran out of replicas worth transferring and qps (%.2f) is still above desired threshold (%.2f); will check again soon", @@ -358,7 +377,7 @@ func (sr *StoreRebalancer) rebalanceStore( log.Infof(ctx, "load-based replica transfers successfully brought s%d down to %.2f qps (mean=%.2f, upperThreshold=%.2f)", - localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold) + localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, allStoresList.candidateQueriesPerSecond.mean, qpsMaxThreshold) } // TODO(a-robinson): Should we take the number of leases on each store into @@ -386,7 +405,13 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance } - if shouldNotMoveAway(ctx, replWithStats, localDesc, now, minQPS) { + if !replWithStats.repl.OwnsValidLease(ctx, now) { + log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) + continue + } + if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { + log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", + replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) continue } @@ -473,35 +498,22 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( } } -// rangeRebalanceContext represents a snapshot of a range's state during the -// StoreRebalancer's attempt to rebalance it based on QPS. +// rangeRebalanceContext represents a snapshot of a replicas's state along with +// the state of the cluster during the StoreRebalancer's attempt to rebalance it +// based on QPS. 
type rangeRebalanceContext struct { - replWithStats replicaWithStats - rangeDesc *roachpb.RangeDescriptor - zone *zonepb.ZoneConfig - clusterNodes int - numDesiredVoters, numDesiredNonVoters int -} + replWithStats replicaWithStats + rangeDesc *roachpb.RangeDescriptor + zone *zonepb.ZoneConfig -func (rbc *rangeRebalanceContext) numDesiredReplicas(targetType targetReplicaType) int { - switch targetType { - case voterTarget: - return rbc.numDesiredVoters - case nonVoterTarget: - return rbc.numDesiredNonVoters - default: - panic(fmt.Sprintf("unknown targetReplicaType %s", targetType)) - } + qpsThresholdFraction float64 } func (sr *StoreRebalancer) chooseRangeToRebalance( ctx context.Context, hottestRanges *[]replicaWithStats, localDesc *roachpb.StoreDescriptor, - storeList StoreList, - storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, - minQPS float64, - maxQPS float64, + allStoresList StoreList, ) (replWithStats replicaWithStats, voterTargets, nonVoterTargets []roachpb.ReplicationTarget) { now := sr.rq.store.Clock().NowAsClockTimestamp() for { @@ -515,10 +527,6 @@ func (sr *StoreRebalancer) chooseRangeToRebalance( return replicaWithStats{}, nil, nil } - if shouldNotMoveAway(ctx, replWithStats, localDesc, now, minQPS) { - continue - } - // Don't bother moving ranges whose QPS is below some small fraction of the // store's QPS (unless the store has extra ranges to spare anyway). It's // just unnecessary churn with no benefit to move ranges responsible for, @@ -537,79 +545,49 @@ func (sr *StoreRebalancer) chooseRangeToRebalance( continue } - log.VEventf(ctx, 3, "considering replica rebalance for r%d with %.2f qps", - replWithStats.repl.GetRangeID(), replWithStats.qps) rangeDesc, zone := replWithStats.repl.DescAndZone() clusterNodes := sr.rq.allocator.storePool.ClusterNodeCount() numDesiredVoters := GetNeededVoters(zone.GetNumVoters(), clusterNodes) numDesiredNonVoters := GetNeededNonVoters(numDesiredVoters, int(zone.GetNumNonVoters()), clusterNodes) - if rs := rangeDesc.Replicas(); numDesiredVoters != len(rs.VoterDescriptors()) || - numDesiredNonVoters != len(rs.NonVoterDescriptors()) { - // If the StoreRebalancer is allowed past this point, it may accidentally - // downreplicate and this can cause unavailable ranges. - // - // See: https://github.com/cockroachdb/cockroach/issues/54444#issuecomment-707706553 - log.VEventf(ctx, 3, "range needs up/downreplication; not considering rebalance") + if expected, actual := numDesiredVoters, len(rangeDesc.Replicas().VoterDescriptors()); expected != actual { + log.VEventf( + ctx, + 3, + "r%d is either over or under replicated (expected %d voters, found %d); ignoring", + rangeDesc.RangeID, + expected, + actual, + ) continue } - - rebalanceCtx := rangeRebalanceContext{ - replWithStats: replWithStats, - rangeDesc: rangeDesc, - zone: zone, - clusterNodes: clusterNodes, - numDesiredVoters: numDesiredVoters, - numDesiredNonVoters: numDesiredNonVoters, - } - targetVoterRepls, targetNonVoterRepls := sr.getRebalanceCandidatesBasedOnQPS( - ctx, rebalanceCtx, localDesc, storeMap, storeList, minQPS, maxQPS, - ) - - // If we couldn't find enough valid targets, forget about this range. - // - // TODO(a-robinson): Support more incremental improvements -- move what we - // can if it makes things better even if it isn't great. For example, - // moving one of the other existing replicas that's on a store with less - // qps than the max threshold but above the mean would help in certain - // locality configurations. 
- if len(targetVoterRepls) < rebalanceCtx.numDesiredVoters { - log.VEventf(ctx, 3, "couldn't find enough voter rebalance targets for r%d (%d/%d)", - rangeDesc.RangeID, len(targetVoterRepls), rebalanceCtx.numDesiredVoters) + if expected, actual := numDesiredNonVoters, len(rangeDesc.Replicas().NonVoterDescriptors()); expected != actual { + log.VEventf( + ctx, + 3, + "r%d is either over or under replicated (expected %d non-voters, found %d); ignoring", + rangeDesc.RangeID, + expected, + actual, + ) continue } - if len(targetNonVoterRepls) < rebalanceCtx.numDesiredNonVoters { - log.VEventf(ctx, 3, "couldn't find enough non-voter rebalance targets for r%d (%d/%d)", - rangeDesc.RangeID, len(targetNonVoterRepls), rebalanceCtx.numDesiredNonVoters) - continue + rebalanceCtx := rangeRebalanceContext{ + replWithStats: replWithStats, + rangeDesc: rangeDesc, + zone: zone, } - // If the new set of replicas has lower diversity scores than the existing - // set, we don't continue with the rebalance. since we want to ensure we - // don't hurt locality diversity just to improve QPS. - // - // 1. Ensure that diversity among voting replicas is not hurt by this - // rebalancing decision. - if sr.worsensDiversity( - ctx, - rangeDesc.GetRangeID(), - rangeDesc.Replicas().VoterDescriptors(), - targetVoterRepls, - true, /* onlyVoters */ - ) { - continue - } - // 2. Ensure that diversity among all replicas is not hurt by this decision. - allTargetRepls := append(targetVoterRepls, targetNonVoterRepls...) - if sr.worsensDiversity( - ctx, - rangeDesc.GetRangeID(), - rangeDesc.Replicas().Descriptors(), - allTargetRepls, - false, /* onlyVoters */ - ) { + if !replWithStats.repl.OwnsValidLease(ctx, now) { + log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) continue } + log.VEventf(ctx, 3, "considering replica rebalance for r%d with %.2f qps", + replWithStats.repl.GetRangeID(), replWithStats.qps) + + targetVoterRepls, targetNonVoterRepls := sr.getRebalanceTargetsBasedOnQPS(ctx, rebalanceCtx) + storeDescMap := storeListToMap(allStoresList) + // Pick the voter with the least QPS to be leaseholder; // RelocateRange transfers the lease to the first provided target. newLeaseIdx := 0 @@ -627,7 +605,7 @@ func (sr *StoreRebalancer) chooseRangeToRebalance( } } - storeDesc, ok := storeMap[targetVoterRepls[i].StoreID] + storeDesc, ok := storeDescMap[targetVoterRepls[i].StoreID] if ok && storeDesc.Capacity.QueriesPerSecond < newLeaseQPS { newLeaseIdx = i newLeaseQPS = storeDesc.Capacity.QueriesPerSecond @@ -640,258 +618,126 @@ func (sr *StoreRebalancer) chooseRangeToRebalance( } } -// worsensDiversity returns true iff the diversity score of `currentRepls` is -// higher than `targetRepls` (either among just the set of voting replicas, or -// across all replicas in the range -- determined by `onlyVoters`). 
-func (sr *StoreRebalancer) worsensDiversity( - ctx context.Context, - rangeID roachpb.RangeID, - currentRepls, targetRepls []roachpb.ReplicaDescriptor, - onlyVoters bool, -) bool { - curDiversity := rangeDiversityScore( - sr.rq.allocator.storePool.getLocalitiesByStore(currentRepls), - ) - newDiversity := rangeDiversityScore( - sr.rq.allocator.storePool.getLocalitiesByStore(targetRepls), - ) - replicaStr := "replica" - if onlyVoters { - replicaStr = "voting replica" - } - if curDiversity > newDiversity { - log.VEventf( - ctx, - 3, - "new %s diversity %.2f for r%d worse than current diversity %.2f; not rebalancing", - replicaStr, - newDiversity, - rangeID, - curDiversity, - ) - return true - } - return false -} - -// getRebalanceCandidatesBasedOnQPS returns a list of rebalance targets for +// getRebalanceTargetsBasedOnQPS returns a list of rebalance targets for // voting and non-voting replicas on the range that match the relevant // constraints on the range and would further the goal of balancing the QPS on -// the stores in this cluster. In case there aren't enough stores that meet the -// constraints and are valid rebalance candidates based on QPS, the list of -// targets returned may contain fewer-than-required replicas. -// -// NB: `localStoreDesc` is expected to be the leaseholder of the range being -// operated on. -func (sr *StoreRebalancer) getRebalanceCandidatesBasedOnQPS( - ctx context.Context, - rebalanceCtx rangeRebalanceContext, - localStoreDesc *roachpb.StoreDescriptor, - storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, - storeList StoreList, - minQPS, maxQPS float64, +// the stores in this cluster. +func (sr *StoreRebalancer) getRebalanceTargetsBasedOnQPS( + ctx context.Context, rbCtx rangeRebalanceContext, ) (finalVoterTargets, finalNonVoterTargets []roachpb.ReplicaDescriptor) { - options := sr.rq.allocator.scorerOptions() - options.qpsRebalanceThreshold = qpsRebalanceThreshold.Get(&sr.st.SV) - - // Decide which voting / non-voting replicas we want to keep around and find - // rebalance targets for the rest. - partialVoterTargets := sr.pickReplsToKeep( - ctx, - rebalanceCtx, - nil, /* replsToExclude */ - localStoreDesc, - storeMap, - maxQPS, - voterTarget, - ) - finalVoterTargets = sr.pickRemainingRepls( - ctx, - rebalanceCtx, - partialVoterTargets, - nil, /* partialNonVoterTargets */ - storeMap, - storeList, - options, - minQPS, maxQPS, - voterTarget, - ) - - partialNonVoterTargets := sr.pickReplsToKeep( - ctx, - rebalanceCtx, - // NB: `finalVoterTargets` may contain replicas that are part of the - // existing set of non-voter targets, so we make sure that we don't keep - // those replicas around in `partialNonVoterTargets`. - finalVoterTargets, - localStoreDesc, - storeMap, - maxQPS, - nonVoterTarget, - ) - finalNonVoterTargets = sr.pickRemainingRepls( - ctx, - rebalanceCtx, - finalVoterTargets, - partialNonVoterTargets, - storeMap, - storeList, - options, - minQPS, - maxQPS, - nonVoterTarget, - ) - - return finalVoterTargets, finalNonVoterTargets -} - -// pickRemainingRepls determines the set of rebalance targets to fill in the -// rest of `partial{Voter,NonVoter}Targets` such that the resulting set contains -// exactly as many replicas as dictated by the zone configs. -// -// The caller is expected to synthesize the set of -// `partial{Voter,NonVoter}Targets` via `StoreRebalancer.pickReplsToKeep`. 
-func (sr *StoreRebalancer) pickRemainingRepls( - ctx context.Context, - rebalanceCtx rangeRebalanceContext, - partialVoterTargets, partialNonVoterTargets []roachpb.ReplicaDescriptor, - storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, - storeList StoreList, - options scorerOptions, - minQPS, maxQPS float64, - targetType targetReplicaType, -) []roachpb.ReplicaDescriptor { - // Alias the slice that corresponds to the set of replicas that is being - // appended to. This is because we want subsequent calls to - // `allocateTargetFromList` to observe the results of previous calls (note the - // append to the slice referenced by `finalTargetsForType`). - var finalTargetsForType *[]roachpb.ReplicaDescriptor - switch targetType { - case voterTarget: - finalTargetsForType = &partialVoterTargets - case nonVoterTarget: - finalTargetsForType = &partialNonVoterTargets - default: - log.Fatalf(ctx, "unknown targetReplicaType: %s", targetType) - } - for len(*finalTargetsForType) < rebalanceCtx.numDesiredReplicas(targetType) { - // Use the preexisting Allocate{Non}Voter logic to ensure that - // considerations such as zone constraints, locality diversity, and full - // disk come into play. - target, _ := sr.rq.allocator.allocateTargetFromList( + options := sr.scorerOptions() + finalVoterTargets = rbCtx.rangeDesc.Replicas().VoterDescriptors() + finalNonVoterTargets = rbCtx.rangeDesc.Replicas().NonVoterDescriptors() + + // NB: We attempt to rebalance N times for N replicas as we may want to + // replace all of them (they could all be on suboptimal stores). + for i := 0; i < len(finalVoterTargets); i++ { + // TODO(aayush): Figure out a way to plumb the `details` here into + // `AdminRelocateRange` so that these decisions show up in system.rangelog + add, remove, _, shouldRebalance := sr.rq.allocator.rebalanceTarget( ctx, - storeList, - rebalanceCtx.zone, - partialVoterTargets, - partialNonVoterTargets, + rbCtx.zone, + rbCtx.replWithStats.repl.RaftStatus(), + finalVoterTargets, finalNonVoterTargets, + rangeUsageInfoForRepl(rbCtx.replWithStats.repl), + storeFilterSuspect, + voterTarget, options, - // The store rebalancer should never need to perform lateral relocations, - // so we ask the allocator to disregard all the nodes that exist in - // `partial{Non}VoterTargets`. - false, /* allowMultipleReplsPerNode */ - targetType, ) - if target == nil { + if !shouldRebalance { log.VEventf( - ctx, 3, "no rebalance %ss found to replace the current store for r%d", - targetType, rebalanceCtx.rangeDesc.RangeID, + ctx, + 3, + "no more rebalancing opportunities for r%d voters that improve QPS balance", + rbCtx.rangeDesc.RangeID, ) break } - - meanQPS := storeList.candidateQueriesPerSecond.mean - if sr.shouldNotMoveTo( + log.VEventf( ctx, - storeMap, - rebalanceCtx.replWithStats, - target.StoreID, - meanQPS, - minQPS, - maxQPS, - ) { - // NB: If the target store returned by the allocator is not fit to - // receive a new replica due to balancing reasons, there is no point - // continuing with this loop since we'd expect future calls to - // `allocateTargetFromList` to return the same target. 
- break - } - - *finalTargetsForType = append(*finalTargetsForType, roachpb.ReplicaDescriptor{ - NodeID: target.Node.NodeID, - StoreID: target.StoreID, - }) - } - return *finalTargetsForType -} + 3, + "rebalancing voter (qps=%.2f) for r%d on %v to %v in order to improve QPS balance", + rbCtx.replWithStats.qps, + rbCtx.rangeDesc.RangeID, + remove, + add, + ) -// pickReplsToKeep determines the set of existing replicas for a range which -// should _not_ be rebalanced (because they belong to stores that aren't -// overloaded). -func (sr *StoreRebalancer) pickReplsToKeep( - ctx context.Context, - rebalanceCtx rangeRebalanceContext, - replsToExclude []roachpb.ReplicaDescriptor, - localStoreDesc *roachpb.StoreDescriptor, - storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, - maxQPS float64, - targetType targetReplicaType, -) (partialTargetRepls []roachpb.ReplicaDescriptor) { - shouldExclude := func(repl roachpb.ReplicaDescriptor) bool { - for _, excluded := range replsToExclude { - if repl.StoreID == excluded.StoreID { - return true + afterVoters := make([]roachpb.ReplicaDescriptor, 0, len(finalVoterTargets)) + afterNonVoters := make([]roachpb.ReplicaDescriptor, 0, len(finalNonVoterTargets)) + for _, voter := range finalVoterTargets { + if voter.StoreID == remove.StoreID { + afterVoters = append( + afterVoters, roachpb.ReplicaDescriptor{ + StoreID: add.StoreID, + NodeID: add.NodeID, + }) + } else { + afterVoters = append(afterVoters, voter) } } - return false - } - - var currentReplsForType []roachpb.ReplicaDescriptor - switch targetType { - case voterTarget: - currentReplsForType = rebalanceCtx.rangeDesc.Replicas().VoterDescriptors() - case nonVoterTarget: - currentReplsForType = rebalanceCtx.rangeDesc.Replicas().NonVoterDescriptors() - default: - log.Fatalf(ctx, "unknown targetReplicaType: %s", targetType) + // Voters are allowed to relocate to stores that have non-voters, which may + // displace them. + for _, nonVoter := range finalNonVoterTargets { + if nonVoter.StoreID == add.StoreID { + afterNonVoters = append(afterNonVoters, roachpb.ReplicaDescriptor{ + StoreID: remove.StoreID, + NodeID: remove.NodeID, + }) + } else { + afterNonVoters = append(afterNonVoters, nonVoter) + } + } + // Pretend that we've executed upon this rebalancing decision. + finalVoterTargets = afterVoters + finalNonVoterTargets = afterNonVoters } - // Check the existing replicas, keeping around those that aren't overloaded. - for i := range currentReplsForType { - if shouldExclude(currentReplsForType[i]) || - currentReplsForType[i].StoreID == localStoreDesc.StoreID { - continue + for i := 0; i < len(finalNonVoterTargets); i++ { + add, remove, _, shouldRebalance := sr.rq.allocator.rebalanceTarget( + ctx, + rbCtx.zone, + rbCtx.replWithStats.repl.RaftStatus(), + finalVoterTargets, finalNonVoterTargets, + rangeUsageInfoForRepl(rbCtx.replWithStats.repl), + storeFilterSuspect, + nonVoterTarget, + options, + ) + if !shouldRebalance { + log.VEventf( + ctx, + 3, + "no more rebalancing opportunities for r%d non-voters that improve QPS balance", + rbCtx.rangeDesc.RangeID, + ) + break } - - // Keep the replica in the range if we don't know its QPS or if its QPS is - // below the upper threshold. Punishing stores not in our store map could - // cause mass evictions if the storePool gets out of sync. 
- storeDesc, ok := storeMap[currentReplsForType[i].StoreID] - if !ok || storeDesc.Capacity.QueriesPerSecond < maxQPS { - if log.V(3) { - var reason redact.RedactableString - if ok { - reason = redact.Sprintf( - " (qps %.2f vs max %.2f)", - storeDesc.Capacity.QueriesPerSecond, - maxQPS, - ) - } - log.VEventf( - ctx, - 3, - "keeping %s r%d/%d on s%d%s", - targetType, - rebalanceCtx.rangeDesc.RangeID, - currentReplsForType[i].ReplicaID, - currentReplsForType[i].StoreID, - reason, - ) + log.VEventf( + ctx, + 3, + "rebalancing non-voter (qps=%.2f) for r%d on %v to %v in order to improve QPS balance", + rbCtx.replWithStats.qps, + rbCtx.rangeDesc.RangeID, + remove, + add, + ) + var newNonVoters []roachpb.ReplicaDescriptor + for _, nonVoter := range finalNonVoterTargets { + if nonVoter.StoreID == remove.StoreID { + newNonVoters = append( + newNonVoters, roachpb.ReplicaDescriptor{ + StoreID: add.StoreID, + NodeID: add.NodeID, + }) + } else { + newNonVoters = append(newNonVoters, nonVoter) } - - partialTargetRepls = append(partialTargetRepls, currentReplsForType[i]) } + // Pretend that we've executed upon this rebalancing decision. + finalNonVoterTargets = newNonVoters } - return partialTargetRepls + return finalVoterTargets, finalNonVoterTargets } func shouldNotMoveAway( diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index d9c3b82f32eb..d808f54f6061 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -12,9 +12,13 @@ package kvserver import ( "context" + "fmt" + "math/rand" "reflect" + "strings" "testing" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" @@ -28,10 +32,173 @@ import ( "go.etcd.io/etcd/raft/v3/tracker" ) +const defaultQPSRebalanceThreshold = 0.25 + var ( - // noLocalityStores specifies a set of stores where s5 is - // under-utilized in terms of QPS, s2-s4 are in the middle, and s1 is - // over-utilized. + // multiRegionStores specifies a set of stores across 3 regions. These stores + // are arranged in descending order of the QPS they are receiving. Store 1 is + // the most heavily loaded, and store 9 is the least heavily loaded store. + // Consequently, region "a" is fielding the most QPS whereas region "c" is + // fielding the least. 
+ multiRegionStores = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{ + NodeID: 1, + Locality: roachpb.Locality{ + []roachpb.Tier{ + { + Key: "region", + Value: "a", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 3000, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{ + NodeID: 2, + Locality: roachpb.Locality{ + []roachpb.Tier{ + { + Key: "region", + Value: "a", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 2800, + }, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{ + NodeID: 3, + Locality: roachpb.Locality{ + []roachpb.Tier{ + { + Key: "region", + Value: "a", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 2600, + }, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{ + NodeID: 4, + Locality: roachpb.Locality{ + []roachpb.Tier{ + { + Key: "region", + Value: "b", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 2400, + }, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{ + NodeID: 5, + Locality: roachpb.Locality{ + []roachpb.Tier{ + { + Key: "region", + Value: "b", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 2200, + }, + }, + { + StoreID: 6, + Node: roachpb.NodeDescriptor{ + NodeID: 6, + Locality: roachpb.Locality{ + []roachpb.Tier{ + { + Key: "region", + Value: "b", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 2000, + }, + }, + { + StoreID: 7, + Node: roachpb.NodeDescriptor{ + NodeID: 7, + Locality: roachpb.Locality{ + []roachpb.Tier{ + { + Key: "region", + Value: "c", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1800, + }, + }, + { + StoreID: 8, + Node: roachpb.NodeDescriptor{ + NodeID: 8, + Locality: roachpb.Locality{ + []roachpb.Tier{ + { + Key: "region", + Value: "c", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1600, + }, + }, + { + StoreID: 9, + Node: roachpb.NodeDescriptor{ + NodeID: 9, + Locality: roachpb.Locality{ + []roachpb.Tier{ + { + Key: "region", + Value: "c", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1400, + }, + }, + } + + // noLocalityStores specifies a set of stores that do not have any associated + // locality tags, where s5 is under-utilized in terms of QPS, s2-s4 are in the + // middle, and s1 is over-utilized. 
noLocalityStores = []*roachpb.StoreDescriptor{ { StoreID: 1, @@ -200,361 +367,402 @@ func TestChooseLeaseToTransfer(t *testing.T) { } } -func TestChooseRangeToRebalance(t *testing.T) { +func randomNoLocalityStores( + numNodes int, qpsMultiplier float64, +) (stores []*roachpb.StoreDescriptor, qpsMean float64) { + var totalQPS float64 + for i := 1; i <= numNodes; i++ { + qps := rand.Float64() * qpsMultiplier + stores = append( + stores, &roachpb.StoreDescriptor{ + StoreID: roachpb.StoreID(i), + Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)}, + Capacity: roachpb.StoreCapacity{QueriesPerSecond: qps}, + }, + ) + totalQPS = totalQPS + qps + } + return stores, totalQPS / float64(numNodes) +} + +func logSummary( + ctx context.Context, allStores, deadStores []*roachpb.StoreDescriptor, meanQPS float64, +) { + var summary strings.Builder + for _, store := range allStores { + summary.WriteString( + fmt.Sprintf("s%d: %.2f qps", store.StoreID, store.Capacity.QueriesPerSecond), + ) + for _, dead := range deadStores { + if dead.StoreID == store.StoreID { + summary.WriteString(" (dead)") + } + } + summary.WriteString("\n") + } + summary.WriteString(fmt.Sprintf("overall-mean: %.2f", meanQPS)) + log.Infof(ctx, "generated random store list:\n%s", summary.String()) +} + +func TestChooseRangeToRebalanceRandom(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) + const ( + numIterations = 10 + + qpsMultiplier = 2000 + numVoters = 3 + numNonVoters = 3 + numNodes = 12 + numDeadNodes = 3 + perReplicaQPS = 100 + qpsRebalanceThreshold = 0.1 + ) + + for i := 0; i < numIterations; i++ { + t.Run(fmt.Sprintf("%d", i+1), func(t *testing.T) { + ctx := context.Background() + stopper, g, _, a, _ := createTestAllocator(numNodes, false /* deterministic */) + defer stopper.Stop(context.Background()) + + stores, actualQPSMean := randomNoLocalityStores(numNodes, qpsMultiplier) + deadStores := stores[len(stores)-numDeadNodes:] + logSummary(ctx, stores, deadStores, actualQPSMean) + meanQPS := func(targets []roachpb.StoreID) float64 { + var totalQPS float64 + for _, store := range stores { + for _, target := range targets { + if target == store.StoreID { + totalQPS = totalQPS + store.Capacity.QueriesPerSecond + break + } + } + } + return totalQPS / float64(len(stores)) + } - stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) - defer stopper.Stop(context.Background()) - gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t) - storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) - storeMap := storeListToMap(storeList) + // Test setup boilerplate. + gossiputil.NewStoreGossiper(g).GossipStores(stores, t) + storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) + localDesc := *stores[0] + cfg := TestStoreConfig(nil) + s := createTestStoreWithoutStart(t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} + rq := newReplicateQueue(s, g, a) + rr := newReplicaRankings() + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + // Rather than trying to populate every Replica with a real raft group in + // order to pass replicaIsBehind checks, fake out the function for getting + // raft status with one that always returns all replicas as up to date. 
+ sr.getRaftStatusFn = func(r *Replica) *raft.Status { + status := &raft.Status{ + Progress: make(map[uint64]tracker.Progress), + } + status.Lead = uint64(r.ReplicaID()) + status.Commit = 1 + for _, replica := range r.Desc().InternalReplicas { + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ + Match: 1, + State: tracker.StateReplicate, + } + } + return status + } + a.storePool.isStoreReadyForRoutineReplicaTransfer = func(_ context.Context, this roachpb.StoreID) bool { + for _, deadStore := range deadStores { + // NodeID match StoreIDs here, so this comparison is valid. + if deadStore.StoreID == this { + return false + } + } + return true + } + s.cfg.DefaultZoneConfig.NumVoters = proto.Int32(int32(numVoters)) + s.cfg.DefaultZoneConfig.NumReplicas = proto.Int32(int32(numVoters + numNonVoters)) + // Place voters on the first `numVoters` stores and place non-voters on the + // next `numNonVoters` stores. + var voterStores, nonVoterStores []roachpb.StoreID + for i := 0; i < numVoters; i++ { + voterStores = append(voterStores, stores[i].StoreID) + } + for i := numVoters; i < numVoters+numNonVoters; i++ { + nonVoterStores = append(nonVoterStores, stores[i].StoreID) + } + loadRanges( + rr, s, []testRange{ + {voters: voterStores, nonVoters: nonVoterStores, qps: perReplicaQPS}, + }, + ) + hottestRanges := rr.topQPS() + _, voterTargets, nonVoterTargets := sr.chooseRangeToRebalance( + ctx, + &hottestRanges, + &localDesc, + storeList, + ) + var rebalancedVoterStores, rebalancedNonVoterStores []roachpb.StoreID + for _, target := range voterTargets { + rebalancedVoterStores = append(rebalancedVoterStores, target.StoreID) + } + for _, target := range nonVoterTargets { + rebalancedNonVoterStores = append(rebalancedNonVoterStores, target.StoreID) + } + log.Infof( + ctx, + "rebalanced voters from %v to %v: %.2f qps -> %.2f qps", + voterStores, + voterTargets, + meanQPS(voterStores), + meanQPS(rebalancedVoterStores), + ) + log.Infof( + ctx, + "rebalanced non-voters from %v to %v: %.2f qps -> %.2f qps", + nonVoterStores, + nonVoterTargets, + meanQPS(nonVoterStores), + meanQPS(rebalancedNonVoterStores), + ) + require.GreaterOrEqualf( + t, + meanQPS(voterStores), + meanQPS(rebalancedVoterStores), + "voters were rebalanced onto a set of stores with higher QPS", + ) - const minQPS = 800 - const maxQPS = 1200 + previousMean := meanQPS(append(voterStores, nonVoterStores...)) + newMean := meanQPS(append(rebalancedVoterStores, rebalancedNonVoterStores...)) + log.Infof( + ctx, + "rebalanced range from stores with %.2f average qps to %.2f average qps", + previousMean, + newMean, + ) + require.GreaterOrEqualf( + t, + previousMean, + newMean, + "replicas were rebalanced onto a set of stores with higher QPS", + ) + }) + } +} - localDesc := *noLocalityStores[0] - cfg := TestStoreConfig(nil) - s := createTestStoreWithoutStart(t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) - s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} - rq := newReplicateQueue(s, g, a) - rr := newReplicaRankings() +func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) - sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) - // Rather than trying to populate every Replica with a real raft group in - // order to pass replicaIsBehind checks, fake out the function for getting - // raft status with one that always returns all replicas as up to date. 
- sr.getRaftStatusFn = func(r *Replica) *raft.Status { - status := &raft.Status{ - Progress: make(map[uint64]tracker.Progress), - } - status.Lead = uint64(r.ReplicaID()) - status.Commit = 1 - for _, replica := range r.Desc().InternalReplicas { - status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ - Match: 1, - State: tracker.StateReplicate, - } + constraint := func(region string, numReplicas int32) zonepb.ConstraintsConjunction { + return zonepb.ConstraintsConjunction{ + NumReplicas: numReplicas, + Constraints: []zonepb.Constraint{ + { + Type: zonepb.Constraint_REQUIRED, + Key: "region", + Value: region, + }, + }, } - return status + } + + oneReplicaPerRegion := []zonepb.ConstraintsConjunction{ + constraint("a", 1), + constraint("b", 1), + constraint("c", 1), + } + twoReplicasInHotRegion := []zonepb.ConstraintsConjunction{ + constraint("a", 2), + } + allReplicasInHotRegion := []zonepb.ConstraintsConjunction{ + constraint("a", 3), } testCases := []struct { - voters, nonVoters []roachpb.StoreID - // stores that are not to be considered for rebalancing - nonLive []roachpb.StoreID - qps float64 - // the first listed voter target is expected to be the leaseholder - expectedRebalancedVoters, expectedRebalancedNonVoters []roachpb.StoreID + name string + voters, nonVoters []roachpb.StoreID + voterConstraints, constraints []zonepb.ConstraintsConjunction + + // the first listed voter target is expected to be the leaseholder. + expRebalancedVoters, expRebalancedNonVoters []roachpb.StoreID }{ - { - voters: []roachpb.StoreID{1}, - nonVoters: nil, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{5}, - expectedRebalancedNonVoters: nil, - }, - // If s5 is unavailable, s4 is the next best guess. - { - voters: []roachpb.StoreID{1}, - nonVoters: nil, - nonLive: []roachpb.StoreID{5}, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{4}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1}, - nonVoters: nil, - nonLive: []roachpb.StoreID{4, 5}, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1}, - nonVoters: nil, - nonLive: nil, - qps: 500, - expectedRebalancedVoters: []roachpb.StoreID{5}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1}, - nonVoters: nil, - nonLive: []roachpb.StoreID{5}, - qps: 500, - expectedRebalancedVoters: []roachpb.StoreID{}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1}, - nonVoters: nil, - nonLive: nil, - qps: 800, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1}, - nonVoters: nil, - nonLive: nil, - qps: 1.5, - expectedRebalancedVoters: []roachpb.StoreID{5}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1}, - nonVoters: nil, - nonLive: []roachpb.StoreID{5}, - qps: 1.5, - expectedRebalancedVoters: []roachpb.StoreID{4}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1}, - nonVoters: nil, - nonLive: nil, - qps: 1.49, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 2}, - nonVoters: nil, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{5, 2}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 2}, - nonVoters: nil, - nonLive: []roachpb.StoreID{5}, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{4, 2}, - expectedRebalancedNonVoters: nil, - }, - { - voters: 
[]roachpb.StoreID{1, 3}, - nonVoters: nil, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{5, 3}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 4}, - nonVoters: nil, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{5, 4}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 2}, - nonVoters: nil, - nonLive: nil, - qps: 800, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 2}, - nonVoters: nil, - nonLive: nil, - qps: 1.49, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 4, 5}, - nonVoters: nil, - nonLive: nil, - qps: 500, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 4, 5}, - nonVoters: nil, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 3, 5}, - nonVoters: nil, - nonLive: nil, - qps: 500, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 3, 4}, - nonVoters: nil, - nonLive: nil, - qps: 500, - expectedRebalancedVoters: []roachpb.StoreID{5, 4, 3}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 3, 5}, - nonVoters: nil, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{5, 4, 3}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 3, 5}, - nonVoters: nil, - nonLive: []roachpb.StoreID{4}, - qps: 100, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - // Rebalancing to s2 isn't chosen even though it's better than s1 because it's above the mean. - { - voters: []roachpb.StoreID{1, 3, 4, 5}, - nonVoters: nil, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 2, 4, 5}, - nonVoters: nil, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 2, 3, 5}, - nonVoters: nil, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{5, 4, 3, 2}, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1, 2, 3, 4}, - nonVoters: nil, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{5, 4, 3, 2}, - expectedRebalancedNonVoters: nil, - }, - { - // Don't bother moving any replicas around since it won't make much of a - // difference. See `minQPSFraction` inside `chooseRangeToRebalance()`. - voters: []roachpb.StoreID{1}, - nonVoters: []roachpb.StoreID{2, 3, 4}, - nonLive: nil, - qps: 1, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - { - // None of the stores are worth moving to because they will be above the - // maxQPS after the move. - voters: []roachpb.StoreID{1}, - nonVoters: []roachpb.StoreID{2, 3, 4}, - nonLive: nil, - qps: 1000, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, - }, - { - voters: []roachpb.StoreID{1}, - nonVoters: []roachpb.StoreID{2, 3, 4}, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{5}, - expectedRebalancedNonVoters: []roachpb.StoreID{4, 3, 2}, - }, - // Voters may rebalance to stores that have a non-voter, and those - // displaced non-voters will be rebalanced to other valid stores. 
- { - voters: []roachpb.StoreID{1}, - nonVoters: []roachpb.StoreID{5}, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{5}, - expectedRebalancedNonVoters: []roachpb.StoreID{4}, - }, - { - voters: []roachpb.StoreID{1}, - nonVoters: []roachpb.StoreID{5, 2, 3}, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: []roachpb.StoreID{5}, - expectedRebalancedNonVoters: []roachpb.StoreID{2, 3, 4}, - }, - { - // Voters may rebalance to stores that have a non-voter, but only if the - // displaced non-voters can be rebalanced to other underfull (based on - // QPS) stores. Note that stores 1 and 2 are above the maxQPS and the - // meanQPS, respectively, so non-voters cannot be rebalanced to them. - voters: []roachpb.StoreID{1, 2}, - nonVoters: []roachpb.StoreID{5, 4, 3}, - nonLive: nil, - qps: 100, - expectedRebalancedVoters: nil, - expectedRebalancedNonVoters: nil, + // All the replicas are already on the best possible stores. No + // rebalancing should be attempted. + { + name: "no rebalance", + voters: []roachpb.StoreID{3, 6, 9}, + constraints: oneReplicaPerRegion, + expRebalancedVoters: []roachpb.StoreID{9, 6, 3}, + }, + // A replica is in a heavily loaded region, on a relatively heavily loaded + // store. We expect it to be moved to a less busy store within the same + // region. + { + name: "rebalance one replica within heavy region", + voters: []roachpb.StoreID{1, 6, 9}, + constraints: oneReplicaPerRegion, + expRebalancedVoters: []roachpb.StoreID{9, 6, 3}, + }, + // Two replicas are in the hot region, both on relatively heavily loaded + // nodes. We expect one of those replicas to be moved to a less busy store + // within the same region. + { + name: "rebalance two replicas out of three within heavy region", + voters: []roachpb.StoreID{1, 2, 9}, + constraints: twoReplicasInHotRegion, + expRebalancedVoters: []roachpb.StoreID{9, 2, 3}, + }, + { + name: "rebalance two replicas out of five within heavy region", + voters: []roachpb.StoreID{1, 2, 6, 8, 9}, + constraints: twoReplicasInHotRegion, + // NB: Because of the diversity heuristic we won't rebalance to node 7. + expRebalancedVoters: []roachpb.StoreID{9, 3, 6, 8, 2}, + }, + // In the absence of any constraints, ensure that as long as diversity is + // maximized, replicas on hot stores are rebalanced to cooler stores within + // the same region. + { + // Within the hottest region, expect rebalance from the hottest node (n1) + // to the coolest node (n3). + name: "QPS balance without constraints", + voters: []roachpb.StoreID{1, 5, 8}, + expRebalancedVoters: []roachpb.StoreID{8, 5, 3}, + }, + { + // Within the second hottest region, expect rebalance from the hottest + // node (n4) to the coolest node (n6). + name: "QPS balance without constraints", + voters: []roachpb.StoreID{8, 4, 3}, + expRebalancedVoters: []roachpb.StoreID{8, 6, 3}, + }, + + // Multi-region database configurations. + { + name: "primary region with highest QPS, zone survival, one non-voter on hot node", + voters: []roachpb.StoreID{1, 2, 3}, + nonVoters: []roachpb.StoreID{4, 9}, + // Pin all voters to the hottest region (region A) and have overall + // constraints require at least one replica per each region. + voterConstraints: allReplicasInHotRegion, + constraints: oneReplicaPerRegion, + + expRebalancedVoters: []roachpb.StoreID{3, 2, 1}, + // NB: Expect the non-voter on node 4 (hottest node in region B) to move + // to node 6 (least hot region in region B). 
+ expRebalancedNonVoters: []roachpb.StoreID{6, 9}, + }, + { + name: "primary region with highest QPS, region survival, one voter on sub-optimal node", + voters: []roachpb.StoreID{2, 3, 4, 8, 9}, + // Pin two voters to the hottest region (region A) and have overall + // constraints require at least one replica per each region. + voterConstraints: twoReplicasInHotRegion, + constraints: oneReplicaPerRegion, + // NB: Expect the voter on node 4 (hottest node in region B) to move to + // node 6 (least hot region in region B). + expRebalancedVoters: []roachpb.StoreID{9, 2, 6, 8, 3}, + }, + { + name: "primary region with highest QPS, region survival, two voters on sub-optimal nodes", + voters: []roachpb.StoreID{1, 2, 3, 4, 9}, + // Pin two voters to the hottest region (region A) and have overall + // constraints require at least one replica per each region. + voterConstraints: twoReplicasInHotRegion, + constraints: oneReplicaPerRegion, + // NB: We've got 3 voters in the hottest region, but we only need 2. We + // expect that one of the voters from the hottest region will be moved to + // the least hot region. Furthermore, we'd expect the voter on node 4 + // (hottest node in region B) to move to node 6 (least hot node in region + // B). + expRebalancedVoters: []roachpb.StoreID{9, 2, 6, 8, 3}, }, } - for _, tc := range testCases { - t.Run("", func(t *testing.T) { - a.storePool.isStoreReadyForRoutineReplicaTransfer = func(_ context.Context, storeID roachpb.StoreID) bool { - for _, s := range tc.nonLive { - if s == storeID { - return false + t.Run(tc.name, func(t *testing.T) { + // Boilerplate for test setup. + stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */) + defer stopper.Stop(context.Background()) + gossiputil.NewStoreGossiper(g).GossipStores(multiRegionStores, t) + storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) + + var localDesc roachpb.StoreDescriptor + for _, store := range multiRegionStores { + if store.StoreID == tc.voters[0] { + localDesc = *store + } + } + cfg := TestStoreConfig(nil) + s := createTestStoreWithoutStart(t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} + rq := newReplicateQueue(s, g, a) + rr := newReplicaRankings() + + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + + // Rather than trying to populate every Replica with a real raft group in + // order to pass replicaIsBehind checks, fake out the function for getting + // raft status with one that always returns all replicas as up to date. 
+ sr.getRaftStatusFn = func(r *Replica) *raft.Status { + status := &raft.Status{ + Progress: make(map[uint64]tracker.Progress), + } + status.Lead = uint64(r.ReplicaID()) + status.Commit = 1 + for _, replica := range r.Desc().InternalReplicas { + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ + Match: 1, + State: tracker.StateReplicate, } } - return true + return status } s.cfg.DefaultZoneConfig.NumVoters = proto.Int32(int32(len(tc.voters))) s.cfg.DefaultZoneConfig.NumReplicas = proto.Int32(int32(len(tc.voters) + len(tc.nonVoters))) + s.cfg.DefaultZoneConfig.Constraints = tc.constraints + s.cfg.DefaultZoneConfig.VoterConstraints = tc.voterConstraints + const testingQPS = float64(50) loadRanges( rr, s, []testRange{ - {voters: tc.voters, nonVoters: tc.nonVoters, qps: tc.qps}, + {voters: tc.voters, nonVoters: tc.nonVoters, qps: testingQPS}, }, ) hottestRanges := rr.topQPS() _, voterTargets, nonVoterTargets := sr.chooseRangeToRebalance( - ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS, + ctx, + &hottestRanges, + &localDesc, + storeList, ) - require.Len(t, voterTargets, len(tc.expectedRebalancedVoters)) - if len(voterTargets) > 0 && voterTargets[0].StoreID != tc.expectedRebalancedVoters[0] { + require.Len(t, voterTargets, len(tc.expRebalancedVoters)) + if len(voterTargets) > 0 && voterTargets[0].StoreID != tc.expRebalancedVoters[0] { t.Errorf("chooseRangeToRebalance(existing=%v, qps=%f) chose s%d as leaseholder; want s%v", - tc.voters, tc.qps, voterTargets[0], tc.expectedRebalancedVoters[0]) + tc.voters, testingQPS, voterTargets[0], tc.expRebalancedVoters[0]) } voterStoreIDs := make([]roachpb.StoreID, len(voterTargets)) for i, target := range voterTargets { voterStoreIDs[i] = target.StoreID } - require.ElementsMatch(t, voterStoreIDs, tc.expectedRebalancedVoters) - - require.Len(t, nonVoterTargets, len(tc.expectedRebalancedNonVoters)) + require.ElementsMatch(t, voterStoreIDs, tc.expRebalancedVoters) + // Check that things "still work" when `VoterConstraints` are used + // instead. + s.cfg.DefaultZoneConfig.Constraints = []zonepb.ConstraintsConjunction{} + s.cfg.DefaultZoneConfig.VoterConstraints = tc.constraints + require.ElementsMatch(t, voterStoreIDs, tc.expRebalancedVoters) + + require.Len(t, nonVoterTargets, len(tc.expRebalancedNonVoters)) nonVoterStoreIDs := make([]roachpb.StoreID, len(nonVoterTargets)) for i, target := range nonVoterTargets { nonVoterStoreIDs[i] = target.StoreID } - require.ElementsMatch(t, nonVoterStoreIDs, tc.expectedRebalancedNonVoters) + require.ElementsMatch(t, nonVoterStoreIDs, tc.expRebalancedNonVoters) }) } } @@ -629,10 +837,9 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { hottestRanges = rr.topQPS() repl = hottestRanges[0].repl - _, targets, _ := sr.chooseRangeToRebalance( - ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS) + _, targets, _ := sr.chooseRangeToRebalance(ctx, &hottestRanges, &localDesc, storeList) expectTargets := []roachpb.ReplicationTarget{ - {NodeID: 4, StoreID: 4}, {NodeID: 5, StoreID: 5}, {NodeID: 3, StoreID: 3}, + {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, } if !reflect.DeepEqual(targets, expectTargets) { t.Errorf("got targets %v for range with RaftStatus %v; want %v",
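The rewritten `chooseRangeToRebalance`/`getRebalanceTargetsBasedOnQPS` path above contains two pieces of bookkeeping that are easy to miss in the diff: after every accepted `rebalanceTarget` decision the code pretends the swap has already been executed so that the next iteration scores the updated replica set, and once the final voter set is known it places the voter on the least-busy store first, because (per the comment in the diff) RelocateRange transfers the lease to the first provided target. The following self-contained Go sketch shows both steps under simplified assumptions; the `target` struct and the `applySwap`/`leaseholderFirst` helpers are illustration-only stand-ins for the `roachpb.ReplicaDescriptor` and store-descriptor plumbing in the real code.

package main

import "fmt"

// target is a simplified stand-in for a replication target: the store it
// lives on and that store's current QPS.
type target struct {
	storeID int
	qps     float64
}

// applySwap pretends that a rebalance decision (remove -> add) has been
// executed, mirroring the "Pretend that we've executed upon this rebalancing
// decision" step in getRebalanceTargetsBasedOnQPS.
func applySwap(targets []target, removeStoreID int, add target) []target {
	out := make([]target, 0, len(targets))
	for _, t := range targets {
		if t.storeID == removeStoreID {
			out = append(out, add)
		} else {
			out = append(out, t)
		}
	}
	return out
}

// leaseholderFirst moves the voter on the least-busy store to index 0,
// mirroring the newLeaseIdx selection that runs before RelocateRange.
func leaseholderFirst(voters []target) []target {
	least := 0
	for i, v := range voters {
		if v.qps < voters[least].qps {
			least = i
		}
	}
	voters[0], voters[least] = voters[least], voters[0]
	return voters
}

func main() {
	// Voters on s1 (hot), s6, and s9, using the QPS figures from the
	// multiRegionStores fixture above.
	voters := []target{{1, 3000}, {6, 2000}, {9, 1400}}
	// The allocator suggests replacing the replica on s1 with one on s3.
	voters = applySwap(voters, 1, target{3, 2600})
	// The coolest store (s9) goes first and therefore receives the lease.
	voters = leaseholderFirst(voters)
	fmt.Println(voters) // [{9 1400} {6 2000} {3 2600}]
}

This matches the `expRebalancedVoters` ordering ({9, 6, 3}) that the heterogeneous-zones test asserts for a range whose voters start out on s1, s6, and s9.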