diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index b2c0952d122c..536a0d123d35 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -269,7 +269,7 @@ type granterWithIOTokens interface { // negative, though that will be rare, since it is possible for tokens to be // returned. setAvailableTokens( - ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64, + ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64, elasticDiskReadTokens int64, ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskWriteTokensCapacity int64, lastTick bool, ) (tokensUsed int64, tokensUsedByElasticWork int64) diff --git a/pkg/util/admission/disk_bandwidth.go b/pkg/util/admission/disk_bandwidth.go index 511d00ce8812..766f5add0f49 100644 --- a/pkg/util/admission/disk_bandwidth.go +++ b/pkg/util/admission/disk_bandwidth.go @@ -122,12 +122,13 @@ type diskTokens struct { func (d *diskBandwidthLimiter) computeElasticTokens( id intervalDiskLoadInfo, usedTokens [admissionpb.NumStoreWorkTypes]diskTokens, ) diskTokens { - // We are using disk read bytes over the previous adjustment interval as a - // proxy for future reads. It is a somewhat bad proxy, but for now we are ok - // with the inaccuracy. This will be improved once we start to account for - // disk reads in AC. // TODO(aaditya): Include calculation for read and IOPS. // Issue: https://github.com/cockroachdb/cockroach/issues/107623 + + // We are using disk read bytes over the previous adjustment interval as a + // proxy for future reads. This is a bad estimate, but we account for errors + // in this estimate separately at a higher frequency. See + // kvStoreTokenGranter.adjustDiskTokenErrorLocked. 
const alpha = 0.5 smoothedReadBytes := alpha*float64(id.intReadBytes) + alpha*float64(d.state.diskLoad.intReadBytes) // Pessimistic approach using the max value between the smoothed and current @@ -140,7 +141,7 @@ func (d *diskBandwidthLimiter) computeElasticTokens( totalUsedTokens := sumDiskTokens(usedTokens) tokens := diskTokens{ - readByteTokens: 0, + readByteTokens: intReadBytes, writeByteTokens: diskWriteTokens, readIOPSTokens: 0, writeIOPSTokens: 0, @@ -159,14 +160,16 @@ func (d *diskBandwidthLimiter) computeElasticTokens( func (d *diskBandwidthLimiter) SafeFormat(p redact.SafePrinter, _ rune) { ib := humanizeutil.IBytes p.Printf("diskBandwidthLimiter (tokenUtilization %.2f, tokensUsed (elastic %s, "+ - "snapshot %s, regular %s) tokens (write %s (prev %s)), writeBW %s/s, readBW %s/s, "+ - "provisioned %s/s)", + "snapshot %s, regular %s) tokens (write %s (prev %s), read %s (prev %s)), writeBW %s/s, "+ + "readBW %s/s, provisioned %s/s)", d.state.diskBWUtil, ib(d.state.usedTokens[admissionpb.ElasticStoreWorkType].writeByteTokens), ib(d.state.usedTokens[admissionpb.SnapshotIngestStoreWorkType].writeByteTokens), ib(d.state.usedTokens[admissionpb.RegularStoreWorkType].writeByteTokens), ib(d.state.tokens.writeByteTokens), ib(d.state.prevTokens.writeByteTokens), + ib(d.state.tokens.readByteTokens), + ib(d.state.prevTokens.readByteTokens), ib(d.state.diskLoad.intWriteBytes/adjustmentInterval), ib(d.state.diskLoad.intReadBytes/adjustmentInterval), ib(d.state.diskLoad.intProvisionedDiskBytes/adjustmentInterval), diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index 897583bba54a..f01a04f499d1 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -135,6 +135,23 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider( remainingTicks = ticker.remainingTicks() } + // We do error accounting for disk reads and writes. 
This is important + // since disk token accounting is based on estimates over adjustment + // intervals. Like any model, these linear models have error terms, and + // need to be adjusted for greater accuracy. We adjust for these errors + // at a higher frequency than the adjustment interval. + if ticker.shouldAdjustForError() { + metrics := pebbleMetricsProvider.GetPebbleMetrics() + for _, m := range metrics { + if gc, ok := sgc.gcMap.Load(m.StoreID); ok { + gc.adjustDiskTokenError(m) + } else { + log.Warningf(ctx, + "seeing metrics for unknown storeID %d", m.StoreID) + } + } + } + sgc.gcMap.Range(func(_ roachpb.StoreID, gc *GrantCoordinator) bool { gc.allocateIOTokensTick(int64(remainingTicks)) // true indicates that iteration should continue after the @@ -709,6 +726,17 @@ func (coord *GrantCoordinator) allocateIOTokensTick(remainingTicks int64) { // GrantCoordinators used for IO, so the if-condition is always true. } +// adjustDiskTokenError is used to account for errors in disk read and write +// token estimation. Refer to the comment in adjustDiskTokenErrorLocked for more +// details. +func (coord *GrantCoordinator) adjustDiskTokenError(m StoreMetrics) { + coord.mu.Lock() + defer coord.mu.Unlock() + if storeGranter, ok := coord.granters[KVWork].(*kvStoreTokenGranter); ok { + storeGranter.adjustDiskTokenErrorLocked(m.DiskStats.BytesRead, m.DiskStats.BytesWritten) + } +} + // testingTryGrant is only for unit tests, since they sometimes cut out // support classes like the ioLoadListener. func (coord *GrantCoordinator) testingTryGrant() { diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index ccf5b29d3cf3..e0a979939799 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -302,7 +302,13 @@ type kvStoreTokenGranter struct { // TODO(aaditya): add support for read/IOPS tokens. // Disk bandwidth tokens. 
diskTokensAvailable diskTokens - diskTokensUsed [admissionpb.NumStoreWorkTypes]diskTokens + diskTokensError struct { + prevObservedWrites uint64 + prevObservedReads uint64 + diskWriteTokensAlreadyDeducted int64 + diskReadTokensAlreadyDeducted int64 + } + diskTokensUsed [admissionpb.NumStoreWorkTypes]diskTokens } ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter @@ -423,6 +429,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 { sg.subtractIOTokensLocked(count, count, false) sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens + sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens return grantSuccess } @@ -433,6 +440,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant sg.subtractIOTokensLocked(count, count, false) sg.coordMu.elasticIOTokensUsedByElastic += count sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens + sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens return grantSuccess } @@ -441,6 +449,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant // writeByteTokens. 
if sg.coordMu.diskTokensAvailable.writeByteTokens > 0 { sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens + sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens return grantSuccess } @@ -477,15 +486,60 @@ func (sg *kvStoreTokenGranter) subtractTokensForStoreWorkTypeLocked( case admissionpb.RegularStoreWorkType, admissionpb.ElasticStoreWorkType: diskTokenCount := sg.writeAmpLM.applyLinearModel(count) sg.coordMu.diskTokensAvailable.writeByteTokens -= diskTokenCount + sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskTokenCount sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskTokenCount case admissionpb.SnapshotIngestStoreWorkType: // Do not apply the writeAmpLM since these writes do not incur additional // write-amp. sg.coordMu.diskTokensAvailable.writeByteTokens -= count + sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += count sg.coordMu.diskTokensUsed[wt].writeByteTokens += count } } +// adjustDiskTokenErrorLocked is used to account for extra reads and writes that +// are in excess of tokens already deducted. The logic here is a little +// different for accounting for reads and writes. +// +// (1) For writes, we deduct tokens at admission for each request. If the actual +// writes seen on disk for a given interval is higher than the tokens already +// deducted, the delta is the write error. This value is then subtracted from +// available disk tokens. +// +// (2) For reads, we do not deduct any tokens at admission. However, we deduct +// tokens in advance during token estimation in the diskBandwidthLimiter for the +// next adjustment interval. We then use this "bucket" of deducted tokens to +// deduct further tokens until we saturate the bucket. Any additional reads will +// then subtract directly from the available disk tokens. 
+func (sg *kvStoreTokenGranter) adjustDiskTokenErrorLocked(readBytes uint64, writeBytes uint64) { + intWrites := int64(writeBytes - sg.coordMu.diskTokensError.prevObservedWrites) + intReads := int64(readBytes - sg.coordMu.diskTokensError.prevObservedReads) + + // Compensate for error due to writes. + writeError := intWrites - sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted + if writeError > 0 { + if sg.coordMu.diskTokensAvailable.writeByteTokens < 0 { + sg.coordMu.diskTokensAvailable.writeByteTokens = 0 + } + sg.coordMu.diskTokensAvailable.writeByteTokens -= writeError + } + // We have compensated for error, if any, in this interval, so we reset the + // deducted count for the next compensation interval. + sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted = 0 + + // Compensate for error due to reads. + var readError int64 + sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted -= intReads + if sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted < 0 { + readError = -(sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted) + sg.coordMu.diskTokensAvailable.writeByteTokens -= readError + sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted = 0 + } + + sg.coordMu.diskTokensError.prevObservedWrites = writeBytes + sg.coordMu.diskTokensError.prevObservedReads = readBytes +} + func (sg *kvStoreTokenGranter) tookWithoutPermission( workType admissionpb.StoreWorkType, count int64, ) { @@ -592,6 +646,7 @@ func (sg *kvStoreTokenGranter) setAvailableTokens( ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64, + elasticDiskReadTokens int64, ioTokenCapacity int64, elasticIOTokenCapacity int64, elasticDiskWriteTokensCapacity int64, @@ -638,6 +693,7 @@ func (sg *kvStoreTokenGranter) setAvailableTokens( if sg.coordMu.diskTokensAvailable.writeByteTokens > elasticDiskWriteTokensCapacity { sg.coordMu.diskTokensAvailable.writeByteTokens = elasticDiskWriteTokensCapacity } + sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted = 
elasticDiskReadTokens return ioTokensUsed, ioTokensUsedByElasticWork } diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index 9b8ffc5af213..a45947487be9 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -39,6 +39,7 @@ import ( // cpu-load runnable= procs= [infrequent=] // init-store-grant-coordinator // set-tokens io-tokens= disk-write-tokens= +// adjust-disk-error actual-write-bytes= actual-read-bytes= func TestGranterBasic(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -249,9 +250,13 @@ func TestGranterBasic(t *testing.T) { case "set-tokens-loop": var ioTokens int var elasticDiskWriteTokens int + var elasticDiskReadTokens int var loop int d.ScanArgs(t, "io-tokens", &ioTokens) d.ScanArgs(t, "disk-write-tokens", &elasticDiskWriteTokens) + if d.HasArg("disk-read-tokens") { + d.ScanArgs(t, "disk-read-tokens", &elasticDiskReadTokens) + } d.ScanArgs(t, "loop", &loop) for loop > 0 { @@ -262,6 +267,7 @@ func TestGranterBasic(t *testing.T) { int64(ioTokens), int64(ioTokens), int64(elasticDiskWriteTokens), + int64(elasticDiskReadTokens), int64(ioTokens*250), int64(ioTokens*250), int64(elasticDiskWriteTokens*250), @@ -274,9 +280,13 @@ func TestGranterBasic(t *testing.T) { case "set-tokens": var ioTokens int var elasticDiskWriteTokens int + var elasticDiskReadTokens int var tickInterval int d.ScanArgs(t, "io-tokens", &ioTokens) d.ScanArgs(t, "disk-write-tokens", &elasticDiskWriteTokens) + if d.HasArg("disk-read-tokens") { + d.ScanArgs(t, "disk-read-tokens", &elasticDiskReadTokens) + } elasticIOTokens := ioTokens if d.HasArg("elastic-io-tokens") { d.ScanArgs(t, "elastic-io-tokens", &elasticIOTokens) @@ -298,6 +308,7 @@ func TestGranterBasic(t *testing.T) { int64(ioTokens), int64(elasticIOTokens), int64(elasticDiskWriteTokens), + int64(elasticDiskReadTokens), int64(ioTokens*burstMultiplier), int64(elasticIOTokens*burstMultiplier), 
int64(elasticDiskWriteTokens*burstMultiplier), @@ -315,6 +326,17 @@ func TestGranterBasic(t *testing.T) { coord.testingTryGrant() return flushAndReset() + case "adjust-disk-error": + var readBytes, writeBytes int + d.ScanArgs(t, "actual-write-bytes", &writeBytes) + d.ScanArgs(t, "actual-read-bytes", &readBytes) + m := StoreMetrics{DiskStats: DiskStats{ + BytesRead: uint64(readBytes), + BytesWritten: uint64(writeBytes), + }} + coord.adjustDiskTokenError(m) + return flushAndReset() + default: return fmt.Sprintf("unknown command: %s", d.Cmd) } diff --git a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go index 761ef3239e1e..15d7a57d7cef 100644 --- a/pkg/util/admission/io_load_listener.go +++ b/pkg/util/admission/io_load_listener.go @@ -247,11 +247,15 @@ type ioLoadListenerState struct { totalNumElasticByteTokens int64 elasticByteTokensAllocated int64 - // elasticDiskBWTokens represents the tokens to give out until the next call - // to adjustTokens. They are parceled out in small intervals. - // elasticDiskTokensAllocated represents what has been given out. + // elasticDiskWriteTokens represents the tokens to give out until the next + // call to adjustTokens. They are parceled out in small intervals. + // elasticDiskWriteTokensAllocated represents what has been given out. elasticDiskWriteTokens int64 elasticDiskWriteTokensAllocated int64 + // elasticDiskReadTokens are tokens that were already deducted during token + // estimation. These tokens are used for read error accounting. 
 + elasticDiskReadTokens int64 + elasticDiskReadTokensAllocated int64 } type cumStoreCompactionStats struct { @@ -399,6 +403,7 @@ const loadedDuration = tickDuration(1 * time.Millisecond) type tokenAllocationTicker struct { expectedTickDuration time.Duration adjustmentIntervalStartTime time.Time + lastErrorAdjustmentTime time.Time ticker *time.Ticker } @@ -441,6 +446,14 @@ func (t *tokenAllocationTicker) remainingTicks() uint64 { return uint64((remainingTime + t.expectedTickDuration - 1) / t.expectedTickDuration) } +func (t *tokenAllocationTicker) shouldAdjustForError() bool { + if timeutil.Since(t.lastErrorAdjustmentTime) < time.Second { + return false + } + t.lastErrorAdjustmentTime = timeutil.Now() + return true +} + func (t *tokenAllocationTicker) stop() { t.ticker.Stop() *t = tokenAllocationTicker{} @@ -558,9 +571,23 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) { remainingTicks, ) if toAllocateElasticDiskWriteTokens < 0 { - panic(errors.AssertionFailedf("toAllocateElasticDiskBWTokens is negative %d", + panic(errors.AssertionFailedf("toAllocateElasticDiskWriteTokens is negative %d", toAllocateElasticDiskWriteTokens)) } + io.elasticDiskWriteTokensAllocated += toAllocateElasticDiskWriteTokens + + toAllocateElasticDiskReadTokens := + allocateFunc( + io.elasticDiskReadTokens, + io.elasticDiskReadTokensAllocated, + remainingTicks, + ) + if toAllocateElasticDiskReadTokens < 0 { + panic(errors.AssertionFailedf("toAllocateElasticDiskReadTokens is negative %d", + toAllocateElasticDiskReadTokens)) + } + io.elasticDiskReadTokensAllocated += toAllocateElasticDiskReadTokens + // INVARIANT: toAllocate >= 0. 
io.byteTokensAllocated += toAllocateByteTokens if io.byteTokensAllocated < 0 { @@ -571,7 +598,6 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) { panic(errors.AssertionFailedf( "tokens allocated is negative %d", io.elasticByteTokensAllocated)) } - io.elasticDiskWriteTokensAllocated += toAllocateElasticDiskWriteTokens tokensMaxCapacity := allocateFunc( io.totalNumByteTokens, 0, unloadedDuration.ticksInAdjustmentInterval(), @@ -585,6 +611,7 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) { toAllocateByteTokens, toAllocateElasticByteTokens, toAllocateElasticDiskWriteTokens, + toAllocateElasticDiskReadTokens, tokensMaxCapacity, elasticTokensMaxCapacity, diskBWTokenMaxCapacity, @@ -647,6 +674,7 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics intDiskLoadInfo, diskTokensUsed) io.elasticDiskWriteTokens = tokens.writeByteTokens io.elasticDiskWriteTokensAllocated = 0 + io.elasticDiskReadTokens = tokens.readByteTokens } if metrics.DiskStats.ProvisionedBandwidth == 0 || !DiskBandwidthTokensForElasticEnabled.Get(&io.settings.SV) { diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index 555f60120ae3..87388a367fdd 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -423,16 +423,19 @@ func (g *testGranterWithIOTokens) setAvailableTokens( ioTokens int64, elasticIOTokens int64, elasticDiskBandwidthTokens int64, + elasticReadBandwidthTokens int64, maxIOTokens int64, maxElasticIOTokens int64, maxElasticDiskBandwidthTokens int64, lastTick bool, ) (tokensUsed int64, tokensUsedByElasticWork int64) { fmt.Fprintf(&g.buf, "setAvailableTokens: io-tokens=%s(elastic %s) "+ - "elastic-disk-bw-tokens=%s max-byte-tokens=%s(elastic %s) max-disk-bw-tokens=%s lastTick=%t", + "elastic-disk-bw-tokens=%s read-bw-tokens=%s "+ + "max-byte-tokens=%s(elastic %s) max-disk-bw-tokens=%s lastTick=%t", 
 tokensForTokenTickDurationToString(ioTokens), tokensForTokenTickDurationToString(elasticIOTokens), tokensForTokenTickDurationToString(elasticDiskBandwidthTokens), + tokensForTokenTickDurationToString(elasticReadBandwidthTokens), tokensForTokenTickDurationToString(maxIOTokens), tokensForTokenTickDurationToString(maxElasticIOTokens), tokensForTokenTickDurationToString(maxElasticDiskBandwidthTokens), @@ -486,6 +489,7 @@ func (g *testGranterNonNegativeTokens) setAvailableTokens( ioTokens int64, elasticIOTokens int64, elasticDiskBandwidthTokens int64, + elasticReadBandwidthTokens int64, _ int64, _ int64, _ int64, diff --git a/pkg/util/admission/testdata/disk_bandwidth_limiter b/pkg/util/admission/testdata/disk_bandwidth_limiter index 92c8bec1bcc9..c0d734a82345 100644 --- a/pkg/util/admission/testdata/disk_bandwidth_limiter +++ b/pkg/util/admission/testdata/disk_bandwidth_limiter @@ -1,18 +1,18 @@ init ---- -diskBandwidthLimiter (tokenUtilization 0.00, tokensUsed (elastic 0 B, snapshot 0 B, regular 0 B) tokens (write 0 B (prev 0 B)), writeBW 0 B/s, readBW 0 B/s, provisioned 0 B/s) +diskBandwidthLimiter (tokenUtilization 0.00, tokensUsed (elastic 0 B, snapshot 0 B, regular 0 B) tokens (write 0 B (prev 0 B), read 0 B (prev 0 B)), writeBW 0 B/s, readBW 0 B/s, provisioned 0 B/s) # Since token utilization is from the previous estimation loop, we expect that to be +Inf in the first iteration. compute int-read-bytes=50 int-write-bytes=100 int-provisioned-bytes=1000 regular-tokens-used=60 snapshot-tokens-used=5 elastic-tokens-used=20 ---- -diskBandwidthLimiter (tokenUtilization +Inf, tokensUsed (elastic 20 B, snapshot 5 B, regular 60 B) tokens (write 850 B (prev 0 B)), writeBW 6 B/s, readBW 3 B/s, provisioned 66 B/s) +diskBandwidthLimiter (tokenUtilization +Inf, tokensUsed (elastic 20 B, snapshot 5 B, regular 60 B) tokens (write 850 B (prev 0 B), read 50 B (prev 0 B)), writeBW 6 B/s, readBW 3 B/s, provisioned 66 B/s) # Utilization is now not Inf. 
For token calculation, we adjust using the max of smoothed read bytes and the current value. Here we use 80. compute int-read-bytes=80 int-write-bytes=150 int-provisioned-bytes=800 regular-tokens-used=100 snapshot-tokens-used=5 elastic-tokens-used=30 ---- -diskBandwidthLimiter (tokenUtilization 0.16, tokensUsed (elastic 30 B, snapshot 5 B, regular 100 B) tokens (write 640 B (prev 850 B)), writeBW 10 B/s, readBW 5 B/s, provisioned 53 B/s) +diskBandwidthLimiter (tokenUtilization 0.16, tokensUsed (elastic 30 B, snapshot 5 B, regular 100 B) tokens (write 640 B (prev 850 B), read 80 B (prev 50 B)), writeBW 10 B/s, readBW 5 B/s, provisioned 53 B/s) # Now we use 65 for read bandwidth adjustment. compute int-read-bytes=50 int-write-bytes=100 int-provisioned-bytes=800 regular-tokens-used=10 snapshot-tokens-used=5 elastic-tokens-used=30 ---- -diskBandwidthLimiter (tokenUtilization 0.07, tokensUsed (elastic 30 B, snapshot 5 B, regular 10 B) tokens (write 655 B (prev 640 B)), writeBW 6 B/s, readBW 3 B/s, provisioned 53 B/s) +diskBandwidthLimiter (tokenUtilization 0.07, tokensUsed (elastic 30 B, snapshot 5 B, regular 10 B) tokens (write 655 B (prev 640 B), read 65 B (prev 80 B)), writeBW 6 B/s, readBW 3 B/s, provisioned 53 B/s) diff --git a/pkg/util/admission/testdata/granter b/pkg/util/admission/testdata/granter index d859aedaf0c6..9dc842b453ff 100644 --- a/pkg/util/admission/testdata/granter +++ b/pkg/util/admission/testdata/granter @@ -824,3 +824,38 @@ return-grant work=kv-snapshot v=20 kv-snapshot: returnGrant(20) GrantCoordinator: (chain: id: 0 active: false index: 5) io-avail: 5(5), disk-write-tokens-avail: 20 + + +##################################################################### +# Test store grant coordinator disk error accounting +init-store-grant-coordinator +---- +GrantCoordinator: +(chain: id: 0 active: false index: 5) io-avail: 153722867280912930(153722867280912930), disk-write-tokens-avail: 153722867280912930 + +set-tokens io-tokens=50 disk-write-tokens=70 
 disk-read-tokens=20 tick-interval=250 +---- +GrantCoordinator: +(chain: id: 0 active: false index: 5) io-avail: 50(50), disk-write-tokens-avail: 70 + +# Since we did not admit any write requests, the tokens already deducted are 0. +# Any writes that occur will be considered error. We accounted for 20B of reads, +# so reads in excess of that will be error. err = (10-0) + (40-20) = 30. +adjust-disk-error actual-write-bytes=10 actual-read-bytes=40 +---- +GrantCoordinator: +(chain: id: 0 active: false index: 5) io-avail: 50(50), disk-write-tokens-avail: 40 + +try-get work=kv v=10 +---- +kv-regular: tryGet(10) returned true +GrantCoordinator: +(chain: id: 0 active: false index: 5) io-avail: 40(40), disk-write-tokens-avail: 30 + +# +20B writes and +5B reads since last error accounting. Since we already +# deducted 10B of write tokens during admission, the error here is 10B. We +# exhausted the read token bucket already, so we will deduct directly this time. +adjust-disk-error actual-write-bytes=30 actual-read-bytes=45 +---- +GrantCoordinator: +(chain: id: 0 active: false index: 5) io-avail: 40(40), disk-write-tokens-avail: 15