Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admission: add l0 control metrics #109640

Merged
merged 1 commit into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 52 additions & 42 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ func (gcs GrantCoordinators) Close() {
type StoreGrantCoordinators struct {
ambientCtx log.AmbientContext

settings *cluster.Settings
makeStoreRequesterFunc makeStoreRequesterFunc
kvIOTokensExhaustedDuration *metric.Counter
kvIOTokensAvailable *metric.Gauge
kvElasticIOTokensAvailable *metric.Gauge
kvIOTokensTookWithoutPermission *metric.Counter
kvIOTotalTokensTaken *metric.Counter
settings *cluster.Settings
makeStoreRequesterFunc makeStoreRequesterFunc
kvIOTokensExhaustedDuration *metric.Counter
kvIOTokensAvailable *metric.Gauge
kvElasticIOTokensAvailable *metric.Gauge
kvIOTotalTokensTaken *metric.Counter
kvIOTotalTokensReturned *metric.Counter
l0CompactedBytes *metric.Counter
l0TokensProduced *metric.Counter

// These metrics are shared by WorkQueues across stores.
workQueueMetrics *WorkQueueMetrics
Expand Down Expand Up @@ -168,10 +170,10 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
// initialization, which will also set these to unlimited.
startingIOTokens: unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval(),
ioTokensExhaustedDurationMetric: sgc.kvIOTokensExhaustedDuration,
availableTokensMetrics: sgc.kvIOTokensAvailable,
availableTokensMetric: sgc.kvIOTokensAvailable,
availableElasticTokensMetric: sgc.kvElasticIOTokensAvailable,
tookWithoutPermissionMetric: sgc.kvIOTokensTookWithoutPermission,
totalTokensTaken: sgc.kvIOTotalTokensTaken,
tokensTakenMetric: sgc.kvIOTotalTokensTaken,
tokensReturnedMetric: sgc.kvIOTotalTokensReturned,
}
kvg.coordMu.availableIOTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
kvg.coordMu.availableElasticIOTokens = kvg.coordMu.availableIOTokens
Expand Down Expand Up @@ -215,6 +217,8 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
perWorkTokenEstimator: makeStorePerWorkTokenEstimator(),
diskBandwidthLimiter: makeDiskBandwidthLimiter(),
kvGranter: kvg,
l0CompactedBytes: sgc.l0CompactedBytes,
l0TokensProduced: sgc.l0TokensProduced,
}
return coord
}
Expand Down Expand Up @@ -462,17 +466,19 @@ func makeStoresGrantCoordinators(
makeStoreRequester = opts.makeStoreRequesterFunc
}
storeCoordinators := &StoreGrantCoordinators{
ambientCtx: ambientCtx,
settings: st,
makeStoreRequesterFunc: makeStoreRequester,
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTokensTookWithoutPermission: metrics.KVIOTokensTookWithoutPermission,
kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
workQueueMetrics: storeWorkQueueMetrics,
onLogEntryAdmitted: onLogEntryAdmitted,
knobs: knobs,
ambientCtx: ambientCtx,
settings: st,
makeStoreRequesterFunc: makeStoreRequester,
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken,
kvIOTotalTokensReturned: metrics.KVIOTotalTokensReturned,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
l0CompactedBytes: metrics.L0CompactedBytes,
l0TokensProduced: metrics.L0TokensProduced,
workQueueMetrics: storeWorkQueueMetrics,
onLogEntryAdmitted: onLogEntryAdmitted,
knobs: knobs,
}
return storeCoordinators
}
Expand Down Expand Up @@ -1012,34 +1018,38 @@ type GrantCoordinatorMetrics struct {
KVSlotAdjusterIncrements *metric.Counter
KVSlotAdjusterDecrements *metric.Counter
// TODO(banabrick): Make these metrics per store.
KVIOTokensExhaustedDuration *metric.Counter
KVIOTokensTookWithoutPermission *metric.Counter
KVIOTotalTokensTaken *metric.Counter
KVIOTokensAvailable *metric.Gauge
KVElasticIOTokensAvailable *metric.Gauge
SQLLeafStartUsedSlots *metric.Gauge
SQLRootStartUsedSlots *metric.Gauge
KVIOTokensExhaustedDuration *metric.Counter
KVIOTotalTokensTaken *metric.Counter
KVIOTotalTokensReturned *metric.Counter
KVIOTokensAvailable *metric.Gauge
KVElasticIOTokensAvailable *metric.Gauge
L0CompactedBytes *metric.Counter
L0TokensProduced *metric.Counter
SQLLeafStartUsedSlots *metric.Gauge
SQLRootStartUsedSlots *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
func (GrantCoordinatorMetrics) MetricStruct() {}

func makeGrantCoordinatorMetrics() GrantCoordinatorMetrics {
m := GrantCoordinatorMetrics{
KVTotalSlots: metric.NewGauge(totalSlots),
KVUsedSlots: metric.NewGauge(addName(workKindString(KVWork), usedSlots)),
KVSlotsExhaustedDuration: metric.NewCounter(kvSlotsExhaustedDuration),
KVCPULoadShortPeriodDuration: metric.NewCounter(kvCPULoadShortPeriodDuration),
KVCPULoadLongPeriodDuration: metric.NewCounter(kvCPULoadLongPeriodDuration),
KVSlotAdjusterIncrements: metric.NewCounter(kvSlotAdjusterIncrements),
KVSlotAdjusterDecrements: metric.NewCounter(kvSlotAdjusterDecrements),
KVIOTokensExhaustedDuration: metric.NewCounter(kvIOTokensExhaustedDuration),
SQLLeafStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementLeafStartWork), usedSlots)),
SQLRootStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementRootStartWork), usedSlots)),
KVIOTokensTookWithoutPermission: metric.NewCounter(kvIONumIOTokensTookWithoutPermission),
KVIOTotalTokensTaken: metric.NewCounter(kvIOTotalTokensTaken),
KVIOTokensAvailable: metric.NewGauge(kvIOTokensAvailable),
KVElasticIOTokensAvailable: metric.NewGauge(kvElasticIOTokensAvailable),
KVTotalSlots: metric.NewGauge(totalSlots),
KVUsedSlots: metric.NewGauge(addName(workKindString(KVWork), usedSlots)),
KVSlotsExhaustedDuration: metric.NewCounter(kvSlotsExhaustedDuration),
KVCPULoadShortPeriodDuration: metric.NewCounter(kvCPULoadShortPeriodDuration),
KVCPULoadLongPeriodDuration: metric.NewCounter(kvCPULoadLongPeriodDuration),
KVSlotAdjusterIncrements: metric.NewCounter(kvSlotAdjusterIncrements),
KVSlotAdjusterDecrements: metric.NewCounter(kvSlotAdjusterDecrements),
KVIOTokensExhaustedDuration: metric.NewCounter(kvIOTokensExhaustedDuration),
SQLLeafStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementLeafStartWork), usedSlots)),
SQLRootStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementRootStartWork), usedSlots)),
KVIOTotalTokensTaken: metric.NewCounter(kvIOTotalTokensTaken),
KVIOTotalTokensReturned: metric.NewCounter(kvIOTotalTokensReturned),
KVIOTokensAvailable: metric.NewGauge(kvIOTokensAvailable),
KVElasticIOTokensAvailable: metric.NewGauge(kvElasticIOTokensAvailable),
L0CompactedBytes: metric.NewCounter(l0CompactedBytes),
L0TokensProduced: metric.NewCounter(l0TokensProduced),
}
return m
}
Expand Down
67 changes: 41 additions & 26 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,12 @@ type kvStoreTokenGranter struct {
// computing startingIOTokens-availableIOTokens.
startingIOTokens int64
ioTokensExhaustedDurationMetric *metric.Counter
availableTokensMetrics *metric.Gauge
availableTokensMetric *metric.Gauge
availableElasticTokensMetric *metric.Gauge
tookWithoutPermissionMetric *metric.Counter
totalTokensTaken *metric.Counter
exhaustedStart time.Time
tokensReturnedMetric *metric.Counter
tokensTakenMetric *metric.Counter

exhaustedStart time.Time

// Estimation models.
l0WriteLM, l0IngestLM, ingestLM tokensLinearModel
Expand Down Expand Up @@ -404,7 +405,6 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
if sg.coordMu.availableIOTokens > 0 {
sg.subtractTokensLocked(count, count, false)
sg.coordMu.diskBWTokensUsed[wc] += count
sg.totalTokensTaken.Inc(count)
return grantSuccess
}
case admissionpb.ElasticWorkClass:
Expand All @@ -414,7 +414,6 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
sg.subtractTokensLocked(count, count, false)
sg.coordMu.elasticIOTokensUsedByElastic += count
sg.coordMu.diskBWTokensUsed[wc] += count
sg.totalTokensTaken.Inc(count)
return grantSuccess
}
}
Expand Down Expand Up @@ -446,8 +445,6 @@ func (sg *kvStoreTokenGranter) tookWithoutPermission(workClass admissionpb.WorkC
func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHandle int8) {
wc := admissionpb.WorkClass(demuxHandle)
sg.subtractTokensLocked(count, count, false)
sg.tookWithoutPermissionMetric.Inc(count)
sg.totalTokensTaken.Inc(count)
if wc == admissionpb.ElasticWorkClass {
sg.coordMu.elasticDiskBWTokensAvailable -= count
sg.coordMu.elasticIOTokensUsedByElastic += count
Expand All @@ -458,28 +455,35 @@ func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHan
// subtractTokensLocked is a helper function that subtracts count tokens (count
// can be negative, in which case this is really an addition).
func (sg *kvStoreTokenGranter) subtractTokensLocked(
count int64, elasticCount int64, forceTickMetric bool,
count int64, elasticCount int64, settingAvailableTokens bool,
) {
avail := sg.coordMu.availableIOTokens
sg.coordMu.availableIOTokens -= count
sg.coordMu.availableElasticIOTokens -= elasticCount
sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
if !settingAvailableTokens {
if count > 0 {
sg.tokensTakenMetric.Inc(count)
} else {
sg.tokensReturnedMetric.Inc(-count)
}
}
if count > 0 && avail > 0 && sg.coordMu.availableIOTokens <= 0 {
// Transition from > 0 to <= 0.
sg.exhaustedStart = timeutil.Now()
} else if count < 0 && avail <= 0 && (sg.coordMu.availableIOTokens > 0 || forceTickMetric) {
// Transition from <= 0 to > 0, or forced to tick the metric. The latter
// ensures that if the available tokens stay <= 0, we don't show a sudden
// change in the metric after minutes of exhaustion (we had observed such
// behavior prior to this change).
} else if count < 0 && avail <= 0 && (sg.coordMu.availableIOTokens > 0 || settingAvailableTokens) {
// Transition from <= 0 to > 0, or if we're newly setting available
// tokens. The latter ensures that if the available tokens stay <= 0, we
// don't show a sudden change in the metric after minutes of exhaustion
// (we had observed such behavior prior to this change).
now := timeutil.Now()
exhaustedMicros := now.Sub(sg.exhaustedStart).Microseconds()
sg.ioTokensExhaustedDurationMetric.Inc(exhaustedMicros)
if sg.coordMu.availableIOTokens <= 0 {
sg.exhaustedStart = now
}
}
sg.availableTokensMetrics.Update(sg.coordMu.availableIOTokens)
sg.coordMu.availableElasticIOTokens -= elasticCount
sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
}

// requesterHasWaitingRequests implements granterWithLockedCalls.
Expand Down Expand Up @@ -570,10 +574,9 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
sg.coordMu.availableElasticIOTokens =
min(sg.coordMu.availableElasticIOTokens, sg.coordMu.availableIOTokens)
}

sg.startingIOTokens = sg.coordMu.availableIOTokens
sg.availableTokensMetrics.Update(sg.coordMu.availableIOTokens)
sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
sg.startingIOTokens = sg.coordMu.availableIOTokens

sg.coordMu.elasticDiskBWTokensAvailable += elasticDiskBandwidthTokens
if sg.coordMu.elasticDiskBWTokensAvailable > elasticDiskBandwidthTokensCapacity {
Expand Down Expand Up @@ -736,18 +739,18 @@ var (
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
kvIONumIOTokensTookWithoutPermission = metric.Metadata{
Name: "admission.granter.io_tokens_took_without_permission.kv",
Help: "Total number of tokens taken without permission",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
kvIOTotalTokensTaken = metric.Metadata{
Name: "admission.granter.io_tokens_taken.kv",
Help: "Total number of tokens taken",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
kvIOTotalTokensReturned = metric.Metadata{
Name: "admission.granter.io_tokens_returned.kv",
Help: "Total number of tokens returned",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
kvIOTokensAvailable = metric.Metadata{
Name: "admission.granter.io_tokens_available.kv",
Help: "Number of tokens available",
Expand All @@ -760,6 +763,18 @@ var (
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
l0CompactedBytes = metric.Metadata{
Name: "admission.l0_compacted_bytes.kv",
Help: "Total bytes compacted out of L0 (used to generate IO tokens)",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
l0TokensProduced = metric.Metadata{
Name: "admission.l0_tokens_produced.kv",
Help: "Total bytes produced for L0 writes",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
)

// TODO(irfansharif): we are lacking metrics for IO tokens and load, including
Expand Down
18 changes: 10 additions & 8 deletions pkg/util/admission/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,16 @@ func TestGranterBasic(t *testing.T) {
requesters[numWorkKinds] = req.requesters[admissionpb.ElasticWorkClass]
return req
},
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
kvIOTokensTookWithoutPermission: metrics.KVIOTokensTookWithoutPermission,
kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken,
workQueueMetrics: workQueueMetrics,
disableTickerForTesting: true,
knobs: &TestingKnobs{},
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken,
kvIOTotalTokensReturned: metrics.KVIOTotalTokensReturned,
l0CompactedBytes: metrics.L0CompactedBytes,
l0TokensProduced: metrics.L0TokensProduced,
workQueueMetrics: workQueueMetrics,
disableTickerForTesting: true,
knobs: &TestingKnobs{},
}
var metricsProvider testMetricsProvider
metricsProvider.setMetricsForStores([]int32{1}, pebble.Metrics{})
Expand Down
15 changes: 14 additions & 1 deletion pkg/util/admission/io_load_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
Expand Down Expand Up @@ -188,6 +189,9 @@ type ioLoadListener struct {
adjustTokensResult
perWorkTokenEstimator storePerWorkTokenEstimator
diskBandwidthLimiter diskBandwidthLimiter

l0CompactedBytes *metric.Counter
l0TokensProduced *metric.Counter
}

type ioLoadListenerState struct {
Expand Down Expand Up @@ -641,7 +645,7 @@ type adjustTokensAuxComputations struct {

// adjustTokensInner is used for computing tokens based on compaction and
// flush bottlenecks.
func (*ioLoadListener) adjustTokensInner(
func (io *ioLoadListener) adjustTokensInner(
ctx context.Context,
prev ioLoadListenerState,
l0Metrics pebble.LevelMetrics,
Expand Down Expand Up @@ -677,7 +681,10 @@ func (*ioLoadListener) adjustTokensInner(
// bytes (gauge).
intL0CompactedBytes = 0
}
io.l0CompactedBytes.Inc(intL0CompactedBytes)

const alpha = 0.5

// Compaction scheduling can be uneven in prioritizing L0 for compactions,
// so smooth out what is being removed by compactions.
smoothedIntL0CompactedBytes := int64(alpha*float64(intL0CompactedBytes) + (1-alpha)*float64(prev.smoothedIntL0CompactedBytes))
Expand Down Expand Up @@ -958,6 +965,9 @@ func (*ioLoadListener) adjustTokensInner(
if totalNumElasticByteTokens > totalNumByteTokens {
totalNumElasticByteTokens = totalNumByteTokens
}

io.l0TokensProduced.Inc(totalNumByteTokens)

// Install the latest cumulative stats.
return adjustTokensResult{
ioLoadListenerState: ioLoadListenerState{
Expand Down Expand Up @@ -1047,6 +1057,9 @@ func (res adjustTokensResult) SafeFormat(p redact.SafePrinter, _ rune) {
ib(m/adjustmentInterval))
switch res.aux.tokenKind {
case compactionTokenKind:
// NB: res.smoothedCompactionByteTokens is the same as
// res.ioLoadListenerState.totalNumByteTokens (printed above) when
// res.aux.tokenKind == compactionTokenKind.
p.Printf(" due to L0 growth")
case flushTokenKind:
p.Printf(" due to memtable flush (multiplier %.3f)", res.flushUtilTargetFraction)
Expand Down
Loading