kvserver: add TestFlowControlSendQueueRangeRelocate test
Add a flow control integration test,
`TestFlowControlSendQueueRangeRelocate`. The test has 6 variations: three
relocation scenarios, each run with and without transferring the lease on
top of the relocation:

```
We test three relocate variations (*=lh,^=send_queue):
- [n1*,n2 ,n3^,n4 ,n5] -> [n2 ,n3^,n4 ,n5 ,n6*] (transfer_lease)
  - The leader and leaseholder are relocated.
- [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n4 ,n5 ,n6 ]
  - The replica with a send queue is relocated.
- [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n3^,n4 ,n6 ]
  - The replica without a send queue is relocated.
```

Epic: none
Release note: None
kvoli committed Nov 16, 2024
1 parent 193c319 commit 561ef87
Showing 8 changed files with 1,205 additions and 0 deletions.
248 changes: 248 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -5379,6 +5379,254 @@ func TestFlowControlSendQueueRangeMigrate(t *testing.T) {
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

func TestFlowControlSendQueueRangeRelocate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// We test three relocate variations (*=lh,^=send_queue):
// - [n1*,n2 ,n3^,n4 ,n5] -> [n2 ,n3^,n4 ,n5 ,n6*] (transfer_lease)
//   - The leader and leaseholder are relocated.
// - [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n4 ,n5 ,n6 ]
// - The replica with a send queue is relocated.
// - [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n3^,n4 ,n6 ]
// - The replica without a send queue is relocated.
testutils.RunValues(t, "from", []int{0, 2, 4}, func(t *testing.T, fromIdx int) {
testutils.RunTrueAndFalse(t, "transfer_lease", func(t *testing.T, transferLease bool) {
const numNodes = 6
// The transferLease arg indicates whether the AdminRelocateRange request
// will also transfer the lease to the voter which is in the first
// position of the target list. We always place n6 in the first position
// and pass in the transferLease arg to AdminRelocateRange.
fromNode := roachpb.NodeID(fromIdx + 1)
toNode := roachpb.NodeID(numNodes)
fromServerIdxs := []int{0, 1, 2, 3, 4}
toServerIdxs := []int{numNodes - 1}
for i := 0; i < numNodes-1; i++ {
if i != fromIdx {
toServerIdxs = append(toServerIdxs, i)
}
}
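// For example, relocating n3 (fromIdx=2) yields toServerIdxs=[5,0,1,3,4],
// i.e. [n6,n1,n2,n4,n5], with n6 in the lease-transfer position.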
var fromString string
if fromIdx == 0 {
fromString = "leader_store"
} else if fromIdx == 2 {
fromString = "send_queue_store"
} else {
fromString = "has_token_store"
}
if transferLease {
fromString += "_transfer_lease"
}
// If n1 is removing itself from the range, the leaseholder will be
// transferred to n6 regardless of the value of transferLease.
newLeaseholderIdx := 0
if transferLease || fromIdx == 0 {
newLeaseholderIdx = 5
}
newLeaseNode := roachpb.NodeID(newLeaseholderIdx + 1)

ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
// lower than the defaults (8 and 16 MiB).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)

disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
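// setTokenReturnEnabled toggles admission granting on the given servers via
// the DisableWorkQueueGranting knob below; while granting is disabled, flow
// tokens deducted for writes to that store are not returned.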
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2, 3, 4)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close(fmt.Sprintf("send_queue_range_relocate_from_%s", fromString))

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
newLeaseDB := sqlutils.MakeSQLRunner(tc.ServerConn(newLeaseholderIdx))
h.waitForConnectedStreams(ctx, desc.RangeID, 5, 0 /* serverIdx */)
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 5, 0 /* serverIdx */)

// Block admission on n3, while allowing every other node to admit.
setTokenReturnEnabled(true /* enabled */, 0, 1, 3, 4, 5)
setTokenReturnEnabled(false /* enabled */, 2)
// Drain the tokens to n3 by blocking admission and issuing the buffer
// size of writes to the range.
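// Each put below is deducted as 1 MiB (see OverrideTokenDeduction above),
// so four puts consume all 4 MiB of regular tokens for s3.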
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
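// With s3's regular tokens exhausted, the next 1 MiB put cannot be sent to
// s3 and instead forms a 1 MiB send queue for it, while the other stores
// admit the write and return their tokens.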

h.comment(`(Sending 1 MiB put request to develop a send queue)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request)`)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1), testingMkFlowStream(3), testingMkFlowStream(4))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

beforeString := fmt.Sprintf("%v", fromServerIdxs)
afterString := fmt.Sprintf("%v", toServerIdxs)

h.comment(fmt.Sprintf(`
-- Issuing RelocateRange:
-- before=%s
-- after =%s
-- Transferring the lease %v.`, beforeString, afterString, transferLease))
testutils.SucceedsSoon(t, func() error {
if err := tc.Servers[2].DB().
AdminRelocateRange(
context.Background(),
desc.StartKey.AsRawKey(),
tc.Targets(toServerIdxs...),
nil, /* nonVoterTargets */
transferLease, /* transferLeaseToFirstVoter */
); err != nil {
return err
}
var err error
desc, err = tc.LookupRange(k)
if err != nil {
return err
}
rset := desc.Replicas()
if fullDescs := rset.VoterFullAndNonVoterDescriptors(); len(fullDescs) != 5 {
return errors.Errorf(
"expected 5 voters, got %v (replica_set=%v)", fullDescs, rset)
}
if rset.HasReplicaOnNode(fromNode) {
return errors.Errorf(
"expected no replica on node %v (replica_set=%v)", fromNode, rset)
}
if !rset.HasReplicaOnNode(toNode) {
return errors.Errorf(
"expected replica on node 6 (replica_set=%v)", rset)
}
leaseHolder, err := tc.FindRangeLeaseHolder(desc, nil)
if err != nil {
return err
}
expLeaseTarget := tc.Target(newLeaseholderIdx)
if !leaseHolder.Equal(expLeaseTarget) {
return errors.Errorf(
"expected leaseholder to be on %v found %v (replica_set=%v)",
expLeaseTarget, leaseHolder, rset)
}
return nil
})

h.waitForConnectedStreams(ctx, desc.RangeID, 5, newLeaseholderIdx)
h.comment(`(Sending 1 MiB put request to the relocated range)`)
h.put(ctx, k, 1, admissionpb.NormalPri, newLeaseholderIdx)
h.comment(`(Sent 1 MiB put request to the relocated range)`)

h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20, /* 4 MiB */
newLeaseholderIdx /* serverIdx */)
h.comment(`-- Observe the total tracked tokens per-stream on n1.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

h.comment(fmt.Sprintf(`
-- Observe the total tracked tokens per-stream on new leaseholder n%v.`, newLeaseNode))
h.query(newLeaseDB, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

// Allow admission to proceed on n3 and wait for all tokens to be returned.
h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 6 /* expStreamCount */, 0 /* serverIdx */)
if transferLease && fromIdx != 0 {
// When the lease is transferred first, the leaseholder is relocated to
// n6 after the fromNode is removed. In this case, we expect the
// leaseholder will have only 5 streams, because it will have never
// seen s3's stream as its replica was already removed from the range.
h.waitForAllTokensReturned(ctx, 5 /* expStreamCount */, newLeaseholderIdx)
} else {
h.waitForAllTokensReturned(ctx, 6 /* expStreamCount */, newLeaseholderIdx)
}

h.comment(`
-- Send queue and flow token metrics from n1. All tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
h.comment(fmt.Sprintf(`
-- Send queue and flow token metrics from leaseholder n%v.
-- All tokens should be returned.`, newLeaseNode))
h.query(newLeaseDB, flowSendQueueQueryStr)
h.query(newLeaseDB, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
})
})
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
@@ -0,0 +1,2 @@
echo
----
