Skip to content

Commit

Permalink
admission: introduce elastic io token exhausted duration metric
Browse files Browse the repository at this point in the history
This patch adds a new metric
`admission.granter.elastic_io_tokens_exhausted_duration.kv`. This is
similar to the existing
`admission.granter.io_tokens_exhausted_duration.kv`, but tracks token
exhaustion for elastic traffic.

The patch also does some code refactoring to make it easier to use both
regular and elastic equivalent metrics.

Informs cockroachdb#121574.

Release note: None
  • Loading branch information
aadityasondhi committed May 14, 2024
1 parent bf7788e commit 4bb1730
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 63 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
<tr><td>STORAGE</td><td>admission.granter.cpu_load_long_period_duration.kv</td><td>Total duration when CPULoad was being called with a long period, in micros</td><td>Microseconds</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>admission.granter.cpu_load_short_period_duration.kv</td><td>Total duration when CPULoad was being called with a short period, in micros</td><td>Microseconds</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>admission.granter.elastic_io_tokens_available.kv</td><td>Number of tokens available</td><td>Tokens</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>admission.granter.elastic_io_tokens_exhausted_duration.kv</td><td>Total duration when Elastic IO tokens were exhausted, in micros</td><td>Microseconds</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>admission.granter.io_tokens_available.kv</td><td>Number of tokens available</td><td>Tokens</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>admission.granter.io_tokens_bypassed.kv</td><td>Total number of tokens taken by work bypassing admission control (for example, follower writes without flow control)</td><td>Tokens</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>admission.granter.io_tokens_exhausted_duration.kv</td><td>Total duration when IO tokens were exhausted, in micros</td><td>Microseconds</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
29 changes: 14 additions & 15 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ type StoreGrantCoordinators struct {

settings *cluster.Settings
makeStoreRequesterFunc makeStoreRequesterFunc
kvIOTokensExhaustedDuration *metric.Counter
kvIOTokensAvailable *metric.Gauge
kvElasticIOTokensAvailable *metric.Gauge
kvIOTokensExhaustedDuration [admissionpb.NumWorkClasses]*metric.Counter
kvIOTokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge
kvIOTokensTaken *metric.Counter
kvIOTokensReturned *metric.Counter
kvIOTokensBypassed *metric.Counter
Expand Down Expand Up @@ -173,12 +172,11 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
startingIOTokens: unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval(),
ioTokensExhaustedDurationMetric: sgc.kvIOTokensExhaustedDuration,
availableTokensMetric: sgc.kvIOTokensAvailable,
availableElasticTokensMetric: sgc.kvElasticIOTokensAvailable,
tokensTakenMetric: sgc.kvIOTokensTaken,
tokensReturnedMetric: sgc.kvIOTokensReturned,
}
kvg.coordMu.availableIOTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
kvg.coordMu.availableElasticIOTokens = kvg.coordMu.availableIOTokens
kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
kvg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] = kvg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]
kvg.coordMu.elasticDiskBWTokensAvailable = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()

opts := makeWorkQueueOptions(KVWork)
Expand Down Expand Up @@ -477,7 +475,6 @@ func makeStoresGrantCoordinators(
kvIOTokensReturned: metrics.KVIOTokensReturned,
kvIOTokensBypassed: metrics.KVIOTokensBypassed,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
l0CompactedBytes: metrics.L0CompactedBytes,
l0TokensProduced: metrics.L0TokensProduced,
workQueueMetrics: storeWorkQueueMetrics,
Expand Down Expand Up @@ -989,8 +986,8 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, _ rune) {
case *slotGranter:
s.Printf("%s%s: used: %d, total: %d", curSep, kind, g.usedSlots, g.totalSlots)
case *kvStoreTokenGranter:
s.Printf(" io-avail: %d(%d), elastic-disk-bw-tokens-avail: %d", g.coordMu.availableIOTokens,
g.coordMu.availableElasticIOTokens,
s.Printf(" io-avail: %d(%d), elastic-disk-bw-tokens-avail: %d", g.coordMu.availableIOTokens[admissionpb.RegularWorkClass],
g.coordMu.availableIOTokens[admissionpb.ElasticWorkClass],
g.coordMu.elasticDiskBWTokensAvailable)
}
case SQLStatementLeafStartWork, SQLStatementRootStartWork:
Expand Down Expand Up @@ -1022,12 +1019,11 @@ type GrantCoordinatorMetrics struct {
KVSlotAdjusterIncrements *metric.Counter
KVSlotAdjusterDecrements *metric.Counter
// TODO(banabrick): Make these metrics per store.
KVIOTokensExhaustedDuration *metric.Counter
KVIOTokensTaken *metric.Counter
KVIOTokensReturned *metric.Counter
KVIOTokensBypassed *metric.Counter
KVIOTokensAvailable *metric.Gauge
KVElasticIOTokensAvailable *metric.Gauge
KVIOTokensAvailable [admissionpb.NumWorkClasses]*metric.Gauge
KVIOTokensExhaustedDuration [admissionpb.NumWorkClasses]*metric.Counter
L0CompactedBytes *metric.Counter
L0TokensProduced *metric.Counter
SQLLeafStartUsedSlots *metric.Gauge
Expand All @@ -1046,17 +1042,20 @@ func makeGrantCoordinatorMetrics() GrantCoordinatorMetrics {
KVCPULoadLongPeriodDuration: metric.NewCounter(kvCPULoadLongPeriodDuration),
KVSlotAdjusterIncrements: metric.NewCounter(kvSlotAdjusterIncrements),
KVSlotAdjusterDecrements: metric.NewCounter(kvSlotAdjusterDecrements),
KVIOTokensExhaustedDuration: metric.NewCounter(kvIOTokensExhaustedDuration),
SQLLeafStartUsedSlots: metric.NewGauge(addName(SQLStatementLeafStartWork.String(), usedSlots)),
SQLRootStartUsedSlots: metric.NewGauge(addName(SQLStatementRootStartWork.String(), usedSlots)),
KVIOTokensTaken: metric.NewCounter(kvIOTokensTaken),
KVIOTokensReturned: metric.NewCounter(kvIOTokensReturned),
KVIOTokensBypassed: metric.NewCounter(kvIOTokensBypassed),
KVIOTokensAvailable: metric.NewGauge(kvIOTokensAvailable),
KVElasticIOTokensAvailable: metric.NewGauge(kvElasticIOTokensAvailable),
L0CompactedBytes: metric.NewCounter(l0CompactedBytes),
L0TokensProduced: metric.NewCounter(l0TokensProduced),
}
m.KVIOTokensAvailable[admissionpb.RegularWorkClass] = metric.NewGauge(kvIOTokensAvailable)
m.KVIOTokensAvailable[admissionpb.ElasticWorkClass] = metric.NewGauge(kvElasticIOTokensAvailable)
m.KVIOTokensExhaustedDuration = [admissionpb.NumWorkClasses]*metric.Counter{
metric.NewCounter(kvIOTokensExhaustedDuration),
metric.NewCounter(kvElasticIOTokensExhaustedDuration),
}
return m
}

Expand Down
120 changes: 73 additions & 47 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,26 +301,23 @@ type kvStoreTokenGranter struct {
// work deducts from both availableIOTokens and availableElasticIOTokens.
// Regular work blocks if availableIOTokens is <= 0 and elastic work
// blocks if availableElasticIOTokens <= 0.
availableIOTokens int64
availableElasticIOTokens int64
availableIOTokens [admissionpb.NumWorkClasses]int64
elasticIOTokensUsedByElastic int64
// Disk bandwidth tokens.
elasticDiskBWTokensAvailable int64

diskBWTokensUsed [admissionpb.NumWorkClasses]int64
}

ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter
availableTokensMetric [admissionpb.NumWorkClasses]*metric.Gauge
exhaustedStart [admissionpb.NumWorkClasses]time.Time
// startingIOTokens is the number of tokens set by
// setAvailableTokens. It is used to compute the tokens used, by
// computing startingIOTokens-availableIOTokens.
startingIOTokens int64
ioTokensExhaustedDurationMetric *metric.Counter
availableTokensMetric *metric.Gauge
availableElasticTokensMetric *metric.Gauge
tokensReturnedMetric *metric.Counter
tokensTakenMetric *metric.Counter

exhaustedStart time.Time
startingIOTokens int64
tokensReturnedMetric *metric.Counter
tokensTakenMetric *metric.Counter

// Estimation models.
l0WriteLM, l0IngestLM, ingestLM tokensLinearModel
Expand Down Expand Up @@ -402,14 +399,15 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
// (and not cause a performance isolation failure).
switch wc {
case admissionpb.RegularWorkClass:
if sg.coordMu.availableIOTokens > 0 {
if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 {
sg.subtractTokensLocked(count, count, false)
sg.coordMu.diskBWTokensUsed[wc] += count
return grantSuccess
}
case admissionpb.ElasticWorkClass:
if sg.coordMu.elasticDiskBWTokensAvailable > 0 && sg.coordMu.availableIOTokens > 0 &&
sg.coordMu.availableElasticIOTokens > 0 {
if sg.coordMu.elasticDiskBWTokensAvailable > 0 &&
sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 &&
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] > 0 {
sg.coordMu.elasticDiskBWTokensAvailable -= count
sg.subtractTokensLocked(count, count, false)
sg.coordMu.elasticIOTokensUsedByElastic += count
Expand Down Expand Up @@ -458,30 +456,49 @@ func (sg *kvStoreTokenGranter) subtractTokensLocked(
count int64, elasticCount int64, settingAvailableTokens bool,
) {
avail := sg.coordMu.availableIOTokens
sg.coordMu.availableIOTokens -= count
sg.coordMu.availableElasticIOTokens -= elasticCount
sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
var w admissionpb.WorkClass
var tokenCount int64
for w = 0; w < admissionpb.NumWorkClasses; w++ {
if w == admissionpb.ElasticWorkClass {
tokenCount = elasticCount
} else {
tokenCount = count
}
sg.coordMu.availableIOTokens[w] -= tokenCount
sg.availableTokensMetric[w].Update(sg.coordMu.availableIOTokens[w])
updateExhaustedTokenDurationLocked(tokenCount, avail[w], sg.coordMu.availableIOTokens[w], settingAvailableTokens,
&sg.exhaustedStart[w], sg.ioTokensExhaustedDurationMetric[w])
}
if !settingAvailableTokens {
if count > 0 {
sg.tokensTakenMetric.Inc(count)
} else {
sg.tokensReturnedMetric.Inc(-count)
}
}
if count > 0 && avail > 0 && sg.coordMu.availableIOTokens <= 0 {
}

func updateExhaustedTokenDurationLocked(
count int64,
prevAvail int64,
curAvail int64,
settingAvailableTokens bool,
exhaustedStart *time.Time,
ioTokensExhaustedDurationMetric *metric.Counter,
) {
if count > 0 && prevAvail > 0 && curAvail <= 0 {
// Transition from > 0 to <= 0.
sg.exhaustedStart = timeutil.Now()
} else if count < 0 && avail <= 0 && (sg.coordMu.availableIOTokens > 0 || settingAvailableTokens) {
*exhaustedStart = timeutil.Now()
} else if count < 0 && prevAvail <= 0 && (curAvail > 0 || settingAvailableTokens) {
// Transition from <= 0 to > 0, or if we're newly setting available
// tokens. The latter ensures that if the available tokens stay <= 0, we
// don't show a sudden change in the metric after minutes of exhaustion
// (we had observed such behavior prior to this change).
now := timeutil.Now()
exhaustedMicros := now.Sub(sg.exhaustedStart).Microseconds()
sg.ioTokensExhaustedDurationMetric.Inc(exhaustedMicros)
if sg.coordMu.availableIOTokens <= 0 {
sg.exhaustedStart = now
exhaustedMicros := now.Sub(*exhaustedStart).Microseconds()
ioTokensExhaustedDurationMetric.Inc(exhaustedMicros)
if curAvail <= 0 {
*exhaustedStart = now
}
}
}
Expand Down Expand Up @@ -539,7 +556,7 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
) (ioTokensUsed int64, ioTokensUsedByElasticWork int64) {
sg.coord.mu.Lock()
defer sg.coord.mu.Unlock()
ioTokensUsed = sg.startingIOTokens - sg.coordMu.availableIOTokens
ioTokensUsed = sg.startingIOTokens - sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]
ioTokensUsedByElasticWork = sg.coordMu.elasticIOTokensUsedByElastic
sg.coordMu.elasticIOTokensUsedByElastic = 0

Expand All @@ -548,28 +565,31 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
// availableIOTokens become <= 0. We want to remember this previous
// over-allocation.
sg.subtractTokensLocked(-ioTokens, -elasticIOTokens, true)
if sg.coordMu.availableIOTokens > ioTokenCapacity {
sg.coordMu.availableIOTokens = ioTokenCapacity
}
if sg.coordMu.availableElasticIOTokens > elasticIOTokenCapacity {
sg.coordMu.availableElasticIOTokens = elasticIOTokenCapacity
}
// availableElasticIOTokens can become very negative since it can be fewer
// than the tokens for regular work, and regular work deducts from it
// without blocking. This behavior is desirable, but we don't want deficits
// to accumulate indefinitely. We've found that resetting on the lastTick
// provides a good enough frequency for resetting the deficit. That is, we
// are resetting every 15s.
if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > ioTokenCapacity {
sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] = ioTokenCapacity
}
if sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] > elasticIOTokenCapacity {
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] = elasticIOTokenCapacity
}
// availableIOTokens[ElasticWorkClass] can become very negative since it can
// be fewer than the tokens for regular work, and regular work deducts from it
// without blocking. This behavior is desirable, but we don't want deficits to
// accumulate indefinitely. We've found that resetting on the lastTick
// provides a good enough frequency for resetting the deficit. That is, we are
// resetting every 15s.
if lastTick {
sg.coordMu.availableElasticIOTokens = max(sg.coordMu.availableElasticIOTokens, 0)
// It is possible that availableIOTokens is negative, in which case we
// want availableElasticIOTokens to not exceed it.
sg.coordMu.availableElasticIOTokens =
min(sg.coordMu.availableElasticIOTokens, sg.coordMu.availableIOTokens)
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] =
max(sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass], 0)
// It is possible that availableIOTokens[RegularWorkClass] is negative, in
// which case we want availableIOTokens[ElasticWorkClass] to not exceed it.
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] =
min(sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass], sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass])
}
var w admissionpb.WorkClass
for w = 0; w < admissionpb.NumWorkClasses; w++ {
sg.availableTokensMetric[w].Update(sg.coordMu.availableIOTokens[w])
}
sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
sg.startingIOTokens = sg.coordMu.availableIOTokens
sg.startingIOTokens = sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]

sg.coordMu.elasticDiskBWTokensAvailable += elasticDiskBandwidthTokens
if sg.coordMu.elasticDiskBWTokensAvailable > elasticDiskBandwidthTokensCapacity {
Expand Down Expand Up @@ -609,9 +629,9 @@ func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked(
) (additionalTokens int64) {
// Reminder: coord.mu protects the state in the kvStoreTokenGranter.
exhaustedFunc := func() bool {
return sg.coordMu.availableIOTokens <= 0 ||
return sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] <= 0 ||
(wc == admissionpb.ElasticWorkClass && (sg.coordMu.elasticDiskBWTokensAvailable <= 0 ||
sg.coordMu.availableElasticIOTokens <= 0))
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] <= 0))
}
wasExhausted := exhaustedFunc()
actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(admittedInfo.WriteBytes)
Expand Down Expand Up @@ -732,6 +752,12 @@ var (
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
kvElasticIOTokensExhaustedDuration = metric.Metadata{
Name: "admission.granter.elastic_io_tokens_exhausted_duration.kv",
Help: "Total duration when Elastic IO tokens were exhausted, in micros",
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
kvIOTokensTaken = metric.Metadata{
Name: "admission.granter.io_tokens_taken.kv",
Help: "Total number of tokens taken",
Expand Down
1 change: 0 additions & 1 deletion pkg/util/admission/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func TestGranterBasic(t *testing.T) {
},
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
kvIOTokensTaken: metrics.KVIOTokensTaken,
kvIOTokensReturned: metrics.KVIOTokensReturned,
kvIOTokensBypassed: metrics.KVIOTokensBypassed,
Expand Down

0 comments on commit 4bb1730

Please sign in to comment.