diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index c4e63185e10c..df4a6b1bfe53 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -4536,13 +4536,6 @@ func TestFlowControlSendQueue(t *testing.T) { } } - mkStream := func(serverIdx int) kvflowcontrol.Stream { - return kvflowcontrol.Stream{ - StoreID: roachpb.StoreID(serverIdx + 1), - TenantID: roachpb.SystemTenantID, - } - } - settings := cluster.MakeTestingClusterSettings() kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll) // We want to exhaust tokens but not overload the test, so we set the limits @@ -4703,7 +4696,7 @@ func TestFlowControlSendQueue(t *testing.T) { setTokenReturnEnabled(true /* enabled */, 0, 1) // Wait for token return on n1, n2. We should only be tracking the tokens for // n3 now. - h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1)) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1)) h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4MiB */, 0 /* serverIdx */) h.comment(`-- Observe the total tracked tokens per-stream on n1.`) h.query(n1, ` @@ -4723,7 +4716,7 @@ func TestFlowControlSendQueue(t *testing.T) { // NB: The write won't be tracked because the quorum [n1,n2] have tokens for // eval. h.waitForTotalTrackedTokens(ctx, desc.RangeID, 5<<20 /* 5 MiB */, 0 /* serverIdx */) - h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0)) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0)) h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* 1MiB expSize */, 0 /* serverIdx */) h.comment(` -- The send queue metrics from n1 should reflect the 1 MiB write being queued @@ -4739,7 +4732,7 @@ func TestFlowControlSendQueue(t *testing.T) { h.waitForConnectedStreams(ctx, desc.RangeID, 2, 0 /* serverIdx */) // There should also be 5 MiB of tracked tokens for n1->n3, 4 + 1 MiB. h.waitForTotalTrackedTokens(ctx, desc.RangeID, 5<<20 /* 5 MiB */, 0 /* serverIdx */) - h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1)) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1)) h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize */, 0 /* serverIdx */) h.comment(` -- Flow token metrics from n1, the disconnect should be reflected in the metrics.`) @@ -4767,7 +4760,7 @@ func TestFlowControlSendQueue(t *testing.T) { h.comment(`-- (Disabling wait-for-eval bypass.)`) noopWaitForEval.Store(false) h.waitForTotalTrackedTokens(ctx, desc.RangeID, 6<<20 /* 6 MiB */, 0 /* serverIdx */) - h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1)) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1)) h.comment(` -- Send queue metrics from n1, n3's should not be allowed to form a send queue.`) @@ -4827,7 +4820,7 @@ ORDER BY streams DESC; // admission is allowed. While n4,n5 will continue to track as they are // blocked from admitting. h.waitForTotalTrackedTokens(ctx, desc.RangeID, 8<<20 /* 8 MiB */, 0 /* serverIdx */) - h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1), mkStream(2)) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1), testingMkFlowStream(2)) h.comment(` -- From n1. We should expect to see the unblocked streams quickly -- untrack as admission is allowed (so not observed here), while n4,n5 will continue @@ -4845,7 +4838,7 @@ ORDER BY streams DESC; // quickly admits and untracks. While n4,n5 queue the write, not sending the // msg, deducting and tracking the entry tokens. h.waitForTotalTrackedTokens(ctx, desc.RangeID, 8<<20 /* 8 MiB */, 0 /* serverIdx */) - h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1), mkStream(2)) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1), testingMkFlowStream(2)) h.comment(` -- Send queue and flow token metrics from n1. The 1 MiB write should be queued -- for n4,n5, while the quorum (n1,n2,n3) proceeds. @@ -4891,7 +4884,7 @@ ORDER BY streams DESC; // Expect 4 x 4 MiB tracked tokens for the 4 MiB write = 16 MiB. // Expect 2 x 1 MiB tracked tokens for the 1 MiB write = 2 MiB. h.waitForTotalTrackedTokens(ctx, desc.RangeID, 18<<20 /* 18MiB */, 0 /* serverIdx */) - h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0)) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0)) h.comment(` -- Observe the total tracked tokens per-stream on n1. We should expect to see the -- 1 MiB write being tracked across a quorum of streams, while the 4 MiB write @@ -5150,6 +5143,280 @@ func TestFlowControlSendQueueManyInflight(t *testing.T) { h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) } +func testingMkFlowStream(serverIdx int) kvflowcontrol.Stream { + return kvflowcontrol.Stream{ + StoreID: roachpb.StoreID(serverIdx + 1), + TenantID: roachpb.SystemTenantID, + } +} + +// TestFlowControlSendQueueRangeRelocate exercises the send queue formation, +// prevention and flushing via selective (logical) admission of entries and token +// return. It also exercises the behavior of the send queue when a range is +// relocated. See the initial comment for an overview of the test structure. +func TestFlowControlSendQueueRangeRelocate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + // We use three relocate variations (*=lh,^=send_queue): + // - [n1*,n2 ,n3^,n4 ,n5] -> [n2 ,n3^,n4 ,n5 ,n6*] (transfer_lease) + // - The leader and leaseholder is relocated. + // - [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n4 ,n5 ,n6 ] + // - The replica with a send queue is relocated. + // - [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n3^,n4 ,n6 ] + // - The replica without a send queue is relocated. + testutils.RunValues(t, "from", []int{0, 2, 4}, func(t *testing.T, fromIdx int) { + testutils.RunTrueAndFalse(t, "transfer_lease", func(t *testing.T, transferLease bool) { + const numNodes = 6 + // The transferLease arg indicates whether the AdminRelocateRange request + // will also transfer the lease to the voter which is in the first + // position of the target list. We always place n6 in the first position + // and pass in the transferLease arg to AdminRelocateRange. + fromNode := roachpb.NodeID(fromIdx + 1) + toNode := roachpb.NodeID(numNodes) + fromServerIdxs := []int{0, 1, 2, 3, 4} + toServerIdxs := []int{numNodes - 1} + for i := 0; i < numNodes-1; i++ { + if i != fromIdx { + toServerIdxs = append(toServerIdxs, i) + } + } + var fromString string + if fromIdx == 0 { + fromString = "leader_store" + } else if fromIdx == 2 { + fromString = "send_queue_store" + } else { + fromString = "has_token_store" + } + if transferLease { + fromString += "_transfer_lease" + } + // If n1 is removing itself from the range, the leaseholder will be + // transferred to n6 regardless of the value of transferLease. + newLeaseholderIdx := 0 + if transferLease || fromIdx == 0 { + newLeaseholderIdx = 5 + } + newLeaseNode := roachpb.NodeID(newLeaseholderIdx + 1) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll) + // We want to exhaust tokens but not overload the test, so we set the limits + // lower (8 and 16 MiB default). + kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20) + kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20) + + disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes) + setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) { + for _, serverIdx := range serverIdxs { + disableWorkQueueGrantingServers[serverIdx].Store(!enabled) + } + } + + argsPerServer := make(map[int]base.TestServerArgs) + for i := range disableWorkQueueGrantingServers { + disableWorkQueueGrantingServers[i].Store(true) + argsPerServer[i] = base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens { + // Deduct every write as 1 MiB, regardless of how large it + // actually is. + return kvflowcontrol.Tokens(1 << 20) + }, + // We want to test the behavior of the send queue, so we want to + // always have up-to-date stats. This ensures that the send queue + // stats are always refreshed on each call to + // RangeController.HandleRaftEventRaftMuLocked. + OverrideAlwaysRefreshSendStreamStats: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + idx := i + return disableWorkQueueGrantingServers[idx].Load() + }, + }, + }, + } + } + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: argsPerServer, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2, 3, 4)...) + + h := newFlowControlTestHelper( + t, tc, "flow_control_integration_v2", /* testdata */ + kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */ + ) + h.init(kvflowcontrol.ApplyToAll) + defer h.close(fmt.Sprintf("send_queue_range_relocate_from_%s", fromString)) + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.enableVerboseRaftMsgLoggingForRange(desc.RangeID) + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + newLeaseDB := sqlutils.MakeSQLRunner(tc.ServerConn(newLeaseholderIdx)) + h.waitForConnectedStreams(ctx, desc.RangeID, 5, 0 /* serverIdx */) + h.resetV2TokenMetrics(ctx) + h.waitForConnectedStreams(ctx, desc.RangeID, 5, 0 /* serverIdx */) + + // Block admission on n3, while allowing every other node to admit. + setTokenReturnEnabled(true /* enabled */, 0, 1, 3, 4, 5) + setTokenReturnEnabled(false /* enabled */, 2) + // Drain the tokens to n3 by blocking admission and issuing the buffer + // size of writes to the range. + h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri) + h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri) + h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri) + h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri) + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */) + + h.comment(`(Sending 1 MiB put request to develop a send queue)`) + h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri) + h.comment(`(Sent 1 MiB put request)`) + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */ + testingMkFlowStream(0), testingMkFlowStream(1), + testingMkFlowStream(3), testingMkFlowStream(4)) + h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */) + + h.comment(` +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`) + h.query(n1, flowSendQueueQueryStr) + h.comment(` +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + h.comment(` +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3.`) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + + beforeString := fmt.Sprintf("%v", fromServerIdxs) + afterString := fmt.Sprintf("%v", toServerIdxs) + + h.comment(fmt.Sprintf(` +-- Issuing RelocateRange: +-- before=%s +-- after =%s +-- Transferring the lease: %v.`, beforeString, afterString, transferLease)) + testutils.SucceedsSoon(t, func() error { + if err := tc.Servers[2].DB(). + AdminRelocateRange( + context.Background(), + desc.StartKey.AsRawKey(), + tc.Targets(toServerIdxs...), + nil, /* nonVoterTargets */ + transferLease, /* transferLeaseToFirstVoter */ + ); err != nil { + return err + } + var err error + desc, err = tc.LookupRange(k) + if err != nil { + return err + } + rset := desc.Replicas() + if fullDescs := rset.VoterFullAndNonVoterDescriptors(); len(fullDescs) != 5 { + return errors.Errorf( + "expected 5 voters, got %v (replica_set=%v)", fullDescs, rset) + } + if rset.HasReplicaOnNode(fromNode) { + return errors.Errorf( + "expected no replica on node %v (replica_set=%v)", fromNode, rset) + } + if !rset.HasReplicaOnNode(toNode) { + return errors.Errorf( + "expected replica on node 6 (replica_set=%v)", rset) + } + leaseHolder, err := tc.FindRangeLeaseHolder(desc, nil) + if err != nil { + return err + } + expLeaseTarget := tc.Target(newLeaseholderIdx) + if !leaseHolder.Equal(expLeaseTarget) { + return errors.Errorf( + "expected leaseholder to be on %v found %v (replica_set=%v)", + expLeaseTarget, leaseHolder, rset) + } + return nil + }) + + h.waitForConnectedStreams(ctx, desc.RangeID, 5, newLeaseholderIdx) + h.comment(`(Sending 1 MiB put request to the relocated range)`) + h.put(contextWithTestGeneratedPut(ctx), k, 1, admissionpb.NormalPri, newLeaseholderIdx) + h.comment(`(Sent 1 MiB put request to the relocated range)`) + + h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */ + testingMkFlowStream(0), testingMkFlowStream(1), + testingMkFlowStream(3), testingMkFlowStream(4)) + + toStreams := make([]kvflowcontrol.Stream, 0, len(toServerIdxs)) + for _, toServerIdx := range toServerIdxs { + if toServerIdx != 2 /* send queue server */ { + toStreams = append(toStreams, testingMkFlowStream(toServerIdx)) + } + } + h.waitForAllTokensReturnedForStreamsV2(ctx, newLeaseholderIdx, toStreams...) + + h.comment(`-- Observe the total tracked tokens per-stream on n1.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + if newLeaseholderIdx != 0 { + // Avoid double printing if the lease hasn't moved. + h.comment(fmt.Sprintf(` +-- Observe the total tracked tokens per-stream on new leaseholder n%v.`, newLeaseNode)) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + + } + + // Allow admission to proceed on n3 and wait for all tokens to be returned. + h.comment(`-- (Allowing below-raft admission to proceed on n3.)`) + setTokenReturnEnabled(true /* enabled */, 2) + h.waitForAllTokensReturned(ctx, 6 /* expStreamCount */, 0 /* serverIdx */) + if transferLease && fromIdx != 0 { + // When the lease is transferred first, the leaseholder is relocated to + // n6 after the fromNode is removed. In this case, we expect the + // leaseholder will have only 5 streams, because it will have never + // seen s3's stream as its replica was already removed from the range. + h.waitForAllTokensReturned(ctx, 5 /* expStreamCount */, newLeaseholderIdx) + } else { + h.waitForAllTokensReturned(ctx, 6 /* expStreamCount */, newLeaseholderIdx) + } + + h.comment(` +-- Send queue and flow token metrics from n1. All tokens should be returned.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + h.comment(fmt.Sprintf(` +-- Send queue and flow token metrics from leaseholder n%v. +-- All tokens should be returned.`, newLeaseNode)) + h.query(newLeaseDB, flowSendQueueQueryStr) + h.query(newLeaseDB, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + }) + }) +} + type flowControlTestHelper struct { t testing.TB tc *testcluster.TestCluster diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_has_token_store b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_has_token_store new file mode 100644 index 000000000000..249f4044fee7 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_has_token_store @@ -0,0 +1,153 @@ +echo +---- +---- +(Sending 1 MiB put request to develop a send queue) + + +(Sent 1 MiB put request) + + +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B + + +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 74 | 1 | 0 B + 74 | 2 | 0 B + 74 | 3 | 4.0 MiB + 74 | 4 | 0 B + 74 | 5 | 0 B + + +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3. +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -3.0 MiB | 0 B | -2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Issuing RelocateRange: +-- before=[0 1 2 3 4] +-- after =[5 0 1 2 3] +-- Transferring the lease: false. + + +(Sending 1 MiB put request to the relocated range) + + +(Sent 1 MiB put request to the relocated range) + + +-- Observe the total tracked tokens per-stream on n1. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 74 | 1 | 0 B + 74 | 2 | 0 B + 74 | 3 | 4.0 MiB + 74 | 4 | 0 B + 74 | 6 | 0 B + + +-- (Allowing below-raft admission to proceed on n3.) + + +-- Send queue and flow token metrics from n1. All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Send queue and flow token metrics from leaseholder n1. +-- All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_has_token_store_transfer_lease b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_has_token_store_transfer_lease new file mode 100644 index 000000000000..8a267003392f --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_has_token_store_transfer_lease @@ -0,0 +1,155 @@ +echo +---- +---- +(Sending 1 MiB put request to develop a send queue) + + +(Sent 1 MiB put request) + + +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B + + +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 74 | 1 | 0 B + 74 | 2 | 0 B + 74 | 3 | 4.0 MiB + 74 | 4 | 0 B + 74 | 5 | 0 B + + +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3. +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -3.0 MiB | 0 B | -2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Issuing RelocateRange: +-- before=[0 1 2 3 4] +-- after =[5 0 1 2 3] +-- Transferring the lease: true. + + +(Sending 1 MiB put request to the relocated range) + + +(Sent 1 MiB put request to the relocated range) + + +-- Observe the total tracked tokens per-stream on n1. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + + +-- Observe the total tracked tokens per-stream on new leaseholder n6. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + + +-- (Allowing below-raft admission to proceed on n3.) + + +-- Send queue and flow token metrics from n1. All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Send queue and flow token metrics from leaseholder n6. +-- All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_leader_store b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_leader_store new file mode 100644 index 000000000000..485094b3fff0 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_leader_store @@ -0,0 +1,156 @@ +echo +---- +---- +(Sending 1 MiB put request to develop a send queue) + + +(Sent 1 MiB put request) + + +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B + + +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 74 | 1 | 0 B + 74 | 2 | 0 B + 74 | 3 | 4.0 MiB + 74 | 4 | 0 B + 74 | 5 | 0 B + + +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3. +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -3.0 MiB | 0 B | -2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Issuing RelocateRange: +-- before=[0 1 2 3 4] +-- after =[5 1 2 3 4] +-- Transferring the lease: false. + + +(Sending 1 MiB put request to the relocated range) + + +(Sent 1 MiB put request to the relocated range) + + +-- Observe the total tracked tokens per-stream on n1. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + + +-- Observe the total tracked tokens per-stream on new leaseholder n6. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + + +-- (Allowing below-raft admission to proceed on n3.) + + +-- Send queue and flow token metrics from n1. All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Send queue and flow token metrics from leaseholder n6. +-- All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_leader_store_transfer_lease b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_leader_store_transfer_lease new file mode 100644 index 000000000000..a2177bd3592d --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_leader_store_transfer_lease @@ -0,0 +1,157 @@ +echo +---- +---- +(Sending 1 MiB put request to develop a send queue) + + +(Sent 1 MiB put request) + + +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B + + +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 74 | 1 | 0 B + 74 | 2 | 0 B + 74 | 3 | 4.0 MiB + 74 | 4 | 0 B + 74 | 5 | 0 B + + +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3. +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -3.0 MiB | 0 B | -2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Issuing RelocateRange: +-- before=[0 1 2 3 4] +-- after =[5 1 2 3 4] +-- Transferring the lease: true. + + +(Sending 1 MiB put request to the relocated range) + + +(Sent 1 MiB put request to the relocated range) + + +-- Observe the total tracked tokens per-stream on n1. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + + +-- Observe the total tracked tokens per-stream on new leaseholder n6. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + + +-- (Allowing below-raft admission to proceed on n3.) + + +-- Send queue and flow token metrics from n1. All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Send queue and flow token metrics from leaseholder n6. +-- All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB +---- +---- + + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_send_queue_store b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_send_queue_store new file mode 100644 index 000000000000..d1bb218c1a1e --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_send_queue_store @@ -0,0 +1,153 @@ +echo +---- +---- +(Sending 1 MiB put request to develop a send queue) + + +(Sent 1 MiB put request) + + +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B + + +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 74 | 1 | 0 B + 74 | 2 | 0 B + 74 | 3 | 4.0 MiB + 74 | 4 | 0 B + 74 | 5 | 0 B + + +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3. +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -3.0 MiB | 0 B | -2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Issuing RelocateRange: +-- before=[0 1 2 3 4] +-- after =[5 0 1 3 4] +-- Transferring the lease: false. + + +(Sending 1 MiB put request to the relocated range) + + +(Sent 1 MiB put request to the relocated range) + + +-- Observe the total tracked tokens per-stream on n1. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 74 | 1 | 0 B + 74 | 2 | 0 B + 74 | 4 | 0 B + 74 | 5 | 0 B + 74 | 6 | 0 B + + +-- (Allowing below-raft admission to proceed on n3.) + + +-- Send queue and flow token metrics from n1. All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Send queue and flow token metrics from leaseholder n1. +-- All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB +---- +---- + +# vim:ft=sql diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_send_queue_store_transfer_lease b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_send_queue_store_transfer_lease new file mode 100644 index 000000000000..48e83a3e3e80 --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_relocate_from_send_queue_store_transfer_lease @@ -0,0 +1,155 @@ +echo +---- +---- +(Sending 1 MiB put request to develop a send queue) + + +(Sent 1 MiB put request) + + +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B + + +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 74 | 1 | 0 B + 74 | 2 | 0 B + 74 | 3 | 4.0 MiB + 74 | 4 | 0 B + 74 | 5 | 0 B + + +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3. +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -3.0 MiB | 0 B | -2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Issuing RelocateRange: +-- before=[0 1 2 3 4] +-- after =[5 0 1 3 4] +-- Transferring the lease: true. + + +(Sending 1 MiB put request to the relocated range) + + +(Sent 1 MiB put request to the relocated range) + + +-- Observe the total tracked tokens per-stream on n1. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + + +-- Observe the total tracked tokens per-stream on new leaseholder n6. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + + +-- (Allowing below-raft admission to proceed on n3.) + + +-- Send queue and flow token metrics from n1. All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + + +-- Send queue and flow token metrics from leaseholder n6. +-- All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 4 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 5 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 6 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB +---- +---- + +# vim:ft=sql