Skip to content

Commit

Permalink
Merge #102824
Browse files Browse the repository at this point in the history
102824: roachtest: assert on CPU in rebalance/by-load  r=aliher1911 a=kvoli

The `rebalance/by-load/replicas` roachtests periodically flake due to a
known limitation, where a cluster with a small number of ranges may not
be properly balanced due to heterogeneous localities (including
multi-store) #88829.

This PR updates the total number of ranges from 1 per-store, to 5
per-store for `rebalance/by-load/*` roachtests.

The roachtest asserted on the lease count, as a proxy for load, assuming
that the load evenly hits each lease for the created ranges. However,
the principal indicator of load in a read heavy workload is CPU.

This PR updates the test assertion to require that every store's CPU
is within 10% of the cluster mean. The test assertion previously
required that the max-min lease count delta was 0, when no outside
splits occurred; or 1 when the number of ranges was greater than the
number of stores.

The logging format is updated for easier debugging:

```
cpu outside bounds mean=23.9 tolerance=10.0% (±2.4) bounds=[21.5, 26.3]
   below  = []
   within = [s1: 22 (-6.7%), s2: 22 (-6.3%)]
   above  = [s3: 26 (+13.0%)]
...
cpu within bounds mean=25.7 tolerance=10.0% (±2.6) bounds=[23.2, 28.3]
   stores=[s1: 24 (-3.0%), s2: 24 (-2.9%), s3: 27 (+5.9%)]
```

resolves: #102801
resolves: #102823

Release note: None

Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
craig[bot] and kvoli committed May 9, 2023
2 parents 76c651d + 4d21913 commit 03e4c91
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 87 deletions.
239 changes: 153 additions & 86 deletions pkg/cmd/roachtest/tests/rebalance_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ package tests

import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"runtime"
"sort"
"strings"
"time"

Expand All @@ -27,22 +25,37 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

const (
// storeToRangeFactor is the number of ranges to create per store in the
// cluster.
storeToRangeFactor = 5
// meanCPUTolerance is the tolerance applied when checking normalized (0-100)
// CPU percent utilization of stores against the mean. In multi-store tests,
// the same CPU utilization will be reported for stores on the same node. The
// acceptable range for CPU w.r.t the mean is:
//
// mean_tolerance = mean * meanCPUTolerance
// [mean - mean_tolerance, mean + mean_tolerance].
meanCPUTolerance = 0.1
// statSamplePeriod is the period at which timeseries stats are sampled.
statSamplePeriod = 10 * time.Second
)

func registerRebalanceLoad(r registry.Registry) {
// This test creates a single table for kv to use and splits the table to
// have one range for every node in the cluster. Because even brand new
// clusters start with 20+ ranges in them, the number of new ranges in kv's
// table is small enough that it typically won't trigger rebalancing of
// leases in the cluster based on lease count alone. We let kv generate a lot
// of load against the ranges such that we'd expect load-based rebalancing to
// distribute the load evenly across the nodes in the cluster.
// have 5 ranges for every node in the cluster. Because even brand new
// clusters start with 40+ ranges in them, the number of new ranges in kv's
// table is small enough that it typically won't trigger significant
// rebalancing of leases in the cluster based on lease count alone. We let kv
// generate a lot of load against the ranges such that we'd expect load-based
// rebalancing to distribute the load evenly across the nodes in the cluster.
rebalanceLoadRun := func(
ctx context.Context,
t test.Test,
Expand All @@ -52,15 +65,24 @@ func registerRebalanceLoad(r registry.Registry) {
concurrency int,
mixedVersion bool,
) {
startOpts := option.DefaultStartOpts()
// This test asserts on the distribution of CPU utilization between nodes
// in the cluster, having backups also running could lead to unrelated
// flakes - disable backup schedule.
startOpts := option.DefaultStartOptsNoBackups()
roachNodes := c.Range(1, c.Spec().NodeCount-1)
appNode := c.Node(c.Spec().NodeCount)
numStores := len(roachNodes)
numNodes := len(roachNodes)
numStores := numNodes
if c.Spec().SSDs > 1 && !c.Spec().RAID0 {
numStores *= c.Spec().SSDs
startOpts.RoachprodOpts.StoreCount = c.Spec().SSDs
}
splits := numStores - 1 // n-1 splits => n ranges => 1 lease per store
// We want each store to end up with approximately storeToRangeFactor
// (factor) leases such that the CPU load is evenly spread, e.g.
// (n * factor) -1 splits = factor * n ranges = factor leases per store
// Note that we only assert on the CPU of each store w.r.t the mean, not
// the lease count.
splits := (numStores * storeToRangeFactor) - 1
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
"--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
settings := install.MakeClusterSettings()
Expand All @@ -87,8 +109,8 @@ func registerRebalanceLoad(r registry.Registry) {
var m *errgroup.Group // see comment in version.go
m, ctx = errgroup.WithContext(ctx)

// Enable us to exit out of workload early when we achieve the desired
// lease balance. This drastically shortens the duration of the test in the
// Enable us to exit out of workload early when we achieve the desired CPU
// balance. This drastically shortens the duration of the test in the
// common case.
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -100,7 +122,7 @@ func registerRebalanceLoad(r registry.Registry) {
"--duration=%v {pgurl:1-%d}",
concurrency, maxDuration, len(roachNodes)))
if errors.Is(ctx.Err(), context.Canceled) {
// We got canceled either because lease balance was achieved or the
// We got canceled either because CPU balance was achieved or the
// other worker hit an error. In either case, it's not this worker's
// fault.
return nil
Expand All @@ -109,7 +131,7 @@ func registerRebalanceLoad(r registry.Registry) {
})

m.Go(func() error {
t.Status("checking for lease balance")
t.Status("checking for CPU balance")

db := c.Conn(ctx, t.L(), 1)
defer db.Close()
Expand All @@ -125,23 +147,37 @@ func registerRebalanceLoad(r registry.Registry) {
return err
}

for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
if done, err := isLoadEvenlyDistributed(t.L(), db, numStores); err != nil {
return err
} else if done {
t.Status("successfully achieved lease balance; waiting for kv to finish running")
cancel()
return nil
}
storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores)
if err != nil {
return err
}

var reason string
var done bool
for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
// Wait out the sample period initially to allow the timeseries to
// populate meaningful information for the test to query.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
case <-time.After(statSamplePeriod):
}

clusterStoresCPU, err := storeCPUFn(ctx)
if err != nil {
t.L().Printf("unable to get the cluster stores CPU %s\n", err.Error())
}

done, reason = isLoadEvenlyDistributed(clusterStoresCPU, meanCPUTolerance)
t.L().Printf("cpu %s", reason)
if done {
t.Status("successfully achieved CPU balance; waiting for kv to finish running")
cancel()
return nil
}
}

return fmt.Errorf("timed out before leases were evenly spread")
return errors.Errorf("CPU not evenly balanced after timeout: %s", reason)
})
if err := m.Wait(); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -236,79 +272,110 @@ func registerRebalanceLoad(r registry.Registry) {
)
}

func isLoadEvenlyDistributed(l *logger.Logger, db *gosql.DB, numStores int) (bool, error) {
rows, err := db.Query(
`SELECT lease_holder, count(*) ` +
`FROM [SHOW RANGES FROM TABLE kv.kv WITH DETAILS] ` +
`GROUP BY lease_holder;`)
if err != nil {
// TODO(rafi): Remove experimental_ranges query once we stop testing 19.1 or
// earlier.
if strings.Contains(err.Error(), "syntax error at or near \"ranges\"") {
rows, err = db.Query(
`SELECT lease_holder, count(*) ` +
`FROM [SHOW RANGES FROM TABLE kv.kv WITH DETAILS] ` +
`GROUP BY lease_holder;`)
}
}
// makeStoreCPUFn returns a function which can be called to gather the CPU of
// the cluster stores. When there are multiple stores per node, stores on the
// same node will report identical CPU.
func makeStoreCPUFn(
octx context.Context, c cluster.Cluster, t test.Test, numNodes, numStores int,
) (func(ctx context.Context) ([]float64, error), error) {
adminURLs, err := c.ExternalAdminUIAddr(octx, t.L(), c.Node(1))
if err != nil {
return false, err
return nil, err
}
defer rows.Close()
leaseCounts := make(map[int]int)
var rangeCount int
for rows.Next() {
var storeID, leaseCount int
if err := rows.Scan(&storeID, &leaseCount); err != nil {
return false, err
url := adminURLs[0]
startTime := timeutil.Now()
tsQueries := make([]tsQuery, numNodes)
for i := range tsQueries {
tsQueries[i] = tsQuery{
name: "cr.node.sys.cpu.combined.percent-normalized",
queryType: total,
sources: []string{fmt.Sprintf("%d", i+1)},
}
leaseCounts[storeID] = leaseCount
rangeCount += leaseCount
}

if len(leaseCounts) < numStores {
l.Printf("not all stores have a lease yet: %v\n", formatLeaseCounts(leaseCounts))
return false, nil
}
return func(ctx context.Context) ([]float64, error) {
now := timeutil.Now()
resp, err := getMetricsWithSamplePeriod(
url, startTime, now, statSamplePeriod, tsQueries)
if err != nil {
return nil, err
}

// The simple case is when ranges haven't split. We can require that every
// store has one lease.
if rangeCount == numStores {
for _, leaseCount := range leaseCounts {
if leaseCount != 1 {
l.Printf("uneven lease distribution: %s\n", formatLeaseCounts(leaseCounts))
return false, nil
// Assume that stores on the same node will have sequential store IDs e.g.
// when the stores per node is 2:
// node 1 = store 1, store 2 ... node N = store 2N-1, store 2N
storesPerNode := numStores / numNodes
storeCPUs := make([]float64, numStores)
for node, result := range resp.Results {
// Take the latest CPU data point only.
cpu := result.Datapoints[len(result.Datapoints)-1].Value
nodeIdx := node * storesPerNode
for storeOffset := 0; storeOffset < storesPerNode; storeOffset++ {
// The values will be a normalized float in [0,1.0], scale to a
// percentage [0,100].
storeCPUs[nodeIdx+storeOffset] = cpu * 100
}
}
l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts))
return true, nil
}
return storeCPUs, nil
}, nil
}

// For completeness, if leases have split, verify the leases per store don't
// differ by any more than 1.
leases := make([]int, 0, numStores)
for _, leaseCount := range leaseCounts {
leases = append(leases, leaseCount)
// isLoadEvenlyDistributed checks whether the load for the stores given are
// within tolerance of the mean. If the store loads are, true is returned as
// well as reason, otherwise false. The function expects the loads to be
// indexed to store IDs, see makeStoreCPUFn for example format.
func isLoadEvenlyDistributed(loads []float64, tolerance float64) (ok bool, reason string) {
mean := arithmeticMean(loads)
// If the mean is zero, there's nothing meaningful to assert on. Return early
// that the load isn't evenly distributed.
if mean == 0 {
return false, "no load: mean=0"
}
sort.Ints(leases)
if leases[0]+1 < leases[len(leases)-1] {
l.Printf("leases per store differ by more than one: %s\n", formatLeaseCounts(leaseCounts))
return false, nil

meanTolerance := mean * tolerance
lb := mean - meanTolerance
ub := mean + meanTolerance

// Partiton the loads into above, below and within the tolerance bounds of
// the load mean.
above, below, within := []int{}, []int{}, []int{}
for i, load := range loads {
storeID := i + 1
if load > ub {
above = append(above, storeID)
} else if load < lb {
below = append(below, storeID)
} else {
within = append(within, storeID)
}
}

l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts))
return true, nil
boundsStr := fmt.Sprintf("mean=%.1f tolerance=%.1f%% (±%.1f) bounds=[%.1f, %.1f]",
mean, 100*tolerance, meanTolerance, lb, ub)
if len(below) > 0 || len(above) > 0 {
ok = false
reason = fmt.Sprintf(
"outside bounds %s\n\tbelow = %s\n\twithin = %s\n\tabove = %s\n",
boundsStr,
formatLoads(below, loads, mean),
formatLoads(within, loads, mean),
formatLoads(above, loads, mean),
)
} else {
ok = true
reason = fmt.Sprintf("within bounds %s\n\tstores=%s\n",
boundsStr, formatLoads(within, loads, mean))
}
return
}

func formatLeaseCounts(counts map[int]int) string {
storeIDs := make([]int, 0, len(counts))
for storeID := range counts {
storeIDs = append(storeIDs, storeID)
}
sort.Ints(storeIDs)
strs := make([]string, 0, len(counts))
for _, storeID := range storeIDs {
strs = append(strs, fmt.Sprintf("s%d: %d", storeID, counts[storeID]))
func formatLoads(storeIDs []int, loads []float64, mean float64) string {
fmtLoads := make([]string, len(storeIDs))
for i, storeID := range storeIDs {
load := loads[storeID-1]
fmtLoads[i] = fmt.Sprintf("s%d: %d (%+3.1f%%)",
storeID, int(load), (load-mean)/mean*100,
)
}
return fmt.Sprintf("[%s]", strings.Join(strs, ", "))
return fmt.Sprintf("[%s]", strings.Join(fmtLoads, ", "))
}
14 changes: 13 additions & 1 deletion pkg/cmd/roachtest/tests/ts_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ const (
rate
)

// defaultSamplePeriod is the default sampling period for getMetrics.
const defaultSamplePeriod = time.Minute

type tsQuery struct {
name string
queryType tsQueryType
sources []string
}

func mustGetMetrics(
Expand All @@ -55,6 +59,12 @@ func mustGetMetrics(

func getMetrics(
adminURL string, start, end time.Time, tsQueries []tsQuery,
) (tspb.TimeSeriesQueryResponse, error) {
return getMetricsWithSamplePeriod(adminURL, start, end, defaultSamplePeriod, tsQueries)
}

func getMetricsWithSamplePeriod(
adminURL string, start, end time.Time, samplePeriod time.Duration, tsQueries []tsQuery,
) (tspb.TimeSeriesQueryResponse, error) {
url := "http://" + adminURL + "/ts/query"
queries := make([]tspb.Query, len(tsQueries))
Expand All @@ -65,13 +75,15 @@ func getMetrics(
Name: tsQueries[i].name,
Downsampler: tspb.TimeSeriesQueryAggregator_AVG.Enum(),
SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
Sources: tsQueries[i].sources,
}
case rate:
queries[i] = tspb.Query{
Name: tsQueries[i].name,
Downsampler: tspb.TimeSeriesQueryAggregator_AVG.Enum(),
SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
Derivative: tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE.Enum(),
Sources: tsQueries[i].sources,
}
default:
panic("unexpected")
Expand All @@ -83,7 +95,7 @@ func getMetrics(
// Ask for one minute intervals. We can't just ask for the whole hour
// because the time series query system does not support downsampling
// offsets.
SampleNanos: (1 * time.Minute).Nanoseconds(),
SampleNanos: samplePeriod.Nanoseconds(),
Queries: queries,
}
var response tspb.TimeSeriesQueryResponse
Expand Down

0 comments on commit 03e4c91

Please sign in to comment.