Skip to content

Commit

Permalink
kvflowcontrol: introduce two new knobs for testing
Browse files Browse the repository at this point in the history
This commit introduces two testing knobs:

```
// OverrideBypassAdmitWaitForEval is used to override the behavior of the
// WaitForEval. When set to true, WaitForEval will return immediately and
// pretend that the request was admitted. Otherwise, when set to false, or
// unset, WaitForEval will behave normally.
OverrideBypassAdmitWaitForEval func() bool
// OverrideAlwaysRefreshSendStreamStats is used to override the behavior of
// the send stream stats refresh. When set to true, the send stream stats
// will always be refreshed on a HandleRaftEventRaftMuLocked call. Otherwise,
// when set to false, the default behavior will be used.
OverrideAlwaysRefreshSendStreamStats bool
```

Also, thread send queue precise size adjustments via a method which also
asserts that the size is consistent between the count and bytes.

Part of: cockroachdb#132614
Release note: None
  • Loading branch information
kvoli committed Oct 31, 2024
1 parent ef26d15 commit 256d5fb
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 15 deletions.
67 changes: 52 additions & 15 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,13 +696,23 @@ func (rc *rangeController) WaitForEval(
rc.opts.EvalWaitMetrics.OnWaiting(wc)
waitCategory, waitCategoryChangeCh := rc.opts.WaitForEvalConfig.Current()
bypass := waitCategory.Bypass(wc)
if knobs := rc.opts.Knobs; knobs != nil &&
knobs.OverrideBypassAdmitWaitForEval != nil &&
rc.opts.Knobs.OverrideBypassAdmitWaitForEval() {
// This is used by some tests to bypass the wait for eval, in order to get
// the entry into HandleRaftEventRaftMuLocked, where normally the write
// would block here.
bypass = true
// We pretend the request also waited, even though it didn't.
waited = true
}
if bypass {
if expensiveLoggingEnabled {
log.VEventf(ctx, 2, "r%v/%v bypassed request (pri=%v)",
rc.opts.RangeID, rc.opts.LocalReplicaID, pri)
}
rc.opts.EvalWaitMetrics.OnBypassed(wc, 0 /* duration */)
return false, nil
return waited, nil
}
waitForAllReplicateHandles := false
if wc == admissionpb.ElasticWorkClass {
Expand Down Expand Up @@ -1584,7 +1594,8 @@ func (rc *rangeController) maybeUpdateSendQueueStatsRaftMuLocked() {
rc.mu.Lock()
defer rc.mu.Unlock()
if nextUpdateTime := rc.mu.lastSendQueueStatRefresh.Add(
sendQueueStatRefreshInterval); now.After(nextUpdateTime) {
sendQueueStatRefreshInterval); now.After(nextUpdateTime) ||
(rc.opts.Knobs != nil && rc.opts.Knobs.OverrideAlwaysRefreshSendStreamStats) {
// We should update the stats, it has been longer than
// sendQueueStatRefreshInterval.
updateStats = true
Expand Down Expand Up @@ -2494,6 +2505,26 @@ func (rss *replicaSendStream) closeRaftMuAndStreamLocked(ctx context.Context) {
rss.mu.closed = true
}

func (rss *replicaSendStream) applySendQueuePreciseSizeDeltaRaftMuAndStreamLocked(
ctx context.Context, delta kvflowcontrol.Tokens,
) {
rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
rss.mu.AssertHeld()

before := rss.mu.sendQueue.preciseSizeSum
after := before + delta
rss.mu.sendQueue.preciseSizeSum = after
if rss.isEmptySendQueueStreamLocked() && after != 0 {
panic(errors.AssertionFailedf(
"empty send-queue with non-zero precise size: "+
"before=%v after=%v delta=%v [queue_len=%v, queue_approx_size=%v]",
before, after, delta,
rss.queueLengthRaftMuAndStreamLocked(),
rss.approxQueueSizeStreamLocked(),
))
}
}

func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
ctx context.Context, event raftEventForReplica, directive replicaDirective,
) (transitionedSendQState bool, err error) {
Expand Down Expand Up @@ -2566,7 +2597,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
if inSendQueue && entry.id.index >= rss.mu.nextRaftIndexInitial {
// Was in send-queue and had eval tokens deducted for it.
rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entry.pri)] -= tokens
rss.mu.sendQueue.preciseSizeSum -= tokens
rss.applySendQueuePreciseSizeDeltaRaftMuAndStreamLocked(ctx, -tokens)
}
rss.raftMu.tracker.Track(ctx, entry.id, pri, tokens)
sendTokensToDeduct[WorkClassFromRaftPriority(pri)] += tokens
Expand Down Expand Up @@ -2597,6 +2628,12 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
}
var pri raftpb.Priority
inSendQueue := false
tokens := entry.tokens
if rss.parent.parent.opts.Knobs != nil {
if fn := rss.parent.parent.opts.Knobs.OverrideTokenDeduction; fn != nil {
tokens = fn(tokens)
}
}
if entry.id.index >= rss.mu.sendQueue.indexToSend {
// Being added to the send-queue.
inSendQueue = true
Expand All @@ -2611,16 +2648,10 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
} else {
pri = raftpb.LowPri
}
rss.mu.sendQueue.preciseSizeSum += entry.tokens
rss.applySendQueuePreciseSizeDeltaRaftMuAndStreamLocked(ctx, +tokens)
} else {
pri = entry.pri
}
tokens := entry.tokens
if rss.parent.parent.opts.Knobs != nil {
if fn := rss.parent.parent.opts.Knobs.OverrideTokenDeduction; fn != nil {
tokens = fn(tokens)
}
}
if inSendQueue && entry.id.index >= rss.mu.nextRaftIndexInitial {
// Is in send-queue and will have eval tokens deducted for it.
rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entry.pri)] += tokens
Expand Down Expand Up @@ -2738,22 +2769,28 @@ func (rss *replicaSendStream) dequeueFromQueueAndSendRaftMuAndStreamLocked(
panic(errors.AssertionFailedf("index %d >= nextRaftIndex %d", entryState.id.index,
rss.mu.sendQueue.nextRaftIndex))
}
tokens := entryState.tokens
if rss.parent.parent.opts.Knobs != nil {
if fn := rss.parent.parent.opts.Knobs.OverrideTokenDeduction; fn != nil {
tokens = fn(tokens)
}
}
rss.mu.sendQueue.indexToSend++
isApproximatedEntry := entryState.id.index < rss.mu.nextRaftIndexInitial
if isApproximatedEntry {
approximatedNumEntries++
if entryState.usesFlowControl {
approximatedNumActualTokens += entryState.tokens
approximatedNumActualTokens += tokens
}
}
if entryState.usesFlowControl {
if !isApproximatedEntry {
rss.mu.sendQueue.preciseSizeSum -= entryState.tokens
rss.applySendQueuePreciseSizeDeltaRaftMuAndStreamLocked(ctx, -tokens)
rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entryState.pri)] -=
entryState.tokens
tokens
}
tokensNeeded += entryState.tokens
rss.raftMu.tracker.Track(ctx, entryState.id, raftpb.LowPri, entryState.tokens)
tokensNeeded += tokens
rss.raftMu.tracker.Track(ctx, entryState.id, raftpb.LowPri, tokens)
}
}
if approximatedNumEntries > 0 {
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ type TestingKnobs struct {
// mode, while also having the ability to switch between the two
// apply_to_(elastic|all) modes.
OverridePullPushMode func() bool
// OverrideBypassAdmitWaitForEval is used to override the behavior of the
// WaitForEval. When set to true, WaitForEval will return immediately and
// pretend that the request was admitted. Otherwise, when set to false, or
// unset, WaitForEval will behave normally.
OverrideBypassAdmitWaitForEval func() bool
// OverrideAlwaysRefreshSendStreamStats is used to override the behavior of
// the send stream stats refresh. When set to true, the send stream stats
// will always be refreshed on a HandleRaftEventRaftMuLocked call. Otherwise,
// when set to false, the default behavior will be used.
OverrideAlwaysRefreshSendStreamStats bool
}

// TestingKnobsV1 are the testing knobs that appply to replication flow control
Expand Down

0 comments on commit 256d5fb

Please sign in to comment.