From 4559caedc89c82120d2cafc1e5c52be0e39bc6ec Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Thu, 19 Sep 2024 20:05:48 -0400
Subject: [PATCH] kvserver: refactor v1 flow control integration tests for v2

The existing flow control integration tests in
`flow_control_integration_test.go` provide a substantial suite of tests
verifying flow control integration behavior. Refactor the
`flowControlTestHelper` and its use in these tests, to enable sharing the
helper between the existing v1 and the not-yet-implemented v2 flow control
integration tests.

Part of: #130187

Release note: None
---
 .../kvserver/flow_control_integration_test.go | 617 +++++++++---------
 1 file changed, 298 insertions(+), 319 deletions(-)

diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go
index 62b0564ee155..1c21b116cd01 100644
--- a/pkg/kv/kvserver/flow_control_integration_test.go
+++ b/pkg/kv/kvserver/flow_control_integration_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server"
@@ -98,47 +99,24 @@ func TestFlowControlBasic(t *testing.T) {
 			desc, err := tc.LookupRange(k)
 			require.NoError(t, err)

-			for i := 0; i < numNodes; i++ {
-				si, err := tc.Server(i).GetStores().(*kvserver.Stores).GetStore(tc.Server(i).GetFirstStoreID())
-				require.NoError(t, err)
-				tc.Servers[i].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(si.StoreID(),
-					&unreliableRaftHandler{
-						rangeID:                    desc.RangeID,
-						IncomingRaftMessageHandler: si,
-						unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
-							dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
-								// Install a raft handler to get verbose raft logging.
-								//
-								// TODO(irfansharif): Make this a more ergonomic
-								// testing knob instead.
-								return false
-							},
-						},
-					})
-			}
-
 			n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))

-			h := newFlowControlTestHelper(t, tc)
+			h := newFlowControlTestHelperV1(t, tc)
 			h.init()
 			defer h.close("basic") // this test behaves identically with or without the fast path
+			h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)

-			h.waitForConnectedStreams(ctx, desc.RangeID, 3)
+			h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)

 			h.comment(`-- Flow token metrics, before issuing the regular 1MiB replicated write.`)
-			h.query(n1, `
-  SELECT name, crdb_internal.humanize_bytes(value::INT8)
-    FROM crdb_internal.node_metrics
-   WHERE name LIKE '%kvadmission%tokens%'
-ORDER BY name ASC;
-`)
+			h.query(n1, v1FlowTokensQueryStr)

 			h.comment(`-- (Issuing + admitting a regular 1MiB, triply replicated write...)`)
 			h.log("sending put request")
 			h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri)
 			h.log("sent put request")

-			h.waitForAllTokensReturned(ctx, 3)
+			h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
 			h.comment(`
 -- Stream counts as seen by n1 post-write. We should see three {regular,elastic}
 -- streams given there are three nodes and we're using a replication factor of
@@ -165,12 +143,7 @@ ORDER BY streams DESC;
 -- {regular,elastic} tokens deducted and returned, and {8*3=24MiB,16*3=48MiB} of
 -- {regular,elastic} tokens available. Everything should be accounted for.
`) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) // When run using -v the vmodule described at the top of this file, this // test demonstrates end-to-end flow control machinery in the happy @@ -270,34 +243,29 @@ func TestFlowControlRangeSplitMerge(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("split_merge") desc, err := tc.LookupRange(k) require.NoError(t, err) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.log("sending put request to pre-split range") h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) h.log("sent put request to pre-split range") - h.waitForAllTokensReturned(ctx, 3) + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) h.comment(` -- Flow token metrics from n1 after issuing + admitting the regular 1MiB 3x -- replicated write to the pre-split range. There should be 3MiB of -- {regular,elastic} tokens {deducted,returned}. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(`-- (Splitting range.)`) left, right := tc.SplitRangeOrFatal(t, k.Next()) - h.waitForConnectedStreams(ctx, right.RangeID, 3) + h.waitForConnectedStreams(ctx, right.RangeID, 3, 0 /* serverIdx */) // [T1,n1,s1,r63/1:/{Table/62-Max},*kvpb.AdminSplitRequest] initiating a split of this range at key /Table/Max [r64] (manual) // [T1,n1,s1,r64/1:/{Table/Max-Max},raft] connected to stream: t1/s1 // [T1,n1,s1,r64/1:/{Table/Max-Max},raft] connected to stream: t1/s2 @@ -311,19 +279,14 @@ ORDER BY name ASC; h.put(ctx, roachpb.Key(right.StartKey), 3<<20 /* 3MiB */, admissionpb.NormalPri) h.log("sent 3MiB put request to post-split RHS") - h.waitForAllTokensReturned(ctx, 3) + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) h.comment(` -- Flow token metrics from n1 after further issuing 2MiB and 3MiB writes to -- post-split LHS and RHS ranges respectively. We should see 15MiB extra tokens -- {deducted,returned}, which comes from (2MiB+3MiB)*3=15MiB. So we stand at -- 3MiB+15MiB=18MiB now. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(`-- Observe the newly split off replica, with its own three streams.`) h.query(n1, ` @@ -348,18 +311,13 @@ ORDER BY streams DESC; h.put(ctx, roachpb.Key(merged.StartKey), 4<<20 /* 4MiB */, admissionpb.NormalPri) h.log("sent 4MiB put request to post-merged range") - h.waitForAllTokensReturned(ctx, 3) + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) h.comment(` -- Flow token metrics from n1 after issuing 4MiB of regular replicated writes to -- the post-merged range. We should see 12MiB extra tokens {deducted,returned}, -- which comes from 4MiB*3=12MiB. So we stand at 18MiB+12MiB=30MiB now. 
`) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(`-- Observe only the merged replica with its own three streams.`) h.query(n1, ` @@ -411,13 +369,13 @@ func TestFlowControlBlockedAdmission(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) n2 := sqlutils.MakeSQLRunner(tc.ServerConn(1)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("blocked_admission") desc, err := tc.LookupRange(k) require.NoError(t, err) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Issuing regular 1MiB, 3x replicated write that's not admitted.)`) h.log("sending put requests") @@ -431,12 +389,7 @@ func TestFlowControlBlockedAdmission(t *testing.T) { -- that are yet to get admitted. We see 5*1MiB*3=15MiB deductions of -- {regular,elastic} tokens with no corresponding returns. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(`-- Observe the total tracked tokens per-stream on n1.`) h.query(n1, ` @@ -452,7 +405,7 @@ ORDER BY name ASC; h.comment(`-- (Allow below-raft admission to proceed.)`) disableWorkQueueGranting.Store(false) - h.waitForAllTokensReturned(ctx, 3) // wait for admission + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) // wait for admission h.comment(`-- Observe flow token dispatch metrics from n1.`) h.query(n1, ` @@ -475,12 +428,7 @@ ORDER BY name ASC; -- {regular,elastic} tokens, and the available capacities going back to what -- they were. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) } // TestFlowControlAdmissionPostSplitMerge walks through what happens with flow @@ -529,14 +477,14 @@ func TestFlowControlAdmissionPostSplitMerge(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("admission_post_split_merge") desc, err := tc.LookupRange(k) require.NoError(t, err) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.log("sending put request to pre-split range") h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) @@ -549,16 +497,11 @@ func TestFlowControlAdmissionPostSplitMerge(t *testing.T) { -- {regular,elastic} tokens with no corresponding returns. The 2*1MiB writes -- happened on what is soon going to be the LHS and RHS of a range being split. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(`-- (Splitting range.)`) left, right := tc.SplitRangeOrFatal(t, k.Next()) - h.waitForConnectedStreams(ctx, right.RangeID, 3) + h.waitForConnectedStreams(ctx, right.RangeID, 3, 0 /* serverIdx */) h.log("sending 2MiB put request to post-split LHS") h.put(ctx, k, 2<<20 /* 2MiB */, admissionpb.NormalPri) @@ -574,12 +517,7 @@ ORDER BY name ASC; -- deducted which comes from (2MiB+3MiB)*3=15MiB. 
So we stand at -- 6MiB+15MiB=21MiB now. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(`-- Observe the newly split off replica, with its own three streams.`) h.query(n1, ` @@ -620,7 +558,7 @@ ORDER BY streams DESC; h.comment(`-- (Allow below-raft admission to proceed.)`) disableWorkQueueGranting.Store(false) - h.waitForAllTokensReturned(ctx, 3) // wait for admission + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) // wait for admission h.comment(` -- Flow token metrics from n1 after work gets admitted. We see all outstanding @@ -628,12 +566,7 @@ ORDER BY streams DESC; -- - the LHS before the merge, and -- - the LHS and RHS before the original split. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) } // TestFlowControlCrashedNode tests flow token behavior in the presence of @@ -694,14 +627,14 @@ func TestFlowControlCrashedNode(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("crashed_node") desc, err := tc.LookupRange(k) require.NoError(t, err) tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0)) - h.waitForConnectedStreams(ctx, desc.RangeID, 2) + h.waitForConnectedStreams(ctx, desc.RangeID, 2, 0 /* serverIdx */) h.comment(`-- (Issuing regular 5x1MiB, 2x replicated writes that are not admitted.)`) h.log("sending put requests") @@ -715,12 +648,7 @@ func TestFlowControlCrashedNode(t *testing.T) { -- that are yet to get admitted. We see 5*1MiB*2=10MiB deductions of -- {regular,elastic} tokens with no corresponding returns. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(`-- Observe the per-stream tracked tokens on n1, before n2 is crashed.`) h.query(n1, ` SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) @@ -737,7 +665,7 @@ ORDER BY name ASC; h.comment(`-- (Crashing n2 but disabling the raft-transport-break token return mechanism.)`) tc.StopServer(1) - h.waitForConnectedStreams(ctx, desc.RangeID, 1) + h.waitForConnectedStreams(ctx, desc.RangeID, 1, 0 /* serverIdx */) h.comment(` -- Observe the per-stream tracked tokens on n1, after n2 crashed. We're no @@ -752,12 +680,7 @@ ORDER BY name ASC; -- Flow token metrics from n1 after n2 crashed. Observe that we've returned the -- 5MiB previously held by n2. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) } // TestFlowControlRaftSnapshot tests flow token behavior when one replica needs @@ -847,7 +770,7 @@ func TestFlowControlRaftSnapshot(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) n4 := sqlutils.MakeSQLRunner(tc.ServerConn(3)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("raft_snapshot") @@ -863,7 +786,7 @@ func TestFlowControlRaftSnapshot(t *testing.T) { tc.AddVotersOrFatal(t, k, tc.Targets(3, 4)...) 
repl := store.LookupReplica(roachpb.RKey(k)) require.NotNil(t, repl) - h.waitForConnectedStreams(ctx, repl.RangeID, 5) + h.waitForConnectedStreams(ctx, repl.RangeID, 5, 0 /* serverIdx */) // Set up a key to replicate across the cluster. We're going to modify this // key and truncate the raft logs from that command after killing one of the @@ -881,12 +804,7 @@ func TestFlowControlRaftSnapshot(t *testing.T) { -- that's not admitted. Since this test is ignoring crashed nodes for token -- deduction purposes, we see a deduction of 5MiB {regular,elastic} tokens. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; - `) + h.query(n1, v1FlowTokensQueryStr) h.comment(` -- Observe the total tracked tokens per-stream on n1. 1MiB is tracked for n1-n5. `) @@ -951,12 +869,7 @@ ORDER BY name ASC; -- RaftTransport streams). But this test is intentionally suppressing that -- behavior to observe token returns when sending raft snapshots. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; - `) + h.query(n1, v1FlowTokensQueryStr) h.comment(` -- Observe the total tracked tokens per-stream on n1. 2MiB is tracked for n1-n5; -- see last comment for an explanation why we're still deducting for n2, n3. @@ -1007,12 +920,7 @@ ORDER BY name ASC; -- progress state, noting that since we've truncated our log, we need to catch -- it up via snapshot. So we release all held tokens. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(` -- Observe the total tracked tokens per-stream on n1. There's nothing tracked @@ -1024,14 +932,14 @@ ORDER BY name ASC; WHERE total_tracked_tokens > 0 `, "range_id", "store_id", "total_tracked_tokens") - h.waitForConnectedStreams(ctx, repl.RangeID, 5) + h.waitForConnectedStreams(ctx, repl.RangeID, 5, 0 /* serverIdx */) // [T1,n1,s1,r63/1:/{Table/Max-Max},raft] 651 connected to stream: t1/s2 // [T1,n1,s1,r63/1:/{Table/Max-Max},raft] 710 connected to stream: t1/s3 h.comment(`-- (Allow below-raft admission to proceed.)`) disableWorkQueueGranting.Store(false) - h.waitForAllTokensReturned(ctx, 5) + h.waitForAllTokensReturned(ctx, 5, 0 /* serverIdx */) h.comment(`-- Observe flow token dispatch metrics from n4.`) h.query(n4, ` @@ -1045,12 +953,7 @@ ORDER BY name ASC; -- Flow token metrics from n1 after work gets admitted. We see the remaining -- 6MiB of {regular,elastic} tokens returned. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(` -- Observe the total tracked tokens per-stream on n1; there should be nothing. 
@@ -1119,14 +1022,14 @@ func TestFlowControlRaftTransportBreak(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("raft_transport_break") desc, err := tc.LookupRange(k) require.NoError(t, err) tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0)) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Issuing regular 5x1MiB, 3x replicated writes that are not admitted.)`) h.log("sending put requests") @@ -1140,12 +1043,7 @@ func TestFlowControlRaftTransportBreak(t *testing.T) { -- that are yet to get admitted. We see 5*1MiB*3=15MiB deductions of -- {regular,elastic} tokens with no corresponding returns. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(` -- Observe the per-stream tracked tokens on n1, before n2 is crashed. `) @@ -1165,7 +1063,7 @@ ORDER BY name ASC; h.comment(`-- (Crashing n2 but disabling the last-updated token return mechanism.)`) tc.StopServer(1) - h.waitForConnectedStreams(ctx, desc.RangeID, 2) + h.waitForConnectedStreams(ctx, desc.RangeID, 2, 0 /* serverIdx */) h.comment(` -- Observe the per-stream tracked tokens on n1, after n2 crashed. We're no @@ -1181,12 +1079,7 @@ ORDER BY name ASC; -- Flow token metrics from n1 after n2 crashed. Observe that we've returned the -- 5MiB previously held by n2. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) } // TestFlowControlRaftTransportCulled tests flow token behavior when the raft @@ -1255,14 +1148,14 @@ func TestFlowControlRaftTransportCulled(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("raft_transport_culled") desc, err := tc.LookupRange(k) require.NoError(t, err) tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0)) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Issuing regular 5x1MiB, 3x replicated writes that are not admitted.)`) h.log("sending put requests") @@ -1276,12 +1169,7 @@ func TestFlowControlRaftTransportCulled(t *testing.T) { -- that are yet to get admitted. We see 5*1MiB*3=15MiB deductions of -- {regular,elastic} tokens with no corresponding returns. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(` -- Observe the per-stream tracked tokens on n1, before we cull the n1<->n2 raft -- transport stream out of idleness. @@ -1305,7 +1193,7 @@ ORDER BY name ASC; t.Fatalf("timed out") } - h.waitForTotalTrackedTokens(ctx, desc.RangeID, 10<<20 /* 5*1MiB*2=10MiB */) + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 10<<20 /* 5*1MiB*2=10MiB */, 0 /* serverIdx */) h.comment(` -- Observe the per-stream tracked tokens on n1 after n2->n1 raft transport @@ -1322,12 +1210,7 @@ ORDER BY name ASC; -- Flow token metrics from n1 after n2->n1 raft transport stream is culled. -- Observe that we've returned the 5MiB previously held by n2. 
`) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) disableWorkerTeardown.Store(true) } @@ -1371,13 +1254,13 @@ func TestFlowControlRaftMembership(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("raft_membership") desc, err := tc.LookupRange(k) require.NoError(t, err) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) @@ -1396,7 +1279,7 @@ ORDER BY name ASC; h.comment(`-- (Adding a voting replica on n4.)`) tc.AddVotersOrFatal(t, k, tc.Target(3)) - h.waitForConnectedStreams(ctx, desc.RangeID, 4) + h.waitForConnectedStreams(ctx, desc.RangeID, 4, 0 /* serverIdx */) h.comment(` -- Observe the total tracked tokens per-stream on n1. s1-s3 should have 1MiB @@ -1421,11 +1304,11 @@ ORDER BY name ASC; h.comment(`-- (Removing voting replica from n3.)`) tc.RemoveVotersOrFatal(t, k, tc.Target(2)) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Adding non-voting replica to n5.)`) tc.AddNonVotersOrFatal(t, k, tc.Target(4)) - h.waitForConnectedStreams(ctx, desc.RangeID, 4) + h.waitForConnectedStreams(ctx, desc.RangeID, 4, 0 /* serverIdx */) h.comment(`-- (Issuing 1x1MiB, 4x replicated write (w/ one non-voter) that's not admitted.`) h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) @@ -1443,7 +1326,7 @@ ORDER BY name ASC; h.comment(`-- (Allow below-raft admission to proceed.)`) disableWorkQueueGranting.Store(false) - h.waitForAllTokensReturned(ctx, 5) + h.waitForAllTokensReturned(ctx, 5, 0 /* serverIdx */) h.comment(`-- Observe that there no tracked tokens across s1,s2,s4,s5.`) h.query(n1, ` @@ -1456,12 +1339,7 @@ ORDER BY name ASC; -- tokens deducted are returned, including from when s3 was removed as a raft -- member. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) } // TestFlowControlRaftMembershipRemoveSelf tests flow token behavior when the @@ -1524,7 +1402,7 @@ func TestFlowControlRaftMembershipRemoveSelf(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("raft_membership_remove_self") // this test behaves identically independent of we transfer the lease first @@ -1533,7 +1411,7 @@ func TestFlowControlRaftMembershipRemoveSelf(t *testing.T) { // Make sure the lease is on n1 and that we're triply connected. tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0)) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) @@ -1568,33 +1446,23 @@ ORDER BY name ASC; } return nil }) - h.waitForAllTokensReturned(ctx, 3) + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) h.comment(` -- Flow token metrics from n1 after raft leader removed itself from raft group. 
-- All {regular,elastic} tokens deducted are returned. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(`-- (Allow below-raft admission to proceed.)`) disableWorkQueueGranting.Store(false) - h.waitForAllTokensReturned(ctx, 3) + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) h.comment(` -- Flow token metrics from n1 after work gets admitted. Tokens were already -- returned earlier, so there's no change. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) }) } @@ -1639,13 +1507,13 @@ func TestFlowControlClassPrioritization(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("class_prioritization") desc, err := tc.LookupRange(k) require.NoError(t, err) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri) @@ -1655,12 +1523,7 @@ func TestFlowControlClassPrioritization(t *testing.T) { -- that's not admitted. We see 1*1MiB*3=3MiB deductions of elastic tokens with -- no corresponding returns. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(`-- (Issuing 1x1MiB, 3x replicated regular write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) @@ -1670,27 +1533,17 @@ ORDER BY name ASC; -- that's not admitted. We see 1*1MiB*3=3MiB deductions of {regular,elastic} -- tokens with no corresponding returns. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(`-- (Allow below-raft admission to proceed.)`) disableWorkQueueGranting.Store(false) - h.waitForAllTokensReturned(ctx, 3) + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) h.comment(` -- Flow token metrics from n1 after work gets admitted. All {regular,elastic} -- tokens deducted are returned. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) } // TestFlowControlQuiescedRange tests flow token behavior when ranges are @@ -1758,13 +1611,13 @@ func TestFlowControlQuiescedRange(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("quiesced_range") desc, err := tc.LookupRange(k) require.NoError(t, err) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri) @@ -1805,7 +1658,7 @@ ORDER BY name ASC; -- dispatch mechanism is disabled. 
Deducted elastic tokens from remote stores -- are yet to be returned. Tokens for the local store are. `) - h.waitForTotalTrackedTokens(ctx, desc.RangeID, 2<<20 /* 2*1MiB=2MiB */) + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 2<<20 /* 2*1MiB=2MiB */, 0 /* serverIdx */) h.query(n1, ` SELECT name, crdb_internal.humanize_bytes(value::INT8) FROM crdb_internal.node_metrics @@ -1815,7 +1668,7 @@ ORDER BY name ASC; h.comment(`-- (Enable the fallback token dispatch mechanism.)`) disableFallbackTokenDispatch.Store(false) - h.waitForAllTokensReturned(ctx, 3) + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) h.comment(` -- Flow token metrics from n1 after work gets admitted and all elastic tokens @@ -1904,33 +1757,15 @@ func TestFlowControlUnquiescedRange(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("unquiesced_range") desc, err := tc.LookupRange(k) require.NoError(t, err) + h.enableVerboseRaftMsgLoggingForRange(desc.RangeID) - for i := 0; i < numNodes; i++ { - si, err := tc.Server(i).GetStores().(*kvserver.Stores).GetStore(tc.Server(i).GetFirstStoreID()) - require.NoError(t, err) - tc.Servers[i].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(si.StoreID(), - &unreliableRaftHandler{ - rangeID: desc.RangeID, - IncomingRaftMessageHandler: si, - unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ - dropReq: func(req *kvserverpb.RaftMessageRequest) bool { - // Install a raft handler to get verbose raft logging. - // - // TODO(irfansharif): Make this a more ergonomic - // testing knob instead. - return false - }, - }, - }) - } - - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri) @@ -1969,7 +1804,7 @@ ORDER BY name ASC; -- dispatch mechanism is disabled. Deducted elastic tokens from remote stores -- are yet to be returned. Tokens for the local store are. `) - h.waitForTotalTrackedTokens(ctx, desc.RangeID, 2<<20 /* 2*1MiB=2MiB */) + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 2<<20 /* 2*1MiB=2MiB */, 0 /* serverIdx */) h.query(n1, ` SELECT name, crdb_internal.humanize_bytes(value::INT8) FROM crdb_internal.node_metrics @@ -1984,7 +1819,7 @@ ORDER BY name ASC; testutils.SucceedsSoon(t, func() error { _, err := tc.GetRaftLeader(t, roachpb.RKey(k)).MaybeUnquiesceAndPropose() require.NoError(t, err) - return h.checkAllTokensReturned(ctx, 3) + return h.checkAllTokensReturned(ctx, 3, 0 /* serverIdx */) }) h.comment(` @@ -2039,13 +1874,13 @@ func TestFlowControlTransferLease(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("transfer_lease") desc, err := tc.LookupRange(k) require.NoError(t, err) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) @@ -2070,7 +1905,7 @@ ORDER BY name ASC; } return nil }) - h.waitForAllTokensReturned(ctx, 3) + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) h.comment(` -- Flow token metrics from n1 having lost the lease and raft leadership. 
All @@ -2129,13 +1964,13 @@ func TestFlowControlLeaderNotLeaseholder(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) n2 := sqlutils.MakeSQLRunner(tc.ServerConn(1)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("leader_not_leaseholder") desc, err := tc.LookupRange(k) require.NoError(t, err) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`) h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri) @@ -2197,7 +2032,7 @@ ORDER BY name ASC; h.comment(`-- (Allow below-raft admission to proceed.)`) disableWorkQueueGranting.Store(false) - h.waitForAllTokensReturned(ctx, 3) // wait for admission + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) // wait for admission h.comment(` -- All deducted flow tokens are returned back to where the raft leader is. @@ -2273,13 +2108,13 @@ func TestFlowControlGranterAdmitOneByOne(t *testing.T) { n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - h := newFlowControlTestHelper(t, tc) + h := newFlowControlTestHelperV1(t, tc) h.init() defer h.close("granter_admit_one_by_one") desc, err := tc.LookupRange(k) require.NoError(t, err) - h.waitForConnectedStreams(ctx, desc.RangeID, 3) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) h.comment(`-- (Issuing regular 1024*1KiB, 3x replicated writes that are not admitted.)`) h.log("sending put requests") @@ -2293,12 +2128,7 @@ func TestFlowControlGranterAdmitOneByOne(t *testing.T) { -- that are yet to get admitted. We see 3*1MiB=3MiB deductions of -- {regular,elastic} tokens with no corresponding returns. `) - h.query(n1, ` - SELECT name, crdb_internal.humanize_bytes(value::INT8) - FROM crdb_internal.node_metrics - WHERE name LIKE '%kvadmission%tokens%' -ORDER BY name ASC; -`) + h.query(n1, v1FlowTokensQueryStr) h.comment(`-- Observe the total tracked tokens per-stream on n1.`) h.query(n1, ` @@ -2308,39 +2138,57 @@ ORDER BY name ASC; h.comment(`-- (Allow below-raft admission to proceed.)`) disableWorkQueueGranting.Store(false) - h.waitForAllTokensReturned(ctx, 3) // wait for admission + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) // wait for admission h.comment(` -- Flow token metrics from n1 after work gets admitted. We see 3MiB returns of -- {regular,elastic} tokens, and the available capacities going back to what -- they were. In #105185, by now we would've observed panics. 
 `)
-	h.query(n1, `
-  SELECT name, crdb_internal.humanize_bytes(value::INT8)
-    FROM crdb_internal.node_metrics
-   WHERE name LIKE '%kvadmission%tokens%'
-ORDER BY name ASC;
-`)
+	h.query(n1, v1FlowTokensQueryStr)
 }

 type flowControlTestHelper struct {
-	t   *testing.T
-	tc  *testcluster.TestCluster
-	buf *strings.Builder
-	rng *rand.Rand
+	t             *testing.T
+	tc            *testcluster.TestCluster
+	st            *cluster.Settings
+	buf           *strings.Builder
+	rng           *rand.Rand
+	testdata      string
+	level         kvflowcontrol.V2EnabledWhenLeaderLevel
+	isStaticLevel bool
 }

-func newFlowControlTestHelper(t *testing.T, tc *testcluster.TestCluster) *flowControlTestHelper {
+func newFlowControlTestHelper(
+	t *testing.T,
+	tc *testcluster.TestCluster,
+	testdata string,
+	level kvflowcontrol.V2EnabledWhenLeaderLevel,
+	isStatic bool,
+) *flowControlTestHelper {
 	rng, _ := randutil.NewPseudoRand()
 	buf := &strings.Builder{}
 	return &flowControlTestHelper{
-		t:   t,
-		tc:  tc,
-		buf: buf,
-		rng: rng,
+		t:             t,
+		tc:            tc,
+		st:            tc.Server(0).ClusterSettings(),
+		buf:           buf,
+		rng:           rng,
+		testdata:      testdata,
+		level:         level,
+		isStaticLevel: isStatic,
 	}
 }

+func newFlowControlTestHelperV1(t *testing.T, tc *testcluster.TestCluster) *flowControlTestHelper {
+	return newFlowControlTestHelper(t,
+		tc,
+		"flow_control_integration", /* testdata */
+		kvflowcontrol.V2NotEnabledWhenLeader,
+		true, /* isStatic */
+	)
+}
+
 func (h *flowControlTestHelper) init() {
 	// Reach into each server's cluster setting and override. This causes any
 	// registered change callbacks to run immediately, which is important since
@@ -2352,56 +2200,100 @@ func (h *flowControlTestHelper) init() {
 	}
 }

-func (h *flowControlTestHelper) waitForAllTokensReturned(ctx context.Context, expStreamCount int) {
+// waitForAllTokensReturned waits for all tokens to be returned across all
+// streams. The expected number of streams and the protocol level are passed
+// in as arguments, to allow switching between v1 and v2 flow control.
+func (h *flowControlTestHelper) waitForAllTokensReturned(
+	ctx context.Context, expStreamCount, serverIdx int, lvl ...kvflowcontrol.V2EnabledWhenLeaderLevel,
+) {
 	testutils.SucceedsSoon(h.t, func() error {
-		return h.checkAllTokensReturned(ctx, expStreamCount)
+		return h.checkAllTokensReturned(ctx, expStreamCount, serverIdx, lvl...)
 	})
 }

+// checkAllTokensReturned checks that all tokens have been returned across all
+// streams. It also checks that the expected number of streams is present. The
+// protocol level is passed in as an argument, to allow switching between v1
+// and v2 flow control.
 func (h *flowControlTestHelper) checkAllTokensReturned(
-	ctx context.Context, expStreamCount int,
+	ctx context.Context, expStreamCount, serverIdx int, lvl ...kvflowcontrol.V2EnabledWhenLeaderLevel,
 ) error {
-	kfc := h.tc.Server(0).KVFlowController().(kvflowcontrol.Controller)
-	streams := kfc.Inspect(ctx)
+	var streams []kvflowinspectpb.Stream
+	level := h.resolveLevelArgs(lvl...)
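+	// The token state lives behind different interfaces in v1 and v2: the
+	// per-node kvflowcontrol.Controller in v1, and the per-store stream
+	// token provider in v2.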
+	switch level {
+	case kvflowcontrol.V2NotEnabledWhenLeader:
+		streams = h.tc.Server(serverIdx).KVFlowController().(kvflowcontrol.Controller).Inspect(ctx)
+	case kvflowcontrol.V2EnabledWhenLeaderV1Encoding, kvflowcontrol.V2EnabledWhenLeaderV2Encoding:
+		streams = h.tc.GetFirstStoreFromServer(h.t, serverIdx).GetStoreConfig().KVFlowStreamTokenProvider.Inspect(ctx)
+	default:
+		h.t.Fatalf("unknown level: %v", level)
+	}
+
+	elasticTokensPerStream := kvflowcontrol.ElasticTokensPerStream.Get(&h.st.SV)
+	regularTokensPerStream := kvflowcontrol.RegularTokensPerStream.Get(&h.st.SV)
 	if len(streams) != expStreamCount {
-		return fmt.Errorf("expected %d replication streams, got %d", expStreamCount, len(streams))
+		return fmt.Errorf("expected %d replication streams, got %d [%+v]", expStreamCount, len(streams), streams)
 	}
-	for _, stream := range streams {
-		if stream.AvailableEvalRegularTokens != 16<<20 {
-			return fmt.Errorf("expected %s of regular flow tokens for %s, got %s",
-				humanize.IBytes(16<<20),
-				kvflowcontrol.Stream{
-					TenantID: stream.TenantID,
-					StoreID:  stream.StoreID,
-				},
-				humanize.IBytes(uint64(stream.AvailableEvalRegularTokens)),
+
+	checkTokens := func(
+		expTokens, actualTokens int64,
+		stream kvflowcontrol.Stream,
+		typName string,
+	) error {
+		if actualTokens != expTokens {
+			return fmt.Errorf("expected %v of %s flow tokens for %v, got %v",
+				humanize.IBytes(uint64(expTokens)), typName, stream,
+				humanize.IBytes(uint64(actualTokens)),
 			)
 		}
-		if stream.AvailableEvalElasticTokens != 8<<20 {
-			return fmt.Errorf("expected %s of elastic flow tokens for %s, got %s",
-				humanize.IBytes(8<<20),
-				kvflowcontrol.Stream{
-					TenantID: stream.TenantID,
-					StoreID:  stream.StoreID,
-				},
-				humanize.IBytes(uint64(stream.AvailableEvalElasticTokens)),
-			)
+		return nil
+	}
+
+	for _, stream := range streams {
+		s := kvflowcontrol.Stream{
+			TenantID: stream.TenantID,
+			StoreID:  stream.StoreID,
+		}
+		if err := checkTokens(
+			regularTokensPerStream, stream.AvailableEvalRegularTokens, s, "regular eval",
+		); err != nil {
+			return err
+		}
+		if err := checkTokens(
+			elasticTokensPerStream, stream.AvailableEvalElasticTokens, s, "elastic eval",
+		); err != nil {
+			return err
+		}
+		if level > kvflowcontrol.V2NotEnabledWhenLeader {
+			// V2 flow control also has send tokens.
+			if err := checkTokens(
+				regularTokensPerStream, stream.AvailableSendRegularTokens, s, "regular send",
+			); err != nil {
+				return err
+			}
+			if err := checkTokens(
+				elasticTokensPerStream, stream.AvailableSendElasticTokens, s, "elastic send",
+			); err != nil {
+				return err
+			}
 		}
 	}
 	return nil
 }

 func (h *flowControlTestHelper) waitForConnectedStreams(
-	ctx context.Context, rangeID roachpb.RangeID, expConnectedStreams int,
+	ctx context.Context,
+	rangeID roachpb.RangeID,
+	expConnectedStreams, serverIdx int,
+	lvl ...kvflowcontrol.V2EnabledWhenLeaderLevel,
 ) {
+	level := h.resolveLevelArgs(lvl...)
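+	// Poll until the level-appropriate inspect handle for the range reports
+	// the expected number of connected streams.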
 	testutils.SucceedsSoon(h.t, func() error {
-		kfh := h.tc.Server(0).KVFlowHandles().(kvflowcontrol.Handles)
-		handle, found := kfh.Lookup(rangeID)
+		state, found := h.getInspectHandlesForLevel(serverIdx, level).LookupInspect(rangeID)
 		if !found {
 			return fmt.Errorf("handle for %s not found", rangeID)
 		}
 		require.True(h.t, found)
-		state := handle.Inspect(ctx)
 		if len(state.ConnectedStreams) != expConnectedStreams {
 			return fmt.Errorf("expected %d connected streams, got %d",
 				expConnectedStreams, len(state.ConnectedStreams))
@@ -2411,16 +2303,19 @@ func (h *flowControlTestHelper) waitForConnectedStreams(
 }

 func (h *flowControlTestHelper) waitForTotalTrackedTokens(
-	ctx context.Context, rangeID roachpb.RangeID, expTotalTrackedTokens int64,
+	ctx context.Context,
+	rangeID roachpb.RangeID,
+	expTotalTrackedTokens int64,
+	serverIdx int,
+	lvl ...kvflowcontrol.V2EnabledWhenLeaderLevel,
 ) {
+	level := h.resolveLevelArgs(lvl...)
 	testutils.SucceedsSoon(h.t, func() error {
-		kfh := h.tc.Server(0).KVFlowHandles().(kvflowcontrol.Handles)
-		handle, found := kfh.Lookup(rangeID)
+		state, found := h.getInspectHandlesForLevel(serverIdx, level).LookupInspect(rangeID)
 		if !found {
 			return fmt.Errorf("handle for %s not found", rangeID)
 		}
 		require.True(h.t, found)
-		state := handle.Inspect(ctx)
 		var totalTracked int64
 		for _, stream := range state.ConnectedStreams {
 			for _, tracked := range stream.TrackedDeductions {
@@ -2451,7 +2346,49 @@ func (h *flowControlTestHelper) log(msg string) {
 	}
 }

+// resolveLevelArgs resolves the level to use for the test. If the level is
+// static, the level is returned as is. If the level is dynamic, the level is
+// resolved via arguments if provided, otherwise the default given at
+// construction is used. The function verifies that no more than one level is
+// provided.
+func (h *flowControlTestHelper) resolveLevelArgs(
+	level ...kvflowcontrol.V2EnabledWhenLeaderLevel,
+) kvflowcontrol.V2EnabledWhenLeaderLevel {
+	if h.isStaticLevel {
+		// The level is static and should not change during the test via arguments.
+		require.Len(h.t, level, 0)
+		return h.level
+	}
+	// The level is dynamic and should be resolved via arguments if provided,
+	// otherwise the default given at construction is used. Verify that no more
+	// than one level is provided.
+	require.Less(h.t, len(level), 2)
+	if len(level) == 0 {
+		return h.level
+	}
+	return level[0]
+}
+
+// v1FlowTokensQueryStr is the query string to fetch flow token metrics from
+// the node metrics table. It fetches all flow token metrics available in v1.
+const v1FlowTokensQueryStr = `
+  SELECT name, crdb_internal.humanize_bytes(value::INT8)
+    FROM crdb_internal.node_metrics
+   WHERE name LIKE '%kvadmission%tokens%'
+ORDER BY name ASC;
+`
+
+// query runs the given SQL query against the given SQLRunner, and appends the
+// output to the testdata file buffer.
 func (h *flowControlTestHelper) query(runner *sqlutils.SQLRunner, sql string, headers ...string) {
+	// NB: We update metric gauges here to ensure that periodically updated
+	// metrics (via the node metrics loop) are up-to-date.
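+	// Without the refresh, gauge readings could lag behind the most recent
+	// token deductions and returns.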
+	for _, server := range h.tc.Servers {
+		require.NoError(h.t, server.GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error {
+			s.GetStoreConfig().KVFlowStreamTokenProvider.UpdateMetricGauges()
+			return nil
+		}))
+	}
 	sql = strings.TrimSpace(sql)
 	h.log(sql)
 	h.buf.WriteString(fmt.Sprintf("%s\n\n", sql))
@@ -2468,22 +2405,64 @@ func (h *flowControlTestHelper) query(runner *sqlutils.SQLRunner, sql string, he
 	tbl.Render()
 }

+// put issues a put request for the given key at the specified priority,
+// against the first server in the cluster unless serverIdxs is provided.
 func (h *flowControlTestHelper) put(
-	ctx context.Context, key roachpb.Key, size int, pri admissionpb.WorkPriority,
-) *kvpb.BatchRequest {
-	value := roachpb.MakeValueFromString(randutil.RandString(h.rng, size, randutil.PrintableKeyAlphabet))
-	ba := &kvpb.BatchRequest{}
-	ba.Add(kvpb.NewPut(key, value))
-	ba.AdmissionHeader.Priority = int32(pri)
-	ba.AdmissionHeader.Source = kvpb.AdmissionHeader_FROM_SQL
-	if _, pErr := h.tc.Server(0).DB().NonTransactionalSender().Send(
-		ctx, ba,
-	); pErr != nil {
-		h.t.Fatal(pErr.GoError())
+	ctx context.Context, key roachpb.Key, size int, pri admissionpb.WorkPriority, serverIdxs ...int,
+) {
+	if len(serverIdxs) == 0 {
+		// Default to the first server if none are given.
+		serverIdxs = []int{0}
+	}
+	for _, serverIdx := range serverIdxs {
+		value := roachpb.MakeValueFromString(randutil.RandString(h.rng, size, randutil.PrintableKeyAlphabet))
+		ba := &kvpb.BatchRequest{}
+		ba.Add(kvpb.NewPut(key, value))
+		ba.AdmissionHeader.Priority = int32(pri)
+		ba.AdmissionHeader.Source = kvpb.AdmissionHeader_FROM_SQL
+		if _, pErr := h.tc.Server(serverIdx).DB().NonTransactionalSender().Send(
+			ctx, ba,
+		); pErr != nil {
+			h.t.Fatal(pErr.GoError())
+		}
 	}
-	return ba
 }

+// close writes the buffer to a file in the testdata directory and compares it
+// against the expected output.
 func (h *flowControlTestHelper) close(filename string) {
-	echotest.Require(h.t, h.buf.String(), datapathutils.TestDataPath(h.t, "flow_control_integration", filename))
+	echotest.Require(h.t, h.buf.String(), datapathutils.TestDataPath(h.t, h.testdata, filename))
+}
+
+// getInspectHandlesForLevel returns the inspect handles appropriate for the
+// given protocol level.
+func (h *flowControlTestHelper) getInspectHandlesForLevel(
+	serverIdx int, level kvflowcontrol.V2EnabledWhenLeaderLevel,
+) kvflowcontrol.InspectHandles {
+	switch level {
+	case kvflowcontrol.V2NotEnabledWhenLeader:
+		return h.tc.Server(serverIdx).KVFlowHandles().(kvflowcontrol.Handles)
+	case kvflowcontrol.V2EnabledWhenLeaderV1Encoding, kvflowcontrol.V2EnabledWhenLeaderV2Encoding:
+		return kvserver.MakeStoresForRACv2(h.tc.Server(serverIdx).GetStores().(*kvserver.Stores))
+	default:
+		h.t.Fatalf("unknown level: %v", level)
+	}
+	panic("unreachable")
+}
+
+// enableVerboseRaftMsgLoggingForRange installs a raft handler on each node,
+// which in turn enables verbose message logging.
+func (h *flowControlTestHelper) enableVerboseRaftMsgLoggingForRange(rangeID roachpb.RangeID) {
+	for i := 0; i < len(h.tc.Servers); i++ {
+		si, err := h.tc.Server(i).GetStores().(*kvserver.Stores).GetStore(h.tc.Server(i).GetFirstStoreID())
+		require.NoError(h.t, err)
+		h.tc.Servers[i].RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(si.StoreID(),
+			&unreliableRaftHandler{
+				rangeID:                    rangeID,
+				IncomingRaftMessageHandler: si,
+				unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
+					dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
+						// Don't drop any messages; the handler is installed
+						// only for its verbose logging side effect.
+						return false
+					},
+				},
+			})
+	}
}
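
Not part of this diff: once the v2 integration tests land, they could construct
the shared helper with their own testdata directory and enablement level. A
minimal sketch follows; the constructor name, the testdata path, and the
isStatic choice are assumptions here, not anything this patch adds:

	func newFlowControlTestHelperV2(
		t *testing.T, tc *testcluster.TestCluster, level kvflowcontrol.V2EnabledWhenLeaderLevel,
	) *flowControlTestHelper {
		return newFlowControlTestHelper(t,
			tc,
			"flow_control_integration_v2", /* hypothetical testdata dir */
			level,
			false, /* isStatic: let tests override the level per assertion */
		)
	}

With a dynamic level, individual assertions can then pass an explicit override,
e.g. h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */,
kvflowcontrol.V2EnabledWhenLeaderV2Encoding).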