diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go
index 41f375246302..c3b77836fbc4 100644
--- a/pkg/kv/kvserver/flow_control_integration_test.go
+++ b/pkg/kv/kvserver/flow_control_integration_test.go
@@ -17,12 +17,15 @@ import (
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -5160,7 +5163,7 @@ func TestFlowControlSendQueueRangeMerge(t *testing.T) {
 	h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

 	h.comment(`-- (Merging RHS back into LHS)`)
-	// XXX: FAILS HERE ON THE MERGE.
+	// XXX: FAILS HERE ON THE MERGE.
 	merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey())
 	h.comment(`-- (Merged RHS back into LHS)`)

@@ -5179,6 +5182,203 @@ func TestFlowControlSendQueueRangeMerge(t *testing.T) {
 	h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
 }

+func TestFlowControlSendQueueRangeMigrate(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	const numNodes = 3
+	// We're going to be transitioning from startV to endV. Think a cluster of
+	// binaries running vX, but with active version vX-1.
+	startV := clusterversion.PreviousRelease.Version()
+	endV := clusterversion.Latest.Version()
+	settings := cluster.MakeTestingClusterSettingsWithVersions(endV, startV, false)
+	kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
+	// We want to exhaust tokens but not overload the test, so we set the limits
+	// lower (8 and 16 MiB default).
+	kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
+	kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)
+	disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
+	setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
+		for _, serverIdx := range serverIdxs {
+			disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
+		}
+	}
+
+	argsPerServer := make(map[int]base.TestServerArgs)
+	for i := range disableWorkQueueGrantingServers {
+		disableWorkQueueGrantingServers[i].Store(true)
+		argsPerServer[i] = base.TestServerArgs{
+			Settings: settings,
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					ClusterVersionOverride:         startV,
+					DisableAutomaticVersionUpgrade: make(chan struct{}),
+				},
+				Store: &kvserver.StoreTestingKnobs{
+					FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
+						OverrideV2EnabledWhenLeaderLevel: func() kvflowcontrol.V2EnabledWhenLeaderLevel {
+							return kvflowcontrol.V2EnabledWhenLeaderV2Encoding
+						},
+						UseOnlyForScratchRanges: true,
+						OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
+							// Deduct every write as 1 MiB, regardless of how large it
+							// actually is.
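+							// This keeps the test's token accounting deterministic
+							// and independent of the actual request payload size.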
+							return kvflowcontrol.Tokens(1 << 20)
+						},
+						// We want to test the behavior of the send queue, so we want to
+						// always have up-to-date stats. This ensures that the send queue
+						// stats are always refreshed on each call to
+						// RangeController.HandleRaftEventRaftMuLocked.
+						OverrideAlwaysRefreshSendStreamStats: true,
+					},
+				},
+				AdmissionControl: &admission.TestingKnobs{
+					DisableWorkQueueFastPath: true,
+					DisableWorkQueueGranting: func() bool {
+						idx := i
+						return disableWorkQueueGrantingServers[idx].Load()
+					},
+				},
+			},
+		}
+	}
+
+	tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
+		ReplicationMode:   base.ReplicationManual,
+		ServerArgsPerNode: argsPerServer,
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	k := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)
+
+	h := newFlowControlTestHelper(
+		t, tc, "flow_control_integration_v2", /* testdata */
+		kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
+	)
+	h.init(kvflowcontrol.ApplyToAll)
+	defer h.close("send_queue_range_migrate")
+
+	desc, err := tc.LookupRange(k)
+	require.NoError(t, err)
+	h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
+	n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
+	h.resetV2TokenMetrics(ctx)
+	h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
+
+	store := tc.GetFirstStoreFromServer(t, 0)
+	assertVersion := func(expV roachpb.Version) error {
+		repl, err := store.GetReplica(desc.RangeID)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if gotV := repl.Version(); gotV != expV {
+			return errors.Errorf("expected in-memory version %s, got %s", expV, gotV)
+		}
+
+		sl := stateloader.Make(desc.RangeID)
+		persistedV, err := sl.LoadVersion(ctx, store.TODOEngine())
+		if err != nil {
+			return err
+		}
+		if persistedV != expV {
+			return errors.Errorf("expected persisted version %s, got %s", expV, persistedV)
+		}
+		return nil
+	}
+
+	require.NoError(t, assertVersion(startV))
+
+	migrated := false
+	unregister := batcheval.TestingRegisterMigrationInterceptor(endV, func() {
+		migrated = true
+	})
+	defer unregister()
+
+	// Block admission on n3, while allowing every other node to admit.
+	setTokenReturnEnabled(true /* enabled */, 0, 1)
+	setTokenReturnEnabled(false /* enabled */, 2)
+	// Drain the tokens to n3 by blocking admission and issuing 4 MiB (the
+	// regular token limit) of writes to the range.
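+	// Each put below is deducted as exactly 1 MiB via OverrideTokenDeduction,
+	// so four puts consume the full 4 MiB regular limit configured above.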
+	h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+	h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+	h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+	h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+	h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
+
+	h.comment(`-- (Sending 1 MiB put request to develop a send queue)`)
+	h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+	h.comment(`-- (Sent 1 MiB put request)`)
+	h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
+	h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1))
+	h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)
+
+	h.comment(`
+-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
+	h.query(n1, flowSendQueueQueryStr)
+	h.comment(`
+-- Observe the total tracked tokens per-stream on n1, s3's entries will still
+-- be tracked here.`)
+	h.query(n1, `
+  SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
+    FROM crdb_internal.kv_flow_control_handles_v2
+`, "range_id", "store_id", "total_tracked_tokens")
+	h.comment(`
+-- Per-store tokens available from n1, these should reflect the lack of tokens
+-- for s3.`)
+	h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
+
+	// MigrateRequest is used to instruct all ranges overlapping with it to
+	// exercise any relevant (below-raft) migrations in order for their range
+	// state to conform to what's needed by the specified version. It's a core
+	// primitive used in our migrations infrastructure to phase out legacy code
+	// below raft.
+	//
+	// KV waits for this command to durably apply on all replicas before
+	// returning, guaranteeing to the caller that all pre-migration state has
+	// been completely purged from the system.
+	h.comment(`-- (Issuing MigrateRequest to range)`)
+	req := &kvpb.MigrateRequest{
+		RequestHeader: kvpb.RequestHeader{
+			Key:    desc.StartKey.AsRawKey(),
+			EndKey: desc.EndKey.AsRawKey(),
+		},
+		// Illogical, but irrelevant: the request times out waiting for all
+		// replicas to be caught up.
+		Version: endV,
+	}
+	db := tc.Servers[0].DB()
+	// XXX: FAILS HERE, NEVER RETURNS.
+	// testutils.SucceedsSoon(t, func() error {
+	require.NoError(t, func() error {
+		if _, pErr := kv.SendWrappedWith(ctx, db.GetFactory().NonTransactionalSender(), kvpb.Header{RangeID: desc.RangeID}, req); pErr != nil {
+			return pErr.GoError()
+		}
+		if !migrated {
+			return errors.Errorf("expected migration interceptor to have been called")
+		}
+		return assertVersion(endV)
+	}())
+	// })
+
+	h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
+	h.comment(`-- (Sending 1 MiB put request to the migrated range)`)
+	h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+	h.comment(`-- (Sent 1 MiB put request to the migrated range)`)
+
+	// Allow admission to proceed on n3 and wait for all tokens to be returned.
+	h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
+	setTokenReturnEnabled(true /* enabled */, 2)
+	h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
+	h.comment(`
+-- Send queue and flow token metrics from n1. All tokens should be returned.`)
+	h.query(n1, flowSendQueueQueryStr)
+	h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
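+	// The output of the h.comment/h.query calls above is recorded in
+	// testdata/flow_control_integration_v2/send_queue_range_migrate (see
+	// h.close("send_queue_range_migrate") above).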
+}
+
 type flowControlTestHelper struct {
 	t  testing.TB
 	tc *testcluster.TestCluster
diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_migrate b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_migrate
new file mode 100644
index 000000000000..ad07fd2183e0
--- /dev/null
+++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_migrate
@@ -0,0 +1,2 @@
+echo
+----