kvserver: [dnm] add TestFlowControlSendQueueRangeMigrate test

Add a new RACv2 integration test, `TestFlowControlSendQueueRangeMigrate`.

This test fails, but not by hanging on its own (with the args below it does
time out, due to the `--timeout` flag). Instead, s1 and s3 on the scratch
range appear to ping-pong messages back and forth like:

```
-- Test timed out at 2024-11-16 00:39:36 UTC --
I241116 00:39:37.221610 4048 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n3] 725   [raft] r69 Raft message 1->3 MsgApp Term:6 Log:6/25 Commit:27
I241116 00:39:37.222264 4068 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n1] 726   [raft] r69 Raft message 3->1 MsgAppResp Term:6 Log:0/25 Commit:25
I241116 00:39:38.221271 4048 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n3] 727   [raft] r69 Raft message 1->3 MsgApp Term:6 Log:6/25 Commit:27
I241116 00:39:38.221876 4068 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n1] 728   [raft] r69 Raft message 3->1 MsgAppResp Term:6 Log:0/25 Commit:25
I241116 00:39:38.971603 4048 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n3] 729   [raft] r69 Raft message 1->3 MsgApp Term:6 Log:6/25 Commit:27
I241116 00:39:38.972018 4068 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n1] 730   [raft] r69 Raft message 3->1 MsgAppResp Term:6 Log:0/25 Commit:25
```

Run with the following for more logging:

```bash
dev test pkg/kv/kvserver -v --vmodule='replica_raft=1,kvflowcontroller=2,replica_proposal_buf=1,raft_transport=2,kvflowdispatch=1,kvadmission=1,kvflowhandle=1,work_queue=1,replica_flow_control=1,tracker=1,client_raft_helpers_test=1,raft=1,admission=1,range_controller=2,token_counter=2,token_tracker=2,processor=2' -f TestFlowControlSendQueueRangeMigrate --show-logs --timeout=60s
```
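
To pull the ping-pong out of the resulting (very verbose) output, a filter
along these lines can help. `test.log` is a hypothetical file the output was
redirected to, and the range ID (r69 above) will differ between runs:

```bash
# Show only the raft messages for the scratch range. Replace r69 with the
# range ID from your run; test.log is assumed to hold the captured output.
grep 'r69 Raft message' test.log
```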

Epic: none
Release note: None
kvoli committed Nov 16, 2024
1 parent 6345aec commit 792b620
Showing 2 changed files with 153 additions and 1 deletion.
152 changes: 151 additions & 1 deletion pkg/kv/kvserver/flow_control_integration_test.go
@@ -17,6 +17,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
@@ -5160,7 +5161,7 @@ func TestFlowControlSendQueueRangeMerge(t *testing.T) {
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Merging RHS back into LHS)`)
// XXX: FAILS HERE ON THE MERGE.
merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey())
h.comment(`-- (Merged RHS back into LHS)`)

@@ -5179,6 +5180,155 @@ func TestFlowControlSendQueueRangeMerge(t *testing.T) {
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

func TestFlowControlSendQueueRangeMigrate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
// lower than the defaults (8 MiB elastic and 16 MiB regular).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)

// Per-server flags that, when set, disable below-raft admission (work queue
// granting) and thereby prevent flow tokens from being returned.
// setTokenReturnEnabled toggles them.
disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

// Create a scratch range and upreplicate it onto all three nodes.
k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close("send_queue_range_migrate")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)

// Block admission on n3, while allowing every other node to admit.
setTokenReturnEnabled(true /* enabled */, 0, 1)
setTokenReturnEnabled(false /* enabled */, 2)
// Drain the tokens to n3 by blocking admission and issuing the buffer
// size of writes to the range.
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
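// s3's regular flow tokens (4 MiB per stream, as overridden above) are now
// exhausted.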

h.comment(`(Sending 1 MiB put request to develop a send queue)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request)`)
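// The extra put should not be sent to s3; it should instead sit in s3's send
// queue, leaving the tracked tokens on n1 unchanged at 4 MiB.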
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Issuing MigrateRequest to range)`)
// From the MigrateRequest comment:
// MigrateRequest is used to instruct all ranges overlapping with it to
// exercise any relevant (below-raft) migrations in order for its range state
// to conform to what's needed by the specified version. It's a core primitive
// used in our migrations infrastructure to phase out legacy code below raft.
//
// KV waits for this command to durably apply on all replicas before
// returning, guaranteeing to the caller that all pre-migration state has been
// completely purged from the system.
req := &kvpb.MigrateRequest{
RequestHeader: kvpb.RequestHeader{
Key: desc.StartKey.AsRawKey(),
EndKey: desc.EndKey.AsRawKey(),
},
// The version is illogical, but that is irrelevant here because the request
// times out waiting for all replicas to be caught up.
Version: clusterversion.PreviousRelease.Version(),
}
db := tc.Servers[0].DB()
// XXX: TIMES OUT HERE.
if _, pErr := kv.SendWrappedWith(ctx, db.GetFactory().NonTransactionalSender(), kvpb.Header{RangeID: desc.RangeID}, req); pErr != nil {
t.Fatal(pErr)
}
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.comment(`(Sending 1 MiB put request to the migrated range)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to the migrated range)`)

// Allow admission to proceed on n3 and wait for all tokens to be returned.
h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.comment(`
-- Send queue and flow token metrics from n1. All tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
Expand Down
@@ -0,0 +1,2 @@
echo
----
