diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 3e45f4d21c69..2e22496b8a8e 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -18,7 +18,7 @@
kv.allocator.lease_rebalancing_aggressiveness | float | 1 | set greater than 1.0 to rebalance leases toward load more aggressively, or between 0 and 1.0 to be more conservative about rebalancing leases |
kv.allocator.load_based_lease_rebalancing.enabled | boolean | true | set to enable rebalancing of range leases based on load and latency |
kv.allocator.range_rebalance_threshold | float | 0.05 | minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull |
-kv.allocator.stat_based_rebalancing.enabled | boolean | false | set to enable rebalancing of range replicas based on write load and disk usage |
+kv.allocator.stat_based_rebalancing.enabled | boolean | false | set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster |
kv.allocator.stat_rebalance_threshold | float | 0.2 | minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull |
kv.bulk_io_write.concurrent_export_requests | integer | 5 | number of export requests a store will handle concurrently before queuing |
kv.bulk_io_write.concurrent_import_requests | integer | 1 | number of import requests a store will handle concurrently before queuing |
diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go
index 55e3c37ec69f..bca7c5303645 100644
--- a/pkg/storage/allocator.go
+++ b/pkg/storage/allocator.go
@@ -34,10 +34,16 @@ import (
)
const (
- // baseLeaseRebalanceThreshold is the minimum ratio of a store's lease surplus
+ // leaseRebalanceThreshold is the minimum ratio of a store's lease surplus
// to the mean range/lease count that permits lease-transfers away from that
// store.
- baseLeaseRebalanceThreshold = 0.05
+ leaseRebalanceThreshold = 0.05
+
+ // baseLoadBasedLeaseRebalanceThreshold is the equivalent of
+ // leaseRebalanceThreshold for load-based lease rebalance decisions (i.e.
+ // "follow-the-workload"). It's the base threshold for decisions that gets
+ // adjusted based on the load and latency of the involved ranges/nodes.
+ baseLoadBasedLeaseRebalanceThreshold = 2 * leaseRebalanceThreshold
// minReplicaWeight sets a floor for how low a replica weight can be. This is
// needed because a weight of zero doesn't work in the current lease scoring
@@ -714,7 +720,7 @@ func (a *Allocator) TransferLeaseTarget(
// whether we actually should be transferring the lease. The transfer
// decision is only needed if we've been asked to check the source.
transferDec, repl := a.shouldTransferLeaseUsingStats(
- ctx, sl, source, existing, stats,
+ ctx, sl, source, existing, stats, nil,
)
if checkTransferLeaseSource {
switch transferDec {
@@ -814,7 +820,7 @@ func (a *Allocator) ShouldTransferLease(
return false
}
- transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats)
+ transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats, nil)
var result bool
switch transferDec {
case shouldNotTransfer:
@@ -831,12 +837,36 @@ func (a *Allocator) ShouldTransferLease(
return result
}
+func (a Allocator) followTheWorkloadPrefersLocal(
+ ctx context.Context,
+ sl StoreList,
+ source roachpb.StoreDescriptor,
+ candidate roachpb.StoreID,
+ existing []roachpb.ReplicaDescriptor,
+ stats *replicaStats,
+) bool {
+ adjustments := make(map[roachpb.StoreID]float64)
+ decision, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats, adjustments)
+ if decision == decideWithoutStats {
+ return false
+ }
+ adjustment := adjustments[candidate]
+ if adjustment > baseLoadBasedLeaseRebalanceThreshold {
+ log.VEventf(ctx, 3,
+ "s%d is a better fit than s%d due to follow-the-workload (score: %.2f; threshold: %.2f)",
+ source.StoreID, candidate, adjustment, baseLoadBasedLeaseRebalanceThreshold)
+ return true
+ }
+ return false
+}
+
func (a Allocator) shouldTransferLeaseUsingStats(
ctx context.Context,
sl StoreList,
source roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
stats *replicaStats,
+ rebalanceAdjustments map[roachpb.StoreID]float64,
) (transferDecision, roachpb.ReplicaDescriptor) {
// Only use load-based rebalancing if it's enabled and we have both
// stats and locality information to base our decision on.
@@ -903,7 +933,7 @@ func (a Allocator) shouldTransferLeaseUsingStats(
}
addr, err := a.storePool.gossip.GetNodeIDAddress(repl.NodeID)
if err != nil {
- log.Errorf(ctx, "missing address for node %d: %s", repl.NodeID, err)
+ log.Errorf(ctx, "missing address for n%d: %s", repl.NodeID, err)
continue
}
remoteLatency, ok := a.nodeLatencyFn(addr.String())
@@ -912,20 +942,24 @@ func (a Allocator) shouldTransferLeaseUsingStats(
}
remoteWeight := math.Max(minReplicaWeight, replicaWeights[repl.NodeID])
- score := loadBasedLeaseRebalanceScore(
+ replScore, rebalanceAdjustment := loadBasedLeaseRebalanceScore(
ctx, a.storePool.st, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, sl.candidateLeases.mean)
- if score > bestReplScore {
- bestReplScore = score
+ if replScore > bestReplScore {
+ bestReplScore = replScore
bestRepl = repl
}
+ if rebalanceAdjustments != nil {
+ rebalanceAdjustments[repl.StoreID] = rebalanceAdjustment
+ }
}
- // Return the best replica even in cases where transferring is not advised in
- // order to support forced lease transfers, such as when removing a replica or
- // draining all leases before shutdown.
if bestReplScore > 0 {
return shouldTransfer, bestRepl
}
+
+ // Return the best replica even in cases where transferring is not advised in
+ // order to support forced lease transfers, such as when removing a replica or
+ // draining all leases before shutdown.
return shouldNotTransfer, bestRepl
}
@@ -948,7 +982,7 @@ func (a Allocator) shouldTransferLeaseUsingStats(
// logic behind each part of the formula is as follows:
//
// * LeaseRebalancingAggressiveness: Allow the aggressiveness to be tuned via
-// an environment variable.
+// a cluster setting.
// * 0.1: Constant factor to reduce aggressiveness by default
// * math.Log10(remoteWeight/sourceWeight): Comparison of the remote replica's
// weight to the local replica's weight. Taking the log of the ratio instead
@@ -963,6 +997,18 @@ func (a Allocator) shouldTransferLeaseUsingStats(
// of the ideal number of leases on each store. We then calculate these to
// compare how close each node is to its ideal state and use the differences
// from the ideal state on each node to compute a final score.
+//
+// Returns a total score for the replica that takes into account the number of
+// leases already on each store. Also returns the raw "adjustment" value that's
+// purely based on replica weights and latency in order for the caller to
+// determine how large a role the user's workload played in the decision. The
+// adjustment value is positive if the remote store is preferred for load-based
+// reasons or negative if the local store is preferred. The magnitude depends
+// on the difference in load and the latency between the nodes.
+//
+// TODO(a-robinson): Should this be changed to avoid even thinking about lease
+// counts now that we try to spread leases and replicas based on QPS? As is, it
+// may fight back a little bit against store-level QPS-based rebalancing.
func loadBasedLeaseRebalanceScore(
ctx context.Context,
st *cluster.Settings,
@@ -972,14 +1018,14 @@ func loadBasedLeaseRebalanceScore(
sourceWeight float64,
source roachpb.StoreDescriptor,
meanLeases float64,
-) int32 {
+) (int32, float64) {
remoteLatencyMillis := float64(remoteLatency) / float64(time.Millisecond)
rebalanceAdjustment :=
leaseRebalancingAggressiveness.Get(&st.SV) * 0.1 * math.Log10(remoteWeight/sourceWeight) * math.Log1p(remoteLatencyMillis)
// Start with twice the base rebalance threshold in order to fight more
// strongly against thrashing caused by small variances in the distribution
// of request weights.
- rebalanceThreshold := (2 * baseLeaseRebalanceThreshold) - rebalanceAdjustment
+ rebalanceThreshold := baseLoadBasedLeaseRebalanceThreshold - rebalanceAdjustment
overfullLeaseThreshold := int32(math.Ceil(meanLeases * (1 + rebalanceThreshold)))
overfullScore := source.Capacity.LeaseCount - overfullLeaseThreshold
@@ -995,7 +1041,7 @@ func loadBasedLeaseRebalanceScore(
rebalanceThreshold, meanLeases, source.Capacity.LeaseCount, overfullLeaseThreshold,
remoteStore.Capacity.LeaseCount, underfullLeaseThreshold, totalScore,
)
- return totalScore
+ return totalScore, rebalanceAdjustment
}
func (a Allocator) shouldTransferLeaseWithoutStats(
@@ -1004,9 +1050,14 @@ func (a Allocator) shouldTransferLeaseWithoutStats(
source roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
) bool {
+ // TODO(a-robinson): Should we disable this behavior when load-based lease
+ // rebalancing is enabled? In happy cases it's nice to keep this working
+ // to even out the number of leases in addition to the number of replicas,
+ // but it's certainly a blunt instrument that could undo what we want.
+
// Allow lease transfer if we're above the overfull threshold, which is
- // mean*(1+baseLeaseRebalanceThreshold).
- overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + baseLeaseRebalanceThreshold)))
+ // mean*(1+leaseRebalanceThreshold).
+ overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + leaseRebalanceThreshold)))
minOverfullThreshold := int32(math.Ceil(sl.candidateLeases.mean + 5))
if overfullLeaseThreshold < minOverfullThreshold {
overfullLeaseThreshold = minOverfullThreshold
@@ -1016,7 +1067,7 @@ func (a Allocator) shouldTransferLeaseWithoutStats(
}
if float64(source.Capacity.LeaseCount) > sl.candidateLeases.mean {
- underfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 - baseLeaseRebalanceThreshold)))
+ underfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 - leaseRebalanceThreshold)))
minUnderfullThreshold := int32(math.Ceil(sl.candidateLeases.mean - 5))
if underfullLeaseThreshold > minUnderfullThreshold {
underfullLeaseThreshold = minUnderfullThreshold
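
For readers following the loadBasedLeaseRebalanceScore comments above, here is a minimal standalone sketch of how the adjustment and the effective threshold interact. The constant values mirror the defaults named in this patch (aggressiveness 1, base threshold 2 * 0.05), but the simplified signature and the example numbers are illustrative only, not the package's actual code.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

const (
	// Default of kv.allocator.lease_rebalancing_aggressiveness.
	leaseRebalancingAggressiveness = 1.0
	// 2 * leaseRebalanceThreshold, as defined in this patch.
	baseLoadBasedLeaseRebalanceThreshold = 2 * 0.05
)

// loadBasedAdjustment mirrors the formula documented above: it is positive
// when more of the workload originates near the remote replica, and its
// magnitude grows with the latency between the two nodes.
func loadBasedAdjustment(remoteWeight, sourceWeight float64, remoteLatency time.Duration) float64 {
	remoteLatencyMillis := float64(remoteLatency) / float64(time.Millisecond)
	return leaseRebalancingAggressiveness * 0.1 *
		math.Log10(remoteWeight/sourceWeight) * math.Log1p(remoteLatencyMillis)
}

func main() {
	// A remote replica seeing 10x the leaseholder's request weight from 60ms
	// away: the adjustment (~0.41) exceeds the 0.1 base threshold, so the
	// lease-count threshold is relaxed and a transfer becomes much more likely.
	adj := loadBasedAdjustment(100, 10, 60*time.Millisecond)
	fmt.Printf("adjustment=%.3f effectiveThreshold=%.3f\n",
		adj, baseLoadBasedLeaseRebalanceThreshold-adj)
}
```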
diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go
index 9d24fadef684..3a9155692bf4 100644
--- a/pkg/storage/allocator_scorer.go
+++ b/pkg/storage/allocator_scorer.go
@@ -59,8 +59,8 @@ const (
// If disabled, rebalancing is done purely based on replica count.
var EnableStatsBasedRebalancing = settings.RegisterBoolSetting(
"kv.allocator.stat_based_rebalancing.enabled",
- "set to enable rebalancing of range replicas based on write load and disk usage",
- false,
+ "set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster",
+ false, // TODO(a-robinson): switch to true for v2.1 once the store-rebalancer is done
)
// rangeRebalanceThreshold is the minimum ratio of a store's range count to
diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go
index d93f7b4d2fc4..1a3ae4e70d23 100644
--- a/pkg/storage/allocator_test.go
+++ b/pkg/storage/allocator_test.go
@@ -1248,6 +1248,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
+ EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false)
ctx := context.Background()
defer stopper.Stop(ctx)
@@ -2040,6 +2041,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
+ EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false)
defer stopper.Stop(context.Background())
stores := []*roachpb.StoreDescriptor{
@@ -3817,7 +3819,7 @@ func TestLoadBasedLeaseRebalanceScore(t *testing.T) {
for _, c := range testCases {
remoteStore.Capacity.LeaseCount = c.remoteLeases
sourceStore.Capacity.LeaseCount = c.sourceLeases
- score := loadBasedLeaseRebalanceScore(
+ score, _ := loadBasedLeaseRebalanceScore(
context.Background(),
st,
c.remoteWeight,
diff --git a/pkg/storage/replica_rankings.go b/pkg/storage/replica_rankings.go
index d8da3d7a399d..d8e667805b16 100644
--- a/pkg/storage/replica_rankings.go
+++ b/pkg/storage/replica_rankings.go
@@ -21,6 +21,7 @@ import (
)
const (
+ // TODO(a-robinson): Scale this up based on the number of replicas on a store?
numTopReplicasToTrack = 128
)
diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go
index 6ae97ae6bc5a..5578ed1bb528 100644
--- a/pkg/storage/replicate_queue.go
+++ b/pkg/storage/replicate_queue.go
@@ -374,7 +374,7 @@ func (rq *replicateQueue) processOneChange(
// out of situations where this store is overfull and yet holds all the
// leases. The fullness checks need to be ignored for cases where
// a replica needs to be removed for constraint violations.
- transferred, err := rq.transferLease(
+ transferred, err := rq.findTargetAndTransferLease(
ctx,
repl,
desc,
@@ -419,7 +419,7 @@ func (rq *replicateQueue) processOneChange(
if dryRun {
return false, nil
}
- transferred, err := rq.transferLease(
+ transferred, err := rq.findTargetAndTransferLease(
ctx,
repl,
desc,
@@ -503,7 +503,7 @@ func (rq *replicateQueue) processOneChange(
if canTransferLease() {
// We require the lease in order to process replicas, so
// repl.store.StoreID() corresponds to the lease-holder's store ID.
- transferred, err := rq.transferLease(
+ transferred, err := rq.findTargetAndTransferLease(
ctx,
repl,
desc,
@@ -537,7 +537,7 @@ type transferLeaseOptions struct {
dryRun bool
}
-func (rq *replicateQueue) transferLease(
+func (rq *replicateQueue) findTargetAndTransferLease(
ctx context.Context,
repl *Replica,
desc *roachpb.RangeDescriptor,
@@ -545,7 +545,7 @@ func (rq *replicateQueue) transferLease(
opts transferLeaseOptions,
) (bool, error) {
candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas, 0 /* brandNewReplicaID */)
- if target := rq.allocator.TransferLeaseTarget(
+ target := rq.allocator.TransferLeaseTarget(
ctx,
zone,
candidates,
@@ -555,24 +555,35 @@ func (rq *replicateQueue) transferLease(
opts.checkTransferLeaseSource,
opts.checkCandidateFullness,
false, /* alwaysAllowDecisionWithoutStats */
- ); target != (roachpb.ReplicaDescriptor{}) {
- rq.metrics.TransferLeaseCount.Inc(1)
+ )
+ if target == (roachpb.ReplicaDescriptor{}) {
+ return false, nil
+ }
+
+ if opts.dryRun {
log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
- if opts.dryRun {
- return false, nil
- }
- avgQPS, qpsMeasurementDur := repl.leaseholderStats.avgQPS()
- if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil {
- return false, errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
- }
- rq.lastLeaseTransfer.Store(timeutil.Now())
- if qpsMeasurementDur >= MinStatsDuration {
- rq.allocator.storePool.updateLocalStoresAfterLeaseTransfer(
- repl.store.StoreID(), target.StoreID, avgQPS)
- }
- return true, nil
+ return false, nil
}
- return false, nil
+
+ err := rq.transferLease(ctx, repl, target)
+ return err == nil, err
+}
+
+func (rq *replicateQueue) transferLease(
+ ctx context.Context, repl *Replica, target roachpb.ReplicaDescriptor,
+) error {
+ rq.metrics.TransferLeaseCount.Inc(1)
+ log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
+ avgQPS, qpsMeasurementDur := repl.leaseholderStats.avgQPS()
+ if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil {
+ return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
+ }
+ rq.lastLeaseTransfer.Store(timeutil.Now())
+ if qpsMeasurementDur >= MinStatsDuration {
+ rq.allocator.storePool.updateLocalStoresAfterLeaseTransfer(
+ repl.store.StoreID(), target.StoreID, avgQPS)
+ }
+ return nil
}
func (rq *replicateQueue) addReplica(
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index bba4bac225ee..5f130e31475f 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -362,6 +362,7 @@ type Store struct {
tsCache tscache.Cache // Most recent timestamps for keys / key ranges
allocator Allocator // Makes allocation decisions
replRankings *replicaRankings
+ storeRebalancer *StoreRebalancer
rangeIDAlloc *idalloc.Allocator // Range ID allocator
gcQueue *gcQueue // Garbage collection queue
mergeQueue *mergeQueue // Range merging queue
@@ -992,6 +993,9 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
}
}
+ s.storeRebalancer = NewStoreRebalancer(
+ s.cfg.AmbientCtx, cfg.Settings, s.replicateQueue, s.replRankings)
+
if cfg.TestingKnobs.DisableGCQueue {
s.setGCQueueActive(false)
}
@@ -1124,7 +1128,7 @@ func (s *Store) SetDraining(drain bool) {
log.Errorf(ctx, "could not get zone config for key %s when draining: %s", desc.StartKey, err)
}
}
- leaseTransferred, err := s.replicateQueue.transferLease(
+ leaseTransferred, err := s.replicateQueue.findTargetAndTransferLease(
ctx,
r,
desc,
@@ -1514,6 +1518,8 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.startLeaseRenewer(ctx)
}
+ s.storeRebalancer.Start(ctx, s.stopper, s.StoreID())
+
// Start the storage engine compactor.
if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) {
s.compactor.Start(s.AnnotateCtx(context.Background()), s.stopper)
diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go
new file mode 100644
index 000000000000..949f6f16e3c1
--- /dev/null
+++ b/pkg/storage/store_rebalancer.go
@@ -0,0 +1,271 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "math"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/config"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/stop"
+)
+
+const (
+ // storeRebalancerTimerDuration is how frequently to check the store-level
+ // balance of the cluster.
+ storeRebalancerTimerDuration = time.Minute
+
+ // minQPSThresholdDifference is the minimum QPS difference from the cluster
+ // mean that this system should care about. In other words, we won't worry
+ // about rebalancing for QPS reasons if a store's QPS differs from the mean
+ // by less than this amount even if the amount is greater than the percentage
+ // threshold. This avoids too many lease transfers in lightly loaded clusters.
+ minQPSThresholdDifference = 100
+)
+
+// StoreRebalancer is responsible for examining how the associated store's load
+// compares to the load on other stores in the cluster and transferring leases
+// or replicas away if the local store is overloaded.
+type StoreRebalancer struct {
+ log.AmbientContext
+ st *cluster.Settings
+ rq *replicateQueue
+ replRankings *replicaRankings
+}
+
+// NewStoreRebalancer creates a StoreRebalancer to work in tandem with the
+// provided replicateQueue.
+func NewStoreRebalancer(
+ ambientCtx log.AmbientContext,
+ st *cluster.Settings,
+ rq *replicateQueue,
+ replRankings *replicaRankings,
+) *StoreRebalancer {
+ ambientCtx.AddLogTag("store-rebalancer", nil)
+ return &StoreRebalancer{
+ AmbientContext: ambientCtx,
+ st: st,
+ rq: rq,
+ replRankings: replRankings,
+ }
+}
+
+// Start runs an infinite loop in a goroutine which regularly checks whether
+// the store is overloaded along any important dimension (e.g. range count,
+// QPS, disk usage), and if so attempts to correct that by moving leases or
+// replicas elsewhere.
+//
+// This worker acts on store-level imbalances, whereas the replicate queue
+// makes decisions based on the zone config constraints and diversity of
+// individual ranges. This means that there are two different workers that
+// could potentially be making decisions about a given range, so they have to
+// be careful to avoid stepping on each others' toes.
+//
+// TODO(a-robinson): Expose metrics to make this understandable without having
+// to dive into logspy.
+func (sr *StoreRebalancer) Start(
+ ctx context.Context, stopper *stop.Stopper, storeID roachpb.StoreID,
+) {
+ ctx = sr.AnnotateCtx(ctx)
+
+ // Start a goroutine that periodically checks this store's load against the
+ // rest of the cluster and transfers leases away if the store is overloaded.
+ stopper.RunWorker(ctx, func(ctx context.Context) {
+ ticker := time.NewTicker(storeRebalancerTimerDuration)
+ defer ticker.Stop()
+ for {
+ // Wait out the first tick before doing anything since the store is still
+ // starting up and we might as well wait for some qps/wps stats to
+ // accumulate.
+ select {
+ case <-stopper.ShouldQuiesce():
+ return
+ case <-ticker.C:
+ }
+
+ if !EnableStatsBasedRebalancing.Get(&sr.st.SV) {
+ continue
+ }
+
+ localDesc, found := sr.rq.allocator.storePool.getStoreDescriptor(storeID)
+ if !found {
+ log.Warningf(ctx, "StorePool missing descriptor for local store")
+ continue
+ }
+ storeList, _, _ := sr.rq.allocator.storePool.getStoreList(roachpb.RangeID(0), storeFilterNone)
+ sr.rebalanceStore(ctx, localDesc, storeList)
+ }
+ })
+}
+
+func (sr *StoreRebalancer) rebalanceStore(
+ ctx context.Context, localDesc roachpb.StoreDescriptor, storeList StoreList,
+) {
+
+ statThreshold := statRebalanceThreshold.Get(&sr.st.SV)
+
+ // First check if we should transfer leases away to better balance QPS.
+ qpsMinThreshold := math.Min(storeList.candidateQueriesPerSecond.mean*(1-statThreshold),
+ storeList.candidateQueriesPerSecond.mean-minQPSThresholdDifference)
+ qpsMaxThreshold := math.Max(storeList.candidateQueriesPerSecond.mean*(1+statThreshold),
+ storeList.candidateQueriesPerSecond.mean+minQPSThresholdDifference)
+
+ if !(localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold) {
+ return
+ }
+
+ storeMap := storeListToMap(storeList)
+ sysCfg, cfgOk := sr.rq.allocator.storePool.gossip.GetSystemConfig()
+ if !cfgOk {
+ log.VEventf(ctx, 1, "no system config available, unable to choose lease transfer targets")
+ return
+ }
+
+ log.Infof(ctx, "considering load-based lease transfers for s%d with %.2f qps (mean=%.2f, upperThreshold=%.2f)",
+ localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold)
+
+ for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold {
+ replWithStats, target := sr.chooseLeaseToTransfer(
+ ctx, sysCfg, localDesc, storeList, storeMap, qpsMinThreshold, qpsMaxThreshold)
+ if replWithStats.repl == nil {
+ log.Infof(ctx,
+ "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f)",
+ localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold)
+ break
+ }
+ log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load",
+ replWithStats.repl.RangeID, replWithStats.qps, target.StoreID)
+ replCtx := replWithStats.repl.AnnotateCtx(ctx)
+ if err := sr.rq.transferLease(replCtx, replWithStats.repl, target); err != nil {
+ log.Errorf(replCtx, "unable to transfer lease to s%d: %v", target.StoreID, err)
+ continue
+ }
+ localDesc.Capacity.QueriesPerSecond -= replWithStats.qps
+ }
+}
+
+// TODO(a-robinson): Should we take the number of leases on each store into
+// account here or just continue to let that happen in allocator.go?
+func (sr *StoreRebalancer) chooseLeaseToTransfer(
+ ctx context.Context,
+ sysCfg config.SystemConfig,
+ localDesc roachpb.StoreDescriptor,
+ storeList StoreList,
+ storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor,
+ minQPS float64,
+ maxQPS float64,
+) (replicaWithStats, roachpb.ReplicaDescriptor) {
+ now := sr.rq.store.Clock().Now()
+ for {
+ replWithStats := sr.replRankings.topQPS()
+
+ // We're all out of replicas.
+ if replWithStats.repl == nil {
+ return replicaWithStats{}, roachpb.ReplicaDescriptor{}
+ }
+
+ if !replWithStats.repl.OwnsValidLease(now) {
+ log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID)
+ continue
+ }
+
+ if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS {
+ log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)",
+ replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS)
+ continue
+ }
+
+ // Don't bother moving leases whose QPS is below some small fraction of the
+ // store's QPS (unless the store has extra leases to spare anyway). It's
+ // just unnecessary churn with no benefit to move leases responsible for,
+ // for example, 1 qps on a store with 5000 qps.
+ const minQPSFraction = .001
+ if replWithStats.qps < localDesc.Capacity.QueriesPerSecond*minQPSFraction &&
+ float64(localDesc.Capacity.LeaseCount) <= storeList.candidateLeases.mean {
+ log.VEventf(ctx, 5, "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps",
+ replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, localDesc.Capacity.QueriesPerSecond)
+ continue
+ }
+
+ desc := replWithStats.repl.Desc()
+ log.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps", desc.RangeID, replWithStats.qps)
+ // TODO(a-robinson): Should we sort these to first examine the stores with lower QPS?
+ for _, candidate := range desc.Replicas {
+ if candidate.StoreID == localDesc.StoreID {
+ continue
+ }
+ storeDesc, ok := storeMap[candidate.StoreID]
+ if !ok {
+ log.VEventf(ctx, 3, "missing store descriptor for s%d", candidate.StoreID)
+ continue
+ }
+
+ newCandidateQPS := storeDesc.Capacity.QueriesPerSecond + replWithStats.qps
+ if storeDesc.Capacity.QueriesPerSecond < minQPS {
+ if newCandidateQPS > maxQPS {
+ log.VEventf(ctx, 3,
+ "r%d's %.2f qps would push s%d over the max threshold (%.2f) with %.2f qps afterwards",
+ desc.RangeID, replWithStats.qps, candidate.StoreID, maxQPS, newCandidateQPS)
+ continue
+ }
+ } else if newCandidateQPS > storeList.candidateQueriesPerSecond.mean {
+ log.VEventf(ctx, 3,
+ "r%d's %.2f qps would push s%d over the mean (%.2f) with %.2f qps afterwards",
+ desc.RangeID, replWithStats.qps, candidate.StoreID,
+ storeList.candidateQueriesPerSecond.mean, newCandidateQPS)
+ continue
+ }
+
+ zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey)
+ if err != nil {
+ log.Error(ctx, err)
+ return replicaWithStats{}, roachpb.ReplicaDescriptor{}
+ }
+ preferred := sr.rq.allocator.preferredLeaseholders(zone, desc.Replicas)
+ if len(preferred) > 0 && !storeHasReplica(candidate.StoreID, preferred) {
+ log.VEventf(ctx, 3, "s%d not a preferred leaseholder; preferred: %v", candidate.StoreID, preferred)
+ continue
+ }
+
+ filteredStoreList := storeList.filter(zone.Constraints)
+ if sr.rq.allocator.followTheWorkloadPrefersLocal(
+ ctx,
+ filteredStoreList,
+ localDesc,
+ candidate.StoreID,
+ desc.Replicas,
+ replWithStats.repl.leaseholderStats,
+ ) {
+ log.VEventf(ctx, 3, "r%d is on s%d due to follow-the-workload; skipping",
+ desc.RangeID, localDesc.StoreID)
+ continue
+ }
+
+ return replWithStats, candidate
+ }
+ }
+}
+
+func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor {
+ storeMap := make(map[roachpb.StoreID]*roachpb.StoreDescriptor)
+ for i := range sl.stores {
+ storeMap[sl.stores[i].StoreID] = &sl.stores[i]
+ }
+ return storeMap
+}
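
The qpsMinThreshold/qpsMaxThreshold computation in rebalanceStore above combines the percentage-based stat threshold with the flat minQPSThresholdDifference floor. Here is a small standalone sketch of that band with illustrative numbers; the real code reads the threshold from the cluster setting and the mean from the gossiped store list.

```go
package main

import (
	"fmt"
	"math"
)

// qpsThresholds mirrors the threshold computation in rebalanceStore: the band
// around the cluster mean is the wider of the percentage-based threshold and
// the fixed minQPSThresholdDifference, which keeps lightly loaded clusters
// from rebalancing over trivial differences.
func qpsThresholds(meanQPS, statThreshold, minDifference float64) (minQPS, maxQPS float64) {
	minQPS = math.Min(meanQPS*(1-statThreshold), meanQPS-minDifference)
	maxQPS = math.Max(meanQPS*(1+statThreshold), meanQPS+minDifference)
	return minQPS, maxQPS
}

func main() {
	// With the default 0.2 stat threshold and a 1000 QPS mean, the band is
	// [800, 1200]; with a 100 QPS mean it widens to [0, 200] because the
	// 100 QPS floor dominates the 20% threshold.
	fmt.Println(qpsThresholds(1000, 0.2, 100))
	fmt.Println(qpsThresholds(100, 0.2, 100))
}
```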
diff --git a/pkg/storage/store_rebalancer_test.go b/pkg/storage/store_rebalancer_test.go
new file mode 100644
index 000000000000..d1df0318968e
--- /dev/null
+++ b/pkg/storage/store_rebalancer_test.go
@@ -0,0 +1,165 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "testing"
+
+ "github.com/cockroachdb/cockroach/pkg/config"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
+ "github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/stop"
+)
+
+var (
+ // noLocalityStores specifies a set of stores where one store is
+ // under-utilized in terms of QPS, three are in the middle, and one is
+ // over-utilized.
+ noLocalityStores = []*roachpb.StoreDescriptor{
+ {
+ StoreID: 1,
+ Node: roachpb.NodeDescriptor{NodeID: 1},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1500,
+ },
+ },
+ {
+ StoreID: 2,
+ Node: roachpb.NodeDescriptor{NodeID: 2},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1100,
+ },
+ },
+ {
+ StoreID: 3,
+ Node: roachpb.NodeDescriptor{NodeID: 3},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1000,
+ },
+ },
+ {
+ StoreID: 4,
+ Node: roachpb.NodeDescriptor{NodeID: 4},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 900,
+ },
+ },
+ {
+ StoreID: 5,
+ Node: roachpb.NodeDescriptor{NodeID: 5},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 500,
+ },
+ },
+ }
+)
+
+type testRange struct {
+ // The first storeID in the list will be the leaseholder.
+ storeIDs []roachpb.StoreID
+ qps float64
+}
+
+func loadRanges(rr *replicaRankings, s *Store, ranges []testRange) {
+ acc := rr.newAccumulator()
+ for _, r := range ranges {
+ repl := &Replica{store: s}
+ repl.mu.state.Desc = &roachpb.RangeDescriptor{}
+ for _, storeID := range r.storeIDs {
+ repl.mu.state.Desc.Replicas = append(repl.mu.state.Desc.Replicas, roachpb.ReplicaDescriptor{
+ NodeID: roachpb.NodeID(storeID),
+ StoreID: storeID,
+ ReplicaID: roachpb.ReplicaID(storeID),
+ })
+ }
+ repl.mu.state.Lease = &roachpb.Lease{
+ Expiration: &hlc.MaxTimestamp,
+ Replica: repl.mu.state.Desc.Replicas[0],
+ }
+ acc.addReplica(replicaWithStats{
+ repl: repl,
+ qps: r.qps,
+ })
+ }
+ rr.update(acc)
+}
+
+func TestChooseLeaseToTransfer(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ ctx := context.Background()
+ stopper := stop.NewStopper()
+ defer stopper.Stop(ctx)
+
+ stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
+ defer stopper.Stop(context.Background())
+ gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t)
+ storeList, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled)
+ storeMap := storeListToMap(storeList)
+
+ const minQPS = 800
+ const maxQPS = 1200
+
+ localDesc := *noLocalityStores[0]
+ cfg := TestStoreConfig(nil)
+ s := createTestStoreWithoutStart(t, stopper, &cfg)
+ s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID}
+ rq := newReplicateQueue(s, g, a)
+ rr := newReplicaRankings()
+
+ sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr)
+
+ testCases := []struct {
+ storeIDs []roachpb.StoreID
+ qps float64
+ expectTarget roachpb.StoreID
+ }{
+ {[]roachpb.StoreID{1}, 100, 0},
+ {[]roachpb.StoreID{1, 2}, 100, 0},
+ {[]roachpb.StoreID{1, 3}, 100, 0},
+ {[]roachpb.StoreID{1, 4}, 100, 4},
+ {[]roachpb.StoreID{1, 5}, 100, 5},
+ {[]roachpb.StoreID{5, 1}, 100, 0},
+ {[]roachpb.StoreID{1, 2}, 200, 0},
+ {[]roachpb.StoreID{1, 3}, 200, 0},
+ {[]roachpb.StoreID{1, 4}, 200, 0},
+ {[]roachpb.StoreID{1, 5}, 200, 5},
+ {[]roachpb.StoreID{1, 2}, 500, 0},
+ {[]roachpb.StoreID{1, 3}, 500, 0},
+ {[]roachpb.StoreID{1, 4}, 500, 0},
+ {[]roachpb.StoreID{1, 5}, 500, 5},
+ {[]roachpb.StoreID{1, 5}, 600, 5},
+ {[]roachpb.StoreID{1, 5}, 700, 5},
+ {[]roachpb.StoreID{1, 5}, 800, 0},
+ {[]roachpb.StoreID{1, 4}, 1.5, 4},
+ {[]roachpb.StoreID{1, 5}, 1.5, 5},
+ {[]roachpb.StoreID{1, 4}, 1.49, 0},
+ {[]roachpb.StoreID{1, 5}, 1.49, 0},
+ }
+
+ for _, tc := range testCases {
+ loadRanges(rr, s, []testRange{{storeIDs: tc.storeIDs, qps: tc.qps}})
+
+ _, target := sr.chooseLeaseToTransfer(
+ ctx, config.SystemConfig{}, localDesc, storeList, storeMap, minQPS, maxQPS)
+ if target.StoreID != tc.expectTarget {
+ t.Errorf("got target store %d for range with replicas %v and %f qps; want %d",
+ target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget)
+ }
+ }
+}
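
The 1.49-vs-1.5 qps cases at the bottom of the table exercise the minQPSFraction filter in chooseLeaseToTransfer: with the local store at 1500 qps, the cutoff is 1500 * 0.001 = 1.5 qps. Below is a simplified sketch of that check; the signature is illustrative, since the real code compares against the store descriptor and the gossiped lease-count mean.

```go
package main

import "fmt"

// tooSmallToMove mirrors the minQPSFraction filter in chooseLeaseToTransfer:
// a lease is skipped when its qps is a negligible fraction of the local
// store's qps and the store is not carrying more than its share of leases.
func tooSmallToMove(leaseQPS, localStoreQPS float64, localLeaseCount int32, meanLeaseCount float64) bool {
	const minQPSFraction = .001
	return leaseQPS < localStoreQPS*minQPSFraction &&
		float64(localLeaseCount) <= meanLeaseCount
}

func main() {
	// In TestChooseLeaseToTransfer the local store serves 1500 qps and every
	// store's lease count is zero, so the cutoff is exactly 1.5 qps.
	fmt.Println(tooSmallToMove(1.49, 1500, 0, 0)) // true:  expect no transfer target
	fmt.Println(tooSmallToMove(1.5, 1500, 0, 0))  // false: a target may be chosen
}
```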