Skip to content

Commit

Permalink
admission: allow range snapshot ignored bytes to be normal writes
Browse files Browse the repository at this point in the history
This is needed for cockroachdb#110943
since we are sometimes applying incoming snapshots as normal writes.
As a reminder, we do not deduct tokens for data ingested/written to the
LSM for range snapshots since this can starve out normal writes. We
also compensate for these ignored bytes when fitting the linear model.
The change in that compensation logic is what this PR addresses.

Informs cockroachdb#109808

Epic: none

Release note: None
  • Loading branch information
sumeerbhola authored and aliher1911 committed Oct 9, 2023
1 parent ae54751 commit b1e6000
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 161 deletions.
16 changes: 9 additions & 7 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,11 @@ type Controller interface {
// periodically polled for weights. The stopper should be used to terminate
// the periodic polling.
SetTenantWeightProvider(TenantWeightProvider, *stop.Stopper)
// SnapshotIngested informs admission control about a range snapshot
// ingestion.
SnapshotIngested(roachpb.StoreID, pebble.IngestOperationStats)
// SnapshotIngestedOrWritten informs admission control about a range
// snapshot ingestion or a range snapshot written as a normal write.
// writeBytes should roughly correspond to the size of the write when
// flushed to an sstable.
SnapshotIngestedOrWritten(_ roachpb.StoreID, _ pebble.IngestOperationStats, writeBytes uint64)
// FollowerStoreWriteBytes informs admission control about writes
// replicated to a raft follower, that have not been subject to admission
// control.
Expand Down Expand Up @@ -532,15 +534,15 @@ func (n *controllerImpl) SetTenantWeightProvider(
}()
}

// SnapshotIngested implements the Controller interface.
func (n *controllerImpl) SnapshotIngested(
storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats,
// SnapshotIngestedOrWritten implements the Controller interface.
func (n *controllerImpl) SnapshotIngestedOrWritten(
storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats, writeBytes uint64,
) {
storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID))
if storeAdmissionQ == nil {
return
}
storeAdmissionQ.StatsToIgnore(ingestStats)
storeAdmissionQ.StatsToIgnore(ingestStats, writeBytes)
}

// FollowerStoreWriteBytes implements the Controller interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func (r *Replica) applySnapshot(
}
}
if r.store.cfg.KVAdmissionController != nil {
r.store.cfg.KVAdmissionController.SnapshotIngested(r.store.StoreID(), ingestStats)
r.store.cfg.KVAdmissionController.SnapshotIngestedOrWritten(r.store.StoreID(), ingestStats, 0)
}
stats.ingestion = timeutil.Now()

Expand Down
6 changes: 5 additions & 1 deletion pkg/util/admission/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,11 @@ type storeAdmissionStats struct {
// that PR is closer to the final solution, and this is a step in that
// direction).
statsToIgnore struct {
pebble.IngestOperationStats
// Stats for ingests.
ingestStats pebble.IngestOperationStats
// Stats for regular writes. These roughly correspond to what the writes
// will turn into when written to a flushed sstable.
writeBytes uint64
}
// aux represents additional information carried for informational purposes
// (e.g. for logging).
Expand Down
7 changes: 5 additions & 2 deletions pkg/util/admission/io_load_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,8 +1015,11 @@ func (res adjustTokensResult) SafeFormat(p redact.SafePrinter, _ rune) {
ib := humanizeutil.IBytes
// NB: "≈" indicates smoothed quantities.
p.Printf("compaction score %v (%d ssts, %d sub-levels), ", res.ioThreshold, res.ioThreshold.L0NumFiles, res.ioThreshold.L0NumSubLevels)
p.Printf("L0 growth %s (write %s ingest %s ignored %s): ", ib(res.aux.intL0AddedBytes),
ib(res.aux.perWorkTokensAux.intL0WriteBytes), ib(res.aux.perWorkTokensAux.intL0IngestedBytes),
p.Printf("L0 growth %s (write %s (ignored %s) ingest %s (ignored %s)): ",
ib(res.aux.intL0AddedBytes),
ib(res.aux.perWorkTokensAux.intL0WriteBytes),
ib(res.aux.perWorkTokensAux.intL0IgnoredWriteBytes),
ib(res.aux.perWorkTokensAux.intL0IngestedBytes),
ib(res.aux.perWorkTokensAux.intL0IgnoredIngestedBytes))
// Writes to L0 that we expected because requests told admission control.
// This is the "easy path", from an estimation perspective, if all regular
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/admission/io_load_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,9 @@ func TestBadIOLoadListenerStats(t *testing.T) {
req.stats.workCount = rand.Uint64()
req.stats.writeAccountedBytes = rand.Uint64()
req.stats.ingestedAccountedBytes = rand.Uint64()
req.stats.statsToIgnore.Bytes = rand.Uint64()
req.stats.statsToIgnore.ApproxIngestedIntoL0Bytes = rand.Uint64()
req.stats.statsToIgnore.ingestStats.Bytes = rand.Uint64()
req.stats.statsToIgnore.ingestStats.ApproxIngestedIntoL0Bytes = rand.Uint64()
req.stats.statsToIgnore.writeBytes = rand.Uint64()
}
kvGranter := &testGranterNonNegativeTokens{t: t}
st := cluster.MakeTestingClusterSettings()
Expand Down
31 changes: 22 additions & 9 deletions pkg/util/admission/store_token_estimation.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,13 @@ type perWorkTokensAux struct {
intL0WriteBypassedAccountedBytes int64
intIngestedBypassedAccountedBytes int64

// The ignored bytes are included in intL0IngestedBytes, and in
// intLSMWriteAndIngestedBytes, and may even be higher than that value
// because these are from a different source.
// These ignored bytes are included in intL0WriteBytes, and may even be
// higher than that value because these are from a different source.
intL0IgnoredWriteBytes int64

// These ignored bytes are included in intL0IngestedBytes, and in
// intLSMIngestedBytes, and may even be higher than that value because these
// are from a different source.
intL0IgnoredIngestedBytes int64
}

Expand Down Expand Up @@ -180,9 +184,15 @@ func (e *storePerWorkTokenEstimator) updateEstimates(
return
}
intL0WriteBytes := int64(l0Metrics.BytesFlushed) - int64(e.cumL0WriteBytes)
intL0IgnoredWriteBytes := int64(admissionStats.statsToIgnore.writeBytes) -
int64(e.cumStoreAdmissionStats.statsToIgnore.writeBytes)
adjustedIntL0WriteBytes := intL0WriteBytes - intL0IgnoredWriteBytes
if adjustedIntL0WriteBytes < 0 {
adjustedIntL0WriteBytes = 0
}
intL0IngestedBytes := int64(l0Metrics.BytesIngested) - int64(e.cumL0IngestedBytes)
intL0IgnoredIngestedBytes := int64(admissionStats.statsToIgnore.ApproxIngestedIntoL0Bytes) -
int64(e.cumStoreAdmissionStats.statsToIgnore.ApproxIngestedIntoL0Bytes)
intL0IgnoredIngestedBytes := int64(admissionStats.statsToIgnore.ingestStats.ApproxIngestedIntoL0Bytes) -
int64(e.cumStoreAdmissionStats.statsToIgnore.ingestStats.ApproxIngestedIntoL0Bytes)
adjustedIntL0IngestedBytes := intL0IngestedBytes - intL0IgnoredIngestedBytes
if adjustedIntL0IngestedBytes < 0 {
adjustedIntL0IngestedBytes = 0
Expand All @@ -196,23 +206,25 @@ func (e *storePerWorkTokenEstimator) updateEstimates(
intIngestedAccountedBytes := int64(admissionStats.ingestedAccountedBytes) -
int64(e.cumStoreAdmissionStats.ingestedAccountedBytes)
e.atDoneL0WriteTokensLinearModel.updateModelUsingIntervalStats(
intL0WriteAccountedBytes, intL0WriteBytes, intWorkCount)
intL0WriteAccountedBytes, adjustedIntL0WriteBytes, intWorkCount)
e.atDoneL0IngestTokensLinearModel.updateModelUsingIntervalStats(
intIngestedAccountedBytes, adjustedIntL0IngestedBytes, intWorkCount)
// Ingest across all levels model.
intLSMIngestedBytes := int64(cumLSMIngestedBytes) - int64(e.cumLSMIngestedBytes)
intIgnoredIngestedBytes :=
int64(admissionStats.statsToIgnore.Bytes) - int64(e.cumStoreAdmissionStats.statsToIgnore.Bytes)
int64(admissionStats.statsToIgnore.ingestStats.Bytes) -
int64(e.cumStoreAdmissionStats.statsToIgnore.ingestStats.Bytes)
adjustedIntLSMIngestedBytes := intLSMIngestedBytes - intIgnoredIngestedBytes
if adjustedIntLSMIngestedBytes < 0 {
adjustedIntLSMIngestedBytes = 0
}
e.atDoneIngestTokensLinearModel.updateModelUsingIntervalStats(
intIngestedAccountedBytes, adjustedIntLSMIngestedBytes, intWorkCount)

intL0TotalBytes := intL0WriteBytes + adjustedIntL0IngestedBytes
intL0TotalBytes := adjustedIntL0WriteBytes + adjustedIntL0IngestedBytes
if intWorkCount > 1 && intL0TotalBytes > 0 {
// Update the atAdmissionWorkTokens
// Update the atAdmissionWorkTokens. NB: this is only used for requests
// that don't use replication flow control.
intAtAdmissionWorkTokens := intL0TotalBytes / intWorkCount
const alpha = 0.5
e.atAdmissionWorkTokens = int64(alpha*float64(intAtAdmissionWorkTokens) +
Expand All @@ -235,6 +247,7 @@ func (e *storePerWorkTokenEstimator) updateEstimates(
int64(e.cumStoreAdmissionStats.aux.writeBypassedAccountedBytes),
intIngestedBypassedAccountedBytes: int64(admissionStats.aux.ingestedBypassedAccountedBytes) -
int64(e.cumStoreAdmissionStats.aux.ingestedBypassedAccountedBytes),
intL0IgnoredWriteBytes: intL0IgnoredWriteBytes,
intL0IgnoredIngestedBytes: intL0IgnoredIngestedBytes,
}
// Store the latest cumulative values.
Expand Down
11 changes: 9 additions & 2 deletions pkg/util/admission/store_token_estimation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,15 @@ func TestStorePerWorkTokenEstimator(t *testing.T) {
if d.HasArg("ignore-ingested-into-L0") {
var ignoreIngestedIntoL0 int
d.ScanArgs(t, "ignore-ingested-into-L0", &ignoreIngestedIntoL0)
admissionStats.statsToIgnore.ApproxIngestedIntoL0Bytes += uint64(ignoreIngestedIntoL0)
admissionStats.statsToIgnore.Bytes += uint64(ignoreIngestedIntoL0)
admissionStats.statsToIgnore.ingestStats.ApproxIngestedIntoL0Bytes +=
uint64(ignoreIngestedIntoL0)
admissionStats.statsToIgnore.ingestStats.Bytes +=
uint64(ignoreIngestedIntoL0)
}
if d.HasArg("ignored-written") {
var ignoredWritten int
d.ScanArgs(t, "ignored-written", &ignoredWritten)
admissionStats.statsToIgnore.writeBytes += uint64(ignoredWritten)
}
estimator.updateEstimates(l0Metrics, cumLSMIngestedBytes, admissionStats)
wL0lm, iL0lm, ilm := estimator.getModelsAtDone()
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/admission/testdata/format_adjust_tokens_stats.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
echo
----
zero:
compaction score 0.000 (0 ssts, 0 sub-levels), L0 growth 0 B (write 0 B ingest 0 B ignored 0 B): requests 0 (0 bypassed) with 0 B acc-write (0 B bypassed) + 0 B acc-ingest (0 B bypassed) + write-model 0.00x+0 B (smoothed 0.00x+0 B) + ingested-model 0.00x+0 B (smoothed 0.00x+0 B) + at-admission-tokens 0 B, compacted 0 B [≈0 B], flushed 0 B [≈0 B]; admitting all; elastic-disk-bw tokens 0 B (used 0 B, regular used 0 B): write model 0.00x+0 B ingest model 0.00x+0 B, disk bw read 0 B write 0 B provisioned 0 B; write stalls 12
compaction score 0.000 (0 ssts, 0 sub-levels), L0 growth 0 B (write 0 B (ignored 0 B) ingest 0 B (ignored 0 B)): requests 0 (0 bypassed) with 0 B acc-write (0 B bypassed) + 0 B acc-ingest (0 B bypassed) + write-model 0.00x+0 B (smoothed 0.00x+0 B) + ingested-model 0.00x+0 B (smoothed 0.00x+0 B) + at-admission-tokens 0 B, compacted 0 B [≈0 B], flushed 0 B [≈0 B]; admitting all; elastic-disk-bw tokens 0 B (used 0 B, regular used 0 B): write model 0.00x+0 B ingest model 0.00x+0 B, disk bw read 0 B write 0 B provisioned 0 B; write stalls 12
real-numbers:
compaction score 2.700[L0-overload] (195 ssts, 27 sub-levels), L0 growth 577 MiB (write 0 B ingest 0 B ignored 0 B): requests 0 (0 bypassed) with 0 B acc-write (0 B bypassed) + 0 B acc-ingest (0 B bypassed) + write-model 0.00x+0 B (smoothed 0.00x+0 B) + ingested-model 0.00x+0 B (smoothed 0.00x+0 B) + at-admission-tokens 0 B, compacted 77 MiB [≈62 MiB], flushed 0 B [≈0 B]; admitting 116 MiB (rate 7.7 MiB/s) (elastic 1 B rate 0 B/s) due to L0 growth (used total: 0 B elastic 0 B); elastic-disk-bw tokens 0 B (used 0 B, regular used 0 B): write model 0.00x+0 B ingest model 0.00x+0 B, disk bw read 0 B write 0 B provisioned 0 B; write stalls 2
compaction score 2.700[L0-overload] (195 ssts, 27 sub-levels), L0 growth 577 MiB (write 0 B (ignored 0 B) ingest 0 B (ignored 0 B)): requests 0 (0 bypassed) with 0 B acc-write (0 B bypassed) + 0 B acc-ingest (0 B bypassed) + write-model 0.00x+0 B (smoothed 0.00x+0 B) + ingested-model 0.00x+0 B (smoothed 0.00x+0 B) + at-admission-tokens 0 B, compacted 77 MiB [≈62 MiB], flushed 0 B [≈0 B]; admitting 116 MiB (rate 7.7 MiB/s) (elastic 1 B rate 0 B/s) due to L0 growth (used total: 0 B elastic 0 B); elastic-disk-bw tokens 0 B (used 0 B, regular used 0 B): write model 0.00x+0 B ingest model 0.00x+0 B, disk bw read 0 B write 0 B provisioned 0 B; write stalls 2
Loading

0 comments on commit b1e6000

Please sign in to comment.