kvserver: add TestFlowControlSendQueueRangeMigrate test
Add a new rac2 flow control integration test,
`TestFlowControlSendQueueRangeMigrate`.

This test takes the following steps:

```sql
-- We will exhaust the tokens across all streams while admission is blocked on
-- n3, using a single 4 MiB (deduction, the write itself is small) write. Then,
-- we will write a 1 MiB put to the range, migrate the range, and write a 1 MiB
-- put to the migrated range. We expect that the migration will trigger a force
-- flush of the send queue.
```

Part of: #132614
Release note: None
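
For orientation, here is the token arithmetic the test relies on, as a minimal sketch (not part of the commit). The constants come from the test's setting overrides below: `RegularTokensPerStream` is lowered to 4 MiB and `OverrideTokenDeduction` charges every put as 1 MiB, so the "single 4 MiB deduction" above is realized as four 1 MiB puts issued while below-raft admission is blocked on n3.

```go
package main

import "fmt"

func main() {
	// Values taken from the test's overrides; this helper is illustrative only.
	const regularTokensPerStream = 4 << 20 // kvflowcontrol.RegularTokensPerStream override
	const perPutDeduction = 1 << 20        // OverrideTokenDeduction charges 1 MiB per write
	fmt.Printf("puts needed to exhaust one stream: %d\n",
		regularTokensPerStream/perPutDeduction) // 4
}
```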
kvoli committed Dec 2, 2024
1 parent e6d48ec commit 5a21e39
Showing 4 changed files with 393 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
@@ -85,10 +85,10 @@ func Migrate(
// Set DoTimelyApplicationToAllReplicas so that migrates are applied on all
// replicas. This is done since MigrateRequests trigger a call to
// waitForApplication (see Replica.executeWriteBatch).
-if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) {
+if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) ||
+	cArgs.EvalCtx.EvalKnobs().OverrideDoTimelyApplicationToAllReplicas {
pd.Replicated.DoTimelyApplicationToAllReplicas = true
}

return pd, nil
}

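For reference, a minimal sketch of how a test opts into the new override. The knob wiring (Store -> EvalKnobs -> OverrideDoTimelyApplicationToAllReplicas) mirrors the integration test added below; the helper name is hypothetical.

```go
package kvserver_test

import (
	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
)

// makeTimelyApplicationKnobs is a hypothetical helper showing the knob path
// used by the test in this commit.
func makeTimelyApplicationKnobs() base.TestingKnobs {
	return base.TestingKnobs{
		Store: &kvserver.StoreTestingKnobs{
			EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
				// Set the timely-application directive on Migrate requests even
				// before the V25_1_AddRangeForceFlushKey version gate is active.
				OverrideDoTimelyApplicationToAllReplicas: true,
			},
		},
	}
}
```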
219 changes: 219 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -17,12 +17,16 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -5460,6 +5464,221 @@ func TestFlowControlSendQueueRangeRelocate(t *testing.T) {
})
}

func TestFlowControlSendQueueRangeMigrate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
// We're going to be transitioning from startV to endV. Think of a cluster of
// binaries running vX, but with active cluster version vX-1.
startV := clusterversion.PreviousRelease.Version()
endV := clusterversion.Latest.Version()
settings := cluster.MakeTestingClusterSettingsWithVersions(endV, startV, false)
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we lower the
// limits from their defaults (8 MiB elastic, 16 MiB regular).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)
disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClusterVersionOverride: startV,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
Store: &kvserver.StoreTestingKnobs{
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
// Because we are migrating from a version (currently) prior to the
// range force flush key version gate, we won't trigger the force
// flush via migrate until we're on the endV, which defeats the
// purpose of this test. We override the behavior here to allow the
// force flush to be triggered on the startV from a Migrate
// request.
OverrideDoTimelyApplicationToAllReplicas: true,
},
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
OverrideV2EnabledWhenLeaderLevel: func() kvflowcontrol.V2EnabledWhenLeaderLevel {
return kvflowcontrol.V2EnabledWhenLeaderV2Encoding
},
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close("send_queue_range_migrate")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)

store := tc.GetFirstStoreFromServer(t, 0)
assertVersion := func(expV roachpb.Version) error {
repl, err := store.GetReplica(desc.RangeID)
if err != nil {
t.Fatal(err)
}
if gotV := repl.Version(); gotV != expV {
return errors.Errorf("expected in-memory version %s, got %s", expV, gotV)
}

sl := stateloader.Make(desc.RangeID)
persistedV, err := sl.LoadVersion(ctx, store.TODOEngine())
if err != nil {
return err
}
if persistedV != expV {
return errors.Errorf("expected persisted version %s, got %s", expV, persistedV)
}
return nil
}

require.NoError(t, assertVersion(startV))

migrated := false
unregister := batcheval.TestingRegisterMigrationInterceptor(endV, func() {
migrated = true
})
defer unregister()

h.comment(`
-- We will exhaust the tokens across all streams while admission is blocked on
-- n3, using a single 4 MiB (deduction, the write itself is small) write. Then,
-- we will write a 1 MiB put to the range, migrate the range, and write a 1 MiB
-- put to the migrated range. We expect that the migration will trigger a force
-- flush of the send queue.`)
// Block admission on n3, while allowing every other node to admit.
setTokenReturnEnabled(true /* enabled */, 0, 1)
setTokenReturnEnabled(false /* enabled */, 2)
// Drain the tokens to n3 by blocking admission and issuing the buffer
// size of writes to the range.
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`(Sending 1 MiB put request to develop a send queue)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request)`)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Issuing MigrateRequest to range)`)
req := &kvpb.MigrateRequest{
RequestHeader: kvpb.RequestHeader{
Key: desc.StartKey.AsRawKey(),
EndKey: desc.EndKey.AsRawKey(),
},
Version: endV,
}
kvDB := tc.Servers[0].DB()
require.NoError(t, func() error {
if _, pErr := kv.SendWrappedWith(ctx, kvDB.GetFactory().NonTransactionalSender(), kvpb.Header{RangeID: desc.RangeID}, req); pErr != nil {
return pErr.GoError()
}
if !migrated {
return errors.Errorf("expected migration interceptor to have been called")
}
return assertVersion(endV)
}())

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize */, 0 /* serverIdx */)
h.comment(`
-- Send queue and flow token metrics from n1 post-migrate. The migrate should
-- have triggered a force flush of the send queue.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`(Sending 1 MiB put request to the migrated range)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to the migrated range)`)
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))

h.comment(`
-- Send queue and flow token metrics from n1 post-migrate and post 1 MiB put.
-- We expect to see the send queue develop for s3 again.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue and flow token metrics from n1. All tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/kvserverbase/knobs.go
@@ -52,6 +52,14 @@ type BatchEvalTestingKnobs struct {
// its record (which can be resolved synchronously with EndTxn). This is
// useful in certain tests.
DisableTxnAutoGC bool

// OverrideDoTimelyApplicationToAllReplicas overrides setting of the timely
// replication directive, which force-flushes any rac2 send queues to all
// replicas. When set to true, the directive is always set; when false, the
// default (version-gated) behavior is used.
//
// NOTE: This currently only applies to Migrate requests.
OverrideDoTimelyApplicationToAllReplicas bool
}

// IntentResolverTestingKnobs contains testing helpers that are used during