diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 3e45f4d21c69..90db15bceb04 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -18,7 +18,7 @@
kv.allocator.lease_rebalancing_aggressiveness | float | 1 | set greater than 1.0 to rebalance leases toward load more aggressively, or between 0 and 1.0 to be more conservative about rebalancing leases |
kv.allocator.load_based_lease_rebalancing.enabled | boolean | true | set to enable rebalancing of range leases based on load and latency |
kv.allocator.range_rebalance_threshold | float | 0.05 | minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull |
-kv.allocator.stat_based_rebalancing.enabled | boolean | false | set to enable rebalancing of range replicas based on write load and disk usage |
+kv.allocator.stat_based_rebalancing.enabled | boolean | false | set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster |
kv.allocator.stat_rebalance_threshold | float | 0.2 | minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull |
kv.bulk_io_write.concurrent_export_requests | integer | 5 | number of export requests a store will handle concurrently before queuing |
kv.bulk_io_write.concurrent_import_requests | integer | 1 | number of import requests a store will handle concurrently before queuing |
diff --git a/pkg/cmd/roachtest/allocator.go b/pkg/cmd/roachtest/allocator.go
index 7fac79ade1cb..f30b4b8a55f3 100644
--- a/pkg/cmd/roachtest/allocator.go
+++ b/pkg/cmd/roachtest/allocator.go
@@ -33,7 +33,7 @@ func registerAllocator(r *registry) {
c.Put(ctx, workload, "./workload")
// Start the first `start` nodes and restore the fixture
- args := startArgs("--args=--vmodule=allocator=5,allocator_scorer=5,replicate_queue=5")
+ args := startArgs("--args=--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
c.Start(ctx, c.Range(1, start), args)
db := c.Conn(ctx, 1)
defer db.Close()
diff --git a/pkg/cmd/roachtest/rebalance_load.go b/pkg/cmd/roachtest/rebalance_load.go
new file mode 100644
index 000000000000..a2611e1e0e49
--- /dev/null
+++ b/pkg/cmd/roachtest/rebalance_load.go
@@ -0,0 +1,180 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License. See the AUTHORS file
+// for names of contributors.
+
+package main
+
+import (
+ "context"
+ gosql "database/sql"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "sort"
+ "strconv"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "golang.org/x/sync/errgroup"
+)
+
+func registerRebalanceLoad(r *registry) {
+ // This test creates a single table for kv to use and splits the table to
+ // have one range for every node in the cluster. Because even brand new
+ // clusters start with 20+ ranges in them, the number of new ranges in kv's
+ // table is small enough that it typically won't trigger rebalancing of
+ // leases in the cluster based on lease count alone. We let kv generate a lot
+ // of load against the ranges such that when
+ // kv.allocator.stat_based_rebalancing.enabled is set to true, we'd expect
+ // load-based rebalancing to distribute the load evenly across the nodes in
+ // the cluster. Without that setting, the fact that the kv table has so few
+ // ranges means that they probabilistically won't have their leases evenly
+ // spread across all the nodes (they'll often just end up staying on n1).
+ //
+ // In other words, this test should always pass with
+ // kv.allocator.stat_based_rebalancing.enabled set to true, while it should
+ // usually (but not always) fail with it set to false.
+ rebalanceLoadRun := func(ctx context.Context, t *test, c *cluster, duration time.Duration, concurrency int) {
+ roachNodes := c.Range(1, c.nodes-1)
+ appNode := c.Node(c.nodes)
+
+ c.Put(ctx, cockroach, "./cockroach", roachNodes)
+ args := startArgs(
+ "--args=--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
+ c.Start(ctx, roachNodes, args)
+
+ c.Put(ctx, workload, "./workload", appNode)
+ c.Run(ctx, appNode, `./workload init kv --drop {pgurl:1}`)
+
+ var m *errgroup.Group // see comment in version.go
+ m, ctx = errgroup.WithContext(ctx)
+
+ m.Go(func() error {
+ c.l.printf("starting load generator\n")
+
+ quietL, err := newLogger("run kv", strconv.Itoa(0), "workload"+strconv.Itoa(0), ioutil.Discard, os.Stderr)
+ if err != nil {
+ return err
+ }
+ splits := len(roachNodes) - 1 // n-1 splits => n ranges => 1 lease per node
+ return c.RunL(ctx, quietL, appNode, fmt.Sprintf(
+ "./workload run kv --read-percent=95 --splits=%d --tolerate-errors --concurrency=%d "+
+ "--duration=%s {pgurl:1-3}",
+ splits, concurrency, duration.String()))
+ })
+
+ m.Go(func() error {
+ t.Status("starting checks for lease balance")
+
+ db := c.Conn(ctx, 1)
+ defer db.Close()
+
+ if _, err := db.ExecContext(
+ ctx, `SET CLUSTER SETTING kv.allocator.stat_based_rebalancing.enabled=true`,
+ ); err != nil {
+ return err
+ }
+
+ for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= duration; {
+ if done, err := isLoadEvenlyDistributed(c.l, db, len(roachNodes)); err != nil {
+ return err
+ } else if done {
+ c.l.printf("successfully achieved lease balance\n")
+ return nil
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-time.After(5 * time.Second):
+ }
+ }
+
+ return fmt.Errorf("timed out before leases were evenly spread")
+ })
+ if err := m.Wait(); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ minutes := 2 * time.Minute
+ numNodes := 4 // the last node is just used to generate load
+ concurrency := 128
+
+ r.Add(testSpec{
+ Name: `rebalance-leases-by-load`,
+ Nodes: nodes(numNodes),
+ Stable: false, // TODO(a-robinson): Promote to stable
+ Run: func(ctx context.Context, t *test, c *cluster) {
+ if local {
+ concurrency = 32
+ fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
+ }
+ rebalanceLoadRun(ctx, t, c, minutes, concurrency)
+ },
+ })
+}
+
+func isLoadEvenlyDistributed(l *logger, db *gosql.DB, numNodes int) (bool, error) {
+ rows, err := db.Query(
+ `select lease_holder, count(*) ` +
+ `from [show experimental_ranges from table kv.kv] ` +
+ `group by lease_holder;`)
+ if err != nil {
+ return false, err
+ }
+ defer rows.Close()
+ leaseCounts := make(map[int]int)
+ var rangeCount int
+ for rows.Next() {
+ var storeID, leaseCount int
+ if err := rows.Scan(&storeID, &leaseCount); err != nil {
+ return false, err
+ }
+ leaseCounts[storeID] = leaseCount
+ rangeCount += leaseCount
+ }
+ l.printf("numbers of test.kv leases on each store: %v\n", leaseCounts)
+
+ if len(leaseCounts) < numNodes {
+ l.printf("not all nodes have a lease yet: %v\n", leaseCounts)
+ return false, nil
+ }
+
+ // The simple case is when ranges haven't split. We can require that every
+ // store has one lease.
+ if rangeCount == numNodes {
+ for _, leaseCount := range leaseCounts {
+ if leaseCount != 1 {
+ l.printf("uneven lease distribution: %v\n", leaseCounts)
+ return false, nil
+ }
+ }
+ return true, nil
+ }
+
+ // For completeness, if leases have split, verify the leases per store don't
+ // differ by any more than 1.
+ leases := make([]int, 0, numNodes)
+ for _, leaseCount := range leaseCounts {
+ leases = append(leases, leaseCount)
+ }
+ sort.Ints(leases)
+ if leases[0]+1 < leases[len(leases)-1] {
+ l.printf("leases per store differ by more than one: %v\n", leaseCounts)
+ return false, nil
+ }
+
+ return true, nil
+}
diff --git a/pkg/cmd/roachtest/registry.go b/pkg/cmd/roachtest/registry.go
index 5d9319b83cc2..e425b7bd0998 100644
--- a/pkg/cmd/roachtest/registry.go
+++ b/pkg/cmd/roachtest/registry.go
@@ -45,6 +45,7 @@ func registerTests(r *registry) {
registerKVSplits(r)
registerLargeRange(r)
registerQueue(r)
+ registerRebalanceLoad(r)
registerRestore(r)
registerRoachmart(r)
registerScaleData(r)
diff --git a/pkg/sql/opt/norm/rules/join.opt b/pkg/sql/opt/norm/rules/join.opt
index 6c182e23d664..0240f7aa1f55 100644
--- a/pkg/sql/opt/norm/rules/join.opt
+++ b/pkg/sql/opt/norm/rules/join.opt
@@ -49,17 +49,18 @@
# Given this mapping, we can safely push the filter down to both sides and
# remove it from the ON filters list.
#
-# Note that this rule is only applied to InnerJoin and SemiJoin, not
-# InnerJoinApply or SemiJoinApply. The apply variants would cause a
-# non-detectable cycle with TryDecorrelateSelect, causing the filters to get
-# remapped to both sides and pushed down over and over again.
+# Note that this rule is only applied when the left and right inputs do not have
+# outer columns. If they do, then this rule can cause undetectable cycles with
+# TryDecorrelateSelect, since the filter is pushed down to both sides, but then
+# only pulled up from the right side by TryDecorrelateSelect. For this reason,
+# the rule also does not apply to InnerJoinApply or SemiJoinApply.
#
# NOTE: It is important that this rule is first among the join filter push-down
# rules.
[PushFilterIntoJoinLeftAndRight, Normalize]
(InnerJoin | SemiJoin
- $left:*
- $right:*
+ $left:* & ^(HasOuterCols $left)
+ $right:* & ^(HasOuterCols $right)
$filters:(Filters
$list:[
...
diff --git a/pkg/sql/opt/norm/testdata/rules/join b/pkg/sql/opt/norm/testdata/rules/join
index 94193438516e..b6a078eaeec3 100644
--- a/pkg/sql/opt/norm/testdata/rules/join
+++ b/pkg/sql/opt/norm/testdata/rules/join
@@ -19,6 +19,24 @@ TABLE b
└── INDEX primary
└── x int not null
+exec-ddl
+CREATE TABLE xy (x INT PRIMARY KEY, y INT)
+----
+TABLE xy
+ ├── x int not null
+ ├── y int
+ └── INDEX primary
+ └── x int not null
+
+exec-ddl
+CREATE TABLE uv (u INT PRIMARY KEY, v INT)
+----
+TABLE uv
+ ├── u int not null
+ ├── v int
+ └── INDEX primary
+ └── u int not null
+
# --------------------------------------------------
# EnsureJoinFiltersAnd
# --------------------------------------------------
@@ -939,6 +957,113 @@ inner-join
└── filters [type=bool, outer=(1,3), constraints=(/1: (/NULL - ]; /3: (/NULL - ]), fd=(1)==(3), (3)==(1)]
└── a = b [type=bool, outer=(1,3), constraints=(/1: (/NULL - ]; /3: (/NULL - ])]
+# Regression test for issue 28818. Try to trigger an undetectable cycle between the
+# PushFilterIntoJoinLeftAndRight and TryDecorrelateSelect rules.
+opt
+SELECT 1
+FROM a
+WHERE EXISTS (
+ SELECT 1
+ FROM xy
+ INNER JOIN uv
+ ON EXISTS (
+ SELECT 1
+ FROM b
+ WHERE a.s >= 'foo'
+ LIMIT 10
+ )
+ WHERE
+ (SELECT s FROM a) = 'foo'
+)
+----
+project
+ ├── columns: "?column?":22(int!null)
+ ├── fd: ()-->(22)
+ ├── distinct-on
+ │ ├── columns: a.k:1(int!null)
+ │ ├── grouping columns: a.k:1(int!null)
+ │ ├── key: (1)
+ │ └── select
+ │ ├── columns: a.k:1(int!null) xy.x:6(int!null) u:8(int!null) true_agg:14(bool!null)
+ │ ├── key: (1,6,8)
+ │ ├── fd: (1,6,8)-->(14)
+ │ ├── group-by
+ │ │ ├── columns: a.k:1(int!null) xy.x:6(int!null) u:8(int!null) true_agg:14(bool)
+ │ │ ├── grouping columns: a.k:1(int!null) xy.x:6(int!null) u:8(int!null)
+ │ │ ├── key: (1,6,8)
+ │ │ ├── fd: (1,6,8)-->(14)
+ │ │ ├── project
+ │ │ │ ├── columns: true:13(bool!null) a.k:1(int!null) xy.x:6(int!null) u:8(int!null)
+ │ │ │ ├── fd: ()-->(13)
+ │ │ │ ├── inner-join-apply
+ │ │ │ │ ├── columns: a.k:1(int!null) a.s:4(string) xy.x:6(int!null) u:8(int!null)
+ │ │ │ │ ├── fd: (1)-->(4)
+ │ │ │ │ ├── scan a
+ │ │ │ │ │ ├── columns: a.k:1(int!null) a.s:4(string)
+ │ │ │ │ │ ├── key: (1)
+ │ │ │ │ │ └── fd: (1)-->(4)
+ │ │ │ │ ├── inner-join
+ │ │ │ │ │ ├── columns: xy.x:6(int!null) u:8(int!null)
+ │ │ │ │ │ ├── outer: (4)
+ │ │ │ │ │ ├── inner-join
+ │ │ │ │ │ │ ├── columns: xy.x:6(int!null) u:8(int!null)
+ │ │ │ │ │ │ ├── key: (6,8)
+ │ │ │ │ │ │ ├── select
+ │ │ │ │ │ │ │ ├── columns: xy.x:6(int!null)
+ │ │ │ │ │ │ │ ├── key: (6)
+ │ │ │ │ │ │ │ ├── scan xy
+ │ │ │ │ │ │ │ │ ├── columns: xy.x:6(int!null)
+ │ │ │ │ │ │ │ │ └── key: (6)
+ │ │ │ │ │ │ │ └── filters [type=bool]
+ │ │ │ │ │ │ │ └── eq [type=bool]
+ │ │ │ │ │ │ │ ├── subquery [type=string]
+ │ │ │ │ │ │ │ │ └── max1-row
+ │ │ │ │ │ │ │ │ ├── columns: a.s:19(string)
+ │ │ │ │ │ │ │ │ ├── cardinality: [0 - 1]
+ │ │ │ │ │ │ │ │ ├── key: ()
+ │ │ │ │ │ │ │ │ ├── fd: ()-->(19)
+ │ │ │ │ │ │ │ │ └── scan a
+ │ │ │ │ │ │ │ │ └── columns: a.s:19(string)
+ │ │ │ │ │ │ │ └── const: 'foo' [type=string]
+ │ │ │ │ │ │ ├── select
+ │ │ │ │ │ │ │ ├── columns: u:8(int!null)
+ │ │ │ │ │ │ │ ├── key: (8)
+ │ │ │ │ │ │ │ ├── scan uv
+ │ │ │ │ │ │ │ │ ├── columns: u:8(int!null)
+ │ │ │ │ │ │ │ │ └── key: (8)
+ │ │ │ │ │ │ │ └── filters [type=bool]
+ │ │ │ │ │ │ │ └── eq [type=bool]
+ │ │ │ │ │ │ │ ├── subquery [type=string]
+ │ │ │ │ │ │ │ │ └── max1-row
+ │ │ │ │ │ │ │ │ ├── columns: a.s:19(string)
+ │ │ │ │ │ │ │ │ ├── cardinality: [0 - 1]
+ │ │ │ │ │ │ │ │ ├── key: ()
+ │ │ │ │ │ │ │ │ ├── fd: ()-->(19)
+ │ │ │ │ │ │ │ │ └── scan a
+ │ │ │ │ │ │ │ │ └── columns: a.s:19(string)
+ │ │ │ │ │ │ │ └── const: 'foo' [type=string]
+ │ │ │ │ │ │ └── true [type=bool]
+ │ │ │ │ │ ├── limit
+ │ │ │ │ │ │ ├── outer: (4)
+ │ │ │ │ │ │ ├── cardinality: [0 - 10]
+ │ │ │ │ │ │ ├── select
+ │ │ │ │ │ │ │ ├── outer: (4)
+ │ │ │ │ │ │ │ ├── scan b
+ │ │ │ │ │ │ │ └── filters [type=bool, outer=(4), constraints=(/4: [/'foo' - ]; tight)]
+ │ │ │ │ │ │ │ └── a.s >= 'foo' [type=bool, outer=(4), constraints=(/4: [/'foo' - ]; tight)]
+ │ │ │ │ │ │ └── const: 10 [type=int]
+ │ │ │ │ │ └── true [type=bool]
+ │ │ │ │ └── true [type=bool]
+ │ │ │ └── projections [outer=(1,6,8)]
+ │ │ │ └── true [type=bool]
+ │ │ └── aggregations [outer=(13)]
+ │ │ └── const-not-null-agg [type=bool, outer=(13)]
+ │ │ └── variable: true [type=bool, outer=(13)]
+ │ └── filters [type=bool, outer=(14), constraints=(/14: (/NULL - ]; tight)]
+ │ └── true_agg IS NOT NULL [type=bool, outer=(14), constraints=(/14: (/NULL - ]; tight)]
+ └── projections
+ └── const: 1 [type=int]
+
# --------------------------------------------------
# PushFilterIntoJoinLeft + PushFilterIntoJoinRight
# --------------------------------------------------
diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go
index 55e3c37ec69f..b5d4b6216349 100644
--- a/pkg/storage/allocator.go
+++ b/pkg/storage/allocator.go
@@ -34,10 +34,16 @@ import (
)
const (
- // baseLeaseRebalanceThreshold is the minimum ratio of a store's lease surplus
+ // leaseRebalanceThreshold is the minimum ratio of a store's lease surplus
// to the mean range/lease count that permits lease-transfers away from that
// store.
- baseLeaseRebalanceThreshold = 0.05
+ leaseRebalanceThreshold = 0.05
+
+ // baseLoadBasedLeaseRebalanceThreshold is the equivalent of
+ // leaseRebalanceThreshold for load-based lease rebalance decisions (i.e.
+ // "follow-the-workload"). It's the base threshold for decisions that get
+ // adjusted based on the load and latency of the involved ranges/nodes.
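+ // With leaseRebalanceThreshold at 0.05, this base threshold works out to 0.1.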
+ baseLoadBasedLeaseRebalanceThreshold = 2 * leaseRebalanceThreshold
// minReplicaWeight sets a floor for how low a replica weight can be. This is
// needed because a weight of zero doesn't work in the current lease scoring
@@ -714,7 +720,7 @@ func (a *Allocator) TransferLeaseTarget(
// whether we actually should be transferring the lease. The transfer
// decision is only needed if we've been asked to check the source.
transferDec, repl := a.shouldTransferLeaseUsingStats(
- ctx, sl, source, existing, stats,
+ ctx, sl, source, existing, stats, nil,
)
if checkTransferLeaseSource {
switch transferDec {
@@ -814,7 +820,7 @@ func (a *Allocator) ShouldTransferLease(
return false
}
- transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats)
+ transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats, nil)
var result bool
switch transferDec {
case shouldNotTransfer:
@@ -831,12 +837,36 @@ func (a *Allocator) ShouldTransferLease(
return result
}
+func (a Allocator) followTheWorkloadPrefersLocal(
+ ctx context.Context,
+ sl StoreList,
+ source roachpb.StoreDescriptor,
+ candidate roachpb.StoreID,
+ existing []roachpb.ReplicaDescriptor,
+ stats *replicaStats,
+) bool {
+ adjustments := make(map[roachpb.StoreID]float64)
+ decision, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats, adjustments)
+ if decision == decideWithoutStats {
+ return false
+ }
+ adjustment := adjustments[candidate]
+ if adjustment > baseLoadBasedLeaseRebalanceThreshold {
+ log.VEventf(ctx, 3,
+ "s%d is a better fit than s%d due to follow-the-workload (score: %.2f; threshold: %.2f)",
+ source.StoreID, candidate, adjustment, baseLoadBasedLeaseRebalanceThreshold)
+ return true
+ }
+ return false
+}
+
func (a Allocator) shouldTransferLeaseUsingStats(
ctx context.Context,
sl StoreList,
source roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
stats *replicaStats,
+ rebalanceAdjustments map[roachpb.StoreID]float64,
) (transferDecision, roachpb.ReplicaDescriptor) {
// Only use load-based rebalancing if it's enabled and we have both
// stats and locality information to base our decision on.
@@ -903,7 +933,7 @@ func (a Allocator) shouldTransferLeaseUsingStats(
}
addr, err := a.storePool.gossip.GetNodeIDAddress(repl.NodeID)
if err != nil {
- log.Errorf(ctx, "missing address for node %d: %s", repl.NodeID, err)
+ log.Errorf(ctx, "missing address for n%d: %s", repl.NodeID, err)
continue
}
remoteLatency, ok := a.nodeLatencyFn(addr.String())
@@ -912,20 +942,24 @@ func (a Allocator) shouldTransferLeaseUsingStats(
}
remoteWeight := math.Max(minReplicaWeight, replicaWeights[repl.NodeID])
- score := loadBasedLeaseRebalanceScore(
+ replScore, rebalanceAdjustment := loadBasedLeaseRebalanceScore(
ctx, a.storePool.st, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, sl.candidateLeases.mean)
- if score > bestReplScore {
- bestReplScore = score
+ if replScore > bestReplScore {
+ bestReplScore = replScore
bestRepl = repl
}
+ if rebalanceAdjustments != nil {
+ rebalanceAdjustments[repl.StoreID] = rebalanceAdjustment
+ }
}
- // Return the best replica even in cases where transferring is not advised in
- // order to support forced lease transfers, such as when removing a replica or
- // draining all leases before shutdown.
if bestReplScore > 0 {
return shouldTransfer, bestRepl
}
+
+ // Return the best replica even in cases where transferring is not advised in
+ // order to support forced lease transfers, such as when removing a replica or
+ // draining all leases before shutdown.
return shouldNotTransfer, bestRepl
}
@@ -948,7 +982,7 @@ func (a Allocator) shouldTransferLeaseUsingStats(
// logic behind each part of the formula is as follows:
//
// * LeaseRebalancingAggressiveness: Allow the aggressiveness to be tuned via
-// an environment variable.
+// a cluster setting.
// * 0.1: Constant factor to reduce aggressiveness by default
// * math.Log10(remoteWeight/sourceWeight): Comparison of the remote replica's
// weight to the local replica's weight. Taking the log of the ratio instead
@@ -963,6 +997,18 @@ func (a Allocator) shouldTransferLeaseUsingStats(
// of the ideal number of leases on each store. We then calculate these to
// compare how close each node is to its ideal state and use the differences
// from the ideal state on each node to compute a final score.
+//
+// Returns a total score for the replica that takes into account the number of
+// leases already on each store. Also returns the raw "adjustment" value that's
+// purely based on replica weights and latency in order for the caller to
+// determine how large a role the user's workload played in the decision. The
+// adjustment value is positive if the remote store is preferred for load-based
+// reasons or negative if the local store is preferred. The magnitude depends
+// on the difference in load and the latency between the nodes.
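+//
+// As a rough, hypothetical example: with the default
+// kv.allocator.lease_rebalancing_aggressiveness of 1, a remote replica whose
+// locality contributes 10x the source's request weight over a 100ms link gets
+// an adjustment of about 1 * 0.1 * log10(10) * ln(1+100), or roughly 0.46.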
+//
+// TODO(a-robinson): Should this be changed to avoid even thinking about lease
+// counts now that we try to spread leases and replicas based on QPS? As is it
+// may fight back a little bit against store-level QPS-based rebalancing.
func loadBasedLeaseRebalanceScore(
ctx context.Context,
st *cluster.Settings,
@@ -972,14 +1018,14 @@ func loadBasedLeaseRebalanceScore(
sourceWeight float64,
source roachpb.StoreDescriptor,
meanLeases float64,
-) int32 {
+) (int32, float64) {
remoteLatencyMillis := float64(remoteLatency) / float64(time.Millisecond)
rebalanceAdjustment :=
leaseRebalancingAggressiveness.Get(&st.SV) * 0.1 * math.Log10(remoteWeight/sourceWeight) * math.Log1p(remoteLatencyMillis)
// Start with twice the base rebalance threshold in order to fight more
// strongly against thrashing caused by small variances in the distribution
// of request weights.
- rebalanceThreshold := (2 * baseLeaseRebalanceThreshold) - rebalanceAdjustment
+ rebalanceThreshold := baseLoadBasedLeaseRebalanceThreshold - rebalanceAdjustment
overfullLeaseThreshold := int32(math.Ceil(meanLeases * (1 + rebalanceThreshold)))
overfullScore := source.Capacity.LeaseCount - overfullLeaseThreshold
@@ -995,7 +1041,7 @@ func loadBasedLeaseRebalanceScore(
rebalanceThreshold, meanLeases, source.Capacity.LeaseCount, overfullLeaseThreshold,
remoteStore.Capacity.LeaseCount, underfullLeaseThreshold, totalScore,
)
- return totalScore
+ return totalScore, rebalanceAdjustment
}
func (a Allocator) shouldTransferLeaseWithoutStats(
@@ -1004,9 +1050,14 @@ func (a Allocator) shouldTransferLeaseWithoutStats(
source roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
) bool {
+ // TODO(a-robinson): Should we disable this behavior when load-based lease
+ // rebalancing is enabled? In happy cases it's nice to keep this working
+ // to even out the number of leases in addition to the number of replicas,
+ // but it's certainly a blunt instrument that could undo what we want.
+
// Allow lease transfer if we're above the overfull threshold, which is
- // mean*(1+baseLeaseRebalanceThreshold).
- overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + baseLeaseRebalanceThreshold)))
+ // mean*(1+leaseRebalanceThreshold).
+ overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + leaseRebalanceThreshold)))
minOverfullThreshold := int32(math.Ceil(sl.candidateLeases.mean + 5))
if overfullLeaseThreshold < minOverfullThreshold {
overfullLeaseThreshold = minOverfullThreshold
@@ -1016,7 +1067,7 @@ func (a Allocator) shouldTransferLeaseWithoutStats(
}
if float64(source.Capacity.LeaseCount) > sl.candidateLeases.mean {
- underfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 - baseLeaseRebalanceThreshold)))
+ underfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 - leaseRebalanceThreshold)))
minUnderfullThreshold := int32(math.Ceil(sl.candidateLeases.mean - 5))
if underfullLeaseThreshold > minUnderfullThreshold {
underfullLeaseThreshold = minUnderfullThreshold
diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go
index 9d24fadef684..3a9155692bf4 100644
--- a/pkg/storage/allocator_scorer.go
+++ b/pkg/storage/allocator_scorer.go
@@ -59,8 +59,8 @@ const (
// If disabled, rebalancing is done purely based on replica count.
var EnableStatsBasedRebalancing = settings.RegisterBoolSetting(
"kv.allocator.stat_based_rebalancing.enabled",
- "set to enable rebalancing of range replicas based on write load and disk usage",
- false,
+ "set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster",
+ false, // TODO(a-robinson): switch to true for v2.1 once the store-rebalancer is done
)
// rangeRebalanceThreshold is the minimum ratio of a store's range count to
diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go
index d93f7b4d2fc4..1a3ae4e70d23 100644
--- a/pkg/storage/allocator_test.go
+++ b/pkg/storage/allocator_test.go
@@ -1248,6 +1248,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
+ EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false)
ctx := context.Background()
defer stopper.Stop(ctx)
@@ -2040,6 +2041,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
+ EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false)
defer stopper.Stop(context.Background())
stores := []*roachpb.StoreDescriptor{
@@ -3817,7 +3819,7 @@ func TestLoadBasedLeaseRebalanceScore(t *testing.T) {
for _, c := range testCases {
remoteStore.Capacity.LeaseCount = c.remoteLeases
sourceStore.Capacity.LeaseCount = c.sourceLeases
- score := loadBasedLeaseRebalanceScore(
+ score, _ := loadBasedLeaseRebalanceScore(
context.Background(),
st,
c.remoteWeight,
diff --git a/pkg/storage/replica_rankings.go b/pkg/storage/replica_rankings.go
new file mode 100644
index 000000000000..13b4a99f582d
--- /dev/null
+++ b/pkg/storage/replica_rankings.go
@@ -0,0 +1,133 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package storage
+
+import (
+ "container/heap"
+
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+)
+
+const (
+ // TODO(a-robinson): Scale this up based on the number of replicas on a store?
+ numTopReplicasToTrack = 128
+)
+
+type replicaWithStats struct {
+ repl *Replica
+ qps float64
+}
+
+// replicaRankings maintains top-k orderings of the replicas in a store along
+// different dimensions of concern, such as QPS, keys written per second, and
+// disk used.
+type replicaRankings struct {
+ mu struct {
+ syncutil.Mutex
+ qpsAccumulator *rrAccumulator
+ byQPS []replicaWithStats
+ }
+}
+
+func newReplicaRankings() *replicaRankings {
+ return &replicaRankings{}
+}
+
+func (rr *replicaRankings) newAccumulator() *rrAccumulator {
+ res := &rrAccumulator{}
+ res.qps.val = func(r replicaWithStats) float64 { return r.qps }
+ return res
+}
+
+func (rr *replicaRankings) update(acc *rrAccumulator) {
+ rr.mu.Lock()
+ rr.mu.qpsAccumulator = acc
+ rr.mu.Unlock()
+}
+
+func (rr *replicaRankings) topQPS() []replicaWithStats {
+ rr.mu.Lock()
+ defer rr.mu.Unlock()
+ // If we have a new set of data, consume it. Otherwise, just return the most
+ // recently consumed data.
+ if rr.mu.qpsAccumulator.qps.Len() > 0 {
+ rr.mu.byQPS = consumeAccumulator(&rr.mu.qpsAccumulator.qps)
+ }
+ return rr.mu.byQPS
+}
+
+// rrAccumulator is used to update the replicas tracked by replicaRankings.
+// The typical pattern should be to call replicaRankings.newAccumulator, add
+// all the replicas you care about to the accumulator using addReplica, then
+// pass the accumulator back to the replicaRankings using the update method.
+// This method of loading the new rankings all at once avoids interfering with
+// any consumers that are concurrently reading from the rankings, and also
+// prevents concurrent loaders of data from messing with each other -- the last
+// `update`d accumulator will win.
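+//
+// A minimal usage sketch (hypothetical caller; Store.Capacity follows the
+// same pattern from its replica visitor):
+//
+//   acc := rankings.newAccumulator()
+//   for _, repl := range replicasToScore { // hypothetical slice of replicas
+//       acc.addReplica(replicaWithStats{repl: repl, qps: qpsFor(repl)})
+//   }
+//   rankings.update(acc)
+//   hottestFirst := rankings.topQPS() // sorted by descending qps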
+type rrAccumulator struct {
+ qps rrPriorityQueue
+}
+
+func (a *rrAccumulator) addReplica(repl replicaWithStats) {
+ // If the heap isn't full, just push the new replica and return.
+ if a.qps.Len() < numTopReplicasToTrack {
+ heap.Push(&a.qps, repl)
+ return
+ }
+
+ // Otherwise, conditionally push if the new replica is more deserving than
+ // the current tip of the heap.
+ if repl.qps > a.qps.entries[0].qps {
+ heap.Pop(&a.qps)
+ heap.Push(&a.qps, repl)
+ }
+}
+
+func consumeAccumulator(pq *rrPriorityQueue) []replicaWithStats {
+ length := pq.Len()
+ sorted := make([]replicaWithStats, length)
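+ // heap.Pop returns the lowest-qps entry first, so fill the slice from the
+ // back to produce a hottest-first (descending qps) ordering.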
+ for i := 1; i <= length; i++ {
+ sorted[length-i] = heap.Pop(pq).(replicaWithStats)
+ }
+ return sorted
+}
+
+type rrPriorityQueue struct {
+ entries []replicaWithStats
+ val func(replicaWithStats) float64
+}
+
+func (pq rrPriorityQueue) Len() int { return len(pq.entries) }
+
+func (pq rrPriorityQueue) Less(i, j int) bool {
+ return pq.val(pq.entries[i]) < pq.val(pq.entries[j])
+}
+
+func (pq rrPriorityQueue) Swap(i, j int) {
+ pq.entries[i], pq.entries[j] = pq.entries[j], pq.entries[i]
+}
+
+func (pq *rrPriorityQueue) Push(x interface{}) {
+ item := x.(replicaWithStats)
+ pq.entries = append(pq.entries, item)
+}
+
+func (pq *rrPriorityQueue) Pop() interface{} {
+ old := pq.entries
+ n := len(old)
+ item := old[n-1]
+ pq.entries = old[0 : n-1]
+ return item
+}
diff --git a/pkg/storage/replica_rankings_test.go b/pkg/storage/replica_rankings_test.go
new file mode 100644
index 000000000000..80c007790337
--- /dev/null
+++ b/pkg/storage/replica_rankings_test.go
@@ -0,0 +1,77 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package storage
+
+import (
+ "math/rand"
+ "reflect"
+ "testing"
+
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+)
+
+func TestReplicaRankings(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ rr := newReplicaRankings()
+
+ testCases := []struct {
+ replicasByQPS []float64
+ }{
+ {replicasByQPS: []float64{}},
+ {replicasByQPS: []float64{0}},
+ {replicasByQPS: []float64{1, 0}},
+ {replicasByQPS: []float64{3, 2, 1, 0}},
+ {replicasByQPS: []float64{3, 3, 2, 2, 1, 1, 0, 0}},
+ {replicasByQPS: []float64{1.1, 1.0, 0.9, -0.9, -1.0, -1.1}},
+ }
+
+ for _, tc := range testCases {
+ acc := rr.newAccumulator()
+
+ // Randomize the order of the inputs each time the test is run.
+ want := make([]float64, len(tc.replicasByQPS))
+ copy(want, tc.replicasByQPS)
+ rand.Shuffle(len(tc.replicasByQPS), func(i, j int) {
+ tc.replicasByQPS[i], tc.replicasByQPS[j] = tc.replicasByQPS[j], tc.replicasByQPS[i]
+ })
+
+ for i, replQPS := range tc.replicasByQPS {
+ acc.addReplica(replicaWithStats{
+ repl: &Replica{RangeID: roachpb.RangeID(i)},
+ qps: replQPS,
+ })
+ }
+ rr.update(acc)
+
+ // Make sure we can read off all expected replicas in the correct order.
+ repls := rr.topQPS()
+ if len(repls) != len(want) {
+ t.Errorf("wrong number of replicas in output; got: %v; want: %v", repls, tc.replicasByQPS)
+ continue
+ }
+ for i := range want {
+ if repls[i].qps != want[i] {
+ t.Errorf("got %f for %d'th element; want %f (input: %v)", repls[i].qps, i, want, tc.replicasByQPS)
+ break
+ }
+ }
+ replsCopy := rr.topQPS()
+ if !reflect.DeepEqual(repls, replsCopy) {
+ t.Errorf("got different replicas on second call to topQPS; first call: %v, second call: %v", repls, replsCopy)
+ }
+ }
+}
diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go
index 6ae97ae6bc5a..5578ed1bb528 100644
--- a/pkg/storage/replicate_queue.go
+++ b/pkg/storage/replicate_queue.go
@@ -374,7 +374,7 @@ func (rq *replicateQueue) processOneChange(
// out of situations where this store is overfull and yet holds all the
// leases. The fullness checks need to be ignored for cases where
// a replica needs to be removed for constraint violations.
- transferred, err := rq.transferLease(
+ transferred, err := rq.findTargetAndTransferLease(
ctx,
repl,
desc,
@@ -419,7 +419,7 @@ func (rq *replicateQueue) processOneChange(
if dryRun {
return false, nil
}
- transferred, err := rq.transferLease(
+ transferred, err := rq.findTargetAndTransferLease(
ctx,
repl,
desc,
@@ -503,7 +503,7 @@ func (rq *replicateQueue) processOneChange(
if canTransferLease() {
// We require the lease in order to process replicas, so
// repl.store.StoreID() corresponds to the lease-holder's store ID.
- transferred, err := rq.transferLease(
+ transferred, err := rq.findTargetAndTransferLease(
ctx,
repl,
desc,
@@ -537,7 +537,7 @@ type transferLeaseOptions struct {
dryRun bool
}
-func (rq *replicateQueue) transferLease(
+func (rq *replicateQueue) findTargetAndTransferLease(
ctx context.Context,
repl *Replica,
desc *roachpb.RangeDescriptor,
@@ -545,7 +545,7 @@ func (rq *replicateQueue) transferLease(
opts transferLeaseOptions,
) (bool, error) {
candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas, 0 /* brandNewReplicaID */)
- if target := rq.allocator.TransferLeaseTarget(
+ target := rq.allocator.TransferLeaseTarget(
ctx,
zone,
candidates,
@@ -555,24 +555,35 @@ func (rq *replicateQueue) transferLease(
opts.checkTransferLeaseSource,
opts.checkCandidateFullness,
false, /* alwaysAllowDecisionWithoutStats */
- ); target != (roachpb.ReplicaDescriptor{}) {
- rq.metrics.TransferLeaseCount.Inc(1)
+ )
+ if target == (roachpb.ReplicaDescriptor{}) {
+ return false, nil
+ }
+
+ if opts.dryRun {
log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
- if opts.dryRun {
- return false, nil
- }
- avgQPS, qpsMeasurementDur := repl.leaseholderStats.avgQPS()
- if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil {
- return false, errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
- }
- rq.lastLeaseTransfer.Store(timeutil.Now())
- if qpsMeasurementDur >= MinStatsDuration {
- rq.allocator.storePool.updateLocalStoresAfterLeaseTransfer(
- repl.store.StoreID(), target.StoreID, avgQPS)
- }
- return true, nil
+ return false, nil
}
- return false, nil
+
+ err := rq.transferLease(ctx, repl, target)
+ return err == nil, err
+}
+
+func (rq *replicateQueue) transferLease(
+ ctx context.Context, repl *Replica, target roachpb.ReplicaDescriptor,
+) error {
+ rq.metrics.TransferLeaseCount.Inc(1)
+ log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
+ avgQPS, qpsMeasurementDur := repl.leaseholderStats.avgQPS()
+ if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil {
+ return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
+ }
+ rq.lastLeaseTransfer.Store(timeutil.Now())
+ if qpsMeasurementDur >= MinStatsDuration {
+ rq.allocator.storePool.updateLocalStoresAfterLeaseTransfer(
+ repl.store.StoreID(), target.StoreID, avgQPS)
+ }
+ return nil
}
func (rq *replicateQueue) addReplica(
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 18fdc629ad3c..a3bfb253bd67 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -374,10 +374,12 @@ type Store struct {
Ident *roachpb.StoreIdent // pointer to catch access before Start() is called
cfg StoreConfig
db *client.DB
- engine engine.Engine // The underlying key-value store
- compactor *compactor.Compactor // Schedules compaction of the engine
- tsCache tscache.Cache // Most recent timestamps for keys / key ranges
- allocator Allocator // Makes allocation decisions
+ engine engine.Engine // The underlying key-value store
+ compactor *compactor.Compactor // Schedules compaction of the engine
+ tsCache tscache.Cache // Most recent timestamps for keys / key ranges
+ allocator Allocator // Makes allocation decisions
+ replRankings *replicaRankings
+ storeRebalancer *StoreRebalancer
rangeIDAlloc *idalloc.Allocator // Range ID allocator
gcQueue *gcQueue // Garbage collection queue
mergeQueue *mergeQueue // Range merging queue
@@ -924,6 +926,7 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
return 0, false
})
}
+ s.replRankings = newReplicaRankings()
s.intentResolver = newIntentResolver(s, cfg.IntentResolverTaskLimit)
s.raftEntryCache = newRaftEntryCache(cfg.RaftEntryCacheSize)
s.draining.Store(false)
@@ -1010,6 +1013,9 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
}
}
+ s.storeRebalancer = NewStoreRebalancer(
+ s.cfg.AmbientCtx, cfg.Settings, s.replicateQueue, s.replRankings)
+
if cfg.TestingKnobs.DisableGCQueue {
s.setGCQueueActive(false)
}
@@ -1142,7 +1148,7 @@ func (s *Store) SetDraining(drain bool) {
log.Errorf(ctx, "could not get zone config for key %s when draining: %s", desc.StartKey, err)
}
}
- leaseTransferred, err := s.replicateQueue.transferLease(
+ leaseTransferred, err := s.replicateQueue.findTargetAndTransferLease(
ctx,
r,
desc,
@@ -1541,6 +1547,8 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.startLeaseRenewer(ctx)
}
+ s.storeRebalancer.Start(ctx, s.stopper, s.StoreID())
+
// Start the storage engine compactor.
if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) {
s.compactor.Start(s.AnnotateCtx(context.Background()), s.stopper)
@@ -2776,6 +2784,7 @@ func (s *Store) Capacity(useCached bool) (roachpb.StoreCapacity, error) {
replicaCount := s.metrics.ReplicaCount.Value()
bytesPerReplica := make([]float64, 0, replicaCount)
writesPerReplica := make([]float64, 0, replicaCount)
+ rankingsAccumulator := s.replRankings.newAccumulator()
newStoreReplicaVisitor(s).Visit(func(r *Replica) bool {
rangeCount++
if r.OwnsValidLease(now) {
@@ -2788,14 +2797,20 @@ func (s *Store) Capacity(useCached bool) (roachpb.StoreCapacity, error) {
// incorrectly low the first time or two it gets gossiped when a store
// starts? We can't easily have a countdown as its value changes like for
// leases/replicas.
- if qps, dur := r.leaseholderStats.avgQPS(); dur >= MinStatsDuration {
- totalQueriesPerSecond += qps
+ var qps float64
+ if avgQPS, dur := r.leaseholderStats.avgQPS(); dur >= MinStatsDuration {
+ qps = avgQPS
+ totalQueriesPerSecond += avgQPS
// TODO(a-robinson): Calculate percentiles for qps? Get rid of other percentiles?
}
if wps, dur := r.writeStats.avgQPS(); dur >= MinStatsDuration {
totalWritesPerSecond += wps
writesPerReplica = append(writesPerReplica, wps)
}
+ rankingsAccumulator.addReplica(replicaWithStats{
+ repl: r,
+ qps: qps,
+ })
return true
})
capacity.RangeCount = rangeCount
@@ -2806,6 +2821,7 @@ func (s *Store) Capacity(useCached bool) (roachpb.StoreCapacity, error) {
capacity.BytesPerReplica = roachpb.PercentilesFromData(bytesPerReplica)
capacity.WritesPerReplica = roachpb.PercentilesFromData(writesPerReplica)
s.recordNewPerSecondStats(totalQueriesPerSecond, totalWritesPerSecond)
+ s.replRankings.update(rankingsAccumulator)
s.cachedCapacity.Lock()
s.cachedCapacity.StoreCapacity = capacity
diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go
new file mode 100644
index 000000000000..5be226f33073
--- /dev/null
+++ b/pkg/storage/store_rebalancer.go
@@ -0,0 +1,309 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "math"
+ "sort"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/config"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/stop"
+)
+
+const (
+ // storeRebalancerTimerDuration is how frequently to check the store-level
+ // balance of the cluster.
+ storeRebalancerTimerDuration = time.Minute
+
+ // minQPSThresholdDifference is the minimum QPS difference from the cluster
+ // mean that this system should care about. In other words, we won't worry
+ // about rebalancing for QPS reasons if a store's QPS differs from the mean
+ // by less than this amount even if the amount is greater than the percentage
+ // threshold. This avoids too many lease transfers in lightly loaded clusters.
+ minQPSThresholdDifference = 100
+)
+
+// StoreRebalancer is responsible for examining how the associated store's load
+// compares to the load on other stores in the cluster and transferring leases
+// or replicas away if the local store is overloaded.
+//
+// This isn't implemented as a Queue because the Queues all operate on one
+// replica at a time, making a local decision about each replica. Queues don't
+// really know how the replica they're looking at compares to other replicas on
+// the store. Our goal is balancing stores, though, so it's preferable to make
+// decisions about each store and then carefully pick replicas to move that
+// will best accomplish the store-level goals.
+type StoreRebalancer struct {
+ log.AmbientContext
+ st *cluster.Settings
+ rq *replicateQueue
+ replRankings *replicaRankings
+}
+
+// NewStoreRebalancer creates a StoreRebalancer to work in tandem with the
+// provided replicateQueue.
+func NewStoreRebalancer(
+ ambientCtx log.AmbientContext,
+ st *cluster.Settings,
+ rq *replicateQueue,
+ replRankings *replicaRankings,
+) *StoreRebalancer {
+ ambientCtx.AddLogTag("store-rebalancer", nil)
+ return &StoreRebalancer{
+ AmbientContext: ambientCtx,
+ st: st,
+ rq: rq,
+ replRankings: replRankings,
+ }
+}
+
+// Start runs an infinite loop in a goroutine which regularly checks whether
+// the store is overloaded along any important dimension (e.g. range count,
+// QPS, disk usage), and if so attempts to correct that by moving leases or
+// replicas elsewhere.
+//
+// This worker acts on store-level imbalances, whereas the replicate queue
+// makes decisions based on the zone config constraints and diversity of
+// individual ranges. This means that there are two different workers that
+// could potentially be making decisions about a given range, so they have to
+// be careful to avoid stepping on each others' toes.
+//
+// TODO(a-robinson): Expose metrics to make this understandable without having
+// to dive into logspy.
+func (sr *StoreRebalancer) Start(
+ ctx context.Context, stopper *stop.Stopper, storeID roachpb.StoreID,
+) {
+ ctx = sr.AnnotateCtx(ctx)
+
+ // Start a goroutine that periodically compares this store's load to the
+ // rest of the cluster and transfers leases away if it is overloaded.
+ stopper.RunWorker(ctx, func(ctx context.Context) {
+ ticker := time.NewTicker(storeRebalancerTimerDuration)
+ defer ticker.Stop()
+ for {
+ // Wait out the first tick before doing anything since the store is still
+ // starting up and we might as well wait for some qps/wps stats to
+ // accumulate.
+ select {
+ case <-stopper.ShouldQuiesce():
+ return
+ case <-ticker.C:
+ }
+
+ if !EnableStatsBasedRebalancing.Get(&sr.st.SV) {
+ continue
+ }
+
+ localDesc, found := sr.rq.allocator.storePool.getStoreDescriptor(storeID)
+ if !found {
+ log.Warningf(ctx, "StorePool missing descriptor for local store")
+ continue
+ }
+ storeList, _, _ := sr.rq.allocator.storePool.getStoreList(roachpb.RangeID(0), storeFilterNone)
+ sr.rebalanceStore(ctx, localDesc, storeList)
+ }
+ })
+}
+
+func (sr *StoreRebalancer) rebalanceStore(
+ ctx context.Context, localDesc roachpb.StoreDescriptor, storeList StoreList,
+) {
+
+ statThreshold := statRebalanceThreshold.Get(&sr.st.SV)
+
+ // First check if we should transfer leases away to better balance QPS.
+ qpsMinThreshold := math.Min(storeList.candidateQueriesPerSecond.mean*(1-statThreshold),
+ storeList.candidateQueriesPerSecond.mean-minQPSThresholdDifference)
+ qpsMaxThreshold := math.Max(storeList.candidateQueriesPerSecond.mean*(1+statThreshold),
+ storeList.candidateQueriesPerSecond.mean+minQPSThresholdDifference)
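+ // As a hypothetical example: with a cluster mean of 1000 qps and the default
+ // kv.allocator.stat_rebalance_threshold of 0.2, these work out to
+ // min(800, 900) = 800 and max(1200, 1100) = 1200, so lease shedding only
+ // kicks in while this store is above 1200 qps.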
+
+ if !(localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold) {
+ log.VEventf(ctx, 1, "local QPS %.2f is below max threshold %.2f (mean=%.2f); no rebalancing needed",
+ localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold, storeList.candidateQueriesPerSecond.mean)
+ return
+ }
+
+ storeMap := storeListToMap(storeList)
+ sysCfg, cfgOk := sr.rq.allocator.storePool.gossip.GetSystemConfig()
+ if !cfgOk {
+ log.VEventf(ctx, 1, "no system config available, unable to choose lease transfer targets")
+ return
+ }
+
+ log.Infof(ctx, "considering load-based lease transfers for s%d with %.2f qps (mean=%.2f, upperThreshold=%.2f)",
+ localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold)
+
+ hottestRanges := sr.replRankings.topQPS()
+ for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold {
+ replWithStats, target := sr.chooseLeaseToTransfer(
+ ctx, sysCfg, &hottestRanges, localDesc, storeList, storeMap, qpsMinThreshold, qpsMaxThreshold)
+ if replWithStats.repl == nil {
+ log.Infof(ctx,
+ "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f)",
+ localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold)
+ break
+ }
+ log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load",
+ replWithStats.repl.RangeID, replWithStats.qps, target.StoreID)
+ replCtx := replWithStats.repl.AnnotateCtx(ctx)
+ if err := sr.rq.transferLease(replCtx, replWithStats.repl, target); err != nil {
+ log.Errorf(replCtx, "unable to transfer lease to s%d: %v", target.StoreID, err)
+ continue
+ }
+ // Finally, update our local copies of the descriptors so that if
+ // additional transfers are needed we'll be making the decisions with more
+ // up-to-date info.
+ localDesc.Capacity.LeaseCount--
+ localDesc.Capacity.QueriesPerSecond -= replWithStats.qps
+ if otherDesc := storeMap[target.StoreID]; otherDesc != nil {
+ otherDesc.Capacity.LeaseCount++
+ otherDesc.Capacity.QueriesPerSecond += replWithStats.qps
+ }
+ }
+}
+
+// TODO(a-robinson): Should we take the number of leases on each store into
+// account here or just continue to let that happen in allocator.go?
+func (sr *StoreRebalancer) chooseLeaseToTransfer(
+ ctx context.Context,
+ sysCfg config.SystemConfig,
+ hottestRanges *[]replicaWithStats,
+ localDesc roachpb.StoreDescriptor,
+ storeList StoreList,
+ storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor,
+ minQPS float64,
+ maxQPS float64,
+) (replicaWithStats, roachpb.ReplicaDescriptor) {
+ now := sr.rq.store.Clock().Now()
+ for {
+ if len(*hottestRanges) == 0 {
+ return replicaWithStats{}, roachpb.ReplicaDescriptor{}
+ }
+ replWithStats := (*hottestRanges)[0]
+ *hottestRanges = (*hottestRanges)[1:]
+
+ // We're all out of replicas.
+ if replWithStats.repl == nil {
+ return replicaWithStats{}, roachpb.ReplicaDescriptor{}
+ }
+
+ if !replWithStats.repl.OwnsValidLease(now) {
+ log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID)
+ continue
+ }
+
+ if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS {
+ log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)",
+ replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS)
+ continue
+ }
+
+ // Don't bother moving leases whose QPS is below some small fraction of the
+ // store's QPS (unless the store has extra leases to spare anyway). It's
+ // just unnecessary churn with no benefit to move leases responsible for,
+ // for example, 1 qps on a store with 5000 qps.
+ const minQPSFraction = .001
+ if replWithStats.qps < localDesc.Capacity.QueriesPerSecond*minQPSFraction &&
+ float64(localDesc.Capacity.LeaseCount) <= storeList.candidateLeases.mean {
+ log.VEventf(ctx, 5, "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps",
+ replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, localDesc.Capacity.QueriesPerSecond)
+ continue
+ }
+
+ desc := replWithStats.repl.Desc()
+ log.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps", desc.RangeID, replWithStats.qps)
+
+ // Check all the other replicas in order of increasing qps.
+ replicas := make([]roachpb.ReplicaDescriptor, len(desc.Replicas))
+ copy(replicas, desc.Replicas)
+ sort.Slice(replicas, func(i, j int) bool {
+ var iQPS, jQPS float64
+ if desc := storeMap[replicas[i].StoreID]; desc != nil {
+ iQPS = desc.Capacity.QueriesPerSecond
+ }
+ if desc := storeMap[replicas[j].StoreID]; desc != nil {
+ jQPS = desc.Capacity.QueriesPerSecond
+ }
+ return iQPS < jQPS
+ })
+
+ for _, candidate := range replicas {
+ if candidate.StoreID == localDesc.StoreID {
+ continue
+ }
+ storeDesc, ok := storeMap[candidate.StoreID]
+ if !ok {
+ log.VEventf(ctx, 3, "missing store descriptor for s%d", candidate.StoreID)
+ continue
+ }
+
+ newCandidateQPS := storeDesc.Capacity.QueriesPerSecond + replWithStats.qps
+ if storeDesc.Capacity.QueriesPerSecond < minQPS {
+ if newCandidateQPS > maxQPS {
+ log.VEventf(ctx, 3,
+ "r%d's %.2f qps would push s%d over the max threshold (%.2f) with %.2f qps afterwards",
+ desc.RangeID, replWithStats.qps, candidate.StoreID, maxQPS, newCandidateQPS)
+ continue
+ }
+ } else if newCandidateQPS > storeList.candidateQueriesPerSecond.mean {
+ log.VEventf(ctx, 3,
+ "r%d's %.2f qps would push s%d over the mean (%.2f) with %.2f qps afterwards",
+ desc.RangeID, replWithStats.qps, candidate.StoreID,
+ storeList.candidateQueriesPerSecond.mean, newCandidateQPS)
+ continue
+ }
+
+ zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey)
+ if err != nil {
+ log.Error(ctx, err)
+ return replicaWithStats{}, roachpb.ReplicaDescriptor{}
+ }
+ preferred := sr.rq.allocator.preferredLeaseholders(zone, desc.Replicas)
+ if len(preferred) > 0 && !storeHasReplica(candidate.StoreID, preferred) {
+ log.VEventf(ctx, 3, "s%d not a preferred leaseholder; preferred: %v", candidate.StoreID, preferred)
+ continue
+ }
+
+ filteredStoreList := storeList.filter(zone.Constraints)
+ if sr.rq.allocator.followTheWorkloadPrefersLocal(
+ ctx,
+ filteredStoreList,
+ localDesc,
+ candidate.StoreID,
+ desc.Replicas,
+ replWithStats.repl.leaseholderStats,
+ ) {
+ log.VEventf(ctx, 3, "r%d is on s%d due to follow-the-workload; skipping",
+ desc.RangeID, localDesc.StoreID)
+ continue
+ }
+
+ return replWithStats, candidate
+ }
+ }
+}
+
+func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor {
+ storeMap := make(map[roachpb.StoreID]*roachpb.StoreDescriptor)
+ for i := range sl.stores {
+ storeMap[sl.stores[i].StoreID] = &sl.stores[i]
+ }
+ return storeMap
+}
diff --git a/pkg/storage/store_rebalancer_test.go b/pkg/storage/store_rebalancer_test.go
new file mode 100644
index 000000000000..bec59923cf9d
--- /dev/null
+++ b/pkg/storage/store_rebalancer_test.go
@@ -0,0 +1,165 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package storage
+
+import (
+ "context"
+ "testing"
+
+ "github.com/cockroachdb/cockroach/pkg/config"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
+ "github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/stop"
+)
+
+var (
+ // noLocalityStores specifies a set of stores where one store is
+ // under-utilized in terms of QPS, three are in the middle, and one is
+ // over-utilized.
+ noLocalityStores = []*roachpb.StoreDescriptor{
+ {
+ StoreID: 1,
+ Node: roachpb.NodeDescriptor{NodeID: 1},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1500,
+ },
+ },
+ {
+ StoreID: 2,
+ Node: roachpb.NodeDescriptor{NodeID: 2},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1100,
+ },
+ },
+ {
+ StoreID: 3,
+ Node: roachpb.NodeDescriptor{NodeID: 3},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1000,
+ },
+ },
+ {
+ StoreID: 4,
+ Node: roachpb.NodeDescriptor{NodeID: 4},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 900,
+ },
+ },
+ {
+ StoreID: 5,
+ Node: roachpb.NodeDescriptor{NodeID: 5},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 500,
+ },
+ },
+ }
+)
+
+type testRange struct {
+ // The first storeID in the list will be the leaseholder.
+ storeIDs []roachpb.StoreID
+ qps float64
+}
+
+func loadRanges(rr *replicaRankings, s *Store, ranges []testRange) {
+ acc := rr.newAccumulator()
+ for _, r := range ranges {
+ repl := &Replica{store: s}
+ repl.mu.state.Desc = &roachpb.RangeDescriptor{}
+ for _, storeID := range r.storeIDs {
+ repl.mu.state.Desc.Replicas = append(repl.mu.state.Desc.Replicas, roachpb.ReplicaDescriptor{
+ NodeID: roachpb.NodeID(storeID),
+ StoreID: storeID,
+ ReplicaID: roachpb.ReplicaID(storeID),
+ })
+ }
+ repl.mu.state.Lease = &roachpb.Lease{
+ Expiration: &hlc.MaxTimestamp,
+ Replica: repl.mu.state.Desc.Replicas[0],
+ }
+ acc.addReplica(replicaWithStats{
+ repl: repl,
+ qps: r.qps,
+ })
+ }
+ rr.update(acc)
+}
+
+func TestChooseLeaseToTransfer(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ ctx := context.Background()
+ stopper := stop.NewStopper()
+ defer stopper.Stop(ctx)
+
+ stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
+ defer stopper.Stop(context.Background())
+ gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t)
+ storeList, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled)
+ storeMap := storeListToMap(storeList)
+
+ const minQPS = 800
+ const maxQPS = 1200
+
+ localDesc := *noLocalityStores[0]
+ cfg := TestStoreConfig(nil)
+ s := createTestStoreWithoutStart(t, stopper, &cfg)
+ s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID}
+ rq := newReplicateQueue(s, g, a)
+ rr := newReplicaRankings()
+
+ sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr)
+
+ testCases := []struct {
+ storeIDs []roachpb.StoreID
+ qps float64
+ expectTarget roachpb.StoreID
+ }{
+ {[]roachpb.StoreID{1}, 100, 0},
+ {[]roachpb.StoreID{1, 2}, 100, 0},
+ {[]roachpb.StoreID{1, 3}, 100, 0},
+ {[]roachpb.StoreID{1, 4}, 100, 4},
+ {[]roachpb.StoreID{1, 5}, 100, 5},
+ {[]roachpb.StoreID{5, 1}, 100, 0},
+ {[]roachpb.StoreID{1, 2}, 200, 0},
+ {[]roachpb.StoreID{1, 3}, 200, 0},
+ {[]roachpb.StoreID{1, 4}, 200, 0},
+ {[]roachpb.StoreID{1, 5}, 200, 5},
+ {[]roachpb.StoreID{1, 2}, 500, 0},
+ {[]roachpb.StoreID{1, 3}, 500, 0},
+ {[]roachpb.StoreID{1, 4}, 500, 0},
+ {[]roachpb.StoreID{1, 5}, 500, 5},
+ {[]roachpb.StoreID{1, 5}, 600, 5},
+ {[]roachpb.StoreID{1, 5}, 700, 5},
+ {[]roachpb.StoreID{1, 5}, 800, 0},
+ {[]roachpb.StoreID{1, 4}, 1.5, 4},
+ {[]roachpb.StoreID{1, 5}, 1.5, 5},
+ {[]roachpb.StoreID{1, 4}, 1.49, 0},
+ {[]roachpb.StoreID{1, 5}, 1.49, 0},
+ }
+
+ for _, tc := range testCases {
+ loadRanges(rr, s, []testRange{{storeIDs: tc.storeIDs, qps: tc.qps}})
+ hottestRanges := rr.topQPS()
+ _, target := sr.chooseLeaseToTransfer(
+ ctx, config.SystemConfig{}, &hottestRanges, localDesc, storeList, storeMap, minQPS, maxQPS)
+ if target.StoreID != tc.expectTarget {
+ t.Errorf("got target store %d for range with replicas %v and %f qps; want %d",
+ target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget)
+ }
+ }
+}