storage: make lease rebalancing decisions at the store level
In order to better balance the QPS being served by each store and avoid
overloaded nodes.

Fixes #21419

Release note (performance improvement): Range leases will be
automatically rebalanced throughout the cluster to even out the amount
of QPS being handled by each node.
a-robinson committed Aug 15, 2018
1 parent 4fcd916 commit fd5df74
Showing 9 changed files with 551 additions and 44 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -18,7 +18,7 @@
<tr><td><code>kv.allocator.lease_rebalancing_aggressiveness</code></td><td>float</td><td><code>1</code></td><td>set greater than 1.0 to rebalance leases toward load more aggressively, or between 0 and 1.0 to be more conservative about rebalancing leases</td></tr>
<tr><td><code>kv.allocator.load_based_lease_rebalancing.enabled</code></td><td>boolean</td><td><code>true</code></td><td>set to enable rebalancing of range leases based on load and latency</td></tr>
<tr><td><code>kv.allocator.range_rebalance_threshold</code></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.allocator.stat_based_rebalancing.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to enable rebalancing of range replicas based on write load and disk usage</td></tr>
<tr><td><code>kv.allocator.stat_based_rebalancing.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster</td></tr>
<tr><td><code>kv.allocator.stat_rebalance_threshold</code></td><td>float</td><td><code>0.2</code></td><td>minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.bulk_io_write.concurrent_export_requests</code></td><td>integer</td><td><code>5</code></td><td>number of export requests a store will handle concurrently before queuing</td></tr>
<tr><td><code>kv.bulk_io_write.concurrent_import_requests</code></td><td>integer</td><td><code>1</code></td><td>number of import requests a store will handle concurrently before queuing</td></tr>
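
These allocator knobs are ordinary cluster settings, so they can be adjusted at runtime; for illustration, an operator could run SET CLUSTER SETTING kv.allocator.lease_rebalancing_aggressiveness = 1.5; to rebalance leases toward load more aggressively, or set kv.allocator.load_based_lease_rebalancing.enabled to false to turn follow-the-workload lease rebalancing off entirely.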
87 changes: 69 additions & 18 deletions pkg/storage/allocator.go
@@ -34,10 +34,16 @@ import (
)

const (
// baseLeaseRebalanceThreshold is the minimum ratio of a store's lease surplus
// leaseRebalanceThreshold is the minimum ratio of a store's lease surplus
// to the mean range/lease count that permits lease-transfers away from that
// store.
baseLeaseRebalanceThreshold = 0.05
leaseRebalanceThreshold = 0.05

// baseLoadBasedLeaseRebalanceThreshold is the equivalent of
// leaseRebalanceThreshold for load-based lease rebalance decisions (i.e.
// "follow-the-workload"). It's the base threshold for decisions that gets
// adjusted based on the load and latency of the involved ranges/nodes.
baseLoadBasedLeaseRebalanceThreshold = 2 * leaseRebalanceThreshold

// minReplicaWeight sets a floor for how low a replica weight can be. This is
// needed because a weight of zero doesn't work in the current lease scoring
@@ -714,7 +720,7 @@ func (a *Allocator) TransferLeaseTarget(
// whether we actually should be transferring the lease. The transfer
// decision is only needed if we've been asked to check the source.
transferDec, repl := a.shouldTransferLeaseUsingStats(
ctx, sl, source, existing, stats,
ctx, sl, source, existing, stats, nil,
)
if checkTransferLeaseSource {
switch transferDec {
@@ -814,7 +820,7 @@ func (a *Allocator) ShouldTransferLease(
return false
}

transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats)
transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats, nil)
var result bool
switch transferDec {
case shouldNotTransfer:
@@ -831,12 +837,36 @@ func (a *Allocator) ShouldTransferLease(
return result
}

func (a Allocator) followTheWorkloadPrefersLocal(
ctx context.Context,
sl StoreList,
source roachpb.StoreDescriptor,
candidate roachpb.StoreID,
existing []roachpb.ReplicaDescriptor,
stats *replicaStats,
) bool {
adjustments := make(map[roachpb.StoreID]float64)
decision, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats, adjustments)
if decision == decideWithoutStats {
return false
}
adjustment := adjustments[candidate]
if adjustment > baseLoadBasedLeaseRebalanceThreshold {
log.VEventf(ctx, 3,
"s%d is a better fit than s%d due to follow-the-workload (score: %.2f; threshold: %.2f)",
source.StoreID, candidate, adjustment, baseLoadBasedLeaseRebalanceThreshold)
return true
}
return false
}

func (a Allocator) shouldTransferLeaseUsingStats(
ctx context.Context,
sl StoreList,
source roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
stats *replicaStats,
rebalanceAdjustments map[roachpb.StoreID]float64,
) (transferDecision, roachpb.ReplicaDescriptor) {
// Only use load-based rebalancing if it's enabled and we have both
// stats and locality information to base our decision on.
@@ -903,7 +933,7 @@ func (a Allocator) shouldTransferLeaseUsingStats(
}
addr, err := a.storePool.gossip.GetNodeIDAddress(repl.NodeID)
if err != nil {
log.Errorf(ctx, "missing address for node %d: %s", repl.NodeID, err)
log.Errorf(ctx, "missing address for n%d: %s", repl.NodeID, err)
continue
}
remoteLatency, ok := a.nodeLatencyFn(addr.String())
@@ -912,20 +942,24 @@ func (a Allocator) shouldTransferLeaseUsingStats(
}

remoteWeight := math.Max(minReplicaWeight, replicaWeights[repl.NodeID])
score := loadBasedLeaseRebalanceScore(
replScore, rebalanceAdjustment := loadBasedLeaseRebalanceScore(
ctx, a.storePool.st, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, sl.candidateLeases.mean)
if score > bestReplScore {
bestReplScore = score
if replScore > bestReplScore {
bestReplScore = replScore
bestRepl = repl
}
if rebalanceAdjustments != nil {
rebalanceAdjustments[repl.StoreID] = rebalanceAdjustment
}
}

// Return the best replica even in cases where transferring is not advised in
// order to support forced lease transfers, such as when removing a replica or
// draining all leases before shutdown.
if bestReplScore > 0 {
return shouldTransfer, bestRepl
}

// Return the best replica even in cases where transferring is not advised in
// order to support forced lease transfers, such as when removing a replica or
// draining all leases before shutdown.
return shouldNotTransfer, bestRepl
}

@@ -948,7 +982,7 @@ func (a Allocator) shouldTransferLeaseUsingStats(
// logic behind each part of the formula is as follows:
//
// * LeaseRebalancingAggressiveness: Allow the aggressiveness to be tuned via
// an environment variable.
// a cluster setting.
// * 0.1: Constant factor to reduce aggressiveness by default
// * math.Log10(remoteWeight/sourceWeight): Comparison of the remote replica's
// weight to the local replica's weight. Taking the log of the ratio instead
@@ -963,6 +997,18 @@ func (a Allocator) shouldTransferLeaseUsingStats(
// of the ideal number of leases on each store. We then calculate these to
// compare how close each node is to its ideal state and use the differences
// from the ideal state on each node to compute a final score.
//
// Returns a total score for the replica that takes into account the number of
// leases already on each store. Also returns the raw "adjustment" value that's
// purely based on replica weights and latency in order for the caller to
// determine how large a role the user's workload played in the decision. The
// adjustment value is positive if the remote store is preferred for load-based
// reasons or negative if the local store is preferred. The magnitude depends
// on the difference in load and the latency between the nodes.
//
// TODO(a-robinson): Should this be changed to avoid even thinking about lease
// counts now that we try to spread leases and replicas based on QPS? As is, it
// may fight back a little bit against store-level QPS-based rebalancing.
func loadBasedLeaseRebalanceScore(
ctx context.Context,
st *cluster.Settings,
@@ -972,14 +1018,14 @@ func loadBasedLeaseRebalanceScore(
sourceWeight float64,
source roachpb.StoreDescriptor,
meanLeases float64,
) int32 {
) (int32, float64) {
remoteLatencyMillis := float64(remoteLatency) / float64(time.Millisecond)
rebalanceAdjustment :=
leaseRebalancingAggressiveness.Get(&st.SV) * 0.1 * math.Log10(remoteWeight/sourceWeight) * math.Log1p(remoteLatencyMillis)
// Start with twice the base rebalance threshold in order to fight more
// strongly against thrashing caused by small variances in the distribution
// of request weights.
rebalanceThreshold := (2 * baseLeaseRebalanceThreshold) - rebalanceAdjustment
rebalanceThreshold := baseLoadBasedLeaseRebalanceThreshold - rebalanceAdjustment

overfullLeaseThreshold := int32(math.Ceil(meanLeases * (1 + rebalanceThreshold)))
overfullScore := source.Capacity.LeaseCount - overfullLeaseThreshold
@@ -995,7 +1041,7 @@ func loadBasedLeaseRebalanceScore(
rebalanceThreshold, meanLeases, source.Capacity.LeaseCount, overfullLeaseThreshold,
remoteStore.Capacity.LeaseCount, underfullLeaseThreshold, totalScore,
)
return totalScore
return totalScore, rebalanceAdjustment
}

func (a Allocator) shouldTransferLeaseWithoutStats(
@@ -1004,9 +1050,14 @@ func (a Allocator) shouldTransferLeaseWithoutStats(
source roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
) bool {
// TODO(a-robinson): Should we disable this behavior when load-based lease
// rebalancing is enabled? In happy cases it's nice to keep this working
// to even out the number of leases in addition to the number of replicas,
// but it's certainly a blunt instrument that could undo what we want.

// Allow lease transfer if we're above the overfull threshold, which is
// mean*(1+baseLeaseRebalanceThreshold).
overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + baseLeaseRebalanceThreshold)))
// mean*(1+leaseRebalanceThreshold).
overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + leaseRebalanceThreshold)))
minOverfullThreshold := int32(math.Ceil(sl.candidateLeases.mean + 5))
if overfullLeaseThreshold < minOverfullThreshold {
overfullLeaseThreshold = minOverfullThreshold
@@ -1016,7 +1067,7 @@ func (a Allocator) shouldTransferLeaseWithoutStats(
}

if float64(source.Capacity.LeaseCount) > sl.candidateLeases.mean {
underfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 - baseLeaseRebalanceThreshold)))
underfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 - leaseRebalanceThreshold)))
minUnderfullThreshold := int32(math.Ceil(sl.candidateLeases.mean - 5))
if underfullLeaseThreshold > minUnderfullThreshold {
underfullLeaseThreshold = minUnderfullThreshold
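
To make the scoring comments above easier to follow, here is a small, self-contained Go sketch of the arithmetic visible in this file's hunks: the raw load/latency adjustment, the 0.10 threshold it is compared against in followTheWorkloadPrefersLocal, the lease-count score it feeds into, and the simpler count-only threshold used by shouldTransferLeaseWithoutStats. The constants mirror the diff and the aggressiveness value is the setting's default of 1; the underfull half of the lease-count score is not visible in the hunks shown here, so it is approximated as the mirror image of the overfull half. This is an illustration, not the production code path.

package main

import (
    "fmt"
    "math"
)

const (
    // Mirrors the constants added in allocator.go above.
    leaseRebalanceThreshold              = 0.05
    baseLoadBasedLeaseRebalanceThreshold = 2 * leaseRebalanceThreshold
    // Default of the kv.allocator.lease_rebalancing_aggressiveness setting.
    leaseRebalancingAggressiveness = 1.0
)

// rebalanceAdjustment is the raw load/latency term:
// aggressiveness * 0.1 * log10(remoteWeight/sourceWeight) * log1p(remoteLatencyMillis).
// Per the comment in the diff, it is positive when the remote store is
// preferred for load-based reasons and negative when the local store is.
func rebalanceAdjustment(remoteWeight, sourceWeight, remoteLatencyMillis float64) float64 {
    return leaseRebalancingAggressiveness * 0.1 *
        math.Log10(remoteWeight/sourceWeight) *
        math.Log1p(remoteLatencyMillis)
}

// leaseCountScore folds the adjustment into the lease-count comparison the way
// loadBasedLeaseRebalanceScore does: the adjustment loosens or tightens the
// overfull/underfull thresholds around the mean lease count. The underfull
// side is approximated here because it is elided in the hunks above.
func leaseCountScore(sourceLeases, remoteLeases int32, meanLeases, adjustment float64) int32 {
    rebalanceThreshold := baseLoadBasedLeaseRebalanceThreshold - adjustment
    overfullLeaseThreshold := int32(math.Ceil(meanLeases * (1 + rebalanceThreshold)))
    underfullLeaseThreshold := int32(math.Floor(meanLeases * (1 - rebalanceThreshold)))
    overfullScore := sourceLeases - overfullLeaseThreshold
    underfullScore := underfullLeaseThreshold - remoteLeases
    return overfullScore + underfullScore
}

// countOnlyOverfullThreshold mirrors shouldTransferLeaseWithoutStats: a store
// is overfull once it exceeds mean*(1+leaseRebalanceThreshold), but never by
// fewer than 5 leases over the mean.
func countOnlyOverfullThreshold(meanLeases float64) int32 {
    t := int32(math.Ceil(meanLeases * (1 + leaseRebalanceThreshold)))
    if m := int32(math.Ceil(meanLeases + 5)); t < m {
        t = m
    }
    return t
}

func main() {
    // Example: the remote node sees ~10x the request weight of the source and
    // is 50ms away.
    adj := rebalanceAdjustment(10, 1, 50)
    fmt.Printf("adjustment = %.3f vs. follow-the-workload threshold %.2f\n",
        adj, baseLoadBasedLeaseRebalanceThreshold)
    fmt.Println("lease-count score:", leaseCountScore(120, 80, 100, adj))
    fmt.Println("count-only overfull threshold at mean 100:", countOnlyOverfullThreshold(100))
    fmt.Println("count-only overfull threshold at mean 20:", countOnlyOverfullThreshold(20))
}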
4 changes: 2 additions & 2 deletions pkg/storage/allocator_scorer.go
@@ -59,8 +59,8 @@ const (
// If disabled, rebalancing is done purely based on replica count.
var EnableStatsBasedRebalancing = settings.RegisterBoolSetting(
"kv.allocator.stat_based_rebalancing.enabled",
"set to enable rebalancing of range replicas based on write load and disk usage",
false,
"set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster",
false, // TODO(a-robinson): switch to true for v2.1 once the store-rebalancer is done
)

// rangeRebalanceThreshold is the minimum ratio of a store's range count to
4 changes: 3 additions & 1 deletion pkg/storage/allocator_test.go
@@ -1248,6 +1248,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) {
defer leaktest.AfterTest(t)()

stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false)
ctx := context.Background()
defer stopper.Stop(ctx)

@@ -2040,6 +2041,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) {
defer leaktest.AfterTest(t)()

stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false)
defer stopper.Stop(context.Background())

stores := []*roachpb.StoreDescriptor{
@@ -3817,7 +3819,7 @@ func TestLoadBasedLeaseRebalanceScore(t *testing.T) {
for _, c := range testCases {
remoteStore.Capacity.LeaseCount = c.remoteLeases
sourceStore.Capacity.LeaseCount = c.sourceLeases
score := loadBasedLeaseRebalanceScore(
score, _ := loadBasedLeaseRebalanceScore(
context.Background(),
st,
c.remoteWeight,
1 change: 1 addition & 0 deletions pkg/storage/replica_rankings.go
@@ -21,6 +21,7 @@ import (
)

const (
// TODO(a-robinson): Scale this up based on the number of replicas on a store?
numTopReplicasToTrack = 128
)

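
The new TODO above asks whether the fixed cap of 128 tracked hot replicas should scale with how many replicas the store holds. Purely as a sketch of that question (and not part of this commit), one possible shape:

package main

import "fmt"

// topReplicasToTrack grows the tracked-replica cap with store size but never
// drops below the current fixed value of 128. Illustrative only; this commit
// keeps numTopReplicasToTrack as a constant, and the 1-in-64 ratio here is an
// arbitrary placeholder.
func topReplicasToTrack(replicaCount int) int {
    const floor = 128
    if n := replicaCount / 64; n > floor {
        return n
    }
    return floor
}

func main() {
    fmt.Println(topReplicasToTrack(2000))  // small store: stays at the 128 floor
    fmt.Println(topReplicasToTrack(50000)) // large store: scales up with replica count
}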
53 changes: 32 additions & 21 deletions pkg/storage/replicate_queue.go
@@ -374,7 +374,7 @@ func (rq *replicateQueue) processOneChange(
// out of situations where this store is overfull and yet holds all the
// leases. The fullness checks need to be ignored for cases where
// a replica needs to be removed for constraint violations.
transferred, err := rq.transferLease(
transferred, err := rq.findTargetAndTransferLease(
ctx,
repl,
desc,
@@ -419,7 +419,7 @@ func (rq *replicateQueue) processOneChange(
if dryRun {
return false, nil
}
transferred, err := rq.transferLease(
transferred, err := rq.findTargetAndTransferLease(
ctx,
repl,
desc,
@@ -503,7 +503,7 @@ func (rq *replicateQueue) processOneChange(
if canTransferLease() {
// We require the lease in order to process replicas, so
// repl.store.StoreID() corresponds to the lease-holder's store ID.
transferred, err := rq.transferLease(
transferred, err := rq.findTargetAndTransferLease(
ctx,
repl,
desc,
@@ -537,15 +537,15 @@ type transferLeaseOptions struct {
dryRun bool
}

func (rq *replicateQueue) transferLease(
func (rq *replicateQueue) findTargetAndTransferLease(
ctx context.Context,
repl *Replica,
desc *roachpb.RangeDescriptor,
zone config.ZoneConfig,
opts transferLeaseOptions,
) (bool, error) {
candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas, 0 /* brandNewReplicaID */)
if target := rq.allocator.TransferLeaseTarget(
target := rq.allocator.TransferLeaseTarget(
ctx,
zone,
candidates,
@@ -555,24 +555,35 @@ func (rq *replicateQueue) findTargetAndTransferLease(
opts.checkTransferLeaseSource,
opts.checkCandidateFullness,
false, /* alwaysAllowDecisionWithoutStats */
); target != (roachpb.ReplicaDescriptor{}) {
rq.metrics.TransferLeaseCount.Inc(1)
)
if target == (roachpb.ReplicaDescriptor{}) {
return false, nil
}

if opts.dryRun {
log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
if opts.dryRun {
return false, nil
}
avgQPS, qpsMeasurementDur := repl.leaseholderStats.avgQPS()
if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil {
return false, errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
}
rq.lastLeaseTransfer.Store(timeutil.Now())
if qpsMeasurementDur >= MinStatsDuration {
rq.allocator.storePool.updateLocalStoresAfterLeaseTransfer(
repl.store.StoreID(), target.StoreID, avgQPS)
}
return true, nil
return false, nil
}
return false, nil

err := rq.transferLease(ctx, repl, target)
return err == nil, err
}

func (rq *replicateQueue) transferLease(
ctx context.Context, repl *Replica, target roachpb.ReplicaDescriptor,
) error {
rq.metrics.TransferLeaseCount.Inc(1)
log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
avgQPS, qpsMeasurementDur := repl.leaseholderStats.avgQPS()
if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil {
return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
}
rq.lastLeaseTransfer.Store(timeutil.Now())
if qpsMeasurementDur >= MinStatsDuration {
rq.allocator.storePool.updateLocalStoresAfterLeaseTransfer(
repl.store.StoreID(), target.StoreID, avgQPS)
}
return nil
}

func (rq *replicateQueue) addReplica(
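
The refactor above splits the old transferLease into findTargetAndTransferLease (pick a target, then transfer) and a narrower transferLease (execute the transfer and do the bookkeeping). The point of the split is that a caller which has already chosen its own target, such as a store-level, QPS-driven rebalancer, can reuse just the execution half. A self-contained toy model of that division of labor; every name below is illustrative rather than the real API:

package main

import "fmt"

// leaseMover models the narrowed transferLease: it only executes a transfer
// that some other component has already decided on.
type leaseMover interface {
    transferLease(rangeID, targetStore int) error
}

// countBasedQueue models the replicate-queue path: it both picks a target
// (here, simply the store with the fewest leases) and executes via the mover.
func countBasedQueue(m leaseMover, rangeID int, leaseCounts map[int]int) error {
    target, best := -1, -1
    for storeID, n := range leaseCounts {
        if best == -1 || n < best {
            target, best = storeID, n
        }
    }
    if target == -1 {
        return nil // no candidates, nothing to do
    }
    return m.transferLease(rangeID, target)
}

// qpsRebalancer models the new store-level path: the target was chosen
// elsewhere (by comparing per-store QPS), so it only needs the mover.
func qpsRebalancer(m leaseMover, rangeID, chosenTarget int) error {
    return m.transferLease(rangeID, chosenTarget)
}

type printingMover struct{}

func (printingMover) transferLease(rangeID, targetStore int) error {
    fmt.Printf("transferring lease for r%d to s%d\n", rangeID, targetStore)
    return nil
}

func main() {
    var m printingMover
    _ = countBasedQueue(m, 1, map[int]int{2: 10, 3: 4})
    _ = qpsRebalancer(m, 2, 3)
}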
8 changes: 7 additions & 1 deletion pkg/storage/store.go
@@ -362,6 +362,7 @@ type Store struct {
tsCache tscache.Cache // Most recent timestamps for keys / key ranges
allocator Allocator // Makes allocation decisions
replRankings *replicaRankings
storeRebalancer *StoreRebalancer
rangeIDAlloc *idalloc.Allocator // Range ID allocator
gcQueue *gcQueue // Garbage collection queue
mergeQueue *mergeQueue // Range merging queue
@@ -992,6 +993,9 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
}
}

s.storeRebalancer = NewStoreRebalancer(
s.cfg.AmbientCtx, cfg.Settings, s.replicateQueue, s.replRankings)

if cfg.TestingKnobs.DisableGCQueue {
s.setGCQueueActive(false)
}
@@ -1124,7 +1128,7 @@ func (s *Store) SetDraining(drain bool) {
log.Errorf(ctx, "could not get zone config for key %s when draining: %s", desc.StartKey, err)
}
}
leaseTransferred, err := s.replicateQueue.transferLease(
leaseTransferred, err := s.replicateQueue.findTargetAndTransferLease(
ctx,
r,
desc,
@@ -1514,6 +1518,8 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.startLeaseRenewer(ctx)
}

s.storeRebalancer.Start(ctx, s.stopper, s.StoreID())

// Start the storage engine compactor.
if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) {
s.compactor.Start(s.AnnotateCtx(context.Background()), s.stopper)
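
NewStore now constructs a StoreRebalancer and Store.Start launches it with the store's stopper; the rebalancer's own implementation lives in a file that is not expanded on this page. As a rough sketch of what a Start method of this shape typically does — a background loop that periodically compares this store's load against the rest of the cluster and hands off leases or replicas when it is too hot — here is an illustrative, standard-library-only version (the real code ties the loop to the stop.Stopper rather than a context, and its checking logic is of course much richer):

package main

import (
    "context"
    "fmt"
    "time"
)

// storeRebalancerSketch stands in for the real StoreRebalancer; the actual
// type, its fields, and its rebalancing logic are not shown in the hunks above.
type storeRebalancerSketch struct {
    interval time.Duration
    check    func(ctx context.Context) // e.g. compare this store's QPS to the cluster mean
}

// Start launches the periodic loop and returns immediately; the loop exits
// when ctx is cancelled (the real code ties this to the store's stopper).
func (sr *storeRebalancerSketch) Start(ctx context.Context) {
    go func() {
        ticker := time.NewTicker(sr.interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                sr.check(ctx)
            case <-ctx.Done():
                return
            }
        }
    }()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    sr := &storeRebalancerSketch{
        interval: 100 * time.Millisecond,
        check: func(context.Context) {
            fmt.Println("checking store QPS against the cluster mean (placeholder)")
        },
    }
    sr.Start(ctx)
    time.Sleep(350 * time.Millisecond) // let a few ticks fire before exiting
}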
(The remaining changed files in this commit are not shown on this page.)
