Merge pull request #29663 from a-robinson/backport2.1-28852
backport-2.1: storage: make load-based replica rebalancing decisions at the store level
a-robinson authored Sep 6, 2018
2 parents 98acdd8 + 8488d02 commit 3599a26
Showing 15 changed files with 527 additions and 780 deletions.
4 changes: 2 additions & 2 deletions docs/generated/settings/settings.html
@@ -17,9 +17,9 @@
<tr><td><code>jobs.registry.leniency</code></td><td>duration</td><td><code>1m0s</code></td><td>the amount of time to defer any attempts to reschedule a job</td></tr>
<tr><td><code>kv.allocator.lease_rebalancing_aggressiveness</code></td><td>float</td><td><code>1</code></td><td>set greater than 1.0 to rebalance leases toward load more aggressively, or between 0 and 1.0 to be more conservative about rebalancing leases</td></tr>
<tr><td><code>kv.allocator.load_based_lease_rebalancing.enabled</code></td><td>boolean</td><td><code>true</code></td><td>set to enable rebalancing of range leases based on load and latency</td></tr>
<tr><td><code>kv.allocator.load_based_rebalancing</code></td><td>enumeration</td><td><code>1</code></td><td>whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><code>kv.allocator.qps_rebalance_threshold</code></td><td>float</td><td><code>0.25</code></td><td>minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.allocator.range_rebalance_threshold</code></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.allocator.stat_based_rebalancing.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster</td></tr>
<tr><td><code>kv.allocator.stat_rebalance_threshold</code></td><td>float</td><td><code>0.2</code></td><td>minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.bulk_io_write.concurrent_export_requests</code></td><td>integer</td><td><code>5</code></td><td>number of export requests a store will handle concurrently before queuing</td></tr>
<tr><td><code>kv.bulk_io_write.concurrent_import_requests</code></td><td>integer</td><td><code>1</code></td><td>number of import requests a store will handle concurrently before queuing</td></tr>
<tr><td><code>kv.bulk_io_write.max_rate</code></td><td>byte size</td><td><code>8.0 EiB</code></td><td>the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops</td></tr>
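The two new settings above replace the retired stat-based pair: kv.allocator.load_based_rebalancing picks what load-based rebalancing moves (nothing, leases only, or leases and replicas, with leases as the default), and kv.allocator.qps_rebalance_threshold defines how far from the cluster mean a store's QPS may drift before the store counts as overfull or underfull. Like any cluster setting, both can be changed at runtime via SET CLUSTER SETTING. Below is a minimal standalone sketch of that mean-based classification; it is illustrative only, not the allocator's actual code.

package main

import "fmt"

// classifyStores applies the qps_rebalance_threshold rule described in the
// settings table: a store is overfull above mean*(1+threshold) and
// underfull below mean*(1-threshold).
func classifyStores(qps []float64, threshold float64) {
	var mean float64
	for _, q := range qps {
		mean += q
	}
	mean /= float64(len(qps))
	for i, q := range qps {
		switch {
		case q > mean*(1+threshold):
			fmt.Printf("s%d: overfull (%.0f QPS, mean %.0f)\n", i+1, q, mean)
		case q < mean*(1-threshold):
			fmt.Printf("s%d: underfull (%.0f QPS, mean %.0f)\n", i+1, q, mean)
		default:
			fmt.Printf("s%d: balanced (%.0f QPS, mean %.0f)\n", i+1, q, mean)
		}
	}
}

func main() {
	// With the default threshold of 0.25 and a mean of 1000 QPS, the
	// bounds are 750 and 1250, so s1 is overfull and s3 is underfull.
	classifyStores([]float64{1500, 1000, 500}, 0.25)
}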
20 changes: 15 additions & 5 deletions pkg/cmd/roachtest/rebalance_load.go
@@ -107,20 +107,30 @@ func registerRebalanceLoad(r *registry) {
}
}

minutes := 2 * time.Minute
numNodes := 4 // the last node is just used to generate load
concurrency := 128

r.Add(testSpec{
Name: `rebalance-leases-by-load`,
Nodes: nodes(numNodes),
Stable: false, // TODO(a-robinson): Promote to stable
Nodes: nodes(4), // the last node is just used to generate load
Stable: false, // TODO(a-robinson): Promote to stable
Run: func(ctx context.Context, t *test, c *cluster) {
if local {
concurrency = 32
fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
}
rebalanceLoadRun(ctx, t, c, minutes, concurrency)
rebalanceLoadRun(ctx, t, c, 2*time.Minute, concurrency)
},
})
r.Add(testSpec{
Name: `rebalance-replicas-by-load`,
Nodes: nodes(7), // the last node is just used to generate load
Stable: false, // TODO(a-robinson): Promote to stable
Run: func(ctx context.Context, t *test, c *cluster) {
if local {
concurrency = 32
fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
}
rebalanceLoadRun(ctx, t, c, 5*time.Minute, concurrency)
},
})
}
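Note that both specs capture the same concurrency variable and apply the same local-mode override. The replica test also gets 7 nodes and 5 minutes versus 4 nodes and 2 minutes for the lease test, presumably because moving whole replicas needs spare stores beyond the default three-way replication (the last node in each spec only generates load) and more time to converge.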
5 changes: 4 additions & 1 deletion pkg/settings/registry.go
@@ -34,10 +34,13 @@ var Registry = make(map[string]Setting)
// When a setting is removed, it should be added to this list so that we cannot
// accidentally reuse its name, potentially mis-handling older values.
var retiredSettings = map[string]struct{}{
//removed as of 2.0.
// removed as of 2.0.
"kv.gc.batch_size": {},
"kv.transaction.max_intents": {},
"diagnostics.reporting.report_metrics": {},
// removed as of 2.1.
"kv.allocator.stat_based_rebalancing.enabled": {},
"kv.allocator.stat_rebalance_threshold": {},
}

// Register adds a setting to the registry.
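The two freshly retired names join this map so a future change cannot accidentally re-register them against old stored values. Below is a standalone sketch of the guard this map enables; register here is a hypothetical stand-in, not the actual settings.Register.

package main

import "fmt"

// retiredSettings mirrors the map above: names that must never be reused,
// since an existing cluster may still have values stored under them.
var retiredSettings = map[string]struct{}{
	"kv.allocator.stat_based_rebalancing.enabled": {},
	"kv.allocator.stat_rebalance_threshold":       {},
}

var registry = map[string]struct{}{}

// register refuses retired names outright instead of silently
// mis-handling older values, and rejects duplicates as well.
func register(name string) {
	if _, retired := retiredSettings[name]; retired {
		panic(fmt.Sprintf("setting %q was retired; pick a new name", name))
	}
	if _, dup := registry[name]; dup {
		panic(fmt.Sprintf("setting %q already registered", name))
	}
	registry[name] = struct{}{}
}

func main() {
	register("kv.allocator.qps_rebalance_threshold")  // ok
	register("kv.allocator.stat_rebalance_threshold") // panics: retired name
}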
91 changes: 40 additions & 51 deletions pkg/storage/allocator.go
@@ -90,10 +90,6 @@ var leaseRebalancingAggressiveness = settings.RegisterNonNegativeFloatSetting(
1.0,
)

func statsBasedRebalancingEnabled(st *cluster.Settings, disableStatsBasedRebalance bool) bool {
return EnableStatsBasedRebalancing.Get(&st.SV) && st.Version.IsActive(cluster.VersionStatsBasedRebalancing) && !disableStatsBasedRebalance
}

// AllocatorAction enumerates the various replication adjustments that may be
// recommended by the allocator.
type AllocatorAction int
@@ -247,10 +243,7 @@ func MakeAllocator(
// supplied range, as governed by the supplied zone configuration. It
// returns the required action that should be taken and a priority.
func (a *Allocator) ComputeAction(
ctx context.Context,
zone config.ZoneConfig,
rangeInfo RangeInfo,
disableStatsBasedRebalancing bool,
ctx context.Context, zone config.ZoneConfig, rangeInfo RangeInfo,
) (AllocatorAction, float64) {
if a.storePool == nil {
// Do nothing if storePool is nil for some unittests.
@@ -341,10 +334,8 @@
}

type decisionDetails struct {
Target string
Existing string `json:",omitempty"`
RangeBytes int64 `json:",omitempty"`
RangeWritesPerSecond float64 `json:",omitempty"`
Target string
Existing string `json:",omitempty"`
}
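The slimmed-down decisionDetails keeps only Target and Existing; because Existing carries `json:",omitempty"`, allocate decisions (which have no existing store) log compact JSON. A quick standalone illustration, not the allocator package itself:

package main

import (
	"encoding/json"
	"fmt"
)

// decisionDetails mirrors the trimmed struct above.
type decisionDetails struct {
	Target   string
	Existing string `json:",omitempty"`
}

func main() {
	// An allocate decision has no existing store, so Existing is omitted.
	add, _ := json.Marshal(decisionDetails{Target: "s3"})
	fmt.Println(string(add)) // {"Target":"s3"}

	// A rebalance decision logs both sides.
	move, _ := json.Marshal(decisionDetails{Target: "s3", Existing: "s1"})
	fmt.Println(string(move)) // {"Target":"s3","Existing":"s1"}
}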

// AllocateTarget returns a suitable store for a new allocation with the
@@ -357,42 +348,54 @@ func (a *Allocator) AllocateTarget(
zone config.ZoneConfig,
existing []roachpb.ReplicaDescriptor,
rangeInfo RangeInfo,
disableStatsBasedRebalancing bool,
) (*roachpb.StoreDescriptor, string, error) {
sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeInfo.Desc.RangeID, storeFilterThrottled)

target, details := a.allocateTargetFromList(
ctx, sl, zone, existing, rangeInfo, a.scorerOptions())

if target != nil {
return target, details, nil
}

// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if throttledStoreCount > 0 {
return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
}
return nil, "", &allocatorError{
constraints: zone.Constraints,
existingReplicas: len(existing),
aliveStores: aliveStoreCount,
throttledStores: throttledStoreCount,
}
}

func (a *Allocator) allocateTargetFromList(
ctx context.Context,
sl StoreList,
zone config.ZoneConfig,
existing []roachpb.ReplicaDescriptor,
rangeInfo RangeInfo,
options scorerOptions,
) (*roachpb.StoreDescriptor, string) {
analyzedConstraints := analyzeConstraints(
ctx, a.storePool.getStoreDescriptor, existing, zone)
options := a.scorerOptions(disableStatsBasedRebalancing)
candidates := allocateCandidates(
sl, analyzedConstraints, existing, rangeInfo, a.storePool.getLocalities(existing), options,
)
log.VEventf(ctx, 3, "allocate candidates: %s", candidates)
if target := candidates.selectGood(a.randGen); target != nil {
log.VEventf(ctx, 3, "add target: %s", target)
details := decisionDetails{Target: target.compactString(options)}
if options.statsBasedRebalancingEnabled {
details.RangeBytes = rangeInfo.LogicalBytes
details.RangeWritesPerSecond = rangeInfo.WritesPerSecond
}
detailsBytes, err := json.Marshal(details)
if err != nil {
log.Warningf(ctx, "failed to marshal details for choosing allocate target: %s", err)
}
return &target.store, string(detailsBytes), nil
return &target.store, string(detailsBytes)
}

// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if throttledStoreCount > 0 {
return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
}
return nil, "", &allocatorError{
constraints: zone.Constraints,
existingReplicas: len(existing),
aliveStores: aliveStoreCount,
throttledStores: throttledStoreCount,
}
return nil, ""
}
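Taken together, this hunk splits AllocateTarget in two: the new allocateTargetFromList scores candidates against an explicit StoreList, while AllocateTarget itself keeps the store-list lookup and the throttled-store/purgatory error handling. The split presumably lets the store-level rebalancer this PR introduces pick allocation targets from a store list of its own choosing.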

func (a Allocator) simulateRemoveTarget(
@@ -401,7 +404,6 @@
zone config.ZoneConfig,
candidates []roachpb.ReplicaDescriptor,
rangeInfo RangeInfo,
disableStatsBasedRebalancing bool,
) (roachpb.ReplicaDescriptor, string, error) {
// Update statistics first
// TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines,
Expand All @@ -413,7 +415,7 @@ func (a Allocator) simulateRemoveTarget(
a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.REMOVE_REPLICA)
}()
log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore)
return a.RemoveTarget(ctx, zone, candidates, rangeInfo, disableStatsBasedRebalancing)
return a.RemoveTarget(ctx, zone, candidates, rangeInfo)
}
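simulateRemoveTarget uses a tidy pattern: optimistically apply the hypothetical add to the local store stats, ask RemoveTarget for its verdict, then undo the bookkeeping on the way out via defer. A generic standalone sketch of the same apply/decide/roll-back shape, with illustrative types rather than the storePool API:

package main

import "fmt"

type stats struct{ replicas map[string]int }

// decide picks the store with the most replicas as the removal candidate
// (ties broken lexicographically so the result is deterministic).
func decide(s *stats) string {
	best, bestN := "", -1
	for store, n := range s.replicas {
		if n > bestN || (n == bestN && store < best) {
			best, bestN = store, n
		}
	}
	return best
}

// simulateRemoveAfterAdd temporarily applies a hypothetical new replica on
// target, asks decide which store would then lose a replica, and rolls the
// stats back before returning, mirroring the defer in the hunk above.
func simulateRemoveAfterAdd(s *stats, target string) string {
	s.replicas[target]++
	defer func() { s.replicas[target]-- }()
	fmt.Printf("simulating which replica would be removed after adding %s\n", target)
	return decide(s)
}

func main() {
	s := &stats{replicas: map[string]int{"s1": 3, "s2": 1}}
	fmt.Println(simulateRemoveAfterAdd(s, "s2")) // s1 still has the most replicas
	fmt.Println(s.replicas["s2"])                // 1: the simulated add was rolled back
}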

// RemoveTarget returns a suitable replica to remove from the provided replica
@@ -426,7 +428,6 @@ func (a Allocator) RemoveTarget(
zone config.ZoneConfig,
candidates []roachpb.ReplicaDescriptor,
rangeInfo RangeInfo,
disableStatsBasedRebalancing bool,
) (roachpb.ReplicaDescriptor, string, error) {
if len(candidates) == 0 {
return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()")
@@ -441,7 +442,7 @@

analyzedConstraints := analyzeConstraints(
ctx, a.storePool.getStoreDescriptor, rangeInfo.Desc.Replicas, zone)
options := a.scorerOptions(disableStatsBasedRebalancing)
options := a.scorerOptions()
rankedCandidates := removeCandidates(
sl,
analyzedConstraints,
@@ -455,10 +456,6 @@
if exist.StoreID == bad.store.StoreID {
log.VEventf(ctx, 3, "remove target: %s", bad)
details := decisionDetails{Target: bad.compactString(options)}
if options.statsBasedRebalancingEnabled {
details.RangeBytes = rangeInfo.LogicalBytes
details.RangeWritesPerSecond = rangeInfo.WritesPerSecond
}
detailsBytes, err := json.Marshal(details)
if err != nil {
log.Warningf(ctx, "failed to marshal details for choosing remove target: %s", err)
@@ -495,7 +492,6 @@ func (a Allocator) RebalanceTarget(
raftStatus *raft.Status,
rangeInfo RangeInfo,
filter storeFilter,
disableStatsBasedRebalancing bool,
) (*roachpb.StoreDescriptor, string) {
sl, _, _ := a.storePool.getStoreList(rangeInfo.Desc.RangeID, filter)

@@ -529,7 +525,7 @@

analyzedConstraints := analyzeConstraints(
ctx, a.storePool.getStoreDescriptor, rangeInfo.Desc.Replicas, zone)
options := a.scorerOptions(disableStatsBasedRebalancing)
options := a.scorerOptions()
results := rebalanceCandidates(
ctx,
sl,
@@ -585,8 +581,7 @@ func (a Allocator) RebalanceTarget(
target.store.StoreID,
zone,
replicaCandidates,
rangeInfo,
disableStatsBasedRebalancing)
rangeInfo)
if err != nil {
log.Warningf(ctx, "simulating RemoveTarget failed: %s", err)
return nil, ""
Expand All @@ -606,10 +601,6 @@ func (a Allocator) RebalanceTarget(
Target: target.compactString(options),
Existing: existingCandidates.compactString(options),
}
if options.statsBasedRebalancingEnabled {
details.RangeBytes = rangeInfo.LogicalBytes
details.RangeWritesPerSecond = rangeInfo.WritesPerSecond
}
detailsBytes, err := json.Marshal(details)
if err != nil {
log.Warningf(ctx, "failed to marshal details for choosing rebalance target: %s", err)
@@ -618,12 +609,10 @@
return &target.store, string(detailsBytes)
}

func (a *Allocator) scorerOptions(disableStatsBasedRebalancing bool) scorerOptions {
func (a *Allocator) scorerOptions() scorerOptions {
return scorerOptions{
deterministic: a.storePool.deterministic,
statsBasedRebalancingEnabled: statsBasedRebalancingEnabled(a.storePool.st, disableStatsBasedRebalancing),
rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV),
statRebalanceThreshold: statRebalanceThreshold.Get(&a.storePool.st.SV),
deterministic: a.storePool.deterministic,
rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV),
}
}
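With stat-based rebalancing gone, scorerOptions shrinks to just determinism and the range-count threshold; the new kv.allocator.qps_rebalance_threshold presumably feeds the store-level rebalancing logic this PR adds rather than being threaded through every allocator call.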

(Diff truncated: 11 of the 15 changed files are not shown.)
