admission: error accounting for disk reads and writes
Previously, we would ignore errors in disk token accounting, hoping that
the next adjustment interval would capture them. This caused issues when
async compactions ran within an adjustment interval: they could spike the
bandwidth for a portion of the 15-second interval without moving the
interval average (and hence the write-amp estimate) much.

This patch introduces a higher-frequency error-accounting mechanism. It
runs every 1s and accounts for any additional reads and writes that may
have occurred. Errors are deducted only if the read and/or write
bandwidth observed in the interval exceeds the number of tokens already
deducted in that interval.

Fixes cockroachdb#132332.

Release note: None
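
The write-side rule is easiest to see with concrete numbers. Below is a minimal sketch of the idea in Go (simplified, with invented byte counts; the real logic lives in kvStoreTokenGranter.adjustDiskTokenErrorLocked in this diff):

    // Simplified write-side error accounting over a ~1s interval.
    // intWrites: bytes actually written to disk during the interval.
    // deducted:  write tokens already deducted by admission in that interval.
    func writeErrorToDeduct(intWrites, deducted int64) int64 {
    	if excess := intWrites - deducted; excess > 0 {
    		return excess // charge only the writes admission did not account for
    	}
    	return 0
    }

For example, if admission deducted 40 MB of write tokens during an interval but an async compaction pushed actual disk writes to 55 MB, the 15 MB delta is subtracted from the available tokens.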
aadityasondhi committed Nov 8, 2024
1 parent bc87198 commit 4942021
Showing 9 changed files with 195 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pkg/util/admission/admission.go
@@ -269,7 +269,7 @@ type granterWithIOTokens interface {
 	// negative, though that will be rare, since it is possible for tokens to be
 	// returned.
 	setAvailableTokens(
-		ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64,
+		ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64, elasticDiskReadTokens int64,
 		ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskWriteTokensCapacity int64,
 		lastTick bool,
 	) (tokensUsed int64, tokensUsedByElasticWork int64)
17 changes: 10 additions & 7 deletions pkg/util/admission/disk_bandwidth.go
@@ -122,12 +122,13 @@ type diskTokens struct {
 func (d *diskBandwidthLimiter) computeElasticTokens(
 	id intervalDiskLoadInfo, usedTokens [admissionpb.NumStoreWorkTypes]diskTokens,
 ) diskTokens {
-	// We are using disk read bytes over the previous adjustment interval as a
-	// proxy for future reads. It is a somewhat bad proxy, but for now we are ok
-	// with the inaccuracy. This will be improved once we start to account for
-	// disk reads in AC.
 	// TODO(aaditya): Include calculation for read and IOPS.
 	// Issue: https://github.com/cockroachdb/cockroach/issues/107623
+
+	// We are using disk read bytes over the previous adjustment interval as a
+	// proxy for future reads. This is a bad estimate, but we account for errors
+	// in this estimate separately at a higher frequency. See
+	// kvStoreTokenGranter.adjustDiskTokenErrorLocked.
 	const alpha = 0.5
 	smoothedReadBytes := alpha*float64(id.intReadBytes) + alpha*float64(d.state.diskLoad.intReadBytes)
 	// Pessimistic approach using the max value between the smoothed and current
@@ -140,7 +141,7 @@ func (d *diskBandwidthLimiter) computeElasticTokens(
 
 	totalUsedTokens := sumDiskTokens(usedTokens)
 	tokens := diskTokens{
-		readByteTokens:  0,
+		readByteTokens:  intReadBytes,
 		writeByteTokens: diskWriteTokens,
 		readIOPSTokens:  0,
 		writeIOPSTokens: 0,
@@ -159,14 +160,16 @@ func (d *diskBandwidthLimiter) computeElasticTokens(
 func (d *diskBandwidthLimiter) SafeFormat(p redact.SafePrinter, _ rune) {
 	ib := humanizeutil.IBytes
 	p.Printf("diskBandwidthLimiter (tokenUtilization %.2f, tokensUsed (elastic %s, "+
-		"snapshot %s, regular %s) tokens (write %s (prev %s)), writeBW %s/s, readBW %s/s, "+
-		"provisioned %s/s)",
+		"snapshot %s, regular %s) tokens (write %s (prev %s), read %s (prev %s)), writeBW %s/s, "+
+		"readBW %s/s, provisioned %s/s)",
 		d.state.diskBWUtil,
 		ib(d.state.usedTokens[admissionpb.ElasticStoreWorkType].writeByteTokens),
 		ib(d.state.usedTokens[admissionpb.SnapshotIngestStoreWorkType].writeByteTokens),
 		ib(d.state.usedTokens[admissionpb.RegularStoreWorkType].writeByteTokens),
 		ib(d.state.tokens.writeByteTokens),
 		ib(d.state.prevTokens.writeByteTokens),
+		ib(d.state.tokens.readByteTokens),
+		ib(d.state.prevTokens.readByteTokens),
 		ib(d.state.diskLoad.intWriteBytes/adjustmentInterval),
 		ib(d.state.diskLoad.intReadBytes/adjustmentInterval),
 		ib(d.state.diskLoad.intProvisionedDiskBytes/adjustmentInterval),
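The read estimate above combines exponential smoothing with a pessimistic max. A hedged sketch of that calculation, with invented numbers (function and variable names are illustrative, not from the source):

    import "math"

    func estimateReadBytes(prev, cur int64) int64 {
    	const alpha = 0.5 // equal weight to the current and previous interval
    	smoothed := alpha*float64(cur) + alpha*float64(prev)
    	// Pessimistic: never estimate below what was just observed.
    	// e.g. prev = 20 MB, cur = 60 MB -> smoothed = 40 MB, estimate = 60 MB.
    	return int64(math.Max(smoothed, float64(cur)))
    }

These estimated read bytes become readByteTokens, which seed the read error-accounting bucket described in granter.go below.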
28 changes: 28 additions & 0 deletions pkg/util/admission/grant_coordinator.go
@@ -135,6 +135,23 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(
 				remainingTicks = ticker.remainingTicks()
 			}
 
+			// We do error accounting for disk reads and writes. This is important
+			// since disk token accounting is based on estimates over adjustment
+			// intervals. Like any model, these linear models have error terms, and
+			// need to be adjusted for greater accuracy. We adjust for these errors
+			// at a higher frequency than the adjustment interval.
+			if ticker.shouldAdjustForError() {
+				metrics := pebbleMetricsProvider.GetPebbleMetrics()
+				for _, m := range metrics {
+					if gc, ok := sgc.gcMap.Load(m.StoreID); ok {
+						gc.adjustDiskTokenError(m)
+					} else {
+						log.Warningf(ctx,
+							"seeing metrics for unknown storeID %d", m.StoreID)
+					}
+				}
+			}
+
 			sgc.gcMap.Range(func(_ roachpb.StoreID, gc *GrantCoordinator) bool {
 				gc.allocateIOTokensTick(int64(remainingTicks))
 				// true indicates that iteration should continue after the
@@ -709,6 +726,17 @@ func (coord *GrantCoordinator) allocateIOTokensTick(remainingTicks int64) {
 	// GrantCoordinators used for IO, so the if-condition is always true.
 }
 
+// adjustDiskTokenError is used to account for errors in disk read and write
+// token estimation. Refer to the comment in adjustDiskTokenErrorLocked for more
+// details.
+func (coord *GrantCoordinator) adjustDiskTokenError(m StoreMetrics) {
+	coord.mu.Lock()
+	defer coord.mu.Unlock()
+	if storeGranter, ok := coord.granters[KVWork].(*kvStoreTokenGranter); ok {
+		storeGranter.adjustDiskTokenErrorLocked(m.DiskStats.BytesRead, m.DiskStats.BytesWritten)
+	}
+}
+
 // testingTryGrant is only for unit tests, since they sometimes cut out
 // support classes like the ioLoadListener.
 func (coord *GrantCoordinator) testingTryGrant() {
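Note that DiskStats.BytesRead and DiskStats.BytesWritten are cumulative counters, not per-interval values; the granter diffs them against the previously observed totals. A minimal sketch of that pattern (type and method names invented for illustration):

    // Converting cumulative disk counters into per-interval deltas, as
    // adjustDiskTokenErrorLocked does with prevObservedReads/prevObservedWrites.
    type errorState struct {
    	prevReads, prevWrites uint64
    }

    func (s *errorState) interval(reads, writes uint64) (intReads, intWrites int64) {
    	intReads = int64(reads - s.prevReads)
    	intWrites = int64(writes - s.prevWrites)
    	s.prevReads, s.prevWrites = reads, writes
    	return intReads, intWrites
    }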
58 changes: 57 additions & 1 deletion pkg/util/admission/granter.go
@@ -302,7 +302,13 @@ type kvStoreTokenGranter struct {
 		// TODO(aaditya): add support for read/IOPS tokens.
 		// Disk bandwidth tokens.
 		diskTokensAvailable diskTokens
-		diskTokensUsed      [admissionpb.NumStoreWorkTypes]diskTokens
+		diskTokensError     struct {
+			prevObservedWrites             uint64
+			prevObservedReads              uint64
+			diskWriteTokensAlreadyDeducted int64
+			diskReadTokensAlreadyDeducted  int64
+		}
+		diskTokensUsed [admissionpb.NumStoreWorkTypes]diskTokens
 	}
 
 	ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter
@@ -423,6 +429,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
 		if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 {
 			sg.subtractIOTokensLocked(count, count, false)
 			sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens
+			sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens
 			sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens
 			return grantSuccess
 		}
@@ -433,6 +440,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
 			sg.subtractIOTokensLocked(count, count, false)
 			sg.coordMu.elasticIOTokensUsedByElastic += count
 			sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens
+			sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens
 			sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens
 			return grantSuccess
 		}
@@ -441,6 +449,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
 		// writeByteTokens.
 		if sg.coordMu.diskTokensAvailable.writeByteTokens > 0 {
 			sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens
+			sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens
 			sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens
 			return grantSuccess
 		}
@@ -477,15 +486,60 @@ func (sg *kvStoreTokenGranter) subtractTokensForStoreWorkTypeLocked(
 	case admissionpb.RegularStoreWorkType, admissionpb.ElasticStoreWorkType:
 		diskTokenCount := sg.writeAmpLM.applyLinearModel(count)
 		sg.coordMu.diskTokensAvailable.writeByteTokens -= diskTokenCount
+		sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskTokenCount
 		sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskTokenCount
 	case admissionpb.SnapshotIngestStoreWorkType:
 		// Do not apply the writeAmpLM since these writes do not incur additional
 		// write-amp.
 		sg.coordMu.diskTokensAvailable.writeByteTokens -= count
+		sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += count
 		sg.coordMu.diskTokensUsed[wt].writeByteTokens += count
 	}
 }
 
+// adjustDiskTokenErrorLocked is used to account for extra reads and writes that
+// are in excess of tokens already deducted. The logic here is a little
+// different for accounting for reads and writes.
+//
+// (1) For writes, we deduct tokens at admission for each request. If the actual
+// writes seen on disk for a given interval are higher than the tokens already
+// deducted, the delta is the write error. This value is then subtracted from
+// available disk tokens.
+//
+// (2) For reads, we do not deduct any tokens at admission. However, we deduct
+// tokens in advance during token estimation in the diskBandwidthLimiter for the
+// next adjustment interval. We then use this "bucket" of deducted tokens to
+// deduct further tokens until we saturate the bucket. Any additional reads will
+// then subtract directly from the available disk tokens.
+func (sg *kvStoreTokenGranter) adjustDiskTokenErrorLocked(readBytes uint64, writeBytes uint64) {
+	intWrites := int64(writeBytes - sg.coordMu.diskTokensError.prevObservedWrites)
+	intReads := int64(readBytes - sg.coordMu.diskTokensError.prevObservedReads)
+
+	// Compensate for error due to writes.
+	writeError := intWrites - sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted
+	if writeError > 0 {
+		if sg.coordMu.diskTokensAvailable.writeByteTokens < 0 {
+			sg.coordMu.diskTokensAvailable.writeByteTokens = 0
+		}
+		sg.coordMu.diskTokensAvailable.writeByteTokens -= writeError
+	}
+	// We have compensated for error, if any, in this interval, so we reset the
+	// deducted count for the next compensation interval.
+	sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted = 0
+
+	// Compensate for error due to reads.
+	var readError int64
+	sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted -= intReads
+	if sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted < 0 {
+		readError = -(sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted)
+		sg.coordMu.diskTokensAvailable.writeByteTokens -= readError
+		sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted = 0
+	}
+
+	sg.coordMu.diskTokensError.prevObservedWrites = writeBytes
+	sg.coordMu.diskTokensError.prevObservedReads = readBytes
+}
+
 func (sg *kvStoreTokenGranter) tookWithoutPermission(
 	workType admissionpb.StoreWorkType, count int64,
 ) {
@@ -592,6 +646,7 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
 	ioTokens int64,
 	elasticIOTokens int64,
 	elasticDiskWriteTokens int64,
+	elasticDiskReadTokens int64,
 	ioTokenCapacity int64,
 	elasticIOTokenCapacity int64,
 	elasticDiskWriteTokensCapacity int64,
@@ -638,6 +693,7 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
 	if sg.coordMu.diskTokensAvailable.writeByteTokens > elasticDiskWriteTokensCapacity {
 		sg.coordMu.diskTokensAvailable.writeByteTokens = elasticDiskWriteTokensCapacity
 	}
+	sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted = elasticDiskReadTokens
 
 	return ioTokensUsed, ioTokensUsedByElasticWork
 }
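The read-side "bucket" in adjustDiskTokenErrorLocked is easiest to follow with numbers. A hedged walk-through with invented values:

    // Suppose setAvailableTokens seeds diskReadTokensAlreadyDeducted = 10 MB
    // (reads pre-deducted by the diskBandwidthLimiter estimate).
    //
    // Interval 1: intReads = 4 MB -> bucket drops to 6 MB; no error charged.
    // Interval 2: intReads = 9 MB -> bucket would go to -3 MB, so a 3 MB
    //             readError is subtracted from writeByteTokens and the
    //             bucket resets to 0.
    // Interval 3: intReads = 2 MB -> bucket is empty; the full 2 MB is
    //             charged against writeByteTokens.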
22 changes: 22 additions & 0 deletions pkg/util/admission/granter_test.go
@@ -39,6 +39,7 @@ import (
 // cpu-load runnable=<int> procs=<int> [infrequent=<bool>]
 // init-store-grant-coordinator
 // set-tokens io-tokens=<int> disk-write-tokens=<int>
+// adjust-disk-error actual-write-bytes=<int> actual-read-bytes=<int>
 func TestGranterBasic(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -249,9 +250,13 @@ func TestGranterBasic(t *testing.T) {
 	case "set-tokens-loop":
 		var ioTokens int
 		var elasticDiskWriteTokens int
+		var elasticDiskReadTokens int
 		var loop int
 		d.ScanArgs(t, "io-tokens", &ioTokens)
 		d.ScanArgs(t, "disk-write-tokens", &elasticDiskWriteTokens)
+		if d.HasArg("disk-read-tokens") {
+			d.ScanArgs(t, "disk-read-tokens", &elasticDiskReadTokens)
+		}
 		d.ScanArgs(t, "loop", &loop)
 
 		for loop > 0 {
@@ -262,6 +267,7 @@ func TestGranterBasic(t *testing.T) {
 				int64(ioTokens),
 				int64(ioTokens),
 				int64(elasticDiskWriteTokens),
+				int64(elasticDiskReadTokens),
 				int64(ioTokens*250),
 				int64(ioTokens*250),
 				int64(elasticDiskWriteTokens*250),
@@ -274,9 +280,13 @@ func TestGranterBasic(t *testing.T) {
 	case "set-tokens":
 		var ioTokens int
 		var elasticDiskWriteTokens int
+		var elasticDiskReadTokens int
 		var tickInterval int
 		d.ScanArgs(t, "io-tokens", &ioTokens)
 		d.ScanArgs(t, "disk-write-tokens", &elasticDiskWriteTokens)
+		if d.HasArg("disk-read-tokens") {
+			d.ScanArgs(t, "disk-read-tokens", &elasticDiskReadTokens)
+		}
 		elasticIOTokens := ioTokens
 		if d.HasArg("elastic-io-tokens") {
 			d.ScanArgs(t, "elastic-io-tokens", &elasticIOTokens)
@@ -298,6 +308,7 @@ func TestGranterBasic(t *testing.T) {
 			int64(ioTokens),
 			int64(elasticIOTokens),
 			int64(elasticDiskWriteTokens),
+			int64(elasticDiskReadTokens),
 			int64(ioTokens*burstMultiplier),
 			int64(elasticIOTokens*burstMultiplier),
 			int64(elasticDiskWriteTokens*burstMultiplier),
@@ -315,6 +326,17 @@ func TestGranterBasic(t *testing.T) {
 		coord.testingTryGrant()
 		return flushAndReset()
 
+	case "adjust-disk-error":
+		var readBytes, writeBytes int
+		d.ScanArgs(t, "actual-write-bytes", &writeBytes)
+		d.ScanArgs(t, "actual-read-bytes", &readBytes)
+		m := StoreMetrics{DiskStats: DiskStats{
+			BytesRead:    uint64(readBytes),
+			BytesWritten: uint64(writeBytes),
+		}}
+		coord.adjustDiskTokenError(m)
+		return flushAndReset()
+
 	default:
 		return fmt.Sprintf("unknown command: %s", d.Cmd)
 	}
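For reference, a hypothetical datadriven invocation of the new command (expected output elided, since it depends on accumulated granter state; note that actual-read-bytes and actual-write-bytes are cumulative, matching DiskStats):

    adjust-disk-error actual-write-bytes=100 actual-read-bytes=50
    ----
    ...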
38 changes: 33 additions & 5 deletions pkg/util/admission/io_load_listener.go
@@ -247,11 +247,15 @@ type ioLoadListenerState struct {
 	totalNumElasticByteTokens  int64
 	elasticByteTokensAllocated int64
 
-	// elasticDiskBWTokens represents the tokens to give out until the next call
-	// to adjustTokens. They are parceled out in small intervals.
-	// elasticDiskTokensAllocated represents what has been given out.
+	// elasticDiskWriteTokens represents the tokens to give out until the next
+	// call to adjustTokens. They are parceled out in small intervals.
+	// elasticDiskWriteTokensAllocated represents what has been given out.
 	elasticDiskWriteTokens          int64
 	elasticDiskWriteTokensAllocated int64
+	// elasticDiskReadTokens are tokens that were already deducted during token
+	// estimation. These tokens are used for read error accounting.
+	elasticDiskReadTokens          int64
+	elasticDiskReadTokensAllocated int64
 }
 
 type cumStoreCompactionStats struct {
@@ -399,6 +403,7 @@ const loadedDuration = tickDuration(1 * time.Millisecond)
 type tokenAllocationTicker struct {
 	expectedTickDuration        time.Duration
 	adjustmentIntervalStartTime time.Time
+	lastErrorAdjustmentTime     time.Time
 	ticker                      *time.Ticker
 }
 
@@ -441,6 +446,14 @@ func (t *tokenAllocationTicker) remainingTicks() uint64 {
 	return uint64((remainingTime + t.expectedTickDuration - 1) / t.expectedTickDuration)
 }
 
+func (t *tokenAllocationTicker) shouldAdjustForError() bool {
+	if timeutil.Since(t.lastErrorAdjustmentTime) < time.Second {
+		return false
+	}
+	t.lastErrorAdjustmentTime = timeutil.Now()
+	return true
+}
+
 func (t *tokenAllocationTicker) stop() {
 	t.ticker.Stop()
 	*t = tokenAllocationTicker{}
@@ -558,9 +571,23 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) {
 			remainingTicks,
 		)
 	if toAllocateElasticDiskWriteTokens < 0 {
-		panic(errors.AssertionFailedf("toAllocateElasticDiskBWTokens is negative %d",
+		panic(errors.AssertionFailedf("toAllocateElasticDiskWriteTokens is negative %d",
 			toAllocateElasticDiskWriteTokens))
 	}
+	io.elasticDiskWriteTokensAllocated += toAllocateElasticDiskWriteTokens
+
+	toAllocateElasticDiskReadTokens :=
+		allocateFunc(
+			io.elasticDiskReadTokens,
+			io.elasticDiskReadTokensAllocated,
+			remainingTicks,
+		)
+	if toAllocateElasticDiskReadTokens < 0 {
+		panic(errors.AssertionFailedf("toAllocateElasticDiskReadTokens is negative %d",
+			toAllocateElasticDiskReadTokens))
+	}
+	io.elasticDiskReadTokensAllocated += toAllocateElasticDiskReadTokens
+
 	// INVARIANT: toAllocate >= 0.
 	io.byteTokensAllocated += toAllocateByteTokens
 	if io.byteTokensAllocated < 0 {
@@ -571,7 +598,6 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) {
 		panic(errors.AssertionFailedf(
 			"tokens allocated is negative %d", io.elasticByteTokensAllocated))
 	}
-	io.elasticDiskWriteTokensAllocated += toAllocateElasticDiskWriteTokens
 
 	tokensMaxCapacity := allocateFunc(
 		io.totalNumByteTokens, 0, unloadedDuration.ticksInAdjustmentInterval(),
@@ -585,6 +611,7 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) {
 		toAllocateByteTokens,
 		toAllocateElasticByteTokens,
 		toAllocateElasticDiskWriteTokens,
+		toAllocateElasticDiskReadTokens,
 		tokensMaxCapacity,
 		elasticTokensMaxCapacity,
 		diskBWTokenMaxCapacity,
@@ -647,6 +674,7 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics
 			intDiskLoadInfo, diskTokensUsed)
 		io.elasticDiskWriteTokens = tokens.writeByteTokens
 		io.elasticDiskWriteTokensAllocated = 0
+		io.elasticDiskReadTokens = tokens.readByteTokens
 	}
 	if metrics.DiskStats.ProvisionedBandwidth == 0 ||
 		!DiskBandwidthTokensForElasticEnabled.Get(&io.settings.SV) {
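Taken together, the read-token plumbing added by this commit flows roughly as follows (a summary sketch assembled from the diff, not verbatim code):

    // diskBandwidthLimiter.computeElasticTokens   estimates readByteTokens
    //   -> ioLoadListener.adjustTokens            io.elasticDiskReadTokens = tokens.readByteTokens
    //   -> ioLoadListener.allocateTokensTick      parcels read tokens out per tick
    //   -> kvStoreTokenGranter.setAvailableTokens seeds diskReadTokensAlreadyDeducted
    //   -> adjustDiskTokenErrorLocked (every ~1s) drains that bucket; excess reads
    //                                             subtract from writeByteTokens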