kvserver: add TestFlowControlSendQueueRangeMigrate test #136264

Merged
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
@@ -85,10 +85,10 @@ func Migrate(
// Set DoTimelyApplicationToAllReplicas so that migrates are applied on all
// replicas. This is done since MigrateRequests trigger a call to
// waitForApplication (see Replica.executeWriteBatch).
-if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) {
+if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) ||
+	cArgs.EvalCtx.EvalKnobs().OverrideDoTimelyApplicationToAllReplicas {
pd.Replicated.DoTimelyApplicationToAllReplicas = true
}

return pd, nil
}

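For reference, the eval change above is exercised end-to-end by issuing a bare MigrateRequest against the range; a minimal sketch of the request, mirroring how the new integration test below issues it (error handling elided):

req := &kvpb.MigrateRequest{
	RequestHeader: kvpb.RequestHeader{
		Key:    desc.StartKey.AsRawKey(),
		EndKey: desc.EndKey.AsRawKey(),
	},
	Version: endV, // the version being migrated to
}
// Non-transactional send, addressed directly to the range being migrated.
_, pErr := kv.SendWrappedWith(
	ctx, kvDB.GetFactory().NonTransactionalSender(),
	kvpb.Header{RangeID: desc.RangeID}, req,
)
// In the test, pErr.GoError() is checked and the replica's in-memory and
// persisted versions are then asserted to equal endV.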
219 changes: 219 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -17,12 +17,16 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -5460,6 +5464,221 @@ func TestFlowControlSendQueueRangeRelocate(t *testing.T) {
})
}

func TestFlowControlSendQueueRangeMigrate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
// We're going to be transitioning from startV to endV. Think a cluster of
// binaries running vX, but with active version vX-1.
startV := clusterversion.PreviousRelease.Version()
endV := clusterversion.Latest.Version()
settings := cluster.MakeTestingClusterSettingsWithVersions(endV, startV, false)
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we lower the limits
// from their defaults (8 MiB elastic, 16 MiB regular).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)
disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClusterVersionOverride: startV,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
Store: &kvserver.StoreTestingKnobs{
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
// Because we are migrating from a version (currently) prior to the
// range force flush key version gate, we won't trigger the force
// flush via migrate until we're on the endV, which defeats the
// purpose of this test. We override the behavior here to allow the
// force flush to be triggered on the startV from a Migrate
// request.
OverrideDoTimelyApplicationToAllReplicas: true,
},
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
OverrideV2EnabledWhenLeaderLevel: func() kvflowcontrol.V2EnabledWhenLeaderLevel {
return kvflowcontrol.V2EnabledWhenLeaderV2Encoding
},
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close("send_queue_range_migrate")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)

store := tc.GetFirstStoreFromServer(t, 0)
assertVersion := func(expV roachpb.Version) error {
repl, err := store.GetReplica(desc.RangeID)
if err != nil {
t.Fatal(err)
}
if gotV := repl.Version(); gotV != expV {
return errors.Errorf("expected in-memory version %s, got %s", expV, gotV)
}

sl := stateloader.Make(desc.RangeID)
persistedV, err := sl.LoadVersion(ctx, store.TODOEngine())
if err != nil {
return err
}
if persistedV != expV {
return errors.Errorf("expected persisted version %s, got %s", expV, persistedV)
}
return nil
}

require.NoError(t, assertVersion(startV))

migrated := false
unregister := batcheval.TestingRegisterMigrationInterceptor(endV, func() {
migrated = true
})
defer unregister()

h.comment(`
-- We will exhaust the tokens across all streams while admission is blocked on
-- n3, using 4x1 MiB (deduction; the writes themselves are small) writes. Then,
-- we will write a 1 MiB put to the range, migrate the range, and write a 1 MiB
-- put to the migrated range. We expect that the migration will trigger a force
-- flush of the send queue.`)
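// A rough sketch of the token accounting that follows (recall that the
// OverrideTokenDeduction knob above charges every write as 1 MiB and that
// kvflowcontrol.RegularTokensPerStream is overridden to 4 MiB):
//   4 puts x 1 MiB deducted = 4 MiB tracked for s3 -> regular tokens exhausted.
//   A 5th 1 MiB put can no longer send to s3       -> a 1 MiB send queue forms.
// The MigrateRequest should then force-flush that send queue via
// DoTimelyApplicationToAllReplicas, even though s3 has not admitted anything yet.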
// Block admission on n3, while allowing every other node to admit.
setTokenReturnEnabled(true /* enabled */, 0, 1)
setTokenReturnEnabled(false /* enabled */, 2)
// Drain the tokens to n3 by blocking admission and issuing the buffer
// size of writes to the range.
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`(Sending 1 MiB put request to develop a send queue)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request)`)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Issuing MigrateRequest to range)`)
req := &kvpb.MigrateRequest{
RequestHeader: kvpb.RequestHeader{
Key: desc.StartKey.AsRawKey(),
EndKey: desc.EndKey.AsRawKey(),
},
Version: endV,
}
kvDB := tc.Servers[0].DB()
require.NoError(t, func() error {
if _, pErr := kv.SendWrappedWith(ctx, kvDB.GetFactory().NonTransactionalSender(), kvpb.Header{RangeID: desc.RangeID}, req); pErr != nil {
return pErr.GoError()
}
if !migrated {
return errors.Errorf("expected migration interceptor to have been called")
}
return assertVersion(endV)
}())

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize */, 0 /* serverIdx */)
h.comment(`
-- Send queue and flow token metrics from n1 post-migrate. The migrate should
-- have triggered a force flush of the send queue.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`(Sending 1 MiB put request to the migrated range)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to the migrated range)`)
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))

h.comment(`
-- Send queue and flow token metrics from n1 post-migrate and post 1 MiB put.
-- We expect to see the send queue develop for s3 again.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue and flow token metrics from n1. All tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/kvserverbase/knobs.go
@@ -52,6 +52,16 @@ type BatchEvalTestingKnobs struct {
// its record (which can be resolved synchronously with EndTxn). This is
// useful in certain tests.
DisableTxnAutoGC bool

// OverrideDoTimelyApplicationToAllReplicas overrides the cluster version
// check for the timely replication directive, which force-flushes rac2 send
// queues to all replicas. When set to true, the directive is always set;
// when set to false, the default behavior is used.
//
// NOTE: This currently applies only to Migrate requests and only bypasses
// the cluster version check.
OverrideDoTimelyApplicationToAllReplicas bool
}
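// (Illustrative sketch, not part of this diff.) A test opts into this knob
// through the store testing knobs, as TestFlowControlSendQueueRangeMigrate
// above does; only the relevant fields are shown:
//
//	Knobs: base.TestingKnobs{
//		Store: &kvserver.StoreTestingKnobs{
//			EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
//				// Always set the timely-application directive for Migrate
//				// requests, regardless of the active cluster version.
//				OverrideDoTimelyApplicationToAllReplicas: true,
//			},
//		},
//	},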

// IntentResolverTestingKnobs contains testing helpers that are used during