diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 10fa5d7b8d44..f54d27c27dd0 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -18,8 +18,8 @@
kv.allocator.lease_rebalancing_aggressiveness | float | 1 | set greater than 1.0 to rebalance leases toward load more aggressively, or between 0 and 1.0 to be more conservative about rebalancing leases |
kv.allocator.load_based_lease_rebalancing.enabled | boolean | true | set to enable rebalancing of range leases based on load and latency |
kv.allocator.range_rebalance_threshold | float | 0.05 | minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull |
-kv.allocator.stat_based_rebalancing.enabled | boolean | false | set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster |
-kv.allocator.stat_rebalance_threshold | float | 0.2 | minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull |
+kv.allocator.stat_based_rebalancing.enabled | boolean | true | set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster |
+kv.allocator.stat_rebalance_threshold | float | 0.2 | minimum fraction away from the mean a store's stats (such as queries per second) can be before it is considered overfull or underfull |
kv.bulk_io_write.concurrent_export_requests | integer | 5 | number of export requests a store will handle concurrently before queuing |
kv.bulk_io_write.concurrent_import_requests | integer | 1 | number of import requests a store will handle concurrently before queuing |
kv.bulk_io_write.max_rate | byte size | 8.0 EiB | the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops |
diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go
index 25ab24cef571..500084c9e6dd 100644
--- a/pkg/storage/allocator.go
+++ b/pkg/storage/allocator.go
@@ -361,9 +361,36 @@ func (a *Allocator) AllocateTarget(
) (*roachpb.StoreDescriptor, string, error) {
sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeInfo.Desc.RangeID, storeFilterThrottled)
+ target, details := a.allocateTargetFromList(
+ ctx, sl, zone, existing, rangeInfo, a.scorerOptions(disableStatsBasedRebalancing))
+
+ if target != nil {
+ return target, details, nil
+ }
+
+ // When there are throttled stores that do match, we shouldn't send
+ // the replica to purgatory.
+ if throttledStoreCount > 0 {
+ return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
+ }
+ return nil, "", &allocatorError{
+ constraints: zone.Constraints,
+ existingReplicas: len(existing),
+ aliveStores: aliveStoreCount,
+ throttledStores: throttledStoreCount,
+ }
+}
+
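+// allocateTargetFromList returns a suitable store for a new replica of the
+// given range from the candidates in the provided StoreList, scored with the
+// given scorer options, along with a string of details about the decision.
+// It returns nil if no acceptable store is found.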
+func (a *Allocator) allocateTargetFromList(
+ ctx context.Context,
+ sl StoreList,
+ zone config.ZoneConfig,
+ existing []roachpb.ReplicaDescriptor,
+ rangeInfo RangeInfo,
+ options scorerOptions,
+) (*roachpb.StoreDescriptor, string) {
analyzedConstraints := analyzeConstraints(
ctx, a.storePool.getStoreDescriptor, existing, zone)
- options := a.scorerOptions(disableStatsBasedRebalancing)
candidates := allocateCandidates(
sl, analyzedConstraints, existing, rangeInfo, a.storePool.getLocalities(existing), options,
)
@@ -379,20 +406,10 @@ func (a *Allocator) AllocateTarget(
if err != nil {
log.Warningf(ctx, "failed to marshal details for choosing allocate target: %s", err)
}
- return &target.store, string(detailsBytes), nil
+ return &target.store, string(detailsBytes)
}
- // When there are throttled stores that do match, we shouldn't send
- // the replica to purgatory.
- if throttledStoreCount > 0 {
- return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
- }
- return nil, "", &allocatorError{
- constraints: zone.Constraints,
- existingReplicas: len(existing),
- aliveStores: aliveStoreCount,
- throttledStores: throttledStoreCount,
- }
+ return nil, ""
}
func (a Allocator) simulateRemoveTarget(
diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go
index 3a9155692bf4..289c4a00297c 100644
--- a/pkg/storage/allocator_scorer.go
+++ b/pkg/storage/allocator_scorer.go
@@ -60,7 +60,7 @@ const (
var EnableStatsBasedRebalancing = settings.RegisterBoolSetting(
"kv.allocator.stat_based_rebalancing.enabled",
"set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster",
- false, // TODO(a-robinson): switch to true for v2.1 once the store-rebalancer is done
+ true,
)
// rangeRebalanceThreshold is the minimum ratio of a store's range count to
@@ -85,12 +85,13 @@ var rangeRebalanceThreshold = settings.RegisterNonNegativeFloatSetting(
// TODO(a-robinson): Should disk usage be held to a higher standard than this?
var statRebalanceThreshold = settings.RegisterNonNegativeFloatSetting(
"kv.allocator.stat_rebalance_threshold",
- "minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull",
+ "minimum fraction away from the mean a store's stats (such as queries per second) can be before it is considered overfull or underfull",
0.20,
)
type scorerOptions struct {
deterministic bool
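+ // balanceQPSInsteadOfCount, if true, tells the scorer to balance candidates
+ // on queries-per-second rather than on range count when computing converges
+ // scores.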
+ balanceQPSInsteadOfCount bool
statsBasedRebalancingEnabled bool
rangeRebalanceThreshold float64
statRebalanceThreshold float64
@@ -442,11 +443,22 @@ func allocateCandidates(
}
diversityScore := diversityAllocateScore(s, existingNodeLocalities)
balanceScore := balanceScore(sl, s.Capacity, rangeInfo, options)
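+ // When balancing on QPS instead of range count, also compute a converges
+ // score: stores well below the mean (underfull) score 1, stores below the
+ // mean score 0, and stores at or above the mean score -1.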
+ var convergesScore int
+ if options.balanceQPSInsteadOfCount {
+ if s.Capacity.QueriesPerSecond < underfullStatThreshold(options, sl.candidateQueriesPerSecond.mean) {
+ convergesScore = 1
+ } else if s.Capacity.QueriesPerSecond < sl.candidateQueriesPerSecond.mean {
+ convergesScore = 0
+ } else {
+ convergesScore = -1
+ }
+ }
candidates = append(candidates, candidate{
store: s,
valid: constraintsOK,
necessary: necessary,
diversityScore: diversityScore,
+ convergesScore: convergesScore,
balanceScore: balanceScore,
rangeCount: int(s.Capacity.RangeCount),
})
diff --git a/pkg/storage/replica_rankings.go b/pkg/storage/replica_rankings.go
index 13b4a99f582d..3810fd9c943c 100644
--- a/pkg/storage/replica_rankings.go
+++ b/pkg/storage/replica_rankings.go
@@ -28,6 +28,7 @@ const (
type replicaWithStats struct {
repl *Replica
qps float64
+ // TODO(a-robinson): Include writes-per-second and logicalBytes of storage?
}
// replicaRankings maintains top-k orderings of the replicas in a store along
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index ae1df3ead04f..1311b032ba74 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -1546,7 +1546,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.startLeaseRenewer(ctx)
}
- s.storeRebalancer.Start(ctx, s.stopper, s.StoreID())
+ s.storeRebalancer.Start(ctx, s.stopper)
// Start the storage engine compactor.
if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) {
diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go
index 5be226f33073..1f9afe958dfa 100644
--- a/pkg/storage/store_rebalancer.go
+++ b/pkg/storage/store_rebalancer.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
)
@@ -87,9 +88,7 @@ func NewStoreRebalancer(
//
// TODO(a-robinson): Expose metrics to make this understandable without having
// to dive into logspy.
-func (sr *StoreRebalancer) Start(
- ctx context.Context, stopper *stop.Stopper, storeID roachpb.StoreID,
-) {
+func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) {
ctx = sr.AnnotateCtx(ctx)
// Start a goroutine that watches and proactively renews certain
@@ -111,20 +110,13 @@ func (sr *StoreRebalancer) Start(
continue
}
- localDesc, found := sr.rq.allocator.storePool.getStoreDescriptor(storeID)
- if !found {
- log.Warningf(ctx, "StorePool missing descriptor for local store")
- continue
- }
storeList, _, _ := sr.rq.allocator.storePool.getStoreList(roachpb.RangeID(0), storeFilterNone)
- sr.rebalanceStore(ctx, localDesc, storeList)
+ sr.rebalanceStore(ctx, storeList)
}
})
}
-func (sr *StoreRebalancer) rebalanceStore(
- ctx context.Context, localDesc roachpb.StoreDescriptor, storeList StoreList,
-) {
+func (sr *StoreRebalancer) rebalanceStore(ctx context.Context, storeList StoreList) {
statThreshold := statRebalanceThreshold.Get(&sr.st.SV)
@@ -134,12 +126,24 @@ func (sr *StoreRebalancer) rebalanceStore(
qpsMaxThreshold := math.Max(storeList.candidateQueriesPerSecond.mean*(1+statThreshold),
storeList.candidateQueriesPerSecond.mean+minQPSThresholdDifference)
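+ // Find the local store's descriptor in the same storeList used for the
+ // mean and threshold calculations above. If it's missing, we don't have
+ // enough information to rebalance, so bail out until the next attempt.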
+ var localDesc *roachpb.StoreDescriptor
+ for i := range storeList.stores {
+ if storeList.stores[i].StoreID == sr.rq.store.StoreID() {
+ localDesc = &storeList.stores[i]
+ }
+ }
+ if localDesc == nil {
+ log.Warningf(ctx, "StorePool missing descriptor for local store")
+ return
+ }
+
if !(localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold) {
log.VEventf(ctx, 1, "local QPS %.2f is below max threshold %.2f (mean=%.2f); no rebalancing needed",
localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold, storeList.candidateQueriesPerSecond.mean)
return
}
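+ // Ranges whose leases can't reasonably be transferred get collected here so
+ // that they can be reconsidered for full replica rebalancing below.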
+ var replicasToMaybeRebalance []replicaWithStats
storeMap := storeListToMap(storeList)
sysCfg, cfgOk := sr.rq.allocator.storePool.gossip.GetSystemConfig()
if !cfgOk {
@@ -147,29 +151,39 @@ func (sr *StoreRebalancer) rebalanceStore(
return
}
- log.Infof(ctx, "considering load-based lease transfers for s%d with %.2f qps (mean=%.2f, upperThreshold=%.2f)",
+ log.Infof(ctx,
+ "considering load-based lease transfers for s%d with %.2f qps (mean=%.2f, upperThreshold=%.2f)",
localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold)
hottestRanges := sr.replRankings.topQPS()
for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold {
- replWithStats, target := sr.chooseLeaseToTransfer(
+ replWithStats, target, considerForRebalance := sr.chooseLeaseToTransfer(
ctx, sysCfg, &hottestRanges, localDesc, storeList, storeMap, qpsMinThreshold, qpsMaxThreshold)
+ replicasToMaybeRebalance = append(replicasToMaybeRebalance, considerForRebalance...)
if replWithStats.repl == nil {
log.Infof(ctx,
- "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f)",
+ "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f); considering load-based replica rebalances",
localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold)
break
}
+
log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load",
replWithStats.repl.RangeID, replWithStats.qps, target.StoreID)
- replCtx := replWithStats.repl.AnnotateCtx(ctx)
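+ // Bound each lease transfer by the replicate queue's process timeout so a
+ // stuck transfer can't stall the rebalancing loop indefinitely.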
+ replCtx, cancel := context.WithTimeout(replWithStats.repl.AnnotateCtx(ctx), sr.rq.processTimeout)
if err := sr.rq.transferLease(replCtx, replWithStats.repl, target); err != nil {
+ cancel()
log.Errorf(replCtx, "unable to transfer lease to s%d: %v", target.StoreID, err)
continue
}
+ cancel()
+
// Finally, update our local copies of the descriptors so that if
// additional transfers are needed we'll be making the decisions with more
// up-to-date info.
+ //
+ // TODO(a-robinson): This just updates the copies used locally by the
+ // storeRebalancer. We may also want to update the copies in the StorePool
+ // itself.
localDesc.Capacity.LeaseCount--
localDesc.Capacity.QueriesPerSecond -= replWithStats.qps
if otherDesc := storeMap[target.StoreID]; otherDesc != nil {
@@ -177,6 +191,76 @@ func (sr *StoreRebalancer) rebalanceStore(
otherDesc.Capacity.QueriesPerSecond += replWithStats.qps
}
}
+
+ if !(localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold) {
+ log.Infof(ctx,
+ "load-based lease transfers successfully brought s%d down to %.2f qps (mean=%.2f, upperThreshold=%.2f)",
+ localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold)
+ return
+ }
+
+ // Re-combine replicasToMaybeRebalance with what remains of hottestRanges so
+ // that we'll reconsider them for replica rebalancing.
+ replicasToMaybeRebalance = append(replicasToMaybeRebalance, hottestRanges...)
+
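+ // If lease transfers alone weren't enough to bring the local store below
+ // the max threshold, fall back to rebalancing whole replicas (along with
+ // their leases) onto less-loaded stores.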
+ for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold {
+ replWithStats, targets := sr.chooseReplicaToRebalance(
+ ctx,
+ sysCfg,
+ &replicasToMaybeRebalance,
+ localDesc,
+ storeList,
+ storeMap,
+ qpsMinThreshold,
+ qpsMaxThreshold)
+ if replWithStats.repl == nil {
+ log.Infof(ctx,
+ "ran out of replicas worth transferring and qps (%.2f) is still above desired threshold (%.2f); will check again soon",
+ localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold)
+ return
+ }
+
+ descBeforeRebalance := replWithStats.repl.Desc()
+ log.VEventf(ctx, 1, "rebalancing r%d (%.2f qps) from %v to %v to better balance load",
+ replWithStats.repl.RangeID, replWithStats.qps, descBeforeRebalance.Replicas, targets)
+ replCtx, cancel := context.WithTimeout(replWithStats.repl.AnnotateCtx(ctx), sr.rq.processTimeout)
+ // TODO: Either make RelocateRange production-ready or do the rebalancing
+ // another way.
+ if err := RelocateRange(replCtx, sr.rq.store.DB(), *descBeforeRebalance, targets); err != nil {
+ cancel()
+ log.Errorf(replCtx, "unable to relocate range to %v: %v", targets, err)
+ continue
+ }
+ cancel()
+
+ // Finally, update our local copies of the descriptors so that if
+ // additional transfers are needed we'll be making the decisions with more
+ // up-to-date info.
+ //
+ // TODO(a-robinson): This just updates the copies used locally by the
+ // storeRebalancer. We may also want to update the copies in the StorePool
+ // itself.
+ for i := range descBeforeRebalance.Replicas {
+ if storeDesc := storeMap[descBeforeRebalance.Replicas[i].StoreID]; storeDesc != nil {
+ storeDesc.Capacity.RangeCount--
+ }
+ }
+ localDesc.Capacity.LeaseCount--
+ localDesc.Capacity.QueriesPerSecond -= replWithStats.qps
+ for i := range targets {
+ if storeDesc := storeMap[targets[i].StoreID]; storeDesc != nil {
+ storeDesc.Capacity.RangeCount++
+ if i == 0 {
+ storeDesc.Capacity.LeaseCount++
+ storeDesc.Capacity.QueriesPerSecond += replWithStats.qps
+ }
+ }
+ }
+ }
+
+ log.Infof(ctx,
+ "load-based replica transfers successfully brought s%d down to %.2f qps (mean=%.2f, upperThreshold=%.2f)",
+ localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold)
}
// TODO(a-robinson): Should we take the number of leases on each store into
@@ -185,33 +269,27 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
ctx context.Context,
sysCfg config.SystemConfig,
hottestRanges *[]replicaWithStats,
- localDesc roachpb.StoreDescriptor,
+ localDesc *roachpb.StoreDescriptor,
storeList StoreList,
storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor,
minQPS float64,
maxQPS float64,
-) (replicaWithStats, roachpb.ReplicaDescriptor) {
+) (replicaWithStats, roachpb.ReplicaDescriptor, []replicaWithStats) {
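+ // considerForRebalance accumulates hot replicas for which no valid lease
+ // transfer target was found; the caller may try rebalancing their replicas
+ // instead.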
+ var considerForRebalance []replicaWithStats
now := sr.rq.store.Clock().Now()
for {
if len(*hottestRanges) == 0 {
- return replicaWithStats{}, roachpb.ReplicaDescriptor{}
+ return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance
}
replWithStats := (*hottestRanges)[0]
*hottestRanges = (*hottestRanges)[1:]
// We're all out of replicas.
if replWithStats.repl == nil {
- return replicaWithStats{}, roachpb.ReplicaDescriptor{}
- }
-
- if !replWithStats.repl.OwnsValidLease(now) {
- log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID)
- continue
+ return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance
}
- if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS {
- log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)",
- replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS)
+ if shouldNotMoveAway(ctx, replWithStats, localDesc, now, minQPS) {
continue
}
@@ -228,7 +306,8 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
}
desc := replWithStats.repl.Desc()
- log.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps", desc.RangeID, replWithStats.qps)
+ log.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps",
+ desc.RangeID, replWithStats.qps)
// Check all the other replicas in order of increasing qps.
replicas := make([]roachpb.ReplicaDescriptor, len(desc.Replicas))
@@ -273,7 +352,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey)
if err != nil {
log.Error(ctx, err)
- return replicaWithStats{}, roachpb.ReplicaDescriptor{}
+ return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance
}
preferred := sr.rq.allocator.preferredLeaseholders(zone, desc.Replicas)
if len(preferred) > 0 && !storeHasReplica(candidate.StoreID, preferred) {
@@ -285,7 +364,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
if sr.rq.allocator.followTheWorkloadPrefersLocal(
ctx,
filteredStoreList,
- localDesc,
+ *localDesc,
candidate.StoreID,
desc.Replicas,
replWithStats.repl.leaseholderStats,
@@ -295,11 +374,181 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
continue
}
- return replWithStats, candidate
+ return replWithStats, candidate, considerForRebalance
+ }
+
+ // If none of the other replicas are valid lease transfer targets, consider
+ // this range for replica rebalancing.
+ considerForRebalance = append(considerForRebalance, replWithStats)
+ }
+}
+
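+// chooseReplicaToRebalance walks the hottest ranges led by the local store
+// and returns the first one that can be moved to a set of stores that
+// respects the zone config and doesn't hurt locality diversity, along with
+// the desired replication targets. The intended new leaseholder is placed
+// first in the returned targets.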
+func (sr *StoreRebalancer) chooseReplicaToRebalance(
+ ctx context.Context,
+ sysCfg config.SystemConfig,
+ hottestRanges *[]replicaWithStats,
+ localDesc *roachpb.StoreDescriptor,
+ storeList StoreList,
+ storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor,
+ minQPS float64,
+ maxQPS float64,
+) (replicaWithStats, []roachpb.ReplicationTarget) {
+ now := sr.rq.store.Clock().Now()
+ for {
+ if len(*hottestRanges) == 0 {
+ return replicaWithStats{}, nil
+ }
+ replWithStats := (*hottestRanges)[0]
+ *hottestRanges = (*hottestRanges)[1:]
+
+ if replWithStats.repl == nil {
+ return replicaWithStats{}, nil
+ }
+
+ if shouldNotMoveAway(ctx, replWithStats, localDesc, now, minQPS) {
+ continue
+ }
+
+ // Don't bother moving ranges whose QPS is below some small fraction of the
+ // store's QPS (unless the store has extra ranges to spare anyway). Moving a
+ // range responsible for, say, 1 qps on a store serving 5000 qps is just
+ // unnecessary churn with no real benefit.
+ const minQPSFraction = .001
+ if replWithStats.qps < localDesc.Capacity.QueriesPerSecond*minQPSFraction &&
+ float64(localDesc.Capacity.RangeCount) <= storeList.candidateRanges.mean {
+ log.VEventf(ctx, 5, "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps",
+ replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, localDesc.Capacity.QueriesPerSecond)
+ continue
+ }
+
+ desc := replWithStats.repl.Desc()
+ log.VEventf(ctx, 3, "considering replica rebalance for r%d with %.2f qps",
+ desc.RangeID, replWithStats.qps)
+
+ // Pick out the stores that we want the range on, keeping existing replicas
+ // around if they aren't on overfull stores.
+ zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey)
+ if err != nil {
+ log.Error(ctx, err)
+ return replicaWithStats{}, nil
+ }
+ desiredReplicas := int(zone.NumReplicas)
+ targets := make([]roachpb.ReplicationTarget, 0, desiredReplicas)
+ targetReplicas := make([]roachpb.ReplicaDescriptor, 0, desiredReplicas)
+
+ // Check the range's existing diversity score, since we want to ensure we
+ // don't hurt locality diversity just to improve QPS.
+ curDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalities(desc.Replicas))
+
+ // Check the existing replicas, keeping around those that aren't overloaded.
+ for i := range desc.Replicas {
+ if desc.Replicas[i].StoreID == localDesc.StoreID {
+ continue
+ }
+ // Keep the replica in the range if we don't know its QPS or if its QPS
+ // is below the upper threshold. Punishing stores not in our store map
+ // could cause mass evictions if the storePool gets out of sync.
+ storeDesc, ok := storeMap[desc.Replicas[i].StoreID]
+ if !ok || storeDesc.Capacity.QueriesPerSecond < maxQPS {
+ targets = append(targets, roachpb.ReplicationTarget{
+ NodeID: desc.Replicas[i].NodeID,
+ StoreID: desc.Replicas[i].StoreID,
+ })
+ targetReplicas = append(targetReplicas, roachpb.ReplicaDescriptor{
+ NodeID: desc.Replicas[i].NodeID,
+ StoreID: desc.Replicas[i].StoreID,
+ })
+ }
+ }
+
+ // Then pick out which new stores to add the remaining replicas to.
+ for len(targets) < desiredReplicas {
+ // Use the preexisting AllocateTarget logic to ensure that considerations
+ // such as zone constraints, locality diversity, and full disk come
+ // into play.
+ rangeInfo := rangeInfoForRepl(replWithStats.repl, desc)
+ options := sr.rq.allocator.scorerOptions(false /* disableStatsBasedRebalancing */)
+ options.balanceQPSInsteadOfCount = true
+ target, _ := sr.rq.allocator.allocateTargetFromList(
+ ctx,
+ storeList,
+ zone,
+ targetReplicas,
+ rangeInfo,
+ options,
+ )
+ if target == nil {
+ log.VEventf(ctx, 3, "no rebalance targets found to replace the current store for r%d",
+ desc.RangeID)
+ break
+ }
+
+ targets = append(targets, roachpb.ReplicationTarget{
+ NodeID: target.Node.NodeID,
+ StoreID: target.StoreID,
+ })
+ targetReplicas = append(targetReplicas, roachpb.ReplicaDescriptor{
+ NodeID: target.Node.NodeID,
+ StoreID: target.StoreID,
+ })
+ }
+
+ // If we couldn't find enough valid targets, forget about this range.
+ //
+ // TODO(a-robinson): Support more incremental improvements -- move what we
+ // can if it makes things better even if it isn't great. For example,
+ // moving one of the other existing replicas that's on a store with less
+ // qps than the max threshold but above the mean would help in certain
+ // locality configurations.
+ if len(targets) < desiredReplicas {
+ log.VEventf(ctx, 3, "couldn't find enough rebalance targets for r%d (%d/%d)",
+ desc.RangeID, len(targets), desiredReplicas)
+ continue
+ }
+ newDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalities(targetReplicas))
+ if newDiversity < curDiversity {
+ log.VEventf(ctx, 3,
+ "new diversity %.2f for r%d worse than current diversity %.2f; not rebalancing",
+ newDiversity, desc.RangeID, curDiversity)
+ continue
+ }
+
+ // Pick the target store with the least QPS to become the new leaseholder;
+ // RelocateRange transfers the lease to the first provided
+ // target.
+ newLeaseIdx := 0
+ newLeaseQPS := math.MaxFloat64
+ for i := 0; i < len(targets); i++ {
+ storeDesc, ok := storeMap[targets[i].StoreID]
+ if ok && storeDesc.Capacity.QueriesPerSecond < newLeaseQPS {
+ newLeaseIdx = i
+ newLeaseQPS = storeDesc.Capacity.QueriesPerSecond
+ }
}
+ targets[0], targets[newLeaseIdx] = targets[newLeaseIdx], targets[0]
+ return replWithStats, targets
}
}
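+
+// shouldNotMoveAway returns true if the local store doesn't hold a valid
+// lease for the given replica or if shedding the replica's load would drop
+// the local store below the minimum QPS threshold.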
+func shouldNotMoveAway(
+ ctx context.Context,
+ replWithStats replicaWithStats,
+ localDesc *roachpb.StoreDescriptor,
+ now hlc.Timestamp,
+ minQPS float64,
+) bool {
+ if !replWithStats.repl.OwnsValidLease(now) {
+ log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID)
+ return true
+ }
+ if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS {
+ log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)",
+ replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS)
+ return true
+ }
+ return false
+}
+
func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor {
storeMap := make(map[roachpb.StoreID]*roachpb.StoreDescriptor)
for i := range sl.stores {
@@ -307,3 +556,12 @@ func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor {
}
return storeMap
}
+
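+// existingTarget returns whether the given candidate store already appears
+// in the provided list of replication targets.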
+func existingTarget(targets []roachpb.ReplicationTarget, candidateID roachpb.StoreID) bool {
+ for _, target := range targets {
+ if candidateID == target.StoreID {
+ return true
+ }
+ }
+ return false
+}
diff --git a/pkg/storage/store_rebalancer_test.go b/pkg/storage/store_rebalancer_test.go
index bec59923cf9d..c631923c72c4 100644
--- a/pkg/storage/store_rebalancer_test.go
+++ b/pkg/storage/store_rebalancer_test.go
@@ -155,8 +155,8 @@ func TestChooseLeaseToTransfer(t *testing.T) {
for _, tc := range testCases {
loadRanges(rr, s, []testRange{{storeIDs: tc.storeIDs, qps: tc.qps}})
hottestRanges := rr.topQPS()
- _, target := sr.chooseLeaseToTransfer(
- ctx, config.SystemConfig{}, &hottestRanges, localDesc, storeList, storeMap, minQPS, maxQPS)
+ _, target, _ := sr.chooseLeaseToTransfer(
+ ctx, config.SystemConfig{}, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS)
if target.StoreID != tc.expectTarget {
t.Errorf("got target store %d for range with replicas %v and %f qps; want %d",
target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget)