From 4bb1730e4073d77008f54888babda17c787e3a3c Mon Sep 17 00:00:00 2001
From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com>
Date: Mon, 13 May 2024 17:12:16 -0400
Subject: [PATCH] admission: introduce elastic io token exhausted duration
 metric

This patch adds a new metric,
`admission.granter.elastic_io_tokens_exhausted_duration.kv`. It is the
elastic-traffic counterpart of the existing
`admission.granter.io_tokens_exhausted_duration.kv`.

The patch also refactors the granter code to index the regular and
elastic variants of these metrics by work class, which makes it easier
to keep the two in sync.

Informs #121574.

Release note: None
---
 docs/generated/metrics/metrics.html     |   1 +
 pkg/util/admission/grant_coordinator.go |  29 +++---
 pkg/util/admission/granter.go           | 120 ++++++++++++++----------
 pkg/util/admission/granter_test.go      |   1 -
 4 files changed, 88 insertions(+), 63 deletions(-)

diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 534083770d4d..5972ae9be553 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -70,6 +70,7 @@
 <tr><td>STORAGE</td><td>admission.granter.cpu_load_long_period_duration.kv</td><td>Total duration when CPULoad was being called with a long period, in micros</td><td>Microseconds</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>STORAGE</td><td>admission.granter.cpu_load_short_period_duration.kv</td><td>Total duration when CPULoad was being called with a short period, in micros</td><td>Microseconds</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>STORAGE</td><td>admission.granter.elastic_io_tokens_available.kv</td><td>Number of tokens available</td><td>Tokens</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>STORAGE</td><td>admission.granter.elastic_io_tokens_exhausted_duration.kv</td><td>Total duration when Elastic IO tokens were exhausted, in micros</td><td>Microseconds</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>STORAGE</td><td>admission.granter.io_tokens_available.kv</td><td>Number of tokens available</td><td>Tokens</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>STORAGE</td><td>admission.granter.io_tokens_bypassed.kv</td><td>Total number of tokens taken by work bypassing admission control (for example, follower writes without flow control)</td><td>Tokens</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>STORAGE</td><td>admission.granter.io_tokens_exhausted_duration.kv</td><td>Total duration when IO tokens were exhausted, in micros</td><td>Microseconds</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go
index de397a4f7668..81d7d5a9a69d 100644
--- a/pkg/util/admission/grant_coordinator.go
+++ b/pkg/util/admission/grant_coordinator.go
@@ -52,9 +52,8 @@ type StoreGrantCoordinators struct {
 	settings               *cluster.Settings
 	makeStoreRequesterFunc makeStoreRequesterFunc
 
-	kvIOTokensExhaustedDuration *metric.Counter
-	kvIOTokensAvailable         *metric.Gauge
-	kvElasticIOTokensAvailable  *metric.Gauge
+	kvIOTokensExhaustedDuration [admissionpb.NumWorkClasses]*metric.Counter
+	kvIOTokensAvailable         [admissionpb.NumWorkClasses]*metric.Gauge
 	kvIOTokensTaken             *metric.Counter
 	kvIOTokensReturned          *metric.Counter
 	kvIOTokensBypassed          *metric.Counter
@@ -173,12 +172,11 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
 		startingIOTokens:                unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval(),
 		ioTokensExhaustedDurationMetric: sgc.kvIOTokensExhaustedDuration,
 		availableTokensMetric:           sgc.kvIOTokensAvailable,
-		availableElasticTokensMetric:    sgc.kvElasticIOTokensAvailable,
 		tokensTakenMetric:               sgc.kvIOTokensTaken,
 		tokensReturnedMetric:            sgc.kvIOTokensReturned,
 	}
-	kvg.coordMu.availableIOTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
-	kvg.coordMu.availableElasticIOTokens = kvg.coordMu.availableIOTokens
+	kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
+	kvg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] = kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]
 	kvg.coordMu.elasticDiskBWTokensAvailable = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
 
 	opts := makeWorkQueueOptions(KVWork)
@@ -477,7 +475,6 @@ func makeStoresGrantCoordinators(
 		kvIOTokensReturned:         metrics.KVIOTokensReturned,
 		kvIOTokensBypassed:         metrics.KVIOTokensBypassed,
 		kvIOTokensAvailable:        metrics.KVIOTokensAvailable,
-		kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
 		l0CompactedBytes:           metrics.L0CompactedBytes,
 		l0TokensProduced:           metrics.L0TokensProduced,
 		workQueueMetrics:           storeWorkQueueMetrics,
@@ -989,8 +986,8 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, _ rune) {
 			case *slotGranter:
 				s.Printf("%s%s: used: %d, total: %d", curSep, kind, g.usedSlots, g.totalSlots)
 			case *kvStoreTokenGranter:
-				s.Printf(" io-avail: %d(%d), elastic-disk-bw-tokens-avail: %d", g.coordMu.availableIOTokens,
-					g.coordMu.availableElasticIOTokens,
+				s.Printf(" io-avail: %d(%d), elastic-disk-bw-tokens-avail: %d", g.coordMu.availableIOTokens[admissionpb.RegularWorkClass],
+					g.coordMu.availableIOTokens[admissionpb.ElasticWorkClass],
 					g.coordMu.elasticDiskBWTokensAvailable)
 			}
 		case SQLStatementLeafStartWork, SQLStatementRootStartWork:
@@ -1022,12 +1019,11 @@ type GrantCoordinatorMetrics struct {
 	KVSlotAdjusterIncrements    *metric.Counter
 	KVSlotAdjusterDecrements    *metric.Counter
 	// TODO(banabrick): Make these metrics per store.
-	KVIOTokensExhaustedDuration *metric.Counter
 	KVIOTokensTaken             *metric.Counter
 	KVIOTokensReturned          *metric.Counter
 	KVIOTokensBypassed          *metric.Counter
-	KVIOTokensAvailable         *metric.Gauge
-	KVElasticIOTokensAvailable  *metric.Gauge
+	KVIOTokensAvailable         [admissionpb.NumWorkClasses]*metric.Gauge
+	KVIOTokensExhaustedDuration [admissionpb.NumWorkClasses]*metric.Counter
 	L0CompactedBytes            *metric.Counter
 	L0TokensProduced            *metric.Counter
 	SQLLeafStartUsedSlots       *metric.Gauge
@@ -1046,17 +1042,20 @@ func makeGrantCoordinatorMetrics() GrantCoordinatorMetrics {
 		KVCPULoadLongPeriodDuration: metric.NewCounter(kvCPULoadLongPeriodDuration),
 		KVSlotAdjusterIncrements:    metric.NewCounter(kvSlotAdjusterIncrements),
 		KVSlotAdjusterDecrements:    metric.NewCounter(kvSlotAdjusterDecrements),
-		KVIOTokensExhaustedDuration: metric.NewCounter(kvIOTokensExhaustedDuration),
 		SQLLeafStartUsedSlots:       metric.NewGauge(addName(SQLStatementLeafStartWork.String(), usedSlots)),
 		SQLRootStartUsedSlots:       metric.NewGauge(addName(SQLStatementRootStartWork.String(), usedSlots)),
 		KVIOTokensTaken:             metric.NewCounter(kvIOTokensTaken),
 		KVIOTokensReturned:          metric.NewCounter(kvIOTokensReturned),
 		KVIOTokensBypassed:          metric.NewCounter(kvIOTokensBypassed),
-		KVIOTokensAvailable:         metric.NewGauge(kvIOTokensAvailable),
-		KVElasticIOTokensAvailable:  metric.NewGauge(kvElasticIOTokensAvailable),
 		L0CompactedBytes:            metric.NewCounter(l0CompactedBytes),
 		L0TokensProduced:            metric.NewCounter(l0TokensProduced),
 	}
+	m.KVIOTokensAvailable[admissionpb.RegularWorkClass] = metric.NewGauge(kvIOTokensAvailable)
+	m.KVIOTokensAvailable[admissionpb.ElasticWorkClass] = metric.NewGauge(kvElasticIOTokensAvailable)
+	m.KVIOTokensExhaustedDuration = [admissionpb.NumWorkClasses]*metric.Counter{
+		metric.NewCounter(kvIOTokensExhaustedDuration),
+		metric.NewCounter(kvElasticIOTokensExhaustedDuration),
+	}
 
 	return m
 }
diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go
index 059264d2696d..6f22ce2d4a8a 100644
--- a/pkg/util/admission/granter.go
+++ b/pkg/util/admission/granter.go
@@ -301,8 +301,7 @@ type kvStoreTokenGranter struct {
 		// work deducts from both availableIOTokens and availableElasticIOTokens.
 		// Regular work blocks if availableIOTokens is <= 0 and elastic work
 		// blocks if availableElasticIOTokens <= 0.
-		availableIOTokens            int64
-		availableElasticIOTokens     int64
+		availableIOTokens            [admissionpb.NumWorkClasses]int64
 		elasticIOTokensUsedByElastic int64
 		// Disk bandwidth tokens.
 		elasticDiskBWTokensAvailable int64
@@ -310,17 +309,15 @@ type kvStoreTokenGranter struct {
 		diskBWTokensUsed [admissionpb.NumWorkClasses]int64
 	}
 
+	ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter
+	availableTokensMetric           [admissionpb.NumWorkClasses]*metric.Gauge
+	exhaustedStart                  [admissionpb.NumWorkClasses]time.Time
 	// startingIOTokens is the number of tokens set by
 	// setAvailableTokens. It is used to compute the tokens used, by
 	// computing startingIOTokens-availableIOTokens.
-	startingIOTokens                int64
-	ioTokensExhaustedDurationMetric *metric.Counter
-	availableTokensMetric           *metric.Gauge
-	availableElasticTokensMetric    *metric.Gauge
-	tokensReturnedMetric            *metric.Counter
-	tokensTakenMetric               *metric.Counter
-
-	exhaustedStart time.Time
+	startingIOTokens     int64
+	tokensReturnedMetric *metric.Counter
+	tokensTakenMetric    *metric.Counter
 
 	// Estimation models.
 	l0WriteLM, l0IngestLM, ingestLM tokensLinearModel
@@ -402,14 +399,15 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
 	// (and not cause a performance isolation failure).
 	switch wc {
 	case admissionpb.RegularWorkClass:
-		if sg.coordMu.availableIOTokens > 0 {
+		if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 {
 			sg.subtractTokensLocked(count, count, false)
 			sg.coordMu.diskBWTokensUsed[wc] += count
 			return grantSuccess
 		}
 	case admissionpb.ElasticWorkClass:
-		if sg.coordMu.elasticDiskBWTokensAvailable > 0 && sg.coordMu.availableIOTokens > 0 &&
-			sg.coordMu.availableElasticIOTokens > 0 {
+		if sg.coordMu.elasticDiskBWTokensAvailable > 0 &&
+			sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 &&
+			sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] > 0 {
 			sg.coordMu.elasticDiskBWTokensAvailable -= count
 			sg.subtractTokensLocked(count, count, false)
 			sg.coordMu.elasticIOTokensUsedByElastic += count
@@ -458,10 +456,19 @@ func (sg *kvStoreTokenGranter) subtractTokensLocked(
 	count int64, elasticCount int64, settingAvailableTokens bool,
 ) {
 	avail := sg.coordMu.availableIOTokens
-	sg.coordMu.availableIOTokens -= count
-	sg.coordMu.availableElasticIOTokens -= elasticCount
-	sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
-	sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
+	var w admissionpb.WorkClass
+	var tokenCount int64
+	for w = 0; w < admissionpb.NumWorkClasses; w++ {
+		if w == admissionpb.ElasticWorkClass {
+			tokenCount = elasticCount
+		} else {
+			tokenCount = count
+		}
+		sg.coordMu.availableIOTokens[w] -= tokenCount
+		sg.availableTokensMetric[w].Update(sg.coordMu.availableIOTokens[w])
+		updateExhaustedTokenDurationLocked(tokenCount, avail[w], sg.coordMu.availableIOTokens[w], settingAvailableTokens,
+			&sg.exhaustedStart[w], sg.ioTokensExhaustedDurationMetric[w])
+	}
 	if !settingAvailableTokens {
 		if count > 0 {
 			sg.tokensTakenMetric.Inc(count)
@@ -469,19 +476,29 @@ func (sg *kvStoreTokenGranter) subtractTokensLocked(
 			sg.tokensReturnedMetric.Inc(-count)
 		}
 	}
-	if count > 0 && avail > 0 && sg.coordMu.availableIOTokens <= 0 {
+}
+
+func updateExhaustedTokenDurationLocked(
+	count int64,
+	prevAvail int64,
+	curAvail int64,
+	settingAvailableTokens bool,
+	exhaustedStart *time.Time,
+	ioTokensExhaustedDurationMetric *metric.Counter,
+) {
+	if count > 0 && prevAvail > 0 && curAvail <= 0 {
 		// Transition from > 0 to <= 0.
-		sg.exhaustedStart = timeutil.Now()
-	} else if count < 0 && avail <= 0 && (sg.coordMu.availableIOTokens > 0 || settingAvailableTokens) {
+		*exhaustedStart = timeutil.Now()
+	} else if count < 0 && prevAvail <= 0 && (curAvail > 0 || settingAvailableTokens) {
 		// Transition from <= 0 to > 0, or if we're newly setting available
 		// tokens. The latter ensures that if the available tokens stay <= 0, we
 		// don't show a sudden change in the metric after minutes of exhaustion
 		// (we had observed such behavior prior to this change).
 		now := timeutil.Now()
-		exhaustedMicros := now.Sub(sg.exhaustedStart).Microseconds()
-		sg.ioTokensExhaustedDurationMetric.Inc(exhaustedMicros)
-		if sg.coordMu.availableIOTokens <= 0 {
-			sg.exhaustedStart = now
+		exhaustedMicros := now.Sub(*exhaustedStart).Microseconds()
+		ioTokensExhaustedDurationMetric.Inc(exhaustedMicros)
+		if curAvail <= 0 {
+			*exhaustedStart = now
 		}
 	}
 }
@@ -539,7 +556,7 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
 ) (ioTokensUsed int64, ioTokensUsedByElasticWork int64) {
 	sg.coord.mu.Lock()
 	defer sg.coord.mu.Unlock()
-	ioTokensUsed = sg.startingIOTokens - sg.coordMu.availableIOTokens
+	ioTokensUsed = sg.startingIOTokens - sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]
 	ioTokensUsedByElasticWork = sg.coordMu.elasticIOTokensUsedByElastic
 	sg.coordMu.elasticIOTokensUsedByElastic = 0
 
@@ -548,28 +565,31 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
 	// availableIOTokens become <= 0. We want to remember this previous
 	// over-allocation.
 	sg.subtractTokensLocked(-ioTokens, -elasticIOTokens, true)
-	if sg.coordMu.availableIOTokens > ioTokenCapacity {
-		sg.coordMu.availableIOTokens = ioTokenCapacity
-	}
-	if sg.coordMu.availableElasticIOTokens > elasticIOTokenCapacity {
-		sg.coordMu.availableElasticIOTokens = elasticIOTokenCapacity
-	}
-	// availableElasticIOTokens can become very negative since it can be fewer
-	// than the tokens for regular work, and regular work deducts from it
-	// without blocking. This behavior is desirable, but we don't want deficits
-	// to accumulate indefinitely. We've found that resetting on the lastTick
-	// provides a good enough frequency for resetting the deficit. That is, we
-	// are resetting every 15s.
+	if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > ioTokenCapacity {
+		sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] = ioTokenCapacity
+	}
+	if sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] > elasticIOTokenCapacity {
+		sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] = elasticIOTokenCapacity
+	}
+	// availableIOTokens[ElasticWorkClass] can become very negative since it can
+	// be fewer than the tokens for regular work, and regular work deducts from it
+	// without blocking. This behavior is desirable, but we don't want deficits to
+	// accumulate indefinitely. We've found that resetting on the lastTick
+	// provides a good enough frequency for resetting the deficit. That is, we are
+	// resetting every 15s.
 	if lastTick {
-		sg.coordMu.availableElasticIOTokens = max(sg.coordMu.availableElasticIOTokens, 0)
-		// It is possible that availableIOTokens is negative, in which case we
-		// want availableElasticIOTokens to not exceed it.
-		sg.coordMu.availableElasticIOTokens =
-			min(sg.coordMu.availableElasticIOTokens, sg.coordMu.availableIOTokens)
+		sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] =
+			max(sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass], 0)
+		// It is possible that availableIOTokens[RegularWorkClass] is negative, in
+		// which case we want availableIOTokens[ElasticWorkClass] to not exceed it.
+		sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] =
+			min(sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass], sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass])
+	}
+	var w admissionpb.WorkClass
+	for w = 0; w < admissionpb.NumWorkClasses; w++ {
+		sg.availableTokensMetric[w].Update(sg.coordMu.availableIOTokens[w])
 	}
-	sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
-	sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
-	sg.startingIOTokens = sg.coordMu.availableIOTokens
+	sg.startingIOTokens = sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]
 
 	sg.coordMu.elasticDiskBWTokensAvailable += elasticDiskBandwidthTokens
 	if sg.coordMu.elasticDiskBWTokensAvailable > elasticDiskBandwidthTokensCapacity {
@@ -609,9 +629,9 @@ func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
 ) (additionalTokens int64) {
 	// Reminder: coord.mu protects the state in the kvStoreTokenGranter.
 	exhaustedFunc := func() bool {
-		return sg.coordMu.availableIOTokens <= 0 ||
+		return sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] <= 0 ||
 			(wc == admissionpb.ElasticWorkClass && (sg.coordMu.elasticDiskBWTokensAvailable <= 0 ||
-				sg.coordMu.availableElasticIOTokens <= 0))
+				sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] <= 0))
 	}
 	wasExhausted := exhaustedFunc()
 	actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(admittedInfo.WriteBytes)
@@ -732,6 +752,12 @@ var (
 		Measurement: "Microseconds",
 		Unit:        metric.Unit_COUNT,
 	}
+	kvElasticIOTokensExhaustedDuration = metric.Metadata{
+		Name:        "admission.granter.elastic_io_tokens_exhausted_duration.kv",
+		Help:        "Total duration when Elastic IO tokens were exhausted, in micros",
+		Measurement: "Microseconds",
+		Unit:        metric.Unit_COUNT,
+	}
 	kvIOTokensTaken = metric.Metadata{
 		Name:        "admission.granter.io_tokens_taken.kv",
 		Help:        "Total number of tokens taken",
diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go
index 9822afa5164b..3e1851010a3a 100644
--- a/pkg/util/admission/granter_test.go
+++ b/pkg/util/admission/granter_test.go
@@ -139,7 +139,6 @@ func TestGranterBasic(t *testing.T) {
 			},
 			kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
 			kvIOTokensAvailable:         metrics.KVIOTokensAvailable,
-			kvElasticIOTokensAvailable:  metrics.KVElasticIOTokensAvailable,
 			kvIOTokensTaken:             metrics.KVIOTokensTaken,
 			kvIOTokensReturned:          metrics.KVIOTokensReturned,
 			kvIOTokensBypassed:          metrics.KVIOTokensBypassed,
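
The core pattern in this patch is per-work-class state held in arrays indexed
by `admissionpb.WorkClass`, with the exhausted-duration counter advanced only
when the token count crosses zero. The following is a minimal standalone
sketch of that pattern under stated assumptions, not CockroachDB code:
`WorkClass`, `tokenBucket`, `subtract`, and `exhaustedMicros` are hypothetical
stand-ins for `admissionpb.WorkClass`, the granter's fields, and the
`metric.Counter`.

    package main

    import (
    	"fmt"
    	"time"
    )

    // WorkClass plays the role of admissionpb.WorkClass: an array index
    // that selects the regular or elastic variant of a piece of state.
    type WorkClass int

    const (
    	RegularWorkClass WorkClass = iota
    	ElasticWorkClass
    	NumWorkClasses
    )

    // tokenBucket holds per-class available tokens and accumulates, per
    // class, how long the bucket has been exhausted (tokens <= 0).
    type tokenBucket struct {
    	available       [NumWorkClasses]int64
    	exhaustedStart  [NumWorkClasses]time.Time
    	exhaustedMicros [NumWorkClasses]int64 // stand-in for the metric.Counter
    }

    // subtract deducts tokens for every work class and updates the
    // exhausted duration on transitions across zero, mirroring the shape
    // of updateExhaustedTokenDurationLocked in the patch.
    func (b *tokenBucket) subtract(count, elasticCount int64, now time.Time) {
    	for w := WorkClass(0); w < NumWorkClasses; w++ {
    		c := count
    		if w == ElasticWorkClass {
    			c = elasticCount
    		}
    		prev := b.available[w]
    		b.available[w] -= c
    		cur := b.available[w]
    		switch {
    		case c > 0 && prev > 0 && cur <= 0:
    			// Transition from > 0 to <= 0: start the exhaustion clock.
    			b.exhaustedStart[w] = now
    		case c < 0 && prev <= 0 && cur > 0:
    			// Transition back above 0: accumulate the exhausted period.
    			b.exhaustedMicros[w] += now.Sub(b.exhaustedStart[w]).Microseconds()
    		}
    	}
    }

    func main() {
    	b := &tokenBucket{available: [NumWorkClasses]int64{10, 5}}
    	start := time.Now()
    	b.subtract(8, 8, start)                      // elastic drops to -3: exhausted
    	b.subtract(-6, -6, start.Add(2*time.Second)) // elastic back to 3: ~2s recorded
    	fmt.Printf("elastic exhausted for ~%dus\n", b.exhaustedMicros[ElasticWorkClass])
    }

Indexing by work class instead of keeping `availableIOTokens` and
`availableElasticIOTokens` as parallel fields lets a single loop update both
variants, so a new per-class metric (like the one this patch adds) cannot be
forgotten in one of the code paths.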