diff --git a/pkg/cmd/roachtest/tests/admission_control_disk_bandwidth_overload.go b/pkg/cmd/roachtest/tests/admission_control_disk_bandwidth_overload.go
index e8d361831e23..2537f6f1885d 100644
--- a/pkg/cmd/roachtest/tests/admission_control_disk_bandwidth_overload.go
+++ b/pkg/cmd/roachtest/tests/admission_control_disk_bandwidth_overload.go
@@ -181,8 +181,10 @@ func registerDiskBandwidthOverload(r registry.Registry) {
 				continue
 			}
 			totalBW := writeVal + readVal
-			if totalBW > bandwidthThreshold {
-				t.Fatalf("write + read bandwidth %f (%f + %f) exceeded threshold of %f", totalBW, writeVal, readVal, bandwidthThreshold)
+			// TODO(aaditya): We should be asserting on total bandwidth once reads
+			// are being paced.
+			if writeVal > bandwidthThreshold {
+				t.Fatalf("write bandwidth %f exceeded threshold of %f, read bandwidth: %f, total bandwidth: %f", writeVal, bandwidthThreshold, readVal, totalBW)
 			}
 			numSuccesses++
 		}
diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go
index b2c0952d122c..536a0d123d35 100644
--- a/pkg/util/admission/admission.go
+++ b/pkg/util/admission/admission.go
@@ -269,7 +269,7 @@ type granterWithIOTokens interface {
 	// negative, though that will be rare, since it is possible for tokens to be
 	// returned.
 	setAvailableTokens(
-		ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64,
+		ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64, elasticDiskReadTokens int64,
 		ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskWriteTokensCapacity int64,
 		lastTick bool,
 	) (tokensUsed int64, tokensUsedByElasticWork int64)
diff --git a/pkg/util/admission/disk_bandwidth.go b/pkg/util/admission/disk_bandwidth.go
index 511d00ce8812..bf2fd334a380 100644
--- a/pkg/util/admission/disk_bandwidth.go
+++ b/pkg/util/admission/disk_bandwidth.go
@@ -122,12 +122,13 @@ type diskTokens struct {
 func (d *diskBandwidthLimiter) computeElasticTokens(
 	id intervalDiskLoadInfo, usedTokens [admissionpb.NumStoreWorkTypes]diskTokens,
 ) diskTokens {
-	// We are using disk read bytes over the previous adjustment interval as a
-	// proxy for future reads. It is a somewhat bad proxy, but for now we are ok
-	// with the inaccuracy. This will be improved once we start to account for
-	// disk reads in AC.
 	// TODO(aaditya): Include calculation for read and IOPS.
 	// Issue: https://github.com/cockroachdb/cockroach/issues/107623
+
+	// We are using disk read bytes over the previous adjustment interval as a
+	// proxy for future reads. This is a bad estimate, but we account for errors
+	// in this estimate separately at a higher frequency. See
+	// kvStoreTokenGranter.adjustDiskTokenErrorLocked.
 	const alpha = 0.5
 	smoothedReadBytes := alpha*float64(id.intReadBytes) + alpha*float64(d.state.diskLoad.intReadBytes)
 	// Pessimistic approach using the max value between the smoothed and current
@@ -140,7 +141,7 @@ func (d *diskBandwidthLimiter) computeElasticTokens(
 
 	totalUsedTokens := sumDiskTokens(usedTokens)
 	tokens := diskTokens{
-		readByteTokens:  0,
+		readByteTokens:  intReadBytes,
 		writeByteTokens: diskWriteTokens,
 		readIOPSTokens:  0,
 		writeIOPSTokens: 0,
diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go
index 897583bba54a..ab6636086d52 100644
--- a/pkg/util/admission/grant_coordinator.go
+++ b/pkg/util/admission/grant_coordinator.go
@@ -135,6 +135,23 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(
 				remainingTicks = ticker.remainingTicks()
 			}
 
+			// We do error accounting for disk reads and writes. This is important
+			// since disk token accounting is based on estimates over adjustment
+			// intervals. These estimates are linear models that have errors that
+			// would otherwise be unaccounted for. We adjust for these errors at a
+			// higher frequency than the adjustment interval.
+			if ticker.shouldAdjustForError() {
+				metrics := pebbleMetricsProvider.GetPebbleMetrics()
+				for _, m := range metrics {
+					if gc, ok := sgc.gcMap.Load(m.StoreID); ok {
+						gc.adjustDiskTokenError(m)
+					} else {
+						log.Warningf(ctx,
+							"seeing metrics for unknown storeID %d", m.StoreID)
+					}
+				}
+			}
+
 			sgc.gcMap.Range(func(_ roachpb.StoreID, gc *GrantCoordinator) bool {
 				gc.allocateIOTokensTick(int64(remainingTicks))
 				// true indicates that iteration should continue after the
@@ -709,6 +726,13 @@ func (coord *GrantCoordinator) allocateIOTokensTick(remainingTicks int64) {
 	// GrantCoordinators used for IO, so the if-condition is always true.
 }
 
+// adjustDiskTokenError is used to account for disk reads and writes that were observed in the latest disk stats but not already deducted as tokens.
+func (coord *GrantCoordinator) adjustDiskTokenError(m StoreMetrics) {
+	coord.mu.Lock()
+	defer coord.mu.Unlock()
+	coord.granters[KVWork].(*kvStoreTokenGranter).adjustDiskTokenErrorLocked(m.DiskStats.BytesRead, m.DiskStats.BytesWritten)
+}
+
 // testingTryGrant is only for unit tests, since they sometimes cut out
 // support classes like the ioLoadListener.
 func (coord *GrantCoordinator) testingTryGrant() {
diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go
index ccf5b29d3cf3..feb59c7b8346 100644
--- a/pkg/util/admission/granter.go
+++ b/pkg/util/admission/granter.go
@@ -302,7 +302,13 @@ type kvStoreTokenGranter struct {
 		// TODO(aaditya): add support for read/IOPS tokens.
 		// Disk bandwidth tokens.
 		diskTokensAvailable diskTokens
-		diskTokensUsed      [admissionpb.NumStoreWorkTypes]diskTokens
+		diskTokensError     struct {
+			prevObservedWrites             uint64
+			prevObservedReads              uint64
+			diskWriteTokensAlreadyDeducted int64
+			diskReadTokensAlreadyDeducted  int64
+		}
+		diskTokensUsed [admissionpb.NumStoreWorkTypes]diskTokens
 	}
 
 	ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter
@@ -477,15 +483,47 @@ func (sg *kvStoreTokenGranter) subtractTokensForStoreWorkTypeLocked(
 	case admissionpb.RegularStoreWorkType, admissionpb.ElasticStoreWorkType:
 		diskTokenCount := sg.writeAmpLM.applyLinearModel(count)
 		sg.coordMu.diskTokensAvailable.writeByteTokens -= diskTokenCount
+		sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskTokenCount
 		sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskTokenCount
 	case admissionpb.SnapshotIngestStoreWorkType:
 		// Do not apply the writeAmpLM since these writes do not incur additional
 		// write-amp.
 		sg.coordMu.diskTokensAvailable.writeByteTokens -= count
+		sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += count
 		sg.coordMu.diskTokensUsed[wt].writeByteTokens += count
 	}
 }
 
+// TODO(aaditya): Explain the equations below.
+func (sg *kvStoreTokenGranter) adjustDiskTokenErrorLocked(readBytes uint64, writeBytes uint64) {
+	intWrites := int64(writeBytes - sg.coordMu.diskTokensError.prevObservedWrites)
+	intReads := int64(readBytes - sg.coordMu.diskTokensError.prevObservedReads)
+
+	// Compensate for error due to writes.
+	writeError := intWrites - sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted
+	if writeError > 0 {
+		if sg.coordMu.diskTokensAvailable.writeByteTokens < 0 {
+			sg.coordMu.diskTokensAvailable.writeByteTokens = 0
+		}
+		sg.coordMu.diskTokensAvailable.writeByteTokens -= writeError
+	}
+	// We have compensated for error, if any, in this interval, so we reset the
+	// deducted count for the next compensation interval.
+	sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted = 0
+
+	// Compensate for error due to reads.
+	var readError int64
+	sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted -= intReads
+	if sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted < 0 {
+		readError = -(sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted)
+		sg.coordMu.diskTokensAvailable.writeByteTokens -= readError
+		sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted = 0
+	}
+
+	sg.coordMu.diskTokensError.prevObservedWrites = writeBytes
+	sg.coordMu.diskTokensError.prevObservedReads = readBytes
+}
+
 func (sg *kvStoreTokenGranter) tookWithoutPermission(
 	workType admissionpb.StoreWorkType, count int64,
 ) {
@@ -592,6 +630,7 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
 	ioTokens int64,
 	elasticIOTokens int64,
 	elasticDiskWriteTokens int64,
+	elasticDiskReadTokens int64,
 	ioTokenCapacity int64,
 	elasticIOTokenCapacity int64,
 	elasticDiskWriteTokensCapacity int64,
@@ -638,6 +677,7 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
 	if sg.coordMu.diskTokensAvailable.writeByteTokens > elasticDiskWriteTokensCapacity {
 		sg.coordMu.diskTokensAvailable.writeByteTokens = elasticDiskWriteTokensCapacity
 	}
+	sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted = elasticDiskReadTokens
 
 	return ioTokensUsed, ioTokensUsedByElasticWork
 }
diff --git a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go
index 761ef3239e1e..15d7a57d7cef 100644
--- a/pkg/util/admission/io_load_listener.go
+++ b/pkg/util/admission/io_load_listener.go
@@ -247,11 +247,15 @@ type ioLoadListenerState struct {
 	totalNumElasticByteTokens  int64
 	elasticByteTokensAllocated int64
 
-	// elasticDiskBWTokens represents the tokens to give out until the next call
-	// to adjustTokens. They are parceled out in small intervals.
-	// elasticDiskTokensAllocated represents what has been given out.
+	// elasticDiskWriteTokens represents the tokens to give out until the next
+	// call to adjustTokens. They are parceled out in small intervals.
+	// elasticDiskWriteTokensAllocated represents what has been given out.
 	elasticDiskWriteTokens          int64
 	elasticDiskWriteTokensAllocated int64
+	// elasticDiskReadTokens are tokens that were already deducted during token
+	// estimation. These tokens are used for read error accounting.
+	elasticDiskReadTokens          int64
+	elasticDiskReadTokensAllocated int64
 }
 
 type cumStoreCompactionStats struct {
@@ -399,6 +403,7 @@ const loadedDuration = tickDuration(1 * time.Millisecond)
 type tokenAllocationTicker struct {
 	expectedTickDuration        time.Duration
 	adjustmentIntervalStartTime time.Time
+	lastErrorAdjustmentTime     time.Time
 	ticker                      *time.Ticker
 }
 
@@ -441,6 +446,14 @@ func (t *tokenAllocationTicker) remainingTicks() uint64 {
 	return uint64((remainingTime + t.expectedTickDuration - 1) / t.expectedTickDuration)
 }
 
+func (t *tokenAllocationTicker) shouldAdjustForError() bool {
+	if timeutil.Since(t.lastErrorAdjustmentTime) < time.Second {
+		return false
+	}
+	t.lastErrorAdjustmentTime = timeutil.Now()
+	return true
+}
+
 func (t *tokenAllocationTicker) stop() {
 	t.ticker.Stop()
 	*t = tokenAllocationTicker{}
@@ -558,9 +571,23 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) {
 			remainingTicks,
 		)
 	if toAllocateElasticDiskWriteTokens < 0 {
-		panic(errors.AssertionFailedf("toAllocateElasticDiskBWTokens is negative %d",
+		panic(errors.AssertionFailedf("toAllocateElasticDiskWriteTokens is negative %d",
 			toAllocateElasticDiskWriteTokens))
 	}
+	io.elasticDiskWriteTokensAllocated += toAllocateElasticDiskWriteTokens
+
+	toAllocateElasticDiskReadTokens :=
+		allocateFunc(
+			io.elasticDiskReadTokens,
+			io.elasticDiskReadTokensAllocated,
+			remainingTicks,
+		)
+	if toAllocateElasticDiskReadTokens < 0 {
+		panic(errors.AssertionFailedf("toAllocateElasticDiskReadTokens is negative %d",
+			toAllocateElasticDiskReadTokens))
+	}
+	io.elasticDiskReadTokensAllocated += toAllocateElasticDiskReadTokens
+
 	// INVARIANT: toAllocate >= 0.
 	io.byteTokensAllocated += toAllocateByteTokens
 	if io.byteTokensAllocated < 0 {
@@ -571,7 +598,6 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) {
 		panic(errors.AssertionFailedf(
 			"tokens allocated is negative %d", io.elasticByteTokensAllocated))
 	}
-	io.elasticDiskWriteTokensAllocated += toAllocateElasticDiskWriteTokens
 
 	tokensMaxCapacity := allocateFunc(
 		io.totalNumByteTokens, 0, unloadedDuration.ticksInAdjustmentInterval(),
@@ -585,6 +611,7 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) {
 		toAllocateByteTokens,
 		toAllocateElasticByteTokens,
 		toAllocateElasticDiskWriteTokens,
+		toAllocateElasticDiskReadTokens,
 		tokensMaxCapacity,
 		elasticTokensMaxCapacity,
 		diskBWTokenMaxCapacity,
@@ -647,6 +674,7 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics
 			intDiskLoadInfo, diskTokensUsed)
 		io.elasticDiskWriteTokens = tokens.writeByteTokens
 		io.elasticDiskWriteTokensAllocated = 0
+		io.elasticDiskReadTokens = tokens.readByteTokens
 	}
 	if metrics.DiskStats.ProvisionedBandwidth == 0 ||
 		!DiskBandwidthTokensForElasticEnabled.Get(&io.settings.SV) {
diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go
index 555f60120ae3..036cfc523cd5 100644
--- a/pkg/util/admission/io_load_listener_test.go
+++ b/pkg/util/admission/io_load_listener_test.go
@@ -423,6 +423,7 @@ func (g *testGranterWithIOTokens) setAvailableTokens(
 	ioTokens int64,
 	elasticIOTokens int64,
 	elasticDiskBandwidthTokens int64,
+	elasticReadBandwidthTokens int64,
 	maxIOTokens int64,
 	maxElasticIOTokens int64,
 	maxElasticDiskBandwidthTokens int64,
@@ -486,6 +487,7 @@ func (g *testGranterNonNegativeTokens) setAvailableTokens(
 	ioTokens int64,
 	elasticIOTokens int64,
 	elasticDiskBandwidthTokens int64,
+	elasticReadBandwidthTokens int64,
 	_ int64,
 	_ int64,
 	_ int64,
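The hunks above implement a single mechanism: computeElasticTokens produces a per-interval read estimate that is treated as already deducted during token estimation (the new elasticDiskReadTokens field), the ioLoadListener hands that estimate to setAvailableTokens, and adjustDiskTokenErrorLocked later compares the actual disk counters against everything already deducted and charges only the difference. Below is a minimal, self-contained sketch of that bookkeeping, not the CockroachDB implementation: the names (tokenErrorState, estimateReads, adjustForError) and the MiB figures are hypothetical, and the granter's locking, metrics, and per-tick parceling are omitted.

// Hypothetical, simplified sketch of the error-adjustment bookkeeping in the diff above.
package main

import (
	"fmt"
	"math"
)

// tokenErrorState stands in for kvStoreTokenGranter.coordMu.diskTokensError:
// the cumulative disk counters seen at the last adjustment, plus what has
// already been deducted since then.
type tokenErrorState struct {
	prevObservedWrites uint64 // cumulative bytes written, as of the last adjustment
	prevObservedReads  uint64 // cumulative bytes read, as of the last adjustment
	writesDeducted     int64  // write tokens deducted for admitted work since the last adjustment
	readsPreDeducted   int64  // read-byte estimate pre-deducted for the current adjustment interval
}

// estimateReads mirrors the smoothing in computeElasticTokens: an equal-weight
// blend of the last two intervals' read bytes, taken pessimistically against
// the current interval.
func estimateReads(curReadBytes, prevReadBytes int64) int64 {
	const alpha = 0.5
	smoothed := alpha*float64(curReadBytes) + alpha*float64(prevReadBytes)
	return int64(math.Max(smoothed, float64(curReadBytes)))
}

// adjustForError compares observed disk traffic against what was already
// accounted for and deducts only the unaccounted remainder from the available
// write tokens, returning the new balance.
func (s *tokenErrorState) adjustForError(available int64, readBytes, writeBytes uint64) int64 {
	intWrites := int64(writeBytes - s.prevObservedWrites)
	intReads := int64(readBytes - s.prevObservedReads)

	// Writes: anything the disk actually wrote beyond the tokens already
	// charged (compactions, flushes, write-amp model error) is deducted now.
	// An already-negative balance is floored first, so the resulting deficit
	// reflects only the newly discovered error.
	if writeError := intWrites - s.writesDeducted; writeError > 0 {
		if available < 0 {
			available = 0
		}
		available -= writeError
	}
	s.writesDeducted = 0

	// Reads: the per-interval read estimate was already subtracted when the
	// write tokens were computed, so only reads in excess of that estimate are
	// charged here.
	s.readsPreDeducted -= intReads
	if s.readsPreDeducted < 0 {
		available -= -s.readsPreDeducted
		s.readsPreDeducted = 0
	}

	s.prevObservedWrites = writeBytes
	s.prevObservedReads = readBytes
	return available
}

func main() {
	// Per adjustment interval: with 4 MiB and 6 MiB of reads in the last two
	// intervals, the estimate is max(0.5*6+0.5*4, 6) = 6 MiB, the analogue of
	// elasticDiskReadTokens.
	readEstimate := estimateReads(6<<20, 4<<20)

	// Between adjustments: 10 MiB of write tokens were deducted for admitted
	// work, and the 6 MiB read estimate counts as pre-deducted.
	s := &tokenErrorState{writesDeducted: 10 << 20, readsPreDeducted: readEstimate}

	// Observed disk stats: 15 MiB written and 8 MiB read since the last
	// adjustment, so the unaccounted error is 5 MiB of writes and 2 MiB of reads.
	available := s.adjustForError(100<<20, 8<<20, 15<<20)
	fmt.Println(available == 100<<20-5<<20-2<<20) // prints: true
}

Two details are carried over from the diff: an already-negative write-token balance is floored before the write error is applied, so a pre-existing deficit is not compounded, and both error terms are charged against writeByteTokens because, as the roachtest TODO notes, reads are not yet paced directly.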