Skip to content

Commit

Permalink
kvserver: [dnm] add TestFlowControlSendQueueRangeSplitSnap test
Browse files Browse the repository at this point in the history
This test currently fails, while also churning snapshots towards the
RHS range post-split. It appears that the RHS post-split replica on the
store which has no available tokens (from the leader, n1's perspective)
is effectively wedged.

```bash
dev test pkg/kv/kvserver -v --vmodule='replica_raft=1,kvflowcontroller=2,replica_proposal_buf=1,raft_transport=2,kvflowdispatch=1,kvadmission=1,kvflowhandle=1,work_queue=1,replica_flow_control=1,tracker=1,client_raft_helpers_test=1,raft=1,admission=1,replica_flow_control=1,work_queue=1,replica_raft=1,replica_proposal_buf=1,raft_transport=2,kvadmission=1,work_queue=1,replica_flow_control=1,client_raft_helpers_test=1,range_controller=2,token_counter=2,token_tracker=2,processor=2,kvflowhandle=1' -f  TestFlowControlSendQueueRangeSplitSnap --show-logs --timeout=120s
```

Epic: none
Release note: None
  • Loading branch information
kvoli committed Nov 15, 2024
1 parent d2a21ee commit 2b4d469
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 0 deletions.
129 changes: 129 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4917,6 +4917,135 @@ ORDER BY streams DESC;
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

func testingMkFlowStream(serverIdx int) kvflowcontrol.Stream {
return kvflowcontrol.Stream{
StoreID: roachpb.StoreID(serverIdx + 1),
TenantID: roachpb.SystemTenantID,
}
}

func TestFlowControlSendQueueRangeSplitSnap(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
// lower (8 and 16 MiB default).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)

disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close("send_queue_range_split_snap")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID + 1)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`
-- Start by exhausting the tokens from n1->s3 and blocking admission on s3.
-- (Issuing 4x1MiB regular, 3x replicated write that's not admitted on s3.)`)
setTokenReturnEnabled(true /* enabled */, 0, 1)
setTokenReturnEnabled(false /* enabled */, 2)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`(Sending 1 MiB put request to pre-split range)`)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to pre-split range)`)

h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Splitting range.)`)
left, right := tc.SplitRangeOrFatal(t, k.Next())
h.waitForConnectedStreams(ctx, left.RangeID, 3, 0 /* serverIdx */)
// NB: The RHS won't be caught up and will therefore not be in StateReplicate
// post-split. n1 will be attempting to snapshot (raft snapshot queue) the
// RHS on s3.
h.waitForConnectedStreams(ctx, right.RangeID, 3, 0 /* serverIdx */)
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
echo
----

0 comments on commit 2b4d469

Please sign in to comment.