Skip to content

Commit

Permalink
kvserver: add rac2 send queue functional test
Browse files Browse the repository at this point in the history
Introduce `TestFlowControlSendQueue`, which exercises:

1. Send queue formation via send token exhaustion
2. Send queue force-flushing via a node disconnect
3. Send queue formation prevention via a testing knob to skip
   wait-for-eval

More specifically, the test has the following structure:

```sql
--   Start with three voters on [n1,n2,n3], where n1 is the leader+leaseholder.
--   Large regular write -16 MiB.
--   Allow admission [n1,n2].
--   - Tokens should be returned for n1 and n2.
--   Block admission [n2].
--   Regular write -1 MiB.
--   - Shouldn't be blocked on wait-for-eval because of quorum [n1,n2].
--   - Metrics should reflect send queue formation on n3.
--   Stop n2.
--   Regular write -1 MiB.
--   - Blocks on wait-for-eval, however the test bypasses this instance.
--   - Metrics should reflect n3 being force flushed.
--   Allow admission [n1,n2,n3].
--   Start n2.
--   Add n4, n5, the voters now are [n1,n2,n3,n4,n5].
--   Block admission [n4,n5] (already blocked)
--   Regular write -16 MiB.
--   Regular write -1  MiB.
--   - Shouldn't be blocked on wait-for-eval because of quorum [n1,n2,n3]
--   - Metrics should reflect send queue formation on n4,n5.
--   Unblock admission [n4,n5].
--   - Wait for tokens to be returned.
--   Block admission [n2,n3,n4,n5].
--   Regular write -16 MiB.
--   Regular write -1  MiB.
--   - Blocks on wait-for-eval, however the test bypasses this instance.
--   - Metrics should reflect 2 streams being prevented from forming a send queue.
--   Allow admission [n1,n2,n3,n4,n5] (all).
--   Assert all tokens returned.
```

Part of: cockroachdb#132614
Release note: None
  • Loading branch information
kvoli committed Oct 31, 2024
1 parent ef26d15 commit 41210c5
Show file tree
Hide file tree
Showing 9 changed files with 1,159 additions and 31 deletions.
470 changes: 459 additions & 11 deletions pkg/kv/kvserver/flow_control_integration_test.go

Large diffs are not rendered by default.

67 changes: 52 additions & 15 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,13 +696,23 @@ func (rc *rangeController) WaitForEval(
rc.opts.EvalWaitMetrics.OnWaiting(wc)
waitCategory, waitCategoryChangeCh := rc.opts.WaitForEvalConfig.Current()
bypass := waitCategory.Bypass(wc)
if knobs := rc.opts.Knobs; knobs != nil &&
knobs.OverrideBypassAdmitWaitForEval != nil &&
rc.opts.Knobs.OverrideBypassAdmitWaitForEval() {
// This is used by some tests to bypass the wait for eval, in order to get
// the entry into HandleRaftEventRaftMuLocked, where normally the write
// would block here.
bypass = true
// We pretend the request also waited, even though it didn't.
waited = true
}
if bypass {
if expensiveLoggingEnabled {
log.VEventf(ctx, 2, "r%v/%v bypassed request (pri=%v)",
rc.opts.RangeID, rc.opts.LocalReplicaID, pri)
}
rc.opts.EvalWaitMetrics.OnBypassed(wc, 0 /* duration */)
return false, nil
return waited, nil
}
waitForAllReplicateHandles := false
if wc == admissionpb.ElasticWorkClass {
Expand Down Expand Up @@ -1584,7 +1594,8 @@ func (rc *rangeController) maybeUpdateSendQueueStatsRaftMuLocked() {
rc.mu.Lock()
defer rc.mu.Unlock()
if nextUpdateTime := rc.mu.lastSendQueueStatRefresh.Add(
sendQueueStatRefreshInterval); now.After(nextUpdateTime) {
sendQueueStatRefreshInterval); now.After(nextUpdateTime) ||
(rc.opts.Knobs != nil && rc.opts.Knobs.OverrideAlwaysRefreshSendStreamStats) {
// We should update the stats, it has been longer than
// sendQueueStatRefreshInterval.
updateStats = true
Expand Down Expand Up @@ -2494,6 +2505,26 @@ func (rss *replicaSendStream) closeRaftMuAndStreamLocked(ctx context.Context) {
rss.mu.closed = true
}

// applySendQueuePreciseSizeDeltaRaftMuAndStreamLocked adjusts the send-queue's
// preciseSizeSum by delta (which may be negative) and then verifies that a
// send-queue reported as empty carries a precise size of exactly zero. A
// violation of that invariant results in a panic with an assertion failure
// that includes the before/after sizes, the delta, and the queue length and
// approximate size.
//
// The raftMu and rss.mu held-assertions are invoked on entry.
func (rss *replicaSendStream) applySendQueuePreciseSizeDeltaRaftMuAndStreamLocked(
	ctx context.Context, delta kvflowcontrol.Tokens,
) {
	rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
	rss.mu.AssertHeld()

	prevSize := rss.mu.sendQueue.preciseSizeSum
	newSize := prevSize + delta
	rss.mu.sendQueue.preciseSizeSum = newSize

	// Nothing further to check unless the queue is empty yet still has a
	// non-zero precise size.
	if !rss.isEmptySendQueueStreamLocked() || newSize == 0 {
		return
	}
	queueLen := rss.queueLengthRaftMuAndStreamLocked()
	approxSize := rss.approxQueueSizeStreamLocked()
	panic(errors.AssertionFailedf(
		"empty send-queue with non-zero precise size: "+
			"before=%v after=%v delta=%v [queue_len=%v, queue_approx_size=%v]",
		prevSize, newSize, delta, queueLen, approxSize,
	))
}

func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
ctx context.Context, event raftEventForReplica, directive replicaDirective,
) (transitionedSendQState bool, err error) {
Expand Down Expand Up @@ -2566,7 +2597,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
if inSendQueue && entry.id.index >= rss.mu.nextRaftIndexInitial {
// Was in send-queue and had eval tokens deducted for it.
rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entry.pri)] -= tokens
rss.mu.sendQueue.preciseSizeSum -= tokens
rss.applySendQueuePreciseSizeDeltaRaftMuAndStreamLocked(ctx, -tokens)
}
rss.raftMu.tracker.Track(ctx, entry.id, pri, tokens)
sendTokensToDeduct[WorkClassFromRaftPriority(pri)] += tokens
Expand Down Expand Up @@ -2597,6 +2628,12 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
}
var pri raftpb.Priority
inSendQueue := false
tokens := entry.tokens
if rss.parent.parent.opts.Knobs != nil {
if fn := rss.parent.parent.opts.Knobs.OverrideTokenDeduction; fn != nil {
tokens = fn(tokens)
}
}
if entry.id.index >= rss.mu.sendQueue.indexToSend {
// Being added to the send-queue.
inSendQueue = true
Expand All @@ -2611,16 +2648,10 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
} else {
pri = raftpb.LowPri
}
rss.mu.sendQueue.preciseSizeSum += entry.tokens
rss.applySendQueuePreciseSizeDeltaRaftMuAndStreamLocked(ctx, +tokens)
} else {
pri = entry.pri
}
tokens := entry.tokens
if rss.parent.parent.opts.Knobs != nil {
if fn := rss.parent.parent.opts.Knobs.OverrideTokenDeduction; fn != nil {
tokens = fn(tokens)
}
}
if inSendQueue && entry.id.index >= rss.mu.nextRaftIndexInitial {
// Is in send-queue and will have eval tokens deducted for it.
rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entry.pri)] += tokens
Expand Down Expand Up @@ -2738,22 +2769,28 @@ func (rss *replicaSendStream) dequeueFromQueueAndSendRaftMuAndStreamLocked(
panic(errors.AssertionFailedf("index %d >= nextRaftIndex %d", entryState.id.index,
rss.mu.sendQueue.nextRaftIndex))
}
tokens := entryState.tokens
if rss.parent.parent.opts.Knobs != nil {
if fn := rss.parent.parent.opts.Knobs.OverrideTokenDeduction; fn != nil {
tokens = fn(tokens)
}
}
rss.mu.sendQueue.indexToSend++
isApproximatedEntry := entryState.id.index < rss.mu.nextRaftIndexInitial
if isApproximatedEntry {
approximatedNumEntries++
if entryState.usesFlowControl {
approximatedNumActualTokens += entryState.tokens
approximatedNumActualTokens += tokens
}
}
if entryState.usesFlowControl {
if !isApproximatedEntry {
rss.mu.sendQueue.preciseSizeSum -= entryState.tokens
rss.applySendQueuePreciseSizeDeltaRaftMuAndStreamLocked(ctx, -tokens)
rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entryState.pri)] -=
entryState.tokens
tokens
}
tokensNeeded += entryState.tokens
rss.raftMu.tracker.Track(ctx, entryState.id, raftpb.LowPri, entryState.tokens)
tokensNeeded += tokens
rss.raftMu.tracker.Track(ctx, entryState.id, raftpb.LowPri, tokens)
}
}
if approximatedNumEntries > 0 {
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ type TestingKnobs struct {
// mode, while also having the ability to switch between the two
// apply_to_(elastic|all) modes.
OverridePullPushMode func() bool
// OverrideBypassAdmitWaitForEval is used to override the behavior of
// WaitForEval. When set and returning true, WaitForEval will return
// immediately and pretend that the request was admitted. Otherwise, when
// unset or returning false, WaitForEval will behave normally.
OverrideBypassAdmitWaitForEval func() bool
// OverrideAlwaysRefreshSendStreamStats is used to override the behavior of
// the send stream stats refresh. When set to true, the send stream stats
// will always be refreshed on a HandleRaftEventRaftMuLocked call. Otherwise,
// when set to false, the default behavior will be used.
OverrideAlwaysRefreshSendStreamStats bool
}

// TestingKnobsV1 are the testing knobs that apply to replication flow control
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ SELECT store_id,
FROM crdb_internal.kv_flow_controller
ORDER BY store_id ASC;

range_id | regular_available | elastic_available
store_id | regular_available | elastic_available
-----------+-------------------+--------------------
1 | 16 MiB | 8.0 MiB
2 | 16 MiB | 8.0 MiB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ SELECT store_id,
FROM crdb_internal.kv_flow_controller_v2
ORDER BY store_id ASC;

range_id | eval_regular_available | eval_elastic_available
store_id | eval_regular_available | eval_elastic_available
-----------+------------------------+-------------------------
1 | 16 MiB | 8.0 MiB
2 | 16 MiB | 8.0 MiB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ SELECT store_id,
FROM crdb_internal.kv_flow_controller_v2
ORDER BY store_id ASC;

range_id | eval_regular_available | eval_elastic_available
store_id | eval_regular_available | eval_elastic_available
-----------+------------------------+-------------------------
1 | 16 MiB | 8.0 MiB
2 | 16 MiB | 8.0 MiB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ SELECT store_id,
FROM crdb_internal.kv_flow_controller_v2
ORDER BY store_id ASC;

range_id | eval_regular_available | eval_elastic_available
store_id | eval_regular_available | eval_elastic_available
-----------+------------------------+-------------------------
1 | 16 MiB | 8.0 MiB
2 | 16 MiB | 8.0 MiB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ SELECT store_id,
FROM crdb_internal.kv_flow_controller_v2
ORDER BY store_id ASC;

range_id | eval_regular_available | eval_elastic_available
store_id | eval_regular_available | eval_elastic_available
-----------+------------------------+-------------------------
1 | 16 MiB | 8.0 MiB
2 | 16 MiB | 8.0 MiB
Expand Down
Loading

0 comments on commit 41210c5

Please sign in to comment.