From b9b2b6f2e175949cb37466129d6e7233e69b86b8 Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Wed, 11 Jan 2023 15:52:39 +0000
Subject: [PATCH] kvserver: instrument replica cputime rebalancing

This patch instruments the store rebalancer to use store CPU time, as
opposed to QPS, when balancing the cluster.

It adds `store_cpu` as an option for the existing, now public cluster
setting `kv.allocator.load_based_rebalancing_dimension`. When set to
`store_cpu` rather than `qps`, the store rebalancer performs a mostly
identical function; however, it balances the sum of all replicas' CPU
time on each store, rather than QPS.

Similar to QPS, a rebalance threshold can be set to control the
aggressiveness of balancing:

`kv.allocator.store_cpu_rebalance_threshold`: 0.1

resolves: #95380

Release note (ops change): Add an option to balance store CPU time
instead of queries per second (QPS) by setting
`kv.allocator.load_based_rebalancing_dimension='store_cpu'`.
`kv.allocator.store_cpu_rebalance_threshold` is also added, similar to
`kv.allocator.qps_rebalance_threshold`, to control the target range for
store CPU above and below the cluster mean.
---
 pkg/clusterversion/cockroach_versions.go      |   8 +
 .../allocator/allocatorimpl/threshold.go      |  16 +
 pkg/kv/kvserver/allocator/base.go             |  45 +++
 pkg/kv/kvserver/allocator/load/BUILD.bazel    |   1 +
 pkg/kv/kvserver/allocator/load/dimension.go   |  13 +-
 pkg/kv/kvserver/allocator/range_usage_info.go |   8 +
 .../allocator/storepool/store_pool.go         |  39 ++-
 pkg/kv/kvserver/store.go                      |   6 +-
 pkg/kv/kvserver/store_rebalancer.go           |  66 +++-
 pkg/kv/kvserver/store_rebalancer_test.go      | 317 +++++++++++-------
 pkg/roachpb/metadata.go                       |   1 +
 pkg/roachpb/metadata.proto                    |   4 +
 12 files changed, 391 insertions(+), 133 deletions(-)

diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go
index 359bf0144dc5..20afab343254 100644
--- a/pkg/clusterversion/cockroach_versions.go
+++ b/pkg/clusterversion/cockroach_versions.go
@@ -395,6 +395,10 @@ const (
 	// V23_1AlterSystemSQLInstancesAddSqlAddr adds a sql_addr column to the
 	// system.sql_instances table.
 	V23_1AlterSystemSQLInstancesAddSQLAddr
+	// V23_1AllocatorCPUBalancing adds balancing CPU usage among stores using
+	// the allocator and store rebalancer. It assumes that at this version,
+	// stores now include their CPU in the StoreCapacity proto when gossiping.
+	V23_1AllocatorCPUBalancing

 	// *************************************************
 	// Step (1): Add new versions here.
@@ -678,6 +682,10 @@ var rawVersionsSingleton = keyedVersions{
 		Key:     V23_1AlterSystemSQLInstancesAddSQLAddr,
 		Version: roachpb.Version{Major: 22, Minor: 2, Internal: 28},
 	},
+	{
+		Key:     V23_1AllocatorCPUBalancing,
+		Version: roachpb.Version{Major: 22, Minor: 2, Internal: 30},
+	},

 	// *************************************************
 	// Step (2): Add new versions here.
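Reviewer note: as intuition for how the new threshold settings are meant to
compose (a sketch, not code from this patch; `overfullStoreCPUThreshold` is a
hypothetical name), the overfull bound mirrors the existing QPS convention: a
fractional band around the cluster mean, widened to at least the absolute
minimum difference. Operators opt in or out with
`SET CLUSTER SETTING kv.allocator.load_based_rebalancing_dimension = 'store_cpu'`.

```go
package main

import (
	"fmt"
	"time"
)

// Mirrors the patch's defaults: a 10% band
// (kv.allocator.store_cpu_rebalance_threshold) and a 50ms absolute floor
// (MinStoreCPUThresholdDifference).
const (
	storeCPURebalanceThreshold     = 0.10
	minStoreCPUThresholdDifference = float64(50 * time.Millisecond)
)

// overfullStoreCPUThreshold sketches the overfull bound: mean*(1+threshold),
// but never closer to the mean than the absolute minimum difference, so that
// lightly loaded clusters do not thrash.
func overfullStoreCPUThreshold(meanCPUNanosPerSecond float64) float64 {
	fractional := meanCPUNanosPerSecond * (1 + storeCPURebalanceThreshold)
	floor := meanCPUNanosPerSecond + minStoreCPUThresholdDifference
	if fractional > floor {
		return fractional
	}
	return floor
}

func main() {
	// At a 2s-of-CPU-per-second mean the 10% band dominates (2.2s); at a
	// 100ms mean the 50ms absolute floor dominates (150ms).
	for _, mean := range []float64{float64(2 * time.Second), float64(100 * time.Millisecond)} {
		fmt.Println(time.Duration(int64(overfullStoreCPUThreshold(mean))))
	}
}
```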
diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go b/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go
index e570ed471a77..9ab8ee8bbdfe 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go
@@ -27,6 +27,8 @@ func getLoadThreshold(dim load.Dimension, sv *settings.Values) float64 {
 	switch dim {
 	case load.Queries:
 		return allocator.QPSRebalanceThreshold.Get(sv)
+	case load.StoreCPU:
+		return allocator.StoreCPURebalanceThreshold.Get(sv)
 	default:
 		panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
 	}
@@ -51,6 +53,8 @@ func getLoadMinThreshold(dim load.Dimension) float64 {
 	switch dim {
 	case load.Queries:
 		return allocator.MinQPSThresholdDifference
+	case load.StoreCPU:
+		return allocator.MinStoreCPUThresholdDifference
 	default:
 		panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
 	}
@@ -76,6 +80,8 @@ func getLoadRebalanceMinRequiredDiff(dim load.Dimension, sv *settings.Values) fl
 	switch dim {
 	case load.Queries:
 		return allocator.MinQPSDifferenceForTransfers.Get(sv)
+	case load.StoreCPU:
+		return allocator.MinStoreCPUDifferenceForTransfers
 	default:
 		panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
 	}
@@ -117,3 +123,13 @@ func MakeQPSOnlyDim(v float64) load.Load {
 	dims[load.Queries] = v
 	return dims
 }
+
+// SetAllDims returns a load vector with all dimensions filled in with the
+// value given.
+func SetAllDims(v float64) load.Load {
+	dims := load.Vector{}
+	for i := range dims {
+		dims[i] = v
+	}
+	return dims
+}
diff --git a/pkg/kv/kvserver/allocator/base.go b/pkg/kv/kvserver/allocator/base.go
index 0f70f5b9a3bc..f266a9aba108 100644
--- a/pkg/kv/kvserver/allocator/base.go
+++ b/pkg/kv/kvserver/allocator/base.go
@@ -37,6 +37,30 @@ const (
 	// lightly loaded clusters.
 	MinQPSThresholdDifference = 100

+	// MinStoreCPUThresholdDifference is the minimum CPU difference from the cluster
+	// mean that this system should care about. The system won't attempt to
+	// take action if a store's CPU differs from the mean by less than this
+	// amount even if it is greater than the percentage threshold. This
+	// prevents too many lease transfers or range rebalances in lightly loaded
+	// clusters.
+	//
+	// NB: This represents 5% (1/20) utilization of 1 cpu on average. This
+	// number was arrived at from testing to minimize thrashing. This number is
+	// set independent of processor speed and assumes identical value of cpu
+	// time across all stores, i.e. all CPUs are identical.
+	MinStoreCPUThresholdDifference = float64(50 * time.Millisecond)
+
+	// MinStoreCPUDifferenceForTransfers is the minimum CPU difference that the
+	// store rebalancer would care about reconciling (via lease or replica
+	// rebalancing) between any two stores.
+	//
+	// NB: This is set to be two times the minimum threshold that a store needs
+	// to be above or below the mean to be considered overfull or underfull
+	// respectively. This is to make lease transfers and replica rebalances
+	// less sensitive to jitters in any given workload by introducing
+	// additional friction before taking these actions.
+	MinStoreCPUDifferenceForTransfers = 2 * MinStoreCPUThresholdDifference
+
 	// defaultLoadBasedRebalancingInterval is how frequently to check the store-level
 	// balance of the cluster.
defaultLoadBasedRebalancingInterval = time.Minute @@ -107,6 +131,27 @@ var QPSRebalanceThreshold = func() *settings.FloatSetting { return s }() +// StoreCPURebalanceThreshold is the minimum ratio of a store's cpu time to the mean +// cpu time at which that store is considered overfull or underfull of cpu +// usage. +var StoreCPURebalanceThreshold = func() *settings.FloatSetting { + s := settings.RegisterFloatSetting( + settings.SystemOnly, + "kv.allocator.store_cpu_rebalance_threshold", + "minimum fraction away from the mean a store's cpu usage can be before it is considered overfull or underfull", + 0.10, + settings.NonNegativeFloat, + func(f float64) error { + if f < 0.01 { + return errors.Errorf("cannot set kv.allocator.store_cpu_rebalance_threshold to less than 0.01") + } + return nil + }, + ) + s.SetVisibility(settings.Public) + return s +}() + // LoadBasedRebalanceInterval controls how frequently each store checks for // load-base lease/replica rebalancing opportunties. var LoadBasedRebalanceInterval = settings.RegisterPublicDurationSettingWithExplicitUnit( diff --git a/pkg/kv/kvserver/allocator/load/BUILD.bazel b/pkg/kv/kvserver/allocator/load/BUILD.bazel index 05873c3c89bf..a9cf0b67deaa 100644 --- a/pkg/kv/kvserver/allocator/load/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/load/BUILD.bazel @@ -10,6 +10,7 @@ go_library( ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load", visibility = ["//visibility:public"], + deps = ["//pkg/util/humanizeutil"], ) get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/allocator/load/dimension.go b/pkg/kv/kvserver/allocator/load/dimension.go index 3241f2ee1dae..1cc0557a599e 100644 --- a/pkg/kv/kvserver/allocator/load/dimension.go +++ b/pkg/kv/kvserver/allocator/load/dimension.go @@ -10,7 +10,12 @@ package load -import "fmt" +import ( + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" +) // Dimension is a singe dimension of load that a component may track. type Dimension int @@ -18,6 +23,8 @@ type Dimension int const ( // Queries refers to the number of queries. Queries Dimension = iota + // StoreCPU refers to the sum of replica cpu time on a store. + StoreCPU nDimensionsTyped nDimensions = int(nDimensionsTyped) @@ -28,6 +35,8 @@ func (d Dimension) String() string { switch d { case Queries: return "queries-per-second" + case StoreCPU: + return "store-cpu-per-second" default: panic(fmt.Sprintf("cannot name: unknown dimension with ordinal %d", d)) } @@ -38,6 +47,8 @@ func (d Dimension) Format(value float64) string { switch d { case Queries: return fmt.Sprintf("%.1f", value) + case StoreCPU: + return string(humanizeutil.Duration(time.Duration(int64(value)))) default: panic(fmt.Sprintf("cannot format value: unknown dimension with ordinal %d", d)) } diff --git a/pkg/kv/kvserver/allocator/range_usage_info.go b/pkg/kv/kvserver/allocator/range_usage_info.go index 45e2fa9568a8..d8fb929dcb95 100644 --- a/pkg/kv/kvserver/allocator/range_usage_info.go +++ b/pkg/kv/kvserver/allocator/range_usage_info.go @@ -39,6 +39,7 @@ type RangeRequestLocalityInfo struct { func (r RangeUsageInfo) Load() load.Load { dims := load.Vector{} dims[load.Queries] = r.QueriesPerSecond + dims[load.StoreCPU] = r.RequestCPUNanosPerSecond + r.RaftCPUNanosPerSecond return dims } @@ -47,5 +48,12 @@ func (r RangeUsageInfo) Load() load.Load { func (r RangeUsageInfo) TransferImpact() load.Load { dims := load.Vector{} dims[load.Queries] = r.QueriesPerSecond + // Only use the request recorded cpu. 
This assumes that all replicas will + // use the same amount of raft cpu - which may be dubious. + // + // TODO(kvoli): + // Investigate whether this is a valid assumption and look to separate out + // leaseholder vs replica cpu usage in accounting. + dims[load.StoreCPU] = r.RequestCPUNanosPerSecond return dims } diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 9dbd4978884e..936352875dad 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -577,11 +577,16 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance( // network). We can't update the local store at this time. return } + // Only apply the raft cpu delta on rebalance. This estimate assumes + // that the raft cpu usage is approximately equal across replicas for a + // range. + // TODO(kvoli): Validate this assumption or remove the estimated impact. switch changeType { case roachpb.ADD_VOTER, roachpb.ADD_NON_VOTER: detail.Desc.Capacity.RangeCount++ detail.Desc.Capacity.LogicalBytes += rangeUsageInfo.LogicalBytes detail.Desc.Capacity.WritesPerSecond += rangeUsageInfo.WritesPerSecond + detail.Desc.Capacity.StoreCPUPerSecond += rangeUsageInfo.RaftCPUNanosPerSecond case roachpb.REMOVE_VOTER, roachpb.REMOVE_NON_VOTER: detail.Desc.Capacity.RangeCount-- if detail.Desc.Capacity.LogicalBytes <= rangeUsageInfo.LogicalBytes { @@ -594,6 +599,11 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance( } else { detail.Desc.Capacity.WritesPerSecond -= rangeUsageInfo.WritesPerSecond } + if detail.Desc.Capacity.StoreCPUPerSecond <= rangeUsageInfo.RaftCPUNanosPerSecond { + detail.Desc.Capacity.StoreCPUPerSecond = 0 + } else { + detail.Desc.Capacity.StoreCPUPerSecond -= rangeUsageInfo.RaftCPUNanosPerSecond + } default: return } @@ -622,10 +632,15 @@ func (sp *StorePool) UpdateLocalStoreAfterRelocate( sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() + // Only apply the raft cpu delta on rebalance. This estimate assumes + // that the raft cpu usage is approximately equal across replicas for a + // range. + // TODO(kvoli): Validate this assumption or remove the estimated impact. updateTargets := func(targets []roachpb.ReplicationTarget) { for _, target := range targets { if toDetail := sp.GetStoreDetailLocked(target.StoreID); toDetail != nil { toDetail.Desc.Capacity.RangeCount++ + toDetail.Desc.Capacity.StoreCPUPerSecond += rangeUsageInfo.RaftCPUNanosPerSecond } } } @@ -633,6 +648,7 @@ func (sp *StorePool) UpdateLocalStoreAfterRelocate( for _, old := range previous { if toDetail := sp.GetStoreDetailLocked(old.StoreID); toDetail != nil { toDetail.Desc.Capacity.RangeCount-- + toDetail.Desc.Capacity.StoreCPUPerSecond -= rangeUsageInfo.RaftCPUNanosPerSecond } } } @@ -659,6 +675,16 @@ func (sp *StorePool) UpdateLocalStoresAfterLeaseTransfer( } else { fromDetail.Desc.Capacity.QueriesPerSecond -= rangeUsageInfo.QueriesPerSecond } + // Only apply the request cpu (leaseholder + follower-reads) delta on + // transfers. Note this does not correctly account for follower reads + // remaining on the prior leaseholder after lease transfer. Instead, + // only a cpu delta specific to the lease should be applied. 
+ if fromDetail.Desc.Capacity.StoreCPUPerSecond <= rangeUsageInfo.RequestCPUNanosPerSecond { + fromDetail.Desc.Capacity.StoreCPUPerSecond = 0 + } else { + fromDetail.Desc.Capacity.StoreCPUPerSecond -= rangeUsageInfo.RequestCPUNanosPerSecond + } + sp.DetailsMu.StoreDetails[from] = &fromDetail } @@ -666,6 +692,7 @@ func (sp *StorePool) UpdateLocalStoresAfterLeaseTransfer( if toDetail.Desc != nil { toDetail.Desc.Capacity.LeaseCount++ toDetail.Desc.Capacity.QueriesPerSecond += rangeUsageInfo.QueriesPerSecond + toDetail.Desc.Capacity.StoreCPUPerSecond += rangeUsageInfo.RequestCPUNanosPerSecond sp.DetailsMu.StoreDetails[to] = &toDetail } } @@ -935,6 +962,10 @@ type StoreList struct { // to be rebalance targets. candidateLogicalBytes Stat + // CandidateStoreCPU tracks store-cpu-per-second stats for Stores that are + // eligible to be rebalance targets. + CandidateStoreCPU Stat + // CandidateQueriesPerSecond tracks queries-per-second stats for Stores that // are eligible to be rebalance targets. CandidateQueriesPerSecond Stat @@ -961,6 +992,7 @@ func MakeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { sl.CandidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond) sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond) sl.CandidateL0Sublevels.update(float64(desc.Capacity.L0Sublevels)) + sl.CandidateStoreCPU.update(desc.Capacity.StoreCPUPerSecond) } return sl } @@ -968,11 +1000,12 @@ func MakeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { func (sl StoreList) String() string { var buf bytes.Buffer fmt.Fprintf(&buf, - " candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v", + " candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v avg-store-cpu-per-second=%v", sl.CandidateRanges.Mean, sl.CandidateLeases.Mean, humanizeutil.IBytes(int64(sl.candidateLogicalBytes.Mean)), sl.CandidateQueriesPerSecond.Mean, + humanizeutil.Duration(time.Duration(int64(sl.CandidateStoreCPU.Mean))), ) if len(sl.Stores) > 0 { fmt.Fprintf(&buf, "\n") @@ -980,10 +1013,11 @@ func (sl StoreList) String() string { fmt.Fprintf(&buf, " ") } for _, desc := range sl.Stores { - fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f l0-sublevels=%d\n", + fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f store-cpu-per-second=%s l0-sublevels=%d\n", desc.StoreID, desc.Capacity.RangeCount, desc.Capacity.LeaseCount, humanizeutil.IBytes(desc.Capacity.LogicalBytes), desc.Capacity.QueriesPerSecond, + humanizeutil.Duration(time.Duration(int64(desc.Capacity.StoreCPUPerSecond))), desc.Capacity.L0Sublevels, ) } @@ -1010,6 +1044,7 @@ func (sl StoreList) ExcludeInvalid(constraints []roachpb.ConstraintsConjunction) func (sl StoreList) LoadMeans() load.Load { dims := load.Vector{} dims[load.Queries] = sl.CandidateQueriesPerSecond.Mean + dims[load.StoreCPU] = sl.CandidateStoreCPU.Mean return dims } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 0bd120cd9ec9..76a16acc417b 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2910,11 +2910,11 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa var l0SublevelsMax int64 var totalQueriesPerSecond float64 var totalWritesPerSecond float64 + var totalStoreCPUTimePerSecond float64 replicaCount := s.metrics.ReplicaCount.Value() bytesPerReplica := make([]float64, 0, replicaCount) writesPerReplica := make([]float64, 0, replicaCount) - rankingsAccumulator := NewReplicaAccumulator( - 
LBRebalancingDimension(LoadBasedRebalancingDimension.Get(&s.ClusterSettings().SV)).ToDimension())
+	rankingsAccumulator := NewReplicaAccumulator(LoadBasedRebalancingObjective(ctx, s.ClusterSettings()).ToDimension())
 	rankingsByTenantAccumulator := NewTenantReplicaAccumulator()

 	// Query the current L0 sublevels and record the updated maximum to metrics.
@@ -2932,6 +2932,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
 		// starts? We can't easily have a countdown as its value changes like for
 		// leases/replicas.
 		// TODO(a-robinson): Calculate percentiles for qps? Get rid of other percentiles?
+		totalStoreCPUTimePerSecond += usage.RequestCPUNanosPerSecond + usage.RaftCPUNanosPerSecond
 		totalQueriesPerSecond += usage.QueriesPerSecond
 		totalWritesPerSecond += usage.WritesPerSecond
 		writesPerReplica = append(writesPerReplica, usage.WritesPerSecond)
@@ -2946,6 +2947,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
 	capacity.RangeCount = rangeCount
 	capacity.LeaseCount = leaseCount
 	capacity.LogicalBytes = logicalBytes
+	capacity.StoreCPUPerSecond = totalStoreCPUTimePerSecond
 	capacity.QueriesPerSecond = totalQueriesPerSecond
 	capacity.WritesPerSecond = totalWritesPerSecond
 	capacity.L0Sublevels = l0SublevelsMax
diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go
index ee704a6687c5..75b5455f2696 100644
--- a/pkg/kv/kvserver/store_rebalancer.go
+++ b/pkg/kv/kvserver/store_rebalancer.go
@@ -16,6 +16,7 @@ import (
 	"math/rand"
 	"time"

+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load"
@@ -24,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
+	"github.com/cockroachdb/cockroach/pkg/util/grunning"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -85,17 +87,16 @@ var LoadBasedRebalancingMode = settings.RegisterEnumSetting(

 // LoadBasedRebalancingDimension controls what dimension rebalancing takes into
 // account.
-// NB: This value is set to private on purpose, as this cluster setting is a
-// noop at the moment.
 var LoadBasedRebalancingDimension = settings.RegisterEnumSetting(
 	settings.SystemOnly,
 	"kv.allocator.load_based_rebalancing_dimension",
 	"what dimension of load does rebalancing consider",
-	"qps",
+	"store_cpu",
 	map[int64]string{
-		int64(LBRebalancingQueries): "qps",
+		int64(LBRebalancingQueries):  "qps",
+		int64(LBRebalancingStoreCPU): "store_cpu",
 	},
-)
+).WithPublic()

 // LBRebalancingMode controls if and when we do store-level rebalancing
 // based on load.
@@ -119,12 +120,48 @@ type LBRebalancingDimension int64
 const (
 	// LBRebalancingQueries is a rebalancing mode that balances queries (QPS).
 	LBRebalancingQueries LBRebalancingDimension = iota
+
+	// LBRebalancingStoreCPU is a rebalance mode that balances the store CPU
+	// usage. The store cpu usage is calculated as the sum of all replicas'
+	// cpu usage on the store.
+	LBRebalancingStoreCPU
 )
+
+// LoadBasedRebalancingObjective returns the load based rebalancing objective
+// for the cluster. In cases where the configured objective cannot be used, it
+// will return a fallback.
+func LoadBasedRebalancingObjective(
+	ctx context.Context, st *cluster.Settings,
+) LBRebalancingDimension {
+	set := LoadBasedRebalancingDimension.Get(&st.SV)
+	// Queries should always be supported, return early if set.
+	if set == int64(LBRebalancingQueries) {
+		return LBRebalancingQueries
+	}
+	// When the cluster version hasn't finalized to 23.1, some unupgraded
+	// stores will not be populating additional fields in their StoreCapacity,
+	// in such cases we cannot balance another objective since the data may not
+	// exist. Fall back to QPS balancing.
+	if !st.Version.IsActive(ctx, clusterversion.V23_1AllocatorCPUBalancing) {
+		return LBRebalancingQueries
+	}
+	// When the cpu timekeeping utility is unsupported on this architecture,
+	// the cpu usage cannot be gathered. Fall back to QPS balancing.
+	if !grunning.Supported() {
+		return LBRebalancingQueries
+	}
+	// The cluster is on a supported version and this local store is on an
+	// architecture which supports the cpu timekeeping utility, so return the
+	// cluster setting as is.
+	return LBRebalancingDimension(set)
+}
+
 func (d LBRebalancingDimension) ToDimension() load.Dimension {
 	switch d {
 	case LBRebalancingQueries:
 		return load.Queries
+	case LBRebalancingStoreCPU:
+		return load.StoreCPU
 	default:
 		panic("unknown dimension")
 	}
@@ -233,6 +270,17 @@ type RebalanceContext struct {
 	hottestRanges, rebalanceCandidates []CandidateReplica
 }

+// RebalanceMode returns the mode of the store rebalancer. See
+// LoadBasedRebalancingMode.
+func (sr *StoreRebalancer) RebalanceMode() LBRebalancingMode {
+	return LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))
+}
+
+// RebalanceObjective returns the load dimension the store rebalancer is
+// balancing.
+func (sr *StoreRebalancer) RebalanceObjective(ctx context.Context) LBRebalancingDimension {
+	return LoadBasedRebalancingObjective(ctx, sr.st)
+}
+
 // LessThanMaxThresholds returns true if the local store is below the maximum
 // threshold w.r.t the balanced load dimension, false otherwise.
 func (r *RebalanceContext) LessThanMaxThresholds() bool {
@@ -270,14 +318,14 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) {
 				timer.Reset(jitteredInterval(allocator.LoadBasedRebalanceInterval.Get(&sr.st.SV)))
 			}

-			mode := LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))
+			mode := sr.RebalanceMode()
 			if mode == LBRebalancingOff {
 				continue
 			}

 			hottestRanges := sr.replicaRankings.TopLoad()
 			options := sr.scorerOptions(ctx)
-			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)))
+			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 			sr.rebalanceStore(ctx, rctx)
 		}
 	})
@@ -289,7 +337,7 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) {
 // Instead, we use our own implementation of `scorerOptions` that promotes load
 // balance.
func (sr *StoreRebalancer) scorerOptions(ctx context.Context) *allocatorimpl.LoadScorerOptions { - lbDimension := LBRebalancingDimension(LoadBasedRebalancingDimension.Get(&sr.st.SV)).ToDimension() + lbDimension := sr.RebalanceObjective(ctx).ToDimension() return &allocatorimpl.LoadScorerOptions{ StoreHealthOptions: sr.allocator.StoreHealthOptions(ctx), Deterministic: sr.storePool.IsDeterministic(), @@ -323,7 +371,7 @@ func (sr *StoreRebalancer) NewRebalanceContext( return nil } - dims := LBRebalancingDimension(LoadBasedRebalancingDimension.Get(&sr.st.SV)).ToDimension() + dims := sr.RebalanceObjective(ctx).ToDimension() return &RebalanceContext{ LocalDesc: localDesc, loadDimension: dims, diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index 66089fabf487..0567f6417961 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -17,6 +17,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" @@ -56,8 +57,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 3000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, + QueriesPerSecond: 3000, + StoreCPUPerSecond: 3000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, { @@ -74,8 +76,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 2800, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, + QueriesPerSecond: 2800, + StoreCPUPerSecond: 2800 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, }, }, { @@ -92,8 +95,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 2600, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 2, + QueriesPerSecond: 2600, + StoreCPUPerSecond: 2600 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 2, }, }, { @@ -110,8 +114,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 2400, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, + QueriesPerSecond: 2400, + StoreCPUPerSecond: 2400 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, { @@ -128,8 +133,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 2200, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 3, + QueriesPerSecond: 2200, + StoreCPUPerSecond: 2200 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 3, }, }, { @@ -146,8 +152,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 2000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 2, + QueriesPerSecond: 2000, + StoreCPUPerSecond: 2000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 2, }, }, { @@ -164,8 +171,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1800, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, + QueriesPerSecond: 1800, + StoreCPUPerSecond: 1800 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, { @@ -182,8 +190,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1600, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, + QueriesPerSecond: 1600, + StoreCPUPerSecond: 1600 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, }, }, { @@ -200,8 +209,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - 
QueriesPerSecond: 1400, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 3, + QueriesPerSecond: 1400, + StoreCPUPerSecond: 1400 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 3, }, }, } @@ -214,35 +224,40 @@ var ( StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1500, + QueriesPerSecond: 1500, + StoreCPUPerSecond: 1500 * float64(time.Millisecond), }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1300, + QueriesPerSecond: 1300, + StoreCPUPerSecond: 1300 * float64(time.Millisecond), }, }, { StoreID: 3, Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), }, }, { StoreID: 4, Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 900, + QueriesPerSecond: 900, + StoreCPUPerSecond: 900 * float64(time.Millisecond), }, }, { StoreID: 5, Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 500, + QueriesPerSecond: 500, + StoreCPUPerSecond: 500 * float64(time.Millisecond), }, }, } @@ -256,40 +271,45 @@ var ( StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, + QueriesPerSecond: 1500, + StoreCPUPerSecond: 1500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1300, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, + QueriesPerSecond: 1300, + StoreCPUPerSecond: 1300 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, { StoreID: 3, Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, }, }, { StoreID: 4, Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 900, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 20, + QueriesPerSecond: 900, + StoreCPUPerSecond: 900 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 20, }, }, { StoreID: 5, Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 25, + QueriesPerSecond: 500, + StoreCPUPerSecond: 500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 25, }, }, } @@ -301,40 +321,45 @@ var ( StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, }, }, { StoreID: 3, Node: 
roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, { StoreID: 4, Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, }, }, { StoreID: 5, Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, } @@ -346,40 +371,45 @@ var ( StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 1500, + StoreCPUPerSecond: 1500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1300, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 1300, + StoreCPUPerSecond: 1300 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, { StoreID: 3, Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, { StoreID: 4, Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 900, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 900, + StoreCPUPerSecond: 900 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, { StoreID: 5, Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 500, + StoreCPUPerSecond: 500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, } @@ -391,40 +421,45 @@ var ( StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 1500, + StoreCPUPerSecond: 1500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1300, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 10, + QueriesPerSecond: 1300, + StoreCPUPerSecond: 1300 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 10, }, }, { StoreID: 3, Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 50, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 50, }, }, { StoreID: 4, Node: 
roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 900, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, + QueriesPerSecond: 900, + StoreCPUPerSecond: 900 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, { StoreID: 5, Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, + QueriesPerSecond: 500, + StoreCPUPerSecond: 500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, } @@ -433,7 +468,7 @@ var ( type testRange struct { // The first storeID in the list will be the leaseholder. voters, nonVoters []roachpb.StoreID - qps float64 + qps, reqCPU float64 } func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange, loadDimension load.Dimension) { @@ -469,6 +504,7 @@ func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange, loadDimension repl.mu.state.Stats = &enginepb.MVCCStats{} repl.loadStats = rload.NewReplicaLoad(s.Clock(), nil) repl.loadStats.TestingSetStat(rload.Queries, r.qps) + repl.loadStats.TestingSetStat(rload.ReqCPUNanos, r.reqCPU) acc.AddReplica(candidateReplica{ Replica: repl, @@ -516,12 +552,13 @@ func TestChooseLeaseToTransfer(t *testing.T) { testCases := []struct { storeIDs []roachpb.StoreID - qps float64 + qps, reqCPU float64 expectTarget roachpb.StoreID }{ { storeIDs: []roachpb.StoreID{1}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 0, }, @@ -529,29 +566,37 @@ func TestChooseLeaseToTransfer(t *testing.T) { // (1300 for stores 1 and 2) is close enough to the current leaseholder's // QPS (1500). { - storeIDs: []roachpb.StoreID{1, 2}, - qps: 100, + storeIDs: []roachpb.StoreID{1, 2}, + qps: 100, + // NB: This is set +50 above qps, as the minimum threshold + // difference for store cpu is 50ms, while it is 100 qps for + // queries per second. + reqCPU: 150 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 2}, qps: 1000, + reqCPU: 1000 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 4, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{5, 1}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 0, }, { @@ -559,43 +604,54 @@ func TestChooseLeaseToTransfer(t *testing.T) { // be projected to have 1300 and 1200 qps respectively. storeIDs: []roachpb.StoreID{1, 3}, qps: 200, + reqCPU: 200 * float64(time.Millisecond), expectTarget: 3, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 200, + reqCPU: 200 * float64(time.Millisecond), expectTarget: 4, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 200, + reqCPU: 200 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 2}, qps: 500, + reqCPU: 500 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 3}, qps: 500, + reqCPU: 500 * float64(time.Millisecond), expectTarget: 0, }, // s1 without the lease would be projected to have 1000 qps, which is close // enough to s4's 900 qps. { - storeIDs: []roachpb.StoreID{1, 4}, - qps: 500, + storeIDs: []roachpb.StoreID{1, 4}, + qps: 500, + // NB: This is set +50 above qps, as the minimum threshold + // difference for store cpu is 50ms, while it is 100 qps for + // queries per second. 
+ reqCPU: 550 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 500, + reqCPU: 500 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 600, + reqCPU: 600 * float64(time.Millisecond), expectTarget: 5, }, @@ -606,16 +662,19 @@ func TestChooseLeaseToTransfer(t *testing.T) { { storeIDs: []roachpb.StoreID{1, 5}, qps: 800, + reqCPU: 800 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 4, 5}, qps: 800, + reqCPU: 800 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 3, 4, 5}, qps: 800, + reqCPU: 800 * float64(time.Millisecond), expectTarget: 5, }, // NB: However, if s1 is projected to have 750 qps, we would expect a lease @@ -623,47 +682,56 @@ func TestChooseLeaseToTransfer(t *testing.T) { { storeIDs: []roachpb.StoreID{1, 3, 4, 5}, qps: 750, + reqCPU: 750 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 1.5, + reqCPU: 1.5 * float64(time.Millisecond), expectTarget: 4, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 1.5, + reqCPU: 1.5 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 1.49, + reqCPU: 1.49 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 1.49, + reqCPU: 1.49 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 2, 3, 4}, qps: 1500, + reqCPU: 1500 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 2, 3, 4, 5}, qps: 1500, + reqCPU: 1500 * float64(time.Millisecond), expectTarget: 0, }, } for _, tc := range testCases { t.Run("", func(t *testing.T) { - loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps}}, load.Queries) + lbRebalanceDimension := sr.RebalanceObjective(ctx).ToDimension() + loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps, reqCPU: tc.reqCPU}}, lbRebalanceDimension) hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + options.LoadThreshold = allocatorimpl.SetAllDims(0.1) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) _, target, _ := sr.chooseLeaseToTransfer( ctx, rctx, @@ -677,16 +745,16 @@ func TestChooseLeaseToTransfer(t *testing.T) { } func randomNoLocalityStores( - numNodes int, qpsMultiplier float64, -) (stores []*roachpb.StoreDescriptor, qpsMean float64) { + numNodes int, loadMultiplier float64, +) (stores []*roachpb.StoreDescriptor, mean float64) { var totalQPS float64 for i := 1; i <= numNodes; i++ { - qps := rand.Float64() * qpsMultiplier + qps := rand.Float64() * loadMultiplier stores = append( stores, &roachpb.StoreDescriptor{ StoreID: roachpb.StoreID(i), Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)}, - Capacity: roachpb.StoreCapacity{QueriesPerSecond: qps}, + Capacity: roachpb.StoreCapacity{QueriesPerSecond: qps, StoreCPUPerSecond: qps * float64(time.Millisecond)}, }, ) totalQPS = totalQPS + qps @@ -713,6 +781,7 @@ func logSummary( log.Infof(ctx, "generated random store list:\n%s", summary.String()) } +// TODO(kvoli): map to store cpu dimension for testing. 
func TestChooseRangeToRebalanceRandom(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -726,6 +795,7 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { numNodes = 12 numDeadNodes = 3 perReplicaQPS = 100 + perReplicaReqCPU = 100 * float64(time.Millisecond) qpsRebalanceThreshold = 0.25 epsilon = 1 @@ -791,17 +861,15 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { } loadRanges( rr, s, []testRange{ - {voters: voterStores, nonVoters: nonVoterStores, qps: perReplicaQPS}, - }, load.Queries, + {voters: voterStores, nonVoters: nonVoterStores, qps: perReplicaQPS, reqCPU: perReplicaReqCPU}, + }, sr.RebalanceObjective(ctx).ToDimension(), ) hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{EnforcementLevel: allocatorimpl.StoreHealthNoAction} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(qpsRebalanceThreshold) + rctx.options.LoadThreshold = allocatorimpl.SetAllDims(qpsRebalanceThreshold) _, voterTargets, nonVoterTargets := sr.chooseRangeToRebalance(ctx, rctx) var rebalancedVoterStores, rebalancedNonVoterStores []roachpb.StoreID @@ -1125,10 +1193,11 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { s.cfg.DefaultSpanConfig.VoterConstraints = tc.voterConstraints s.cfg.DefaultSpanConfig.LeasePreferences = tc.leasePreferences const testingQPS = float64(60) + const testingReqCPU = 60 * float64(time.Millisecond) loadRanges( rr, s, []testRange{ - {voters: tc.voters, nonVoters: tc.nonVoters, qps: testingQPS}, - }, load.Queries, + {voters: tc.voters, nonVoters: tc.nonVoters, qps: testingQPS, reqCPU: testingReqCPU}, + }, sr.RebalanceObjective(ctx).ToDimension(), ) hottestRanges := sr.replicaRankings.TopLoad() @@ -1136,7 +1205,7 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingLeasesAndReplicas) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ EnforcementLevel: allocatorimpl.StoreHealthBlockRebalanceTo} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05) + rctx.options.LoadThreshold = allocatorimpl.SetAllDims(0.05) _, voterTargets, nonVoterTargets := sr.chooseRangeToRebalance( ctx, @@ -1208,16 +1277,21 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) { // Load a fake hot range that's already on the best stores. We want to ensure // that the store rebalancer doesn't attempt to rebalance ranges that it // cannot find better rebalance opportunities for. 
- loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{localDesc.StoreID}, qps: 100}}, load.Queries) + loadRanges(rr, s, + []testRange{ + {voters: []roachpb.StoreID{localDesc.StoreID}, + qps: 100, + reqCPU: 100 * float64(time.Millisecond)}, + }, + sr.RebalanceObjective(ctx).ToDimension(), + ) hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ EnforcementLevel: allocatorimpl.StoreHealthNoAction} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05) + rctx.options.LoadThreshold = allocatorimpl.SetAllDims(0.05) sr.chooseRangeToRebalance(ctx, rctx) trace := finishAndGetRecording() @@ -1239,7 +1313,8 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { NodeID: 1, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 12000, + QueriesPerSecond: 12000, + StoreCPUPerSecond: 12000 * float64(time.Millisecond), }, }, { @@ -1248,7 +1323,8 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { NodeID: 2, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 10000, + QueriesPerSecond: 10000, + StoreCPUPerSecond: 10000 * float64(time.Millisecond), }, }, { @@ -1257,7 +1333,8 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { NodeID: 3, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 8000, + QueriesPerSecond: 8000, + StoreCPUPerSecond: 8000 * float64(time.Millisecond), }, }, { @@ -1266,7 +1343,8 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { NodeID: 4, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 200, + QueriesPerSecond: 200, + StoreCPUPerSecond: 200 * float64(time.Millisecond), }, }, { @@ -1275,19 +1353,21 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { NodeID: 5, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 100, + QueriesPerSecond: 100, + StoreCPUPerSecond: 100 * float64(time.Millisecond), }, }, } for _, tc := range []struct { - voters, expRebalancedVoters []roachpb.StoreID - QPS, rebalanceThreshold float64 - shouldRebalance bool + voters, expRebalancedVoters []roachpb.StoreID + QPS, reqCPU, rebalanceThreshold float64 + shouldRebalance bool }{ { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{3, 4, 5}, QPS: 5000, + reqCPU: 5000 * float64(time.Millisecond), rebalanceThreshold: 0.25, shouldRebalance: true, }, @@ -1295,6 +1375,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{5, 2, 3}, QPS: 5000, + reqCPU: 5000 * float64(time.Millisecond), rebalanceThreshold: 0.8, shouldRebalance: true, }, @@ -1302,12 +1383,14 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{3, 4, 5}, QPS: 1000, + reqCPU: 1000 * float64(time.Millisecond), rebalanceThreshold: 0.05, shouldRebalance: true, }, { voters: []roachpb.StoreID{1, 2, 3}, QPS: 5000, + reqCPU: 5000 * float64(time.Millisecond), // NB: This will lead to an overfull threshold of just above 12000. Thus, // no store should be considered overfull and we should not rebalance at // all. 
@@ -1317,6 +1400,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { { voters: []roachpb.StoreID{4}, QPS: 100, + reqCPU: 100 * float64(time.Millisecond), rebalanceThreshold: 0.01, // NB: We don't expect a rebalance here because the difference between s4 // and s5 is not high enough to justify a rebalance. @@ -1326,6 +1410,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{5, 2, 3}, QPS: 10000, + reqCPU: 10000 * float64(time.Millisecond), rebalanceThreshold: 0.01, // NB: s5 will be hotter than s1 after this move. shouldRebalance: true, @@ -1361,16 +1446,17 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { } s.cfg.DefaultSpanConfig.NumReplicas = int32(len(tc.voters)) - loadRanges(rr, s, []testRange{{voters: tc.voters, qps: tc.QPS}}, load.Queries) + loadRanges(rr, s, + []testRange{{voters: tc.voters, qps: tc.QPS, reqCPU: tc.reqCPU}}, + sr.RebalanceObjective(ctx).ToDimension(), + ) hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ EnforcementLevel: allocatorimpl.StoreHealthNoAction} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(tc.rebalanceThreshold) + rctx.options.LoadThreshold = allocatorimpl.SetAllDims(tc.rebalanceThreshold) _, voterTargets, _ := sr.chooseRangeToRebalance(ctx, rctx) require.Len(t, voterTargets, len(tc.expRebalancedVoters)) @@ -1449,17 +1535,15 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { sr.getRaftStatusFn = func(r CandidateReplica) *raft.Status { return behindTestingRaftStatusFn(r) } + lbRebalanceDimension := sr.RebalanceObjective(ctx).ToDimension() // Load in a range with replicas on an overfull node, a slightly underfull // node, and a very underfull node. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100}}, load.Queries) + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension) hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)), - ) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) repl := rctx.hottestRanges[0] _, target, _ := sr.chooseLeaseToTransfer(ctx, rctx) @@ -1472,17 +1556,14 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { // Then do the same, but for replica rebalancing. Make s5 an existing replica // that's behind, and see how a new replica is preferred as the leaseholder // over it. 
-	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100}}, load.Queries)
+	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension)
 	hottestRanges = sr.replicaRankings.TopLoad()
 	options = sr.scorerOptions(ctx)
-	rctx = sr.NewRebalanceContext(
-		ctx, options, hottestRanges,
-		LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)),
-	)
+	rctx = sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 	rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{
 		EnforcementLevel: allocatorimpl.StoreHealthNoAction}
-	rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05)
+	rctx.options.LoadThreshold = allocatorimpl.SetAllDims(0.05)
 	rctx.options.Deterministic = true

 	repl = rctx.hottestRanges[0]
@@ -1638,22 +1719,20 @@ func TestStoreRebalancerReadAmpCheck(t *testing.T) {
 			rr := NewReplicaRankings()

 			sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr)
+			lbRebalanceDimension := sr.RebalanceObjective(ctx).ToDimension()

 			// Load in a range with replicas on an overfull node, a slightly underfull
 			// node, and a very underfull node.
-			loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100}}, load.Queries)
+			loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension)

 			hottestRanges := sr.replicaRankings.TopLoad()
 			options := sr.scorerOptions(ctx)
-			rctx := sr.NewRebalanceContext(
-				ctx, options, hottestRanges,
-				LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)),
-			)
+			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 			require.Greater(t, len(rctx.hottestRanges), 0)

 			rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{
 				EnforcementLevel: test.enforcement, L0SublevelThreshold: allocatorimpl.MaxL0SublevelThreshold}
-			rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05)
+			rctx.options.LoadThreshold = allocatorimpl.SetAllDims(0.05)

 			_, targetVoters, _ := sr.chooseRangeToRebalance(ctx, rctx)
 			require.Equal(t, test.expectedTargets, targetVoters)
diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go
index fa7e2b22d2e5..799bed94afa6 100644
--- a/pkg/roachpb/metadata.go
+++ b/pkg/roachpb/metadata.go
@@ -549,6 +549,7 @@ func (sc StoreCapacity) FractionUsed() float64 {
 func (sc StoreCapacity) Load() load.Load {
 	dims := load.Vector{}
 	dims[load.Queries] = sc.QueriesPerSecond
+	dims[load.StoreCPU] = sc.StoreCPUPerSecond
 	return dims
 }
diff --git a/pkg/roachpb/metadata.proto b/pkg/roachpb/metadata.proto
index f92e704582e1..48cfb2224568 100644
--- a/pkg/roachpb/metadata.proto
+++ b/pkg/roachpb/metadata.proto
@@ -323,6 +323,10 @@ message StoreCapacity {
   // instances where overlapping node-binary versions within a cluster result
   // in this this field missing.
   optional int64 l0_sublevels = 12 [(gogoproto.nullable) = false];
+  // store_cpu_per_second tracks the average store cpu nanoseconds per second.
+  // This is the sum of all the replicas' cpu time on this store, which is
+  // tracked in replica stats.
+  optional double store_cpu_per_second = 14 [(gogoproto.nullable) = false, (gogoproto.customname) = "StoreCPUPerSecond"];
   optional cockroach.util.admission.admissionpb.IOThreshold io_threshold = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "IOThreshold" ];
   // bytes_per_replica and writes_per_replica contain percentiles for the
   // number of bytes and writes-per-second to each replica in the store.
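Reviewer note on units, since the proto comment is terse: `store_cpu_per_second`
is CPU-nanoseconds of replica work per second of wall time, so
`float64(time.Second)` corresponds to one fully busy core, and the 50ms minimum
threshold is the 5%-of-one-CPU figure quoted in `MinStoreCPUThresholdDifference`.
A minimal sketch (hypothetical helper, not part of the patch):

```go
package main

import (
	"fmt"
	"time"
)

// cpuUtilization translates a gossiped StoreCPUPerSecond value
// (CPU-nanoseconds of replica work per wall-clock second) into a fraction of
// the node's total CPU capacity.
func cpuUtilization(storeCPUPerSecond float64, cores int) float64 {
	return storeCPUPerSecond / (float64(cores) * float64(time.Second))
}

func main() {
	// The test fixtures gossip up to 3000ms of replica CPU per second; on an
	// 8-core node that is 37.5% utilization.
	fmt.Printf("%.1f%%\n", 100*cpuUtilization(3000*float64(time.Millisecond), 8))
	// The 50ms minimum threshold corresponds to 5% of a single core.
	fmt.Printf("%.0f%%\n", 100*cpuUtilization(float64(50*time.Millisecond), 1))
}
```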