kvserver: add TestFlowControlSendQueueRangeRelocate test #135570

Merged: 1 commit, Nov 26, 2024
295 changes: 281 additions & 14 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -4536,13 +4536,6 @@ func TestFlowControlSendQueue(t *testing.T) {
}
}

- mkStream := func(serverIdx int) kvflowcontrol.Stream {
- return kvflowcontrol.Stream{
- StoreID: roachpb.StoreID(serverIdx + 1),
- TenantID: roachpb.SystemTenantID,
- }
- }
-
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
@@ -4703,7 +4696,7 @@ func TestFlowControlSendQueue(t *testing.T) {
setTokenReturnEnabled(true /* enabled */, 0, 1)
// Wait for token return on n1, n2. We should only be tracking the tokens for
// n3 now.
- h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1))
+ h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4MiB */, 0 /* serverIdx */)
h.comment(`-- Observe the total tracked tokens per-stream on n1.`)
h.query(n1, `
@@ -4723,7 +4716,7 @@ func TestFlowControlSendQueue(t *testing.T) {
// NB: The write won't be tracked because the quorum [n1,n2] have tokens for
// eval.
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 5<<20 /* 5 MiB */, 0 /* serverIdx */)
- h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0))
+ h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* 1MiB expSize */, 0 /* serverIdx */)
h.comment(`
-- The send queue metrics from n1 should reflect the 1 MiB write being queued
@@ -4739,7 +4732,7 @@ func TestFlowControlSendQueue(t *testing.T) {
h.waitForConnectedStreams(ctx, desc.RangeID, 2, 0 /* serverIdx */)
// There should also be 5 MiB of tracked tokens for n1->n3, 4 + 1 MiB.
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 5<<20 /* 5 MiB */, 0 /* serverIdx */)
- h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1))
+ h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize */, 0 /* serverIdx */)
h.comment(`
-- Flow token metrics from n1, the disconnect should be reflected in the metrics.`)
@@ -4767,7 +4760,7 @@ func TestFlowControlSendQueue(t *testing.T) {
h.comment(`-- (Disabling wait-for-eval bypass.)`)
noopWaitForEval.Store(false)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 6<<20 /* 6 MiB */, 0 /* serverIdx */)
- h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1))
+ h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1))

h.comment(`
-- Send queue metrics from n1, n3's should not be allowed to form a send queue.`)
@@ -4827,7 +4820,7 @@ ORDER BY streams DESC;
// admission is allowed. While n4,n5 will continue to track as they are
// blocked from admitting.
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 8<<20 /* 8 MiB */, 0 /* serverIdx */)
- h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1), mkStream(2))
+ h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1), testingMkFlowStream(2))
h.comment(`
-- From n1. We should expect to see the unblocked streams quickly
-- untrack as admission is allowed (so not observed here), while n4,n5 will continue
@@ -4845,7 +4838,7 @@ ORDER BY streams DESC;
// quickly admits and untracks. While n4,n5 queue the write, not sending the
// msg, deducting and tracking the entry tokens.
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 8<<20 /* 8 MiB */, 0 /* serverIdx */)
- h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1), mkStream(2))
+ h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1), testingMkFlowStream(2))
h.comment(`
-- Send queue and flow token metrics from n1. The 1 MiB write should be queued
-- for n4,n5, while the quorum (n1,n2,n3) proceeds.
@@ -4891,7 +4884,7 @@ ORDER BY streams DESC;
// Expect 4 x 4 MiB tracked tokens for the 4 MiB write = 16 MiB.
// Expect 2 x 1 MiB tracked tokens for the 1 MiB write = 2 MiB.
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 18<<20 /* 18MiB */, 0 /* serverIdx */)
- h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0))
+ h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0))
h.comment(`
-- Observe the total tracked tokens per-stream on n1. We should expect to see the
-- 1 MiB write being tracked across a quorum of streams, while the 4 MiB write
@@ -5150,6 +5143,280 @@ func TestFlowControlSendQueueManyInflight(t *testing.T) {
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

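// testingMkFlowStream returns the kvflowcontrol.Stream for the given
// zero-based server index. Store IDs are one-based, so e.g.
// testingMkFlowStream(2) is the stream for s3 under the system tenant.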
func testingMkFlowStream(serverIdx int) kvflowcontrol.Stream {
return kvflowcontrol.Stream{
StoreID: roachpb.StoreID(serverIdx + 1),
TenantID: roachpb.SystemTenantID,
}
}

// TestFlowControlSendQueueRangeRelocate exercises the send queue formation,
// prevention and flushing via selective (logical) admission of entries and token
// return. It also exercises the behavior of the send queue when a range is
// relocated. See the initial comment for an overview of the test structure.
func TestFlowControlSendQueueRangeRelocate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// We use three relocate variations (*=lh,^=send_queue):
// - [n1*,n2 ,n3^,n4 ,n5] -> [n2 ,n3^,n4 ,n5 ,n6*] (transfer_lease)
// - The leader and leaseholder is relocated.
// - [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n4 ,n5 ,n6 ]
// - The replica with a send queue is relocated.
// - [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n3^,n4 ,n6 ]
// - The replica without a send queue is relocated.
testutils.RunValues(t, "from", []int{0, 2, 4}, func(t *testing.T, fromIdx int) {
testutils.RunTrueAndFalse(t, "transfer_lease", func(t *testing.T, transferLease bool) {
const numNodes = 6
// The transferLease arg indicates whether the AdminRelocateRange request
// will also transfer the lease to the voter which is in the first
// position of the target list. We always place n6 in the first position
// and pass in the transferLease arg to AdminRelocateRange.
fromNode := roachpb.NodeID(fromIdx + 1)
toNode := roachpb.NodeID(numNodes)
fromServerIdxs := []int{0, 1, 2, 3, 4}
toServerIdxs := []int{numNodes - 1}
for i := 0; i < numNodes-1; i++ {
if i != fromIdx {
toServerIdxs = append(toServerIdxs, i)
}
}
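// For example, with fromIdx=2 the voter targets passed to AdminRelocateRange
// correspond to servers [n6, n1, n2, n4, n5]: n6 is always in the first
// position so that transferLeaseToFirstVoter moves the lease to n6 when
// requested.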
var fromString string
if fromIdx == 0 {
fromString = "leader_store"
} else if fromIdx == 2 {
fromString = "send_queue_store"
} else {
fromString = "has_token_store"
}
if transferLease {
fromString += "_transfer_lease"
}
// If n1 is removing itself from the range, the lease will be transferred to
// n6 regardless of the value of transferLease.
newLeaseholderIdx := 0
if transferLease || fromIdx == 0 {
newLeaseholderIdx = 5
}
newLeaseNode := roachpb.NodeID(newLeaseholderIdx + 1)
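// That is, the expected leaseholder after relocation is n6 when transferLease
// is true or when n1 (fromIdx=0) removes itself; otherwise the lease remains
// on n1.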

ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
// lower (8 and 16 MiB default).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)

disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2, 3, 4)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close(fmt.Sprintf("send_queue_range_relocate_from_%s", fromString))

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
newLeaseDB := sqlutils.MakeSQLRunner(tc.ServerConn(newLeaseholderIdx))
h.waitForConnectedStreams(ctx, desc.RangeID, 5, 0 /* serverIdx */)
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 5, 0 /* serverIdx */)

// Block admission on n3, while allowing every other node to admit.
setTokenReturnEnabled(true /* enabled */, 0, 1, 3, 4, 5)
setTokenReturnEnabled(false /* enabled */, 2)
// Drain the tokens to n3 by blocking admission and issuing the buffer
// size of writes to the range.
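// Each put below is deducted as 1 MiB via OverrideTokenDeduction, so four
// puts exhaust s3's 4 MiB of regular tokens; n1, n2, n4, n5 admit and return
// their tokens immediately, leaving only s3's 4 MiB tracked.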
h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`(Sending 1 MiB put request to develop a send queue)`)
h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request)`)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1),
testingMkFlowStream(3), testingMkFlowStream(4))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

beforeString := fmt.Sprintf("%v", fromServerIdxs)
afterString := fmt.Sprintf("%v", toServerIdxs)

h.comment(fmt.Sprintf(`
-- Issuing RelocateRange:
-- before=%s
-- after =%s
-- Transferring the lease: %v.`, beforeString, afterString, transferLease))
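// AdminRelocateRange may fail transiently while replication changes are in
// flight, so retry until the descriptor shows exactly five voters, no
// replica remaining on fromNode, a replica on n6, and the expected
// leaseholder.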
testutils.SucceedsSoon(t, func() error {
if err := tc.Servers[2].DB().
AdminRelocateRange(
context.Background(),
desc.StartKey.AsRawKey(),
tc.Targets(toServerIdxs...),
nil, /* nonVoterTargets */
transferLease, /* transferLeaseToFirstVoter */
); err != nil {
return err
}
var err error
desc, err = tc.LookupRange(k)
if err != nil {
return err
}
rset := desc.Replicas()
if fullDescs := rset.VoterFullAndNonVoterDescriptors(); len(fullDescs) != 5 {
return errors.Errorf(
"expected 5 voters, got %v (replica_set=%v)", fullDescs, rset)
}
if rset.HasReplicaOnNode(fromNode) {
return errors.Errorf(
"expected no replica on node %v (replica_set=%v)", fromNode, rset)
}
if !rset.HasReplicaOnNode(toNode) {
return errors.Errorf(
"expected replica on node 6 (replica_set=%v)", rset)
}
leaseHolder, err := tc.FindRangeLeaseHolder(desc, nil)
if err != nil {
return err
}
expLeaseTarget := tc.Target(newLeaseholderIdx)
if !leaseHolder.Equal(expLeaseTarget) {
return errors.Errorf(
"expected leaseholder to be on %v found %v (replica_set=%v)",
expLeaseTarget, leaseHolder, rset)
}
return nil
})

h.waitForConnectedStreams(ctx, desc.RangeID, 5, newLeaseholderIdx)
h.comment(`(Sending 1 MiB put request to the relocated range)`)
h.put(contextWithTestGeneratedPut(ctx), k, 1, admissionpb.NormalPri, newLeaseholderIdx)
h.comment(`(Sent 1 MiB put request to the relocated range)`)

h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1),
testingMkFlowStream(3), testingMkFlowStream(4))

toStreams := make([]kvflowcontrol.Stream, 0, len(toServerIdxs))
for _, toServerIdx := range toServerIdxs {
if toServerIdx != 2 /* send queue server */ {
toStreams = append(toStreams, testingMkFlowStream(toServerIdx))
}
}
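// Wait for all tokens to be returned on the new leaseholder for every stream
// except s3's (when s3 is still a replica), since s3 may still have entries
// waiting on below-raft admission.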
h.waitForAllTokensReturnedForStreamsV2(ctx, newLeaseholderIdx, toStreams...)

h.comment(`-- Observe the total tracked tokens per-stream on n1.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

if newLeaseholderIdx != 0 {
// Avoid double printing if the lease hasn't moved.
h.comment(fmt.Sprintf(`
-- Observe the total tracked tokens per-stream on new leaseholder n%v.`, newLeaseNode))
h.query(newLeaseDB, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

}

// Allow admission to proceed on n3 and wait for all tokens to be returned.
h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 6 /* expStreamCount */, 0 /* serverIdx */)
if transferLease && fromIdx != 0 {
// When the lease is transferred as part of the relocation and n1 is not the
// node being removed, n6 acquires the lease only after the fromNode replica
// has already been removed. In this case, we expect the new leaseholder to
// see only 5 streams, because it never saw the removed replica's stream.
h.waitForAllTokensReturned(ctx, 5 /* expStreamCount */, newLeaseholderIdx)
} else {
h.waitForAllTokensReturned(ctx, 6 /* expStreamCount */, newLeaseholderIdx)
}

h.comment(`
-- Send queue and flow token metrics from n1. All tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
h.comment(fmt.Sprintf(`
-- Send queue and flow token metrics from leaseholder n%v.
-- All tokens should be returned.`, newLeaseNode))
h.query(newLeaseDB, flowSendQueueQueryStr)
h.query(newLeaseDB, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
})
})
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster