Merge #109085
109085: admission: elastic tokens for L0 compaction bandwidth r=bananabrick a=sumeerbhola

Currently we start shaping traffic at 5 sublevels (tokens are 2x the compaction bandwidth) and stabilize close to 10 sublevels (tokens are equal to the compaction bandwidth). This token calculation assumes that stabilizing at 10 sublevels represents an acceptably high utilization point for the resource and that this throughput can be sustained indefinitely. We know this assumption is incorrect in two ways:
- Having 10 sublevels results in worse performance for reads, so it isn't necessarily acceptable.
- This throughput can't be sustained indefinitely, since compaction debt is increasing.

We have ongoing work to increase the duration for which we can sustain this throughput (bullet 2), since in the general case we may have such load due to multiple tenants, or a combination of regular-priority and high-priority traffic. The increased duration will also give the allocator time to shift load.

Here we take a complementary approach to the ongoing work, one that addresses both bullets above for the case where the resource saturation is caused by lower-priority (elastic) traffic. This can happen for MVCC GC, index backfills, etc. We generate a different set of tokens that start shaping when there is 1 or more sublevels (1.25x the compaction bandwidth at 1 sublevel). Work at all priorities deducts from these tokens, but only elastic traffic must block when they are unavailable. This earlier shaping causes L0 sublevels to stabilize around 3, and compaction debt to flatten at a reasonable backlog.
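
For intuition, here is a minimal sketch of the two shaping curves described above, assuming straight-line interpolation between the stated endpoints (the actual io_load_listener logic is more involved, and the function names here are purely illustrative):

package main

import (
	"fmt"
	"math"
)

// regularL0Tokens sketches the existing shaping: shaping starts at 5
// sublevels (2x compaction bandwidth) and reaches 1x at 10 sublevels.
func regularL0Tokens(subLevels int, compactionBW float64) float64 {
	switch {
	case subLevels < 5:
		return math.Inf(1) // effectively unlimited below 5 sublevels
	case subLevels >= 10:
		return compactionBW
	default:
		return (2.0 - float64(subLevels-5)/5.0) * compactionBW
	}
}

// elasticL0Tokens sketches the new elastic shaping: shaping starts at 1
// sublevel (1.25x compaction bandwidth) and reaches 0.75x at 5 sublevels.
func elasticL0Tokens(subLevels int, compactionBW float64) float64 {
	switch {
	case subLevels < 1:
		return math.Inf(1) // effectively unlimited below 1 sublevel
	case subLevels >= 5:
		return 0.75 * compactionBW
	default:
		return (1.25 - 0.5*float64(subLevels-1)/4.0) * compactionBW
	}
}

func main() {
	for _, s := range []int{1, 3, 5, 10} {
		fmt.Printf("sublevels=%d regular=%.2f elastic=%.2f\n",
			s, regularL0Tokens(s, 1.0), elasticL0Tokens(s, 1.0))
	}
}

With a compaction bandwidth of 1.0, this prints elastic tokens of 1.25 at 1 sublevel, 1.00 at 3, and 0.75 at 5, while regular tokens only start shaping at 5 sublevels, which is why elastic-only overload stabilizes around 3 sublevels.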

Epic: none

Release note: None

Co-authored-by: sumeerbhola <[email protected]>
craig[bot] and sumeerbhola committed Aug 25, 2023
2 parents 35397b9 + 08cc9b1 commit c215aba
Showing 12 changed files with 747 additions and 407 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
"admission_control_database_drop.go",
"admission_control_elastic_backup.go",
"admission_control_elastic_cdc.go",
"admission_control_elastic_io.go",
"admission_control_index_backfill.go",
"admission_control_index_overload.go",
"admission_control_intent_resolution.go",
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/admission_control.go
@@ -40,4 +40,5 @@ func registerAdmission(r registry.Registry) {
registerIndexBackfill(r)
registerDatabaseDrop(r)
registerIntentResolutionOverload(r)
registerElasticIO(r)
}
142 changes: 142 additions & 0 deletions pkg/cmd/roachtest/tests/admission_control_elastic_io.go
@@ -0,0 +1,142 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/grafana"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// This test sets up a 1-node CRDB cluster on an 8vCPU machine and runs
// low-priority kv0 that overloads the compaction throughput (out of L0) of
// the store. With admission control subjecting this low-priority load to
// elastic IO tokens, the overload is limited.
func registerElasticIO(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "admission-control/elastic-io",
Owner: registry.OwnerAdmissionControl,
Timeout: time.Hour,
Benchmark: true,
// TODO(sumeer): Reduce to weekly after working well.
// Tags: registry.Tags(`weekly`),
// Second node is solely for Prometheus.
Cluster: r.MakeClusterSpec(2, spec.CPU(8)),
RequiresLicense: true,
Leases: registry.MetamorphicLeases,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.IsLocal() {
t.Skip("IO overload test is not meant to run locally")
}
if c.Spec().NodeCount != 2 {
t.Fatalf("expected 2 nodes, found %d", c.Spec().NodeCount)
}
crdbNodes := c.Spec().NodeCount - 1
workAndPromNode := crdbNodes + 1

promCfg := &prometheus.Config{}
promCfg.WithPrometheusNode(c.Node(workAndPromNode).InstallNodes()[0]).
WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
WithGrafanaDashboardJSON(grafana.ChangefeedAdmissionControlGrafana)
err := c.StartGrafana(ctx, t.L(), promCfg)
require.NoError(t, err)
promClient, err := clusterstats.SetupCollectorPromClient(ctx, c, t.L(), promCfg)
require.NoError(t, err)
statCollector := clusterstats.NewStatsCollector(ctx, promClient)

c.Put(ctx, t.Cockroach(), "./cockroach", c.Range(1, crdbNodes))
c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(workAndPromNode))
startOpts := option.DefaultStartOptsNoBackups()
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
"--vmodule=io_load_listener=2")
settings := install.MakeClusterSettings()
c.Start(ctx, t.L(), startOpts, settings, c.Range(1, crdbNodes))
setAdmissionControl(ctx, t, c, true)
duration := 30 * time.Minute
t.Status("running workload")
m := c.NewMonitor(ctx, c.Range(1, crdbNodes))
m.Go(func(ctx context.Context) error {
dur := " --duration=" + duration.String()
url := fmt.Sprintf(" {pgurl:1-%d}", crdbNodes)
cmd := "./workload run kv --init --histograms=perf/stats.json --concurrency=512 " +
"--splits=1000 --read-percent=0 --min-block-bytes=65536 --max-block-bytes=65536 " +
"--background-qos=true --tolerate-errors" + dur + url
c.Run(ctx, c.Node(workAndPromNode), cmd)
return nil
})
m.Go(func(ctx context.Context) error {
const subLevelMetric = "storage_l0_sublevels"
getMetricVal := func(metricName string) (float64, error) {
point, err := statCollector.CollectPoint(ctx, t.L(), timeutil.Now(), metricName)
if err != nil {
t.L().Errorf("could not query prom %s", err.Error())
return 0, err
}
const labelName = "store"
val := point[labelName]
if len(val) != 1 {
err = errors.Errorf(
"unexpected number %d of points for metric %s", len(val), metricName)
t.L().Errorf("%s", err.Error())
return 0, err
}
for storeID, v := range val {
t.L().Printf("%s(store=%s): %f", metricName, storeID, v.Value)
return v.Value, nil
}
// Unreachable.
panic("unreachable")
}
now := timeutil.Now()
endTime := now.Add(duration)
// We typically see fluctuations from 1 to 5 sub-levels because the
// elastic IO token logic gives 1.25*compaction-bandwidth tokens at 1
// sub-level and 0.75*compaction-bandwidth at 5 sub-levels, with 5
// sub-levels being very rare. We leave some breathing room and pick a
// threshold of greater than 7 to fail the test. If elastic tokens are
// not working, the threshold of 7 will be easily breached, since
// regular tokens allow sub-levels to exceed 10.
const subLevelThreshold = 7
// Sleep initially for stability to be achieved, before measuring.
time.Sleep(5 * time.Minute)
for {
time.Sleep(30 * time.Second)
val, err := getMetricVal(subLevelMetric)
if err != nil {
continue
}
if val > subLevelThreshold {
t.Fatalf("sub-level count %f exceeded threshold", val)
}
if timeutil.Now().After(endTime) {
return nil
}
}
})
m.Wait()
},
})
}
14 changes: 8 additions & 6 deletions pkg/util/admission/admission.go
@@ -269,13 +269,15 @@ type granterWithIOTokens interface {
// provided by tokens. This method needs to be called periodically.
// {io, elasticDiskBandwidth}TokensCapacity is the ceiling up to which we allow
// elastic or disk bandwidth tokens to accumulate. The return value is the
// number of used tokens in the interval since the prior call to this method.
// Note that tokensUsed can be negative, though that will be rare, since it is
// possible for tokens to be returned.
// number of used tokens in the interval since the prior call to this method
// (and the tokens used by elastic work). Note that tokensUsed* can be
// negative, though that will be rare, since it is possible for tokens to be
// returned.
setAvailableTokens(
ioTokens int64, elasticDiskBandwidthTokens int64,
ioTokensCapacity int64, elasticDiskBandwidthTokensCapacity int64,
) (tokensUsed int64)
ioTokens int64, elasticIOTokens int64, elasticDiskBandwidthTokens int64,
ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskBandwidthTokensCapacity int64,
lastTick bool,
) (tokensUsed int64, tokensUsedByElasticWork int64)
// getDiskTokensUsedAndReset returns the disk bandwidth tokens used
// since the last such call.
getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64
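
As a toy illustration of this contract (hypothetical code, not the actual kvStoreTokenGranter): each call replenishes a bucket up to its capacity ceiling and returns the net tokens consumed since the prior call, which can be negative when more tokens were returned than taken.

// toyBucket models one token kind under the setAvailableTokens contract.
type toyBucket struct {
	available int64 // may go negative if work took tokens without permission
	used      int64 // net tokens deducted since the last replenishment
}

// setAvailableTokens adds tokens up to capacity and reports net usage since
// the prior call (negative if more tokens were returned than taken).
func (b *toyBucket) setAvailableTokens(tokens, capacity int64) (tokensUsed int64) {
	tokensUsed = b.used
	b.used = 0
	b.available += tokens
	if b.available > capacity {
		b.available = capacity
	}
	return tokensUsed
}

func (b *toyBucket) take(n int64)         { b.available -= n; b.used += n }
func (b *toyBucket) returnTokens(n int64) { b.available += n; b.used -= n }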
9 changes: 8 additions & 1 deletion pkg/util/admission/grant_coordinator.go
@@ -53,6 +53,7 @@ type StoreGrantCoordinators struct {
makeStoreRequesterFunc makeStoreRequesterFunc
kvIOTokensExhaustedDuration *metric.Counter
kvIOTokensAvailable *metric.Gauge
kvElasticIOTokensAvailable *metric.Gauge
kvIOTokensTookWithoutPermission *metric.Counter
kvIOTotalTokensTaken *metric.Counter

@@ -168,10 +169,12 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
startingIOTokens: unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval(),
ioTokensExhaustedDurationMetric: sgc.kvIOTokensExhaustedDuration,
availableTokensMetrics: sgc.kvIOTokensAvailable,
availableElasticTokensMetric: sgc.kvElasticIOTokensAvailable,
tookWithoutPermissionMetric: sgc.kvIOTokensTookWithoutPermission,
totalTokensTaken: sgc.kvIOTotalTokensTaken,
}
kvg.coordMu.availableIOTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
kvg.coordMu.availableElasticIOTokens = kvg.coordMu.availableIOTokens
kvg.coordMu.elasticDiskBWTokensAvailable = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()

opts := makeWorkQueueOptions(KVWork)
@@ -466,6 +469,7 @@ func makeStoresGrantCoordinators(
kvIOTokensTookWithoutPermission: metrics.KVIOTokensTookWithoutPermission,
kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
workQueueMetrics: storeWorkQueueMetrics,
onLogEntryAdmitted: onLogEntryAdmitted,
knobs: knobs,
@@ -975,7 +979,8 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, _ rune) {
case *slotGranter:
s.Printf("%s%s: used: %d, total: %d", curSep, workKindString(kind), g.usedSlots, g.totalSlots)
case *kvStoreTokenGranter:
s.Printf(" io-avail: %d, elastic-disk-bw-tokens-avail: %d", g.coordMu.availableIOTokens,
s.Printf(" io-avail: %d(%d), elastic-disk-bw-tokens-avail: %d", g.coordMu.availableIOTokens,
g.coordMu.availableElasticIOTokens,
g.coordMu.elasticDiskBWTokensAvailable)
}
case SQLStatementLeafStartWork, SQLStatementRootStartWork:
@@ -1011,6 +1016,7 @@ type GrantCoordinatorMetrics struct {
KVIOTokensTookWithoutPermission *metric.Counter
KVIOTotalTokensTaken *metric.Counter
KVIOTokensAvailable *metric.Gauge
KVElasticIOTokensAvailable *metric.Gauge
SQLLeafStartUsedSlots *metric.Gauge
SQLRootStartUsedSlots *metric.Gauge
}
@@ -1033,6 +1039,7 @@ func makeGrantCoordinatorMetrics() GrantCoordinatorMetrics {
KVIOTokensTookWithoutPermission: metric.NewCounter(kvIONumIOTokensTookWithoutPermission),
KVIOTotalTokensTaken: metric.NewCounter(kvIOTotalTokensTaken),
KVIOTokensAvailable: metric.NewGauge(kvIOTokensAvailable),
KVElasticIOTokensAvailable: metric.NewGauge(kvElasticIOTokensAvailable),
}
return m
}