Skip to content

Commit

Permalink
Merge pull request #96356 from irfansharif/backport22.2-95007
Browse files Browse the repository at this point in the history
release-22.2: admission: CPU slot adjustment and utilization metrics
  • Loading branch information
irfansharif authored Feb 1, 2023
2 parents 10bbf20 + 8832be6 commit 1b80198
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 40 deletions.
1 change: 1 addition & 0 deletions pkg/testutils/lint/lint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2054,6 +2054,7 @@ func TestLint(t *testing.T) {
"../../sql/row",
"../../kv/kvclient/rangecache",
"../../storage",
"../../util/admission",
); err != nil {
t.Fatal(err)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -3581,6 +3581,21 @@ var charts = []sectionDescription{
"admission.granter.used_slots.sql-root-start",
},
},
{
Title: "Granter Slot Counters",
Metrics: []string{
"admission.granter.slot_adjuster_increments.kv",
"admission.granter.slot_adjuster_decrements.kv",
},
},
{
Title: "Granter Slot Durations",
Metrics: []string{
"admission.granter.slots_exhausted_duration.kv",
"admission.granter.cpu_load_short_period_duration.kv",
"admission.granter.cpu_load_long_period_duration.kv",
},
},
{
Title: "Elastic CPU Utilization",
Metrics: []string{
Expand Down
73 changes: 44 additions & 29 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,13 +425,17 @@ func makeRegularGrantCoordinator(
}

kvSlotAdjuster := &kvSlotAdjuster{
settings: st,
minCPUSlots: opts.MinCPUSlots,
maxCPUSlots: opts.MaxCPUSlots,
totalSlotsMetric: metrics.KVTotalSlots,
totalModerateSlotsMetric: metrics.KVTotalModerateSlots,
moderateSlotsClamp: opts.MaxCPUSlots,
runnableAlphaOverride: opts.RunnableAlphaOverride,
settings: st,
minCPUSlots: opts.MinCPUSlots,
maxCPUSlots: opts.MaxCPUSlots,
moderateSlotsClamp: opts.MaxCPUSlots,
runnableAlphaOverride: opts.RunnableAlphaOverride,
totalSlotsMetric: metrics.KVTotalSlots,
totalModerateSlotsMetric: metrics.KVTotalModerateSlots,
cpuLoadShortPeriodDurationMetric: metrics.KVCPULoadShortPeriodDuration,
cpuLoadLongPeriodDurationMetric: metrics.KVCPULoadLongPeriodDuration,
slotAdjusterIncrementsMetric: metrics.KVSlotAdjusterIncrements,
slotAdjusterDecrementsMetric: metrics.KVSlotAdjusterDecrements,
}
coord := &GrantCoordinator{
ambientCtx: ambientCtx,
Expand All @@ -445,12 +449,13 @@ func makeRegularGrantCoordinator(
}

kvg := &slotGranter{
coord: coord,
workKind: KVWork,
totalHighLoadSlots: opts.MinCPUSlots,
totalModerateLoadSlots: opts.MinCPUSlots,
usedSlotsMetric: metrics.KVUsedSlots,
usedSoftSlotsMetric: metrics.KVUsedSoftSlots,
coord: coord,
workKind: KVWork,
totalHighLoadSlots: opts.MinCPUSlots,
totalModerateLoadSlots: opts.MinCPUSlots,
usedSlotsMetric: metrics.KVUsedSlots,
usedSoftSlotsMetric: metrics.KVUsedSoftSlots,
slotsExhaustedDurationMetric: metrics.KVSlotsExhaustedDuration,
}

kvSlotAdjuster.granter = kvg
Expand Down Expand Up @@ -941,29 +946,39 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, verb rune) {

// GrantCoordinatorMetrics are metrics associated with a GrantCoordinator.
type GrantCoordinatorMetrics struct {
KVTotalSlots *metric.Gauge
KVUsedSlots *metric.Gauge
KVTotalModerateSlots *metric.Gauge
KVUsedSoftSlots *metric.Gauge
KVIOTokensExhaustedDuration *metric.Counter
SQLLeafStartUsedSlots *metric.Gauge
SQLRootStartUsedSlots *metric.Gauge
KVTotalSlots *metric.Gauge
KVUsedSlots *metric.Gauge
KVTotalModerateSlots *metric.Gauge
KVUsedSoftSlots *metric.Gauge
KVSlotsExhaustedDuration *metric.Counter
KVCPULoadShortPeriodDuration *metric.Counter
KVCPULoadLongPeriodDuration *metric.Counter
KVSlotAdjusterIncrements *metric.Counter
KVSlotAdjusterDecrements *metric.Counter
KVIOTokensExhaustedDuration *metric.Counter
SQLLeafStartUsedSlots *metric.Gauge
SQLRootStartUsedSlots *metric.Gauge
}

// MetricStruct implements the metric.Struct interface, marking
// GrantCoordinatorMetrics as a bundle of metrics that the metric registry
// can walk and register as a unit.
func (GrantCoordinatorMetrics) MetricStruct() {}

func makeGrantCoordinatorMetrics() GrantCoordinatorMetrics {
m := GrantCoordinatorMetrics{
KVTotalSlots: metric.NewGauge(totalSlots),
KVUsedSlots: metric.NewGauge(addName(string(workKindString(KVWork)), usedSlots)),
KVTotalModerateSlots: metric.NewGauge(totalModerateSlots),
KVUsedSoftSlots: metric.NewGauge(usedSoftSlots),
KVIOTokensExhaustedDuration: metric.NewCounter(kvIOTokensExhaustedDuration),
SQLLeafStartUsedSlots: metric.NewGauge(
addName(string(workKindString(SQLStatementLeafStartWork)), usedSlots)),
SQLRootStartUsedSlots: metric.NewGauge(
addName(string(workKindString(SQLStatementRootStartWork)), usedSlots)),
KVTotalSlots: metric.NewGauge(totalSlots),
KVUsedSlots: metric.NewGauge(addName(string(workKindString(KVWork)), usedSlots)),
// TODO(sumeer): remove moderate load slots and soft slots code and
// metrics #88032.
KVTotalModerateSlots: metric.NewGauge(totalModerateSlots),
KVUsedSoftSlots: metric.NewGauge(usedSoftSlots),
KVSlotsExhaustedDuration: metric.NewCounter(kvSlotsExhaustedDuration),
KVCPULoadShortPeriodDuration: metric.NewCounter(kvCPULoadShortPeriodDuration),
KVCPULoadLongPeriodDuration: metric.NewCounter(kvCPULoadLongPeriodDuration),
KVSlotAdjusterIncrements: metric.NewCounter(kvSlotAdjusterIncrements),
KVSlotAdjusterDecrements: metric.NewCounter(kvSlotAdjusterDecrements),
KVIOTokensExhaustedDuration: metric.NewCounter(kvIOTokensExhaustedDuration),
SQLLeafStartUsedSlots: metric.NewGauge(addName(string(workKindString(SQLStatementLeafStartWork)), usedSlots)),
SQLRootStartUsedSlots: metric.NewGauge(addName(string(workKindString(SQLStatementRootStartWork)), usedSlots)),
}
return m
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ type slotGranter struct {

usedSlotsMetric *metric.Gauge
usedSoftSlotsMetric *metric.Gauge
// Non-nil for KV slots.
slotsExhaustedDurationMetric *metric.Counter
exhaustedStart time.Time
}

var _ granterWithLockedCalls = &slotGranter{}
Expand All @@ -127,6 +130,9 @@ func (sg *slotGranter) tryGetLocked(count int64, _ int8) grantResult {
}
if sg.usedSlots < sg.totalHighLoadSlots || sg.skipSlotEnforcement {
sg.usedSlots++
if sg.usedSlots == sg.totalHighLoadSlots && sg.slotsExhaustedDurationMetric != nil {
sg.exhaustedStart = timeutil.Now()
}
sg.usedSlotsMetric.Update(int64(sg.usedSlots))
return grantSuccess
}
Expand Down Expand Up @@ -172,6 +178,11 @@ func (sg *slotGranter) returnGrantLocked(count int64, _ int8) {
if count != 1 {
panic(errors.AssertionFailedf("unexpected count: %d", count))
}
if sg.usedSlots == sg.totalHighLoadSlots && sg.slotsExhaustedDurationMetric != nil {
now := timeutil.Now()
exhaustedMicros := now.Sub(sg.exhaustedStart).Microseconds()
sg.slotsExhaustedDurationMetric.Inc(exhaustedMicros)
}
sg.usedSlots--
if sg.usedSlots < 0 {
panic(errors.AssertionFailedf("used slots is negative %d", sg.usedSlots))
Expand All @@ -190,6 +201,9 @@ func (sg *slotGranter) tookWithoutPermissionLocked(count int64, _ int8) {
panic(errors.AssertionFailedf("unexpected count: %d", count))
}
sg.usedSlots++
if sg.usedSlots == sg.totalHighLoadSlots && sg.slotsExhaustedDurationMetric != nil {
sg.exhaustedStart = timeutil.Now()
}
sg.usedSlotsMetric.Update(int64(sg.usedSlots))
}

Expand Down Expand Up @@ -219,6 +233,32 @@ func (sg *slotGranter) tryGrantLocked(grantChainID grantChainID) grantResult {
return res
}

// setTotalHighLoadSlotsLocked sets the total high-load slot count, updating
// the slots-exhausted duration accounting when the change crosses the
// exhaustion boundary. Callers must hold the coordinator mutex.
//
//gcassert:inline
func (sg *slotGranter) setTotalHighLoadSlotsLocked(totalHighLoadSlots int) {
	// Mid-stack inlining: this function is kept to a cheap equality check plus
	// a call so the compiler inlines it at the (frequent) call sites; the
	// rarer actual-change path is delegated to the non-inlined internal
	// helper below.
	if totalHighLoadSlots == sg.totalHighLoadSlots {
		return
	}
	sg.setTotalHighLoadSlotsLockedInternal(totalHighLoadSlots)
}

// setTotalHighLoadSlotsLockedInternal is the slow path of
// setTotalHighLoadSlotsLocked: it applies a changed total and keeps the
// slots-exhausted duration metric consistent across the change. Slots are
// "exhausted" while usedSlots >= totalHighLoadSlots, so raising or lowering
// the total can end or begin an exhaustion interval without any grant or
// return of a slot happening. Callers must hold the coordinator mutex.
func (sg *slotGranter) setTotalHighLoadSlotsLockedInternal(newTotal int) {
	// The metric is non-nil only for the KV slot granter.
	if sg.slotsExhaustedDurationMetric != nil {
		wasExhausted := sg.totalHighLoadSlots <= sg.usedSlots
		isExhausted := newTotal <= sg.usedSlots
		switch {
		case wasExhausted && !isExhausted:
			// Growing the total past usedSlots closes the current exhaustion
			// interval; accumulate its duration.
			exhaustedMicros := timeutil.Now().Sub(sg.exhaustedStart).Microseconds()
			sg.slotsExhaustedDurationMetric.Inc(exhaustedMicros)
		case !wasExhausted && isExhausted:
			// Shrinking the total to (or below) usedSlots opens a new
			// exhaustion interval; record its start.
			sg.exhaustedStart = timeutil.Now()
		}
	}
	sg.totalHighLoadSlots = newTotal
}

// tokenGranter implements granterWithLockedCalls.
type tokenGranter struct {
coord *GrantCoordinator
Expand Down Expand Up @@ -692,6 +732,42 @@ var (
Measurement: "Slots",
Unit: metric.Unit_COUNT,
}
// NB: this metric is independent of whether slots enforcement is happening
// or not.
kvSlotsExhaustedDuration = metric.Metadata{
Name: "admission.granter.slots_exhausted_duration.kv",
Help: "Total duration when KV slots were exhausted, in micros",
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
// We have a metric for both short and long period. These metrics use the
// period provided in CPULoad and not wall time. So if the sum of the rate
// of these two is < 1sec/sec, the CPULoad ticks are not happening at the
// expected frequency (this could happen due to CPU overload).
kvCPULoadShortPeriodDuration = metric.Metadata{
Name: "admission.granter.cpu_load_short_period_duration.kv",
Help: "Total duration when CPULoad was being called with a short period, in micros",
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
kvCPULoadLongPeriodDuration = metric.Metadata{
Name: "admission.granter.cpu_load_long_period_duration.kv",
Help: "Total duration when CPULoad was being called with a long period, in micros",
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
kvSlotAdjusterIncrements = metric.Metadata{
Name: "admission.granter.slot_adjuster_increments.kv",
Help: "Number of increments of the total KV slots",
Measurement: "Slots",
Unit: metric.Unit_COUNT,
}
kvSlotAdjusterDecrements = metric.Metadata{
Name: "admission.granter.slot_adjuster_decrements.kv",
Help: "Number of decrements of the total KV slots",
Measurement: "Slots",
Unit: metric.Unit_COUNT,
}
kvIOTokensExhaustedDuration = metric.Metadata{
Name: "admission.granter.io_tokens_exhausted_duration.kv",
Help: "Total duration when IO tokens were exhausted, in micros",
Expand Down
12 changes: 11 additions & 1 deletion pkg/util/admission/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,17 @@ func TestGranterBasic(t *testing.T) {
samplePeriod = 250 * time.Millisecond
}
coord.CPULoad(runnable, procs, samplePeriod)
return flushAndReset()
str := flushAndReset()
kvsa := coord.cpuLoadListener.(*kvSlotAdjuster)
microsToMillis := func(micros int64) int64 {
return micros * int64(time.Microsecond) / int64(time.Millisecond)
}
return fmt.Sprintf("%sSlotAdjuster metrics: slots: %d, duration (short, long) millis: (%d, %d), inc: %d, dec: %d\n",
str, kvsa.totalSlotsMetric.Value(),
microsToMillis(kvsa.cpuLoadShortPeriodDurationMetric.Count()),
microsToMillis(kvsa.cpuLoadLongPeriodDurationMetric.Count()),
kvsa.slotAdjusterIncrementsMetric.Count(), kvsa.slotAdjusterDecrementsMetric.Count(),
)

case "set-io-tokens":
var tokens int
Expand Down
42 changes: 32 additions & 10 deletions pkg/util/admission/kv_slot_adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ type kvSlotAdjuster struct {
// A 0 value indicates that there is no override.
runnableAlphaOverride float64

totalSlotsMetric *metric.Gauge
totalModerateSlotsMetric *metric.Gauge
totalSlotsMetric *metric.Gauge
totalModerateSlotsMetric *metric.Gauge
cpuLoadShortPeriodDurationMetric *metric.Counter
cpuLoadLongPeriodDurationMetric *metric.Counter
slotAdjusterIncrementsMetric *metric.Counter
slotAdjusterDecrementsMetric *metric.Counter
}

var _ cpuOverloadIndicator = &kvSlotAdjuster{}
Expand All @@ -68,6 +72,12 @@ var _ CPULoadListener = &kvSlotAdjuster{}
func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, samplePeriod time.Duration) {
threshold := int(KVSlotAdjusterOverloadThreshold.Get(&kvsa.settings.SV))

periodDurationMicros := samplePeriod.Microseconds()
if samplePeriod > time.Millisecond {
kvsa.cpuLoadLongPeriodDurationMetric.Inc(periodDurationMicros)
} else {
kvsa.cpuLoadShortPeriodDurationMetric.Inc(periodDurationMicros)
}
// 0.009 gives weight to at least a few hundred samples at a 1ms sampling rate.
alpha := 0.009 * float64(samplePeriod/time.Millisecond)
if alpha > 0.5 {
Expand All @@ -83,7 +93,7 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, samplePeriod time.D
// Simple heuristic, which worked ok in experiments. More sophisticated ones
// could be devised.
usedSlots := kvsa.granter.usedSlots + kvsa.granter.usedSoftSlots
tryDecreaseSlots := func(total int) int {
tryDecreaseSlots := func(total int, adjustMetric bool) int {
// Overload.
// If using some slots, and the used slots is less than the total slots,
// and total slots hasn't bottomed out at the min, decrease the total
Expand All @@ -98,10 +108,13 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, samplePeriod time.D
// signal or other ways to prevent a fast drop.
if usedSlots > 0 && total > kvsa.minCPUSlots && usedSlots <= total {
total--
if adjustMetric {
kvsa.slotAdjusterDecrementsMetric.Inc(1)
}
}
return total
}
tryIncreaseSlots := func(total int) int {
tryIncreaseSlots := func(total int, adjustMetric bool) int {
// Underload.
// Used all its slots and can increase further, so additive increase. We
// also handle the case where the used slots are a bit less than total
Expand All @@ -114,21 +127,29 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, samplePeriod time.D
// decrease by 1000 slots every second (because the CPULoad ticks are at
// 1ms intervals, and we do additive decrease).
total++
if adjustMetric {
kvsa.slotAdjusterIncrementsMetric.Inc(1)
}
}
return total
}

if runnable >= threshold*procs {
// Very overloaded.
kvsa.granter.totalHighLoadSlots = tryDecreaseSlots(kvsa.granter.totalHighLoadSlots)
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(kvsa.granter.totalModerateLoadSlots)
kvsa.granter.setTotalHighLoadSlotsLocked(
tryDecreaseSlots(kvsa.granter.totalHighLoadSlots, true))
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(
kvsa.granter.totalModerateLoadSlots, false)
} else if float64(runnable) <= float64((threshold*procs)/4) {
// Very underloaded.
kvsa.granter.totalHighLoadSlots = tryIncreaseSlots(kvsa.granter.totalHighLoadSlots)
kvsa.granter.totalModerateLoadSlots = tryIncreaseSlots(kvsa.granter.totalModerateLoadSlots)
kvsa.granter.setTotalHighLoadSlotsLocked(
tryIncreaseSlots(kvsa.granter.totalHighLoadSlots, true))
kvsa.granter.totalModerateLoadSlots = tryIncreaseSlots(
kvsa.granter.totalModerateLoadSlots, false)
} else if float64(runnable) <= float64((threshold*procs)/2) {
// Moderately underloaded -- can afford to increase regular slots.
kvsa.granter.totalHighLoadSlots = tryIncreaseSlots(kvsa.granter.totalHighLoadSlots)
kvsa.granter.setTotalHighLoadSlotsLocked(
tryIncreaseSlots(kvsa.granter.totalHighLoadSlots, true))
} else if runnable >= 3*threshold*procs/4 {
// Moderately overloaded -- should decrease moderate load slots.
//
Expand All @@ -144,7 +165,8 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, samplePeriod time.D
// Where this will help is when what is pushing us over moderate load is
// optional background work, so by decreasing totalModerateLoadSlots we will
// contain the load due to that work.
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(kvsa.granter.totalModerateLoadSlots)
kvsa.granter.totalModerateLoadSlots = tryDecreaseSlots(
kvsa.granter.totalModerateLoadSlots, false)
}
// Consider the following cases, when we started this method with
// totalHighLoadSlots==totalModerateLoadSlots.
Expand Down
Loading

0 comments on commit 1b80198

Please sign in to comment.