Skip to content

Commit

Permalink
admission: error accounting for disk reads and writes
Browse files Browse the repository at this point in the history
Previously, we would ignore errors in disk token accounting hoping that
the next adjustment interval would capture them. This caused issues when
async compactions ran within an adjustment interval. These could spike
the bandwidth for a section of the 15 second interval without moving the
average too much (write-amp estimation).

This patch introduces a higher frequency error accounting mechanism. It
runs every 1s to account for any additional reads and writes that could
have occurred. Errors are only deducted if the read and/or write
bandwidth in the interval is greater than the number of tokens already
deducted in that interval.

Fixes #132332.

Release note: None
  • Loading branch information
aadityasondhi committed Nov 8, 2024
1 parent bc87198 commit adaac96
Show file tree
Hide file tree
Showing 10 changed files with 564 additions and 364 deletions.
4 changes: 2 additions & 2 deletions pkg/util/admission/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,9 @@ type granterWithIOTokens interface {
// negative, though that will be rare, since it is possible for tokens to be
// returned.
setAvailableTokens(
ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64,
ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64, elasticDiskReadTokens int64,
ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskWriteTokensCapacity int64,
lastTick bool,
elasticDiskReadTokensCapacity int64, lastTick bool,
) (tokensUsed int64, tokensUsedByElasticWork int64)
// getDiskTokensUsedAndReset returns the disk bandwidth tokens used since the
// last such call.
Expand Down
18 changes: 11 additions & 7 deletions pkg/util/admission/disk_bandwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,25 +122,27 @@ type diskTokens struct {
func (d *diskBandwidthLimiter) computeElasticTokens(
id intervalDiskLoadInfo, usedTokens [admissionpb.NumStoreWorkTypes]diskTokens,
) diskTokens {
// We are using disk read bytes over the previous adjustment interval as a
// proxy for future reads. It is a somewhat bad proxy, but for now we are ok
// with the inaccuracy. This will be improved once we start to account for
// disk reads in AC.
// TODO(aaditya): Include calculation for read and IOPS.
// Issue: https://github.com/cockroachdb/cockroach/issues/107623

// We are using disk read bytes over the previous adjustment interval as a
// proxy for future reads. This is a bad estimate, but we account for errors
// in this estimate separately at a higher frequency. See
// kvStoreTokenGranter.adjustDiskTokenErrorLocked.
const alpha = 0.5
smoothedReadBytes := alpha*float64(id.intReadBytes) + alpha*float64(d.state.diskLoad.intReadBytes)
// Pessimistic approach using the max value between the smoothed and current
// reads.
intReadBytes := int64(math.Max(smoothedReadBytes, float64(id.intReadBytes)))
intReadBytes = int64(math.Max(0, float64(intReadBytes)))
diskWriteTokens := int64(float64(id.intProvisionedDiskBytes)*id.elasticBandwidthMaxUtil) - intReadBytes
// TODO(aaditya): consider setting a different floor to avoid starving out
// elastic writes completely due to out-sized reads from above.
diskWriteTokens = int64(math.Max(0, float64(diskWriteTokens)))

totalUsedTokens := sumDiskTokens(usedTokens)
tokens := diskTokens{
readByteTokens: 0,
readByteTokens: intReadBytes,
writeByteTokens: diskWriteTokens,
readIOPSTokens: 0,
writeIOPSTokens: 0,
Expand All @@ -159,14 +161,16 @@ func (d *diskBandwidthLimiter) computeElasticTokens(
func (d *diskBandwidthLimiter) SafeFormat(p redact.SafePrinter, _ rune) {
ib := humanizeutil.IBytes
p.Printf("diskBandwidthLimiter (tokenUtilization %.2f, tokensUsed (elastic %s, "+
"snapshot %s, regular %s) tokens (write %s (prev %s)), writeBW %s/s, readBW %s/s, "+
"provisioned %s/s)",
"snapshot %s, regular %s) tokens (write %s (prev %s), read %s (prev %s)), writeBW %s/s, "+
"readBW %s/s, provisioned %s/s)",
d.state.diskBWUtil,
ib(d.state.usedTokens[admissionpb.ElasticStoreWorkType].writeByteTokens),
ib(d.state.usedTokens[admissionpb.SnapshotIngestStoreWorkType].writeByteTokens),
ib(d.state.usedTokens[admissionpb.RegularStoreWorkType].writeByteTokens),
ib(d.state.tokens.writeByteTokens),
ib(d.state.prevTokens.writeByteTokens),
ib(d.state.tokens.readByteTokens),
ib(d.state.prevTokens.readByteTokens),
ib(d.state.diskLoad.intWriteBytes/adjustmentInterval),
ib(d.state.diskLoad.intReadBytes/adjustmentInterval),
ib(d.state.diskLoad.intProvisionedDiskBytes/adjustmentInterval),
Expand Down
28 changes: 28 additions & 0 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,23 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(
remainingTicks = ticker.remainingTicks()
}

// We do error accounting for disk reads and writes. This is important
// since disk token accounting is based on estimates over adjustment
// intervals. Like any model, these linear models have error terms, and
// need to be adjusted for greater accuracy. We adjust for these errors
// at a higher frequency than the adjustment interval.
if ticker.shouldAdjustForError() {
metrics := pebbleMetricsProvider.GetPebbleMetrics()
for _, m := range metrics {
if gc, ok := sgc.gcMap.Load(m.StoreID); ok {
gc.adjustDiskTokenError(m)
} else {
log.Warningf(ctx,
"seeing metrics for unknown storeID %d", m.StoreID)
}
}
}

sgc.gcMap.Range(func(_ roachpb.StoreID, gc *GrantCoordinator) bool {
gc.allocateIOTokensTick(int64(remainingTicks))
// true indicates that iteration should continue after the
Expand Down Expand Up @@ -709,6 +726,17 @@ func (coord *GrantCoordinator) allocateIOTokensTick(remainingTicks int64) {
// GrantCoordinators used for IO, so the if-condition is always true.
}

// adjustDiskTokenError is used to account for errors in disk read and write
// token estimation. Refer to the comment in adjustDiskTokenErrorLocked for more
// details.
func (coord *GrantCoordinator) adjustDiskTokenError(m StoreMetrics) {
coord.mu.Lock()
defer coord.mu.Unlock()
if storeGranter, ok := coord.granters[KVWork].(*kvStoreTokenGranter); ok {
storeGranter.adjustDiskTokenErrorLocked(m.DiskStats.BytesRead, m.DiskStats.BytesWritten)
}
}

// testingTryGrant is only for unit tests, since they sometimes cut out
// support classes like the ioLoadListener.
func (coord *GrantCoordinator) testingTryGrant() {
Expand Down
66 changes: 65 additions & 1 deletion pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,13 @@ type kvStoreTokenGranter struct {
// TODO(aaditya): add support for read/IOPS tokens.
// Disk bandwidth tokens.
diskTokensAvailable diskTokens
diskTokensUsed [admissionpb.NumStoreWorkTypes]diskTokens
diskTokensError struct {
prevObservedWrites uint64
prevObservedReads uint64
diskWriteTokensAlreadyDeducted int64
diskReadTokensAlreadyDeducted int64
}
diskTokensUsed [admissionpb.NumStoreWorkTypes]diskTokens
}

ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter
Expand Down Expand Up @@ -423,6 +429,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 {
sg.subtractIOTokensLocked(count, count, false)
sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens
sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens
return grantSuccess
}
Expand All @@ -433,6 +440,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
sg.subtractIOTokensLocked(count, count, false)
sg.coordMu.elasticIOTokensUsedByElastic += count
sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens
sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens
return grantSuccess
}
Expand All @@ -441,6 +449,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
// writeByteTokens.
if sg.coordMu.diskTokensAvailable.writeByteTokens > 0 {
sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens
sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens
return grantSuccess
}
Expand Down Expand Up @@ -477,15 +486,63 @@ func (sg *kvStoreTokenGranter) subtractTokensForStoreWorkTypeLocked(
case admissionpb.RegularStoreWorkType, admissionpb.ElasticStoreWorkType:
diskTokenCount := sg.writeAmpLM.applyLinearModel(count)
sg.coordMu.diskTokensAvailable.writeByteTokens -= diskTokenCount
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskTokenCount
sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskTokenCount
case admissionpb.SnapshotIngestStoreWorkType:
// Do not apply the writeAmpLM since these writes do not incur additional
// write-amp.
sg.coordMu.diskTokensAvailable.writeByteTokens -= count
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += count
sg.coordMu.diskTokensUsed[wt].writeByteTokens += count
}
}

// adjustDiskTokenErrorLocked is used to account for extra reads and writes that
// are in excess of tokens already deducted. The logic here is a little
// different for accounting for reads and writes.
//
// (1) For writes, we deduct tokens at admission for each request. If the actual
// writes seen on disk for a given interval is higher than the tokens already
// deducted, the delta is the write error. This value is then subtracted from
// available disk tokens.
//
// (2) For reads, we do not deduct any tokens at admission. However, we deduct
// tokens in advance during token estimation in the diskBandwidthLimiter for the
// next adjustment interval. We then use this "bucket" of deducted tokens to
// deduct further tokens until we saturate the bucket. Any additional reads will
// then subtract directly from the available disk tokens.
func (sg *kvStoreTokenGranter) adjustDiskTokenErrorLocked(readBytes uint64, writeBytes uint64) {
intWrites := int64(writeBytes - sg.coordMu.diskTokensError.prevObservedWrites)
intReads := int64(readBytes - sg.coordMu.diskTokensError.prevObservedReads)

// Compensate for error due to writes.
writeError := intWrites - sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted
if writeError > 0 {
if sg.coordMu.diskTokensAvailable.writeByteTokens < 0 {
sg.coordMu.diskTokensAvailable.writeByteTokens = 0
}
sg.coordMu.diskTokensAvailable.writeByteTokens -= writeError
}
// We have compensated for error, if any, in this interval, so we reset the
// deducted count for the next compensation interval.
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted = 0

// Compensate for error due to reads.
var readError int64
sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted -= intReads
if sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted < 0 {
readError = -(sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted)
sg.coordMu.diskTokensAvailable.writeByteTokens -= readError
if sg.coordMu.diskTokensAvailable.writeByteTokens < 0 {
sg.coordMu.diskTokensAvailable.writeByteTokens = 0
}
sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted = 0
}

sg.coordMu.diskTokensError.prevObservedWrites = writeBytes
sg.coordMu.diskTokensError.prevObservedReads = readBytes
}

func (sg *kvStoreTokenGranter) tookWithoutPermission(
workType admissionpb.StoreWorkType, count int64,
) {
Expand Down Expand Up @@ -592,9 +649,11 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
ioTokens int64,
elasticIOTokens int64,
elasticDiskWriteTokens int64,
elasticDiskReadTokens int64,
ioTokenCapacity int64,
elasticIOTokenCapacity int64,
elasticDiskWriteTokensCapacity int64,
elasticDiskReadTokensCapacity int64,
lastTick bool,
) (ioTokensUsed int64, ioTokensUsedByElasticWork int64) {
sg.coord.mu.Lock()
Expand Down Expand Up @@ -639,6 +698,11 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
sg.coordMu.diskTokensAvailable.writeByteTokens = elasticDiskWriteTokensCapacity
}

sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted += elasticDiskReadTokens
if sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted > elasticDiskReadTokensCapacity {
sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted = elasticDiskReadTokensCapacity
}

return ioTokensUsed, ioTokensUsedByElasticWork
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/util/admission/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
// cpu-load runnable=<int> procs=<int> [infrequent=<bool>]
// init-store-grant-coordinator
// set-tokens io-tokens=<int> disk-write-tokens=<int>
// adjust-disk-error actual-write-bytes=<int> actual-read-bytes=<int>
func TestGranterBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -249,9 +250,13 @@ func TestGranterBasic(t *testing.T) {
case "set-tokens-loop":
var ioTokens int
var elasticDiskWriteTokens int
var elasticDiskReadTokens int
var loop int
d.ScanArgs(t, "io-tokens", &ioTokens)
d.ScanArgs(t, "disk-write-tokens", &elasticDiskWriteTokens)
if d.HasArg("disk-read-tokens") {
d.ScanArgs(t, "disk-read-tokens", &elasticDiskReadTokens)
}
d.ScanArgs(t, "loop", &loop)

for loop > 0 {
Expand All @@ -262,9 +267,11 @@ func TestGranterBasic(t *testing.T) {
int64(ioTokens),
int64(ioTokens),
int64(elasticDiskWriteTokens),
int64(elasticDiskReadTokens),
int64(ioTokens*250),
int64(ioTokens*250),
int64(elasticDiskWriteTokens*250),
int64(elasticDiskReadTokens*250),
false, // lastTick
)
}
Expand All @@ -274,9 +281,13 @@ func TestGranterBasic(t *testing.T) {
case "set-tokens":
var ioTokens int
var elasticDiskWriteTokens int
var elasticDiskReadTokens int
var tickInterval int
d.ScanArgs(t, "io-tokens", &ioTokens)
d.ScanArgs(t, "disk-write-tokens", &elasticDiskWriteTokens)
if d.HasArg("disk-read-tokens") {
d.ScanArgs(t, "disk-read-tokens", &elasticDiskReadTokens)
}
elasticIOTokens := ioTokens
if d.HasArg("elastic-io-tokens") {
d.ScanArgs(t, "elastic-io-tokens", &elasticIOTokens)
Expand All @@ -298,9 +309,11 @@ func TestGranterBasic(t *testing.T) {
int64(ioTokens),
int64(elasticIOTokens),
int64(elasticDiskWriteTokens),
int64(elasticDiskReadTokens),
int64(ioTokens*burstMultiplier),
int64(elasticIOTokens*burstMultiplier),
int64(elasticDiskWriteTokens*burstMultiplier),
int64(elasticDiskReadTokens*burstMultiplier),
false, // lastTick
)
coord.testingTryGrant()
Expand All @@ -315,6 +328,17 @@ func TestGranterBasic(t *testing.T) {
coord.testingTryGrant()
return flushAndReset()

case "adjust-disk-error":
var readBytes, writeBytes int
d.ScanArgs(t, "actual-write-bytes", &writeBytes)
d.ScanArgs(t, "actual-read-bytes", &readBytes)
m := StoreMetrics{DiskStats: DiskStats{
BytesRead: uint64(readBytes),
BytesWritten: uint64(writeBytes),
}}
coord.adjustDiskTokenError(m)
return flushAndReset()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
Expand Down
Loading

0 comments on commit adaac96

Please sign in to comment.