kvserver: [dnm] add TestFlowControlSendQueueRangeMigrate test
Add a new RACv2 integration test, `TestFlowControlSendQueueRangeMigrate`.

This test currently fails, though not by timing out on its own (it will
time out with the args below because of the `--timeout` flag); instead,
s1 and s3 on the scratch range appear to ping-pong messages back and
forth like:

```
-- Test timed out at 2024-11-16 00:39:36 UTC --
I241116 00:39:37.221610 4048 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n3] 725   [raft] r69 Raft message 1->3 MsgApp Term:6 Log:6/25 Commit:27
I241116 00:39:37.222264 4068 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n1] 726   [raft] r69 Raft message 3->1 MsgAppResp Term:6 Log:0/25 Commit:25
I241116 00:39:38.221271 4048 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n3] 727   [raft] r69 Raft message 1->3 MsgApp Term:6 Log:6/25 Commit:27
I241116 00:39:38.221876 4068 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n1] 728   [raft] r69 Raft message 3->1 MsgAppResp Term:6 Log:0/25 Commit:25
I241116 00:39:38.971603 4048 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n3] 729   [raft] r69 Raft message 1->3 MsgApp Term:6 Log:6/25 Commit:27
I241116 00:39:38.972018 4068 kv/kvserver_test/client_raft_helpers_test.go:104  [T1,Vsystem,n1] 730   [raft] r69 Raft message 3->1 MsgAppResp Term:6 Log:0/25 Commit:25
```

Run with the following for more logging:

```bash
dev test pkg/kv/kvserver -v --vmodule='replica_raft=1,kvflowcontroller=2,replica_proposal_buf=1,raft_transport=2,kvflowdispatch=1,kvadmission=1,kvflowhandle=1,work_queue=1,replica_flow_control=1,tracker=1,client_raft_helpers_test=1,raft=1,admission=1,replica_flow_control=1,work_queue=1,replica_raft=1,replica_proposal_buf=1,raft_transport=2,kvadmission=1,work_queue=1,replica_flow_control=1,client_raft_helpers_test=1,range_controller=2,token_counter=2,token_tracker=2,processor=2,kvflowhandle=1' -f  TestFlowControlSendQueueRangeMigrate --show-logs --timeout=60s
```

Epic: none
Release note: None
kvoli committed Nov 16, 2024
1 parent 6345aec commit dd5456f
Showing 2 changed files with 203 additions and 1 deletion.
202 changes: 201 additions & 1 deletion pkg/kv/kvserver/flow_control_integration_test.go
@@ -17,12 +17,15 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -5160,7 +5163,7 @@ func TestFlowControlSendQueueRangeMerge(t *testing.T) {
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Merging RHS back into LHS)`)
// XXX: FAILS HERE ON THE MERGE.
merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey())
h.comment(`-- (Merged RHS back into LHS)`)

@@ -5179,6 +5182,203 @@ func TestFlowControlSendQueueRangeMerge(t *testing.T) {
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

func TestFlowControlSendQueueRangeMigrate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
// We're going to be transitioning from startV to endV. Think a cluster of
// binaries running vX, but with active version vX-1.
startV := clusterversion.PreviousRelease.Version()
endV := clusterversion.Latest.Version()
settings := cluster.MakeTestingClusterSettingsWithVersions(endV, startV, false)
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
// lower (8 and 16 MiB default).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)
disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClusterVersionOverride: startV,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
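// Use the RACv2 protocol with the v2 entry encoding from the start,
// regardless of the cluster version the servers are running at.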
OverrideV2EnabledWhenLeaderLevel: func() kvflowcontrol.V2EnabledWhenLeaderLevel {
return kvflowcontrol.V2EnabledWhenLeaderV2Encoding
},
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
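// Up-replicate the scratch range onto n2 and n3 so that the leaseholder on
// s1 has replication streams to s2 and s3.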
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close("send_queue_range_migrate")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
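// Reset the flow token metrics once the streams are connected so that the
// assertions below start from a clean baseline.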
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)

store := tc.GetFirstStoreFromServer(t, 0)
assertVersion := func(expV roachpb.Version) error {
repl, err := store.GetReplica(desc.RangeID)
if err != nil {
t.Fatal(err)
}
if gotV := repl.Version(); gotV != expV {
return errors.Errorf("expected in-memory version %s, got %s", expV, gotV)
}

sl := stateloader.Make(desc.RangeID)
persistedV, err := sl.LoadVersion(ctx, store.TODOEngine())
if err != nil {
return err
}
if persistedV != expV {
return errors.Errorf("expected persisted version %s, got %s", expV, persistedV)
}
return nil
}

require.NoError(t, assertVersion(startV))

migrated := false
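// Register a testing-only migration interceptor for endV so we can later
// verify that the below-raft migration ran when the MigrateRequest is issued.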
unregister := batcheval.TestingRegisterMigrationInterceptor(endV, func() {
migrated = true
})
defer unregister()

// Block admission on n3, while allowing every other node to admit.
setTokenReturnEnabled(true /* enabled */, 0, 1)
setTokenReturnEnabled(false /* enabled */, 2)
// Drain the tokens to n3 by blocking admission there and issuing 4 MiB of
// writes (the per-stream regular token limit) to the range.
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`-- (Sending 1 MiB put request to develop a send queue)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`-- (Sent 1 MiB put request)`)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
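// Tokens for the streams to s1 and s2 should be returned promptly, since
// admission is allowed there; only the stream to s3 retains deducted tokens
// and develops a send queue.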
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Issuing MigrateRequest to range)`)
// MigrateRequest comment:
// MigrateRequest is used to instruct all ranges overlapping with it to exercise
// any relevant (below-raft) migrations in order for its range state to conform
// to what's needed by the specified version. It's a core primitive used in our
// migrations infrastructure to phase out legacy code below raft.
//
// KV waits for this command to durably apply on all replicas before returning,
// guaranteeing to the caller that all pre-migration state has been completely
// purged from the system.

req := &kvpb.MigrateRequest{
RequestHeader: kvpb.RequestHeader{
Key: desc.StartKey.AsRawKey(),
EndKey: desc.EndKey.AsRawKey(),
},
// Illogical, but irrelevant here because the request times out waiting for
// all replicas to be caught up.
Version: endV,
}
db := tc.Servers[0].DB()
// XXX: FAILS HERE, NEVER RETURNS.
// testutils.SucceedsSoon(t, func() error {
require.NoError(t, func() error {
if _, pErr := kv.SendWrappedWith(ctx, db.GetFactory().NonTransactionalSender(), kvpb.Header{RangeID: desc.RangeID}, req); pErr != nil {
return pErr.GoError()
}
if !migrated {
return errors.Errorf("expected migration interceptor to have been called")
}
return assertVersion(endV)
}())
// })
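// The migration, despite the send queue to s3, should not disrupt the range's
// connected streams; all three should remain connected below.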

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.comment(`-- (Sending 1 MiB put request to the migrated range)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`-- (Sent 1 MiB put request to the migrated range)`)

// Allow admission to proceed on n3 and wait for all tokens to be returned.
h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.comment(`
-- Send queue and flow token metrics from n1. All tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
@@ -0,0 +1,2 @@
echo
----
