diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 308bedc8fc2e..68d2949eaa7c 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -17,9 +17,9 @@ jobs.registry.leniencyduration1m0sthe amount of time to defer any attempts to reschedule a job kv.allocator.lease_rebalancing_aggressivenessfloat1set greater than 1.0 to rebalance leases toward load more aggressively, or between 0 and 1.0 to be more conservative about rebalancing leases kv.allocator.load_based_lease_rebalancing.enabledbooleantrueset to enable rebalancing of range leases based on load and latency +kv.allocator.load_based_rebalancingenumeration1whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2] +kv.allocator.qps_rebalance_thresholdfloat0.25minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull kv.allocator.range_rebalance_thresholdfloat0.05minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull -kv.allocator.stat_based_rebalancing.enabledbooleanfalseset to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster -kv.allocator.stat_rebalance_thresholdfloat0.2minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull kv.bulk_io_write.concurrent_export_requestsinteger5number of export requests a store will handle concurrently before queuing kv.bulk_io_write.concurrent_import_requestsinteger1number of import requests a store will handle concurrently before queuing kv.bulk_io_write.max_ratebyte size8.0 EiBthe rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops diff --git a/pkg/cmd/roachtest/rebalance_load.go b/pkg/cmd/roachtest/rebalance_load.go index 698f2f43a83f..716dbe0145bc 100644 --- a/pkg/cmd/roachtest/rebalance_load.go +++ b/pkg/cmd/roachtest/rebalance_load.go @@ -107,20 +107,30 @@ func registerRebalanceLoad(r *registry) { } } - minutes := 2 * time.Minute - numNodes := 4 // the last node is just used to generate load concurrency := 128 r.Add(testSpec{ Name: `rebalance-leases-by-load`, - Nodes: nodes(numNodes), - Stable: false, // TODO(a-robinson): Promote to stable + Nodes: nodes(4), // the last node is just used to generate load + Stable: false, // TODO(a-robinson): Promote to stable Run: func(ctx context.Context, t *test, c *cluster) { if local { concurrency = 32 fmt.Printf("lowering concurrency to %d in local testing\n", concurrency) } - rebalanceLoadRun(ctx, t, c, minutes, concurrency) + rebalanceLoadRun(ctx, t, c, 2*time.Minute, concurrency) + }, + }) + r.Add(testSpec{ + Name: `rebalance-replicas-by-load`, + Nodes: nodes(7), // the last node is just used to generate load + Stable: false, // TODO(a-robinson): Promote to stable + Run: func(ctx context.Context, t *test, c *cluster) { + if local { + concurrency = 32 + fmt.Printf("lowering concurrency to %d in local testing\n", concurrency) + } + rebalanceLoadRun(ctx, t, c, 5*time.Minute, concurrency) }, }) } diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 8d0a89df2c16..694e73850a9c 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -34,10 +34,13 @@ var Registry = make(map[string]Setting) // When a setting is removed, it should be added to this list so that we cannot // accidentally 
reuse its name, potentially mis-handling older values. var retiredSettings = map[string]struct{}{ - //removed as of 2.0. + // removed as of 2.0. "kv.gc.batch_size": {}, "kv.transaction.max_intents": {}, "diagnostics.reporting.report_metrics": {}, + // removed as of 2.1. + "kv.allocator.stat_based_rebalancing.enabled": {}, + "kv.allocator.stat_rebalance_threshold": {}, } // Register adds a setting to the registry. diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index ba2646ceaa45..91ae5fa6140f 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -90,10 +90,6 @@ var leaseRebalancingAggressiveness = settings.RegisterNonNegativeFloatSetting( 1.0, ) -func statsBasedRebalancingEnabled(st *cluster.Settings, disableStatsBasedRebalance bool) bool { - return EnableStatsBasedRebalancing.Get(&st.SV) && st.Version.IsActive(cluster.VersionStatsBasedRebalancing) && !disableStatsBasedRebalance -} - // AllocatorAction enumerates the various replication adjustments that may be // recommended by the allocator. type AllocatorAction int @@ -247,10 +243,7 @@ func MakeAllocator( // supplied range, as governed by the supplied zone configuration. It // returns the required action that should be taken and a priority. func (a *Allocator) ComputeAction( - ctx context.Context, - zone config.ZoneConfig, - rangeInfo RangeInfo, - disableStatsBasedRebalancing bool, + ctx context.Context, zone config.ZoneConfig, rangeInfo RangeInfo, ) (AllocatorAction, float64) { if a.storePool == nil { // Do nothing if storePool is nil for some unittests. @@ -341,10 +334,8 @@ func (a *Allocator) ComputeAction( } type decisionDetails struct { - Target string - Existing string `json:",omitempty"` - RangeBytes int64 `json:",omitempty"` - RangeWritesPerSecond float64 `json:",omitempty"` + Target string + Existing string `json:",omitempty"` } // AllocateTarget returns a suitable store for a new allocation with the @@ -357,13 +348,39 @@ func (a *Allocator) AllocateTarget( zone config.ZoneConfig, existing []roachpb.ReplicaDescriptor, rangeInfo RangeInfo, - disableStatsBasedRebalancing bool, ) (*roachpb.StoreDescriptor, string, error) { sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeInfo.Desc.RangeID, storeFilterThrottled) + target, details := a.allocateTargetFromList( + ctx, sl, zone, existing, rangeInfo, a.scorerOptions()) + + if target != nil { + return target, details, nil + } + + // When there are throttled stores that do match, we shouldn't send + // the replica to purgatory. 
+ if throttledStoreCount > 0 { + return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount) + } + return nil, "", &allocatorError{ + constraints: zone.Constraints, + existingReplicas: len(existing), + aliveStores: aliveStoreCount, + throttledStores: throttledStoreCount, + } +} + +func (a *Allocator) allocateTargetFromList( + ctx context.Context, + sl StoreList, + zone config.ZoneConfig, + existing []roachpb.ReplicaDescriptor, + rangeInfo RangeInfo, + options scorerOptions, +) (*roachpb.StoreDescriptor, string) { analyzedConstraints := analyzeConstraints( ctx, a.storePool.getStoreDescriptor, existing, zone) - options := a.scorerOptions(disableStatsBasedRebalancing) candidates := allocateCandidates( sl, analyzedConstraints, existing, rangeInfo, a.storePool.getLocalities(existing), options, ) @@ -371,28 +388,14 @@ func (a *Allocator) AllocateTarget( if target := candidates.selectGood(a.randGen); target != nil { log.VEventf(ctx, 3, "add target: %s", target) details := decisionDetails{Target: target.compactString(options)} - if options.statsBasedRebalancingEnabled { - details.RangeBytes = rangeInfo.LogicalBytes - details.RangeWritesPerSecond = rangeInfo.WritesPerSecond - } detailsBytes, err := json.Marshal(details) if err != nil { log.Warningf(ctx, "failed to marshal details for choosing allocate target: %s", err) } - return &target.store, string(detailsBytes), nil + return &target.store, string(detailsBytes) } - // When there are throttled stores that do match, we shouldn't send - // the replica to purgatory. - if throttledStoreCount > 0 { - return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount) - } - return nil, "", &allocatorError{ - constraints: zone.Constraints, - existingReplicas: len(existing), - aliveStores: aliveStoreCount, - throttledStores: throttledStoreCount, - } + return nil, "" } func (a Allocator) simulateRemoveTarget( @@ -401,7 +404,6 @@ func (a Allocator) simulateRemoveTarget( zone config.ZoneConfig, candidates []roachpb.ReplicaDescriptor, rangeInfo RangeInfo, - disableStatsBasedRebalancing bool, ) (roachpb.ReplicaDescriptor, string, error) { // Update statistics first // TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines, @@ -413,7 +415,7 @@ func (a Allocator) simulateRemoveTarget( a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.REMOVE_REPLICA) }() log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore) - return a.RemoveTarget(ctx, zone, candidates, rangeInfo, disableStatsBasedRebalancing) + return a.RemoveTarget(ctx, zone, candidates, rangeInfo) } // RemoveTarget returns a suitable replica to remove from the provided replica @@ -426,7 +428,6 @@ func (a Allocator) RemoveTarget( zone config.ZoneConfig, candidates []roachpb.ReplicaDescriptor, rangeInfo RangeInfo, - disableStatsBasedRebalancing bool, ) (roachpb.ReplicaDescriptor, string, error) { if len(candidates) == 0 { return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()") @@ -441,7 +442,7 @@ func (a Allocator) RemoveTarget( analyzedConstraints := analyzeConstraints( ctx, a.storePool.getStoreDescriptor, rangeInfo.Desc.Replicas, zone) - options := a.scorerOptions(disableStatsBasedRebalancing) + options := a.scorerOptions() rankedCandidates := removeCandidates( sl, analyzedConstraints, @@ -455,10 +456,6 @@ func (a Allocator) RemoveTarget( if exist.StoreID == 
bad.store.StoreID { log.VEventf(ctx, 3, "remove target: %s", bad) details := decisionDetails{Target: bad.compactString(options)} - if options.statsBasedRebalancingEnabled { - details.RangeBytes = rangeInfo.LogicalBytes - details.RangeWritesPerSecond = rangeInfo.WritesPerSecond - } detailsBytes, err := json.Marshal(details) if err != nil { log.Warningf(ctx, "failed to marshal details for choosing remove target: %s", err) @@ -495,7 +492,6 @@ func (a Allocator) RebalanceTarget( raftStatus *raft.Status, rangeInfo RangeInfo, filter storeFilter, - disableStatsBasedRebalancing bool, ) (*roachpb.StoreDescriptor, string) { sl, _, _ := a.storePool.getStoreList(rangeInfo.Desc.RangeID, filter) @@ -529,7 +525,7 @@ func (a Allocator) RebalanceTarget( analyzedConstraints := analyzeConstraints( ctx, a.storePool.getStoreDescriptor, rangeInfo.Desc.Replicas, zone) - options := a.scorerOptions(disableStatsBasedRebalancing) + options := a.scorerOptions() results := rebalanceCandidates( ctx, sl, @@ -585,8 +581,7 @@ func (a Allocator) RebalanceTarget( target.store.StoreID, zone, replicaCandidates, - rangeInfo, - disableStatsBasedRebalancing) + rangeInfo) if err != nil { log.Warningf(ctx, "simulating RemoveTarget failed: %s", err) return nil, "" @@ -606,10 +601,6 @@ func (a Allocator) RebalanceTarget( Target: target.compactString(options), Existing: existingCandidates.compactString(options), } - if options.statsBasedRebalancingEnabled { - details.RangeBytes = rangeInfo.LogicalBytes - details.RangeWritesPerSecond = rangeInfo.WritesPerSecond - } detailsBytes, err := json.Marshal(details) if err != nil { log.Warningf(ctx, "failed to marshal details for choosing rebalance target: %s", err) @@ -618,12 +609,10 @@ func (a Allocator) RebalanceTarget( return &target.store, string(detailsBytes) } -func (a *Allocator) scorerOptions(disableStatsBasedRebalancing bool) scorerOptions { +func (a *Allocator) scorerOptions() scorerOptions { return scorerOptions{ - deterministic: a.storePool.deterministic, - statsBasedRebalancingEnabled: statsBasedRebalancingEnabled(a.storePool.st, disableStatsBasedRebalancing), - rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV), - statRebalanceThreshold: statRebalanceThreshold.Get(&a.storePool.st.SV), + deterministic: a.storePool.deterministic, + rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV), } } diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go index 3a9155692bf4..52171520b202 100644 --- a/pkg/storage/allocator_scorer.go +++ b/pkg/storage/allocator_scorer.go @@ -54,15 +54,6 @@ const ( rebalanceToMaxFractionUsedThreshold = 0.925 ) -// EnableStatsBasedRebalancing controls whether range rebalancing takes -// additional variables such as write load and disk usage into account. -// If disabled, rebalancing is done purely based on replica count. -var EnableStatsBasedRebalancing = settings.RegisterBoolSetting( - "kv.allocator.stat_based_rebalancing.enabled", - "set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster", - false, // TODO(a-robinson): switch to true for v2.1 once the store-rebalancer is done -) - // rangeRebalanceThreshold is the minimum ratio of a store's range count to // the mean range count at which that store is considered overfull or underfull // of ranges. 
@@ -72,28 +63,10 @@ var rangeRebalanceThreshold = settings.RegisterNonNegativeFloatSetting( 0.05, ) -// statRebalanceThreshold is the same as rangeRebalanceThreshold, but for -// statistics other than range count. This should be larger than -// rangeRebalanceThreshold because certain stats (like keys written per second) -// are inherently less stable and thus we need to be a little more forgiving to -// avoid thrashing. -// -// Note that there isn't a ton of science behind this number, but setting it -// to .05 and .1 were shown to cause some instability in clusters without load -// on them. -// -// TODO(a-robinson): Should disk usage be held to a higher standard than this? -var statRebalanceThreshold = settings.RegisterNonNegativeFloatSetting( - "kv.allocator.stat_rebalance_threshold", - "minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull", - 0.20, -) - type scorerOptions struct { - deterministic bool - statsBasedRebalancingEnabled bool - rangeRebalanceThreshold float64 - statRebalanceThreshold float64 + deterministic bool + rangeRebalanceThreshold float64 + qpsRebalanceThreshold float64 // only considered if non-zero } type balanceDimensions struct { @@ -112,10 +85,7 @@ func (bd balanceDimensions) String() string { } func (bd balanceDimensions) compactString(options scorerOptions) string { - if !options.statsBasedRebalancingEnabled { - return fmt.Sprintf("%d", bd.ranges) - } - return bd.String() + return fmt.Sprintf("%d", bd.ranges) } // candidate store for allocation. @@ -160,10 +130,6 @@ func (c candidate) compactString(options scorerOptions) string { } fmt.Fprintf(&buf, ", converges:%d, balance:%s, rangeCount:%d", c.convergesScore, c.balanceScore.compactString(options), c.rangeCount) - if options.statsBasedRebalancingEnabled { - fmt.Fprintf(&buf, ", logicalBytes:%s, writesPerSecond:%.2f", - humanizeutil.IBytes(c.store.Capacity.LogicalBytes), c.store.Capacity.WritesPerSecond) - } if c.details != "" { fmt.Fprintf(&buf, ", details:(%s)", c.details) } @@ -442,11 +408,22 @@ func allocateCandidates( } diversityScore := diversityAllocateScore(s, existingNodeLocalities) balanceScore := balanceScore(sl, s.Capacity, rangeInfo, options) + var convergesScore int + if options.qpsRebalanceThreshold > 0 { + if s.Capacity.QueriesPerSecond < underfullThreshold(sl.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) { + convergesScore = 1 + } else if s.Capacity.QueriesPerSecond < sl.candidateQueriesPerSecond.mean { + convergesScore = 0 + } else { + convergesScore = -1 + } + } candidates = append(candidates, candidate{ store: s, valid: constraintsOK, necessary: necessary, diversityScore: diversityScore, + convergesScore: convergesScore, balanceScore: balanceScore, rangeCount: int(s.Capacity.RangeCount), }) @@ -484,7 +461,7 @@ func removeCandidates( diversityScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities) balanceScore := balanceScore(sl, s.Capacity, rangeInfo, options) var convergesScore int - if !rebalanceFromConvergesOnMean(sl, s.Capacity, rangeInfo, options) { + if !rebalanceFromConvergesOnMean(sl, s.Capacity) { // If removing this candidate replica does not converge the store // stats to their means, we make it less attractive for removal by // adding 1 to the constraint score. 
Note that when selecting a @@ -672,7 +649,7 @@ func rebalanceCandidates( } // TODO(a-robinson): Some moderate refactoring could extract this logic out // into the loop below, avoiding duplicate balanceScore calculations. - if shouldRebalance(ctx, existing.cand.store, sl, rangeInfo, options) { + if shouldRebalance(ctx, existing.cand.store, sl, options) { shouldRebalanceCheck = true break } @@ -703,7 +680,7 @@ func rebalanceCandidates( } balanceScore := balanceScore(comparable.sl, existing.cand.store.Capacity, rangeInfo, options) var convergesScore int - if !rebalanceFromConvergesOnMean(comparable.sl, existing.cand.store.Capacity, rangeInfo, options) { + if !rebalanceFromConvergesOnMean(comparable.sl, existing.cand.store.Capacity) { // Similarly to in removeCandidates, any replica whose removal // would not converge the range stats to their means is given a // constraint score boost of 1 to make it less attractive for @@ -728,7 +705,7 @@ func rebalanceCandidates( s := cand.store cand.fullDisk = !rebalanceToMaxCapacityCheck(s) cand.balanceScore = balanceScore(comparable.sl, s.Capacity, rangeInfo, options) - if rebalanceToConvergesOnMean(comparable.sl, s.Capacity, rangeInfo, options) { + if rebalanceToConvergesOnMean(comparable.sl, s.Capacity) { // This is the counterpart of !rebalanceFromConvergesOnMean from // the existing candidates. Candidates whose addition would // converge towards the range count mean are promoted. @@ -836,69 +813,6 @@ func betterRebalanceTarget(target1, existing1, target2, existing2 *candidate) *c // shouldRebalance returns whether the specified store is a candidate for // having a replica removed from it given the candidate store list. func shouldRebalance( - ctx context.Context, - store roachpb.StoreDescriptor, - sl StoreList, - rangeInfo RangeInfo, - options scorerOptions, -) bool { - if !options.statsBasedRebalancingEnabled { - return shouldRebalanceNoStats(ctx, store, sl, options) - } - - // Rebalance if this store is full enough that the range is a bad fit. - score := balanceScore(sl, store.Capacity, rangeInfo, options) - if rangeIsBadFit(score) { - log.VEventf(ctx, 2, - "s%d: should-rebalance(bad-fit): balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+ - "(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ", - store.StoreID, score, store.Capacity, rangeInfo, - sl.candidateRanges.mean, humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), - sl.candidateWritesPerSecond.mean) - return true - } - - // Rebalance if there exists another store that is very in need of the - // range and this store is a somewhat bad match for it. 
- if rangeIsPoorFit(score) { - for _, desc := range sl.stores { - otherScore := balanceScore(sl, desc.Capacity, rangeInfo, options) - if !rangeIsGoodFit(otherScore) { - log.VEventf(ctx, 5, - "s%d is not a good enough fit to replace s%d: balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+ - "(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ", - desc.StoreID, store.StoreID, otherScore, desc.Capacity, rangeInfo, - sl.candidateRanges.mean, humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), - sl.candidateWritesPerSecond.mean) - continue - } - if storeHasReplica(desc.StoreID, rangeInfo.Desc.Replicas) { - continue - } - log.VEventf(ctx, 2, - "s%d: should-rebalance(better-fit=s%d): balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+ - "otherScore=%s, otherCapacity=(%v), "+ - "(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ", - store.StoreID, desc.StoreID, score, store.Capacity, rangeInfo, - otherScore, desc.Capacity, sl.candidateRanges.mean, - humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), sl.candidateWritesPerSecond.mean) - return true - } - } - - // If we reached this point, we're happy with the range where it is. - log.VEventf(ctx, 3, - "s%d: should-not-rebalance: balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+ - "(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ", - store.StoreID, score, store.Capacity, rangeInfo, sl.candidateRanges.mean, - humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), sl.candidateWritesPerSecond.mean) - return false -} - -// shouldRebalance implements the decision of whether to rebalance for the case -// when stats-based rebalancing is disabled and decisions should thus be -// made based only on range counts. -func shouldRebalanceNoStats( ctx context.Context, store roachpb.StoreDescriptor, sl StoreList, options scorerOptions, ) bool { overfullThreshold := int32(math.Ceil(overfullRangeThreshold(options, sl.candidateRanges.mean))) @@ -1297,10 +1211,6 @@ const ( underfull rangeCountStatus = 1 ) -func oppositeStatus(rcs rangeCountStatus) rangeCountStatus { - return -rcs -} - // balanceScore returns an arbitrarily scaled score where higher scores are for // stores where the range is a better fit based on various balance factors // like range count, disk usage, and QPS. @@ -1315,224 +1225,41 @@ func balanceScore( } else { dimensions.ranges = balanced } - if options.statsBasedRebalancingEnabled { - dimensions.bytes = balanceContribution( - options, - dimensions.ranges, - sl.candidateLogicalBytes.mean, - float64(sc.LogicalBytes), - sc.BytesPerReplica, - float64(rangeInfo.LogicalBytes)) - dimensions.writes = balanceContribution( - options, - dimensions.ranges, - sl.candidateWritesPerSecond.mean, - sc.WritesPerSecond, - sc.WritesPerReplica, - rangeInfo.WritesPerSecond) - } return dimensions } -// balanceContribution generates a single dimension's contribution to a range's -// balanceScore, where larger values mean a store is a better fit for a given -// range. -func balanceContribution( - options scorerOptions, - rcs rangeCountStatus, - mean float64, - storeVal float64, - percentiles roachpb.Percentiles, - rangeVal float64, -) float64 { - if storeVal > overfullStatThreshold(options, mean) { - return percentileScore(rcs, percentiles, rangeVal) - } else if storeVal < underfullStatThreshold(options, mean) { - // To ensure that we behave symmetrically when underfull compared to - // when we're overfull, inverse both the rangeCountStatus and the - // result returned by percentileScore. 
This makes it so that being - // overfull on ranges and on the given dimension behaves symmetrically to - // being underfull on ranges and the given dimension (and ditto for - // overfull on ranges and underfull on a dimension, etc.). - return -percentileScore(oppositeStatus(rcs), percentiles, rangeVal) - } - return 0 -} - -// percentileScore returns a score for how desirable it is to put a range -// onto a particular store given the assumption that the store is overfull -// along a particular dimension. Takes as parameters: -// * How the number of ranges on the store compares to the norm -// * The distribution of values in the store for the dimension -// * The range's value for the dimension -// A higher score means that the range is a better fit for the store. -func percentileScore( - rcs rangeCountStatus, percentiles roachpb.Percentiles, rangeVal float64, -) float64 { - // Note that there is not any great research behind these values. If they're - // causing thrashing or a bad imbalance, rethink them and modify them as - // appropriate. - if rcs == balanced { - // If the range count is balanced, we should prefer ranges that are - // very small on this particular dimension to try to rebalance this dimension - // without messing up the replica counts. - if rangeVal < percentiles.P10 { - return 1 - } else if rangeVal < percentiles.P25 { - return 0.5 - } else if rangeVal > percentiles.P90 { - return -1 - } else if rangeVal > percentiles.P75 { - return -0.5 - } - // else rangeVal >= percentiles.P25 && rangeVal <= percentiles.P75 - // It may be better to return more than 0 here, since taking on an - // average range isn't necessarily bad, but for now let's see how this works. - return 0 - } else if rcs == overfull { - // If this store has too many ranges, we're ok with moving any range that's - // at least somewhat sizable in this dimension, since we want to reduce both - // the range count and this metric. Moving extreme outliers may be less - // desirable, though, so favor very heavy ranges slightly less and disfavor - // very light ranges. - // - // Note that we can't truly disfavor large ranges, since that prevents us - // from rebalancing nonempty ranges to empty stores (since all nonempty - // ranges will be greater than an empty store's P90). - if rangeVal > percentiles.P90 { - return -0.5 - } else if rangeVal >= percentiles.P25 { - return -1 - } else if rangeVal >= percentiles.P10 { - return 0 - } - // else rangeVal < percentiles.P10 - return 0.5 - } else if rcs == underfull { - // If this store has too few ranges but is overloaded on some other - // dimension, we need to prioritize moving away replicas that are - // high in that dimension and accepting replicas that are low in it. - if rangeVal < percentiles.P10 { - return 1 - } else if rangeVal < percentiles.P25 { - return 0.5 - } else if rangeVal > percentiles.P90 { - return -1 - } else if rangeVal > percentiles.P75 { - return -0.5 - } - // else rangeVal >= percentiles.P25 && rangeVal <= percentiles.P75 - return 0 - } - panic(fmt.Sprintf("reached unreachable code: %+v; %+v; %+v", rcs, percentiles, rangeVal)) -} - -func rangeIsGoodFit(bd balanceDimensions) bool { - // A score greater than 1 means that more than one dimension improves - // without being canceled out by the third, since each dimension can only - // contribute a value from [-1,1] to the score. - return bd.totalScore() > 1 -} - -func rangeIsBadFit(bd balanceDimensions) bool { - // This is the same logic as for rangeIsGoodFit, just reversed. 
- return bd.totalScore() < -1 -} - -func rangeIsPoorFit(bd balanceDimensions) bool { - // A score less than -0.5 isn't a great fit for a range, since the - // bad dimensions outweigh the good by at least one entire dimension. - return bd.totalScore() < -0.5 -} - func overfullRangeThreshold(options scorerOptions, mean float64) float64 { - if !options.statsBasedRebalancingEnabled { - return mean * (1 + options.rangeRebalanceThreshold) - } - return math.Max(mean*(1+options.rangeRebalanceThreshold), mean+5) + return overfullThreshold(mean, options.rangeRebalanceThreshold) } func underfullRangeThreshold(options scorerOptions, mean float64) float64 { - if !options.statsBasedRebalancingEnabled { - return mean * (1 - options.rangeRebalanceThreshold) - } - return math.Min(mean*(1-options.rangeRebalanceThreshold), mean-5) + return underfullThreshold(mean, options.rangeRebalanceThreshold) } -func overfullStatThreshold(options scorerOptions, mean float64) float64 { - return mean * (1 + options.statRebalanceThreshold) +func overfullThreshold(mean float64, thresholdFraction float64) float64 { + return mean * (1 + thresholdFraction) } -func underfullStatThreshold(options scorerOptions, mean float64) float64 { - return mean * (1 - options.statRebalanceThreshold) +func underfullThreshold(mean float64, thresholdFraction float64) float64 { + return mean * (1 - thresholdFraction) } -func rebalanceFromConvergesOnMean( - sl StoreList, sc roachpb.StoreCapacity, rangeInfo RangeInfo, options scorerOptions, -) bool { - return rebalanceConvergesOnMean( - sl, - sc, - sc.RangeCount-1, - sc.LogicalBytes-rangeInfo.LogicalBytes, - sc.WritesPerSecond-rangeInfo.WritesPerSecond, - options) +func rebalanceFromConvergesOnMean(sl StoreList, sc roachpb.StoreCapacity) bool { + return rebalanceConvergesOnMean(sl, sc, sc.RangeCount-1) } -func rebalanceToConvergesOnMean( - sl StoreList, sc roachpb.StoreCapacity, rangeInfo RangeInfo, options scorerOptions, -) bool { - return rebalanceConvergesOnMean( - sl, - sc, - sc.RangeCount+1, - sc.LogicalBytes+rangeInfo.LogicalBytes, - sc.WritesPerSecond+rangeInfo.WritesPerSecond, - options) +func rebalanceToConvergesOnMean(sl StoreList, sc roachpb.StoreCapacity) bool { + return rebalanceConvergesOnMean(sl, sc, sc.RangeCount+1) } -func rebalanceConvergesOnMean( - sl StoreList, - sc roachpb.StoreCapacity, - newRangeCount int32, - newLogicalBytes int64, - newWritesPerSecond float64, - options scorerOptions, -) bool { - if !options.statsBasedRebalancingEnabled { - return convergesOnMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean) - } - - // Note that we check both converges and diverges. If we always decremented - // convergeCount when something didn't converge, ranges with stats equal to 0 - // would almost never converge (and thus almost never get rebalanced). 
- var convergeCount int - if convergesOnMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean) { - convergeCount++ - } else if divergesFromMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean) { - convergeCount-- - } - if convergesOnMean(float64(sc.LogicalBytes), float64(newLogicalBytes), sl.candidateLogicalBytes.mean) { - convergeCount++ - } else if divergesFromMean(float64(sc.LogicalBytes), float64(newLogicalBytes), sl.candidateLogicalBytes.mean) { - convergeCount-- - } - if convergesOnMean(sc.WritesPerSecond, newWritesPerSecond, sl.candidateWritesPerSecond.mean) { - convergeCount++ - } else if divergesFromMean(sc.WritesPerSecond, newWritesPerSecond, sl.candidateWritesPerSecond.mean) { - convergeCount-- - } - return convergeCount > 0 +func rebalanceConvergesOnMean(sl StoreList, sc roachpb.StoreCapacity, newRangeCount int32) bool { + return convergesOnMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean) } func convergesOnMean(oldVal, newVal, mean float64) bool { return math.Abs(newVal-mean) < math.Abs(oldVal-mean) } -func divergesFromMean(oldVal, newVal, mean float64) bool { - return math.Abs(newVal-mean) > math.Abs(oldVal-mean) -} - // maxCapacityCheck returns true if the store has room for a new replica. func maxCapacityCheck(store roachpb.StoreDescriptor) bool { return store.Capacity.FractionUsed() < maxFractionUsedThreshold diff --git a/pkg/storage/allocator_scorer_test.go b/pkg/storage/allocator_scorer_test.go index fca6e2bd0436..50b6a240bf1c 100644 --- a/pkg/storage/allocator_scorer_test.go +++ b/pkg/storage/allocator_scorer_test.go @@ -1058,10 +1058,7 @@ func TestRemoveConstraintsCheck(t *testing.T) { func TestShouldRebalanceDiversity(t *testing.T) { defer leaktest.AfterTest(t)() - options := scorerOptions{ - statsBasedRebalancingEnabled: true, - } - + options := scorerOptions{} newStore := func(id int, locality roachpb.Locality) roachpb.StoreDescriptor { return roachpb.StoreDescriptor{ StoreID: roachpb.StoreID(id), @@ -1473,14 +1470,9 @@ func TestDiversityScoreEquivalence(t *testing.T) { func TestBalanceScore(t *testing.T) { defer leaktest.AfterTest(t)() - options := scorerOptions{ - statsBasedRebalancingEnabled: true, - } - + options := scorerOptions{} storeList := StoreList{ - candidateRanges: stat{mean: 1000}, - candidateLogicalBytes: stat{mean: 512 * 1024 * 1024}, - candidateWritesPerSecond: stat{mean: 1000}, + candidateRanges: stat{mean: 1000}, } sEmpty := roachpb.StoreCapacity{ @@ -1489,146 +1481,30 @@ func TestBalanceScore(t *testing.T) { LogicalBytes: 0, } sMean := roachpb.StoreCapacity{ - Capacity: 1024 * 1024 * 1024, - Available: 512 * 1024 * 1024, - LogicalBytes: 512 * 1024 * 1024, - RangeCount: 1000, - WritesPerSecond: 1000, - BytesPerReplica: roachpb.Percentiles{ - P10: 100 * 1024, - P25: 250 * 1024, - P50: 500 * 1024, - P75: 750 * 1024, - P90: 1000 * 1024, - }, - WritesPerReplica: roachpb.Percentiles{ - P10: 1, - P25: 2.5, - P50: 5, - P75: 7.5, - P90: 10, - }, + Capacity: 1024 * 1024 * 1024, + Available: 512 * 1024 * 1024, + LogicalBytes: 512 * 1024 * 1024, + RangeCount: 1000, } sRangesOverfull := sMean sRangesOverfull.RangeCount = 1500 sRangesUnderfull := sMean sRangesUnderfull.RangeCount = 500 - sBytesOverfull := sMean - sBytesOverfull.Available = 256 * 1024 * 1024 - sBytesOverfull.LogicalBytes = sBytesOverfull.Capacity - sBytesOverfull.Available - sBytesUnderfull := sMean - sBytesUnderfull.Available = 768 * 1024 * 1024 - sBytesUnderfull.LogicalBytes = sBytesUnderfull.Capacity - 
sBytesUnderfull.Available - sRangesOverfullBytesOverfull := sRangesOverfull - sRangesOverfullBytesOverfull.Available = 256 * 1024 * 1024 - sRangesOverfullBytesOverfull.LogicalBytes = - sRangesOverfullBytesOverfull.Capacity - sRangesOverfullBytesOverfull.Available - sRangesUnderfullBytesUnderfull := sRangesUnderfull - sRangesUnderfullBytesUnderfull.Available = 768 * 1024 * 1024 - sRangesUnderfullBytesUnderfull.LogicalBytes = - sRangesUnderfullBytesUnderfull.Capacity - sRangesUnderfullBytesUnderfull.Available - sRangesUnderfullBytesOverfull := sRangesUnderfull - sRangesUnderfullBytesOverfull.Available = 256 * 1024 * 1024 - sRangesUnderfullBytesOverfull.LogicalBytes = - sRangesUnderfullBytesOverfull.Capacity - sRangesUnderfullBytesOverfull.Available - sRangesOverfullBytesUnderfull := sRangesOverfull - sRangesOverfullBytesUnderfull.Available = 768 * 1024 * 1024 - sRangesOverfullBytesUnderfull.LogicalBytes = - sRangesOverfullBytesUnderfull.Capacity - sRangesOverfullBytesUnderfull.Available - sRangesUnderfullBytesOverfullWritesOverfull := sRangesUnderfullBytesOverfull - sRangesUnderfullBytesOverfullWritesOverfull.WritesPerSecond = 1500 - sRangesUnderfullBytesUnderfullWritesOverfull := sRangesUnderfullBytesUnderfull - sRangesUnderfullBytesUnderfullWritesOverfull.WritesPerSecond = 1500 - - rEmpty := RangeInfo{} - rMedian := RangeInfo{ - LogicalBytes: 500 * 1024, - WritesPerSecond: 5, - } - rHighBytes := rMedian - rHighBytes.LogicalBytes = 2000 * 1024 - rLowBytes := rMedian - rLowBytes.LogicalBytes = 50 * 1024 - rHighBytesHighWrites := rHighBytes - rHighBytesHighWrites.WritesPerSecond = 20 - rHighBytesLowWrites := rHighBytes - rHighBytesLowWrites.WritesPerSecond = 0.5 - rLowBytesHighWrites := rLowBytes - rLowBytesHighWrites.WritesPerSecond = 20 - rLowBytesLowWrites := rLowBytes - rLowBytesLowWrites.WritesPerSecond = 0.5 - rHighWrites := rMedian - rHighWrites.WritesPerSecond = 20 - rLowWrites := rMedian - rLowWrites.WritesPerSecond = 0.5 + + ri := RangeInfo{} testCases := []struct { sc roachpb.StoreCapacity - ri RangeInfo expected float64 }{ - {sEmpty, rEmpty, 3}, - {sEmpty, rMedian, 2}, - {sEmpty, rHighBytes, 2}, - {sEmpty, rLowBytes, 2}, - {sMean, rEmpty, 0}, - {sMean, rMedian, 0}, - {sMean, rHighBytes, 0}, - {sMean, rLowBytes, 0}, - {sRangesOverfull, rEmpty, -1}, - {sRangesOverfull, rMedian, -1}, - {sRangesOverfull, rHighBytes, -1}, - {sRangesOverfull, rLowBytes, -1}, - {sRangesUnderfull, rEmpty, 1}, - {sRangesUnderfull, rMedian, 1}, - {sRangesUnderfull, rHighBytes, 1}, - {sRangesUnderfull, rLowBytes, 1}, - {sBytesOverfull, rEmpty, 1}, - {sBytesOverfull, rMedian, 0}, - {sBytesOverfull, rHighBytes, -1}, - {sBytesOverfull, rLowBytes, 1}, - {sBytesUnderfull, rEmpty, -1}, - {sBytesUnderfull, rMedian, 0}, - {sBytesUnderfull, rHighBytes, 1}, - {sBytesUnderfull, rLowBytes, -1}, - {sRangesOverfullBytesOverfull, rEmpty, -.5}, - {sRangesOverfullBytesOverfull, rMedian, -2}, - {sRangesOverfullBytesOverfull, rHighBytes, -1.5}, - {sRangesOverfullBytesOverfull, rLowBytes, -.5}, - {sRangesUnderfullBytesUnderfull, rEmpty, .5}, - {sRangesUnderfullBytesUnderfull, rMedian, 2}, - {sRangesUnderfullBytesUnderfull, rHighBytes, 1.5}, - {sRangesUnderfullBytesUnderfull, rLowBytes, .5}, - {sRangesUnderfullBytesOverfull, rEmpty, 2}, - {sRangesUnderfullBytesOverfull, rMedian, 1}, - {sRangesUnderfullBytesOverfull, rHighBytes, 0}, - {sRangesUnderfullBytesOverfull, rLowBytes, 2}, - {sRangesOverfullBytesUnderfull, rEmpty, -2}, - {sRangesOverfullBytesUnderfull, rMedian, -1}, - {sRangesOverfullBytesUnderfull, rHighBytes, 0}, - 
{sRangesOverfullBytesUnderfull, rLowBytes, -2}, - {sRangesUnderfullBytesOverfullWritesOverfull, rEmpty, 3}, - {sRangesUnderfullBytesOverfullWritesOverfull, rMedian, 1}, - {sRangesUnderfullBytesOverfullWritesOverfull, rHighBytes, 0}, - {sRangesUnderfullBytesOverfullWritesOverfull, rHighBytesHighWrites, -1}, - {sRangesUnderfullBytesOverfullWritesOverfull, rHighBytesLowWrites, 1}, - {sRangesUnderfullBytesOverfullWritesOverfull, rLowBytes, 2}, - {sRangesUnderfullBytesOverfullWritesOverfull, rLowBytesHighWrites, 1}, - {sRangesUnderfullBytesOverfullWritesOverfull, rLowBytesLowWrites, 3}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rEmpty, 1.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rMedian, 2}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rHighBytes, 1.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rHighBytesHighWrites, 0.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rHighBytesLowWrites, 2.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rLowBytes, 0.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rLowBytesHighWrites, -0.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rLowBytesLowWrites, 1.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rHighWrites, 1}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rLowWrites, 3}, + {sEmpty, 1}, + {sMean, 0}, + {sRangesOverfull, -1}, + {sRangesUnderfull, 1}, } for i, tc := range testCases { - if a, e := balanceScore(storeList, tc.sc, tc.ri, options), tc.expected; a.totalScore() != e { - t.Errorf("%d: balanceScore(storeList, %+v, %+v) got %s; want %.2f", i, tc.sc, tc.ri, a, e) + if a, e := balanceScore(storeList, tc.sc, ri, options), tc.expected; a.totalScore() != e { + t.Errorf("%d: balanceScore(storeList, %+v) got %s; want %.2f", i, tc.sc, a, e) } } } @@ -1636,77 +1512,34 @@ func TestBalanceScore(t *testing.T) { func TestRebalanceConvergesOnMean(t *testing.T) { defer leaktest.AfterTest(t)() - options := scorerOptions{ - statsBasedRebalancingEnabled: true, - } - - const diskCapacity = 2000 storeList := StoreList{ - candidateRanges: stat{mean: 1000}, - candidateLogicalBytes: stat{mean: 1000}, - candidateWritesPerSecond: stat{mean: 1000}, - } - emptyRange := RangeInfo{} - normalRange := RangeInfo{ - LogicalBytes: 10, - WritesPerSecond: 10, - } - outlierRange := RangeInfo{ - LogicalBytes: 10, - WritesPerSecond: 10000, + candidateRanges: stat{mean: 1000}, } testCases := []struct { - rangeCount int32 - liveBytes int64 - writesPerSecond float64 - ri RangeInfo - toConverges bool - fromConverges bool + rangeCount int32 + toConverges bool + fromConverges bool }{ - {0, 0, 0, emptyRange, true, false}, - {900, 900, 900, emptyRange, true, false}, - {900, 900, 2000, emptyRange, true, false}, - {999, 1000, 1000, emptyRange, true, false}, - {1000, 1000, 1000, emptyRange, false, false}, - {1001, 1000, 1000, emptyRange, false, true}, - {2000, 2000, 2000, emptyRange, false, true}, - {900, 2000, 2000, emptyRange, true, false}, - {0, 0, 0, normalRange, true, false}, - {900, 900, 900, normalRange, true, false}, - {900, 900, 2000, normalRange, true, false}, - {999, 1000, 1000, normalRange, false, false}, - {2000, 2000, 2000, normalRange, false, true}, - {900, 2000, 2000, normalRange, false, true}, - {1000, 990, 990, normalRange, true, false}, - {1000, 994, 994, normalRange, true, false}, - {1000, 990, 995, normalRange, false, false}, - {1000, 1010, 1010, normalRange, false, true}, - {1000, 1010, 1005, normalRange, false, false}, - {0, 0, 0, outlierRange, true, false}, - {900, 900, 900, outlierRange, true, false}, - {900, 900, 
2000, outlierRange, true, false}, - {999, 1000, 1000, outlierRange, false, false}, - {2000, 2000, 10000, outlierRange, false, true}, - {900, 2000, 10000, outlierRange, false, true}, - {1000, 990, 990, outlierRange, false, false}, - {1000, 1000, 10000, outlierRange, false, false}, - {1000, 1010, 10000, outlierRange, false, true}, - {1001, 1010, 1005, outlierRange, false, true}, + {0, true, false}, + {900, true, false}, + {900, true, false}, + {999, true, false}, + {1000, false, false}, + {1001, false, true}, + {2000, false, true}, + {900, true, false}, } + for i, tc := range testCases { sc := roachpb.StoreCapacity{ - Capacity: diskCapacity, - Available: diskCapacity - tc.liveBytes, - LogicalBytes: tc.liveBytes, - RangeCount: tc.rangeCount, - WritesPerSecond: tc.writesPerSecond, + RangeCount: tc.rangeCount, } - if a, e := rebalanceToConvergesOnMean(storeList, sc, tc.ri, options), tc.toConverges; a != e { - t.Errorf("%d: rebalanceToConvergesOnMean(storeList, %+v, %+v) got %t; want %t", i, sc, tc.ri, a, e) + if a, e := rebalanceToConvergesOnMean(storeList, sc), tc.toConverges; a != e { + t.Errorf("%d: rebalanceToConvergesOnMean(storeList, %+v) got %t; want %t", i, sc, a, e) } - if a, e := rebalanceFromConvergesOnMean(storeList, sc, tc.ri, options), tc.fromConverges; a != e { - t.Errorf("%d: rebalanceFromConvergesOnMean(storeList, %+v, %+v) got %t; want %t", i, sc, tc.ri, a, e) + if a, e := rebalanceFromConvergesOnMean(storeList, sc), tc.fromConverges; a != e { + t.Errorf("%d: rebalanceFromConvergesOnMean(storeList, %+v) got %t; want %t", i, sc, a, e) } } } diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index cabea157392c..98df381a78ac 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -397,7 +397,6 @@ func TestAllocatorSimpleRetrieval(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if err != nil { t.Fatalf("Unable to perform allocation: %v", err) @@ -431,7 +430,6 @@ func TestAllocatorCorruptReplica(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if err != nil { t.Fatal(err) @@ -451,7 +449,6 @@ func TestAllocatorNoAvailableDisks(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if result != nil { t.Errorf("expected nil result: %+v", result) @@ -473,7 +470,6 @@ func TestAllocatorTwoDatacenters(t *testing.T) { multiDCConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if err != nil { t.Fatalf("Unable to perform allocation: %v", err) @@ -486,7 +482,6 @@ func TestAllocatorTwoDatacenters(t *testing.T) { StoreID: result1.StoreID, }}, firstRangeInfo, - false, ) if err != nil { t.Fatalf("Unable to perform allocation: %v", err) @@ -511,7 +506,6 @@ func TestAllocatorTwoDatacenters(t *testing.T) { }, }, firstRangeInfo, - false, ) if err == nil { t.Errorf("expected error on allocation without available stores: %+v", result3) @@ -543,7 +537,6 @@ func TestAllocatorExistingReplica(t *testing.T) { }, }, firstRangeInfo, - false, ) if err != nil { t.Fatalf("Unable to perform allocation: %v", err) @@ -604,9 +597,6 @@ func TestAllocatorRebalance(t *testing.T) { stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) defer stopper.Stop(context.Background()) - st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, false) - gossiputil.NewStoreGossiper(g).GossipStores(stores, t) ctx := context.Background() @@ -618,7 +608,6 @@ func TestAllocatorRebalance(t *testing.T) { nil, 
testRangeInfo([]roachpb.ReplicaDescriptor{{NodeID: 3, StoreID: 3}}, firstRange), storeFilterThrottled, - false, ) if result == nil { i-- // loop until we find 10 candidates @@ -638,7 +627,7 @@ func TestAllocatorRebalance(t *testing.T) { t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID) } sl, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled) - result := shouldRebalance(ctx, desc, sl, firstRangeInfo, a.scorerOptions(false)) + result := shouldRebalance(ctx, desc, sl, a.scorerOptions()) if expResult := (i >= 2); expResult != result { t.Errorf("%d: expected rebalance %t; got %t; desc %+v; sl: %+v", i, expResult, result, desc, sl) } @@ -735,8 +724,6 @@ func TestAllocatorRebalanceTarget(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(stores, t) - st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, false) replicas := []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 1}, {NodeID: 4, StoreID: 4}, @@ -773,7 +760,6 @@ func TestAllocatorRebalanceTarget(t *testing.T) { status, rangeInfo, storeFilterThrottled, - false, ) if result != nil { t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) @@ -793,7 +779,6 @@ func TestAllocatorRebalanceTarget(t *testing.T) { status, rangeInfo, storeFilterThrottled, - false, ) if result != nil { t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) @@ -810,7 +795,6 @@ func TestAllocatorRebalanceTarget(t *testing.T) { status, rangeInfo, storeFilterThrottled, - false, ) if result == nil || result.StoreID != stores[1].StoreID { t.Fatalf("%d: expected rebalance to s%d, but got %v; details: %s", @@ -826,8 +810,6 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { ctx := context.Background() defer stopper.Stop(ctx) - EnableStatsBasedRebalancing.Override(&sp.st.SV, false) - mockStorePool( sp, []roachpb.StoreID{1, 2, 3, 4, 5, 6}, @@ -887,8 +869,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { config.ZoneConfig{}, nil, testRangeInfo(c.existing, firstRange), - storeFilterThrottled, - false) + storeFilterThrottled) if c.expected > 0 { if result == nil { t.Fatalf("expected %d, but found nil", c.expected) @@ -988,8 +969,6 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { defer stopper.Stop(context.Background()) st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, false) - cluster := tc.cluster(st) // It doesn't make sense to test sets of stores containing fewer than 4 @@ -1031,7 +1010,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { if !ok { t.Fatalf("[store %d]: unable to get store %d descriptor", j, store.StoreID) } - if a, e := shouldRebalance(context.Background(), desc, sl, firstRangeInfo, a.scorerOptions(false)), cluster[j].shouldRebalanceFrom; a != e { + if a, e := shouldRebalance(context.Background(), desc, sl, a.scorerOptions()), cluster[j].shouldRebalanceFrom; a != e { t.Errorf("[store %d]: shouldRebalance %t != expected %t", store.StoreID, a, e) } } @@ -1072,9 +1051,6 @@ func TestAllocatorRebalanceByCount(t *testing.T) { stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) defer stopper.Stop(context.Background()) - st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, false) - gossiputil.NewStoreGossiper(g).GossipStores(stores, t) ctx := context.Background() @@ -1086,7 +1062,6 @@ func TestAllocatorRebalanceByCount(t *testing.T) { nil, testRangeInfo([]roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}}, firstRange), storeFilterThrottled, - false, ) 
if result != nil && result.StoreID != 4 { t.Errorf("expected store 4; got %d", result.StoreID) @@ -1100,7 +1075,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) { t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID) } sl, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled) - result := shouldRebalance(ctx, desc, sl, firstRangeInfo, a.scorerOptions(false)) + result := shouldRebalance(ctx, desc, sl, a.scorerOptions()) if expResult := (i < 3); expResult != result { t.Errorf("%d: expected rebalance %t; got %t", i, expResult, result) } @@ -1248,7 +1223,6 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { defer leaktest.AfterTest(t)() stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) - EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false) ctx := context.Background() defer stopper.Stop(ctx) @@ -1367,7 +1341,6 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { nil, /* raftStatus */ testRangeInfo(tc.existing, firstRange), storeFilterThrottled, - false, ) var resultID roachpb.StoreID if result != nil { @@ -1436,7 +1409,6 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { nil, /* raftStatus */ testRangeInfo(tc.existing, firstRange), storeFilterThrottled, - false, ) var gotExpected bool if result == nil { @@ -1935,7 +1907,6 @@ func TestAllocatorRemoveTargetLocality(t *testing.T) { config.ZoneConfig{}, existingRepls, testRangeInfo(existingRepls, firstRange), - false, ) if err != nil { t.Fatal(err) @@ -2019,7 +1990,6 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) { config.ZoneConfig{}, existingRepls, testRangeInfo(existingRepls, firstRange), - false, ) if err != nil { t.Fatal(err) @@ -2041,7 +2011,6 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { defer leaktest.AfterTest(t)() stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) - EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false) defer stopper.Stop(context.Background()) stores := []*roachpb.StoreDescriptor{ @@ -2143,7 +2112,6 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { nil, testRangeInfo(existingRepls, firstRange), storeFilterThrottled, - false, ) if targetStore == nil { t.Fatalf("%d: RebalanceTarget(%v) returned no target store; details: %s", i, c.existing, details) @@ -2494,7 +2462,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) { existingRepls, rangeInfo, a.storePool.getLocalities(existingRepls), - a.scorerOptions(false), + a.scorerOptions(), ) best := candidates.best() match := true @@ -2718,7 +2686,7 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) { analyzed, rangeInfo, a.storePool.getLocalities(existingRepls), - a.scorerOptions(false), + a.scorerOptions(), ) if !expectedStoreIDsMatch(tc.expected, candidates.worst()) { t.Errorf("%d: expected removeCandidates(%v) = %v, but got %v", @@ -3515,7 +3483,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { rangeInfo, a.storePool.getLocalities(existingRepls), a.storePool.getNodeLocalityString, - a.scorerOptions(false), + a.scorerOptions(), ) match := true if len(tc.expected) != len(results) { @@ -3539,7 +3507,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { // Also verify that RebalanceTarget picks out one of the best options as // the final rebalance choice. 
target, details := a.RebalanceTarget( - context.Background(), zone, nil, rangeInfo, storeFilterThrottled, false) + context.Background(), zone, nil, rangeInfo, storeFilterThrottled) var found bool if target == nil && len(tc.validTargets) == 0 { found = true @@ -3905,9 +3873,6 @@ func TestAllocatorRemoveTarget(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(stores, t) - st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, false) - // Repeat this test 10 times, it should always be either store 2 or 3. for i := 0; i < 10; i++ { targetRepl, _, err := a.RemoveTarget( @@ -3915,7 +3880,6 @@ func TestAllocatorRemoveTarget(t *testing.T) { config.ZoneConfig{}, replicas, testRangeInfo(replicas, firstRange), - false, ) if err != nil { t.Fatal(err) @@ -4400,7 +4364,7 @@ func TestAllocatorComputeAction(t *testing.T) { lastPriority := float64(999999999) for i, tcase := range testCases { - action, priority := a.ComputeAction(ctx, tcase.zone, RangeInfo{Desc: &tcase.desc}, false) + action, priority := a.ComputeAction(ctx, tcase.zone, RangeInfo{Desc: &tcase.desc}) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %q, got action %q", i, allocatorActionNames[tcase.expectedAction], allocatorActionNames[action]) @@ -4413,108 +4377,6 @@ func TestAllocatorComputeAction(t *testing.T) { } } -// TestAllocatorComputeActionDisableStatsRebalance is used to verify whether the cluster could balance out -// if we disable stats-based-rebalance. -func TestAllocatorRebalanceTargetDisableStatsRebalance(t *testing.T) { - defer leaktest.AfterTest(t)() - ctx := context.Background() - stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) - defer stopper.Stop(ctx) - - st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, true) - // Make sure the cluster's initial status is not balanced. - // store 1, 2, 3 has 200 replicas, store 4 has only 30 replicas. - // So obviously, if we want the replica is homogeneous within each store, - // we should rebalance some replicas from store 1, 2, 3 to store 4. - // However, we set the WritesPerSecond of store 4 is much bigger than other store, - // so if we enable stats-based rebalance, we couldn't spread out the replicas on store 1, 2, 3. 
- stores := []*roachpb.StoreDescriptor{ - { - StoreID: 1, - Node: roachpb.NodeDescriptor{NodeID: 1}, - Capacity: roachpb.StoreCapacity{ - RangeCount: 200, - WritesPerSecond: 30, - }, - }, - { - StoreID: 2, - Node: roachpb.NodeDescriptor{NodeID: 2}, - Capacity: roachpb.StoreCapacity{ - RangeCount: 200, - WritesPerSecond: 30, - }, - }, - { - StoreID: 3, - Node: roachpb.NodeDescriptor{NodeID: 3}, - Capacity: roachpb.StoreCapacity{ - RangeCount: 200, - WritesPerSecond: 30, - }, - }, - { - StoreID: 4, - Node: roachpb.NodeDescriptor{NodeID: 4}, - Capacity: roachpb.StoreCapacity{ - RangeCount: 30, - WritesPerSecond: 100, - }, - }, - } - sg := gossiputil.NewStoreGossiper(g) - sg.GossipStores(stores, t) - - desc := roachpb.RangeDescriptor{ - RangeID: firstRange, - Replicas: []roachpb.ReplicaDescriptor{ - { - StoreID: 1, - NodeID: 1, - ReplicaID: 1, - }, - { - StoreID: 2, - NodeID: 2, - ReplicaID: 2, - }, - { - StoreID: 3, - NodeID: 3, - ReplicaID: 3, - }, - }, - } - for i := 0; i < 50; i++ { - target, _ := a.RebalanceTarget( - context.Background(), - config.ZoneConfig{}, - nil, - testRangeInfo(desc.Replicas, desc.RangeID), - storeFilterThrottled, - false, /* disableStatsBasedRebalancing */ - ) - if target != nil { - t.Errorf("expected no balance, but got %d", target.StoreID) - } - } - - for i := 0; i < 50; i++ { - target, _ := a.RebalanceTarget( - context.Background(), - config.ZoneConfig{}, - nil, - testRangeInfo(desc.Replicas, desc.RangeID), - storeFilterThrottled, - true, /* disableStatsBasedRebalancing */ - ) - if expectedStore := roachpb.StoreID(4); target.StoreID != expectedStore { - t.Errorf("expected balance to %d, but got %d", expectedStore, target.StoreID) - } - } -} - func TestAllocatorComputeActionRemoveDead(t *testing.T) { defer leaktest.AfterTest(t)() @@ -4599,7 +4461,7 @@ func TestAllocatorComputeActionRemoveDead(t *testing.T) { for i, tcase := range testCases { mockStorePool(sp, tcase.live, tcase.dead, nil, nil, nil) - action, _ := a.ComputeAction(ctx, zone, RangeInfo{Desc: &tcase.desc}, false) + action, _ := a.ComputeAction(ctx, zone, RangeInfo{Desc: &tcase.desc}) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action) } @@ -4820,7 +4682,7 @@ func TestAllocatorComputeActionDecommission(t *testing.T) { for i, tcase := range testCases { mockStorePool(sp, tcase.live, tcase.dead, tcase.decommissioning, tcase.decommissioned, nil) - action, _ := a.ComputeAction(ctx, tcase.zone, RangeInfo{Desc: &tcase.desc}, false) + action, _ := a.ComputeAction(ctx, tcase.zone, RangeInfo{Desc: &tcase.desc}) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action) continue @@ -4834,7 +4696,7 @@ func TestAllocatorComputeActionNoStorePool(t *testing.T) { defer leaktest.AfterTest(t)() a := MakeAllocator(nil /* storePool */, nil /* rpcContext */) - action, priority := a.ComputeAction(context.Background(), config.ZoneConfig{}, RangeInfo{}, false) + action, priority := a.ComputeAction(context.Background(), config.ZoneConfig{}, RangeInfo{}) if action != AllocatorNoop { t.Errorf("expected AllocatorNoop, but got %v", action) } @@ -4902,7 +4764,6 @@ func TestAllocatorThrottled(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if _, ok := err.(purgatoryError); !ok { t.Fatalf("expected a purgatory error, got: %v", err) @@ -4915,7 +4776,6 @@ func TestAllocatorThrottled(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, 
firstRangeInfo, - false, ) if err != nil { t.Fatalf("unable to perform allocation: %v", err) @@ -4938,7 +4798,6 @@ func TestAllocatorThrottled(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if _, ok := err.(purgatoryError); ok { t.Fatalf("expected a non purgatory error, got: %v", err) @@ -5205,7 +5064,6 @@ func TestAllocatorRebalanceAway(t *testing.T) { nil, testRangeInfo(existingReplicas, firstRange), storeFilterThrottled, - false, ) if tc.expected == nil && actual != nil { @@ -5267,8 +5125,6 @@ func TestAllocatorFullDisks(t *testing.T) { defer stopper.Stop(ctx) st := cluster.MakeTestingClusterSettings() - EnableStatsBasedRebalancing.Override(&st.SV, false) - clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) // Model a set of stores in a cluster doing rebalancing, with ranges being @@ -5367,7 +5223,6 @@ func TestAllocatorFullDisks(t *testing.T) { nil, testRangeInfo([]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, firstRange), storeFilterThrottled, - false, ) if target != nil { if log.V(1) { @@ -5406,8 +5261,6 @@ func Example_rebalancing() { defer stopper.Stop(context.TODO()) st := cluster.MakeTestingClusterSettings() - EnableStatsBasedRebalancing.Override(&st.SV, false) - clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) // Model a set of stores in a cluster, @@ -5492,7 +5345,6 @@ func Example_rebalancing() { nil, testRangeInfo([]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, firstRange), storeFilterThrottled, - false, ) if target != nil { log.Infof(context.TODO(), "rebalancing to %v; details: %s", target, details) diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 52eb40ffa02e..b67771f64060 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -1159,7 +1159,7 @@ func (r *Replica) adminScatter( var allowLeaseTransfer bool canTransferLease := func() bool { return allowLeaseTransfer } for re := retry.StartWithCtx(ctx, retryOpts); re.Next(); { - requeue, err := rq.processOneChange(ctx, r, sysCfg, canTransferLease, false /* dryRun */, true /* disableStatsBasedRebalancing */) + requeue, err := rq.processOneChange(ctx, r, sysCfg, canTransferLease, false /* dryRun */) if err != nil { if IsSnapshotError(err) { continue diff --git a/pkg/storage/replica_rankings.go b/pkg/storage/replica_rankings.go index 13b4a99f582d..3810fd9c943c 100644 --- a/pkg/storage/replica_rankings.go +++ b/pkg/storage/replica_rankings.go @@ -28,6 +28,7 @@ const ( type replicaWithStats struct { repl *Replica qps float64 + // TODO(a-robinson): Include writes-per-second and logicalBytes of storage? 
} // replicaRankings maintains top-k orderings of the replicas in a store along diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index 82ed88479cdf..34a93da3ced6 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -204,7 +204,7 @@ func (rq *replicateQueue) shouldQueue( } rangeInfo := rangeInfoForRepl(repl, desc) - action, priority := rq.allocator.ComputeAction(ctx, zone, rangeInfo, false) + action, priority := rq.allocator.ComputeAction(ctx, zone, rangeInfo) if action == AllocatorNoop { log.VEventf(ctx, 2, "no action to take") return false, 0 @@ -214,7 +214,7 @@ func (rq *replicateQueue) shouldQueue( } if !rq.store.TestingKnobs().DisableReplicaRebalancing { - target, _ := rq.allocator.RebalanceTarget(ctx, zone, repl.RaftStatus(), rangeInfo, storeFilterThrottled, false) + target, _ := rq.allocator.RebalanceTarget(ctx, zone, repl.RaftStatus(), rangeInfo, storeFilterThrottled) if target != nil { log.VEventf(ctx, 2, "rebalance target found, enqueuing") return true, 0 @@ -249,7 +249,7 @@ func (rq *replicateQueue) process( // snapshot errors, usually signaling that a rebalancing // reservation could not be made with the selected target. for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { - if requeue, err := rq.processOneChange(ctx, repl, sysCfg, rq.canTransferLease, false /* dryRun */, false /* disableStatsBasedRebalancing */); err != nil { + if requeue, err := rq.processOneChange(ctx, repl, sysCfg, rq.canTransferLease, false /* dryRun */); err != nil { if IsSnapshotError(err) { // If ChangeReplicas failed because the preemptive snapshot failed, we // log the error but then return success indicating we should retry the @@ -276,7 +276,6 @@ func (rq *replicateQueue) processOneChange( sysCfg config.SystemConfig, canTransferLease func() bool, dryRun bool, - disableStatsBasedRebalancing bool, ) (requeue bool, _ error) { desc := repl.Desc() @@ -297,7 +296,7 @@ func (rq *replicateQueue) processOneChange( } rangeInfo := rangeInfoForRepl(repl, desc) - switch action, _ := rq.allocator.ComputeAction(ctx, zone, rangeInfo, disableStatsBasedRebalancing); action { + switch action, _ := rq.allocator.ComputeAction(ctx, zone, rangeInfo); action { case AllocatorNoop: break case AllocatorAdd: @@ -307,7 +306,6 @@ func (rq *replicateQueue) processOneChange( zone, liveReplicas, // only include liveReplicas, since deadReplicas should soon be removed rangeInfo, - disableStatsBasedRebalancing, ) if err != nil { return false, err @@ -342,7 +340,6 @@ func (rq *replicateQueue) processOneChange( zone, oldPlusNewReplicas, rangeInfo, - disableStatsBasedRebalancing, ) if err != nil { // It does not seem possible to go to the next odd replica state. 
Note @@ -379,7 +376,7 @@ func (rq *replicateQueue) processOneChange( return false, errors.Errorf("no removable replicas from range that needs a removal: %s", rangeRaftProgress(repl.RaftStatus(), desc.Replicas)) } - removeReplica, details, err := rq.allocator.RemoveTarget(ctx, zone, candidates, rangeInfo, disableStatsBasedRebalancing) + removeReplica, details, err := rq.allocator.RemoveTarget(ctx, zone, candidates, rangeInfo) if err != nil { return false, err } @@ -493,7 +490,7 @@ func (rq *replicateQueue) processOneChange( if !rq.store.TestingKnobs().DisableReplicaRebalancing { rebalanceStore, details := rq.allocator.RebalanceTarget( - ctx, zone, repl.RaftStatus(), rangeInfo, storeFilterThrottled, disableStatsBasedRebalancing) + ctx, zone, repl.RaftStatus(), rangeInfo, storeFilterThrottled) if rebalanceStore == nil { log.VEventf(ctx, 1, "no suitable rebalance target") } else { diff --git a/pkg/storage/replicate_queue_test.go b/pkg/storage/replicate_queue_test.go index 8f07bc4a5df5..d32504c6fb5b 100644 --- a/pkg/storage/replicate_queue_test.go +++ b/pkg/storage/replicate_queue_test.go @@ -62,7 +62,7 @@ func TestReplicateQueueRebalance(t *testing.T) { for _, server := range tc.Servers { st := server.ClusterSettings() st.Manual.Store(true) - storage.EnableStatsBasedRebalancing.Override(&st.SV, false) + storage.LoadBasedRebalancingMode.Override(&st.SV, int64(storage.LBRebalancingOff)) } const newRanges = 5 diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 1861ce599a66..23f692c23e17 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -1021,10 +1021,10 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript ) s.scanner.AddQueues(s.tsMaintenanceQueue) } - } - s.storeRebalancer = NewStoreRebalancer( - s.cfg.AmbientCtx, cfg.Settings, s.replicateQueue, s.replRankings) + s.storeRebalancer = NewStoreRebalancer( + s.cfg.AmbientCtx, cfg.Settings, s.replicateQueue, s.replRankings) + } if cfg.TestingKnobs.DisableGCQueue { s.setGCQueueActive(false) @@ -1560,7 +1560,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Connect rangefeeds to closed timestamp updates. s.startClosedTimestampRangefeedSubscriber(ctx) - s.storeRebalancer.Start(ctx, s.stopper, s.StoreID()) + if s.storeRebalancer != nil { + s.storeRebalancer.Start(ctx, s.stopper) + } // Start the storage engine compactor. 
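The store.go hunk above narrows where the StoreRebalancer is constructed (it now sits inside an existing conditional block in NewStore) and guards Start behind a nil check. A minimal sketch of that optional-subsystem pattern, with invented names (rebalancer, startIfPresent) standing in for the real types:

```go
package main

import "fmt"

// rebalancer stands in for StoreRebalancer; it is only constructed in some
// configurations, so callers must tolerate a nil value.
type rebalancer struct{ started bool }

func (r *rebalancer) start() { r.started = true }

// startIfPresent mirrors the guarded call in Store.Start: skip the subsystem
// entirely when it was never constructed instead of dereferencing nil.
func startIfPresent(r *rebalancer) {
	if r == nil {
		fmt.Println("store rebalancer not configured; skipping")
		return
	}
	r.start()
	fmt.Println("store rebalancer started")
}

func main() {
	startIfPresent(nil)
	startIfPresent(&rebalancer{})
}
```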
if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) { @@ -4542,7 +4544,7 @@ func (s *Store) AllocatorDryRun( defer cancel() canTransferLease := func() bool { return true } _, err := s.replicateQueue.processOneChange( - ctx, repl, sysCfg, canTransferLease, true /* dryRun */, false /* disableStatsBasedRebalancing */) + ctx, repl, sysCfg, canTransferLease, true /* dryRun */) if err != nil { log.Eventf(ctx, "error simulating allocator on replica %s: %s", repl, err) } diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go index 5be226f33073..41daaf277a25 100644 --- a/pkg/storage/store_rebalancer.go +++ b/pkg/storage/store_rebalancer.go @@ -22,8 +22,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" ) @@ -40,6 +43,75 @@ const ( minQPSThresholdDifference = 100 ) +var ( + metaStoreRebalancerLeaseTransferCount = metric.Metadata{ + Name: "rebalancing.lease.transfers", + Help: "Number of lease transfers motivated by store-level load imbalances", + Measurement: "Lease Transfers", + Unit: metric.Unit_COUNT, + } + metaStoreRebalancerRangeRebalanceCount = metric.Metadata{ + Name: "rebalancing.range.rebalances", + Help: "Number of range rebalance operations motivated by store-level load imbalances", + Measurement: "Range Rebalances", + Unit: metric.Unit_COUNT, + } +) + +// StoreRebalancerMetrics is the set of metrics for the store-level rebalancer. +type StoreRebalancerMetrics struct { + LeaseTransferCount *metric.Counter + RangeRebalanceCount *metric.Counter +} + +func makeStoreRebalancerMetrics() StoreRebalancerMetrics { + return StoreRebalancerMetrics{ + LeaseTransferCount: metric.NewCounter(metaStoreRebalancerLeaseTransferCount), + RangeRebalanceCount: metric.NewCounter(metaStoreRebalancerRangeRebalanceCount), + } +} + +// LoadBasedRebalancingMode controls whether range rebalancing takes +// additional variables such as write load and disk usage into account. +// If disabled, rebalancing is done purely based on replica count. +var LoadBasedRebalancingMode = settings.RegisterEnumSetting( + "kv.allocator.load_based_rebalancing", + "whether to rebalance based on the distribution of QPS across stores", + "leases", + map[int64]string{ + int64(LBRebalancingOff): "off", + int64(LBRebalancingLeasesOnly): "leases", + int64(LBRebalancingLeasesAndReplicas): "leases and replicas", + }, +) + +// qpsRebalanceThreshold is much like rangeRebalanceThreshold, but for +// QPS rather than range count. This should be set higher than +// rangeRebalanceThreshold because QPS can naturally vary over time as +// workloads change and clients come and go, so we need to be a little more +// forgiving to avoid thrashing. +var qpsRebalanceThreshold = settings.RegisterNonNegativeFloatSetting( + "kv.allocator.qps_rebalance_threshold", + "minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull", + 0.25, +) + +// LBRebalancingMode controls if and when we do store-level rebalancing +// based on load. +type LBRebalancingMode int64 + +const ( + // LBRebalancingOff means that we do not do store-level rebalancing + // based on load statistics. 
+ LBRebalancingOff LBRebalancingMode = iota + // LBRebalancingLeasesOnly means that we rebalance leases based on + // store-level QPS imbalances. + LBRebalancingLeasesOnly + // LBRebalancingLeasesAndReplicas means that we rebalance both leases and + // replicas based on store-level QPS imbalances. + LBRebalancingLeasesAndReplicas +) + // StoreRebalancer is responsible for examining how the associated store's load // compares to the load on other stores in the cluster and transferring leases // or replicas away if the local store is overloaded. @@ -52,6 +124,7 @@ const ( // will best accomplish the store-level goals. type StoreRebalancer struct { log.AmbientContext + metrics StoreRebalancerMetrics st *cluster.Settings rq *replicateQueue replRankings *replicaRankings @@ -66,12 +139,15 @@ func NewStoreRebalancer( replRankings *replicaRankings, ) *StoreRebalancer { ambientCtx.AddLogTag("store-rebalancer", nil) - return &StoreRebalancer{ + sr := &StoreRebalancer{ AmbientContext: ambientCtx, + metrics: makeStoreRebalancerMetrics(), st: st, rq: rq, replRankings: replRankings, } + sr.rq.store.metrics.registry.AddMetricStruct(&sr.metrics) + return sr } // Start runs an infinite loop in a goroutine which regularly checks whether @@ -87,9 +163,7 @@ func NewStoreRebalancer( // // TODO(a-robinson): Expose metrics to make this understandable without having // to dive into logspy. -func (sr *StoreRebalancer) Start( - ctx context.Context, stopper *stop.Stopper, storeID roachpb.StoreID, -) { +func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { ctx = sr.AnnotateCtx(ctx) // Start a goroutine that watches and proactively renews certain @@ -107,39 +181,46 @@ func (sr *StoreRebalancer) Start( case <-ticker.C: } - if !EnableStatsBasedRebalancing.Get(&sr.st.SV) { + mode := LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)) + if mode == LBRebalancingOff { continue } - localDesc, found := sr.rq.allocator.storePool.getStoreDescriptor(storeID) - if !found { - log.Warningf(ctx, "StorePool missing descriptor for local store") - continue - } storeList, _, _ := sr.rq.allocator.storePool.getStoreList(roachpb.RangeID(0), storeFilterNone) - sr.rebalanceStore(ctx, localDesc, storeList) + sr.rebalanceStore(ctx, mode, storeList) } }) } func (sr *StoreRebalancer) rebalanceStore( - ctx context.Context, localDesc roachpb.StoreDescriptor, storeList StoreList, + ctx context.Context, mode LBRebalancingMode, storeList StoreList, ) { - - statThreshold := statRebalanceThreshold.Get(&sr.st.SV) + qpsThresholdFraction := qpsRebalanceThreshold.Get(&sr.st.SV) // First check if we should transfer leases away to better balance QPS. 
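Putting the new pieces together: the enum setting registered above selects one of the three LBRebalancingMode values, Start skips the loop entirely when the mode is off, and rebalanceStore only attempts replica moves in the most aggressive mode. A standalone sketch of that decision follows; rebalanceActions is an invented helper for illustration, not part of the patch.

```go
package main

import "fmt"

// LBRebalancingMode mirrors the enum introduced in store_rebalancer.go.
type LBRebalancingMode int64

const (
	LBRebalancingOff LBRebalancingMode = iota
	LBRebalancingLeasesOnly
	LBRebalancingLeasesAndReplicas
)

// rebalanceActions is an illustrative reduction of the control flow in
// rebalanceStore: lease transfers are considered in any non-off mode, while
// replica moves are only attempted in the most aggressive mode.
func rebalanceActions(mode LBRebalancingMode) (transferLeases, moveReplicas bool) {
	switch mode {
	case LBRebalancingLeasesOnly:
		return true, false
	case LBRebalancingLeasesAndReplicas:
		return true, true
	default: // LBRebalancingOff, or any unknown value
		return false, false
	}
}

func main() {
	for _, m := range []LBRebalancingMode{LBRebalancingOff, LBRebalancingLeasesOnly, LBRebalancingLeasesAndReplicas} {
		leases, replicas := rebalanceActions(m)
		fmt.Printf("mode=%d transferLeases=%t moveReplicas=%t\n", m, leases, replicas)
	}
}
```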
- qpsMinThreshold := math.Min(storeList.candidateQueriesPerSecond.mean*(1-statThreshold), + qpsMinThreshold := math.Min(storeList.candidateQueriesPerSecond.mean*(1-qpsThresholdFraction), storeList.candidateQueriesPerSecond.mean-minQPSThresholdDifference) - qpsMaxThreshold := math.Max(storeList.candidateQueriesPerSecond.mean*(1+statThreshold), + qpsMaxThreshold := math.Max(storeList.candidateQueriesPerSecond.mean*(1+qpsThresholdFraction), storeList.candidateQueriesPerSecond.mean+minQPSThresholdDifference) + var localDesc *roachpb.StoreDescriptor + for i := range storeList.stores { + if storeList.stores[i].StoreID == sr.rq.store.StoreID() { + localDesc = &storeList.stores[i] + } + } + if localDesc == nil { + log.Warningf(ctx, "StorePool missing descriptor for local store") + return + } + if !(localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold) { log.VEventf(ctx, 1, "local QPS %.2f is below max threshold %.2f (mean=%.2f); no rebalancing needed", localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold, storeList.candidateQueriesPerSecond.mean) return } + var replicasToMaybeRebalance []replicaWithStats storeMap := storeListToMap(storeList) sysCfg, cfgOk := sr.rq.allocator.storePool.gossip.GetSystemConfig() if !cfgOk { @@ -147,29 +228,33 @@ func (sr *StoreRebalancer) rebalanceStore( return } - log.Infof(ctx, "considering load-based lease transfers for s%d with %.2f qps (mean=%.2f, upperThreshold=%.2f)", + log.Infof(ctx, + "considering load-based lease transfers for s%d with %.2f qps (mean=%.2f, upperThreshold=%.2f)", localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold) hottestRanges := sr.replRankings.topQPS() for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold { - replWithStats, target := sr.chooseLeaseToTransfer( + replWithStats, target, considerForRebalance := sr.chooseLeaseToTransfer( ctx, sysCfg, &hottestRanges, localDesc, storeList, storeMap, qpsMinThreshold, qpsMaxThreshold) + replicasToMaybeRebalance = append(replicasToMaybeRebalance, considerForRebalance...) if replWithStats.repl == nil { - log.Infof(ctx, - "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f)", - localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold) break } + log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load", replWithStats.repl.RangeID, replWithStats.qps, target.StoreID) - replCtx := replWithStats.repl.AnnotateCtx(ctx) + replCtx, cancel := context.WithTimeout(replWithStats.repl.AnnotateCtx(ctx), sr.rq.processTimeout) if err := sr.rq.transferLease(replCtx, replWithStats.repl, target); err != nil { + cancel() log.Errorf(replCtx, "unable to transfer lease to s%d: %v", target.StoreID, err) continue } + cancel() + sr.metrics.LeaseTransferCount.Inc(1) + // Finally, update our local copies of the descriptors so that if // additional transfers are needed we'll be making the decisions with more - // up-to-date info. + // up-to-date info. The StorePool copies are updated by transferLease. 
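A worked example of the qpsMinThreshold/qpsMaxThreshold computation at the top of the hunk above. The 0.25 fraction is the default for kv.allocator.qps_rebalance_threshold and 100 is minQPSThresholdDifference; the mean QPS values are made up for illustration.

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	const (
		meanQPS                   = 1000.0
		qpsThresholdFraction      = 0.25
		minQPSThresholdDifference = 100.0
	)
	// At a mean of 1000 QPS the fractional band dominates, so stores are
	// considered balanced between 750 and 1250 QPS.
	qpsMinThreshold := math.Min(meanQPS*(1-qpsThresholdFraction), meanQPS-minQPSThresholdDifference)
	qpsMaxThreshold := math.Max(meanQPS*(1+qpsThresholdFraction), meanQPS+minQPSThresholdDifference)
	fmt.Printf("min=%.0f max=%.0f\n", qpsMinThreshold, qpsMaxThreshold) // min=750 max=1250

	// At a much lower mean (say 200 QPS) the +/-100 floor dominates, so the
	// band is [100, 300] rather than [150, 250]; that keeps small absolute
	// imbalances from triggering rebalancing.
	lowMean := 200.0
	fmt.Printf("min=%.0f max=%.0f\n",
		math.Min(lowMean*(1-qpsThresholdFraction), lowMean-minQPSThresholdDifference),
		math.Max(lowMean*(1+qpsThresholdFraction), lowMean+minQPSThresholdDifference))
}
```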
localDesc.Capacity.LeaseCount-- localDesc.Capacity.QueriesPerSecond -= replWithStats.qps if otherDesc := storeMap[target.StoreID]; otherDesc != nil { @@ -177,6 +262,87 @@ func (sr *StoreRebalancer) rebalanceStore( otherDesc.Capacity.QueriesPerSecond += replWithStats.qps } } + + if !(localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold) { + log.Infof(ctx, + "load-based lease transfers successfully brought s%d down to %.2f qps (mean=%.2f, upperThreshold=%.2f)", + localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold) + return + } + + if mode != LBRebalancingLeasesAndReplicas { + log.Infof(ctx, + "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f)", + localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold) + return + } + log.Infof(ctx, + "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f); considering load-based replica rebalances", + localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold) + + // Re-combine replicasToMaybeRebalance with what remains of hottestRanges so + // that we'll reconsider them for replica rebalancing. + replicasToMaybeRebalance = append(replicasToMaybeRebalance, hottestRanges...) + + for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold { + replWithStats, targets := sr.chooseReplicaToRebalance( + ctx, + sysCfg, + &replicasToMaybeRebalance, + localDesc, + storeList, + storeMap, + qpsMinThreshold, + qpsMaxThreshold) + if replWithStats.repl == nil { + log.Infof(ctx, + "ran out of replicas worth transferring and qps (%.2f) is still above desired threshold (%.2f); will check again soon", + localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold) + return + } + + descBeforeRebalance := replWithStats.repl.Desc() + log.VEventf(ctx, 1, "rebalancing r%d (%.2f qps) from %v to %v to better balance load", + replWithStats.repl.RangeID, replWithStats.qps, descBeforeRebalance.Replicas, targets) + replCtx, cancel := context.WithTimeout(replWithStats.repl.AnnotateCtx(ctx), sr.rq.processTimeout) + // TODO(a-robinson): Either make RelocateRange production-ready or do the + // rebalancing another way. + if err := RelocateRange(replCtx, sr.rq.store.DB(), *descBeforeRebalance, targets); err != nil { + cancel() + log.Errorf(replCtx, "unable to relocate range to %v: %v", targets, err) + continue + } + cancel() + sr.metrics.RangeRebalanceCount.Inc(1) + + // Finally, update our local copies of the descriptors so that if + // additional transfers are needed we'll be making the decisions with more + // up-to-date info. + // + // TODO(a-robinson): This just updates the copies used locally by the + // storeRebalancer. We may also want to update the copies in the StorePool + // itself. 
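Both the lease-transfer loop and the replica-rebalance loop above keep local copies of the store descriptors up to date as moves are made, so later iterations in the same pass see the effect of earlier ones. A simplified sketch of that bookkeeping; storeLoad and applyLeaseTransfer are illustrative stand-ins for the capacity fields being adjusted.

```go
package main

import "fmt"

// storeLoad is a simplified stand-in for the capacity fields that
// rebalanceStore adjusts in its local copies of the store descriptors.
type storeLoad struct {
	leases int
	qps    float64
}

// applyLeaseTransfer mirrors the bookkeeping in the hunks above: the source
// store gives up one lease and the replica's QPS, and the target gains both,
// so subsequent decisions in the same pass use fresher numbers without
// waiting for gossip.
func applyLeaseTransfer(src, dst *storeLoad, replicaQPS float64) {
	src.leases--
	src.qps -= replicaQPS
	dst.leases++
	dst.qps += replicaQPS
}

func main() {
	local := &storeLoad{leases: 120, qps: 1500}
	target := &storeLoad{leases: 80, qps: 600}
	applyLeaseTransfer(local, target, 250)
	fmt.Printf("local=%+v target=%+v\n", *local, *target)
	// local={leases:119 qps:1250} target={leases:81 qps:850}
}
```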
+ for i := range descBeforeRebalance.Replicas { + if storeDesc := storeMap[descBeforeRebalance.Replicas[i].StoreID]; storeDesc != nil { + storeDesc.Capacity.RangeCount-- + } + } + localDesc.Capacity.LeaseCount-- + localDesc.Capacity.QueriesPerSecond -= replWithStats.qps + for i := range targets { + if storeDesc := storeMap[targets[i].StoreID]; storeDesc != nil { + storeDesc.Capacity.RangeCount++ + if i == 0 { + storeDesc.Capacity.LeaseCount++ + storeDesc.Capacity.QueriesPerSecond += replWithStats.qps + } + } + } + } + + log.Infof(ctx, + "load-based replica transfers successfully brought s%d down to %.2f qps (mean=%.2f, upperThreshold=%.2f)", + localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold) } // TODO(a-robinson): Should we take the number of leases on each store into @@ -185,33 +351,27 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( ctx context.Context, sysCfg config.SystemConfig, hottestRanges *[]replicaWithStats, - localDesc roachpb.StoreDescriptor, + localDesc *roachpb.StoreDescriptor, storeList StoreList, storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, minQPS float64, maxQPS float64, -) (replicaWithStats, roachpb.ReplicaDescriptor) { +) (replicaWithStats, roachpb.ReplicaDescriptor, []replicaWithStats) { + var considerForRebalance []replicaWithStats now := sr.rq.store.Clock().Now() for { if len(*hottestRanges) == 0 { - return replicaWithStats{}, roachpb.ReplicaDescriptor{} + return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance } replWithStats := (*hottestRanges)[0] *hottestRanges = (*hottestRanges)[1:] // We're all out of replicas. if replWithStats.repl == nil { - return replicaWithStats{}, roachpb.ReplicaDescriptor{} + return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance } - if !replWithStats.repl.OwnsValidLease(now) { - log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) - continue - } - - if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { - log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", - replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) + if shouldNotMoveAway(ctx, replWithStats, localDesc, now, minQPS) { continue } @@ -228,7 +388,8 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( } desc := replWithStats.repl.Desc() - log.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps", desc.RangeID, replWithStats.qps) + log.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps", + desc.RangeID, replWithStats.qps) // Check all the other replicas in order of increasing qps. 
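The inline lease-ownership and minimum-QPS checks are now factored into shouldNotMoveAway (its body appears in a later hunk). The second of those checks is worth a small worked example, since it is what keeps the rebalancer from overshooting; wouldDropBelowMin below is an illustrative rewrite, not the real helper.

```go
package main

import "fmt"

// wouldDropBelowMin is an illustrative version of the second check in
// shouldNotMoveAway: shedding a replica's load is pointless if it would push
// the local store below the lower QPS threshold, since that would just invert
// the imbalance.
func wouldDropBelowMin(localQPS, replicaQPS, minQPS float64) bool {
	return localQPS-replicaQPS < minQPS
}

func main() {
	// With a lower threshold of 750 QPS, shedding a 600-QPS replica from a
	// 1300-QPS store would leave it at 700 QPS, so the move is skipped.
	fmt.Println(wouldDropBelowMin(1300, 600, 750)) // true
	// Shedding a 200-QPS replica instead leaves 1100 QPS, which is fine.
	fmt.Println(wouldDropBelowMin(1300, 200, 750)) // false
}
```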
replicas := make([]roachpb.ReplicaDescriptor, len(desc.Replicas)) @@ -273,7 +434,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey) if err != nil { log.Error(ctx, err) - return replicaWithStats{}, roachpb.ReplicaDescriptor{} + return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance } preferred := sr.rq.allocator.preferredLeaseholders(zone, desc.Replicas) if len(preferred) > 0 && !storeHasReplica(candidate.StoreID, preferred) { @@ -285,7 +446,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( if sr.rq.allocator.followTheWorkloadPrefersLocal( ctx, filteredStoreList, - localDesc, + *localDesc, candidate.StoreID, desc.Replicas, replWithStats.repl.leaseholderStats, @@ -295,9 +456,179 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( continue } - return replWithStats, candidate + return replWithStats, candidate, considerForRebalance + } + + // If none of the other replicas are valid lease transfer targets, consider + // this range for replica rebalancing. + considerForRebalance = append(considerForRebalance, replWithStats) + } +} + +func (sr *StoreRebalancer) chooseReplicaToRebalance( + ctx context.Context, + sysCfg config.SystemConfig, + hottestRanges *[]replicaWithStats, + localDesc *roachpb.StoreDescriptor, + storeList StoreList, + storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + minQPS float64, + maxQPS float64, +) (replicaWithStats, []roachpb.ReplicationTarget) { + now := sr.rq.store.Clock().Now() + for { + if len(*hottestRanges) == 0 { + return replicaWithStats{}, nil + } + replWithStats := (*hottestRanges)[0] + *hottestRanges = (*hottestRanges)[1:] + + if replWithStats.repl == nil { + return replicaWithStats{}, nil + } + + if shouldNotMoveAway(ctx, replWithStats, localDesc, now, minQPS) { + continue + } + + // Don't bother moving ranges whose QPS is below some small fraction of the + // store's QPS (unless the store has extra ranges to spare anyway). It's + // just unnecessary churn with no benefit to move ranges responsible for, + // for example, 1 qps on a store with 5000 qps. + const minQPSFraction = .001 + if replWithStats.qps < localDesc.Capacity.QueriesPerSecond*minQPSFraction && + float64(localDesc.Capacity.RangeCount) <= storeList.candidateRanges.mean { + log.VEventf(ctx, 5, "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps", + replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, localDesc.Capacity.QueriesPerSecond) + continue + } + + desc := replWithStats.repl.Desc() + log.VEventf(ctx, 3, "considering replica rebalance for r%d with %.2f qps", + desc.RangeID, replWithStats.qps) + + // Pick out the stores that we want the range on, keeping existing replicas + // around if they aren't on overfull stores. + zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey) + if err != nil { + log.Error(ctx, err) + return replicaWithStats{}, nil + } + desiredReplicas := int(zone.NumReplicas) + targets := make([]roachpb.ReplicationTarget, 0, desiredReplicas) + targetReplicas := make([]roachpb.ReplicaDescriptor, 0, desiredReplicas) + + // Check the range's existing diversity score, since we want to ensure we + // don't hurt locality diversity just to improve QPS. + curDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalities(desc.Replicas)) + + // Check the existing replicas, keeping around those that aren't overloaded. 
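chooseReplicaToRebalance above skips ranges whose QPS is a negligible fraction of the store's total unless the store is also above the mean range count. An illustrative, standalone version of that filter with made-up numbers:

```go
package main

import "fmt"

// tooSmallToMatter is an illustrative version of the minQPSFraction check in
// chooseReplicaToRebalance: skip ranges whose QPS is a negligible fraction of
// the store's total, unless the store also has more ranges than the cluster
// average and could usefully shed count anyway.
func tooSmallToMatter(replicaQPS, storeQPS float64, storeRanges int, meanRanges float64) bool {
	const minQPSFraction = .001
	return replicaQPS < storeQPS*minQPSFraction && float64(storeRanges) <= meanRanges
}

func main() {
	// 1 QPS on a 5000-QPS store is below the 0.1% cutoff (5 QPS), so moving it
	// would be churn with no real load benefit.
	fmt.Println(tooSmallToMatter(1, 5000, 100, 120)) // true
	// The same range is still worth considering if the store is also over the
	// mean range count, since a move would at least improve the count balance.
	fmt.Println(tooSmallToMatter(1, 5000, 150, 120)) // false
}
```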
+ for i := range desc.Replicas { + if desc.Replicas[i].StoreID == localDesc.StoreID { + continue + } + // Keep the replica in the range if we don't know its QPS or if its QPS + // is below the upper threshold. Punishing stores not in our store map + // could cause mass evictions if the storePool gets out of sync. + storeDesc, ok := storeMap[desc.Replicas[i].StoreID] + if !ok || storeDesc.Capacity.QueriesPerSecond < maxQPS { + targets = append(targets, roachpb.ReplicationTarget{ + NodeID: desc.Replicas[i].NodeID, + StoreID: desc.Replicas[i].StoreID, + }) + targetReplicas = append(targetReplicas, roachpb.ReplicaDescriptor{ + NodeID: desc.Replicas[i].NodeID, + StoreID: desc.Replicas[i].StoreID, + }) + } + } + + // Then pick out which new stores to add the remaining replicas to. + for len(targets) < desiredReplicas { + // Use the preexisting AllocateTarget logic to ensure that considerations + // such as zone constraints, locality diversity, and full disk come + // into play. + rangeInfo := rangeInfoForRepl(replWithStats.repl, desc) + options := sr.rq.allocator.scorerOptions() + options.qpsRebalanceThreshold = qpsRebalanceThreshold.Get(&sr.st.SV) + target, _ := sr.rq.allocator.allocateTargetFromList( + ctx, + storeList, + zone, + targetReplicas, + rangeInfo, + options, + ) + if target == nil { + log.VEventf(ctx, 3, "no rebalance targets found to replace the current store for r%d", + desc.RangeID) + break + } + + targets = append(targets, roachpb.ReplicationTarget{ + NodeID: target.Node.NodeID, + StoreID: target.StoreID, + }) + targetReplicas = append(targetReplicas, roachpb.ReplicaDescriptor{ + NodeID: target.Node.NodeID, + StoreID: target.StoreID, + }) + } + + // If we couldn't find enough valid targets, forget about this range. + // + // TODO(a-robinson): Support more incremental improvements -- move what we + // can if it makes things better even if it isn't great. For example, + // moving one of the other existing replicas that's on a store with less + // qps than the max threshold but above the mean would help in certain + // locality configurations. + if len(targets) < desiredReplicas { + log.VEventf(ctx, 3, "couldn't find enough rebalance targets for r%d (%d/%d)", + desc.RangeID, len(targets), desiredReplicas) + continue } + newDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalities(targetReplicas)) + if newDiversity < curDiversity { + log.VEventf(ctx, 3, + "new diversity %.2f for r%d worse than current diversity %.2f; not rebalancing", + newDiversity, desc.RangeID, curDiversity) + continue + } + + // Pick the replica with the least QPS to be leaseholder; + // TestingRelocateRange transfers the lease to the first provided + // target. 
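The diversity check above rejects a candidate replica set that would be strictly less diverse than the current one. The sketch below only distinguishes identical versus different locality strings; the real rangeDiversityScore works over locality tiers and gives partial credit, so treat this purely as an illustration of the comparison.

```go
package main

import "fmt"

// pairwiseDiversity is a rough, illustrative analogue of rangeDiversityScore:
// the fraction of replica pairs that live in different localities.
func pairwiseDiversity(localities []string) float64 {
	var pairs, diverse int
	for i := 0; i < len(localities); i++ {
		for j := i + 1; j < len(localities); j++ {
			pairs++
			if localities[i] != localities[j] {
				diverse++
			}
		}
	}
	if pairs == 0 {
		return 0
	}
	return float64(diverse) / float64(pairs)
}

func main() {
	cur := pairwiseDiversity([]string{"us-east", "us-west", "eu-west"})
	proposed := pairwiseDiversity([]string{"us-east", "us-east", "eu-west"})
	// The candidate set is rejected when it is strictly less diverse than the
	// current one, mirroring the newDiversity < curDiversity check above.
	fmt.Printf("cur=%.2f proposed=%.2f reject=%t\n", cur, proposed, proposed < cur)
}
```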
+ newLeaseIdx := 0 + newLeaseQPS := math.MaxFloat64 + for i := 0; i < len(targets); i++ { + storeDesc, ok := storeMap[desc.Replicas[i].StoreID] + if ok && storeDesc.Capacity.QueriesPerSecond < newLeaseQPS { + newLeaseIdx = i + newLeaseQPS = storeDesc.Capacity.QueriesPerSecond + } + } + targets[0], targets[newLeaseIdx] = targets[newLeaseIdx], targets[0] + return replWithStats, targets + } +} + +func shouldNotMoveAway( + ctx context.Context, + replWithStats replicaWithStats, + localDesc *roachpb.StoreDescriptor, + now hlc.Timestamp, + minQPS float64, +) bool { + if !replWithStats.repl.OwnsValidLease(now) { + log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) + return true + } + if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { + log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", + replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) + return true } + return false } func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor { diff --git a/pkg/storage/store_rebalancer_test.go b/pkg/storage/store_rebalancer_test.go index bec59923cf9d..c631923c72c4 100644 --- a/pkg/storage/store_rebalancer_test.go +++ b/pkg/storage/store_rebalancer_test.go @@ -155,8 +155,8 @@ func TestChooseLeaseToTransfer(t *testing.T) { for _, tc := range testCases { loadRanges(rr, s, []testRange{{storeIDs: tc.storeIDs, qps: tc.qps}}) hottestRanges := rr.topQPS() - _, target := sr.chooseLeaseToTransfer( - ctx, config.SystemConfig{}, &hottestRanges, localDesc, storeList, storeMap, minQPS, maxQPS) + _, target, _ := sr.chooseLeaseToTransfer( + ctx, config.SystemConfig{}, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS) if target.StoreID != tc.expectTarget { t.Errorf("got target store %d for range with replicas %v and %f qps; want %d", target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget) diff --git a/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/replication.tsx b/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/replication.tsx index 135bd59f95a6..c81e41a2eb9d 100644 --- a/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/replication.tsx +++ b/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/replication.tsx @@ -85,6 +85,8 @@ export default function (props: GraphDashboardProps) { + + ,
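To close out the store_rebalancer.go changes: the reordering shown above puts the least-loaded candidate first because the relocation path transfers the lease to the first target. A standalone sketch of that intent, with an invented target type and MaxFloat64 standing in for an unknown store load:

```go
package main

import (
	"fmt"
	"math"
)

// target is an illustrative replication target: a store and its current QPS.
type target struct {
	storeID int
	qps     float64
}

// leastLoadedFirst sketches the intent of the reordering at the end of
// chooseReplicaToRebalance: since the relocation path transfers the lease to
// the first target, the least-loaded candidate is swapped into position 0.
func leastLoadedFirst(targets []target) {
	if len(targets) == 0 {
		return
	}
	newLeaseIdx := 0
	newLeaseQPS := math.MaxFloat64
	for i := range targets {
		if targets[i].qps < newLeaseQPS {
			newLeaseIdx = i
			newLeaseQPS = targets[i].qps
		}
	}
	targets[0], targets[newLeaseIdx] = targets[newLeaseIdx], targets[0]
}

func main() {
	candidates := []target{{1, 900}, {2, 300}, {3, 600}}
	leastLoadedFirst(candidates)
	fmt.Println(candidates) // [{2 300} {1 900} {3 600}]
}
```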