From 50de562da3089741714afe3c210ba4da831fcfeb Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Mon, 13 Aug 2018 14:16:59 -0500 Subject: [PATCH 1/5] storage: rebalance replicas based on load at the store level Follow-up to #28340, which did this for just leases. Fixes #17979 Release note (performance improvement): Range replicas will be automatically rebalanced throughout the cluster to even out the amount of QPS being handled by each node. --- docs/generated/settings/settings.html | 4 +- pkg/storage/allocator.go | 43 ++-- pkg/storage/allocator_scorer.go | 16 +- pkg/storage/replica_rankings.go | 1 + pkg/storage/store.go | 2 +- pkg/storage/store_rebalancer.go | 322 +++++++++++++++++++++++--- pkg/storage/store_rebalancer_test.go | 4 +- 7 files changed, 340 insertions(+), 52 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 10fa5d7b8d44..f54d27c27dd0 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -18,8 +18,8 @@ kv.allocator.lease_rebalancing_aggressivenessfloat1set greater than 1.0 to rebalance leases toward load more aggressively, or between 0 and 1.0 to be more conservative about rebalancing leases kv.allocator.load_based_lease_rebalancing.enabledbooleantrueset to enable rebalancing of range leases based on load and latency kv.allocator.range_rebalance_thresholdfloat0.05minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull -kv.allocator.stat_based_rebalancing.enabledbooleanfalseset to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster -kv.allocator.stat_rebalance_thresholdfloat0.2minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull +kv.allocator.stat_based_rebalancing.enabledbooleantrueset to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster +kv.allocator.stat_rebalance_thresholdfloat0.2minimum fraction away from the mean a store's stats (such as queries per second) can be before it is considered overfull or underfull kv.bulk_io_write.concurrent_export_requestsinteger5number of export requests a store will handle concurrently before queuing kv.bulk_io_write.concurrent_import_requestsinteger1number of import requests a store will handle concurrently before queuing kv.bulk_io_write.max_ratebyte size8.0 EiBthe rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index ba2646ceaa45..93fac4ecafbf 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -361,9 +361,36 @@ func (a *Allocator) AllocateTarget( ) (*roachpb.StoreDescriptor, string, error) { sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeInfo.Desc.RangeID, storeFilterThrottled) + target, details := a.allocateTargetFromList( + ctx, sl, zone, existing, rangeInfo, a.scorerOptions(disableStatsBasedRebalancing)) + + if target != nil { + return target, details, nil + } + + // When there are throttled stores that do match, we shouldn't send + // the replica to purgatory. 
+ if throttledStoreCount > 0 { + return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount) + } + return nil, "", &allocatorError{ + constraints: zone.Constraints, + existingReplicas: len(existing), + aliveStores: aliveStoreCount, + throttledStores: throttledStoreCount, + } +} + +func (a *Allocator) allocateTargetFromList( + ctx context.Context, + sl StoreList, + zone config.ZoneConfig, + existing []roachpb.ReplicaDescriptor, + rangeInfo RangeInfo, + options scorerOptions, +) (*roachpb.StoreDescriptor, string) { analyzedConstraints := analyzeConstraints( ctx, a.storePool.getStoreDescriptor, existing, zone) - options := a.scorerOptions(disableStatsBasedRebalancing) candidates := allocateCandidates( sl, analyzedConstraints, existing, rangeInfo, a.storePool.getLocalities(existing), options, ) @@ -379,20 +406,10 @@ func (a *Allocator) AllocateTarget( if err != nil { log.Warningf(ctx, "failed to marshal details for choosing allocate target: %s", err) } - return &target.store, string(detailsBytes), nil + return &target.store, string(detailsBytes) } - // When there are throttled stores that do match, we shouldn't send - // the replica to purgatory. - if throttledStoreCount > 0 { - return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount) - } - return nil, "", &allocatorError{ - constraints: zone.Constraints, - existingReplicas: len(existing), - aliveStores: aliveStoreCount, - throttledStores: throttledStoreCount, - } + return nil, "" } func (a Allocator) simulateRemoveTarget( diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go index 3a9155692bf4..289c4a00297c 100644 --- a/pkg/storage/allocator_scorer.go +++ b/pkg/storage/allocator_scorer.go @@ -60,7 +60,7 @@ const ( var EnableStatsBasedRebalancing = settings.RegisterBoolSetting( "kv.allocator.stat_based_rebalancing.enabled", "set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster", - false, // TODO(a-robinson): switch to true for v2.1 once the store-rebalancer is done + true, ) // rangeRebalanceThreshold is the minimum ratio of a store's range count to @@ -85,12 +85,13 @@ var rangeRebalanceThreshold = settings.RegisterNonNegativeFloatSetting( // TODO(a-robinson): Should disk usage be held to a higher standard than this? 
var statRebalanceThreshold = settings.RegisterNonNegativeFloatSetting( "kv.allocator.stat_rebalance_threshold", - "minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull", + "minimum fraction away from the mean a store's stats (such as queries per second) can be before it is considered overfull or underfull", 0.20, ) type scorerOptions struct { deterministic bool + balanceQPSInsteadOfCount bool statsBasedRebalancingEnabled bool rangeRebalanceThreshold float64 statRebalanceThreshold float64 @@ -442,11 +443,22 @@ func allocateCandidates( } diversityScore := diversityAllocateScore(s, existingNodeLocalities) balanceScore := balanceScore(sl, s.Capacity, rangeInfo, options) + var convergesScore int + if options.balanceQPSInsteadOfCount { + if s.Capacity.QueriesPerSecond < underfullStatThreshold(options, sl.candidateQueriesPerSecond.mean) { + convergesScore = 1 + } else if s.Capacity.QueriesPerSecond < sl.candidateQueriesPerSecond.mean { + convergesScore = 0 + } else { + convergesScore = -1 + } + } candidates = append(candidates, candidate{ store: s, valid: constraintsOK, necessary: necessary, diversityScore: diversityScore, + convergesScore: convergesScore, balanceScore: balanceScore, rangeCount: int(s.Capacity.RangeCount), }) diff --git a/pkg/storage/replica_rankings.go b/pkg/storage/replica_rankings.go index 13b4a99f582d..3810fd9c943c 100644 --- a/pkg/storage/replica_rankings.go +++ b/pkg/storage/replica_rankings.go @@ -28,6 +28,7 @@ const ( type replicaWithStats struct { repl *Replica qps float64 + // TODO(a-robinson): Include writes-per-second and logicalBytes of storage? } // replicaRankings maintains top-k orderings of the replicas in a store along diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 1861ce599a66..1f919dd98e78 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -1560,7 +1560,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Connect rangefeeds to closed timestamp updates. s.startClosedTimestampRangefeedSubscriber(ctx) - s.storeRebalancer.Start(ctx, s.stopper, s.StoreID()) + s.storeRebalancer.Start(ctx, s.stopper) // Start the storage engine compactor. if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) { diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go index 5be226f33073..1f9afe958dfa 100644 --- a/pkg/storage/store_rebalancer.go +++ b/pkg/storage/store_rebalancer.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" ) @@ -87,9 +88,7 @@ func NewStoreRebalancer( // // TODO(a-robinson): Expose metrics to make this understandable without having // to dive into logspy. 
-func (sr *StoreRebalancer) Start( - ctx context.Context, stopper *stop.Stopper, storeID roachpb.StoreID, -) { +func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { ctx = sr.AnnotateCtx(ctx) // Start a goroutine that watches and proactively renews certain @@ -111,20 +110,13 @@ func (sr *StoreRebalancer) Start( continue } - localDesc, found := sr.rq.allocator.storePool.getStoreDescriptor(storeID) - if !found { - log.Warningf(ctx, "StorePool missing descriptor for local store") - continue - } storeList, _, _ := sr.rq.allocator.storePool.getStoreList(roachpb.RangeID(0), storeFilterNone) - sr.rebalanceStore(ctx, localDesc, storeList) + sr.rebalanceStore(ctx, storeList) } }) } -func (sr *StoreRebalancer) rebalanceStore( - ctx context.Context, localDesc roachpb.StoreDescriptor, storeList StoreList, -) { +func (sr *StoreRebalancer) rebalanceStore(ctx context.Context, storeList StoreList) { statThreshold := statRebalanceThreshold.Get(&sr.st.SV) @@ -134,12 +126,24 @@ func (sr *StoreRebalancer) rebalanceStore( qpsMaxThreshold := math.Max(storeList.candidateQueriesPerSecond.mean*(1+statThreshold), storeList.candidateQueriesPerSecond.mean+minQPSThresholdDifference) + var localDesc *roachpb.StoreDescriptor + for i := range storeList.stores { + if storeList.stores[i].StoreID == sr.rq.store.StoreID() { + localDesc = &storeList.stores[i] + } + } + if localDesc == nil { + log.Warningf(ctx, "StorePool missing descriptor for local store") + return + } + if !(localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold) { log.VEventf(ctx, 1, "local QPS %.2f is below max threshold %.2f (mean=%.2f); no rebalancing needed", localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold, storeList.candidateQueriesPerSecond.mean) return } + var replicasToMaybeRebalance []replicaWithStats storeMap := storeListToMap(storeList) sysCfg, cfgOk := sr.rq.allocator.storePool.gossip.GetSystemConfig() if !cfgOk { @@ -147,29 +151,39 @@ func (sr *StoreRebalancer) rebalanceStore( return } - log.Infof(ctx, "considering load-based lease transfers for s%d with %.2f qps (mean=%.2f, upperThreshold=%.2f)", + log.Infof(ctx, + "considering load-based lease transfers for s%d with %.2f qps (mean=%.2f, upperThreshold=%.2f)", localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold) hottestRanges := sr.replRankings.topQPS() for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold { - replWithStats, target := sr.chooseLeaseToTransfer( + replWithStats, target, considerForRebalance := sr.chooseLeaseToTransfer( ctx, sysCfg, &hottestRanges, localDesc, storeList, storeMap, qpsMinThreshold, qpsMaxThreshold) + replicasToMaybeRebalance = append(replicasToMaybeRebalance, considerForRebalance...) 
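The loop above keeps shedding leases while the local store sits above qpsMaxThreshold, which is derived near the top of this hunk as the cluster mean padded out by kv.allocator.stat_rebalance_threshold or by an absolute QPS floor, whichever is larger. Below is a self-contained sketch of that arithmetic, together with the tri-state convergence score that allocateCandidates uses when balanceQPSInsteadOfCount is set. The 0.2 fraction mirrors the setting's default shown earlier; the 100-QPS floor is only an assumed stand-in for minQPSThresholdDifference, whose value is not visible in this hunk, and the underfull side is assumed to mirror the overfull formula.

package main

import (
	"fmt"
	"math"
)

// Illustrative values: 0.2 matches the kv.allocator.stat_rebalance_threshold
// default; the 100-QPS floor is an assumed stand-in for
// minQPSThresholdDifference, which this hunk does not show.
const (
	statThreshold             = 0.2
	minQPSThresholdDifference = 100.0
)

// qpsThresholds returns the band outside of which a store is considered
// underfull (below min) or overfull (above max) relative to the mean.
func qpsThresholds(mean float64) (min, max float64) {
	min = math.Min(mean*(1-statThreshold), mean-minQPSThresholdDifference)
	max = math.Max(mean*(1+statThreshold), mean+minQPSThresholdDifference)
	return min, max
}

// qpsConvergesScore mirrors the tri-state convergence score used when
// balancing on QPS instead of range count: +1 for clearly underfull stores,
// 0 for below-mean stores, -1 otherwise.
func qpsConvergesScore(storeQPS, mean float64) int {
	underfull := mean * (1 - statThreshold)
	switch {
	case storeQPS < underfull:
		return 1
	case storeQPS < mean:
		return 0
	default:
		return -1
	}
}

func main() {
	min, max := qpsThresholds(1000)
	fmt.Printf("mean=1000: shed load above %.0f qps, prefer stores below %.0f qps\n", max, min)
	fmt.Println(qpsConvergesScore(700, 1000), qpsConvergesScore(900, 1000), qpsConvergesScore(1300, 1000)) // 1 0 -1
}

With a 1,000-QPS mean this yields an 800 to 1,200 band, so a store serving 1,300 QPS keeps shedding leases until it drops back under 1,200.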
if replWithStats.repl == nil { log.Infof(ctx, - "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f)", + "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f); considering load-based replica rebalances", localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold) break } + log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load", replWithStats.repl.RangeID, replWithStats.qps, target.StoreID) - replCtx := replWithStats.repl.AnnotateCtx(ctx) + replCtx, cancel := context.WithTimeout(replWithStats.repl.AnnotateCtx(ctx), sr.rq.processTimeout) if err := sr.rq.transferLease(replCtx, replWithStats.repl, target); err != nil { + cancel() log.Errorf(replCtx, "unable to transfer lease to s%d: %v", target.StoreID, err) continue } + cancel() + // Finally, update our local copies of the descriptors so that if // additional transfers are needed we'll be making the decisions with more // up-to-date info. + // + // TODO(a-robinson): This just updates the copies used locally by the + // storeRebalancer. We may also want to update the copies in the StorePool + // itself. localDesc.Capacity.LeaseCount-- localDesc.Capacity.QueriesPerSecond -= replWithStats.qps if otherDesc := storeMap[target.StoreID]; otherDesc != nil { @@ -177,6 +191,76 @@ func (sr *StoreRebalancer) rebalanceStore( otherDesc.Capacity.QueriesPerSecond += replWithStats.qps } } + + if !(localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold) { + log.Infof(ctx, + "load-based lease transfers successfully brought s%d down to %.2f qps (mean=%.2f, upperThreshold=%.2f)", + localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold) + return + } + + // Re-combine replicasToMaybeRebalance with what remains of hottestRanges so + // that we'll reconsider them for replica rebalancing. + replicasToMaybeRebalance = append(replicasToMaybeRebalance, hottestRanges...) + + for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold { + replWithStats, targets := sr.chooseReplicaToRebalance( + ctx, + sysCfg, + &replicasToMaybeRebalance, + localDesc, + storeList, + storeMap, + qpsMinThreshold, + qpsMaxThreshold) + if replWithStats.repl == nil { + log.Infof(ctx, + "ran out of replicas worth transferring and qps (%.2f) is still above desired threshold (%.2f); will check again soon", + localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold) + return + } + + descBeforeRebalance := replWithStats.repl.Desc() + log.VEventf(ctx, 1, "rebalancing r%d (%.2f qps) from %v to %v to better balance load", + replWithStats.repl.RangeID, replWithStats.qps, descBeforeRebalance.Replicas, targets) + replCtx, cancel := context.WithTimeout(replWithStats.repl.AnnotateCtx(ctx), sr.rq.processTimeout) + // TODO: Either make RelocateRange production-ready or do the rebalancing + // another way. + if err := RelocateRange(replCtx, sr.rq.store.DB(), *descBeforeRebalance, targets); err != nil { + cancel() + log.Errorf(replCtx, "unable to relocate range to %v: %v", targets, err) + continue + } + cancel() + + // Finally, update our local copies of the descriptors so that if + // additional transfers are needed we'll be making the decisions with more + // up-to-date info. + // + // TODO(a-robinson): This just updates the copies used locally by the + // storeRebalancer. We may also want to update the copies in the StorePool + // itself. 
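The hunk below then folds the completed move into the locally cached descriptors, as the comment above describes. One more detail worth calling out from both mutation paths: transferLease and RelocateRange each run under a context.WithTimeout derived from the replicate queue's process timeout, with cancel() called explicitly on every branch because a deferred cancel inside a loop would only fire once the whole pass returns. Here is a small sketch of an equivalent helper that restores the defer idiom by giving each attempt its own function scope; the 50ms timeout is only an illustrative value, not the queue's actual processTimeout.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithTimeout runs one rebalancing operation under its own deadline.
// Because each call gets its own function scope, defer cancel() fires as soon
// as the operation finishes rather than at the end of the caller's loop.
func runWithTimeout(
	ctx context.Context, timeout time.Duration, op func(context.Context) error,
) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return op(ctx)
}

func main() {
	ctx := context.Background()
	// Assumed timeout; the patch uses the replicate queue's process timeout.
	err := runWithTimeout(ctx, 50*time.Millisecond, func(ctx context.Context) error {
		select {
		case <-time.After(time.Second): // a transfer that takes too long
			return nil
		case <-ctx.Done():
			return errors.New("lease transfer timed out")
		}
	})
	fmt.Println(err) // lease transfer timed out
}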
+ for i := range descBeforeRebalance.Replicas { + if storeDesc := storeMap[descBeforeRebalance.Replicas[i].StoreID]; storeDesc != nil { + storeDesc.Capacity.RangeCount-- + } + } + localDesc.Capacity.LeaseCount-- + localDesc.Capacity.QueriesPerSecond -= replWithStats.qps + for i := range targets { + if storeDesc := storeMap[targets[i].StoreID]; storeDesc != nil { + storeDesc.Capacity.RangeCount++ + if i == 0 { + storeDesc.Capacity.LeaseCount++ + storeDesc.Capacity.QueriesPerSecond += replWithStats.qps + } + } + } + } + + log.Infof(ctx, + "load-based replica transfers successfully brought s%d down to %.2f qps (mean=%.2f, upperThreshold=%.2f)", + localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold) } // TODO(a-robinson): Should we take the number of leases on each store into @@ -185,33 +269,27 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( ctx context.Context, sysCfg config.SystemConfig, hottestRanges *[]replicaWithStats, - localDesc roachpb.StoreDescriptor, + localDesc *roachpb.StoreDescriptor, storeList StoreList, storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, minQPS float64, maxQPS float64, -) (replicaWithStats, roachpb.ReplicaDescriptor) { +) (replicaWithStats, roachpb.ReplicaDescriptor, []replicaWithStats) { + var considerForRebalance []replicaWithStats now := sr.rq.store.Clock().Now() for { if len(*hottestRanges) == 0 { - return replicaWithStats{}, roachpb.ReplicaDescriptor{} + return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance } replWithStats := (*hottestRanges)[0] *hottestRanges = (*hottestRanges)[1:] // We're all out of replicas. if replWithStats.repl == nil { - return replicaWithStats{}, roachpb.ReplicaDescriptor{} - } - - if !replWithStats.repl.OwnsValidLease(now) { - log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) - continue + return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance } - if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { - log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", - replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) + if shouldNotMoveAway(ctx, replWithStats, localDesc, now, minQPS) { continue } @@ -228,7 +306,8 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( } desc := replWithStats.repl.Desc() - log.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps", desc.RangeID, replWithStats.qps) + log.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps", + desc.RangeID, replWithStats.qps) // Check all the other replicas in order of increasing qps. 
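The ordering that the comment above refers to is what makes the first viable candidate also the coldest one: the range's other replicas are considered as lease targets in order of how much QPS their stores currently serve (the sort itself sits outside this hunk). A standalone sketch of that ordering under that assumption, with simplified types and an assumed load map in place of roachpb.ReplicaDescriptor and the store map:

package main

import (
	"fmt"
	"sort"
)

type storeID int

// sortByStoreQPS orders candidate store IDs so the least-loaded store comes
// first; stores missing from the map sort last since their load is unknown.
func sortByStoreQPS(candidates []storeID, qpsByStore map[storeID]float64) {
	sort.Slice(candidates, func(i, j int) bool {
		qi, iok := qpsByStore[candidates[i]]
		qj, jok := qpsByStore[candidates[j]]
		if iok != jok {
			return iok // known load sorts before unknown load
		}
		return qi < qj
	})
}

func main() {
	// Assumed example load: s2 is the coldest store and becomes the first
	// lease-transfer candidate to check.
	qps := map[storeID]float64{1: 1500, 2: 400, 3: 900}
	candidates := []storeID{1, 2, 3, 4}
	sortByStoreQPS(candidates, qps)
	fmt.Println(candidates) // [2 3 1 4]
}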
replicas := make([]roachpb.ReplicaDescriptor, len(desc.Replicas)) @@ -273,7 +352,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey) if err != nil { log.Error(ctx, err) - return replicaWithStats{}, roachpb.ReplicaDescriptor{} + return replicaWithStats{}, roachpb.ReplicaDescriptor{}, considerForRebalance } preferred := sr.rq.allocator.preferredLeaseholders(zone, desc.Replicas) if len(preferred) > 0 && !storeHasReplica(candidate.StoreID, preferred) { @@ -285,7 +364,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( if sr.rq.allocator.followTheWorkloadPrefersLocal( ctx, filteredStoreList, - localDesc, + *localDesc, candidate.StoreID, desc.Replicas, replWithStats.repl.leaseholderStats, @@ -295,11 +374,181 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer( continue } - return replWithStats, candidate + return replWithStats, candidate, considerForRebalance + } + + // If none of the other replicas are valid lease transfer targets, consider + // this range for replica rebalancing. + considerForRebalance = append(considerForRebalance, replWithStats) + } +} + +func (sr *StoreRebalancer) chooseReplicaToRebalance( + ctx context.Context, + sysCfg config.SystemConfig, + hottestRanges *[]replicaWithStats, + localDesc *roachpb.StoreDescriptor, + storeList StoreList, + storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + minQPS float64, + maxQPS float64, +) (replicaWithStats, []roachpb.ReplicationTarget) { + now := sr.rq.store.Clock().Now() + for { + if len(*hottestRanges) == 0 { + return replicaWithStats{}, nil + } + replWithStats := (*hottestRanges)[0] + *hottestRanges = (*hottestRanges)[1:] + + if replWithStats.repl == nil { + return replicaWithStats{}, nil + } + + if shouldNotMoveAway(ctx, replWithStats, localDesc, now, minQPS) { + continue + } + + // Don't bother moving ranges whose QPS is below some small fraction of the + // store's QPS (unless the store has extra ranges to spare anyway). It's + // just unnecessary churn with no benefit to move ranges responsible for, + // for example, 1 qps on a store with 5000 qps. + const minQPSFraction = .001 + if replWithStats.qps < localDesc.Capacity.QueriesPerSecond*minQPSFraction && + float64(localDesc.Capacity.RangeCount) <= storeList.candidateRanges.mean { + log.VEventf(ctx, 5, "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps", + replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, localDesc.Capacity.QueriesPerSecond) + continue + } + + desc := replWithStats.repl.Desc() + log.VEventf(ctx, 3, "considering replica rebalance for r%d with %.2f qps", + desc.RangeID, replWithStats.qps) + + // Pick out the stores that we want the range on, keeping existing replicas + // around if they aren't on overfull stores. + zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey) + if err != nil { + log.Error(ctx, err) + return replicaWithStats{}, nil + } + desiredReplicas := int(zone.NumReplicas) + targets := make([]roachpb.ReplicationTarget, 0, desiredReplicas) + targetReplicas := make([]roachpb.ReplicaDescriptor, 0, desiredReplicas) + + // Check the range's existing diversity score, since we want to ensure we + // don't hurt locality diversity just to improve QPS. + curDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalities(desc.Replicas)) + + // Check the existing replicas, keeping around those that aren't overloaded. 
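Before that per-replica scan continues below, the churn filter a few lines up deserves a concrete reading: a range is only worth relocating if its own QPS is at least a 0.1% slice of the store's total, unless the store is also carrying more ranges than the mean. A toy version of that predicate, using the fraction from the hunk and assumed example numbers:

package main

import "fmt"

const minQPSFraction = 0.001 // matches the fraction used in the hunk above

// worthMoving reports whether a range's load is large enough, relative to the
// store's total, for moving it to matter, unless the store is also carrying
// more than the mean number of ranges, in which case moving it still helps
// the range-count balance.
func worthMoving(rangeQPS, storeQPS float64, storeRanges int, meanRanges float64) bool {
	if rangeQPS >= storeQPS*minQPSFraction {
		return true
	}
	return float64(storeRanges) > meanRanges
}

func main() {
	fmt.Println(worthMoving(1, 5000, 700, 750))   // false: 1 qps is below the 5 qps cutoff
	fmt.Println(worthMoving(1, 5000, 800, 750))   // true: store has ranges to spare
	fmt.Println(worthMoving(120, 5000, 700, 750)) // true: a real share of the load
}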
+ for i := range desc.Replicas { + if desc.Replicas[i].StoreID == localDesc.StoreID { + continue + } + // Keep the replica in the range if we don't know its QPS or if its QPS + // is below the upper threshold. Punishing stores not in our store map + // could cause mass evictions if the storePool gets out of sync. + storeDesc, ok := storeMap[desc.Replicas[i].StoreID] + if !ok || storeDesc.Capacity.QueriesPerSecond < maxQPS { + targets = append(targets, roachpb.ReplicationTarget{ + NodeID: desc.Replicas[i].NodeID, + StoreID: desc.Replicas[i].StoreID, + }) + targetReplicas = append(targetReplicas, roachpb.ReplicaDescriptor{ + NodeID: desc.Replicas[i].NodeID, + StoreID: desc.Replicas[i].StoreID, + }) + } + } + + // Then pick out which new stores to add the remaining replicas to. + for len(targets) < desiredReplicas { + // Use the preexisting AllocateTarget logic to ensure that considerations + // such as zone constraints, locality diversity, and full disk come + // into play. + rangeInfo := rangeInfoForRepl(replWithStats.repl, desc) + options := sr.rq.allocator.scorerOptions(false /* disableStatsBasedRebalancing */) + options.balanceQPSInsteadOfCount = true + target, _ := sr.rq.allocator.allocateTargetFromList( + ctx, + storeList, + zone, + targetReplicas, + rangeInfo, + options, + ) + if target == nil { + log.VEventf(ctx, 3, "no rebalance targets found to replace the current store for r%d", + desc.RangeID) + break + } + + targets = append(targets, roachpb.ReplicationTarget{ + NodeID: target.Node.NodeID, + StoreID: target.StoreID, + }) + targetReplicas = append(targetReplicas, roachpb.ReplicaDescriptor{ + NodeID: target.Node.NodeID, + StoreID: target.StoreID, + }) + } + + // If we couldn't find enough valid targets, forget about this range. + // + // TODO(a-robinson): Support more incremental improvements -- move what we + // can if it makes things better even if it isn't great. For example, + // moving one of the other existing replicas that's on a store with less + // qps than the max threshold but above the mean would help in certain + // locality configurations. + if len(targets) < desiredReplicas { + log.VEventf(ctx, 3, "couldn't find enough rebalance targets for r%d (%d/%d)", + desc.RangeID, len(targets), desiredReplicas) + continue + } + newDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalities(targetReplicas)) + if newDiversity < curDiversity { + log.VEventf(ctx, 3, + "new diversity %.2f for r%d worse than current diversity %.2f; not rebalancing", + newDiversity, desc.RangeID, curDiversity) + continue + } + + // Pick the replica with the least QPS to be leaseholder; + // TestingRelocateRange transfers the lease to the first provided + // target. 
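The selection the comment above describes is small but load-bearing: the relocation hands the lease to the first target in the slice, so whichever store ends up at index 0 becomes the new leaseholder. Before the hunk continues, here is a standalone sketch of picking the least-loaded target and swapping it to the front, with simplified types and an assumed qps-by-store map:

package main

import (
	"fmt"
	"math"
)

type target struct {
	storeID int
}

// moveColdestTargetFirst swaps the target whose store currently serves the
// least QPS to index 0, since the relocation gives the lease to the first
// target in the list. Targets with unknown load are left where they are.
func moveColdestTargetFirst(targets []target, qpsByStore map[int]float64) {
	if len(targets) == 0 {
		return
	}
	coldestIdx := 0
	coldestQPS := math.MaxFloat64
	for i, t := range targets {
		if qps, ok := qpsByStore[t.storeID]; ok && qps < coldestQPS {
			coldestIdx = i
			coldestQPS = qps
		}
	}
	targets[0], targets[coldestIdx] = targets[coldestIdx], targets[0]
}

func main() {
	// Assumed example: s5 is the coldest of the chosen targets, so it should
	// receive the lease along with its new replica.
	targets := []target{{storeID: 2}, {storeID: 5}, {storeID: 7}}
	qps := map[int]float64{2: 1200, 5: 300, 7: 800}
	moveColdestTargetFirst(targets, qps)
	fmt.Println(targets) // [{5} {2} {7}]
}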
+ newLeaseIdx := 0 + newLeaseQPS := math.MaxFloat64 + for i := 0; i < len(targets); i++ { + storeDesc, ok := storeMap[desc.Replicas[i].StoreID] + if ok && storeDesc.Capacity.QueriesPerSecond < newLeaseQPS { + newLeaseIdx = i + newLeaseQPS = storeDesc.Capacity.QueriesPerSecond + } } + targets[0], targets[newLeaseIdx] = targets[newLeaseIdx], targets[0] + return replWithStats, targets } } +func shouldNotMoveAway( + ctx context.Context, + replWithStats replicaWithStats, + localDesc *roachpb.StoreDescriptor, + now hlc.Timestamp, + minQPS float64, +) bool { + if !replWithStats.repl.OwnsValidLease(now) { + log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) + return true + } + if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { + log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", + replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) + return true + } + return false +} + func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor { storeMap := make(map[roachpb.StoreID]*roachpb.StoreDescriptor) for i := range sl.stores { @@ -307,3 +556,12 @@ func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor { } return storeMap } + +func existingTarget(targets []roachpb.ReplicationTarget, candidateID roachpb.StoreID) bool { + for _, target := range targets { + if candidateID == target.StoreID { + return true + } + } + return false +} diff --git a/pkg/storage/store_rebalancer_test.go b/pkg/storage/store_rebalancer_test.go index bec59923cf9d..c631923c72c4 100644 --- a/pkg/storage/store_rebalancer_test.go +++ b/pkg/storage/store_rebalancer_test.go @@ -155,8 +155,8 @@ func TestChooseLeaseToTransfer(t *testing.T) { for _, tc := range testCases { loadRanges(rr, s, []testRange{{storeIDs: tc.storeIDs, qps: tc.qps}}) hottestRanges := rr.topQPS() - _, target := sr.chooseLeaseToTransfer( - ctx, config.SystemConfig{}, &hottestRanges, localDesc, storeList, storeMap, minQPS, maxQPS) + _, target, _ := sr.chooseLeaseToTransfer( + ctx, config.SystemConfig{}, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS) if target.StoreID != tc.expectTarget { t.Errorf("got target store %d for range with replicas %v and %f qps; want %d", target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget) From b8da9c9f22ce084612eb50fa72be0899fa996255 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Mon, 20 Aug 2018 03:41:42 -0500 Subject: [PATCH 2/5] storage: Prevent usage of old stats-based rebalancing logic This leaves properly cleaning up the code for later, but ensures that the existing cluster setting will enable store-level rebalancing rather than the old experimental write/disk-based rebalancing. Release note: None --- pkg/storage/allocator_scorer.go | 112 ++------------------------------ 1 file changed, 5 insertions(+), 107 deletions(-) diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go index 289c4a00297c..3ea0588cf010 100644 --- a/pkg/storage/allocator_scorer.go +++ b/pkg/storage/allocator_scorer.go @@ -113,10 +113,7 @@ func (bd balanceDimensions) String() string { } func (bd balanceDimensions) compactString(options scorerOptions) string { - if !options.statsBasedRebalancingEnabled { - return fmt.Sprintf("%d", bd.ranges) - } - return bd.String() + return fmt.Sprintf("%d", bd.ranges) } // candidate store for allocation. 
@@ -161,10 +158,6 @@ func (c candidate) compactString(options scorerOptions) string { } fmt.Fprintf(&buf, ", converges:%d, balance:%s, rangeCount:%d", c.convergesScore, c.balanceScore.compactString(options), c.rangeCount) - if options.statsBasedRebalancingEnabled { - fmt.Fprintf(&buf, ", logicalBytes:%s, writesPerSecond:%.2f", - humanizeutil.IBytes(c.store.Capacity.LogicalBytes), c.store.Capacity.WritesPerSecond) - } if c.details != "" { fmt.Fprintf(&buf, ", details:(%s)", c.details) } @@ -854,57 +847,7 @@ func shouldRebalance( rangeInfo RangeInfo, options scorerOptions, ) bool { - if !options.statsBasedRebalancingEnabled { - return shouldRebalanceNoStats(ctx, store, sl, options) - } - - // Rebalance if this store is full enough that the range is a bad fit. - score := balanceScore(sl, store.Capacity, rangeInfo, options) - if rangeIsBadFit(score) { - log.VEventf(ctx, 2, - "s%d: should-rebalance(bad-fit): balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+ - "(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ", - store.StoreID, score, store.Capacity, rangeInfo, - sl.candidateRanges.mean, humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), - sl.candidateWritesPerSecond.mean) - return true - } - - // Rebalance if there exists another store that is very in need of the - // range and this store is a somewhat bad match for it. - if rangeIsPoorFit(score) { - for _, desc := range sl.stores { - otherScore := balanceScore(sl, desc.Capacity, rangeInfo, options) - if !rangeIsGoodFit(otherScore) { - log.VEventf(ctx, 5, - "s%d is not a good enough fit to replace s%d: balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+ - "(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ", - desc.StoreID, store.StoreID, otherScore, desc.Capacity, rangeInfo, - sl.candidateRanges.mean, humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), - sl.candidateWritesPerSecond.mean) - continue - } - if storeHasReplica(desc.StoreID, rangeInfo.Desc.Replicas) { - continue - } - log.VEventf(ctx, 2, - "s%d: should-rebalance(better-fit=s%d): balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+ - "otherScore=%s, otherCapacity=(%v), "+ - "(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ", - store.StoreID, desc.StoreID, score, store.Capacity, rangeInfo, - otherScore, desc.Capacity, sl.candidateRanges.mean, - humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), sl.candidateWritesPerSecond.mean) - return true - } - } - - // If we reached this point, we're happy with the range where it is. 
- log.VEventf(ctx, 3, - "s%d: should-not-rebalance: balanceScore=%s, capacity=(%v), rangeInfo=%+v, "+ - "(meanRangeCount=%.1f, meanDiskUsage=%s, meanWritesPerSecond=%.2f), ", - store.StoreID, score, store.Capacity, rangeInfo, sl.candidateRanges.mean, - humanizeutil.IBytes(int64(sl.candidateLogicalBytes.mean)), sl.candidateWritesPerSecond.mean) - return false + return shouldRebalanceNoStats(ctx, store, sl, options) } // shouldRebalance implements the decision of whether to rebalance for the case @@ -1327,22 +1270,6 @@ func balanceScore( } else { dimensions.ranges = balanced } - if options.statsBasedRebalancingEnabled { - dimensions.bytes = balanceContribution( - options, - dimensions.ranges, - sl.candidateLogicalBytes.mean, - float64(sc.LogicalBytes), - sc.BytesPerReplica, - float64(rangeInfo.LogicalBytes)) - dimensions.writes = balanceContribution( - options, - dimensions.ranges, - sl.candidateWritesPerSecond.mean, - sc.WritesPerSecond, - sc.WritesPerReplica, - rangeInfo.WritesPerSecond) - } return dimensions } @@ -1458,17 +1385,11 @@ func rangeIsPoorFit(bd balanceDimensions) bool { } func overfullRangeThreshold(options scorerOptions, mean float64) float64 { - if !options.statsBasedRebalancingEnabled { - return mean * (1 + options.rangeRebalanceThreshold) - } - return math.Max(mean*(1+options.rangeRebalanceThreshold), mean+5) + return mean * (1 + options.rangeRebalanceThreshold) } func underfullRangeThreshold(options scorerOptions, mean float64) float64 { - if !options.statsBasedRebalancingEnabled { - return mean * (1 - options.rangeRebalanceThreshold) - } - return math.Min(mean*(1-options.rangeRebalanceThreshold), mean-5) + return mean * (1 - options.rangeRebalanceThreshold) } func overfullStatThreshold(options scorerOptions, mean float64) float64 { @@ -1511,30 +1432,7 @@ func rebalanceConvergesOnMean( newWritesPerSecond float64, options scorerOptions, ) bool { - if !options.statsBasedRebalancingEnabled { - return convergesOnMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean) - } - - // Note that we check both converges and diverges. If we always decremented - // convergeCount when something didn't converge, ranges with stats equal to 0 - // would almost never converge (and thus almost never get rebalanced). 
- var convergeCount int - if convergesOnMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean) { - convergeCount++ - } else if divergesFromMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean) { - convergeCount-- - } - if convergesOnMean(float64(sc.LogicalBytes), float64(newLogicalBytes), sl.candidateLogicalBytes.mean) { - convergeCount++ - } else if divergesFromMean(float64(sc.LogicalBytes), float64(newLogicalBytes), sl.candidateLogicalBytes.mean) { - convergeCount-- - } - if convergesOnMean(sc.WritesPerSecond, newWritesPerSecond, sl.candidateWritesPerSecond.mean) { - convergeCount++ - } else if divergesFromMean(sc.WritesPerSecond, newWritesPerSecond, sl.candidateWritesPerSecond.mean) { - convergeCount-- - } - return convergeCount > 0 + return convergesOnMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean) } func convergesOnMean(oldVal, newVal, mean float64) bool { From 73fcf3e688281cbf50a0737ffe3a1b6e16359d2f Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Mon, 20 Aug 2018 04:12:40 -0500 Subject: [PATCH 3/5] roachtest: Add test for load-based replica rebalancing It's identical to the test for load-based lease rebalancing, just with more than 3 nodes such that replicas must be rebalanced in addition to leases in order for load to be properly spread across all nodes. Release note: None --- pkg/cmd/roachtest/rebalance_load.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/cmd/roachtest/rebalance_load.go b/pkg/cmd/roachtest/rebalance_load.go index 698f2f43a83f..716dbe0145bc 100644 --- a/pkg/cmd/roachtest/rebalance_load.go +++ b/pkg/cmd/roachtest/rebalance_load.go @@ -107,20 +107,30 @@ func registerRebalanceLoad(r *registry) { } } - minutes := 2 * time.Minute - numNodes := 4 // the last node is just used to generate load concurrency := 128 r.Add(testSpec{ Name: `rebalance-leases-by-load`, - Nodes: nodes(numNodes), - Stable: false, // TODO(a-robinson): Promote to stable + Nodes: nodes(4), // the last node is just used to generate load + Stable: false, // TODO(a-robinson): Promote to stable Run: func(ctx context.Context, t *test, c *cluster) { if local { concurrency = 32 fmt.Printf("lowering concurrency to %d in local testing\n", concurrency) } - rebalanceLoadRun(ctx, t, c, minutes, concurrency) + rebalanceLoadRun(ctx, t, c, 2*time.Minute, concurrency) + }, + }) + r.Add(testSpec{ + Name: `rebalance-replicas-by-load`, + Nodes: nodes(7), // the last node is just used to generate load + Stable: false, // TODO(a-robinson): Promote to stable + Run: func(ctx context.Context, t *test, c *cluster) { + if local { + concurrency = 32 + fmt.Printf("lowering concurrency to %d in local testing\n", concurrency) + } + rebalanceLoadRun(ctx, t, c, 5*time.Minute, concurrency) }, }) } From 9d4d78dd15b0230a2b83e27d9bc75b3a45a53b3d Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Wed, 29 Aug 2018 11:59:42 -0500 Subject: [PATCH 4/5] storage: Remove old experimental stats-based rebalancing code/settings This cleans up all the old code, settings, and tests without massively overhauling the structure of things. More could be done to simplify things, but this is the least intrusive set of changes that seem appropriate so late in the development cycle. 
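As the release note and the regenerated settings table below spell out, the two retired settings are replaced by a single kv.allocator.load_based_rebalancing enumeration (off = 0, leases = 1, leases and replicas = 2) plus kv.allocator.qps_rebalance_threshold. A minimal sketch of how a store rebalancer might gate its two phases on such a mode follows; the constant and function names here are hypothetical, not the identifiers used in the patch.

package main

import "fmt"

// Hypothetical names standing in for the modes of the new
// kv.allocator.load_based_rebalancing enumeration
// (off = 0, leases = 1, leases and replicas = 2).
type loadBasedRebalancingMode int64

const (
	lbRebalancingOff loadBasedRebalancingMode = iota
	lbRebalancingLeasesOnly
	lbRebalancingLeasesAndReplicas
)

// phasesToRun reports which of the store rebalancer's two phases a given mode
// enables: shedding hot leases first, then relocating whole replicas.
func phasesToRun(mode loadBasedRebalancingMode) (transferLeases, rebalanceReplicas bool) {
	switch mode {
	case lbRebalancingLeasesOnly:
		return true, false
	case lbRebalancingLeasesAndReplicas:
		return true, true
	default:
		return false, false
	}
}

func main() {
	for _, mode := range []loadBasedRebalancingMode{
		lbRebalancingOff, lbRebalancingLeasesOnly, lbRebalancingLeasesAndReplicas,
	} {
		leases, replicas := phasesToRun(mode)
		fmt.Printf("mode=%d leases=%t replicas=%t\n", mode, leases, replicas)
	}
}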
Release note (backwards-incompatible change): The experimental, non-recommended stat-based rebalancing setting controlled by the kv.allocator.stat_based_rebalancing.enabled and kv.allocator.stat_rebalance_threshold cluster settings has been removed and replaced by a new, better supported approach to load-based rebalancing that can be controlled via the new kv.allocator.load_based_rebalancing cluster setting. By default, leases will be rebalanced within a cluster to achieve better QPS balance. --- docs/generated/settings/settings.html | 4 +- pkg/settings/registry.go | 5 +- pkg/storage/allocator.go | 50 ++---- pkg/storage/allocator_scorer.go | 223 +++--------------------- pkg/storage/allocator_scorer_test.go | 233 ++++---------------------- pkg/storage/allocator_test.go | 172 ++----------------- pkg/storage/replica_command.go | 2 +- pkg/storage/replicate_queue.go | 15 +- pkg/storage/replicate_queue_test.go | 2 +- pkg/storage/store.go | 2 +- pkg/storage/store_rebalancer.go | 90 +++++++--- 11 files changed, 155 insertions(+), 643 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index f54d27c27dd0..b7412c4d8846 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -17,9 +17,9 @@ jobs.registry.leniencyduration1m0sthe amount of time to defer any attempts to reschedule a job kv.allocator.lease_rebalancing_aggressivenessfloat1set greater than 1.0 to rebalance leases toward load more aggressively, or between 0 and 1.0 to be more conservative about rebalancing leases kv.allocator.load_based_lease_rebalancing.enabledbooleantrueset to enable rebalancing of range leases based on load and latency +kv.allocator.load_based_rebalancingenumeration1whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2] +kv.allocator.qps_rebalance_thresholdfloat0.25minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull kv.allocator.range_rebalance_thresholdfloat0.05minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull -kv.allocator.stat_based_rebalancing.enabledbooleantrueset to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster -kv.allocator.stat_rebalance_thresholdfloat0.2minimum fraction away from the mean a store's stats (such as queries per second) can be before it is considered overfull or underfull kv.bulk_io_write.concurrent_export_requestsinteger5number of export requests a store will handle concurrently before queuing kv.bulk_io_write.concurrent_import_requestsinteger1number of import requests a store will handle concurrently before queuing kv.bulk_io_write.max_ratebyte size8.0 EiBthe rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 8d0a89df2c16..694e73850a9c 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -34,10 +34,13 @@ var Registry = make(map[string]Setting) // When a setting is removed, it should be added to this list so that we cannot // accidentally reuse its name, potentially mis-handling older values. var retiredSettings = map[string]struct{}{ - //removed as of 2.0. + // removed as of 2.0. "kv.gc.batch_size": {}, "kv.transaction.max_intents": {}, "diagnostics.reporting.report_metrics": {}, + // removed as of 2.1. 
+ "kv.allocator.stat_based_rebalancing.enabled": {}, + "kv.allocator.stat_rebalance_threshold": {}, } // Register adds a setting to the registry. diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index 93fac4ecafbf..91ae5fa6140f 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -90,10 +90,6 @@ var leaseRebalancingAggressiveness = settings.RegisterNonNegativeFloatSetting( 1.0, ) -func statsBasedRebalancingEnabled(st *cluster.Settings, disableStatsBasedRebalance bool) bool { - return EnableStatsBasedRebalancing.Get(&st.SV) && st.Version.IsActive(cluster.VersionStatsBasedRebalancing) && !disableStatsBasedRebalance -} - // AllocatorAction enumerates the various replication adjustments that may be // recommended by the allocator. type AllocatorAction int @@ -247,10 +243,7 @@ func MakeAllocator( // supplied range, as governed by the supplied zone configuration. It // returns the required action that should be taken and a priority. func (a *Allocator) ComputeAction( - ctx context.Context, - zone config.ZoneConfig, - rangeInfo RangeInfo, - disableStatsBasedRebalancing bool, + ctx context.Context, zone config.ZoneConfig, rangeInfo RangeInfo, ) (AllocatorAction, float64) { if a.storePool == nil { // Do nothing if storePool is nil for some unittests. @@ -341,10 +334,8 @@ func (a *Allocator) ComputeAction( } type decisionDetails struct { - Target string - Existing string `json:",omitempty"` - RangeBytes int64 `json:",omitempty"` - RangeWritesPerSecond float64 `json:",omitempty"` + Target string + Existing string `json:",omitempty"` } // AllocateTarget returns a suitable store for a new allocation with the @@ -357,12 +348,11 @@ func (a *Allocator) AllocateTarget( zone config.ZoneConfig, existing []roachpb.ReplicaDescriptor, rangeInfo RangeInfo, - disableStatsBasedRebalancing bool, ) (*roachpb.StoreDescriptor, string, error) { sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeInfo.Desc.RangeID, storeFilterThrottled) target, details := a.allocateTargetFromList( - ctx, sl, zone, existing, rangeInfo, a.scorerOptions(disableStatsBasedRebalancing)) + ctx, sl, zone, existing, rangeInfo, a.scorerOptions()) if target != nil { return target, details, nil @@ -398,10 +388,6 @@ func (a *Allocator) allocateTargetFromList( if target := candidates.selectGood(a.randGen); target != nil { log.VEventf(ctx, 3, "add target: %s", target) details := decisionDetails{Target: target.compactString(options)} - if options.statsBasedRebalancingEnabled { - details.RangeBytes = rangeInfo.LogicalBytes - details.RangeWritesPerSecond = rangeInfo.WritesPerSecond - } detailsBytes, err := json.Marshal(details) if err != nil { log.Warningf(ctx, "failed to marshal details for choosing allocate target: %s", err) @@ -418,7 +404,6 @@ func (a Allocator) simulateRemoveTarget( zone config.ZoneConfig, candidates []roachpb.ReplicaDescriptor, rangeInfo RangeInfo, - disableStatsBasedRebalancing bool, ) (roachpb.ReplicaDescriptor, string, error) { // Update statistics first // TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines, @@ -430,7 +415,7 @@ func (a Allocator) simulateRemoveTarget( a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.REMOVE_REPLICA) }() log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore) - return a.RemoveTarget(ctx, zone, candidates, rangeInfo, disableStatsBasedRebalancing) + return a.RemoveTarget(ctx, zone, candidates, rangeInfo) } // RemoveTarget returns a 
suitable replica to remove from the provided replica @@ -443,7 +428,6 @@ func (a Allocator) RemoveTarget( zone config.ZoneConfig, candidates []roachpb.ReplicaDescriptor, rangeInfo RangeInfo, - disableStatsBasedRebalancing bool, ) (roachpb.ReplicaDescriptor, string, error) { if len(candidates) == 0 { return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()") @@ -458,7 +442,7 @@ func (a Allocator) RemoveTarget( analyzedConstraints := analyzeConstraints( ctx, a.storePool.getStoreDescriptor, rangeInfo.Desc.Replicas, zone) - options := a.scorerOptions(disableStatsBasedRebalancing) + options := a.scorerOptions() rankedCandidates := removeCandidates( sl, analyzedConstraints, @@ -472,10 +456,6 @@ func (a Allocator) RemoveTarget( if exist.StoreID == bad.store.StoreID { log.VEventf(ctx, 3, "remove target: %s", bad) details := decisionDetails{Target: bad.compactString(options)} - if options.statsBasedRebalancingEnabled { - details.RangeBytes = rangeInfo.LogicalBytes - details.RangeWritesPerSecond = rangeInfo.WritesPerSecond - } detailsBytes, err := json.Marshal(details) if err != nil { log.Warningf(ctx, "failed to marshal details for choosing remove target: %s", err) @@ -512,7 +492,6 @@ func (a Allocator) RebalanceTarget( raftStatus *raft.Status, rangeInfo RangeInfo, filter storeFilter, - disableStatsBasedRebalancing bool, ) (*roachpb.StoreDescriptor, string) { sl, _, _ := a.storePool.getStoreList(rangeInfo.Desc.RangeID, filter) @@ -546,7 +525,7 @@ func (a Allocator) RebalanceTarget( analyzedConstraints := analyzeConstraints( ctx, a.storePool.getStoreDescriptor, rangeInfo.Desc.Replicas, zone) - options := a.scorerOptions(disableStatsBasedRebalancing) + options := a.scorerOptions() results := rebalanceCandidates( ctx, sl, @@ -602,8 +581,7 @@ func (a Allocator) RebalanceTarget( target.store.StoreID, zone, replicaCandidates, - rangeInfo, - disableStatsBasedRebalancing) + rangeInfo) if err != nil { log.Warningf(ctx, "simulating RemoveTarget failed: %s", err) return nil, "" @@ -623,10 +601,6 @@ func (a Allocator) RebalanceTarget( Target: target.compactString(options), Existing: existingCandidates.compactString(options), } - if options.statsBasedRebalancingEnabled { - details.RangeBytes = rangeInfo.LogicalBytes - details.RangeWritesPerSecond = rangeInfo.WritesPerSecond - } detailsBytes, err := json.Marshal(details) if err != nil { log.Warningf(ctx, "failed to marshal details for choosing rebalance target: %s", err) @@ -635,12 +609,10 @@ func (a Allocator) RebalanceTarget( return &target.store, string(detailsBytes) } -func (a *Allocator) scorerOptions(disableStatsBasedRebalancing bool) scorerOptions { +func (a *Allocator) scorerOptions() scorerOptions { return scorerOptions{ - deterministic: a.storePool.deterministic, - statsBasedRebalancingEnabled: statsBasedRebalancingEnabled(a.storePool.st, disableStatsBasedRebalancing), - rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV), - statRebalanceThreshold: statRebalanceThreshold.Get(&a.storePool.st.SV), + deterministic: a.storePool.deterministic, + rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV), } } diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go index 3ea0588cf010..52171520b202 100644 --- a/pkg/storage/allocator_scorer.go +++ b/pkg/storage/allocator_scorer.go @@ -54,15 +54,6 @@ const ( rebalanceToMaxFractionUsedThreshold = 0.925 ) -// EnableStatsBasedRebalancing controls whether range rebalancing takes -// 
additional variables such as write load and disk usage into account. -// If disabled, rebalancing is done purely based on replica count. -var EnableStatsBasedRebalancing = settings.RegisterBoolSetting( - "kv.allocator.stat_based_rebalancing.enabled", - "set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster", - true, -) - // rangeRebalanceThreshold is the minimum ratio of a store's range count to // the mean range count at which that store is considered overfull or underfull // of ranges. @@ -72,29 +63,10 @@ var rangeRebalanceThreshold = settings.RegisterNonNegativeFloatSetting( 0.05, ) -// statRebalanceThreshold is the same as rangeRebalanceThreshold, but for -// statistics other than range count. This should be larger than -// rangeRebalanceThreshold because certain stats (like keys written per second) -// are inherently less stable and thus we need to be a little more forgiving to -// avoid thrashing. -// -// Note that there isn't a ton of science behind this number, but setting it -// to .05 and .1 were shown to cause some instability in clusters without load -// on them. -// -// TODO(a-robinson): Should disk usage be held to a higher standard than this? -var statRebalanceThreshold = settings.RegisterNonNegativeFloatSetting( - "kv.allocator.stat_rebalance_threshold", - "minimum fraction away from the mean a store's stats (such as queries per second) can be before it is considered overfull or underfull", - 0.20, -) - type scorerOptions struct { - deterministic bool - balanceQPSInsteadOfCount bool - statsBasedRebalancingEnabled bool - rangeRebalanceThreshold float64 - statRebalanceThreshold float64 + deterministic bool + rangeRebalanceThreshold float64 + qpsRebalanceThreshold float64 // only considered if non-zero } type balanceDimensions struct { @@ -437,8 +409,8 @@ func allocateCandidates( diversityScore := diversityAllocateScore(s, existingNodeLocalities) balanceScore := balanceScore(sl, s.Capacity, rangeInfo, options) var convergesScore int - if options.balanceQPSInsteadOfCount { - if s.Capacity.QueriesPerSecond < underfullStatThreshold(options, sl.candidateQueriesPerSecond.mean) { + if options.qpsRebalanceThreshold > 0 { + if s.Capacity.QueriesPerSecond < underfullThreshold(sl.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) { convergesScore = 1 } else if s.Capacity.QueriesPerSecond < sl.candidateQueriesPerSecond.mean { convergesScore = 0 @@ -489,7 +461,7 @@ func removeCandidates( diversityScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities) balanceScore := balanceScore(sl, s.Capacity, rangeInfo, options) var convergesScore int - if !rebalanceFromConvergesOnMean(sl, s.Capacity, rangeInfo, options) { + if !rebalanceFromConvergesOnMean(sl, s.Capacity) { // If removing this candidate replica does not converge the store // stats to their means, we make it less attractive for removal by // adding 1 to the constraint score. Note that when selecting a @@ -677,7 +649,7 @@ func rebalanceCandidates( } // TODO(a-robinson): Some moderate refactoring could extract this logic out // into the loop below, avoiding duplicate balanceScore calculations. 
- if shouldRebalance(ctx, existing.cand.store, sl, rangeInfo, options) { + if shouldRebalance(ctx, existing.cand.store, sl, options) { shouldRebalanceCheck = true break } @@ -708,7 +680,7 @@ func rebalanceCandidates( } balanceScore := balanceScore(comparable.sl, existing.cand.store.Capacity, rangeInfo, options) var convergesScore int - if !rebalanceFromConvergesOnMean(comparable.sl, existing.cand.store.Capacity, rangeInfo, options) { + if !rebalanceFromConvergesOnMean(comparable.sl, existing.cand.store.Capacity) { // Similarly to in removeCandidates, any replica whose removal // would not converge the range stats to their means is given a // constraint score boost of 1 to make it less attractive for @@ -733,7 +705,7 @@ func rebalanceCandidates( s := cand.store cand.fullDisk = !rebalanceToMaxCapacityCheck(s) cand.balanceScore = balanceScore(comparable.sl, s.Capacity, rangeInfo, options) - if rebalanceToConvergesOnMean(comparable.sl, s.Capacity, rangeInfo, options) { + if rebalanceToConvergesOnMean(comparable.sl, s.Capacity) { // This is the counterpart of !rebalanceFromConvergesOnMean from // the existing candidates. Candidates whose addition would // converge towards the range count mean are promoted. @@ -841,19 +813,6 @@ func betterRebalanceTarget(target1, existing1, target2, existing2 *candidate) *c // shouldRebalance returns whether the specified store is a candidate for // having a replica removed from it given the candidate store list. func shouldRebalance( - ctx context.Context, - store roachpb.StoreDescriptor, - sl StoreList, - rangeInfo RangeInfo, - options scorerOptions, -) bool { - return shouldRebalanceNoStats(ctx, store, sl, options) -} - -// shouldRebalance implements the decision of whether to rebalance for the case -// when stats-based rebalancing is disabled and decisions should thus be -// made based only on range counts. -func shouldRebalanceNoStats( ctx context.Context, store roachpb.StoreDescriptor, sl StoreList, options scorerOptions, ) bool { overfullThreshold := int32(math.Ceil(overfullRangeThreshold(options, sl.candidateRanges.mean))) @@ -1252,10 +1211,6 @@ const ( underfull rangeCountStatus = 1 ) -func oppositeStatus(rcs rangeCountStatus) rangeCountStatus { - return -rcs -} - // balanceScore returns an arbitrarily scaled score where higher scores are for // stores where the range is a better fit based on various balance factors // like range count, disk usage, and QPS. @@ -1273,165 +1228,31 @@ func balanceScore( return dimensions } -// balanceContribution generates a single dimension's contribution to a range's -// balanceScore, where larger values mean a store is a better fit for a given -// range. -func balanceContribution( - options scorerOptions, - rcs rangeCountStatus, - mean float64, - storeVal float64, - percentiles roachpb.Percentiles, - rangeVal float64, -) float64 { - if storeVal > overfullStatThreshold(options, mean) { - return percentileScore(rcs, percentiles, rangeVal) - } else if storeVal < underfullStatThreshold(options, mean) { - // To ensure that we behave symmetrically when underfull compared to - // when we're overfull, inverse both the rangeCountStatus and the - // result returned by percentileScore. This makes it so that being - // overfull on ranges and on the given dimension behaves symmetrically to - // being underfull on ranges and the given dimension (and ditto for - // overfull on ranges and underfull on a dimension, etc.). 
- return -percentileScore(oppositeStatus(rcs), percentiles, rangeVal) - } - return 0 -} - -// percentileScore returns a score for how desirable it is to put a range -// onto a particular store given the assumption that the store is overfull -// along a particular dimension. Takes as parameters: -// * How the number of ranges on the store compares to the norm -// * The distribution of values in the store for the dimension -// * The range's value for the dimension -// A higher score means that the range is a better fit for the store. -func percentileScore( - rcs rangeCountStatus, percentiles roachpb.Percentiles, rangeVal float64, -) float64 { - // Note that there is not any great research behind these values. If they're - // causing thrashing or a bad imbalance, rethink them and modify them as - // appropriate. - if rcs == balanced { - // If the range count is balanced, we should prefer ranges that are - // very small on this particular dimension to try to rebalance this dimension - // without messing up the replica counts. - if rangeVal < percentiles.P10 { - return 1 - } else if rangeVal < percentiles.P25 { - return 0.5 - } else if rangeVal > percentiles.P90 { - return -1 - } else if rangeVal > percentiles.P75 { - return -0.5 - } - // else rangeVal >= percentiles.P25 && rangeVal <= percentiles.P75 - // It may be better to return more than 0 here, since taking on an - // average range isn't necessarily bad, but for now let's see how this works. - return 0 - } else if rcs == overfull { - // If this store has too many ranges, we're ok with moving any range that's - // at least somewhat sizable in this dimension, since we want to reduce both - // the range count and this metric. Moving extreme outliers may be less - // desirable, though, so favor very heavy ranges slightly less and disfavor - // very light ranges. - // - // Note that we can't truly disfavor large ranges, since that prevents us - // from rebalancing nonempty ranges to empty stores (since all nonempty - // ranges will be greater than an empty store's P90). - if rangeVal > percentiles.P90 { - return -0.5 - } else if rangeVal >= percentiles.P25 { - return -1 - } else if rangeVal >= percentiles.P10 { - return 0 - } - // else rangeVal < percentiles.P10 - return 0.5 - } else if rcs == underfull { - // If this store has too few ranges but is overloaded on some other - // dimension, we need to prioritize moving away replicas that are - // high in that dimension and accepting replicas that are low in it. - if rangeVal < percentiles.P10 { - return 1 - } else if rangeVal < percentiles.P25 { - return 0.5 - } else if rangeVal > percentiles.P90 { - return -1 - } else if rangeVal > percentiles.P75 { - return -0.5 - } - // else rangeVal >= percentiles.P25 && rangeVal <= percentiles.P75 - return 0 - } - panic(fmt.Sprintf("reached unreachable code: %+v; %+v; %+v", rcs, percentiles, rangeVal)) -} - -func rangeIsGoodFit(bd balanceDimensions) bool { - // A score greater than 1 means that more than one dimension improves - // without being canceled out by the third, since each dimension can only - // contribute a value from [-1,1] to the score. - return bd.totalScore() > 1 -} - -func rangeIsBadFit(bd balanceDimensions) bool { - // This is the same logic as for rangeIsGoodFit, just reversed. - return bd.totalScore() < -1 -} - -func rangeIsPoorFit(bd balanceDimensions) bool { - // A score less than -0.5 isn't a great fit for a range, since the - // bad dimensions outweigh the good by at least one entire dimension. 
- return bd.totalScore() < -0.5 -} - func overfullRangeThreshold(options scorerOptions, mean float64) float64 { - return mean * (1 + options.rangeRebalanceThreshold) + return overfullThreshold(mean, options.rangeRebalanceThreshold) } func underfullRangeThreshold(options scorerOptions, mean float64) float64 { - return mean * (1 - options.rangeRebalanceThreshold) + return underfullThreshold(mean, options.rangeRebalanceThreshold) } -func overfullStatThreshold(options scorerOptions, mean float64) float64 { - return mean * (1 + options.statRebalanceThreshold) +func overfullThreshold(mean float64, thresholdFraction float64) float64 { + return mean * (1 + thresholdFraction) } -func underfullStatThreshold(options scorerOptions, mean float64) float64 { - return mean * (1 - options.statRebalanceThreshold) +func underfullThreshold(mean float64, thresholdFraction float64) float64 { + return mean * (1 - thresholdFraction) } -func rebalanceFromConvergesOnMean( - sl StoreList, sc roachpb.StoreCapacity, rangeInfo RangeInfo, options scorerOptions, -) bool { - return rebalanceConvergesOnMean( - sl, - sc, - sc.RangeCount-1, - sc.LogicalBytes-rangeInfo.LogicalBytes, - sc.WritesPerSecond-rangeInfo.WritesPerSecond, - options) +func rebalanceFromConvergesOnMean(sl StoreList, sc roachpb.StoreCapacity) bool { + return rebalanceConvergesOnMean(sl, sc, sc.RangeCount-1) } -func rebalanceToConvergesOnMean( - sl StoreList, sc roachpb.StoreCapacity, rangeInfo RangeInfo, options scorerOptions, -) bool { - return rebalanceConvergesOnMean( - sl, - sc, - sc.RangeCount+1, - sc.LogicalBytes+rangeInfo.LogicalBytes, - sc.WritesPerSecond+rangeInfo.WritesPerSecond, - options) +func rebalanceToConvergesOnMean(sl StoreList, sc roachpb.StoreCapacity) bool { + return rebalanceConvergesOnMean(sl, sc, sc.RangeCount+1) } -func rebalanceConvergesOnMean( - sl StoreList, - sc roachpb.StoreCapacity, - newRangeCount int32, - newLogicalBytes int64, - newWritesPerSecond float64, - options scorerOptions, -) bool { +func rebalanceConvergesOnMean(sl StoreList, sc roachpb.StoreCapacity, newRangeCount int32) bool { return convergesOnMean(float64(sc.RangeCount), float64(newRangeCount), sl.candidateRanges.mean) } @@ -1439,10 +1260,6 @@ func convergesOnMean(oldVal, newVal, mean float64) bool { return math.Abs(newVal-mean) < math.Abs(oldVal-mean) } -func divergesFromMean(oldVal, newVal, mean float64) bool { - return math.Abs(newVal-mean) > math.Abs(oldVal-mean) -} - // maxCapacityCheck returns true if the store has room for a new replica. 
func maxCapacityCheck(store roachpb.StoreDescriptor) bool { return store.Capacity.FractionUsed() < maxFractionUsedThreshold diff --git a/pkg/storage/allocator_scorer_test.go b/pkg/storage/allocator_scorer_test.go index fca6e2bd0436..50b6a240bf1c 100644 --- a/pkg/storage/allocator_scorer_test.go +++ b/pkg/storage/allocator_scorer_test.go @@ -1058,10 +1058,7 @@ func TestRemoveConstraintsCheck(t *testing.T) { func TestShouldRebalanceDiversity(t *testing.T) { defer leaktest.AfterTest(t)() - options := scorerOptions{ - statsBasedRebalancingEnabled: true, - } - + options := scorerOptions{} newStore := func(id int, locality roachpb.Locality) roachpb.StoreDescriptor { return roachpb.StoreDescriptor{ StoreID: roachpb.StoreID(id), @@ -1473,14 +1470,9 @@ func TestDiversityScoreEquivalence(t *testing.T) { func TestBalanceScore(t *testing.T) { defer leaktest.AfterTest(t)() - options := scorerOptions{ - statsBasedRebalancingEnabled: true, - } - + options := scorerOptions{} storeList := StoreList{ - candidateRanges: stat{mean: 1000}, - candidateLogicalBytes: stat{mean: 512 * 1024 * 1024}, - candidateWritesPerSecond: stat{mean: 1000}, + candidateRanges: stat{mean: 1000}, } sEmpty := roachpb.StoreCapacity{ @@ -1489,146 +1481,30 @@ func TestBalanceScore(t *testing.T) { LogicalBytes: 0, } sMean := roachpb.StoreCapacity{ - Capacity: 1024 * 1024 * 1024, - Available: 512 * 1024 * 1024, - LogicalBytes: 512 * 1024 * 1024, - RangeCount: 1000, - WritesPerSecond: 1000, - BytesPerReplica: roachpb.Percentiles{ - P10: 100 * 1024, - P25: 250 * 1024, - P50: 500 * 1024, - P75: 750 * 1024, - P90: 1000 * 1024, - }, - WritesPerReplica: roachpb.Percentiles{ - P10: 1, - P25: 2.5, - P50: 5, - P75: 7.5, - P90: 10, - }, + Capacity: 1024 * 1024 * 1024, + Available: 512 * 1024 * 1024, + LogicalBytes: 512 * 1024 * 1024, + RangeCount: 1000, } sRangesOverfull := sMean sRangesOverfull.RangeCount = 1500 sRangesUnderfull := sMean sRangesUnderfull.RangeCount = 500 - sBytesOverfull := sMean - sBytesOverfull.Available = 256 * 1024 * 1024 - sBytesOverfull.LogicalBytes = sBytesOverfull.Capacity - sBytesOverfull.Available - sBytesUnderfull := sMean - sBytesUnderfull.Available = 768 * 1024 * 1024 - sBytesUnderfull.LogicalBytes = sBytesUnderfull.Capacity - sBytesUnderfull.Available - sRangesOverfullBytesOverfull := sRangesOverfull - sRangesOverfullBytesOverfull.Available = 256 * 1024 * 1024 - sRangesOverfullBytesOverfull.LogicalBytes = - sRangesOverfullBytesOverfull.Capacity - sRangesOverfullBytesOverfull.Available - sRangesUnderfullBytesUnderfull := sRangesUnderfull - sRangesUnderfullBytesUnderfull.Available = 768 * 1024 * 1024 - sRangesUnderfullBytesUnderfull.LogicalBytes = - sRangesUnderfullBytesUnderfull.Capacity - sRangesUnderfullBytesUnderfull.Available - sRangesUnderfullBytesOverfull := sRangesUnderfull - sRangesUnderfullBytesOverfull.Available = 256 * 1024 * 1024 - sRangesUnderfullBytesOverfull.LogicalBytes = - sRangesUnderfullBytesOverfull.Capacity - sRangesUnderfullBytesOverfull.Available - sRangesOverfullBytesUnderfull := sRangesOverfull - sRangesOverfullBytesUnderfull.Available = 768 * 1024 * 1024 - sRangesOverfullBytesUnderfull.LogicalBytes = - sRangesOverfullBytesUnderfull.Capacity - sRangesOverfullBytesUnderfull.Available - sRangesUnderfullBytesOverfullWritesOverfull := sRangesUnderfullBytesOverfull - sRangesUnderfullBytesOverfullWritesOverfull.WritesPerSecond = 1500 - sRangesUnderfullBytesUnderfullWritesOverfull := sRangesUnderfullBytesUnderfull - sRangesUnderfullBytesUnderfullWritesOverfull.WritesPerSecond = 1500 - - rEmpty := 
RangeInfo{} - rMedian := RangeInfo{ - LogicalBytes: 500 * 1024, - WritesPerSecond: 5, - } - rHighBytes := rMedian - rHighBytes.LogicalBytes = 2000 * 1024 - rLowBytes := rMedian - rLowBytes.LogicalBytes = 50 * 1024 - rHighBytesHighWrites := rHighBytes - rHighBytesHighWrites.WritesPerSecond = 20 - rHighBytesLowWrites := rHighBytes - rHighBytesLowWrites.WritesPerSecond = 0.5 - rLowBytesHighWrites := rLowBytes - rLowBytesHighWrites.WritesPerSecond = 20 - rLowBytesLowWrites := rLowBytes - rLowBytesLowWrites.WritesPerSecond = 0.5 - rHighWrites := rMedian - rHighWrites.WritesPerSecond = 20 - rLowWrites := rMedian - rLowWrites.WritesPerSecond = 0.5 + + ri := RangeInfo{} testCases := []struct { sc roachpb.StoreCapacity - ri RangeInfo expected float64 }{ - {sEmpty, rEmpty, 3}, - {sEmpty, rMedian, 2}, - {sEmpty, rHighBytes, 2}, - {sEmpty, rLowBytes, 2}, - {sMean, rEmpty, 0}, - {sMean, rMedian, 0}, - {sMean, rHighBytes, 0}, - {sMean, rLowBytes, 0}, - {sRangesOverfull, rEmpty, -1}, - {sRangesOverfull, rMedian, -1}, - {sRangesOverfull, rHighBytes, -1}, - {sRangesOverfull, rLowBytes, -1}, - {sRangesUnderfull, rEmpty, 1}, - {sRangesUnderfull, rMedian, 1}, - {sRangesUnderfull, rHighBytes, 1}, - {sRangesUnderfull, rLowBytes, 1}, - {sBytesOverfull, rEmpty, 1}, - {sBytesOverfull, rMedian, 0}, - {sBytesOverfull, rHighBytes, -1}, - {sBytesOverfull, rLowBytes, 1}, - {sBytesUnderfull, rEmpty, -1}, - {sBytesUnderfull, rMedian, 0}, - {sBytesUnderfull, rHighBytes, 1}, - {sBytesUnderfull, rLowBytes, -1}, - {sRangesOverfullBytesOverfull, rEmpty, -.5}, - {sRangesOverfullBytesOverfull, rMedian, -2}, - {sRangesOverfullBytesOverfull, rHighBytes, -1.5}, - {sRangesOverfullBytesOverfull, rLowBytes, -.5}, - {sRangesUnderfullBytesUnderfull, rEmpty, .5}, - {sRangesUnderfullBytesUnderfull, rMedian, 2}, - {sRangesUnderfullBytesUnderfull, rHighBytes, 1.5}, - {sRangesUnderfullBytesUnderfull, rLowBytes, .5}, - {sRangesUnderfullBytesOverfull, rEmpty, 2}, - {sRangesUnderfullBytesOverfull, rMedian, 1}, - {sRangesUnderfullBytesOverfull, rHighBytes, 0}, - {sRangesUnderfullBytesOverfull, rLowBytes, 2}, - {sRangesOverfullBytesUnderfull, rEmpty, -2}, - {sRangesOverfullBytesUnderfull, rMedian, -1}, - {sRangesOverfullBytesUnderfull, rHighBytes, 0}, - {sRangesOverfullBytesUnderfull, rLowBytes, -2}, - {sRangesUnderfullBytesOverfullWritesOverfull, rEmpty, 3}, - {sRangesUnderfullBytesOverfullWritesOverfull, rMedian, 1}, - {sRangesUnderfullBytesOverfullWritesOverfull, rHighBytes, 0}, - {sRangesUnderfullBytesOverfullWritesOverfull, rHighBytesHighWrites, -1}, - {sRangesUnderfullBytesOverfullWritesOverfull, rHighBytesLowWrites, 1}, - {sRangesUnderfullBytesOverfullWritesOverfull, rLowBytes, 2}, - {sRangesUnderfullBytesOverfullWritesOverfull, rLowBytesHighWrites, 1}, - {sRangesUnderfullBytesOverfullWritesOverfull, rLowBytesLowWrites, 3}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rEmpty, 1.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rMedian, 2}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rHighBytes, 1.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rHighBytesHighWrites, 0.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rHighBytesLowWrites, 2.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rLowBytes, 0.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rLowBytesHighWrites, -0.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rLowBytesLowWrites, 1.5}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rHighWrites, 1}, - {sRangesUnderfullBytesUnderfullWritesOverfull, rLowWrites, 3}, + {sEmpty, 1}, + {sMean, 0}, + 
{sRangesOverfull, -1}, + {sRangesUnderfull, 1}, } for i, tc := range testCases { - if a, e := balanceScore(storeList, tc.sc, tc.ri, options), tc.expected; a.totalScore() != e { - t.Errorf("%d: balanceScore(storeList, %+v, %+v) got %s; want %.2f", i, tc.sc, tc.ri, a, e) + if a, e := balanceScore(storeList, tc.sc, ri, options), tc.expected; a.totalScore() != e { + t.Errorf("%d: balanceScore(storeList, %+v) got %s; want %.2f", i, tc.sc, a, e) } } } @@ -1636,77 +1512,34 @@ func TestBalanceScore(t *testing.T) { func TestRebalanceConvergesOnMean(t *testing.T) { defer leaktest.AfterTest(t)() - options := scorerOptions{ - statsBasedRebalancingEnabled: true, - } - - const diskCapacity = 2000 storeList := StoreList{ - candidateRanges: stat{mean: 1000}, - candidateLogicalBytes: stat{mean: 1000}, - candidateWritesPerSecond: stat{mean: 1000}, - } - emptyRange := RangeInfo{} - normalRange := RangeInfo{ - LogicalBytes: 10, - WritesPerSecond: 10, - } - outlierRange := RangeInfo{ - LogicalBytes: 10, - WritesPerSecond: 10000, + candidateRanges: stat{mean: 1000}, } testCases := []struct { - rangeCount int32 - liveBytes int64 - writesPerSecond float64 - ri RangeInfo - toConverges bool - fromConverges bool + rangeCount int32 + toConverges bool + fromConverges bool }{ - {0, 0, 0, emptyRange, true, false}, - {900, 900, 900, emptyRange, true, false}, - {900, 900, 2000, emptyRange, true, false}, - {999, 1000, 1000, emptyRange, true, false}, - {1000, 1000, 1000, emptyRange, false, false}, - {1001, 1000, 1000, emptyRange, false, true}, - {2000, 2000, 2000, emptyRange, false, true}, - {900, 2000, 2000, emptyRange, true, false}, - {0, 0, 0, normalRange, true, false}, - {900, 900, 900, normalRange, true, false}, - {900, 900, 2000, normalRange, true, false}, - {999, 1000, 1000, normalRange, false, false}, - {2000, 2000, 2000, normalRange, false, true}, - {900, 2000, 2000, normalRange, false, true}, - {1000, 990, 990, normalRange, true, false}, - {1000, 994, 994, normalRange, true, false}, - {1000, 990, 995, normalRange, false, false}, - {1000, 1010, 1010, normalRange, false, true}, - {1000, 1010, 1005, normalRange, false, false}, - {0, 0, 0, outlierRange, true, false}, - {900, 900, 900, outlierRange, true, false}, - {900, 900, 2000, outlierRange, true, false}, - {999, 1000, 1000, outlierRange, false, false}, - {2000, 2000, 10000, outlierRange, false, true}, - {900, 2000, 10000, outlierRange, false, true}, - {1000, 990, 990, outlierRange, false, false}, - {1000, 1000, 10000, outlierRange, false, false}, - {1000, 1010, 10000, outlierRange, false, true}, - {1001, 1010, 1005, outlierRange, false, true}, + {0, true, false}, + {900, true, false}, + {900, true, false}, + {999, true, false}, + {1000, false, false}, + {1001, false, true}, + {2000, false, true}, + {900, true, false}, } + for i, tc := range testCases { sc := roachpb.StoreCapacity{ - Capacity: diskCapacity, - Available: diskCapacity - tc.liveBytes, - LogicalBytes: tc.liveBytes, - RangeCount: tc.rangeCount, - WritesPerSecond: tc.writesPerSecond, + RangeCount: tc.rangeCount, } - if a, e := rebalanceToConvergesOnMean(storeList, sc, tc.ri, options), tc.toConverges; a != e { - t.Errorf("%d: rebalanceToConvergesOnMean(storeList, %+v, %+v) got %t; want %t", i, sc, tc.ri, a, e) + if a, e := rebalanceToConvergesOnMean(storeList, sc), tc.toConverges; a != e { + t.Errorf("%d: rebalanceToConvergesOnMean(storeList, %+v) got %t; want %t", i, sc, a, e) } - if a, e := rebalanceFromConvergesOnMean(storeList, sc, tc.ri, options), tc.fromConverges; a != e { - t.Errorf("%d: 
rebalanceFromConvergesOnMean(storeList, %+v, %+v) got %t; want %t", i, sc, tc.ri, a, e) + if a, e := rebalanceFromConvergesOnMean(storeList, sc), tc.fromConverges; a != e { + t.Errorf("%d: rebalanceFromConvergesOnMean(storeList, %+v) got %t; want %t", i, sc, a, e) } } } diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index cabea157392c..98df381a78ac 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -397,7 +397,6 @@ func TestAllocatorSimpleRetrieval(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if err != nil { t.Fatalf("Unable to perform allocation: %v", err) @@ -431,7 +430,6 @@ func TestAllocatorCorruptReplica(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if err != nil { t.Fatal(err) @@ -451,7 +449,6 @@ func TestAllocatorNoAvailableDisks(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if result != nil { t.Errorf("expected nil result: %+v", result) @@ -473,7 +470,6 @@ func TestAllocatorTwoDatacenters(t *testing.T) { multiDCConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if err != nil { t.Fatalf("Unable to perform allocation: %v", err) @@ -486,7 +482,6 @@ func TestAllocatorTwoDatacenters(t *testing.T) { StoreID: result1.StoreID, }}, firstRangeInfo, - false, ) if err != nil { t.Fatalf("Unable to perform allocation: %v", err) @@ -511,7 +506,6 @@ func TestAllocatorTwoDatacenters(t *testing.T) { }, }, firstRangeInfo, - false, ) if err == nil { t.Errorf("expected error on allocation without available stores: %+v", result3) @@ -543,7 +537,6 @@ func TestAllocatorExistingReplica(t *testing.T) { }, }, firstRangeInfo, - false, ) if err != nil { t.Fatalf("Unable to perform allocation: %v", err) @@ -604,9 +597,6 @@ func TestAllocatorRebalance(t *testing.T) { stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) defer stopper.Stop(context.Background()) - st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, false) - gossiputil.NewStoreGossiper(g).GossipStores(stores, t) ctx := context.Background() @@ -618,7 +608,6 @@ func TestAllocatorRebalance(t *testing.T) { nil, testRangeInfo([]roachpb.ReplicaDescriptor{{NodeID: 3, StoreID: 3}}, firstRange), storeFilterThrottled, - false, ) if result == nil { i-- // loop until we find 10 candidates @@ -638,7 +627,7 @@ func TestAllocatorRebalance(t *testing.T) { t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID) } sl, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled) - result := shouldRebalance(ctx, desc, sl, firstRangeInfo, a.scorerOptions(false)) + result := shouldRebalance(ctx, desc, sl, a.scorerOptions()) if expResult := (i >= 2); expResult != result { t.Errorf("%d: expected rebalance %t; got %t; desc %+v; sl: %+v", i, expResult, result, desc, sl) } @@ -735,8 +724,6 @@ func TestAllocatorRebalanceTarget(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(stores, t) - st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, false) replicas := []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 1}, {NodeID: 4, StoreID: 4}, @@ -773,7 +760,6 @@ func TestAllocatorRebalanceTarget(t *testing.T) { status, rangeInfo, storeFilterThrottled, - false, ) if result != nil { t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) @@ -793,7 +779,6 @@ func TestAllocatorRebalanceTarget(t *testing.T) { status, rangeInfo, storeFilterThrottled, - false, 
) if result != nil { t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) @@ -810,7 +795,6 @@ func TestAllocatorRebalanceTarget(t *testing.T) { status, rangeInfo, storeFilterThrottled, - false, ) if result == nil || result.StoreID != stores[1].StoreID { t.Fatalf("%d: expected rebalance to s%d, but got %v; details: %s", @@ -826,8 +810,6 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { ctx := context.Background() defer stopper.Stop(ctx) - EnableStatsBasedRebalancing.Override(&sp.st.SV, false) - mockStorePool( sp, []roachpb.StoreID{1, 2, 3, 4, 5, 6}, @@ -887,8 +869,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { config.ZoneConfig{}, nil, testRangeInfo(c.existing, firstRange), - storeFilterThrottled, - false) + storeFilterThrottled) if c.expected > 0 { if result == nil { t.Fatalf("expected %d, but found nil", c.expected) @@ -988,8 +969,6 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { defer stopper.Stop(context.Background()) st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, false) - cluster := tc.cluster(st) // It doesn't make sense to test sets of stores containing fewer than 4 @@ -1031,7 +1010,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { if !ok { t.Fatalf("[store %d]: unable to get store %d descriptor", j, store.StoreID) } - if a, e := shouldRebalance(context.Background(), desc, sl, firstRangeInfo, a.scorerOptions(false)), cluster[j].shouldRebalanceFrom; a != e { + if a, e := shouldRebalance(context.Background(), desc, sl, a.scorerOptions()), cluster[j].shouldRebalanceFrom; a != e { t.Errorf("[store %d]: shouldRebalance %t != expected %t", store.StoreID, a, e) } } @@ -1072,9 +1051,6 @@ func TestAllocatorRebalanceByCount(t *testing.T) { stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) defer stopper.Stop(context.Background()) - st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, false) - gossiputil.NewStoreGossiper(g).GossipStores(stores, t) ctx := context.Background() @@ -1086,7 +1062,6 @@ func TestAllocatorRebalanceByCount(t *testing.T) { nil, testRangeInfo([]roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}}, firstRange), storeFilterThrottled, - false, ) if result != nil && result.StoreID != 4 { t.Errorf("expected store 4; got %d", result.StoreID) @@ -1100,7 +1075,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) { t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID) } sl, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled) - result := shouldRebalance(ctx, desc, sl, firstRangeInfo, a.scorerOptions(false)) + result := shouldRebalance(ctx, desc, sl, a.scorerOptions()) if expResult := (i < 3); expResult != result { t.Errorf("%d: expected rebalance %t; got %t", i, expResult, result) } @@ -1248,7 +1223,6 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { defer leaktest.AfterTest(t)() stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) - EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false) ctx := context.Background() defer stopper.Stop(ctx) @@ -1367,7 +1341,6 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { nil, /* raftStatus */ testRangeInfo(tc.existing, firstRange), storeFilterThrottled, - false, ) var resultID roachpb.StoreID if result != nil { @@ -1436,7 +1409,6 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { nil, /* raftStatus */ testRangeInfo(tc.existing, firstRange), storeFilterThrottled, - false, ) var gotExpected bool if result == 
nil { @@ -1935,7 +1907,6 @@ func TestAllocatorRemoveTargetLocality(t *testing.T) { config.ZoneConfig{}, existingRepls, testRangeInfo(existingRepls, firstRange), - false, ) if err != nil { t.Fatal(err) @@ -2019,7 +1990,6 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) { config.ZoneConfig{}, existingRepls, testRangeInfo(existingRepls, firstRange), - false, ) if err != nil { t.Fatal(err) @@ -2041,7 +2011,6 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { defer leaktest.AfterTest(t)() stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) - EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false) defer stopper.Stop(context.Background()) stores := []*roachpb.StoreDescriptor{ @@ -2143,7 +2112,6 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { nil, testRangeInfo(existingRepls, firstRange), storeFilterThrottled, - false, ) if targetStore == nil { t.Fatalf("%d: RebalanceTarget(%v) returned no target store; details: %s", i, c.existing, details) @@ -2494,7 +2462,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) { existingRepls, rangeInfo, a.storePool.getLocalities(existingRepls), - a.scorerOptions(false), + a.scorerOptions(), ) best := candidates.best() match := true @@ -2718,7 +2686,7 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) { analyzed, rangeInfo, a.storePool.getLocalities(existingRepls), - a.scorerOptions(false), + a.scorerOptions(), ) if !expectedStoreIDsMatch(tc.expected, candidates.worst()) { t.Errorf("%d: expected removeCandidates(%v) = %v, but got %v", @@ -3515,7 +3483,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { rangeInfo, a.storePool.getLocalities(existingRepls), a.storePool.getNodeLocalityString, - a.scorerOptions(false), + a.scorerOptions(), ) match := true if len(tc.expected) != len(results) { @@ -3539,7 +3507,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { // Also verify that RebalanceTarget picks out one of the best options as // the final rebalance choice. target, details := a.RebalanceTarget( - context.Background(), zone, nil, rangeInfo, storeFilterThrottled, false) + context.Background(), zone, nil, rangeInfo, storeFilterThrottled) var found bool if target == nil && len(tc.validTargets) == 0 { found = true @@ -3905,9 +3873,6 @@ func TestAllocatorRemoveTarget(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(stores, t) - st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, false) - // Repeat this test 10 times, it should always be either store 2 or 3. for i := 0; i < 10; i++ { targetRepl, _, err := a.RemoveTarget( @@ -3915,7 +3880,6 @@ func TestAllocatorRemoveTarget(t *testing.T) { config.ZoneConfig{}, replicas, testRangeInfo(replicas, firstRange), - false, ) if err != nil { t.Fatal(err) @@ -4400,7 +4364,7 @@ func TestAllocatorComputeAction(t *testing.T) { lastPriority := float64(999999999) for i, tcase := range testCases { - action, priority := a.ComputeAction(ctx, tcase.zone, RangeInfo{Desc: &tcase.desc}, false) + action, priority := a.ComputeAction(ctx, tcase.zone, RangeInfo{Desc: &tcase.desc}) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %q, got action %q", i, allocatorActionNames[tcase.expectedAction], allocatorActionNames[action]) @@ -4413,108 +4377,6 @@ func TestAllocatorComputeAction(t *testing.T) { } } -// TestAllocatorComputeActionDisableStatsRebalance is used to verify whether the cluster could balance out -// if we disable stats-based-rebalance. 
-func TestAllocatorRebalanceTargetDisableStatsRebalance(t *testing.T) { - defer leaktest.AfterTest(t)() - ctx := context.Background() - stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) - defer stopper.Stop(ctx) - - st := a.storePool.st - EnableStatsBasedRebalancing.Override(&st.SV, true) - // Make sure the cluster's initial status is not balanced. - // store 1, 2, 3 has 200 replicas, store 4 has only 30 replicas. - // So obviously, if we want the replica is homogeneous within each store, - // we should rebalance some replicas from store 1, 2, 3 to store 4. - // However, we set the WritesPerSecond of store 4 is much bigger than other store, - // so if we enable stats-based rebalance, we couldn't spread out the replicas on store 1, 2, 3. - stores := []*roachpb.StoreDescriptor{ - { - StoreID: 1, - Node: roachpb.NodeDescriptor{NodeID: 1}, - Capacity: roachpb.StoreCapacity{ - RangeCount: 200, - WritesPerSecond: 30, - }, - }, - { - StoreID: 2, - Node: roachpb.NodeDescriptor{NodeID: 2}, - Capacity: roachpb.StoreCapacity{ - RangeCount: 200, - WritesPerSecond: 30, - }, - }, - { - StoreID: 3, - Node: roachpb.NodeDescriptor{NodeID: 3}, - Capacity: roachpb.StoreCapacity{ - RangeCount: 200, - WritesPerSecond: 30, - }, - }, - { - StoreID: 4, - Node: roachpb.NodeDescriptor{NodeID: 4}, - Capacity: roachpb.StoreCapacity{ - RangeCount: 30, - WritesPerSecond: 100, - }, - }, - } - sg := gossiputil.NewStoreGossiper(g) - sg.GossipStores(stores, t) - - desc := roachpb.RangeDescriptor{ - RangeID: firstRange, - Replicas: []roachpb.ReplicaDescriptor{ - { - StoreID: 1, - NodeID: 1, - ReplicaID: 1, - }, - { - StoreID: 2, - NodeID: 2, - ReplicaID: 2, - }, - { - StoreID: 3, - NodeID: 3, - ReplicaID: 3, - }, - }, - } - for i := 0; i < 50; i++ { - target, _ := a.RebalanceTarget( - context.Background(), - config.ZoneConfig{}, - nil, - testRangeInfo(desc.Replicas, desc.RangeID), - storeFilterThrottled, - false, /* disableStatsBasedRebalancing */ - ) - if target != nil { - t.Errorf("expected no balance, but got %d", target.StoreID) - } - } - - for i := 0; i < 50; i++ { - target, _ := a.RebalanceTarget( - context.Background(), - config.ZoneConfig{}, - nil, - testRangeInfo(desc.Replicas, desc.RangeID), - storeFilterThrottled, - true, /* disableStatsBasedRebalancing */ - ) - if expectedStore := roachpb.StoreID(4); target.StoreID != expectedStore { - t.Errorf("expected balance to %d, but got %d", expectedStore, target.StoreID) - } - } -} - func TestAllocatorComputeActionRemoveDead(t *testing.T) { defer leaktest.AfterTest(t)() @@ -4599,7 +4461,7 @@ func TestAllocatorComputeActionRemoveDead(t *testing.T) { for i, tcase := range testCases { mockStorePool(sp, tcase.live, tcase.dead, nil, nil, nil) - action, _ := a.ComputeAction(ctx, zone, RangeInfo{Desc: &tcase.desc}, false) + action, _ := a.ComputeAction(ctx, zone, RangeInfo{Desc: &tcase.desc}) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action) } @@ -4820,7 +4682,7 @@ func TestAllocatorComputeActionDecommission(t *testing.T) { for i, tcase := range testCases { mockStorePool(sp, tcase.live, tcase.dead, tcase.decommissioning, tcase.decommissioned, nil) - action, _ := a.ComputeAction(ctx, tcase.zone, RangeInfo{Desc: &tcase.desc}, false) + action, _ := a.ComputeAction(ctx, tcase.zone, RangeInfo{Desc: &tcase.desc}) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action) continue @@ -4834,7 +4696,7 @@ func 
TestAllocatorComputeActionNoStorePool(t *testing.T) { defer leaktest.AfterTest(t)() a := MakeAllocator(nil /* storePool */, nil /* rpcContext */) - action, priority := a.ComputeAction(context.Background(), config.ZoneConfig{}, RangeInfo{}, false) + action, priority := a.ComputeAction(context.Background(), config.ZoneConfig{}, RangeInfo{}) if action != AllocatorNoop { t.Errorf("expected AllocatorNoop, but got %v", action) } @@ -4902,7 +4764,6 @@ func TestAllocatorThrottled(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if _, ok := err.(purgatoryError); !ok { t.Fatalf("expected a purgatory error, got: %v", err) @@ -4915,7 +4776,6 @@ func TestAllocatorThrottled(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if err != nil { t.Fatalf("unable to perform allocation: %v", err) @@ -4938,7 +4798,6 @@ func TestAllocatorThrottled(t *testing.T) { simpleZoneConfig, []roachpb.ReplicaDescriptor{}, firstRangeInfo, - false, ) if _, ok := err.(purgatoryError); ok { t.Fatalf("expected a non purgatory error, got: %v", err) @@ -5205,7 +5064,6 @@ func TestAllocatorRebalanceAway(t *testing.T) { nil, testRangeInfo(existingReplicas, firstRange), storeFilterThrottled, - false, ) if tc.expected == nil && actual != nil { @@ -5267,8 +5125,6 @@ func TestAllocatorFullDisks(t *testing.T) { defer stopper.Stop(ctx) st := cluster.MakeTestingClusterSettings() - EnableStatsBasedRebalancing.Override(&st.SV, false) - clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) // Model a set of stores in a cluster doing rebalancing, with ranges being @@ -5367,7 +5223,6 @@ func TestAllocatorFullDisks(t *testing.T) { nil, testRangeInfo([]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, firstRange), storeFilterThrottled, - false, ) if target != nil { if log.V(1) { @@ -5406,8 +5261,6 @@ func Example_rebalancing() { defer stopper.Stop(context.TODO()) st := cluster.MakeTestingClusterSettings() - EnableStatsBasedRebalancing.Override(&st.SV, false) - clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) // Model a set of stores in a cluster, @@ -5492,7 +5345,6 @@ func Example_rebalancing() { nil, testRangeInfo([]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, firstRange), storeFilterThrottled, - false, ) if target != nil { log.Infof(context.TODO(), "rebalancing to %v; details: %s", target, details) diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 52eb40ffa02e..b67771f64060 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -1159,7 +1159,7 @@ func (r *Replica) adminScatter( var allowLeaseTransfer bool canTransferLease := func() bool { return allowLeaseTransfer } for re := retry.StartWithCtx(ctx, retryOpts); re.Next(); { - requeue, err := rq.processOneChange(ctx, r, sysCfg, canTransferLease, false /* dryRun */, true /* disableStatsBasedRebalancing */) + requeue, err := rq.processOneChange(ctx, r, sysCfg, canTransferLease, false /* dryRun */) if err != nil { if IsSnapshotError(err) { continue diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index 82ed88479cdf..34a93da3ced6 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -204,7 +204,7 @@ func (rq *replicateQueue) shouldQueue( } rangeInfo := rangeInfoForRepl(repl, desc) - action, priority := rq.allocator.ComputeAction(ctx, zone, rangeInfo, false) + action, priority := rq.allocator.ComputeAction(ctx, zone, rangeInfo) if action == 
AllocatorNoop { log.VEventf(ctx, 2, "no action to take") return false, 0 @@ -214,7 +214,7 @@ func (rq *replicateQueue) shouldQueue( } if !rq.store.TestingKnobs().DisableReplicaRebalancing { - target, _ := rq.allocator.RebalanceTarget(ctx, zone, repl.RaftStatus(), rangeInfo, storeFilterThrottled, false) + target, _ := rq.allocator.RebalanceTarget(ctx, zone, repl.RaftStatus(), rangeInfo, storeFilterThrottled) if target != nil { log.VEventf(ctx, 2, "rebalance target found, enqueuing") return true, 0 @@ -249,7 +249,7 @@ func (rq *replicateQueue) process( // snapshot errors, usually signaling that a rebalancing // reservation could not be made with the selected target. for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { - if requeue, err := rq.processOneChange(ctx, repl, sysCfg, rq.canTransferLease, false /* dryRun */, false /* disableStatsBasedRebalancing */); err != nil { + if requeue, err := rq.processOneChange(ctx, repl, sysCfg, rq.canTransferLease, false /* dryRun */); err != nil { if IsSnapshotError(err) { // If ChangeReplicas failed because the preemptive snapshot failed, we // log the error but then return success indicating we should retry the @@ -276,7 +276,6 @@ func (rq *replicateQueue) processOneChange( sysCfg config.SystemConfig, canTransferLease func() bool, dryRun bool, - disableStatsBasedRebalancing bool, ) (requeue bool, _ error) { desc := repl.Desc() @@ -297,7 +296,7 @@ func (rq *replicateQueue) processOneChange( } rangeInfo := rangeInfoForRepl(repl, desc) - switch action, _ := rq.allocator.ComputeAction(ctx, zone, rangeInfo, disableStatsBasedRebalancing); action { + switch action, _ := rq.allocator.ComputeAction(ctx, zone, rangeInfo); action { case AllocatorNoop: break case AllocatorAdd: @@ -307,7 +306,6 @@ func (rq *replicateQueue) processOneChange( zone, liveReplicas, // only include liveReplicas, since deadReplicas should soon be removed rangeInfo, - disableStatsBasedRebalancing, ) if err != nil { return false, err @@ -342,7 +340,6 @@ func (rq *replicateQueue) processOneChange( zone, oldPlusNewReplicas, rangeInfo, - disableStatsBasedRebalancing, ) if err != nil { // It does not seem possible to go to the next odd replica state. 
Note @@ -379,7 +376,7 @@ func (rq *replicateQueue) processOneChange( return false, errors.Errorf("no removable replicas from range that needs a removal: %s", rangeRaftProgress(repl.RaftStatus(), desc.Replicas)) } - removeReplica, details, err := rq.allocator.RemoveTarget(ctx, zone, candidates, rangeInfo, disableStatsBasedRebalancing) + removeReplica, details, err := rq.allocator.RemoveTarget(ctx, zone, candidates, rangeInfo) if err != nil { return false, err } @@ -493,7 +490,7 @@ func (rq *replicateQueue) processOneChange( if !rq.store.TestingKnobs().DisableReplicaRebalancing { rebalanceStore, details := rq.allocator.RebalanceTarget( - ctx, zone, repl.RaftStatus(), rangeInfo, storeFilterThrottled, disableStatsBasedRebalancing) + ctx, zone, repl.RaftStatus(), rangeInfo, storeFilterThrottled) if rebalanceStore == nil { log.VEventf(ctx, 1, "no suitable rebalance target") } else { diff --git a/pkg/storage/replicate_queue_test.go b/pkg/storage/replicate_queue_test.go index 8f07bc4a5df5..d32504c6fb5b 100644 --- a/pkg/storage/replicate_queue_test.go +++ b/pkg/storage/replicate_queue_test.go @@ -62,7 +62,7 @@ func TestReplicateQueueRebalance(t *testing.T) { for _, server := range tc.Servers { st := server.ClusterSettings() st.Manual.Store(true) - storage.EnableStatsBasedRebalancing.Override(&st.SV, false) + storage.LoadBasedRebalancingMode.Override(&st.SV, int64(storage.LBRebalancingOff)) } const newRanges = 5 diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 1f919dd98e78..66ea4b7cd31d 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -4542,7 +4542,7 @@ func (s *Store) AllocatorDryRun( defer cancel() canTransferLease := func() bool { return true } _, err := s.replicateQueue.processOneChange( - ctx, repl, sysCfg, canTransferLease, true /* dryRun */, false /* disableStatsBasedRebalancing */) + ctx, repl, sysCfg, canTransferLease, true /* dryRun */) if err != nil { log.Eventf(ctx, "error simulating allocator on replica %s: %s", repl, err) } diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go index 1f9afe958dfa..4627828b824c 100644 --- a/pkg/storage/store_rebalancer.go +++ b/pkg/storage/store_rebalancer.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -41,6 +42,47 @@ const ( minQPSThresholdDifference = 100 ) +// LoadBasedRebalancingMode controls whether range rebalancing takes +// additional variables such as write load and disk usage into account. +// If disabled, rebalancing is done purely based on replica count. +var LoadBasedRebalancingMode = settings.RegisterEnumSetting( + "kv.allocator.load_based_rebalancing", + "whether to rebalance based on the distribution of QPS across stores", + "leases", + map[int64]string{ + int64(LBRebalancingOff): "off", + int64(LBRebalancingLeasesOnly): "leases", + int64(LBRebalancingLeasesAndReplicas): "leases and replicas", + }, +) + +// qpsRebalanceThreshold is much like rangeRebalanceThreshold, but for +// QPS rather than range count. This should be set higher than +// rangeRebalanceThreshold because QPS can naturally vary over time as +// workloads change and clients come and go, so we need to be a little more +// forgiving to avoid thrashing. 
+var qpsRebalanceThreshold = settings.RegisterNonNegativeFloatSetting( + "kv.allocator.qps_rebalance_threshold", + "minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull", + 0.25, +) + +// LBRebalancingMode controls if and when we do store-level rebalancing +// based on load. +type LBRebalancingMode int64 + +const ( + // LBRebalancingOff means that we do not do store-level rebalancing + // based on load statistics. + LBRebalancingOff LBRebalancingMode = iota + // LBRebalancingLeasesOnly means that we rebalance leases based on + // store-level QPS imbalances. + LBRebalancingLeasesOnly + // LBRebalancingLeasesAndReplicas means that we rebalance both leases and + // replicas based on store-level QPS imbalances. + LBRebalancingLeasesAndReplicas +) + // StoreRebalancer is responsible for examining how the associated store's load // compares to the load on other stores in the cluster and transferring leases // or replicas away if the local store is overloaded. @@ -106,24 +148,26 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { case <-ticker.C: } - if !EnableStatsBasedRebalancing.Get(&sr.st.SV) { + mode := LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)) + if mode == LBRebalancingOff { continue } storeList, _, _ := sr.rq.allocator.storePool.getStoreList(roachpb.RangeID(0), storeFilterNone) - sr.rebalanceStore(ctx, storeList) + sr.rebalanceStore(ctx, mode, storeList) } }) } -func (sr *StoreRebalancer) rebalanceStore(ctx context.Context, storeList StoreList) { - - statThreshold := statRebalanceThreshold.Get(&sr.st.SV) +func (sr *StoreRebalancer) rebalanceStore( + ctx context.Context, mode LBRebalancingMode, storeList StoreList, +) { + qpsThresholdFraction := qpsRebalanceThreshold.Get(&sr.st.SV) // First check if we should transfer leases away to better balance QPS. - qpsMinThreshold := math.Min(storeList.candidateQueriesPerSecond.mean*(1-statThreshold), + qpsMinThreshold := math.Min(storeList.candidateQueriesPerSecond.mean*(1-qpsThresholdFraction), storeList.candidateQueriesPerSecond.mean-minQPSThresholdDifference) - qpsMaxThreshold := math.Max(storeList.candidateQueriesPerSecond.mean*(1+statThreshold), + qpsMaxThreshold := math.Max(storeList.candidateQueriesPerSecond.mean*(1+qpsThresholdFraction), storeList.candidateQueriesPerSecond.mean+minQPSThresholdDifference) var localDesc *roachpb.StoreDescriptor @@ -161,9 +205,6 @@ func (sr *StoreRebalancer) rebalanceStore(ctx context.Context, storeList StoreLi ctx, sysCfg, &hottestRanges, localDesc, storeList, storeMap, qpsMinThreshold, qpsMaxThreshold) replicasToMaybeRebalance = append(replicasToMaybeRebalance, considerForRebalance...) if replWithStats.repl == nil { - log.Infof(ctx, - "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f); considering load-based replica rebalances", - localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold) break } @@ -179,11 +220,7 @@ func (sr *StoreRebalancer) rebalanceStore(ctx context.Context, storeList StoreLi // Finally, update our local copies of the descriptors so that if // additional transfers are needed we'll be making the decisions with more - // up-to-date info. - // - // TODO(a-robinson): This just updates the copies used locally by the - // storeRebalancer. We may also want to update the copies in the StorePool - // itself. + // up-to-date info. The StorePool copies are updated by transferLease. 
localDesc.Capacity.LeaseCount-- localDesc.Capacity.QueriesPerSecond -= replWithStats.qps if otherDesc := storeMap[target.StoreID]; otherDesc != nil { @@ -199,6 +236,16 @@ func (sr *StoreRebalancer) rebalanceStore(ctx context.Context, storeList StoreLi return } + if mode != LBRebalancingLeasesAndReplicas { + log.Infof(ctx, + "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f)", + localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold) + return + } + log.Infof(ctx, + "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f); considering load-based replica rebalances", + localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold) + // Re-combine replicasToMaybeRebalance with what remains of hottestRanges so // that we'll reconsider them for replica rebalancing. replicasToMaybeRebalance = append(replicasToMaybeRebalance, hottestRanges...) @@ -467,8 +514,8 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance( // such as zone constraints, locality diversity, and full disk come // into play. rangeInfo := rangeInfoForRepl(replWithStats.repl, desc) - options := sr.rq.allocator.scorerOptions(false /* disableStatsBasedRebalancing */) - options.balanceQPSInsteadOfCount = true + options := sr.rq.allocator.scorerOptions() + options.qpsRebalanceThreshold = qpsRebalanceThreshold.Get(&sr.st.SV) target, _ := sr.rq.allocator.allocateTargetFromList( ctx, storeList, @@ -556,12 +603,3 @@ func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor { } return storeMap } - -func existingTarget(targets []roachpb.ReplicationTarget, candidateID roachpb.StoreID) bool { - for _, target := range targets { - if candidateID == target.StoreID { - return true - } - } - return false -} From 8488d026f1c50794f9fc94f028b8452fa26bc78e Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Wed, 5 Sep 2018 11:55:46 -0500 Subject: [PATCH 5/5] storage: Add metrics tracking load-based rebalance operations Release note: None --- pkg/storage/store.go | 10 +++-- pkg/storage/store_rebalancer.go | 41 +++++++++++++++++-- .../nodeGraphs/dashboards/replication.tsx | 2 + 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 66ea4b7cd31d..23f692c23e17 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -1021,10 +1021,10 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript ) s.scanner.AddQueues(s.tsMaintenanceQueue) } - } - s.storeRebalancer = NewStoreRebalancer( - s.cfg.AmbientCtx, cfg.Settings, s.replicateQueue, s.replRankings) + s.storeRebalancer = NewStoreRebalancer( + s.cfg.AmbientCtx, cfg.Settings, s.replicateQueue, s.replRankings) + } if cfg.TestingKnobs.DisableGCQueue { s.setGCQueueActive(false) @@ -1560,7 +1560,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Connect rangefeeds to closed timestamp updates. s.startClosedTimestampRangefeedSubscriber(ctx) - s.storeRebalancer.Start(ctx, s.stopper) + if s.storeRebalancer != nil { + s.storeRebalancer.Start(ctx, s.stopper) + } // Start the storage engine compactor. 
if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) { diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go index 4627828b824c..41daaf277a25 100644 --- a/pkg/storage/store_rebalancer.go +++ b/pkg/storage/store_rebalancer.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" ) @@ -42,6 +43,34 @@ const ( minQPSThresholdDifference = 100 ) +var ( + metaStoreRebalancerLeaseTransferCount = metric.Metadata{ + Name: "rebalancing.lease.transfers", + Help: "Number of lease transfers motivated by store-level load imbalances", + Measurement: "Lease Transfers", + Unit: metric.Unit_COUNT, + } + metaStoreRebalancerRangeRebalanceCount = metric.Metadata{ + Name: "rebalancing.range.rebalances", + Help: "Number of range rebalance operations motivated by store-level load imbalances", + Measurement: "Range Rebalances", + Unit: metric.Unit_COUNT, + } +) + +// StoreRebalancerMetrics is the set of metrics for the store-level rebalancer. +type StoreRebalancerMetrics struct { + LeaseTransferCount *metric.Counter + RangeRebalanceCount *metric.Counter +} + +func makeStoreRebalancerMetrics() StoreRebalancerMetrics { + return StoreRebalancerMetrics{ + LeaseTransferCount: metric.NewCounter(metaStoreRebalancerLeaseTransferCount), + RangeRebalanceCount: metric.NewCounter(metaStoreRebalancerRangeRebalanceCount), + } +} + // LoadBasedRebalancingMode controls whether range rebalancing takes // additional variables such as write load and disk usage into account. // If disabled, rebalancing is done purely based on replica count. @@ -95,6 +124,7 @@ const ( // will best accomplish the store-level goals. type StoreRebalancer struct { log.AmbientContext + metrics StoreRebalancerMetrics st *cluster.Settings rq *replicateQueue replRankings *replicaRankings @@ -109,12 +139,15 @@ func NewStoreRebalancer( replRankings *replicaRankings, ) *StoreRebalancer { ambientCtx.AddLogTag("store-rebalancer", nil) - return &StoreRebalancer{ + sr := &StoreRebalancer{ AmbientContext: ambientCtx, + metrics: makeStoreRebalancerMetrics(), st: st, rq: rq, replRankings: replRankings, } + sr.rq.store.metrics.registry.AddMetricStruct(&sr.metrics) + return sr } // Start runs an infinite loop in a goroutine which regularly checks whether @@ -217,6 +250,7 @@ func (sr *StoreRebalancer) rebalanceStore( continue } cancel() + sr.metrics.LeaseTransferCount.Inc(1) // Finally, update our local copies of the descriptors so that if // additional transfers are needed we'll be making the decisions with more @@ -271,14 +305,15 @@ func (sr *StoreRebalancer) rebalanceStore( log.VEventf(ctx, 1, "rebalancing r%d (%.2f qps) from %v to %v to better balance load", replWithStats.repl.RangeID, replWithStats.qps, descBeforeRebalance.Replicas, targets) replCtx, cancel := context.WithTimeout(replWithStats.repl.AnnotateCtx(ctx), sr.rq.processTimeout) - // TODO: Either make RelocateRange production-ready or do the rebalancing - // another way. + // TODO(a-robinson): Either make RelocateRange production-ready or do the + // rebalancing another way. 
if err := RelocateRange(replCtx, sr.rq.store.DB(), *descBeforeRebalance, targets); err != nil { cancel() log.Errorf(replCtx, "unable to relocate range to %v: %v", targets, err) continue } cancel() + sr.metrics.RangeRebalanceCount.Inc(1) // Finally, update our local copies of the descriptors so that if // additional transfers are needed we'll be making the decisions with more diff --git a/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/replication.tsx b/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/replication.tsx index 135bd59f95a6..c81e41a2eb9d 100644 --- a/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/replication.tsx +++ b/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/replication.tsx @@ -85,6 +85,8 @@ export default function (props: GraphDashboardProps) { + + ,
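The QPS threshold arithmetic introduced in `rebalanceStore` above combines the fractional `kv.allocator.qps_rebalance_threshold` setting with the absolute `minQPSThresholdDifference` floor. Below is a minimal standalone Go sketch of that computation; the function name and the example numbers are invented for illustration, and only the min/max formula mirrors the patch.

```go
package main

import (
	"fmt"
	"math"
)

// Stores within this absolute QPS distance of the mean are never treated as
// over- or underfull, even if the fractional threshold would say otherwise.
const minQPSThresholdDifference = 100

// qpsThresholds computes the underfull/overfull QPS bounds, combining a
// fractional threshold (e.g. 0.25) with a minimum absolute gap so that small
// clusters with low overall QPS do not thrash.
func qpsThresholds(meanQPS, thresholdFraction float64) (lo, hi float64) {
	lo = math.Min(meanQPS*(1-thresholdFraction), meanQPS-minQPSThresholdDifference)
	hi = math.Max(meanQPS*(1+thresholdFraction), meanQPS+minQPSThresholdDifference)
	return lo, hi
}

func main() {
	// With a 1000 QPS mean and the default 0.25 threshold, stores above 1250
	// QPS are candidates to shed load and stores below 750 QPS to receive it.
	lo, hi := qpsThresholds(1000, 0.25)
	fmt.Printf("underfull below %.0f QPS, overfull above %.0f QPS\n", lo, hi)

	// With a 200 QPS mean the fractional band (150-250) is narrower than the
	// 100 QPS absolute floor, so the wider band 100-300 is used instead.
	lo, hi = qpsThresholds(200, 0.25)
	fmt.Printf("underfull below %.0f QPS, overfull above %.0f QPS\n", lo, hi)
}
```

The second case shows why the absolute floor exists: at low overall QPS the purely fractional band would be so narrow that ordinary noise could trigger lease transfers and replica moves.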
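The follow-up commit registers two counters, `rebalancing.lease.transfers` and `rebalancing.range.rebalances`, and increments each one only after the corresponding lease transfer or `RelocateRange` call has succeeded, so failed attempts are not counted. A rough standalone analogue of that pattern, using the standard library's `expvar` in place of CockroachDB's `metric` package (the helper names here are hypothetical):

```go
package main

import (
	"expvar"
	"fmt"
)

// Counters analogous to the two metrics added by the patch. In the patch they
// are metric.Counter fields on StoreRebalancerMetrics, registered through
// registry.AddMetricStruct; here expvar stands in for that registry.
var (
	leaseTransfers  = expvar.NewInt("rebalancing.lease.transfers")
	rangeRebalances = expvar.NewInt("rebalancing.range.rebalances")
)

// recordLeaseTransfer and recordRangeRebalance are hypothetical helpers that
// mark where the increments happen: only after the operation succeeds.
func recordLeaseTransfer()   { leaseTransfers.Add(1) }
func recordRangeRebalance() { rangeRebalances.Add(1) }

func main() {
	recordLeaseTransfer()
	recordRangeRebalance()
	fmt.Println("lease transfers:", leaseTransfers.Value())
	fmt.Println("range rebalances:", rangeRebalances.Value())
}
```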