diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 359bf0144dc5..20afab343254 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -395,6 +395,10 @@ const ( // V23_1AlterSystemSQLInstancesAddSqlAddr adds a sql_addr column to the // system.sql_instances table. V23_1AlterSystemSQLInstancesAddSQLAddr + // V23_1AllocatorCPUBalancing adds balancing CPU usage among stores using + // the allocator and store rebalancer. It assumes that at this version, + // stores now include their CPU in the StoreCapacity proto when gossiping. + V23_1AllocatorCPUBalancing // ************************************************* // Step (1): Add new versions here. @@ -678,6 +682,10 @@ var rawVersionsSingleton = keyedVersions{ Key: V23_1AlterSystemSQLInstancesAddSQLAddr, Version: roachpb.Version{Major: 22, Minor: 2, Internal: 28}, }, + { + Key: V23_1AllocatorCPUBalancing, + Version: roachpb.Version{Major: 22, Minor: 2, Internal: 30}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go b/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go index e570ed471a77..9ab8ee8bbdfe 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go @@ -27,6 +27,8 @@ func getLoadThreshold(dim load.Dimension, sv *settings.Values) float64 { switch dim { case load.Queries: return allocator.QPSRebalanceThreshold.Get(sv) + case load.StoreCPU: + return allocator.StoreCPURebalanceThreshold.Get(sv) default: panic(errors.AssertionFailedf("Unkown load dimension %d", dim)) } @@ -51,6 +53,8 @@ func getLoadMinThreshold(dim load.Dimension) float64 { switch dim { case load.Queries: return allocator.MinQPSThresholdDifference + case load.StoreCPU: + return allocator.MinStoreCPUThresholdDifference default: panic(errors.AssertionFailedf("Unkown load dimension %d", dim)) } @@ -76,6 +80,8 @@ func getLoadRebalanceMinRequiredDiff(dim load.Dimension, sv *settings.Values) fl switch dim { case load.Queries: return allocator.MinQPSDifferenceForTransfers.Get(sv) + case load.StoreCPU: + return allocator.MinStoreCPUDifferenceForTransfers default: panic(errors.AssertionFailedf("Unkown load dimension %d", dim)) } @@ -117,3 +123,13 @@ func MakeQPSOnlyDim(v float64) load.Load { dims[load.Queries] = v return dims } + +// SetAllDims returns a load vector with all dimensions filled in with the +// value given. +func SetAllDims(v float64) load.Load { + dims := load.Vector{} + for i := range dims { + dims[i] = v + } + return dims +} diff --git a/pkg/kv/kvserver/allocator/base.go b/pkg/kv/kvserver/allocator/base.go index 0f70f5b9a3bc..f266a9aba108 100644 --- a/pkg/kv/kvserver/allocator/base.go +++ b/pkg/kv/kvserver/allocator/base.go @@ -37,6 +37,30 @@ const ( // lightly loaded clusters. MinQPSThresholdDifference = 100 + // MinStoreCPUThresholdDifference is the minimum CPU difference from the cluster + // mean that this system should care about. The system won't attempt to + // take action if a store's CPU differs from the mean by less than this + // amount even if it is greater than the percentage threshold. This + // prevents too many lease transfers or range rebalances in lightly loaded + // clusters. + // + // NB: This represents 5% (1/20) utilization of 1 cpu on average. This + // number was arrived at from testing to minimize thrashing. This number is + // set independent of processor speed and assumes identical value of cpu + // time across all stores. i.e. all cpu's are identical. + MinStoreCPUThresholdDifference = float64(50 * time.Millisecond) + + // MinStoreCPUDifferenceForTransfers is the minimum CPU difference that a + // store rebalncer would care about to reconcile (via lease or replica + // rebalancing) between any two stores. + // + // NB: This is set to be two times the minimum threshold that a store needs + // to be above or below the mean to be considered overfull or underfull + // respectively. This is to make lease transfers and replica rebalances + // less sensistive to jitters in any given workload by introducing + // additional friction before taking these actions. + MinStoreCPUDifferenceForTransfers = 2 * MinStoreCPUThresholdDifference + // defaultLoadBasedRebalancingInterval is how frequently to check the store-level // balance of the cluster. defaultLoadBasedRebalancingInterval = time.Minute @@ -107,6 +131,27 @@ var QPSRebalanceThreshold = func() *settings.FloatSetting { return s }() +// StoreCPURebalanceThreshold is the minimum ratio of a store's cpu time to the mean +// cpu time at which that store is considered overfull or underfull of cpu +// usage. +var StoreCPURebalanceThreshold = func() *settings.FloatSetting { + s := settings.RegisterFloatSetting( + settings.SystemOnly, + "kv.allocator.store_cpu_rebalance_threshold", + "minimum fraction away from the mean a store's cpu usage can be before it is considered overfull or underfull", + 0.10, + settings.NonNegativeFloat, + func(f float64) error { + if f < 0.01 { + return errors.Errorf("cannot set kv.allocator.store_cpu_rebalance_threshold to less than 0.01") + } + return nil + }, + ) + s.SetVisibility(settings.Public) + return s +}() + // LoadBasedRebalanceInterval controls how frequently each store checks for // load-base lease/replica rebalancing opportunties. var LoadBasedRebalanceInterval = settings.RegisterPublicDurationSettingWithExplicitUnit( diff --git a/pkg/kv/kvserver/allocator/load/BUILD.bazel b/pkg/kv/kvserver/allocator/load/BUILD.bazel index 05873c3c89bf..a9cf0b67deaa 100644 --- a/pkg/kv/kvserver/allocator/load/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/load/BUILD.bazel @@ -10,6 +10,7 @@ go_library( ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load", visibility = ["//visibility:public"], + deps = ["//pkg/util/humanizeutil"], ) get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/allocator/load/dimension.go b/pkg/kv/kvserver/allocator/load/dimension.go index 3241f2ee1dae..1cc0557a599e 100644 --- a/pkg/kv/kvserver/allocator/load/dimension.go +++ b/pkg/kv/kvserver/allocator/load/dimension.go @@ -10,7 +10,12 @@ package load -import "fmt" +import ( + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" +) // Dimension is a singe dimension of load that a component may track. type Dimension int @@ -18,6 +23,8 @@ type Dimension int const ( // Queries refers to the number of queries. Queries Dimension = iota + // StoreCPU refers to the sum of replica cpu time on a store. + StoreCPU nDimensionsTyped nDimensions = int(nDimensionsTyped) @@ -28,6 +35,8 @@ func (d Dimension) String() string { switch d { case Queries: return "queries-per-second" + case StoreCPU: + return "store-cpu-per-second" default: panic(fmt.Sprintf("cannot name: unknown dimension with ordinal %d", d)) } @@ -38,6 +47,8 @@ func (d Dimension) Format(value float64) string { switch d { case Queries: return fmt.Sprintf("%.1f", value) + case StoreCPU: + return string(humanizeutil.Duration(time.Duration(int64(value)))) default: panic(fmt.Sprintf("cannot format value: unknown dimension with ordinal %d", d)) } diff --git a/pkg/kv/kvserver/allocator/range_usage_info.go b/pkg/kv/kvserver/allocator/range_usage_info.go index 45e2fa9568a8..d8fb929dcb95 100644 --- a/pkg/kv/kvserver/allocator/range_usage_info.go +++ b/pkg/kv/kvserver/allocator/range_usage_info.go @@ -39,6 +39,7 @@ type RangeRequestLocalityInfo struct { func (r RangeUsageInfo) Load() load.Load { dims := load.Vector{} dims[load.Queries] = r.QueriesPerSecond + dims[load.StoreCPU] = r.RequestCPUNanosPerSecond + r.RaftCPUNanosPerSecond return dims } @@ -47,5 +48,12 @@ func (r RangeUsageInfo) Load() load.Load { func (r RangeUsageInfo) TransferImpact() load.Load { dims := load.Vector{} dims[load.Queries] = r.QueriesPerSecond + // Only use the request recorded cpu. This assumes that all replicas will + // use the same amount of raft cpu - which may be dubious. + // + // TODO(kvoli): + // Investigate whether this is a valid assumption and look to separate out + // leaseholder vs replica cpu usage in accounting. + dims[load.StoreCPU] = r.RequestCPUNanosPerSecond return dims } diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 9dbd4978884e..936352875dad 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -577,11 +577,16 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance( // network). We can't update the local store at this time. return } + // Only apply the raft cpu delta on rebalance. This estimate assumes + // that the raft cpu usage is approximately equal across replicas for a + // range. + // TODO(kvoli): Validate this assumption or remove the estimated impact. switch changeType { case roachpb.ADD_VOTER, roachpb.ADD_NON_VOTER: detail.Desc.Capacity.RangeCount++ detail.Desc.Capacity.LogicalBytes += rangeUsageInfo.LogicalBytes detail.Desc.Capacity.WritesPerSecond += rangeUsageInfo.WritesPerSecond + detail.Desc.Capacity.StoreCPUPerSecond += rangeUsageInfo.RaftCPUNanosPerSecond case roachpb.REMOVE_VOTER, roachpb.REMOVE_NON_VOTER: detail.Desc.Capacity.RangeCount-- if detail.Desc.Capacity.LogicalBytes <= rangeUsageInfo.LogicalBytes { @@ -594,6 +599,11 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance( } else { detail.Desc.Capacity.WritesPerSecond -= rangeUsageInfo.WritesPerSecond } + if detail.Desc.Capacity.StoreCPUPerSecond <= rangeUsageInfo.RaftCPUNanosPerSecond { + detail.Desc.Capacity.StoreCPUPerSecond = 0 + } else { + detail.Desc.Capacity.StoreCPUPerSecond -= rangeUsageInfo.RaftCPUNanosPerSecond + } default: return } @@ -622,10 +632,15 @@ func (sp *StorePool) UpdateLocalStoreAfterRelocate( sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() + // Only apply the raft cpu delta on rebalance. This estimate assumes + // that the raft cpu usage is approximately equal across replicas for a + // range. + // TODO(kvoli): Validate this assumption or remove the estimated impact. updateTargets := func(targets []roachpb.ReplicationTarget) { for _, target := range targets { if toDetail := sp.GetStoreDetailLocked(target.StoreID); toDetail != nil { toDetail.Desc.Capacity.RangeCount++ + toDetail.Desc.Capacity.StoreCPUPerSecond += rangeUsageInfo.RaftCPUNanosPerSecond } } } @@ -633,6 +648,7 @@ func (sp *StorePool) UpdateLocalStoreAfterRelocate( for _, old := range previous { if toDetail := sp.GetStoreDetailLocked(old.StoreID); toDetail != nil { toDetail.Desc.Capacity.RangeCount-- + toDetail.Desc.Capacity.StoreCPUPerSecond -= rangeUsageInfo.RaftCPUNanosPerSecond } } } @@ -659,6 +675,16 @@ func (sp *StorePool) UpdateLocalStoresAfterLeaseTransfer( } else { fromDetail.Desc.Capacity.QueriesPerSecond -= rangeUsageInfo.QueriesPerSecond } + // Only apply the request cpu (leaseholder + follower-reads) delta on + // transfers. Note this does not correctly account for follower reads + // remaining on the prior leaseholder after lease transfer. Instead, + // only a cpu delta specific to the lease should be applied. + if fromDetail.Desc.Capacity.StoreCPUPerSecond <= rangeUsageInfo.RequestCPUNanosPerSecond { + fromDetail.Desc.Capacity.StoreCPUPerSecond = 0 + } else { + fromDetail.Desc.Capacity.StoreCPUPerSecond -= rangeUsageInfo.RequestCPUNanosPerSecond + } + sp.DetailsMu.StoreDetails[from] = &fromDetail } @@ -666,6 +692,7 @@ func (sp *StorePool) UpdateLocalStoresAfterLeaseTransfer( if toDetail.Desc != nil { toDetail.Desc.Capacity.LeaseCount++ toDetail.Desc.Capacity.QueriesPerSecond += rangeUsageInfo.QueriesPerSecond + toDetail.Desc.Capacity.StoreCPUPerSecond += rangeUsageInfo.RequestCPUNanosPerSecond sp.DetailsMu.StoreDetails[to] = &toDetail } } @@ -935,6 +962,10 @@ type StoreList struct { // to be rebalance targets. candidateLogicalBytes Stat + // CandidateStoreCPU tracks store-cpu-per-second stats for Stores that are + // eligible to be rebalance targets. + CandidateStoreCPU Stat + // CandidateQueriesPerSecond tracks queries-per-second stats for Stores that // are eligible to be rebalance targets. CandidateQueriesPerSecond Stat @@ -961,6 +992,7 @@ func MakeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { sl.CandidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond) sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond) sl.CandidateL0Sublevels.update(float64(desc.Capacity.L0Sublevels)) + sl.CandidateStoreCPU.update(desc.Capacity.StoreCPUPerSecond) } return sl } @@ -968,11 +1000,12 @@ func MakeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { func (sl StoreList) String() string { var buf bytes.Buffer fmt.Fprintf(&buf, - " candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v", + " candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v avg-store-cpu-per-second=%v", sl.CandidateRanges.Mean, sl.CandidateLeases.Mean, humanizeutil.IBytes(int64(sl.candidateLogicalBytes.Mean)), sl.CandidateQueriesPerSecond.Mean, + humanizeutil.Duration(time.Duration(int64(sl.CandidateStoreCPU.Mean))), ) if len(sl.Stores) > 0 { fmt.Fprintf(&buf, "\n") @@ -980,10 +1013,11 @@ func (sl StoreList) String() string { fmt.Fprintf(&buf, " ") } for _, desc := range sl.Stores { - fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f l0-sublevels=%d\n", + fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f store-cpu-per-second=%s l0-sublevels=%d\n", desc.StoreID, desc.Capacity.RangeCount, desc.Capacity.LeaseCount, humanizeutil.IBytes(desc.Capacity.LogicalBytes), desc.Capacity.QueriesPerSecond, + humanizeutil.Duration(time.Duration(int64(desc.Capacity.StoreCPUPerSecond))), desc.Capacity.L0Sublevels, ) } @@ -1010,6 +1044,7 @@ func (sl StoreList) ExcludeInvalid(constraints []roachpb.ConstraintsConjunction) func (sl StoreList) LoadMeans() load.Load { dims := load.Vector{} dims[load.Queries] = sl.CandidateQueriesPerSecond.Mean + dims[load.StoreCPU] = sl.CandidateStoreCPU.Mean return dims } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 0bd120cd9ec9..76a16acc417b 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2910,11 +2910,11 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa var l0SublevelsMax int64 var totalQueriesPerSecond float64 var totalWritesPerSecond float64 + var totalStoreCPUTimePerSecond float64 replicaCount := s.metrics.ReplicaCount.Value() bytesPerReplica := make([]float64, 0, replicaCount) writesPerReplica := make([]float64, 0, replicaCount) - rankingsAccumulator := NewReplicaAccumulator( - LBRebalancingDimension(LoadBasedRebalancingDimension.Get(&s.ClusterSettings().SV)).ToDimension()) + rankingsAccumulator := NewReplicaAccumulator(LoadBasedRebalancingObjective(ctx, s.ClusterSettings()).ToDimension()) rankingsByTenantAccumulator := NewTenantReplicaAccumulator() // Query the current L0 sublevels and record the updated maximum to metrics. @@ -2932,6 +2932,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // starts? We can't easily have a countdown as its value changes like for // leases/replicas. // TODO(a-robinson): Calculate percentiles for qps? Get rid of other percentiles? + totalStoreCPUTimePerSecond += usage.RequestCPUNanosPerSecond + usage.RaftCPUNanosPerSecond totalQueriesPerSecond += usage.QueriesPerSecond totalWritesPerSecond += usage.WritesPerSecond writesPerReplica = append(writesPerReplica, usage.WritesPerSecond) @@ -2946,6 +2947,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa capacity.RangeCount = rangeCount capacity.LeaseCount = leaseCount capacity.LogicalBytes = logicalBytes + capacity.StoreCPUPerSecond = totalStoreCPUTimePerSecond capacity.QueriesPerSecond = totalQueriesPerSecond capacity.WritesPerSecond = totalWritesPerSecond capacity.L0Sublevels = l0SublevelsMax diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index ee704a6687c5..75b5455f2696 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -16,6 +16,7 @@ import ( "math/rand" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load" @@ -24,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/contextutil" + "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -85,17 +87,16 @@ var LoadBasedRebalancingMode = settings.RegisterEnumSetting( // LoadBasedRebalancingDimension controls what dimension rebalancing takes into // account. -// NB: This value is set to private on purpose, as this cluster setting is a -// noop at the moment. var LoadBasedRebalancingDimension = settings.RegisterEnumSetting( settings.SystemOnly, "kv.allocator.load_based_rebalancing_dimension", "what dimension of load does rebalancing consider", - "qps", + "store_cpu", map[int64]string{ - int64(LBRebalancingQueries): "qps", + int64(LBRebalancingQueries): "qps", + int64(LBRebalancingStoreCPU): "store_cpu", }, -) +).WithPublic() // LBRebalancingMode controls if and when we do store-level rebalancing // based on load. @@ -119,12 +120,48 @@ type LBRebalancingDimension int64 const ( // LBRebalancingQueries is a rebalancing mode that balances queries (QPS). LBRebalancingQueries LBRebalancingDimension = iota + + // LBRebalancingStoreCPU is a rebalance mode that balances the store CPU + // usage. The store cpu usage is calculated as the sum of replica's cpu + // usage on the store. + LBRebalancingStoreCPU ) +// LoadBasedRebalancingObjective returns the load based rebalancing objective +// for the cluster. In cases where a first objective cannot be used, it will +// return a fallback. +func LoadBasedRebalancingObjective( + ctx context.Context, st *cluster.Settings, +) LBRebalancingDimension { + set := LoadBasedRebalancingDimension.Get(&st.SV) + // Queries should always be supported, return early if set. + if set == int64(LBRebalancingQueries) { + return LBRebalancingQueries + } + // When the cluster version hasn't finalized to 23.1, some unupgraded + // stores will not be populating additional fields in their StoreCapacity, + // in such cases we cannot balance another objective since the data may not + // exist. Fall back to QPS balancing. + if st.Version.IsActive(ctx, clusterversion.V23_1AllocatorCPUBalancing) { + return LBRebalancingQueries + } + // When the cpu timekeeping utility is unsupported on this aarch, the cpu + // usage cannot be gathered. Fall back to QPS balancing. + if !grunning.Supported() { + return LBRebalancingQueries + } + // The cluster is on a supported version and this local store is on aarch + // which supported the cpu timekeeping utility, return the cluster setting + // as is. + return LBRebalancingDimension(set) +} + func (d LBRebalancingDimension) ToDimension() load.Dimension { switch d { case LBRebalancingQueries: return load.Queries + case LBRebalancingStoreCPU: + return load.StoreCPU default: panic("unknown dimension") } @@ -233,6 +270,17 @@ type RebalanceContext struct { hottestRanges, rebalanceCandidates []CandidateReplica } +// RebalanceMode returns the mode of the store rebalancer. See +// LoadBasedRebalancingMode. +func (sr *StoreRebalancer) RebalanceMode() LBRebalancingMode { + return LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)) +} + +// RebalanceDimension returns the dimension the store rebalancer is balancing. +func (sr *StoreRebalancer) RebalanceObjective(ctx context.Context) LBRebalancingDimension { + return LoadBasedRebalancingObjective(ctx, sr.st) +} + // LessThanMaxThresholds returns true if the local store is below the maximum // threshold w.r.t the balanced load dimension, false otherwise. func (r *RebalanceContext) LessThanMaxThresholds() bool { @@ -270,14 +318,14 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { timer.Reset(jitteredInterval(allocator.LoadBasedRebalanceInterval.Get(&sr.st.SV))) } - mode := LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)) + mode := sr.RebalanceMode() if mode == LBRebalancingOff { continue } hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) sr.rebalanceStore(ctx, rctx) } }) @@ -289,7 +337,7 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { // Instead, we use our own implementation of `scorerOptions` that promotes load // balance. func (sr *StoreRebalancer) scorerOptions(ctx context.Context) *allocatorimpl.LoadScorerOptions { - lbDimension := LBRebalancingDimension(LoadBasedRebalancingDimension.Get(&sr.st.SV)).ToDimension() + lbDimension := sr.RebalanceObjective(ctx).ToDimension() return &allocatorimpl.LoadScorerOptions{ StoreHealthOptions: sr.allocator.StoreHealthOptions(ctx), Deterministic: sr.storePool.IsDeterministic(), @@ -323,7 +371,7 @@ func (sr *StoreRebalancer) NewRebalanceContext( return nil } - dims := LBRebalancingDimension(LoadBasedRebalancingDimension.Get(&sr.st.SV)).ToDimension() + dims := sr.RebalanceObjective(ctx).ToDimension() return &RebalanceContext{ LocalDesc: localDesc, loadDimension: dims, diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index 66089fabf487..0567f6417961 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -17,6 +17,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" @@ -56,8 +57,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 3000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, + QueriesPerSecond: 3000, + StoreCPUPerSecond: 3000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, { @@ -74,8 +76,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 2800, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, + QueriesPerSecond: 2800, + StoreCPUPerSecond: 2800 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, }, }, { @@ -92,8 +95,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 2600, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 2, + QueriesPerSecond: 2600, + StoreCPUPerSecond: 2600 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 2, }, }, { @@ -110,8 +114,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 2400, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, + QueriesPerSecond: 2400, + StoreCPUPerSecond: 2400 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, { @@ -128,8 +133,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 2200, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 3, + QueriesPerSecond: 2200, + StoreCPUPerSecond: 2200 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 3, }, }, { @@ -146,8 +152,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 2000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 2, + QueriesPerSecond: 2000, + StoreCPUPerSecond: 2000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 2, }, }, { @@ -164,8 +171,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1800, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, + QueriesPerSecond: 1800, + StoreCPUPerSecond: 1800 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, { @@ -182,8 +190,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1600, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, + QueriesPerSecond: 1600, + StoreCPUPerSecond: 1600 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, }, }, { @@ -200,8 +209,9 @@ var ( }, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1400, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 3, + QueriesPerSecond: 1400, + StoreCPUPerSecond: 1400 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 3, }, }, } @@ -214,35 +224,40 @@ var ( StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1500, + QueriesPerSecond: 1500, + StoreCPUPerSecond: 1500 * float64(time.Millisecond), }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1300, + QueriesPerSecond: 1300, + StoreCPUPerSecond: 1300 * float64(time.Millisecond), }, }, { StoreID: 3, Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), }, }, { StoreID: 4, Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 900, + QueriesPerSecond: 900, + StoreCPUPerSecond: 900 * float64(time.Millisecond), }, }, { StoreID: 5, Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 500, + QueriesPerSecond: 500, + StoreCPUPerSecond: 500 * float64(time.Millisecond), }, }, } @@ -256,40 +271,45 @@ var ( StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, + QueriesPerSecond: 1500, + StoreCPUPerSecond: 1500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1300, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, + QueriesPerSecond: 1300, + StoreCPUPerSecond: 1300 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, { StoreID: 3, Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, }, }, { StoreID: 4, Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 900, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 20, + QueriesPerSecond: 900, + StoreCPUPerSecond: 900 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 20, }, }, { StoreID: 5, Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 25, + QueriesPerSecond: 500, + StoreCPUPerSecond: 500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 25, }, }, } @@ -301,40 +321,45 @@ var ( StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, }, }, { StoreID: 3, Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, { StoreID: 4, Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, }, }, { StoreID: 5, Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, } @@ -346,40 +371,45 @@ var ( StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 1500, + StoreCPUPerSecond: 1500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1300, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 1300, + StoreCPUPerSecond: 1300 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, { StoreID: 3, Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, { StoreID: 4, Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 900, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 900, + StoreCPUPerSecond: 900 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, { StoreID: 5, Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 500, + StoreCPUPerSecond: 500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, } @@ -391,40 +421,45 @@ var ( StoreID: 1, Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, + QueriesPerSecond: 1500, + StoreCPUPerSecond: 1500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, { StoreID: 2, Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1300, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 10, + QueriesPerSecond: 1300, + StoreCPUPerSecond: 1300 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 10, }, }, { StoreID: 3, Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 1000, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 50, + QueriesPerSecond: 1000, + StoreCPUPerSecond: 1000 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 50, }, }, { StoreID: 4, Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 900, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, + QueriesPerSecond: 900, + StoreCPUPerSecond: 900 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, { StoreID: 5, Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 500, - L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, + QueriesPerSecond: 500, + StoreCPUPerSecond: 500 * float64(time.Millisecond), + L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, } @@ -433,7 +468,7 @@ var ( type testRange struct { // The first storeID in the list will be the leaseholder. voters, nonVoters []roachpb.StoreID - qps float64 + qps, reqCPU float64 } func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange, loadDimension load.Dimension) { @@ -469,6 +504,7 @@ func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange, loadDimension repl.mu.state.Stats = &enginepb.MVCCStats{} repl.loadStats = rload.NewReplicaLoad(s.Clock(), nil) repl.loadStats.TestingSetStat(rload.Queries, r.qps) + repl.loadStats.TestingSetStat(rload.ReqCPUNanos, r.reqCPU) acc.AddReplica(candidateReplica{ Replica: repl, @@ -516,12 +552,13 @@ func TestChooseLeaseToTransfer(t *testing.T) { testCases := []struct { storeIDs []roachpb.StoreID - qps float64 + qps, reqCPU float64 expectTarget roachpb.StoreID }{ { storeIDs: []roachpb.StoreID{1}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 0, }, @@ -529,29 +566,37 @@ func TestChooseLeaseToTransfer(t *testing.T) { // (1300 for stores 1 and 2) is close enough to the current leaseholder's // QPS (1500). { - storeIDs: []roachpb.StoreID{1, 2}, - qps: 100, + storeIDs: []roachpb.StoreID{1, 2}, + qps: 100, + // NB: This is set +50 above qps, as the minimum threshold + // difference for store cpu is 50ms, while it is 100 qps for + // queries per second. + reqCPU: 150 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 2}, qps: 1000, + reqCPU: 1000 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 4, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{5, 1}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 0, }, { @@ -559,43 +604,54 @@ func TestChooseLeaseToTransfer(t *testing.T) { // be projected to have 1300 and 1200 qps respectively. storeIDs: []roachpb.StoreID{1, 3}, qps: 200, + reqCPU: 200 * float64(time.Millisecond), expectTarget: 3, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 200, + reqCPU: 200 * float64(time.Millisecond), expectTarget: 4, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 200, + reqCPU: 200 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 2}, qps: 500, + reqCPU: 500 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 3}, qps: 500, + reqCPU: 500 * float64(time.Millisecond), expectTarget: 0, }, // s1 without the lease would be projected to have 1000 qps, which is close // enough to s4's 900 qps. { - storeIDs: []roachpb.StoreID{1, 4}, - qps: 500, + storeIDs: []roachpb.StoreID{1, 4}, + qps: 500, + // NB: This is set +50 above qps, as the minimum threshold + // difference for store cpu is 50ms, while it is 100 qps for + // queries per second. + reqCPU: 550 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 500, + reqCPU: 500 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 600, + reqCPU: 600 * float64(time.Millisecond), expectTarget: 5, }, @@ -606,16 +662,19 @@ func TestChooseLeaseToTransfer(t *testing.T) { { storeIDs: []roachpb.StoreID{1, 5}, qps: 800, + reqCPU: 800 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 4, 5}, qps: 800, + reqCPU: 800 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 3, 4, 5}, qps: 800, + reqCPU: 800 * float64(time.Millisecond), expectTarget: 5, }, // NB: However, if s1 is projected to have 750 qps, we would expect a lease @@ -623,47 +682,56 @@ func TestChooseLeaseToTransfer(t *testing.T) { { storeIDs: []roachpb.StoreID{1, 3, 4, 5}, qps: 750, + reqCPU: 750 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 1.5, + reqCPU: 1.5 * float64(time.Millisecond), expectTarget: 4, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 1.5, + reqCPU: 1.5 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 1.49, + reqCPU: 1.49 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 1.49, + reqCPU: 1.49 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 2, 3, 4}, qps: 1500, + reqCPU: 1500 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 2, 3, 4, 5}, qps: 1500, + reqCPU: 1500 * float64(time.Millisecond), expectTarget: 0, }, } for _, tc := range testCases { t.Run("", func(t *testing.T) { - loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps}}, load.Queries) + lbRebalanceDimension := sr.RebalanceObjective(ctx).ToDimension() + loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps, reqCPU: tc.reqCPU}}, lbRebalanceDimension) hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + options.LoadThreshold = allocatorimpl.SetAllDims(0.1) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) _, target, _ := sr.chooseLeaseToTransfer( ctx, rctx, @@ -677,16 +745,16 @@ func TestChooseLeaseToTransfer(t *testing.T) { } func randomNoLocalityStores( - numNodes int, qpsMultiplier float64, -) (stores []*roachpb.StoreDescriptor, qpsMean float64) { + numNodes int, loadMultiplier float64, +) (stores []*roachpb.StoreDescriptor, mean float64) { var totalQPS float64 for i := 1; i <= numNodes; i++ { - qps := rand.Float64() * qpsMultiplier + qps := rand.Float64() * loadMultiplier stores = append( stores, &roachpb.StoreDescriptor{ StoreID: roachpb.StoreID(i), Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)}, - Capacity: roachpb.StoreCapacity{QueriesPerSecond: qps}, + Capacity: roachpb.StoreCapacity{QueriesPerSecond: qps, StoreCPUPerSecond: qps * float64(time.Millisecond)}, }, ) totalQPS = totalQPS + qps @@ -713,6 +781,7 @@ func logSummary( log.Infof(ctx, "generated random store list:\n%s", summary.String()) } +// TODO(kvoli): map to store cpu dimension for testing. func TestChooseRangeToRebalanceRandom(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -726,6 +795,7 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { numNodes = 12 numDeadNodes = 3 perReplicaQPS = 100 + perReplicaReqCPU = 100 * float64(time.Millisecond) qpsRebalanceThreshold = 0.25 epsilon = 1 @@ -791,17 +861,15 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { } loadRanges( rr, s, []testRange{ - {voters: voterStores, nonVoters: nonVoterStores, qps: perReplicaQPS}, - }, load.Queries, + {voters: voterStores, nonVoters: nonVoterStores, qps: perReplicaQPS, reqCPU: perReplicaReqCPU}, + }, sr.RebalanceObjective(ctx).ToDimension(), ) hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{EnforcementLevel: allocatorimpl.StoreHealthNoAction} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(qpsRebalanceThreshold) + rctx.options.LoadThreshold = allocatorimpl.SetAllDims(qpsRebalanceThreshold) _, voterTargets, nonVoterTargets := sr.chooseRangeToRebalance(ctx, rctx) var rebalancedVoterStores, rebalancedNonVoterStores []roachpb.StoreID @@ -1125,10 +1193,11 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { s.cfg.DefaultSpanConfig.VoterConstraints = tc.voterConstraints s.cfg.DefaultSpanConfig.LeasePreferences = tc.leasePreferences const testingQPS = float64(60) + const testingReqCPU = 60 * float64(time.Millisecond) loadRanges( rr, s, []testRange{ - {voters: tc.voters, nonVoters: tc.nonVoters, qps: testingQPS}, - }, load.Queries, + {voters: tc.voters, nonVoters: tc.nonVoters, qps: testingQPS, reqCPU: testingReqCPU}, + }, sr.RebalanceObjective(ctx).ToDimension(), ) hottestRanges := sr.replicaRankings.TopLoad() @@ -1136,7 +1205,7 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingLeasesAndReplicas) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ EnforcementLevel: allocatorimpl.StoreHealthBlockRebalanceTo} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05) + rctx.options.LoadThreshold = allocatorimpl.SetAllDims(0.05) _, voterTargets, nonVoterTargets := sr.chooseRangeToRebalance( ctx, @@ -1208,16 +1277,21 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) { // Load a fake hot range that's already on the best stores. We want to ensure // that the store rebalancer doesn't attempt to rebalance ranges that it // cannot find better rebalance opportunities for. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{localDesc.StoreID}, qps: 100}}, load.Queries) + loadRanges(rr, s, + []testRange{ + {voters: []roachpb.StoreID{localDesc.StoreID}, + qps: 100, + reqCPU: 100 * float64(time.Millisecond)}, + }, + sr.RebalanceObjective(ctx).ToDimension(), + ) hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ EnforcementLevel: allocatorimpl.StoreHealthNoAction} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05) + rctx.options.LoadThreshold = allocatorimpl.SetAllDims(0.05) sr.chooseRangeToRebalance(ctx, rctx) trace := finishAndGetRecording() @@ -1239,7 +1313,8 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { NodeID: 1, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 12000, + QueriesPerSecond: 12000, + StoreCPUPerSecond: 12000 * float64(time.Millisecond), }, }, { @@ -1248,7 +1323,8 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { NodeID: 2, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 10000, + QueriesPerSecond: 10000, + StoreCPUPerSecond: 10000 * float64(time.Millisecond), }, }, { @@ -1257,7 +1333,8 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { NodeID: 3, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 8000, + QueriesPerSecond: 8000, + StoreCPUPerSecond: 8000 * float64(time.Millisecond), }, }, { @@ -1266,7 +1343,8 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { NodeID: 4, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 200, + QueriesPerSecond: 200, + StoreCPUPerSecond: 200 * float64(time.Millisecond), }, }, { @@ -1275,19 +1353,21 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { NodeID: 5, }, Capacity: roachpb.StoreCapacity{ - QueriesPerSecond: 100, + QueriesPerSecond: 100, + StoreCPUPerSecond: 100 * float64(time.Millisecond), }, }, } for _, tc := range []struct { - voters, expRebalancedVoters []roachpb.StoreID - QPS, rebalanceThreshold float64 - shouldRebalance bool + voters, expRebalancedVoters []roachpb.StoreID + QPS, reqCPU, rebalanceThreshold float64 + shouldRebalance bool }{ { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{3, 4, 5}, QPS: 5000, + reqCPU: 5000 * float64(time.Millisecond), rebalanceThreshold: 0.25, shouldRebalance: true, }, @@ -1295,6 +1375,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{5, 2, 3}, QPS: 5000, + reqCPU: 5000 * float64(time.Millisecond), rebalanceThreshold: 0.8, shouldRebalance: true, }, @@ -1302,12 +1383,14 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{3, 4, 5}, QPS: 1000, + reqCPU: 1000 * float64(time.Millisecond), rebalanceThreshold: 0.05, shouldRebalance: true, }, { voters: []roachpb.StoreID{1, 2, 3}, QPS: 5000, + reqCPU: 5000 * float64(time.Millisecond), // NB: This will lead to an overfull threshold of just above 12000. Thus, // no store should be considered overfull and we should not rebalance at // all. @@ -1317,6 +1400,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { { voters: []roachpb.StoreID{4}, QPS: 100, + reqCPU: 100 * float64(time.Millisecond), rebalanceThreshold: 0.01, // NB: We don't expect a rebalance here because the difference between s4 // and s5 is not high enough to justify a rebalance. @@ -1326,6 +1410,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{5, 2, 3}, QPS: 10000, + reqCPU: 10000 * float64(time.Millisecond), rebalanceThreshold: 0.01, // NB: s5 will be hotter than s1 after this move. shouldRebalance: true, @@ -1361,16 +1446,17 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { } s.cfg.DefaultSpanConfig.NumReplicas = int32(len(tc.voters)) - loadRanges(rr, s, []testRange{{voters: tc.voters, qps: tc.QPS}}, load.Queries) + loadRanges(rr, s, + []testRange{{voters: tc.voters, qps: tc.QPS, reqCPU: tc.reqCPU}}, + sr.RebalanceObjective(ctx).ToDimension(), + ) hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ EnforcementLevel: allocatorimpl.StoreHealthNoAction} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(tc.rebalanceThreshold) + rctx.options.LoadThreshold = allocatorimpl.SetAllDims(tc.rebalanceThreshold) _, voterTargets, _ := sr.chooseRangeToRebalance(ctx, rctx) require.Len(t, voterTargets, len(tc.expRebalancedVoters)) @@ -1449,17 +1535,15 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { sr.getRaftStatusFn = func(r CandidateReplica) *raft.Status { return behindTestingRaftStatusFn(r) } + lbRebalanceDimension := sr.RebalanceObjective(ctx).ToDimension() // Load in a range with replicas on an overfull node, a slightly underfull // node, and a very underfull node. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100}}, load.Queries) + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension) hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)), - ) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) repl := rctx.hottestRanges[0] _, target, _ := sr.chooseLeaseToTransfer(ctx, rctx) @@ -1472,17 +1556,14 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { // Then do the same, but for replica rebalancing. Make s5 an existing replica // that's behind, and see how a new replica is preferred as the leaseholder // over it. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100}}, load.Queries) + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension) hottestRanges = sr.replicaRankings.TopLoad() options = sr.scorerOptions(ctx) - rctx = sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)), - ) + rctx = sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ EnforcementLevel: allocatorimpl.StoreHealthNoAction} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05) + rctx.options.LoadThreshold = allocatorimpl.SetAllDims(0.05) rctx.options.Deterministic = true repl = rctx.hottestRanges[0] @@ -1638,22 +1719,20 @@ func TestStoreRebalancerReadAmpCheck(t *testing.T) { rr := NewReplicaRankings() sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + lbRebalanceDimension := sr.RebalanceObjective(ctx).ToDimension() // Load in a range with replicas on an overfull node, a slightly underfull // node, and a very underfull node. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100}}, load.Queries) + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension) hottestRanges := sr.replicaRankings.TopLoad() options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)), - ) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) require.Greater(t, len(rctx.hottestRanges), 0) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ EnforcementLevel: test.enforcement, L0SublevelThreshold: allocatorimpl.MaxL0SublevelThreshold} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05) + rctx.options.LoadThreshold = allocatorimpl.SetAllDims(0.05) _, targetVoters, _ := sr.chooseRangeToRebalance(ctx, rctx) require.Equal(t, test.expectedTargets, targetVoters) diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index fa7e2b22d2e5..799bed94afa6 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -549,6 +549,7 @@ func (sc StoreCapacity) FractionUsed() float64 { func (sc StoreCapacity) Load() load.Load { dims := load.Vector{} dims[load.Queries] = sc.QueriesPerSecond + dims[load.StoreCPU] = sc.StoreCPUPerSecond return dims } diff --git a/pkg/roachpb/metadata.proto b/pkg/roachpb/metadata.proto index f92e704582e1..48cfb2224568 100644 --- a/pkg/roachpb/metadata.proto +++ b/pkg/roachpb/metadata.proto @@ -323,6 +323,10 @@ message StoreCapacity { // instances where overlapping node-binary versions within a cluster result // in this this field missing. optional int64 l0_sublevels = 12 [(gogoproto.nullable) = false]; + // store_cpu_per_second tracks the average store cpu nanoseconds per second. + // This is the sum of all the replica's cpu time on this store, which is + // tracked in replica stats. + optional double store_cpu_per_second = 14 [(gogoproto.nullable) = false, (gogoproto.customname) = "StoreCPUPerSecond"]; optional cockroach.util.admission.admissionpb.IOThreshold io_threshold = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "IOThreshold" ]; // bytes_per_replica and writes_per_replica contain percentiles for the // number of bytes and writes-per-second to each replica in the store.