From af6ceaf5ede6b4a77643c0052955f9fd76c9840c Mon Sep 17 00:00:00 2001
From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com>
Date: Mon, 13 May 2024 21:12:16 +0000
Subject: [PATCH] admission: introduce elastic io token exhausted duration
metric

This patch adds a new metric,
`admission.granter.elastic_io_tokens_exhausted_duration.kv`. It is
analogous to the existing
`admission.granter.io_tokens_exhausted_duration.kv`, but tracks elastic
traffic.

The patch also refactors the per-work-class metrics into arrays indexed
by `admissionpb.WorkClass`, so that the regular and elastic variants of
a metric can be maintained together, as sketched below.
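
A minimal sketch of the pattern for reviewers (the identifiers are the
ones introduced by this patch; `wc` and `exhaustedMicros` stand in for
values available at the call site):

    // Per-work-class metrics become fixed-size arrays indexed by
    // admissionpb.WorkClass instead of separate named fields.
    var exhausted [admissionpb.NumWorkClasses]*metric.Counter
    exhausted[admissionpb.RegularWorkClass] = metric.NewCounter(kvIOTokensExhaustedDuration)
    exhausted[admissionpb.ElasticWorkClass] = metric.NewCounter(kvElasticIOTokensExhaustedDuration)

    // On a transition out of exhaustion for work class wc, the elapsed
    // time is added to the matching counter.
    exhausted[wc].Inc(exhaustedMicros)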

Informs #121574.

Release note: None
---
docs/generated/metrics/metrics.html | 1 +
pkg/util/admission/grant_coordinator.go | 29 +++----
pkg/util/admission/granter.go | 108 +++++++++++++-----------
pkg/util/admission/granter_test.go | 1 -
4 files changed, 75 insertions(+), 64 deletions(-)
diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 1d426f6cd5c3..874ea555b301 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -72,6 +72,7 @@
STORAGE | admission.granter.cpu_load_long_period_duration.kv | Total duration when CPULoad was being called with a long period, in micros | Microseconds | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | admission.granter.cpu_load_short_period_duration.kv | Total duration when CPULoad was being called with a short period, in micros | Microseconds | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | admission.granter.elastic_io_tokens_available.kv | Number of tokens available | Tokens | GAUGE | COUNT | AVG | NONE |
+STORAGE | admission.granter.elastic_io_tokens_exhausted_duration.kv | Total duration when elastic IO tokens were exhausted, in micros | Microseconds | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | admission.granter.io_tokens_available.kv | Number of tokens available | Tokens | GAUGE | COUNT | AVG | NONE |
STORAGE | admission.granter.io_tokens_bypassed.kv | Total number of tokens taken by work bypassing admission control (for example, follower writes without flow control) | Tokens | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | admission.granter.io_tokens_exhausted_duration.kv | Total duration when IO tokens were exhausted, in micros | Microseconds | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go
index de0fca684a20..ddd1f1d79642 100644
--- a/pkg/util/admission/grant_coordinator.go
+++ b/pkg/util/admission/grant_coordinator.go
@@ -52,9 +52,8 @@ type StoreGrantCoordinators struct {
settings *cluster.Settings
makeStoreRequesterFunc makeStoreRequesterFunc
- kvIOTokensExhaustedDuration *metric.Counter
- kvIOTokensAvailable *metric.Gauge
- kvElasticIOTokensAvailable *metric.Gauge
+ kvIOTokensExhaustedDuration [admissionpb.NumWorkClasses]*metric.Counter
+ kvIOTokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge
kvIOTokensTaken *metric.Counter
kvIOTokensReturned *metric.Counter
kvIOTokensBypassed *metric.Counter
@@ -173,12 +172,11 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
startingIOTokens: unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval(),
ioTokensExhaustedDurationMetric: sgc.kvIOTokensExhaustedDuration,
availableTokensMetric: sgc.kvIOTokensAvailable,
- availableElasticTokensMetric: sgc.kvElasticIOTokensAvailable,
tokensTakenMetric: sgc.kvIOTokensTaken,
tokensReturnedMetric: sgc.kvIOTokensReturned,
}
- kvg.coordMu.availableIOTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
- kvg.coordMu.availableElasticIOTokens = kvg.coordMu.availableIOTokens
+ kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
+ kvg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] = kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]
kvg.coordMu.elasticDiskBWTokensAvailable = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
opts := makeWorkQueueOptions(KVWork)
@@ -482,7 +480,6 @@ func makeStoresGrantCoordinators(
kvIOTokensReturned: metrics.KVIOTokensReturned,
kvIOTokensBypassed: metrics.KVIOTokensBypassed,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
- kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
l0CompactedBytes: metrics.L0CompactedBytes,
l0TokensProduced: metrics.L0TokensProduced,
workQueueMetrics: storeWorkQueueMetrics,
@@ -994,8 +991,8 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, _ rune) {
case *slotGranter:
s.Printf("%s%s: used: %d, total: %d", curSep, kind, g.usedSlots, g.totalSlots)
case *kvStoreTokenGranter:
- s.Printf(" io-avail: %d(%d), elastic-disk-bw-tokens-avail: %d", g.coordMu.availableIOTokens,
- g.coordMu.availableElasticIOTokens,
+ s.Printf(" io-avail: %d(%d), elastic-disk-bw-tokens-avail: %d", g.coordMu.availableIOTokens[admissionpb.RegularWorkClass],
+ g.coordMu.availableIOTokens[admissionpb.ElasticWorkClass],
g.coordMu.elasticDiskBWTokensAvailable)
}
case SQLStatementLeafStartWork, SQLStatementRootStartWork:
@@ -1027,12 +1024,11 @@ type GrantCoordinatorMetrics struct {
KVSlotAdjusterIncrements *metric.Counter
KVSlotAdjusterDecrements *metric.Counter
// TODO(banabrick): Make these metrics per store.
- KVIOTokensExhaustedDuration *metric.Counter
KVIOTokensTaken *metric.Counter
KVIOTokensReturned *metric.Counter
KVIOTokensBypassed *metric.Counter
- KVIOTokensAvailable *metric.Gauge
- KVElasticIOTokensAvailable *metric.Gauge
+ KVIOTokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge
+ KVIOTokensExhaustedDuration [admissionpb.NumWorkClasses]*metric.Counter
L0CompactedBytes *metric.Counter
L0TokensProduced *metric.Counter
SQLLeafStartUsedSlots *metric.Gauge
@@ -1051,17 +1047,20 @@ func makeGrantCoordinatorMetrics() GrantCoordinatorMetrics {
KVCPULoadLongPeriodDuration: metric.NewCounter(kvCPULoadLongPeriodDuration),
KVSlotAdjusterIncrements: metric.NewCounter(kvSlotAdjusterIncrements),
KVSlotAdjusterDecrements: metric.NewCounter(kvSlotAdjusterDecrements),
- KVIOTokensExhaustedDuration: metric.NewCounter(kvIOTokensExhaustedDuration),
SQLLeafStartUsedSlots: metric.NewGauge(addName(SQLStatementLeafStartWork.String(), usedSlots)),
SQLRootStartUsedSlots: metric.NewGauge(addName(SQLStatementRootStartWork.String(), usedSlots)),
KVIOTokensTaken: metric.NewCounter(kvIOTokensTaken),
KVIOTokensReturned: metric.NewCounter(kvIOTokensReturned),
KVIOTokensBypassed: metric.NewCounter(kvIOTokensBypassed),
- KVIOTokensAvailable: metric.NewGauge(kvIOTokensAvailable),
- KVElasticIOTokensAvailable: metric.NewGauge(kvElasticIOTokensAvailable),
L0CompactedBytes: metric.NewCounter(l0CompactedBytes),
L0TokensProduced: metric.NewCounter(l0TokensProduced),
}
+ m.KVIOTokensAvailable[admissionpb.RegularWorkClass] = metric.NewGauge(kvIOTokensAvailable)
+ m.KVIOTokensAvailable[admissionpb.ElasticWorkClass] = metric.NewGauge(kvElasticIOTokensAvailable)
+ m.KVIOTokensExhaustedDuration = [admissionpb.NumWorkClasses]*metric.Counter{
+ admissionpb.RegularWorkClass: metric.NewCounter(kvIOTokensExhaustedDuration),
+ admissionpb.ElasticWorkClass: metric.NewCounter(kvElasticIOTokensExhaustedDuration),
+ }
return m
}
diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go
index 059264d2696d..16ecb8179a3c 100644
--- a/pkg/util/admission/granter.go
+++ b/pkg/util/admission/granter.go
@@ -301,8 +301,7 @@ type kvStoreTokenGranter struct {
// work deducts from both availableIOTokens and availableElasticIOTokens.
// Regular work blocks if availableIOTokens is <= 0 and elastic work
// blocks if availableElasticIOTokens <= 0.
- availableIOTokens int64
- availableElasticIOTokens int64
+ availableIOTokens [admissionpb.NumWorkClasses]int64
elasticIOTokensUsedByElastic int64
// Disk bandwidth tokens.
elasticDiskBWTokensAvailable int64
@@ -310,17 +309,15 @@ type kvStoreTokenGranter struct {
diskBWTokensUsed [admissionpb.NumWorkClasses]int64
}
+ ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter
+ availableTokensMetric [admissionpb.NumWorkClasses]*metric.Gauge
+ exhaustedStart [admissionpb.NumWorkClasses]time.Time
// startingIOTokens is the number of tokens set by
// setAvailableTokens. It is used to compute the tokens used, by
// computing startingIOTokens-availableIOTokens.
- startingIOTokens int64
- ioTokensExhaustedDurationMetric *metric.Counter
- availableTokensMetric *metric.Gauge
- availableElasticTokensMetric *metric.Gauge
- tokensReturnedMetric *metric.Counter
- tokensTakenMetric *metric.Counter
-
- exhaustedStart time.Time
+ startingIOTokens int64
+ tokensReturnedMetric *metric.Counter
+ tokensTakenMetric *metric.Counter
// Estimation models.
l0WriteLM, l0IngestLM, ingestLM tokensLinearModel
@@ -402,14 +399,15 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
// (and not cause a performance isolation failure).
switch wc {
case admissionpb.RegularWorkClass:
- if sg.coordMu.availableIOTokens > 0 {
+ if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 {
sg.subtractTokensLocked(count, count, false)
sg.coordMu.diskBWTokensUsed[wc] += count
return grantSuccess
}
case admissionpb.ElasticWorkClass:
- if sg.coordMu.elasticDiskBWTokensAvailable > 0 && sg.coordMu.availableIOTokens > 0 &&
- sg.coordMu.availableElasticIOTokens > 0 {
+ if sg.coordMu.elasticDiskBWTokensAvailable > 0 &&
+ sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 &&
+ sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] > 0 {
sg.coordMu.elasticDiskBWTokensAvailable -= count
sg.subtractTokensLocked(count, count, false)
sg.coordMu.elasticIOTokensUsedByElastic += count
@@ -457,11 +455,8 @@ func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHan
func (sg *kvStoreTokenGranter) subtractTokensLocked(
count int64, elasticCount int64, settingAvailableTokens bool,
) {
- avail := sg.coordMu.availableIOTokens
- sg.coordMu.availableIOTokens -= count
- sg.coordMu.availableElasticIOTokens -= elasticCount
- sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
- sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
+ sg.subtractTokensLockedForWorkClass(admissionpb.RegularWorkClass, count, settingAvailableTokens)
+ sg.subtractTokensLockedForWorkClass(admissionpb.ElasticWorkClass, elasticCount, settingAvailableTokens)
if !settingAvailableTokens {
if count > 0 {
sg.tokensTakenMetric.Inc(count)
@@ -469,19 +464,27 @@ func (sg *kvStoreTokenGranter) subtractTokensLocked(
sg.tokensReturnedMetric.Inc(-count)
}
}
- if count > 0 && avail > 0 && sg.coordMu.availableIOTokens <= 0 {
+}
+
+func (sg *kvStoreTokenGranter) subtractTokensLockedForWorkClass(
+ wc admissionpb.WorkClass, count int64, settingAvailableTokens bool,
+) {
+ avail := sg.coordMu.availableIOTokens[wc]
+ sg.coordMu.availableIOTokens[wc] -= count
+ sg.availableTokensMetric[wc].Update(sg.coordMu.availableIOTokens[wc])
+ if count > 0 && avail > 0 && sg.coordMu.availableIOTokens[wc] <= 0 {
// Transition from > 0 to <= 0.
- sg.exhaustedStart = timeutil.Now()
- } else if count < 0 && avail <= 0 && (sg.coordMu.availableIOTokens > 0 || settingAvailableTokens) {
+ sg.exhaustedStart[wc] = timeutil.Now()
+ } else if count < 0 && avail <= 0 && (sg.coordMu.availableIOTokens[wc] > 0 || settingAvailableTokens) {
// Transition from <= 0 to > 0, or if we're newly setting available
// tokens. The latter ensures that if the available tokens stay <= 0, we
// don't show a sudden change in the metric after minutes of exhaustion
// (we had observed such behavior prior to this change).
now := timeutil.Now()
- exhaustedMicros := now.Sub(sg.exhaustedStart).Microseconds()
- sg.ioTokensExhaustedDurationMetric.Inc(exhaustedMicros)
- if sg.coordMu.availableIOTokens <= 0 {
- sg.exhaustedStart = now
+ exhaustedMicros := now.Sub(sg.exhaustedStart[wc]).Microseconds()
+ sg.ioTokensExhaustedDurationMetric[wc].Inc(exhaustedMicros)
+ if sg.coordMu.availableIOTokens[wc] <= 0 {
+ sg.exhaustedStart[wc] = now
}
}
}
@@ -539,7 +542,7 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
) (ioTokensUsed int64, ioTokensUsedByElasticWork int64) {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
- ioTokensUsed = sg.startingIOTokens - sg.coordMu.availableIOTokens
+ ioTokensUsed = sg.startingIOTokens - sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]
ioTokensUsedByElasticWork = sg.coordMu.elasticIOTokensUsedByElastic
sg.coordMu.elasticIOTokensUsedByElastic = 0
@@ -548,28 +551,31 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
// availableIOTokens become <= 0. We want to remember this previous
// over-allocation.
sg.subtractTokensLocked(-ioTokens, -elasticIOTokens, true)
- if sg.coordMu.availableIOTokens > ioTokenCapacity {
- sg.coordMu.availableIOTokens = ioTokenCapacity
- }
- if sg.coordMu.availableElasticIOTokens > elasticIOTokenCapacity {
- sg.coordMu.availableElasticIOTokens = elasticIOTokenCapacity
- }
- // availableElasticIOTokens can become very negative since it can be fewer
- // than the tokens for regular work, and regular work deducts from it
- // without blocking. This behavior is desirable, but we don't want deficits
- // to accumulate indefinitely. We've found that resetting on the lastTick
- // provides a good enough frequency for resetting the deficit. That is, we
- // are resetting every 15s.
+ if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > ioTokenCapacity {
+ sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] = ioTokenCapacity
+ }
+ if sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] > elasticIOTokenCapacity {
+ sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] = elasticIOTokenCapacity
+ }
+ // availableIOTokens[ElasticWorkClass] can become very negative since it can
+ // be fewer than the tokens for regular work, and regular work deducts from it
+ // without blocking. This behavior is desirable, but we don't want deficits to
+ // accumulate indefinitely. We've found that resetting on the lastTick
+ // provides a good enough frequency for resetting the deficit. That is, we are
+ // resetting every 15s.
if lastTick {
- sg.coordMu.availableElasticIOTokens = max(sg.coordMu.availableElasticIOTokens, 0)
- // It is possible that availableIOTokens is negative, in which case we
- // want availableElasticIOTokens to not exceed it.
- sg.coordMu.availableElasticIOTokens =
- min(sg.coordMu.availableElasticIOTokens, sg.coordMu.availableIOTokens)
+ sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] =
+ max(sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass], 0)
+ // It is possible that availableIOTokens[RegularWorkClass] is negative, in
+ // which case we want availableIOTokens[ElasticWorkClass] to not exceed it.
+ sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] =
+ min(sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass], sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass])
+ }
+ // Refresh the available-token gauges for both work classes.
+ for wc := admissionpb.WorkClass(0); wc < admissionpb.NumWorkClasses; wc++ {
+ sg.availableTokensMetric[wc].Update(sg.coordMu.availableIOTokens[wc])
}
- sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
- sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
- sg.startingIOTokens = sg.coordMu.availableIOTokens
+ sg.startingIOTokens = sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]
sg.coordMu.elasticDiskBWTokensAvailable += elasticDiskBandwidthTokens
if sg.coordMu.elasticDiskBWTokensAvailable > elasticDiskBandwidthTokensCapacity {
@@ -609,9 +615,9 @@ func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
) (additionalTokens int64) {
// Reminder: coord.mu protects the state in the kvStoreTokenGranter.
exhaustedFunc := func() bool {
- return sg.coordMu.availableIOTokens <= 0 ||
+ return sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] <= 0 ||
(wc == admissionpb.ElasticWorkClass && (sg.coordMu.elasticDiskBWTokensAvailable <= 0 ||
- sg.coordMu.availableElasticIOTokens <= 0))
+ sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] <= 0))
}
wasExhausted := exhaustedFunc()
actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(admittedInfo.WriteBytes)
@@ -732,6 +738,12 @@ var (
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
+ kvElasticIOTokensExhaustedDuration = metric.Metadata{
+ Name: "admission.granter.elastic_io_tokens_exhausted_duration.kv",
+ Help: "Total duration when elastic IO tokens were exhausted, in micros",
+ Measurement: "Microseconds",
+ Unit: metric.Unit_COUNT,
+ }
kvIOTokensTaken = metric.Metadata{
Name: "admission.granter.io_tokens_taken.kv",
Help: "Total number of tokens taken",
diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go
index f384f05feff6..e7fbd318471f 100644
--- a/pkg/util/admission/granter_test.go
+++ b/pkg/util/admission/granter_test.go
@@ -143,7 +143,6 @@ func TestGranterBasic(t *testing.T) {
},
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
- kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
kvIOTokensTaken: metrics.KVIOTokensTaken,
kvIOTokensReturned: metrics.KVIOTokensReturned,
kvIOTokensBypassed: metrics.KVIOTokensBypassed,