Skip to content

Commit

Permalink
kvserver: introduce cpu rebalancing
Browse files Browse the repository at this point in the history
This patch allows the store rebalancer to use CPU in place of QPS when
balancing load on a cluster. This patch adds `cpu` as an option with the
cluster setting:

`kv.allocator.load_based_rebalancing.objective`

When set to `cpu`, rather than `qps`. The store rebalancer will perform
a mostly identical function, however, it will target balancing the sum
of all replica's cpu time on each store, rather than qps. The default
remains as `qps` here.

Similar to QPS, the rebalance threshold can be set to allow controlling
the range above and below the mean store CPU is considered imbalanced,
either overfull or underfull respectively:

`kv.allocator.cpu_rebalance_threshold`: 0.1

In order to manage with mixed versions during upgrade and some
architectures not supporting the cpu sampling method, a rebalance
objective manager is introduced in `rebalance_objective.go`. The manager
mediates access to the rebalance objective and overwrites it in cases
where the objective set in the cluster setting cannot be supported.

resolves: #95380

Release note (ops change): Add option to balance cpu time (cpu)
instead of queries per second (qps) among stores in a cluster. This is
done by setting `kv.allocator.load_based_rebalancing.objective='cpu'`.
`kv.allocator.cpu_rebalance_threshold` is also added, similar to
`kv.allocator.qps_rebalance_threshold` to control the target range for
store cpu above and below the cluster mean.
  • Loading branch information
kvoli committed Feb 9, 2023
1 parent 1529670 commit c28ed6b
Show file tree
Hide file tree
Showing 22 changed files with 1,225 additions and 270 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-38 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-40 set the active cluster version in the format '<major>.<minor>'
4 changes: 3 additions & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@
<tr><td><div id="setting-jobs-retention-time" class="anchored"><code>jobs.retention_time</code></div></td><td>duration</td><td><code>336h0m0s</code></td><td>the amount of time to retain records for completed jobs before</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-lease-rebalancing-enabled" class="anchored"><code>kv.allocator.load_based_lease_rebalancing.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to enable rebalancing of range leases based on load and latency</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing" class="anchored"><code>kv.allocator.load_based_rebalancing</code></div></td><td>enumeration</td><td><code>leases and replicas</code></td><td>whether to rebalance based on the distribution of load across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing-objective" class="anchored"><code>kv.allocator.load_based_rebalancing.objective</code></div></td><td>enumeration</td><td><code>qps</code></td><td>what objective does the cluster use to rebalance; if set to `qps` the cluster will attempt to balance qps among stores, if set to `cpu` the cluster will attempt to balance cpu usage among stores [qps = 0, cpu = 1]</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing-interval" class="anchored"><code>kv.allocator.load_based_rebalancing_interval</code></div></td><td>duration</td><td><code>1m0s</code></td><td>the rough interval at which each store will check for load-based lease / replica rebalancing opportunities</td></tr>
<tr><td><div id="setting-kv-allocator-qps-rebalance-threshold" class="anchored"><code>kv.allocator.qps_rebalance_threshold</code></div></td><td>float</td><td><code>0.1</code></td><td>minimum fraction away from the mean a store&#39;s QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><div id="setting-kv-allocator-range-rebalance-threshold" class="anchored"><code>kv.allocator.range_rebalance_threshold</code></div></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store&#39;s range count can be before it is considered overfull or underfull</td></tr>
<tr><td><div id="setting-kv-allocator-store-cpu-rebalance-threshold" class="anchored"><code>kv.allocator.store_cpu_rebalance_threshold</code></div></td><td>float</td><td><code>0.1</code></td><td>minimum fraction away from the mean a store&#39;s cpu usage can be before it is considered overfull or underfull</td></tr>
<tr><td><div id="setting-kv-bulk-io-write-max-rate" class="anchored"><code>kv.bulk_io_write.max_rate</code></div></td><td>byte size</td><td><code>1.0 TiB</code></td><td>the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops</td></tr>
<tr><td><div id="setting-kv-bulk-sst-max-allowed-overage" class="anchored"><code>kv.bulk_sst.max_allowed_overage</code></div></td><td>byte size</td><td><code>64 MiB</code></td><td>if positive, allowed size in excess of target size for SSTs from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory</td></tr>
<tr><td><div id="setting-kv-bulk-sst-target-size" class="anchored"><code>kv.bulk_sst.target_size</code></div></td><td>byte size</td><td><code>16 MiB</code></td><td>target size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory</td></tr>
Expand Down Expand Up @@ -236,6 +238,6 @@
<tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-38</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-40</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
9 changes: 8 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ const (
// responsible for polling the jobs table for metrics.
V23_1_CreateJobsMetricsPollingJob

// V23_1AllocatorCPUBalancing adds balancing CPU usage among stores using
// the allocator and store rebalancer. It assumes that at this version,
// stores now include their CPU in the StoreCapacity proto when gossiping.
V23_1AllocatorCPUBalancing
// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -708,7 +712,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1_CreateJobsMetricsPollingJob,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 38},
},

{
Key: V23_1AllocatorCPUBalancing,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 40},
},
// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"raft_transport_metrics.go",
"raft_truncator_replica.go",
"range_log.go",
"rebalance_objective.go",
"replica.go",
"replica_app_batch.go",
"replica_application_cmd.go",
Expand Down Expand Up @@ -273,6 +274,7 @@ go_test(
"raft_transport_test.go",
"raft_transport_unit_test.go",
"range_log_test.go",
"rebalance_objective_test.go",
"replica_application_cmd_buf_test.go",
"replica_application_state_machine_test.go",
"replica_batch_updates_test.go",
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func getLoadThreshold(dim load.Dimension, sv *settings.Values) float64 {
switch dim {
case load.Queries:
return allocator.QPSRebalanceThreshold.Get(sv)
case load.CPU:
return allocator.CPURebalanceThreshold.Get(sv)
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand All @@ -51,6 +53,8 @@ func getLoadMinThreshold(dim load.Dimension) float64 {
switch dim {
case load.Queries:
return allocator.MinQPSThresholdDifference
case load.CPU:
return allocator.MinCPUThresholdDifference
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand All @@ -76,6 +80,8 @@ func getLoadRebalanceMinRequiredDiff(dim load.Dimension, sv *settings.Values) fl
switch dim {
case load.Queries:
return allocator.MinQPSDifferenceForTransfers.Get(sv)
case load.CPU:
return allocator.MinCPUDifferenceForTransfers
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand Down Expand Up @@ -117,3 +123,13 @@ func MakeQPSOnlyDim(v float64) load.Load {
dims[load.Queries] = v
return dims
}

// WithAllDims returns a load vector with all dimensions filled in with the
// value given.
func WithAllDims(v float64) load.Load {
dims := load.Vector{}
for i := range dims {
dims[i] = v
}
return dims
}
45 changes: 45 additions & 0 deletions pkg/kv/kvserver/allocator/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,30 @@ const (
// lightly loaded clusters.
MinQPSThresholdDifference = 100

// MinCPUThresholdDifference is the minimum CPU difference from the cluster
// mean that this system should care about. The system won't attempt to
// take action if a store's CPU differs from the mean by less than this
// amount even if it is greater than the percentage threshold. This
// prevents too many lease transfers or range rebalances in lightly loaded
// clusters.
//
// NB: This represents 5% (1/20) utilization of 1 cpu on average. This
// number was arrived at from testing to minimize thrashing. This number is
// set independent of processor speed and assumes identical value of cpu
// time across all stores. i.e. all cpu's are identical.
MinCPUThresholdDifference = float64(50 * time.Millisecond)

// MinCPUDifferenceForTransfers is the minimum CPU difference that a
// store rebalncer would care about to reconcile (via lease or replica
// rebalancing) between any two stores.
//
// NB: This is set to be two times the minimum threshold that a store needs
// to be above or below the mean to be considered overfull or underfull
// respectively. This is to make lease transfers and replica rebalances
// less sensistive to jitters in any given workload by introducing
// additional friction before taking these actions.
MinCPUDifferenceForTransfers = 2 * MinCPUThresholdDifference

// defaultLoadBasedRebalancingInterval is how frequently to check the store-level
// balance of the cluster.
defaultLoadBasedRebalancingInterval = time.Minute
Expand Down Expand Up @@ -107,6 +131,27 @@ var QPSRebalanceThreshold = func() *settings.FloatSetting {
return s
}()

// CPURebalanceThreshold is the minimum ratio of a store's cpu time to the mean
// cpu time at which that store is considered overfull or underfull of cpu
// usage.
var CPURebalanceThreshold = func() *settings.FloatSetting {
s := settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.allocator.store_cpu_rebalance_threshold",
"minimum fraction away from the mean a store's cpu usage can be before it is considered overfull or underfull",
0.10,
settings.NonNegativeFloat,
func(f float64) error {
if f < 0.01 {
return errors.Errorf("cannot set kv.allocator.store_cpu_rebalance_threshold to less than 0.01")
}
return nil
},
)
s.SetVisibility(settings.Public)
return s
}()

// LoadBasedRebalanceInterval controls how frequently each store checks for
// load-base lease/replica rebalancing opportunties.
var LoadBasedRebalanceInterval = settings.RegisterPublicDurationSettingWithExplicitUnit(
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/load/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load",
visibility = ["//visibility:public"],
deps = ["//pkg/util/humanizeutil"],
)

get_x_data(name = "get_x_data")
13 changes: 12 additions & 1 deletion pkg/kv/kvserver/allocator/load/dimension.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@

package load

import "fmt"
import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
)

// Dimension is a singe dimension of load that a component may track.
type Dimension int

const (
// Queries refers to the number of queries.
Queries Dimension = iota
// CPU refers to the cpu time (ns) used in processing.
CPU

nDimensionsTyped
nDimensions = int(nDimensionsTyped)
Expand All @@ -28,6 +35,8 @@ func (d Dimension) String() string {
switch d {
case Queries:
return "queries-per-second"
case CPU:
return "cpu-per-second"
default:
panic(fmt.Sprintf("cannot name: unknown dimension with ordinal %d", d))
}
Expand All @@ -38,6 +47,8 @@ func (d Dimension) Format(value float64) string {
switch d {
case Queries:
return fmt.Sprintf("%.1f", value)
case CPU:
return string(humanizeutil.Duration(time.Duration(int64(value))))
default:
panic(fmt.Sprintf("cannot format value: unknown dimension with ordinal %d", d))
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/allocator/load/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,29 @@ func ElementWiseProduct(a, b Load) Load {
return bimap(a, b, func(ai, bi float64) float64 { return ai * bi })
}

// Scale applies the factor given against every dimension.
func Scale(l Load, factor float64) Load {
return nmap(l, func(_ Dimension, li float64) float64 { return li * factor })
}

// Set returns a new Load with every dimension equal to the value given.
func Set(val float64) Load {
l := Vector{}
return nmap(l, func(_ Dimension, li float64) float64 { return val })
}

func bimap(a, b Load, op func(ai, bi float64) float64) Load {
mapped := Vector{}
for dim := Dimension(0); dim < Dimension(nDimensions); dim++ {
mapped[dim] = op(a.Dim(dim), b.Dim(dim))
}
return mapped
}

func nmap(l Load, op func(d Dimension, li float64) float64) Load {
mapped := Vector{}
for dim := Dimension(0); dim < Dimension(nDimensions); dim++ {
mapped[dim] = op(dim, l.Dim(dim))
}
return mapped
}
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/allocator/range_usage_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type RangeRequestLocalityInfo struct {
func (r RangeUsageInfo) Load() load.Load {
dims := load.Vector{}
dims[load.Queries] = r.QueriesPerSecond
dims[load.CPU] = r.RequestCPUNanosPerSecond + r.RaftCPUNanosPerSecond
return dims
}

Expand All @@ -50,5 +51,11 @@ func (r RangeUsageInfo) Load() load.Load {
func (r RangeUsageInfo) TransferImpact() load.Load {
dims := load.Vector{}
dims[load.Queries] = r.QueriesPerSecond
// Only use the request recorded cpu. This assumes that all replicas will
// use the same amount of raft cpu - which may be dubious.
//
// TODO(kvoli): Look to separate out leaseholder vs replica cpu usage in
// accounting to account for follower reads if able.
dims[load.CPU] = r.RequestCPUNanosPerSecond
return dims
}
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/override_store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,10 @@ func (o *OverrideStorePool) UpdateLocalStoreAfterRelocate(
_ allocator.RangeUsageInfo,
) {
}

// SetOnCapacityChange installs a callback to be called when any store
// capacity changes in the storepool. This currently doesn't consider local
// updates (UpdateLocalStoreAfterRelocate, UpdateLocalStoreAfterRebalance,
// UpdateLocalStoresAfterLeaseTransfer) as capacity changes.
func (o *OverrideStorePool) SetOnCapacityChange(fn CapacityChangeFn) {
}
Loading

0 comments on commit c28ed6b

Please sign in to comment.