diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index c136d1cdcefa..b541b4c66924 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -4966,7 +4966,7 @@ func BenchmarkFlowControlV2Basic(b *testing.B) { testutils.RunValues(b, "kvadmission.flow_control.mode", []kvflowcontrol.ModeT{ kvflowcontrol.ApplyToElastic, kvflowcontrol.ApplyToAll, - }, func(t *testing.B, mode kvflowcontrol.ModeT) { + }, func(b *testing.B, mode kvflowcontrol.ModeT) { ctx := context.Background() settings := cluster.MakeTestingClusterSettings() tc := testcluster.StartTestCluster(b, 3, base.TestClusterArgs{ @@ -4980,6 +4980,11 @@ func BenchmarkFlowControlV2Basic(b *testing.B) { OverrideV2EnabledWhenLeaderLevel: func() kvflowcontrol.V2EnabledWhenLeaderLevel { return v2EnabledWhenLeaderLevel }, + OverrideTokenDeduction: func(_ kvflowcontrol.Tokens) kvflowcontrol.Tokens { + // This test makes use of (small) increment requests, but + // wants to see large token deductions/returns. + return kvflowcontrol.Tokens(1 << 20 /* 1MiB */) + }, }, }, AdmissionControl: &admission.TestingKnobs{ @@ -5001,9 +5006,17 @@ func BenchmarkFlowControlV2Basic(b *testing.B) { require.NoError(b, err) h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + incArgs := incrementArgs(k, int64(1)) b.ResetTimer() for i := 0; i < b.N; i++ { - h.put(ctx, k, 1<<10 /* 1KiB */, testFlowModeToPri(mode)) + if _, err := kv.SendWrappedWithAdmission( + ctx, tc.Server(0).DB().NonTransactionalSender(), kvpb.Header{}, + kvpb.AdmissionHeader{ + Priority: int32(testFlowModeToPri(mode)), + Source: kvpb.AdmissionHeader_FROM_SQL, + }, incArgs); err != nil { + b.Fatal(err) + } } h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) b.StopTimer()