Skip to content

Commit

Permalink
admission: error accounting for disk reads and writes
Browse files Browse the repository at this point in the history
Previously, we would ignore errors in disk token accounting hoping that
the next adjustment interval would capture them. This caused issues when
async compactions ran within an adjustment interval. These could spike
the bandwidth for a section of the 15 second interval without moving the
average too much (write-amp estimation).

This patch introduces a higher frequency error accounting mechanism. It
runs every 1s to account for any additional reads and writes that could
have occurred. Errors are only deducted if the read and/or write
bandwidth in the interval is greater than the number of tokens already
deducted in that interval.

Fixes cockroachdb#132332.

Release note: None
  • Loading branch information
aadityasondhi committed Dec 12, 2024
1 parent e8ee6f5 commit 2904c11
Show file tree
Hide file tree
Showing 10 changed files with 737 additions and 458 deletions.
2 changes: 1 addition & 1 deletion pkg/util/admission/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ type granterWithIOTokens interface {
// negative, though that will be rare, since it is possible for tokens to be
// returned.
setAvailableTokens(
ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64,
ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64, elasticDiskReadTokens int64,
ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskWriteTokensCapacity int64,
lastTick bool,
) (tokensUsed int64, tokensUsedByElasticWork int64)
Expand Down
18 changes: 11 additions & 7 deletions pkg/util/admission/disk_bandwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,25 +122,27 @@ type diskTokens struct {
func (d *diskBandwidthLimiter) computeElasticTokens(
id intervalDiskLoadInfo, usedTokens [admissionpb.NumStoreWorkTypes]diskTokens,
) diskTokens {
// We are using disk read bytes over the previous adjustment interval as a
// proxy for future reads. It is a somewhat bad proxy, but for now we are ok
// with the inaccuracy. This will be improved once we start to account for
// disk reads in AC.
// TODO(aaditya): Include calculation for read and IOPS.
// Issue: https://github.com/cockroachdb/cockroach/issues/107623

// We are using disk read bytes over the previous adjustment interval as a
// proxy for future reads. This is a bad estimate, but we account for errors
// in this estimate separately at a higher frequency. See
// kvStoreTokenGranter.adjustDiskTokenErrorLocked.
const alpha = 0.5
smoothedReadBytes := alpha*float64(id.intReadBytes) + alpha*float64(d.state.diskLoad.intReadBytes)
// Pessimistic approach using the max value between the smoothed and current
// reads.
intReadBytes := int64(math.Max(smoothedReadBytes, float64(id.intReadBytes)))
intReadBytes = int64(math.Max(0, float64(intReadBytes)))
diskWriteTokens := int64(float64(id.intProvisionedDiskBytes)*id.elasticBandwidthMaxUtil) - intReadBytes
// TODO(aaditya): consider setting a different floor to avoid starving out
// elastic writes completely due to out-sized reads from above.
diskWriteTokens = int64(math.Max(0, float64(diskWriteTokens)))

totalUsedTokens := sumDiskTokens(usedTokens)
tokens := diskTokens{
readByteTokens: 0,
readByteTokens: intReadBytes,
writeByteTokens: diskWriteTokens,
readIOPSTokens: 0,
writeIOPSTokens: 0,
Expand All @@ -159,14 +161,16 @@ func (d *diskBandwidthLimiter) computeElasticTokens(
func (d *diskBandwidthLimiter) SafeFormat(p redact.SafePrinter, _ rune) {
ib := humanizeutil.IBytes
p.Printf("diskBandwidthLimiter (tokenUtilization %.2f, tokensUsed (elastic %s, "+
"snapshot %s, regular %s) tokens (write %s (prev %s)), writeBW %s/s, readBW %s/s, "+
"provisioned %s/s)",
"snapshot %s, regular %s) tokens (write %s (prev %s), read %s (prev %s)), writeBW %s/s, "+
"readBW %s/s, provisioned %s/s)",
d.state.diskBWUtil,
ib(d.state.usedTokens[admissionpb.ElasticStoreWorkType].writeByteTokens),
ib(d.state.usedTokens[admissionpb.SnapshotIngestStoreWorkType].writeByteTokens),
ib(d.state.usedTokens[admissionpb.RegularStoreWorkType].writeByteTokens),
ib(d.state.tokens.writeByteTokens),
ib(d.state.prevTokens.writeByteTokens),
ib(d.state.tokens.readByteTokens),
ib(d.state.prevTokens.readByteTokens),
ib(d.state.diskLoad.intWriteBytes/adjustmentInterval),
ib(d.state.diskLoad.intReadBytes/adjustmentInterval),
ib(d.state.diskLoad.intProvisionedDiskBytes/adjustmentInterval),
Expand Down
49 changes: 45 additions & 4 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,41 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(
go func() {
ticker := tokenAllocationTicker{}
done := false
var systemLoaded bool // First adjustment interval is unloaded.
// The first adjustment interval is unloaded. We start as unloaded mainly
// for tests, and do a one-way transition to do 1ms ticks once we encounter
// load in the system.
var systemLoaded bool
ticker.adjustmentStart(false /* loaded */)
for !done {
ticker.tick()
remainingTicks := ticker.remainingTicks()
select {
default:
// We do error accounting for disk reads and writes. This is important
// since disk token accounting is based on estimates over adjustment
// intervals. Like any model, these linear models have error terms, and
// need to be adjusted for greater accuracy. We adjust for these errors
// at a higher frequency than the adjustment interval. The error
// adjustment interval is defined by errorAdjustmentDuration.
//
// NB: We always do error calculation prior to making adjustments to
// make sure we account for errors prior to starting a new adjustment
// interval.
if ticker.shouldAdjustForError(remainingTicks, systemLoaded) {
metrics = pebbleMetricsProvider.GetPebbleMetrics()
for _, m := range metrics {
if gc, ok := sgc.gcMap.Load(m.StoreID); ok {
gc.adjustDiskTokenError(m)
} else {
log.Warningf(ctx,
"seeing metrics for unknown storeID %d", m.StoreID)
}
}
}

// Start a new adjustment interval.
if remainingTicks == 0 {
metrics := pebbleMetricsProvider.GetPebbleMetrics()
metrics = pebbleMetricsProvider.GetPebbleMetrics()
if len(metrics) != sgc.numStores {
log.Warningf(ctx,
"expected %d store metrics and found %d metrics", sgc.numStores, len(metrics))
Expand All @@ -135,6 +161,7 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(
remainingTicks = ticker.remainingTicks()
}

// Allocate tokens to the store grant coordinator.
sgc.gcMap.Range(func(_ roachpb.StoreID, gc *GrantCoordinator) bool {
gc.allocateIOTokensTick(int64(remainingTicks))
// true indicates that iteration should continue after the
Expand Down Expand Up @@ -709,6 +736,17 @@ func (coord *GrantCoordinator) allocateIOTokensTick(remainingTicks int64) {
// GrantCoordinators used for IO, so the if-condition is always true.
}

// adjustDiskTokenError is used to account for errors in disk read and write
// token estimation. Refer to the comment in adjustDiskTokenErrorLocked for more
// details.
func (coord *GrantCoordinator) adjustDiskTokenError(m StoreMetrics) {
coord.mu.Lock()
defer coord.mu.Unlock()
if storeGranter, ok := coord.granters[KVWork].(*kvStoreTokenGranter); ok {
storeGranter.adjustDiskTokenErrorLocked(m.DiskStats.BytesRead, m.DiskStats.BytesWritten)
}
}

// testingTryGrant is only for unit tests, since they sometimes cut out
// support classes like the ioLoadListener.
func (coord *GrantCoordinator) testingTryGrant() {
Expand Down Expand Up @@ -998,9 +1036,12 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, _ rune) {
case *slotGranter:
s.Printf("%s%s: used: %d, total: %d", curSep, kind, g.usedSlots, g.totalSlots)
case *kvStoreTokenGranter:
s.Printf(" io-avail: %d(%d), disk-write-tokens-avail: %d", g.coordMu.availableIOTokens[admissionpb.RegularWorkClass],
s.Printf(" io-avail: %d(%d), disk-write-tokens-avail: %d, disk-read-tokens-deducted: %d",
g.coordMu.availableIOTokens[admissionpb.RegularWorkClass],
g.coordMu.availableIOTokens[admissionpb.ElasticWorkClass],
g.coordMu.diskTokensAvailable.writeByteTokens)
g.coordMu.diskTokensAvailable.writeByteTokens,
g.coordMu.diskTokensError.diskReadTokensAlreadyDeducted,
)
}
case SQLStatementLeafStartWork, SQLStatementRootStartWork:
if coord.granters[i] != nil {
Expand Down
80 changes: 74 additions & 6 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,16 @@ type kvStoreTokenGranter struct {
// TODO(aaditya): add support for read/IOPS tokens.
// Disk bandwidth tokens.
diskTokensAvailable diskTokens
diskTokensUsed [admissionpb.NumStoreWorkTypes]diskTokens
diskTokensError struct {
// prevObserved{Writes,Reads} is the observed disk metrics in the last
// call to adjustDiskTokenErrorLocked. These are used to compute the
// delta.
prevObservedWrites uint64
prevObservedReads uint64
diskWriteTokensAlreadyDeducted int64
diskReadTokensAlreadyDeducted int64
}
diskTokensUsed [admissionpb.NumStoreWorkTypes]diskTokens
}

ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter
Expand Down Expand Up @@ -423,6 +432,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
if sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass] > 0 {
sg.subtractIOTokensLocked(count, count, false)
sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens
sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens
return grantSuccess
}
Expand All @@ -433,6 +443,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
sg.subtractIOTokensLocked(count, count, false)
sg.coordMu.elasticIOTokensUsedByElastic += count
sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens
sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens
return grantSuccess
}
Expand All @@ -441,6 +452,7 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
// writeByteTokens.
if sg.coordMu.diskTokensAvailable.writeByteTokens > 0 {
sg.coordMu.diskTokensAvailable.writeByteTokens -= diskWriteTokens
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskWriteTokens
sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskWriteTokens
return grantSuccess
}
Expand Down Expand Up @@ -477,15 +489,62 @@ func (sg *kvStoreTokenGranter) subtractTokensForStoreWorkTypeLocked(
case admissionpb.RegularStoreWorkType, admissionpb.ElasticStoreWorkType:
diskTokenCount := sg.writeAmpLM.applyLinearModel(count)
sg.coordMu.diskTokensAvailable.writeByteTokens -= diskTokenCount
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskTokenCount
sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskTokenCount
case admissionpb.SnapshotIngestStoreWorkType:
// Do not apply the writeAmpLM since these writes do not incur additional
// write-amp.
sg.coordMu.diskTokensAvailable.writeByteTokens -= count
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += count
sg.coordMu.diskTokensUsed[wt].writeByteTokens += count
}
}

// adjustDiskTokenErrorLocked is used to account for extra reads and writes that
// are in excess of tokens already deducted.
//
// (1) For writes, we deduct tokens at admission for each request. If the actual
// writes seen on disk for a given interval is higher than the tokens already
// deducted, the delta is the write error. This value is then subtracted from
// available disk tokens.
//
// (2) For reads, we do not deduct any tokens at admission. However, we deduct
// tokens in advance during token estimation in the diskBandwidthLimiter for the
// next adjustment interval. These pre-deducted tokens are then allocated at the
// same interval as write tokens. Any additional reads in the interval are
// considered error and are subtracted from the available disk write tokens.
//
// For both reads, and writes, we reset the
// disk{read,write}TokensAlreadyDeducted to 0 for the next adjustment interval.
// For writes, we do this so that we are accounting for errors only in the given
// interval, and not across them. For reads, this is so that we don't grow
// arbitrarily large "burst" tokens, since they are not capped to an allocation
// period.
func (sg *kvStoreTokenGranter) adjustDiskTokenErrorLocked(readBytes uint64, writeBytes uint64) {
intWrites := int64(writeBytes - sg.coordMu.diskTokensError.prevObservedWrites)
intReads := int64(readBytes - sg.coordMu.diskTokensError.prevObservedReads)

// Compensate for error due to writes.
writeError := intWrites - sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted
if writeError > 0 {
sg.coordMu.diskTokensAvailable.writeByteTokens -= writeError
}

// Compensate for error due to reads.
readError := intReads - sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted
if readError > 0 {
sg.coordMu.diskTokensAvailable.writeByteTokens -= readError
}

// We have compensated for error, if any, in this interval, so we reset the
// deducted count for the next compensation interval.
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted = 0
sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted = 0

sg.coordMu.diskTokensError.prevObservedWrites = writeBytes
sg.coordMu.diskTokensError.prevObservedReads = readBytes
}

func (sg *kvStoreTokenGranter) tookWithoutPermission(
workType admissionpb.StoreWorkType, count int64,
) {
Expand Down Expand Up @@ -591,10 +650,11 @@ func (sg *kvStoreTokenGranter) tryGrantLocked(grantChainID grantChainID) grantRe
func (sg *kvStoreTokenGranter) setAvailableTokens(
ioTokens int64,
elasticIOTokens int64,
elasticDiskWriteTokens int64,
diskWriteTokens int64,
diskReadTokens int64,
ioTokenCapacity int64,
elasticIOTokenCapacity int64,
elasticDiskWriteTokensCapacity int64,
diskWriteTokensCapacity int64,
lastTick bool,
) (ioTokensUsed int64, ioTokensUsedByElasticWork int64) {
sg.coord.mu.Lock()
Expand Down Expand Up @@ -627,17 +687,25 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
// which case we want availableIOTokens[ElasticWorkClass] to not exceed it.
sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass] =
min(sg.coordMu.availableIOTokens[admissionpb.ElasticWorkClass], sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass])
// We also want to avoid very negative disk write tokens, so we reset them.
sg.coordMu.diskTokensAvailable.writeByteTokens = max(sg.coordMu.diskTokensAvailable.writeByteTokens, 0)
}
var w admissionpb.WorkClass
for w = 0; w < admissionpb.NumWorkClasses; w++ {
sg.availableTokensMetric[w].Update(sg.coordMu.availableIOTokens[w])
}
sg.startingIOTokens = sg.coordMu.availableIOTokens[admissionpb.RegularWorkClass]

sg.coordMu.diskTokensAvailable.writeByteTokens += elasticDiskWriteTokens
if sg.coordMu.diskTokensAvailable.writeByteTokens > elasticDiskWriteTokensCapacity {
sg.coordMu.diskTokensAvailable.writeByteTokens = elasticDiskWriteTokensCapacity
// Allocate disk write and read tokens.
sg.coordMu.diskTokensAvailable.writeByteTokens += diskWriteTokens
if sg.coordMu.diskTokensAvailable.writeByteTokens > diskWriteTokensCapacity {
sg.coordMu.diskTokensAvailable.writeByteTokens = diskWriteTokensCapacity
}
// NB: We don't cap the disk read tokens as they are only deducted during the
// error accounting loop. So essentially, we give reads the "burst" capacity
// of the error accounting interval. See `adjustDiskTokenErrorLocked` for the
// error accounting logic, and where we reset this bucket to 0.
sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted += diskReadTokens

return ioTokensUsed, ioTokensUsedByElasticWork
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/util/admission/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
// cpu-load runnable=<int> procs=<int> [infrequent=<bool>]
// init-store-grant-coordinator
// set-tokens io-tokens=<int> disk-write-tokens=<int>
// adjust-disk-error actual-write-bytes=<int> actual-read-bytes=<int>
func TestGranterBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -249,9 +250,13 @@ func TestGranterBasic(t *testing.T) {
case "set-tokens-loop":
var ioTokens int
var elasticDiskWriteTokens int
var elasticDiskReadTokens int
var loop int
d.ScanArgs(t, "io-tokens", &ioTokens)
d.ScanArgs(t, "disk-write-tokens", &elasticDiskWriteTokens)
if d.HasArg("disk-read-tokens") {
d.ScanArgs(t, "disk-read-tokens", &elasticDiskReadTokens)
}
d.ScanArgs(t, "loop", &loop)

for loop > 0 {
Expand All @@ -262,6 +267,7 @@ func TestGranterBasic(t *testing.T) {
int64(ioTokens),
int64(ioTokens),
int64(elasticDiskWriteTokens),
int64(elasticDiskReadTokens),
int64(ioTokens*250),
int64(ioTokens*250),
int64(elasticDiskWriteTokens*250),
Expand All @@ -274,9 +280,13 @@ func TestGranterBasic(t *testing.T) {
case "set-tokens":
var ioTokens int
var elasticDiskWriteTokens int
var elasticDiskReadTokens int
var tickInterval int
d.ScanArgs(t, "io-tokens", &ioTokens)
d.ScanArgs(t, "disk-write-tokens", &elasticDiskWriteTokens)
if d.HasArg("disk-read-tokens") {
d.ScanArgs(t, "disk-read-tokens", &elasticDiskReadTokens)
}
elasticIOTokens := ioTokens
if d.HasArg("elastic-io-tokens") {
d.ScanArgs(t, "elastic-io-tokens", &elasticIOTokens)
Expand All @@ -298,6 +308,7 @@ func TestGranterBasic(t *testing.T) {
int64(ioTokens),
int64(elasticIOTokens),
int64(elasticDiskWriteTokens),
int64(elasticDiskReadTokens),
int64(ioTokens*burstMultiplier),
int64(elasticIOTokens*burstMultiplier),
int64(elasticDiskWriteTokens*burstMultiplier),
Expand All @@ -315,6 +326,17 @@ func TestGranterBasic(t *testing.T) {
coord.testingTryGrant()
return flushAndReset()

case "adjust-disk-error":
var readBytes, writeBytes int
d.ScanArgs(t, "actual-write-bytes", &writeBytes)
d.ScanArgs(t, "actual-read-bytes", &readBytes)
m := StoreMetrics{DiskStats: DiskStats{
BytesRead: uint64(readBytes),
BytesWritten: uint64(writeBytes),
}}
coord.adjustDiskTokenError(m)
return flushAndReset()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
Expand Down
Loading

0 comments on commit 2904c11

Please sign in to comment.