Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
aadityasondhi committed Nov 5, 2024
1 parent bc87198 commit 1ab57ec
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ func registerDiskBandwidthOverload(r registry.Registry) {
continue
}
totalBW := writeVal + readVal
if totalBW > bandwidthThreshold {
t.Fatalf("write + read bandwidth %f (%f + %f) exceeded threshold of %f", totalBW, writeVal, readVal, bandwidthThreshold)
// TODO(aaditya): We should be asserting on total bandwidth once reads
// are being paced.
if writeVal > bandwidthThreshold {
t.Fatalf("write bandwidth %f exceeded threshold of %f, read bandwidth: %f, total bandwidth: %f", writeVal, bandwidthThreshold, readVal, totalBW)
}
numSuccesses++
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/admission/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ type granterWithIOTokens interface {
// negative, though that will be rare, since it is possible for tokens to be
// returned.
setAvailableTokens(
ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64,
ioTokens int64, elasticIOTokens int64, elasticDiskWriteTokens int64, elasticDiskReadTokens int64,
ioTokensCapacity int64, elasticIOTokenCapacity int64, elasticDiskWriteTokensCapacity int64,
lastTick bool,
) (tokensUsed int64, tokensUsedByElasticWork int64)
Expand Down
11 changes: 6 additions & 5 deletions pkg/util/admission/disk_bandwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ type diskTokens struct {
func (d *diskBandwidthLimiter) computeElasticTokens(
id intervalDiskLoadInfo, usedTokens [admissionpb.NumStoreWorkTypes]diskTokens,
) diskTokens {
// We are using disk read bytes over the previous adjustment interval as a
// proxy for future reads. It is a somewhat bad proxy, but for now we are ok
// with the inaccuracy. This will be improved once we start to account for
// disk reads in AC.
// TODO(aaditya): Include calculation for read and IOPS.
// Issue: https://github.com/cockroachdb/cockroach/issues/107623

// We are using disk read bytes over the previous adjustment interval as a
// proxy for future reads. This is a bad estimate, but we account for errors
// in this estimate separately at a higher frequency. See
// kvStoreTokenGranter.adjustDiskTokenErrorLocked.
const alpha = 0.5
smoothedReadBytes := alpha*float64(id.intReadBytes) + alpha*float64(d.state.diskLoad.intReadBytes)
// Pessimistic approach using the max value between the smoothed and current
Expand All @@ -140,7 +141,7 @@ func (d *diskBandwidthLimiter) computeElasticTokens(

totalUsedTokens := sumDiskTokens(usedTokens)
tokens := diskTokens{
readByteTokens: 0,
readByteTokens: intReadBytes,
writeByteTokens: diskWriteTokens,
readIOPSTokens: 0,
writeIOPSTokens: 0,
Expand Down
24 changes: 24 additions & 0 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,23 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(
remainingTicks = ticker.remainingTicks()
}

// We do error accounting for disk reads and writes. This is important
// since disk token accounting is based on estimates over adjustment
// intervals. These estimates are linear models that have errors that
// would be otherwise unaccounted for. We adjust for these errors at a
// higher frequency than the adjustment interval.
if ticker.shouldAdjustForError() {
metrics := pebbleMetricsProvider.GetPebbleMetrics()
for _, m := range metrics {
if gc, ok := sgc.gcMap.Load(m.StoreID); ok {
gc.adjustDiskTokenError(m)
} else {
log.Warningf(ctx,
"seeing metrics for unknown storeID %d", m.StoreID)
}
}
}

sgc.gcMap.Range(func(_ roachpb.StoreID, gc *GrantCoordinator) bool {
gc.allocateIOTokensTick(int64(remainingTicks))
// true indicates that iteration should continue after the
Expand Down Expand Up @@ -709,6 +726,13 @@ func (coord *GrantCoordinator) allocateIOTokensTick(remainingTicks int64) {
// GrantCoordinators used for IO, so the if-condition is always true.
}

// adjustDiskTokenError hands the disk read and write byte counts from the
// given store metrics to the KV store token granter, which uses them to
// compensate for errors in its disk token accounting. coord.mu is held for
// the duration of the call, as required by the granter's *Locked method.
func (coord *GrantCoordinator) adjustDiskTokenError(m StoreMetrics) {
	coord.mu.Lock()
	defer coord.mu.Unlock()
	// The KVWork granter is asserted to be a kvStoreTokenGranter; the assertion
	// panics otherwise.
	storeGranter := coord.granters[KVWork].(*kvStoreTokenGranter)
	storeGranter.adjustDiskTokenErrorLocked(
		m.DiskStats.BytesRead, m.DiskStats.BytesWritten,
	)
}

// testingTryGrant is only for unit tests, since they sometimes cut out
// support classes like the ioLoadListener.
func (coord *GrantCoordinator) testingTryGrant() {
Expand Down
42 changes: 41 additions & 1 deletion pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,13 @@ type kvStoreTokenGranter struct {
// TODO(aaditya): add support for read/IOPS tokens.
// Disk bandwidth tokens.
diskTokensAvailable diskTokens
diskTokensUsed [admissionpb.NumStoreWorkTypes]diskTokens
diskTokensError struct {
prevObservedWrites uint64
prevObservedReads uint64
diskWriteTokensAlreadyDeducted int64
diskReadTokensAlreadyDeducted int64
}
diskTokensUsed [admissionpb.NumStoreWorkTypes]diskTokens
}

ioTokensExhaustedDurationMetric [admissionpb.NumWorkClasses]*metric.Counter
Expand Down Expand Up @@ -477,15 +483,47 @@ func (sg *kvStoreTokenGranter) subtractTokensForStoreWorkTypeLocked(
case admissionpb.RegularStoreWorkType, admissionpb.ElasticStoreWorkType:
diskTokenCount := sg.writeAmpLM.applyLinearModel(count)
sg.coordMu.diskTokensAvailable.writeByteTokens -= diskTokenCount
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += diskTokenCount
sg.coordMu.diskTokensUsed[wt].writeByteTokens += diskTokenCount
case admissionpb.SnapshotIngestStoreWorkType:
// Do not apply the writeAmpLM since these writes do not incur additional
// write-amp.
sg.coordMu.diskTokensAvailable.writeByteTokens -= count
sg.coordMu.diskTokensError.diskWriteTokensAlreadyDeducted += count
sg.coordMu.diskTokensUsed[wt].writeByteTokens += count
}
}

// adjustDiskTokenErrorLocked corrects the available disk write byte tokens
// for disk traffic that was not covered by tokens already deducted. readBytes
// and writeBytes are compared against the values seen on the previous call,
// so they are treated as running counters.
//
// Writes: the bytes written since the previous call are compared with the
// write tokens deducted since the previous call. If more was written than was
// deducted, the available write tokens are first raised to zero if negative
// and then reduced by the shortfall. The deducted count is reset on every
// call.
//
// Reads: the bytes read since the previous call are drawn from
// diskReadTokensAlreadyDeducted. If that balance goes negative, the overdraft
// is taken out of the available write byte tokens and the balance is reset to
// zero.
//
// NOTE(review): prevObservedWrites/prevObservedReads appear to start at zero,
// so the first call would diff against zero and see the full counter values
// as one interval's traffic — confirm whether they are seeded elsewhere.
func (sg *kvStoreTokenGranter) adjustDiskTokenErrorLocked(readBytes uint64, writeBytes uint64) {
	errState := &sg.coordMu.diskTokensError
	available := &sg.coordMu.diskTokensAvailable

	// Bytes observed since the previous call. Once the deltas are computed the
	// new counter values can be remembered for the next call.
	writtenDelta := int64(writeBytes - errState.prevObservedWrites)
	readDelta := int64(readBytes - errState.prevObservedReads)
	errState.prevObservedWrites = writeBytes
	errState.prevObservedReads = readBytes

	// Write error: bytes written beyond what was already deducted.
	unaccountedWrites := writtenDelta - errState.diskWriteTokensAlreadyDeducted
	if unaccountedWrites > 0 {
		if available.writeByteTokens < 0 {
			available.writeByteTokens = 0
		}
		available.writeByteTokens -= unaccountedWrites
	}
	// The deducted count only covers one compensation interval, so start the
	// next interval from zero regardless of whether an error was found.
	errState.diskWriteTokensAlreadyDeducted = 0

	// Read error: draw the observed reads from the pre-deducted read balance.
	errState.diskReadTokensAlreadyDeducted -= readDelta
	if errState.diskReadTokensAlreadyDeducted < 0 {
		// The balance is overdrawn. Adding the negative balance subtracts the
		// overdraft from the available write byte tokens.
		available.writeByteTokens += errState.diskReadTokensAlreadyDeducted
		errState.diskReadTokensAlreadyDeducted = 0
	}
}

func (sg *kvStoreTokenGranter) tookWithoutPermission(
workType admissionpb.StoreWorkType, count int64,
) {
Expand Down Expand Up @@ -592,6 +630,7 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
ioTokens int64,
elasticIOTokens int64,
elasticDiskWriteTokens int64,
elasticDiskReadTokens int64,
ioTokenCapacity int64,
elasticIOTokenCapacity int64,
elasticDiskWriteTokensCapacity int64,
Expand Down Expand Up @@ -638,6 +677,7 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
if sg.coordMu.diskTokensAvailable.writeByteTokens > elasticDiskWriteTokensCapacity {
sg.coordMu.diskTokensAvailable.writeByteTokens = elasticDiskWriteTokensCapacity
}
sg.coordMu.diskTokensError.diskReadTokensAlreadyDeducted = elasticDiskReadTokens

return ioTokensUsed, ioTokensUsedByElasticWork
}
Expand Down
38 changes: 33 additions & 5 deletions pkg/util/admission/io_load_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,15 @@ type ioLoadListenerState struct {
totalNumElasticByteTokens int64
elasticByteTokensAllocated int64

// elasticDiskBWTokens represents the tokens to give out until the next call
// to adjustTokens. They are parceled out in small intervals.
// elasticDiskTokensAllocated represents what has been given out.
// elasticDiskWriteTokens represents the tokens to give out until the next
// call to adjustTokens. They are parceled out in small intervals.
// elasticDiskWriteTokensAllocated represents what has been given out.
elasticDiskWriteTokens int64
elasticDiskWriteTokensAllocated int64
// elasticDiskReadTokens are tokens that were already deducted during token
// estimation. These tokens are used for read error accounting.
elasticDiskReadTokens int64
elasticDiskReadTokensAllocated int64
}

type cumStoreCompactionStats struct {
Expand Down Expand Up @@ -399,6 +403,7 @@ const loadedDuration = tickDuration(1 * time.Millisecond)
type tokenAllocationTicker struct {
expectedTickDuration time.Duration
adjustmentIntervalStartTime time.Time
lastErrorAdjustmentTime time.Time
ticker *time.Ticker
}

Expand Down Expand Up @@ -441,6 +446,14 @@ func (t *tokenAllocationTicker) remainingTicks() uint64 {
return uint64((remainingTime + t.expectedTickDuration - 1) / t.expectedTickDuration)
}

// shouldAdjustForError reports whether at least one second has passed since
// the last time it returned true. When it returns true it also records the
// current time as the new reference point, so it returns true at most once
// per second.
func (t *tokenAllocationTicker) shouldAdjustForError() bool {
	if timeutil.Since(t.lastErrorAdjustmentTime) >= time.Second {
		t.lastErrorAdjustmentTime = timeutil.Now()
		return true
	}
	return false
}

func (t *tokenAllocationTicker) stop() {
t.ticker.Stop()
*t = tokenAllocationTicker{}
Expand Down Expand Up @@ -558,9 +571,23 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) {
remainingTicks,
)
if toAllocateElasticDiskWriteTokens < 0 {
panic(errors.AssertionFailedf("toAllocateElasticDiskBWTokens is negative %d",
panic(errors.AssertionFailedf("toAllocateElasticDiskWriteTokens is negative %d",
toAllocateElasticDiskWriteTokens))
}
io.elasticDiskWriteTokensAllocated += toAllocateElasticDiskWriteTokens

// Compute this tick's share of the elastic disk read tokens from the total
// for the interval, the amount already allocated, and the ticks remaining.
toAllocateElasticDiskReadTokens :=
	allocateFunc(
		io.elasticDiskReadTokens,
		io.elasticDiskReadTokensAllocated,
		remainingTicks,
	)
if toAllocateElasticDiskReadTokens < 0 {
	// Report the read allocation that failed the check, not the write
	// allocation, so the panic message shows the offending value.
	panic(errors.AssertionFailedf("toAllocateElasticDiskReadTokens is negative %d",
		toAllocateElasticDiskReadTokens))
}
io.elasticDiskReadTokensAllocated += toAllocateElasticDiskReadTokens

// INVARIANT: toAllocate >= 0.
io.byteTokensAllocated += toAllocateByteTokens
if io.byteTokensAllocated < 0 {
Expand All @@ -571,7 +598,6 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) {
panic(errors.AssertionFailedf(
"tokens allocated is negative %d", io.elasticByteTokensAllocated))
}
io.elasticDiskWriteTokensAllocated += toAllocateElasticDiskWriteTokens

tokensMaxCapacity := allocateFunc(
io.totalNumByteTokens, 0, unloadedDuration.ticksInAdjustmentInterval(),
Expand All @@ -585,6 +611,7 @@ func (io *ioLoadListener) allocateTokensTick(remainingTicks int64) {
toAllocateByteTokens,
toAllocateElasticByteTokens,
toAllocateElasticDiskWriteTokens,
toAllocateElasticDiskReadTokens,
tokensMaxCapacity,
elasticTokensMaxCapacity,
diskBWTokenMaxCapacity,
Expand Down Expand Up @@ -647,6 +674,7 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics
intDiskLoadInfo, diskTokensUsed)
// Install the new per-interval disk token budgets. Each "allocated" counter
// tracks what has been handed out from its budget, so it must restart at
// zero whenever the budget is replaced.
io.elasticDiskWriteTokens = tokens.writeByteTokens
io.elasticDiskWriteTokensAllocated = 0
io.elasticDiskReadTokens = tokens.readByteTokens
// Reset the read counter too: it only ever grows in allocateTokensTick, so
// without this a fresh read budget would be compared against the amount
// allocated in earlier intervals.
io.elasticDiskReadTokensAllocated = 0
}
if metrics.DiskStats.ProvisionedBandwidth == 0 ||
!DiskBandwidthTokensForElasticEnabled.Get(&io.settings.SV) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/admission/io_load_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func (g *testGranterWithIOTokens) setAvailableTokens(
ioTokens int64,
elasticIOTokens int64,
elasticDiskBandwidthTokens int64,
elasticReadBandwidthTokens int64,
maxIOTokens int64,
maxElasticIOTokens int64,
maxElasticDiskBandwidthTokens int64,
Expand Down Expand Up @@ -486,6 +487,7 @@ func (g *testGranterNonNegativeTokens) setAvailableTokens(
ioTokens int64,
elasticIOTokens int64,
elasticDiskBandwidthTokens int64,
elasticReadBandiwdthTokens int64,
_ int64,
_ int64,
_ int64,
Expand Down

0 comments on commit 1ab57ec

Please sign in to comment.