Skip to content

Commit

Permalink
kvserver: add v1->v2 flow control integration test
Browse files Browse the repository at this point in the history
Add `TestFlowControlV1ToV2Transition`, which ratchets up the enabled
version of replication flow control v2:

```
v1 protocol with v1 encoding =>
v2 protocol with v1 encoding =>
v2 protocol with v2 encoding
```

The test is structured to issue writes and wait for returned tokens
whenever the protocol transitions from v1 to v2, or a leader changes.

More specifically, the test takes the following steps:

```
(1) Start n1, n2, n3 with v1 protocol and v1 encoding.
(2) Upgrade n1 to v2 protocol with v1 encoding.
(3) Transfer the range lease to n2.
(4) Upgrade n2,n3 to v2 protocol with v1 encoding.
(5) Upgrade n1 to v2 protocol with v2 encoding.
(6) Transfer the range lease to n3.
(7) Upgrade n2,n3 to v2 protocol with v2 encoding.
```

Resolves: cockroachdb#130431
Release note: None
  • Loading branch information
kvoli committed Sep 20, 2024
1 parent c17a96c commit d36767f
Show file tree
Hide file tree
Showing 2 changed files with 876 additions and 0 deletions.
365 changes: 365 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3807,6 +3807,371 @@ func TestFlowControlGranterAdmitOneByOneV2(t *testing.T) {
})
}

// TestFlowControlV1ToV2Transition exercises the transition from replication
// flow control:
//
// - v1 protocol with v1 encoding =>
// - v2 protocol with v1 encoding =>
// - v2 protocol with v2 encoding
//
// The test is structured as follows:
//
// (1) Start n1, n2, n3 with v1 protocol and v1 encoding.
// (2) Upgrade n1 to v2 protocol with v1 encoding.
// (3) Transfer the range lease to n2.
// (4) Upgrade n2,n3 to v2 protocol with v1 encoding.
// (5) Upgrade n1 to v2 protocol with v2 encoding.
// (6) Transfer the range lease to n3.
// (7) Upgrade n2,n3 to v2 protocol with v2 encoding.
//
// Between each step, we issue writes and observe the flow control metrics.
func TestFlowControlV1ToV2Transition(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
var disableWorkQueueGranting atomic.Bool
disableWorkQueueGranting.Store(true)
serverLevels := make([]atomic.Uint32, numNodes)
settings := cluster.MakeTestingClusterSettings()

argsPerServer := make(map[int]base.TestServerArgs)
for i := range serverLevels {
// Every node starts off using the v1 protocol but we will ratchet up the
// levels on servers at different times as we go to test the transition.
serverLevels[i].Store(kvflowcontrol.V2NotEnabledWhenLeader)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
RaftConfig: base.RaftConfig{
// Suppress timeout-based elections. This test doesn't want to deal
// with leadership changing hands unintentionally.
RaftElectionTimeoutTicks: 1000000,
},
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideV2EnabledWhenLeaderLevel: func() kvflowcontrol.V2EnabledWhenLeaderLevel {
return kvflowcontrol.V2EnabledWhenLeaderLevel(serverLevels[i].Load())
},
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
return disableWorkQueueGranting.Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)
// We use the base constructor here because we will be modifying the enabled
// level throughout.
h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2NotEnabledWhenLeader, false, /* isStatic */
)

h.init()
defer h.close("v1_to_v2_transition")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
n2 := sqlutils.MakeSQLRunner(tc.ServerConn(1))

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.comment(`
-- This test exercises the transition from replication flow control:
-- - v1 protocol with v1 encoding =>
-- - v2 protocol with v1 encoding =>
-- - v2 protocol with v2 encoding
-- The test is structured as follows:
-- (1) Start n1, n2, n3 with v1 protocol and v1 encoding.
-- (2) Upgrade n1 to v2 protocol with v1 encoding.
-- (3) Transfer the range lease to n2.
-- (4) Upgrade n2,n3 to v2 protocol with v1 encoding.
-- (5) Upgrade n1 to v2 protocol with v2 encoding.
-- (6) Transfer the range lease to n3.
-- (7) Upgrade n2,n3 to v2 protocol with v2 encoding.
-- Between each step, we issue writes and observe the flow control metrics.
--
-- Start by checking that the leader (n1) has 3 connected v1 streams.
`)
h.query(n1, `
SELECT range_id, count(*) AS streams
FROM crdb_internal.kv_flow_control_handles
GROUP BY (range_id)
ORDER BY streams DESC;
`, "range_id", "stream_count")

h.comment(`-- (Issuing 1x1MiB regular, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri)

h.comment(`-- The v1 flow token metrics, there should be 3x1 MiB = 3 MiB of tokens deducted.`)
h.query(n1, v1FlowTokensQueryStr)
h.comment(`-- The v2 flow token metrics, there should be no tokens or deductions.`)
h.query(n1, v2FlowTokensQueryStr)

h.comment(`
-- The v1 tracked tokens per-stream on n1 should be 1 MiB for (s1,s2,s3).
`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles
`, "range_id", "store_id", "total_tracked_tokens")

h.comment(`-- (Upgrading n1 to v2 protocol with v1 encoding.)`)
serverLevels[0].Store(kvflowcontrol.V2EnabledWhenLeaderV1Encoding)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */, kvflowcontrol.V2NotEnabledWhenLeader)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */, kvflowcontrol.V2EnabledWhenLeaderV1Encoding)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */, kvflowcontrol.V2EnabledWhenLeaderV1Encoding)

h.comment(`
-- Viewing the range's v2 connected streams, there now should be three.
-- These are lazily instantiated on the first raft event the leader
-- RangeController sees.
`)
h.query(n1, `
SELECT range_id, count(*) AS streams
FROM crdb_internal.kv_flow_control_handles_v2
GROUP BY (range_id)
ORDER BY streams DESC;
`, "range_id", "stream_count")

h.comment(`
-- The v1 flow token metrics, all deducted tokens should be returned after
-- the leader switches to the rac2 protocol.
`)
h.query(n1, v1FlowTokensQueryStr)

h.comment(`-- (Issuing 1x2MiB regular, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 2<<20 /* 2MiB */, admissionpb.NormalPri)

h.comment(`
-- The v2 flow token metrics, the 3 MiB of earlier token deductions from v1 are dropped.
-- Expect 3 * 2 MiB = 6 MiB of deductions, from the most recent write.
-- Note that the v2 protocol with v1 encoding will only ever deduct elastic tokens.
`)
h.query(n1, v2FlowTokensQueryStr)

h.comment(`
-- The v2 tracked tokens per-stream on n1 should now also be 2 MiB for (s1,s2,s3).
`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

h.comment(`-- (Allow below-raft admission to proceed.)`)
disableWorkQueueGranting.Store(false)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */, kvflowcontrol.V2EnabledWhenLeaderV1Encoding)
h.comment(`-- The v2 flow token metrics. The 6 MiB of tokens should be returned.`)
h.query(n1, v2FlowTokensQueryStr)

h.comment(`-- (Block below-raft admission again.)`)
disableWorkQueueGranting.Store(true)

h.comment(`-- (Issuing 1 x 1MiB regular, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri)

h.comment(`
-- The v2 tracked tokens per-stream on n1 reflect the most recent write
-- and should be 1 MiB per stream now.
`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

h.comment(`
-- There should also be a corresponding elastic token deduction (not regular),
-- as v2 protocol with v1 encoding will only ever deduct elastic tokens.
`)
h.query(n1, v2FlowTokensQueryStr)

h.comment(`
-- (Transferring range lease to n2 (running v1) and allowing leadership to follow.)
`)
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
testutils.SucceedsSoon(t, func() error {
if leader := tc.GetRaftLeader(t, roachpb.RKey(k)); leader.NodeID() != tc.Target(1).NodeID {
return errors.Errorf("expected raft leadership to transfer to n2, found n%d", leader.NodeID())
}
return nil
})
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */, kvflowcontrol.V2EnabledWhenLeaderV1Encoding)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 1 /* serverIdx */)

h.comment(`
-- The v2 flow token metrics from n1 having lost the lease and raft leadership.
-- All deducted tokens are returned.
`)
h.query(n1, v2FlowTokensQueryStr)

h.comment(`
-- Now expect to see 3 connected v1 streams on n2.
`)
h.query(n2, `
SELECT range_id, count(*) AS streams
FROM crdb_internal.kv_flow_control_handles
GROUP BY (range_id)
ORDER BY streams DESC;
`, "range_id", "stream_count")

h.comment(`-- (Issuing 1 x 3MiB regular, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 3<<20 /* 3MiB */, admissionpb.NormalPri, 1 /* serverIdx */)

h.comment(`
-- The v1 tracked tokens per-stream on n2 should be 3 MiB.
`)
h.query(n2, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles
`, "range_id", "store_id", "total_tracked_tokens")

h.comment(`
-- Corresponding v1 token metrics on the new leader n2.
-- These should reflect the 3 x 3 MiB = 9 MiB write.
`)
h.query(n2, v1FlowTokensQueryStr)
h.comment(`
-- Corresponding v2 token metrics on the new leader n2.
-- These should be unpopulated, similar to when n1 was first the leader.
`)
h.query(n2, v2FlowTokensQueryStr)

h.comment(`-- (Upgrading n2,n3 to v2 protocol with v1 encoding.)`)
serverLevels[1].Store(kvflowcontrol.V2EnabledWhenLeaderV1Encoding)
serverLevels[2].Store(kvflowcontrol.V2EnabledWhenLeaderV1Encoding)
h.waitForAllTokensReturned(ctx, 3, 1 /* serverIdx */)

h.comment(`-- (Issuing another 1x1MiB regular, 3x replicated write that's not admitted.)`)
// We specify the serverIdx to ensure that the write is routed to n2 and not
// n1. If the write were routed to n1, it would skip flow control because
// there isn't a handle (leader isn't there) and instead block indefinitely
// on the store work queue.
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri, 1 /* serverIdx */)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 1 /* serverIdx */, kvflowcontrol.V2EnabledWhenLeaderV1Encoding)

h.comment(`
-- Corresponding v1 token metrics on the new leader n2.
-- All tokens should be returned.
`)
h.query(n2, v1FlowTokensQueryStr)

h.comment(`
-- Corresponding v2 token metrics on the new leader n2. The most recent
-- 3 x 1 MiB = 3 MiB write should be reflected in the token deductions.
-- Recall that v2 protocol with v1 encoding will only ever deduct elastic tokens.
`)
h.query(n2, v2FlowTokensQueryStr)

h.comment(`-- (Allow below-raft admission to proceed.)`)
disableWorkQueueGranting.Store(false)
h.waitForAllTokensReturned(ctx, 3, 1 /* serverIdx */, kvflowcontrol.V2EnabledWhenLeaderV1Encoding)

h.comment(`
-- The v2 flow token metrics on n2.
-- The 3 MiB of elastic tokens should be returned.
`)
h.query(n2, v2FlowTokensQueryStr)

h.comment(`-- (Block below-raft admission.)`)
disableWorkQueueGranting.Store(true)

h.comment(`-- (Issuing 1x1MiB regular, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri, 1 /* serverIdx */)

h.comment(`
-- The v2 tracked tokens per-stream on n2 should be 1 MiB.
`)
h.query(n2, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

h.comment(`-- (Upgrading n1 to use V2 encoding and protocol.)`)
serverLevels[0].Store(kvflowcontrol.V2EnabledWhenLeaderV2Encoding)

h.comment(`-- (Transferring range lease back to n1.)`)
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0))
testutils.SucceedsSoon(t, func() error {
if leader := tc.GetRaftLeader(t, roachpb.RKey(k)); leader.NodeID() != tc.Target(0).NodeID {
return errors.Errorf("expected raft leadership to transfer to n1, found n%d", leader.NodeID())
}
return nil
})
h.waitForAllTokensReturned(ctx, 3, 1 /* serverIdx */, kvflowcontrol.V2EnabledWhenLeaderV1Encoding)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */, kvflowcontrol.V2EnabledWhenLeaderV2Encoding)

h.comment(`
-- There should no longer be any tracked tokens on n2, as it's no longer the
-- leader.
`)
h.query(n2, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

h.comment(`
-- Corresponding v2 token metrics on n2. All tokens should be returned.
`)
h.query(n2, v2FlowTokensQueryStr)

h.comment(`
-- Viewing n1's v2 connected streams, there now should be three, as n1 acquired
-- the leadership and lease.
`)
h.query(n1, `
SELECT range_id, count(*) AS streams
FROM crdb_internal.kv_flow_control_handles_v2
GROUP BY (range_id)
ORDER BY streams DESC;
`, "range_id", "stream_count")

h.comment(`-- (Issuing 1x1MiB regular, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri)

h.comment(`
-- The v2 tracked tokens per-stream on n1.
`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

h.comment(`-- (Upgrading n2,n3 to use V2 encoding and protocol.)`)
serverLevels[1].Store(kvflowcontrol.V2EnabledWhenLeaderV2Encoding)
serverLevels[2].Store(kvflowcontrol.V2EnabledWhenLeaderV2Encoding)

h.comment(`-- (Allow below-raft admission to proceed.)`)
disableWorkQueueGranting.Store(false)

// Ensure that there are no outstanding tokens in either protocol after
// allowing admission one last time.
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */, kvflowcontrol.V2NotEnabledWhenLeader)
h.waitForAllTokensReturned(ctx, 3, 1 /* serverIdx */, kvflowcontrol.V2NotEnabledWhenLeader)
h.waitForAllTokensReturned(ctx, 0, 2 /* serverIdx */, kvflowcontrol.V2NotEnabledWhenLeader)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */, kvflowcontrol.V2EnabledWhenLeaderV2Encoding)
h.waitForAllTokensReturned(ctx, 3, 1 /* serverIdx */, kvflowcontrol.V2EnabledWhenLeaderV2Encoding)
h.waitForAllTokensReturned(ctx, 0, 2 /* serverIdx */, kvflowcontrol.V2EnabledWhenLeaderV2Encoding)

h.comment(`-- The v2 flow token metrics on n1.`)
h.query(n2, v2FlowTokensQueryStr)
}

type flowControlTestHelper struct {
t *testing.T
tc *testcluster.TestCluster
Expand Down
Loading

0 comments on commit d36767f

Please sign in to comment.