diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 8bebe040a1f1..704ba21df2bf 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "compact_span_client.go", "consistency_queue.go", "debug_print.go", + "deprecated_store_rebalancer.go", "doc.go", "lease_history.go", "log.go", @@ -239,6 +240,7 @@ go_test( "closed_timestamp_test.go", "consistency_queue_test.go", "debug_print_test.go", + "deprecated_store_rebalancer_test.go", "gossip_test.go", "helpers_test.go", "intent_resolver_integration_test.go", diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 655d52f971b3..584da8e7f9dc 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -957,7 +957,7 @@ func (a *Allocator) allocateTargetFromList( candidateStores StoreList, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, - options *rangeCountScorerOptions, + options scorerOptions, allowMultipleReplsPerNode bool, targetType targetReplicaType, ) (roachpb.ReplicationTarget, string) { @@ -1454,7 +1454,7 @@ func (a Allocator) RebalanceNonVoter( func (a *Allocator) scorerOptions() *rangeCountScorerOptions { return &rangeCountScorerOptions{ deterministic: a.storePool.deterministic, - rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV), + rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.storePool.st.SV), } } @@ -1470,7 +1470,7 @@ func (a *Allocator) scorerOptionsForScatter() *scatterScorerOptions { // made by the replicateQueue during normal course of operations. In other // words, we don't want stores that are too far away from the mean to be // affected by the jitter. - jitter: rangeRebalanceThreshold.Get(&a.storePool.st.SV), + jitter: RangeRebalanceThreshold.Get(&a.storePool.st.SV), } } @@ -2093,6 +2093,8 @@ func (a Allocator) shouldTransferLeaseForLeaseCountConvergence( return false } +// preferredLeaseholders returns a slice of replica descriptors corresponding to +// replicas that meet lease preferences (among the `existing` replicas). func (a Allocator) preferredLeaseholders( conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor, ) []roachpb.ReplicaDescriptor { diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go index 3c33358f74e8..f393d38e4510 100644 --- a/pkg/kv/kvserver/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator_scorer.go @@ -72,10 +72,10 @@ const ( minRangeRebalanceThreshold = 2 ) -// rangeRebalanceThreshold is the minimum ratio of a store's range count to +// RangeRebalanceThreshold is the minimum ratio of a store's range count to // the mean range count at which that store is considered overfull or underfull // of ranges. -var rangeRebalanceThreshold = func() *settings.FloatSetting { +var RangeRebalanceThreshold = func() *settings.FloatSetting { s := settings.RegisterFloatSetting( settings.SystemOnly, "kv.allocator.range_rebalance_threshold", @@ -133,13 +133,15 @@ type scorerOptions interface { // (relative to the equivalence class `eqClass`). This makes it more likely // for us to pick this store as the rebalance target. 
rebalanceToConvergesScore(eqClass equivalenceClass, candidate roachpb.StoreDescriptor) int - // removalConvergesScore is similar to `rebalanceFromConvergesScore` (both - // deal with computing a converges score for existing stores that might + // removalMaximallyConvergesScore is similar to `rebalanceFromConvergesScore` + // (both deal with computing a converges score for existing stores that might // relinquish a replica). removalConvergesScore assigns a negative convergence // score to the existing store (or multiple replicas, if there are multiple // with the same QPS) that would converge the range's existing stores' QPS the // most. removalMaximallyConvergesScore(removalCandStoreList StoreList, existing roachpb.StoreDescriptor) int + // getRangeRebalanceThreshold returns the current range rebalance threshold. + getRangeRebalanceThreshold() float64 } func jittered(val float64, jitter float64, rand allocatorRand) float64 { @@ -290,13 +292,22 @@ func (o *rangeCountScorerOptions) removalMaximallyConvergesScore( return 0 } +func (o *rangeCountScorerOptions) getRangeRebalanceThreshold() float64 { + return o.rangeRebalanceThreshold +} + // qpsScorerOptions is used by the StoreRebalancer to tell the Allocator's // rebalancing machinery to base its balance/convergence scores on -// queries-per-second. This means that the resulting rebalancing decisions will -// further the goal of converging QPS across stores in the cluster. +// queries-per-second. This means that the resulting rebalancing decisions will further the goal of +// converging QPS across stores in the cluster. type qpsScorerOptions struct { deterministic bool qpsRebalanceThreshold, minRequiredQPSDiff float64 + // NB: For mixed version compatibility with 21.2, we need to include the range + // count based rebalance threshold here. This is because in 21.2, the store + // rebalancer took range count into account when trying to rank candidate + // stores. + deprecatedRangeRebalanceThreshold float64 // QPS-based rebalancing assumes that: // 1. Every replica of a range currently receives the same level of traffic. @@ -315,6 +326,10 @@ type qpsScorerOptions struct { qpsPerReplica float64 } +func (o *qpsScorerOptions) getRangeRebalanceThreshold() float64 { + return o.deprecatedRangeRebalanceThreshold +} + func (o *qpsScorerOptions) maybeJitterStoreStats(sl StoreList, _ allocatorRand) StoreList { return sl } @@ -745,7 +760,7 @@ func rankedCandidateListForAllocation( existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, isStoreValidForRoutineReplicaTransfer func(context.Context, roachpb.StoreID) bool, allowMultipleReplsPerNode bool, - options *rangeCountScorerOptions, + options scorerOptions, ) candidateList { var candidates candidateList existingReplTargets := roachpb.MakeReplicaSet(existingReplicas).ReplicationTargets() @@ -779,11 +794,42 @@ func rankedCandidateListForAllocation( } diversityScore := diversityAllocateScore(s, existingStoreLocalities) balanceScore := options.balanceScore(candidateStores, s.Capacity) + // NB: This is only applicable in mixed version (21.2 along with 22.1) + // clusters. `rankedCandidateListForAllocation` will never be called in 22.1 + // with a `qpsScorerOptions`. + // + // TODO(aayush): Remove this some time in the 22.2 cycle. 
+ var convergesScore int + if qpsOpts, ok := options.(*qpsScorerOptions); ok { + if qpsOpts.qpsRebalanceThreshold > 0 { + if s.Capacity.QueriesPerSecond < underfullQPSThreshold( + qpsOpts, candidateStores.candidateQueriesPerSecond.mean, + ) { + convergesScore = 1 + } else if s.Capacity.QueriesPerSecond < candidateStores.candidateQueriesPerSecond.mean { + convergesScore = 0 + } else if s.Capacity.QueriesPerSecond < overfullQPSThreshold( + qpsOpts, candidateStores.candidateQueriesPerSecond.mean, + ) { + convergesScore = -1 + } else { + convergesScore = -2 + } + + // NB: Maintain parity with the 21.2 implementation, which computed the + // `balanceScore` using range-counts instead of QPS even during + // load-based rebalancing. + balanceScore = (&rangeCountScorerOptions{ + rangeRebalanceThreshold: options.getRangeRebalanceThreshold(), + }).balanceScore(candidateStores, s.Capacity) + } + } candidates = append(candidates, candidate{ store: s, valid: constraintsOK, necessary: necessary, diversityScore: diversityScore, + convergesScore: convergesScore, balanceScore: balanceScore, rangeCount: int(s.Capacity.RangeCount), }) diff --git a/pkg/kv/kvserver/allocator_scorer_test.go b/pkg/kv/kvserver/allocator_scorer_test.go index bb48103d62d8..ed7013094c1b 100644 --- a/pkg/kv/kvserver/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator_scorer_test.go @@ -1447,7 +1447,7 @@ func TestDiversityScoreEquivalence(t *testing.T) { {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreUSb, testStoreEurope}, 13.0 / 20.0}, } - // Ensure that rangeDiversityScore and diversityRebalanceFromScore return + // Ensure that RangeDiversityScore and diversityRebalanceFromScore return // the same results for the same configurations, enabling their results // to be directly compared with each other. The same is not true for // diversityAllocateScore and diversityRemovalScore as of their initial @@ -1460,7 +1460,7 @@ func TestDiversityScoreEquivalence(t *testing.T) { } rangeScore := rangeDiversityScore(existingLocalities) if a, e := rangeScore, tc.expected; !scoresAlmostEqual(a, e) { - t.Errorf("rangeDiversityScore(%v) got %f, want %f", existingLocalities, a, e) + t.Errorf("RangeDiversityScore(%v) got %f, want %f", existingLocalities, a, e) } for _, storeID := range tc.stores { s := testStores[storeID] @@ -1472,7 +1472,7 @@ func TestDiversityScoreEquivalence(t *testing.T) { s, fromStoreID, existingLocalities, a, e) } if a, e := rebalanceScore, rangeScore; !scoresAlmostEqual(a, e) { - t.Errorf("diversityRebalanceFromScore(%v, %d, %v)=%f not equal to rangeDiversityScore(%v)=%f", + t.Errorf("diversityRebalanceFromScore(%v, %d, %v)=%f not equal to RangeDiversityScore(%v)=%f", s, fromStoreID, existingLocalities, a, existingLocalities, e) } } diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index fe23fde6fb9d..bba2e6554552 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -1236,7 +1236,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { for i := range stores { stores[i].rangeCount = mean } - surplus := int32(math.Ceil(float64(mean)*rangeRebalanceThreshold.Get(&st.SV) + 1)) + surplus := int32(math.Ceil(float64(mean)*RangeRebalanceThreshold.Get(&st.SV) + 1)) stores[0].rangeCount += surplus stores[0].shouldRebalanceFrom = true for i := 1; i < len(stores); i++ { @@ -1257,7 +1257,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { // Subtract enough ranges from the first store to make it a suitable // rebalance target. 
To maintain the specified mean, we then add that delta // back to the rest of the replicas. - deficit := int32(math.Ceil(float64(mean)*rangeRebalanceThreshold.Get(&st.SV) + 1)) + deficit := int32(math.Ceil(float64(mean)*RangeRebalanceThreshold.Get(&st.SV) + 1)) stores[0].rangeCount -= deficit for i := 1; i < len(stores); i++ { stores[i].rangeCount += int32(math.Ceil(float64(deficit) / float64(len(stores)-1))) diff --git a/pkg/kv/kvserver/deprecated_store_rebalancer.go b/pkg/kv/kvserver/deprecated_store_rebalancer.go new file mode 100644 index 000000000000..b41f8b0870f2 --- /dev/null +++ b/pkg/kv/kvserver/deprecated_store_rebalancer.go @@ -0,0 +1,627 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "fmt" + "math" + "sort" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/redact" + "go.etcd.io/etcd/raft/v3" +) + +func (sr *StoreRebalancer) deprecatedChooseLeaseToTransfer( + ctx context.Context, + hottestRanges *[]replicaWithStats, + localDesc *roachpb.StoreDescriptor, + storeList StoreList, + storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + minQPS float64, + maxQPS float64, +) (replicaWithStats, roachpb.ReplicaDescriptor, []replicaWithStats) { + var considerForRebalance []replicaWithStats + now := sr.rq.store.Clock().NowAsClockTimestamp() + for { + if len(*hottestRanges) == 0 { + return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance + } + replWithStats := (*hottestRanges)[0] + *hottestRanges = (*hottestRanges)[1:] + + // We're all out of replicas. + if replWithStats.repl == nil { + return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance + } + + if shouldNotMoveAway(ctx, replWithStats, localDesc, now, minQPS) { + continue + } + + // Don't bother moving leases whose QPS is below some small fraction of the + // store's QPS (unless the store has extra leases to spare anyway). It's + // just unnecessary churn with no benefit to move leases responsible for, + // for example, 1 qps on a store with 5000 qps. + const minQPSFraction = .001 + if replWithStats.qps < localDesc.Capacity.QueriesPerSecond*minQPSFraction && + float64(localDesc.Capacity.LeaseCount) <= storeList.candidateLeases.mean { + log.VEventf(ctx, 5, "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps", + replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, localDesc.Capacity.QueriesPerSecond) + continue + } + + desc, conf := replWithStats.repl.DescAndSpanConfig() + log.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps", + desc.RangeID, replWithStats.qps) + + // Check all the other voting replicas in order of increasing qps. + // Learners or non-voters aren't allowed to become leaseholders or raft + // leaders, so only consider the `Voter` replicas. 
+ candidates := desc.Replicas().DeepCopy().VoterDescriptors() + sort.Slice(candidates, func(i, j int) bool { + var iQPS, jQPS float64 + if desc := storeMap[candidates[i].StoreID]; desc != nil { + iQPS = desc.Capacity.QueriesPerSecond + } + if desc := storeMap[candidates[j].StoreID]; desc != nil { + jQPS = desc.Capacity.QueriesPerSecond + } + return iQPS < jQPS + }) + + var raftStatus *raft.Status + + preferred := sr.rq.allocator.preferredLeaseholders(conf, candidates) + + // Filter both the list of preferred stores as well as the list of all + // candidate replicas to only consider live (non-suspect, non-draining) + // nodes. + const includeSuspectAndDrainingStores = false + preferred, _ = sr.rq.allocator.storePool.liveAndDeadReplicas(preferred, includeSuspectAndDrainingStores) + candidates, _ = sr.rq.allocator.storePool.liveAndDeadReplicas(candidates, includeSuspectAndDrainingStores) + + for _, candidate := range candidates { + if candidate.StoreID == localDesc.StoreID { + continue + } + + meanQPS := storeList.candidateQueriesPerSecond.mean + if sr.shouldNotMoveTo(ctx, storeMap, replWithStats, candidate.StoreID, meanQPS, minQPS, maxQPS) { + continue + } + + if raftStatus == nil { + raftStatus = sr.getRaftStatusFn(replWithStats.repl) + } + if replicaIsBehind(raftStatus, candidate.ReplicaID) { + log.VEventf(ctx, 3, "%v is behind or this store isn't the raft leader for r%d; raftStatus: %v", + candidate, desc.RangeID, raftStatus) + continue + } + + if len(preferred) > 0 && !storeHasReplica(candidate.StoreID, roachpb.MakeReplicaSet(preferred).ReplicationTargets()) { + log.VEventf(ctx, 3, "s%d not a preferred leaseholder for r%d; preferred: %v", + candidate.StoreID, desc.RangeID, preferred) + continue + } + + filteredStoreList := storeList.excludeInvalid(conf.Constraints) + filteredStoreList = storeList.excludeInvalid(conf.VoterConstraints) + if sr.rq.allocator.followTheWorkloadPrefersLocal( + ctx, + filteredStoreList, + *localDesc, + candidate.StoreID, + candidates, + replWithStats.repl.leaseholderStats, + ) { + log.VEventf(ctx, 3, "r%d is on s%d due to follow-the-workload; skipping", + desc.RangeID, localDesc.StoreID) + continue + } + + return replWithStats, candidate, considerForRebalance + } + + // If none of the other replicas are valid lease transfer targets, consider + // this range for replica rebalancing. + considerForRebalance = append(considerForRebalance, replWithStats) + } +} + +// rangeRebalanceContext represents a snapshot of a range's state during the +// StoreRebalancer's attempt to rebalance it based on QPS. 
+type deprecatedRebalanceContext struct { + replWithStats replicaWithStats + rangeDesc *roachpb.RangeDescriptor + conf roachpb.SpanConfig + clusterNodes int + numDesiredVoters, numDesiredNonVoters int +} + +func (rbc *deprecatedRebalanceContext) numDesiredReplicas(targetType targetReplicaType) int { + switch targetType { + case voterTarget: + return rbc.numDesiredVoters + case nonVoterTarget: + return rbc.numDesiredNonVoters + default: + panic(fmt.Sprintf("unknown targetReplicaType %s", targetType)) + } +} + +func (sr *StoreRebalancer) deprecatedChooseRangeToRebalance( + ctx context.Context, + hottestRanges *[]replicaWithStats, + localDesc *roachpb.StoreDescriptor, + storeList StoreList, + storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + minQPS float64, + maxQPS float64, +) (replWithStats replicaWithStats, voterTargets, nonVoterTargets []roachpb.ReplicationTarget) { + now := sr.rq.store.Clock().NowAsClockTimestamp() + for { + if len(*hottestRanges) == 0 { + return replicaWithStats{}, nil, nil + } + replWithStats := (*hottestRanges)[0] + *hottestRanges = (*hottestRanges)[1:] + + if replWithStats.repl == nil { + return replicaWithStats{}, nil, nil + } + + if shouldNotMoveAway(ctx, replWithStats, localDesc, now, minQPS) { + continue + } + + // Don't bother moving ranges whose QPS is below some small fraction of the + // store's QPS (unless the store has extra ranges to spare anyway). It's + // just unnecessary churn with no benefit to move ranges responsible for, + // for example, 1 qps on a store with 5000 qps. + const minQPSFraction = .001 + if replWithStats.qps < localDesc.Capacity.QueriesPerSecond*minQPSFraction { + log.VEventf( + ctx, + 5, + "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps", + replWithStats.repl.RangeID, + replWithStats.qps, + localDesc.StoreID, + localDesc.Capacity.QueriesPerSecond, + ) + continue + } + + log.VEventf(ctx, 3, "considering replica rebalance for r%d with %.2f qps", + replWithStats.repl.GetRangeID(), replWithStats.qps) + rangeDesc, conf := replWithStats.repl.DescAndSpanConfig() + clusterNodes := sr.rq.allocator.storePool.ClusterNodeCount() + numDesiredVoters := GetNeededVoters(conf.GetNumVoters(), clusterNodes) + numDesiredNonVoters := GetNeededNonVoters( + numDesiredVoters, int(conf.GetNumNonVoters()), clusterNodes, + ) + if rs := rangeDesc.Replicas(); numDesiredVoters != len(rs.VoterDescriptors()) || + numDesiredNonVoters != len(rs.NonVoterDescriptors()) { + // If the StoreRebalancer is allowed past this point, it may accidentally + // downreplicate and this can cause unavailable ranges. + // + // See: https://github.com/cockroachdb/cockroach/issues/54444#issuecomment-707706553 + log.VEventf(ctx, 3, "range needs up/downreplication; not considering rebalance") + continue + } + + rebalanceCtx := deprecatedRebalanceContext{ + replWithStats: replWithStats, + rangeDesc: rangeDesc, + conf: conf, + clusterNodes: clusterNodes, + numDesiredVoters: numDesiredVoters, + numDesiredNonVoters: numDesiredNonVoters, + } + targetVoterRepls, targetNonVoterRepls := sr.getRebalanceCandidatesBasedOnQPS( + ctx, rebalanceCtx, localDesc, storeMap, storeList, minQPS, maxQPS, + ) + + // If we couldn't find enough valid targets, forget about this range. + // + // TODO(a-robinson): Support more incremental improvements -- move what we + // can if it makes things better even if it isn't great. 
For example, + // moving one of the other existing replicas that's on a store with less + // qps than the max threshold but above the mean would help in certain + // locality configurations. + if len(targetVoterRepls) < rebalanceCtx.numDesiredVoters { + log.VEventf(ctx, 3, "couldn't find enough voter rebalance targets for r%d (%d/%d)", + rangeDesc.RangeID, len(targetVoterRepls), rebalanceCtx.numDesiredVoters) + continue + } + if len(targetNonVoterRepls) < rebalanceCtx.numDesiredNonVoters { + log.VEventf(ctx, 3, "couldn't find enough non-voter rebalance targets for r%d (%d/%d)", + rangeDesc.RangeID, len(targetNonVoterRepls), rebalanceCtx.numDesiredNonVoters) + continue + } + + // If the new set of replicas has lower diversity scores than the existing + // set, we don't continue with the rebalance. since we want to ensure we + // don't hurt locality diversity just to improve QPS. + // + // 1. Ensure that diversity among voting replicas is not hurt by this + // rebalancing decision. + if sr.worsensDiversity( + ctx, + rangeDesc.GetRangeID(), + rangeDesc.Replicas().VoterDescriptors(), + targetVoterRepls, + true, /* onlyVoters */ + ) { + continue + } + // 2. Ensure that diversity among all replicas is not hurt by this decision. + allTargetRepls := append(targetVoterRepls, targetNonVoterRepls...) + if sr.worsensDiversity( + ctx, + rangeDesc.GetRangeID(), + rangeDesc.Replicas().Descriptors(), + allTargetRepls, + false, /* onlyVoters */ + ) { + continue + } + + // Pick the voter with the least QPS to be leaseholder; + // RelocateRange transfers the lease to the first provided target. + newLeaseIdx := 0 + newLeaseQPS := math.MaxFloat64 + var raftStatus *raft.Status + for i := 0; i < len(targetVoterRepls); i++ { + // Ensure we don't transfer the lease to an existing replica that is behind + // in processing its raft log. + if replica, ok := rangeDesc.GetReplicaDescriptor(targetVoterRepls[i].StoreID); ok { + if raftStatus == nil { + raftStatus = sr.getRaftStatusFn(replWithStats.repl) + } + if replicaIsBehind(raftStatus, replica.ReplicaID) { + continue + } + } + + storeDesc, ok := storeMap[targetVoterRepls[i].StoreID] + if ok && storeDesc.Capacity.QueriesPerSecond < newLeaseQPS { + newLeaseIdx = i + newLeaseQPS = storeDesc.Capacity.QueriesPerSecond + } + } + targetVoterRepls[0], targetVoterRepls[newLeaseIdx] = targetVoterRepls[newLeaseIdx], targetVoterRepls[0] + return replWithStats, + roachpb.MakeReplicaSet(targetVoterRepls).ReplicationTargets(), + roachpb.MakeReplicaSet(targetNonVoterRepls).ReplicationTargets() + } +} + +// worsensDiversity returns true iff the diversity score of `currentRepls` is +// higher than `targetRepls` (either among just the set of voting replicas, or +// across all replicas in the range -- determined by `onlyVoters`). 
+func (sr *StoreRebalancer) worsensDiversity( + ctx context.Context, + rangeID roachpb.RangeID, + currentRepls, targetRepls []roachpb.ReplicaDescriptor, + onlyVoters bool, +) bool { + curDiversity := rangeDiversityScore( + sr.rq.allocator.storePool.getLocalitiesByStore(currentRepls), + ) + newDiversity := rangeDiversityScore( + sr.rq.allocator.storePool.getLocalitiesByStore(targetRepls), + ) + replicaStr := "replica" + if onlyVoters { + replicaStr = "voting replica" + } + if curDiversity > newDiversity { + log.VEventf( + ctx, + 3, + "new %s diversity %.2f for r%d worse than current diversity %.2f; not rebalancing", + replicaStr, + newDiversity, + rangeID, + curDiversity, + ) + return true + } + return false +} + +// getRebalanceCandidatesBasedOnQPS returns a list of rebalance targets for +// voting and non-voting replicas on the range that match the relevant +// constraints on the range and would further the goal of balancing the QPS on +// the stores in this cluster. In case there aren't enough stores that meet the +// constraints and are valid rebalance candidates based on QPS, the list of +// targets returned may contain fewer-than-required replicas. +// +// NB: `localStoreDesc` is expected to be the leaseholder of the range being +// operated on. +func (sr *StoreRebalancer) getRebalanceCandidatesBasedOnQPS( + ctx context.Context, + rebalanceCtx deprecatedRebalanceContext, + localStoreDesc *roachpb.StoreDescriptor, + storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + storeList StoreList, + minQPS, maxQPS float64, +) (finalVoterTargets, finalNonVoterTargets []roachpb.ReplicaDescriptor) { + options := sr.scorerOptions() + options.qpsRebalanceThreshold = qpsRebalanceThreshold.Get(&sr.st.SV) + + // Decide which voting / non-voting replicas we want to keep around and find + // rebalance targets for the rest. + partialVoterTargets := sr.pickReplsToKeep( + ctx, + rebalanceCtx, + nil, /* replsToExclude */ + localStoreDesc, + storeMap, + maxQPS, + voterTarget, + ) + finalVoterTargets = sr.pickRemainingRepls( + ctx, + rebalanceCtx, + partialVoterTargets, + nil, /* partialNonVoterTargets */ + storeMap, + storeList, + options, + minQPS, maxQPS, + voterTarget, + ) + + partialNonVoterTargets := sr.pickReplsToKeep( + ctx, + rebalanceCtx, + // NB: `finalVoterTargets` may contain replicas that are part of the + // existing set of non-voter targets, so we make sure that we don't keep + // those replicas around in `partialNonVoterTargets`. + finalVoterTargets, + localStoreDesc, + storeMap, + maxQPS, + nonVoterTarget, + ) + finalNonVoterTargets = sr.pickRemainingRepls( + ctx, + rebalanceCtx, + finalVoterTargets, + partialNonVoterTargets, + storeMap, + storeList, + options, + minQPS, + maxQPS, + nonVoterTarget, + ) + + return finalVoterTargets, finalNonVoterTargets +} + +// pickRemainingRepls determines the set of rebalance targets to fill in the +// rest of `partial{Voter,NonVoter}Targets` such that the resulting set contains +// exactly as many replicas as dictated by the zone configs. +// +// The caller is expected to synthesize the set of +// `partial{Voter,NonVoter}Targets` via `StoreRebalancer.pickReplsToKeep`. 
+func (sr *StoreRebalancer) pickRemainingRepls( + ctx context.Context, + rebalanceCtx deprecatedRebalanceContext, + partialVoterTargets, partialNonVoterTargets []roachpb.ReplicaDescriptor, + storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + storeList StoreList, + options scorerOptions, + minQPS, maxQPS float64, + targetType targetReplicaType, +) []roachpb.ReplicaDescriptor { + // Alias the slice that corresponds to the set of replicas that is being + // appended to. This is because we want subsequent calls to + // `allocateTargetFromList` to observe the results of previous calls (note the + // append to the slice referenced by `finalTargetsForType`). + var finalTargetsForType *[]roachpb.ReplicaDescriptor + switch targetType { + case voterTarget: + finalTargetsForType = &partialVoterTargets + case nonVoterTarget: + finalTargetsForType = &partialNonVoterTargets + default: + log.Fatalf(ctx, "unknown targetReplicaType: %s", targetType) + } + for len(*finalTargetsForType) < rebalanceCtx.numDesiredReplicas(targetType) { + // Use the preexisting Allocate{Non}Voter logic to ensure that + // considerations such as zone constraints, locality diversity, and full + // disk come into play. + target, _ := sr.rq.allocator.allocateTargetFromList( + ctx, + storeList, + rebalanceCtx.conf, + partialVoterTargets, + partialNonVoterTargets, + options, + // The store rebalancer should never need to perform lateral relocations, + // so we ask the allocator to disregard all the nodes that exist in + // `partial{Non}VoterTargets`. + false, /* allowMultipleReplsPerNode */ + targetType, + ) + if roachpb.Empty(target) { + log.VEventf( + ctx, 3, "no rebalance %ss found to replace the current store for r%d", + targetType, rebalanceCtx.rangeDesc.RangeID, + ) + break + } + + meanQPS := storeList.candidateQueriesPerSecond.mean + if sr.shouldNotMoveTo( + ctx, + storeMap, + rebalanceCtx.replWithStats, + target.StoreID, + meanQPS, + minQPS, + maxQPS, + ) { + // NB: If the target store returned by the allocator is not fit to + // receive a new replica due to balancing reasons, there is no point + // continuing with this loop since we'd expect future calls to + // `allocateTargetFromList` to return the same target. + break + } + + *finalTargetsForType = append(*finalTargetsForType, roachpb.ReplicaDescriptor{ + NodeID: target.NodeID, + StoreID: target.StoreID, + }) + } + return *finalTargetsForType +} + +// pickReplsToKeep determines the set of existing replicas for a range which +// should _not_ be rebalanced (because they belong to stores that aren't +// overloaded). 
+func (sr *StoreRebalancer) pickReplsToKeep( + ctx context.Context, + rebalanceCtx deprecatedRebalanceContext, + replsToExclude []roachpb.ReplicaDescriptor, + localStoreDesc *roachpb.StoreDescriptor, + storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + maxQPS float64, + targetType targetReplicaType, +) (partialTargetRepls []roachpb.ReplicaDescriptor) { + shouldExclude := func(repl roachpb.ReplicaDescriptor) bool { + for _, excluded := range replsToExclude { + if repl.StoreID == excluded.StoreID { + return true + } + } + return false + } + + var currentReplsForType []roachpb.ReplicaDescriptor + switch targetType { + case voterTarget: + currentReplsForType = rebalanceCtx.rangeDesc.Replicas().VoterDescriptors() + case nonVoterTarget: + currentReplsForType = rebalanceCtx.rangeDesc.Replicas().NonVoterDescriptors() + default: + log.Fatalf(ctx, "unknown targetReplicaType: %s", targetType) + } + + // Check the existing replicas, keeping around those that aren't overloaded. + for i := range currentReplsForType { + if shouldExclude(currentReplsForType[i]) || + currentReplsForType[i].StoreID == localStoreDesc.StoreID { + continue + } + + // Keep the replica in the range if we don't know its QPS or if its QPS is + // below the upper threshold. Punishing stores not in our store map could + // cause mass evictions if the storePool gets out of sync. + storeDesc, ok := storeMap[currentReplsForType[i].StoreID] + if !ok || storeDesc.Capacity.QueriesPerSecond < maxQPS { + if log.V(3) { + var reason redact.RedactableString + if ok { + reason = redact.Sprintf( + " (qps %.2f vs max %.2f)", + storeDesc.Capacity.QueriesPerSecond, + maxQPS, + ) + } + log.VEventf( + ctx, + 3, + "keeping %s r%d/%d on s%d%s", + targetType, + rebalanceCtx.rangeDesc.RangeID, + currentReplsForType[i].ReplicaID, + currentReplsForType[i].StoreID, + reason, + ) + } + + partialTargetRepls = append(partialTargetRepls, currentReplsForType[i]) + } + } + return partialTargetRepls +} + +func shouldNotMoveAway( + ctx context.Context, + replWithStats replicaWithStats, + localDesc *roachpb.StoreDescriptor, + now hlc.ClockTimestamp, + minQPS float64, +) bool { + if !replWithStats.repl.OwnsValidLease(ctx, now) { + log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) + return true + } + if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { + log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", + replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) + return true + } + return false +} + +func (sr *StoreRebalancer) shouldNotMoveTo( + ctx context.Context, + storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + replWithStats replicaWithStats, + candidateStoreID roachpb.StoreID, + meanQPS float64, + minQPS float64, + maxQPS float64, +) bool { + candidateStore, ok := storeMap[candidateStoreID] + if !ok { + log.VEventf(ctx, 3, "missing store descriptor for s%d", candidateStoreID) + return true + } + + newCandidateQPS := candidateStore.Capacity.QueriesPerSecond + replWithStats.qps + if candidateStore.Capacity.QueriesPerSecond < minQPS { + if newCandidateQPS > maxQPS { + log.VEventf(ctx, 3, + "r%d's %.2f qps would push s%d over the max threshold (%.2f) with %.2f qps afterwards", + replWithStats.repl.RangeID, replWithStats.qps, candidateStoreID, maxQPS, newCandidateQPS) + return true + } + } else if newCandidateQPS > meanQPS { + log.VEventf(ctx, 3, + "r%d's %.2f qps would push s%d over the mean (%.2f) with %.2f qps afterwards", + 
replWithStats.repl.RangeID, replWithStats.qps, candidateStoreID, meanQPS, newCandidateQPS) + return true + } + + // If the target store is on a separate node, we will also care + // about node liveness. + targetNodeID := candidateStore.Node.NodeID + if targetNodeID != sr.rq.store.Ident.NodeID { + if !sr.rq.allocator.storePool.isStoreReadyForRoutineReplicaTransfer(ctx, candidateStore.StoreID) { + log.VEventf(ctx, 3, + "refusing to transfer replica to n%d/s%d", targetNodeID, candidateStore.StoreID) + return true + } + } + return false +} diff --git a/pkg/kv/kvserver/deprecated_store_rebalancer_test.go b/pkg/kv/kvserver/deprecated_store_rebalancer_test.go new file mode 100644 index 000000000000..bb4e1187ff62 --- /dev/null +++ b/pkg/kv/kvserver/deprecated_store_rebalancer_test.go @@ -0,0 +1,692 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/raft/v3/tracker" +) + +var ( + // deprecatedNoLocalityStores specifies a set of stores where s5 is + // under-utilized in terms of QPS, s2-s4 are in the middle, and s1 is + // over-utilized. 
+ deprecatedNoLocalityStores = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1500, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1100, + }, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + }, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 900, + }, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 500, + }, + }, + } +) + +func TestDeprecatedChooseLeaseToTransfer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + stopper, g, _, a, _ := createTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(context.Background()) + gossiputil.NewStoreGossiper(g).GossipStores(deprecatedNoLocalityStores, t) + storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) + storeMap := storeListToMap(storeList) + + const minQPS = 800 + const maxQPS = 1200 + + localDesc := *deprecatedNoLocalityStores[0] + cfg := TestStoreConfig(nil) + cfg.Gossip = g + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} + rq := newReplicateQueue(s, a) + rr := newReplicaRankings() + + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + + // Rather than trying to populate every Replica with a real raft group in + // order to pass replicaIsBehind checks, fake out the function for getting + // raft status with one that always returns all replicas as up to date. 
+ sr.getRaftStatusFn = func(r *Replica) *raft.Status { + status := &raft.Status{ + Progress: make(map[uint64]tracker.Progress), + } + status.Lead = uint64(r.ReplicaID()) + status.Commit = 1 + for _, replica := range r.Desc().InternalReplicas { + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ + Match: 1, + State: tracker.StateReplicate, + } + } + return status + } + + testCases := []struct { + storeIDs []roachpb.StoreID + qps float64 + expectTarget roachpb.StoreID + }{ + {[]roachpb.StoreID{1}, 100, 0}, + {[]roachpb.StoreID{1, 2}, 100, 0}, + {[]roachpb.StoreID{1, 3}, 100, 0}, + {[]roachpb.StoreID{1, 4}, 100, 4}, + {[]roachpb.StoreID{1, 5}, 100, 5}, + {[]roachpb.StoreID{5, 1}, 100, 0}, + {[]roachpb.StoreID{1, 2}, 200, 0}, + {[]roachpb.StoreID{1, 3}, 200, 0}, + {[]roachpb.StoreID{1, 4}, 200, 0}, + {[]roachpb.StoreID{1, 5}, 200, 5}, + {[]roachpb.StoreID{1, 2}, 500, 0}, + {[]roachpb.StoreID{1, 3}, 500, 0}, + {[]roachpb.StoreID{1, 4}, 500, 0}, + {[]roachpb.StoreID{1, 5}, 500, 5}, + {[]roachpb.StoreID{1, 5}, 600, 5}, + {[]roachpb.StoreID{1, 5}, 700, 5}, + {[]roachpb.StoreID{1, 5}, 800, 0}, + {[]roachpb.StoreID{1, 4}, 1.5, 4}, + {[]roachpb.StoreID{1, 5}, 1.5, 5}, + {[]roachpb.StoreID{1, 4}, 1.49, 0}, + {[]roachpb.StoreID{1, 5}, 1.49, 0}, + } + + for _, tc := range testCases { + loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps}}) + hottestRanges := rr.topQPS() + _, target, _ := sr.deprecatedChooseLeaseToTransfer( + ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS) + if target.StoreID != tc.expectTarget { + t.Errorf("got target store %d for range with replicas %v and %f qps; want %d", + target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget) + } + } +} + +// TestDeprecatedChooseRangeToRebalanceBalanceScore ensures that the (21.2) +// store rebalancer rebalances to the store with the lower range count when +// there are two "equally good" candidate stores in terms of QPS. 
+func TestDeprecatedChooseRangeToRebalanceBalanceScore(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + stopper, g, _, a, _ := createTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(ctx) + noLocalityStoresWithRangeCounts := []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 2000, + RangeCount: 1000, + }, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + RangeCount: 1000, + }, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + RangeCount: 500, + }, + }, + } + gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStoresWithRangeCounts, t) + storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) + storeMap := storeListToMap(storeList) + + const minQPS = 800 + const maxQPS = 1200 + + localDesc := *deprecatedNoLocalityStores[0] + cfg := TestStoreConfig(nil) + cfg.Gossip = g + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} + rq := newReplicateQueue(s, a) + rr := newReplicaRankings() + + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + + // Rather than trying to populate every Replica with a real raft group in + // order to pass replicaIsBehind checks, fake out the function for getting + // raft status with one that always returns all replicas as up to date. + sr.getRaftStatusFn = func(r *Replica) *raft.Status { + status := &raft.Status{ + Progress: make(map[uint64]tracker.Progress), + } + status.Lead = uint64(r.ReplicaID()) + status.Commit = 1 + for _, replica := range r.Desc().InternalReplicas { + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ + Match: 1, + State: tracker.StateReplicate, + } + } + return status + } + + voters := []roachpb.StoreID{1} + // NB: We always expect a rebalance from store 1 to store 4 (instead of store + // 3, even though they have the same QPS), since store 4 has a low range + // count. 
+ expectedRebalancedVoters := []roachpb.StoreID{4} + const qps = float64(50) + s.cfg.DefaultSpanConfig.NumReplicas = int32(len(voters)) + loadRanges(rr, s, []testRange{{voters: voters, qps: qps}}) + hottestRanges := rr.topQPS() + _, voterTargets, _ := sr.deprecatedChooseRangeToRebalance( + ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS, + ) + + require.Len(t, voterTargets, len(expectedRebalancedVoters)) + if len(voterTargets) > 0 && voterTargets[0].StoreID != expectedRebalancedVoters[0] { + t.Errorf("chooseRangeToRebalance(existing=%v, qps=%f) chose s%v as leaseholder; want s%v", + voters, qps, voterTargets[0], expectedRebalancedVoters[0]) + } + + voterStoreIDs := make([]roachpb.StoreID, len(voterTargets)) + for i, target := range voterTargets { + voterStoreIDs[i] = target.StoreID + } + require.ElementsMatch(t, voterStoreIDs, expectedRebalancedVoters) +} + +func TestDeprecatedChooseRangeToRebalance(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + stopper, g, _, a, _ := createTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(context.Background()) + gossiputil.NewStoreGossiper(g).GossipStores(deprecatedNoLocalityStores, t) + storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) + storeMap := storeListToMap(storeList) + + const minQPS = 800 + const maxQPS = 1200 + + localDesc := *deprecatedNoLocalityStores[0] + cfg := TestStoreConfig(nil) + cfg.Gossip = g + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} + rq := newReplicateQueue(s, a) + rr := newReplicaRankings() + + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + + // Rather than trying to populate every Replica with a real raft group in + // order to pass replicaIsBehind checks, fake out the function for getting + // raft status with one that always returns all replicas as up to date. + sr.getRaftStatusFn = func(r *Replica) *raft.Status { + status := &raft.Status{ + Progress: make(map[uint64]tracker.Progress), + } + status.Lead = uint64(r.ReplicaID()) + status.Commit = 1 + for _, replica := range r.Desc().InternalReplicas { + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ + Match: 1, + State: tracker.StateReplicate, + } + } + return status + } + + testCases := []struct { + voters, nonVoters []roachpb.StoreID + // stores that are not to be considered for rebalancing + nonLive []roachpb.StoreID + qps float64 + // the first listed voter target is expected to be the leaseholder + expectedRebalancedVoters, expectedRebalancedNonVoters []roachpb.StoreID + }{ + { + voters: []roachpb.StoreID{1}, + nonVoters: nil, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{5}, + expectedRebalancedNonVoters: nil, + }, + // If s5 is unavailable, s4 is the next best guess. 
+ { + voters: []roachpb.StoreID{1}, + nonVoters: nil, + nonLive: []roachpb.StoreID{5}, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{4}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1}, + nonVoters: nil, + nonLive: []roachpb.StoreID{4, 5}, + qps: 100, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1}, + nonVoters: nil, + nonLive: nil, + qps: 500, + expectedRebalancedVoters: []roachpb.StoreID{5}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1}, + nonVoters: nil, + nonLive: []roachpb.StoreID{5}, + qps: 500, + expectedRebalancedVoters: []roachpb.StoreID{}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1}, + nonVoters: nil, + nonLive: nil, + qps: 800, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1}, + nonVoters: nil, + nonLive: nil, + qps: 1.5, + expectedRebalancedVoters: []roachpb.StoreID{5}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1}, + nonVoters: nil, + nonLive: []roachpb.StoreID{5}, + qps: 1.5, + expectedRebalancedVoters: []roachpb.StoreID{4}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1}, + nonVoters: nil, + nonLive: nil, + qps: 1.49, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 2}, + nonVoters: nil, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{5, 2}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 2}, + nonVoters: nil, + nonLive: []roachpb.StoreID{5}, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{4, 2}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 3}, + nonVoters: nil, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{5, 3}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 4}, + nonVoters: nil, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{5, 4}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 2}, + nonVoters: nil, + nonLive: nil, + qps: 800, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 2}, + nonVoters: nil, + nonLive: nil, + qps: 1.49, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 4, 5}, + nonVoters: nil, + nonLive: nil, + qps: 500, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 4, 5}, + nonVoters: nil, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 3, 5}, + nonVoters: nil, + nonLive: nil, + qps: 500, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 3, 4}, + nonVoters: nil, + nonLive: nil, + qps: 500, + expectedRebalancedVoters: []roachpb.StoreID{5, 4, 3}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 3, 5}, + nonVoters: nil, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{5, 4, 3}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 3, 5}, + nonVoters: nil, + nonLive: []roachpb.StoreID{4}, + qps: 100, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + // Rebalancing to s2 isn't chosen even though it's better 
than s1 because it's above the mean. + { + voters: []roachpb.StoreID{1, 3, 4, 5}, + nonVoters: nil, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 2, 4, 5}, + nonVoters: nil, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 2, 3, 5}, + nonVoters: nil, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{5, 4, 3, 2}, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1, 2, 3, 4}, + nonVoters: nil, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{5, 4, 3, 2}, + expectedRebalancedNonVoters: nil, + }, + { + // Don't bother moving any replicas around since it won't make much of a + // difference. See `minQPSFraction` inside `chooseRangeToRebalance()`. + voters: []roachpb.StoreID{1}, + nonVoters: []roachpb.StoreID{2, 3, 4}, + nonLive: nil, + qps: 1, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + // None of the stores are worth moving to because they will be above the + // maxQPS after the move. + voters: []roachpb.StoreID{1}, + nonVoters: []roachpb.StoreID{2, 3, 4}, + nonLive: nil, + qps: 1000, + expectedRebalancedVoters: nil, + expectedRebalancedNonVoters: nil, + }, + { + voters: []roachpb.StoreID{1}, + nonVoters: []roachpb.StoreID{2, 3, 4}, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{5}, + expectedRebalancedNonVoters: []roachpb.StoreID{4, 3, 2}, + }, + // Voters may rebalance to stores that have a non-voter, and those + // displaced non-voters will be rebalanced to other valid stores. + { + voters: []roachpb.StoreID{1}, + nonVoters: []roachpb.StoreID{5}, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{5}, + expectedRebalancedNonVoters: []roachpb.StoreID{4}, + }, + { + voters: []roachpb.StoreID{1}, + nonVoters: []roachpb.StoreID{5, 2, 3}, + nonLive: nil, + qps: 100, + expectedRebalancedVoters: []roachpb.StoreID{5}, + expectedRebalancedNonVoters: []roachpb.StoreID{2, 3, 4}, + }, + { + // Voters may rebalance to stores that have a non-voter, but only if the + // displaced non-voters can be rebalanced to other underfull (based on + // QPS) stores. Note that stores 1 and 2 are above the maxQPS and the + // meanQPS, respectively, so non-voters cannot be rebalanced to them. 
+ voters: []roachpb.StoreID{1, 2},
+ nonVoters: []roachpb.StoreID{5, 4, 3},
+ nonLive: nil,
+ qps: 100,
+ expectedRebalancedVoters: nil,
+ expectedRebalancedNonVoters: nil,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run("", func(t *testing.T) {
+ a.storePool.isStoreReadyForRoutineReplicaTransfer = func(_ context.Context, storeID roachpb.StoreID) bool {
+ for _, s := range tc.nonLive {
+ if s == storeID {
+ return false
+ }
+ }
+ return true
+ }
+
+ s.cfg.DefaultSpanConfig.NumVoters = int32(len(tc.voters))
+ s.cfg.DefaultSpanConfig.NumReplicas = int32(len(tc.voters) + len(tc.nonVoters))
+ loadRanges(
+ rr, s, []testRange{
+ {voters: tc.voters, nonVoters: tc.nonVoters, qps: tc.qps},
+ },
+ )
+ hottestRanges := rr.topQPS()
+ _, voterTargets, nonVoterTargets := sr.deprecatedChooseRangeToRebalance(
+ ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS,
+ )
+
+ require.Len(t, voterTargets, len(tc.expectedRebalancedVoters))
+ if len(voterTargets) > 0 && voterTargets[0].StoreID != tc.expectedRebalancedVoters[0] {
+ t.Errorf("chooseRangeToRebalance(existing=%v, qps=%f) chose s%d as leaseholder; want s%v",
+ tc.voters, tc.qps, voterTargets[0], tc.expectedRebalancedVoters[0])
+ }
+
+ voterStoreIDs := make([]roachpb.StoreID, len(voterTargets))
+ for i, target := range voterTargets {
+ voterStoreIDs[i] = target.StoreID
+ }
+ require.ElementsMatch(t, voterStoreIDs, tc.expectedRebalancedVoters)
+
+ require.Len(t, nonVoterTargets, len(tc.expectedRebalancedNonVoters))
+ nonVoterStoreIDs := make([]roachpb.StoreID, len(nonVoterTargets))
+ for i, target := range nonVoterTargets {
+ nonVoterStoreIDs[i] = target.StoreID
+ }
+ require.ElementsMatch(t, nonVoterStoreIDs, tc.expectedRebalancedNonVoters)
+ })
+ }
+}
+
+func TestDeprecatedNoLeaseTransferToBehindReplicas(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ctx := context.Background()
+ stopper, g, _, a, _ := createTestAllocator(ctx, 10, false /* deterministic */)
+ defer stopper.Stop(ctx)
+ gossiputil.NewStoreGossiper(g).GossipStores(deprecatedNoLocalityStores, t)
+ storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled)
+ storeMap := storeListToMap(storeList)
+
+ const minQPS = 800
+ const maxQPS = 1200
+
+ localDesc := *deprecatedNoLocalityStores[0]
+ cfg := TestStoreConfig(nil)
+ cfg.Gossip = g
+ s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg)
+ s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID}
+ rq := newReplicateQueue(s, a)
+ rr := newReplicaRankings()
+
+ sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr)
+
+ // Load in a range with replicas on an overfull node, a slightly underfull
+ // node, and a very underfull node.
+ loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100}})
+ hottestRanges := rr.topQPS()
+ repl := hottestRanges[0].repl
+
+ // Set up a fake RaftStatus that indicates s5 is behind (but all other stores
+ // are caught up). We thus shouldn't transfer a lease to s5. 
+ sr.getRaftStatusFn = func(r *Replica) *raft.Status { + status := &raft.Status{ + Progress: make(map[uint64]tracker.Progress), + } + status.Lead = uint64(r.ReplicaID()) + status.Commit = 1 + for _, replica := range r.Desc().InternalReplicas { + match := uint64(1) + if replica.StoreID == roachpb.StoreID(5) { + match = 0 + } + status.Progress[uint64(replica.ReplicaID)] = tracker.Progress{ + Match: match, + State: tracker.StateReplicate, + } + } + return status + } + + _, target, _ := sr.deprecatedChooseLeaseToTransfer( + ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS, + ) + expectTarget := roachpb.StoreID(4) + if target.StoreID != expectTarget { + t.Errorf("got target store s%d for range with RaftStatus %v; want s%d", + target.StoreID, sr.getRaftStatusFn(repl), expectTarget) + } + + // Then do the same, but for replica rebalancing. Make s5 an existing replica + // that's behind, and see how a new replica is preferred as the leaseholder + // over it. + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100}}) + hottestRanges = rr.topQPS() + repl = hottestRanges[0].repl + + _, targets, _ := sr.deprecatedChooseRangeToRebalance( + ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS) + expectTargets := []roachpb.ReplicationTarget{ + {NodeID: 4, StoreID: 4}, {NodeID: 5, StoreID: 5}, {NodeID: 3, StoreID: 3}, + } + if !reflect.DeepEqual(targets, expectTargets) { + t.Errorf("got targets %v for range with RaftStatus %v; want %v", + targets, sr.getRaftStatusFn(repl), expectTargets) + } +} diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 338df82da75d..3eb052183b6e 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -16,6 +16,7 @@ import ( "math/rand" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -97,6 +98,12 @@ var qpsRebalanceThreshold = func() *settings.FloatSetting { "minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull", 0.25, settings.NonNegativeFloat, + func(f float64) error { + if f < 0.01 { + return errors.Errorf("cannot set kv.allocator.qps_rebalance_threshold to less than 0.01") + } + return nil + }, ) s.SetVisibility(settings.Public) return s @@ -307,6 +314,7 @@ func (sr *StoreRebalancer) rebalanceStore( localDesc, allStoresList, storeMap, + sr.scorerOptions(), ) replicasToMaybeRebalance = append(replicasToMaybeRebalance, considerForRebalance...) if replWithStats.repl == nil { @@ -432,9 +440,28 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( ctx context.Context, hottestRanges *[]replicaWithStats, localDesc *roachpb.StoreDescriptor, - storeList StoreList, + allStoresList StoreList, storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + options *qpsScorerOptions, ) (replicaWithStats, roachpb.ReplicaDescriptor, []replicaWithStats) { + // NB: Don't switch over to the new locality-aware lease transfer scheme until + // the cluster version is finalized. + if !sr.st.Version.IsActive(ctx, clusterversion.EnableNewStoreRebalancer) { + log.Infof( + ctx, "cluster version has not been finalized; using pre-22.1 load-based lease transfer scheme", + ) + + // We manually compute the cluster level under/over-fullness thresholds + // since the deprecated rebalance logic doesn't care about equivalence + // classes. 
+ qpsMinThreshold := underfullQPSThreshold(options, allStoresList.candidateQueriesPerSecond.mean) + qpsMaxThreshold := overfullQPSThreshold(options, allStoresList.candidateQueriesPerSecond.mean) + return sr.deprecatedChooseLeaseToTransfer( + ctx, hottestRanges, localDesc, allStoresList, + storeMap, qpsMinThreshold, qpsMaxThreshold, + ) + } + var considerForRebalance []replicaWithStats now := sr.rq.store.Clock().NowAsClockTimestamp() for { @@ -503,8 +530,8 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( continue } - filteredStoreList := storeList.excludeInvalid(conf.Constraints) - filteredStoreList = storeList.excludeInvalid(conf.VoterConstraints) + filteredStoreList := allStoresList.excludeInvalid(conf.Constraints) + filteredStoreList = allStoresList.excludeInvalid(conf.VoterConstraints) if sr.rq.allocator.followTheWorkloadPrefersLocal( ctx, filteredStoreList, @@ -553,6 +580,28 @@ func (sr *StoreRebalancer) chooseRangeToRebalance( allStoresList StoreList, options *qpsScorerOptions, ) (replWithStats replicaWithStats, voterTargets, nonVoterTargets []roachpb.ReplicationTarget) { + // NB: Don't switch over to the locality aware rebalancer until the cluster + // version is finalized. + if !sr.st.Version.IsActive(ctx, clusterversion.EnableNewStoreRebalancer) { + log.Infof( + ctx, "cluster version has not been finalized; using pre-22.1 load-based rebalancing scheme", + ) + + // We manually compute the cluster level under/over-fullness thresholds + // since the deprecated rebalance logic doesn't care about equivalence + // classes. + qpsMinThreshold := underfullQPSThreshold( + options, allStoresList.candidateQueriesPerSecond.mean, + ) + qpsMaxThreshold := overfullQPSThreshold( + options, allStoresList.candidateQueriesPerSecond.mean, + ) + return sr.deprecatedChooseRangeToRebalance( + ctx, hottestRanges, localDesc, allStoresList, + storeListToMap(allStoresList), qpsMinThreshold, qpsMaxThreshold, + ) + } + now := sr.rq.store.Clock().NowAsClockTimestamp() for { if len(*hottestRanges) == 0 { diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index 1f6e4e5f37da..1d26b5c0c08c 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -487,7 +487,14 @@ func TestChooseLeaseToTransfer(t *testing.T) { t.Run("", func(t *testing.T) { loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps}}) hottestRanges := rr.topQPS() - _, target, _ := sr.chooseLeaseToTransfer(ctx, &hottestRanges, &localDesc, storeList, storeMap) + _, target, _ := sr.chooseLeaseToTransfer( + ctx, + &hottestRanges, + &localDesc, + storeList, + storeMap, + nil, /* qpsScorerOptions */ + ) if target.StoreID != tc.expectTarget { t.Errorf("got target store %d for range with replicas %v and %f qps; want %d", target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget) @@ -1191,7 +1198,14 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { return status } - _, target, _ := sr.chooseLeaseToTransfer(ctx, &hottestRanges, &localDesc, storeList, storeMap) + _, target, _ := sr.chooseLeaseToTransfer( + ctx, + &hottestRanges, + &localDesc, + storeList, + storeMap, + nil, /* qpsScorerOptions */ + ) expectTarget := roachpb.StoreID(4) if target.StoreID != expectTarget { t.Errorf("got target store s%d for range with RaftStatus %v; want s%d",
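
The deprecated code path above derives cluster-level under/over-fullness bounds from kv.allocator.qps_rebalance_threshold, and (for mixed 21.2/22.1 clusters) rankedCandidateListForAllocation buckets each candidate store's QPS into a converges score of +1 through -2. The standalone Go sketch below illustrates that bucketing only; the threshold formula (mean scaled by 1 ± threshold) and every name in it are illustrative assumptions, not the kvserver implementation.

// A minimal, self-contained sketch of the QPS bucketing described in the diff.
// The exact threshold computation inside underfullQPSThreshold and
// overfullQPSThreshold is assumed here to be a symmetric fraction of the mean.
package main

import "fmt"

func underfullQPS(mean, threshold float64) float64 { return mean * (1 - threshold) }
func overfullQPS(mean, threshold float64) float64  { return mean * (1 + threshold) }

// convergesScore mirrors the bucketing added to rankedCandidateListForAllocation
// for 21.2 compatibility: stores well below the mean score +1, stores well
// above it score -2.
func convergesScore(storeQPS, mean, threshold float64) int {
	switch {
	case storeQPS < underfullQPS(mean, threshold):
		return 1
	case storeQPS < mean:
		return 0
	case storeQPS < overfullQPS(mean, threshold):
		return -1
	default:
		return -2
	}
}

func main() {
	const mean, threshold = 1000.0, 0.25
	for _, qps := range []float64{500, 900, 1100, 1500} {
		fmt.Printf("store qps=%.0f -> converges score %d\n", qps, convergesScore(qps, mean, threshold))
	}
}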
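Similarly, the deprecated shouldNotMoveTo check gates candidate stores purely on projected QPS: an underfull candidate may absorb the replica as long as it stays under the max threshold, while any other candidate is rejected if the move would push it past the cluster mean. A minimal sketch of that gate, with the liveness check and store-map lookups omitted and all names assumed for illustration:

// Sketch of the QPS portion of the deprecated shouldNotMoveTo logic.
package main

import "fmt"

func shouldNotMoveTo(candidateQPS, replQPS, meanQPS, minQPS, maxQPS float64) bool {
	newQPS := candidateQPS + replQPS
	if candidateQPS < minQPS {
		// Underfull candidate: reject only if the move would overshoot maxQPS.
		return newQPS > maxQPS
	}
	// Otherwise reject any move that would push the candidate above the mean.
	return newQPS > meanQPS
}

func main() {
	const meanQPS, minQPS, maxQPS = 1000.0, 800.0, 1200.0
	fmt.Println(shouldNotMoveTo(500, 100, meanQPS, minQPS, maxQPS)) // false: stays under maxQPS
	fmt.Println(shouldNotMoveTo(500, 800, meanQPS, minQPS, maxQPS)) // true: would exceed maxQPS
	fmt.Println(shouldNotMoveTo(950, 100, meanQPS, minQPS, maxQPS)) // true: would exceed the mean
}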