diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index edf20a8f1b92..c96b15665441 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -401,4 +401,4 @@ trace.span_registry.enabled boolean false if set, ongoing traces can be seen at trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. application ui.database_locality_metadata.enabled boolean true if enabled shows extended locality data about databases and tables in DB Console which can be expensive to compute application ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application -version version 1000024.3-upgrading-to-1000025.1-step-004 set the active cluster version in the format '.' application +version version 1000024.3-upgrading-to-1000025.1-step-006 set the active cluster version in the format '.' application diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 6965082be54c..97bd56cd369f 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -360,6 +360,6 @@
trace.zipkin.collector
string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. Serverless/Dedicated/Self-Hosted
ui.database_locality_metadata.enabled
boolean true if enabled shows extended locality data about databases and tables in DB Console which can be expensive to compute Serverless/Dedicated/Self-Hosted
ui.display_timezone
enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] Serverless/Dedicated/Self-Hosted -
version
version 1000024.3-upgrading-to-1000025.1-step-004 set the active cluster version in the format '<major>.<minor>' Serverless/Dedicated/Self-Hosted +
version
version1000024.3-upgrading-to-1000025.1-step-006set the active cluster version in the format '<major>.<minor>'Serverless/Dedicated/Self-Hosted diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 8e08749e842e..35ec2e7f28f0 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -238,6 +238,10 @@ const ( // V25_1_AddJobsTables added new jobs tables. V25_1_AddJobsTables + // V25_1_AddRangeForceFlushKey adds the RangeForceFlushKey, a replicated + // range-ID local key, which is written below raft. + V25_1_AddRangeForceFlushKey + // ************************************************* // Step (1) Add new versions above this comment. // Do not add new versions to a patch release. @@ -293,7 +297,8 @@ var versionTable = [numKeys]roachpb.Version{ // v25.1 versions. Internal versions must be even. V25_1_Start: {Major: 24, Minor: 3, Internal: 2}, - V25_1_AddJobsTables: {Major: 24, Minor: 3, Internal: 4}, + V25_1_AddJobsTables: {Major: 24, Minor: 3, Internal: 4}, + V25_1_AddRangeForceFlushKey: {Major: 24, Minor: 3, Internal: 6}, // ************************************************* // Step (2): Add new versions above this comment. diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 8afb9c324520..4affe4876eb9 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -57,6 +57,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/build", + "//pkg/clusterversion", "//pkg/keys", "//pkg/kv/kvpb", "//pkg/kv/kvserver/abortspan", diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index 98c2647b6c1b..20d174ed94d4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" @@ -1363,6 +1364,14 @@ func splitTriggerHelper( // hand side range (i.e. it goes from zero to its stats). RHSDelta: *h.AbsPostSplitRight(), } + // Set DoTimelyApplicationToAllReplicas since splits that are not applied on + // all replicas eventually cause snapshots for the RHS to be sent to + // replicas that already have the unsplit range, *and* these snapshots are + // rejected (which is very wasteful). See the long comment in + // split_delay_helper.go for more details. + if rec.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) { + pd.Replicated.DoTimelyApplicationToAllReplicas = true + } pd.Local.Metrics = &result.Metrics{ SplitsWithEstimatedStats: h.splitsWithEstimates, @@ -1459,6 +1468,15 @@ func mergeTrigger( pd.Replicated.Merge = &kvserverpb.Merge{ MergeTrigger: *merge, } + // Set DoTimelyApplicationToAllReplicas so that merges are applied on all + // replicas. This is not technically necessary since even though + // Replica.AdminMerge calls waitForApplication, that call happens earlier in + // the merge distributed txn, when sending a kvpb.SubsumeRequest. But since + // we have force-flushed once during the merge txn anyway, we choose to + // complete the merge story and finish the merge on all replicas. 
+ if rec.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) { + pd.Replicated.DoTimelyApplicationToAllReplicas = true + } { // If we have GC hints populated that means we are trying to perform diff --git a/pkg/kv/kvserver/batcheval/cmd_migrate.go b/pkg/kv/kvserver/batcheval/cmd_migrate.go index ca7c055839e1..6ff111152357 100644 --- a/pkg/kv/kvserver/batcheval/cmd_migrate.go +++ b/pkg/kv/kvserver/batcheval/cmd_migrate.go @@ -9,6 +9,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" @@ -81,6 +82,13 @@ func Migrate( // as all below-raft migrations (the only users of Migrate) were introduced // after it. pd.Replicated.State.Version = &migrationVersion + // Set DoTimelyApplicationToAllReplicas so that migrates are applied on all + // replicas. This is done since MigrateRequests trigger a call to + // waitForApplication (see Replica.executeWriteBatch). + if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) { + pd.Replicated.DoTimelyApplicationToAllReplicas = true + } + return pd, nil } diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go index ed3dd183d7fb..9f51dfad31f2 100644 --- a/pkg/kv/kvserver/batcheval/cmd_subsume.go +++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go @@ -10,6 +10,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" @@ -156,5 +157,12 @@ func Subsume( reply.ReadSummary = &priorReadSum reply.ClosedTimestamp = cArgs.EvalCtx.GetCurrentClosedTimestamp(ctx) - return result.Result{}, nil + var pd result.Result + // Set DoTimelyApplicationToAllReplicas so that merges are applied on all + // replicas. This is needed since Replica.AdminMerge calls + // waitForApplication when sending a kvpb.SubsumeRequest. 
+ if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) { + pd.Replicated.DoTimelyApplicationToAllReplicas = true + } + return pd, nil } diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 4448722aa2fd..9aeda9c90445 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -353,6 +353,11 @@ func (p *Result) MergeAndDestroy(q Result) error { } q.Replicated.IsProbe = false + if q.Replicated.DoTimelyApplicationToAllReplicas { + p.Replicated.DoTimelyApplicationToAllReplicas = true + } + q.Replicated.DoTimelyApplicationToAllReplicas = false + if p.Local.EncounteredIntents == nil { p.Local.EncounteredIntents = q.Local.EncounteredIntents } else { diff --git a/pkg/kv/kvserver/batcheval/result/result_test.go b/pkg/kv/kvserver/batcheval/result/result_test.go index 0f3cb0915baf..e1d5f1606742 100644 --- a/pkg/kv/kvserver/batcheval/result/result_test.go +++ b/pkg/kv/kvserver/batcheval/result/result_test.go @@ -86,4 +86,10 @@ func TestMergeAndDestroy(t *testing.T) { ForceFlushIndex: roachpb.ForceFlushIndex{Index: 3}, } require.ErrorContains(t, r0.MergeAndDestroy(r3), "must not specify ForceFlushIndex") + + var r4 Result + r4.Replicated.DoTimelyApplicationToAllReplicas = true + require.False(t, r0.Replicated.DoTimelyApplicationToAllReplicas) + require.NoError(t, r0.MergeAndDestroy(r4)) + require.True(t, r0.Replicated.DoTimelyApplicationToAllReplicas) } diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index f35246a87a4b..aefe6c3d218e 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -5460,6 +5460,193 @@ func TestFlowControlSendQueueRangeRelocate(t *testing.T) { }) } +// TestFlowControlSendQueueRangeSplitMerge exercises the send queue formation, +// prevention and force flushing due to range split and merge operations. See +// the initial comment for an overview of the test structure. +func TestFlowControlSendQueueRangeSplitMerge(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + const numNodes = 3 + settings := cluster.MakeTestingClusterSettings() + kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll) + // We want to exhaust tokens but not overload the test, so we set the limits + // lower (8 and 16 MiB default). + kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20) + kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20) + + disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes) + setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) { + for _, serverIdx := range serverIdxs { + disableWorkQueueGrantingServers[serverIdx].Store(!enabled) + } + } + + argsPerServer := make(map[int]base.TestServerArgs) + for i := range disableWorkQueueGrantingServers { + disableWorkQueueGrantingServers[i].Store(true) + argsPerServer[i] = base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens { + // Deduct every write as 1 MiB, regardless of how large it + // actually is. 
+ return kvflowcontrol.Tokens(1 << 20) + }, + // We want to test the behavior of the send queue, so we want to + // always have up-to-date stats. This ensures that the send queue + // stats are always refreshed on each call to + // RangeController.HandleRaftEventRaftMuLocked. + OverrideAlwaysRefreshSendStreamStats: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + idx := i + return disableWorkQueueGrantingServers[idx].Load() + }, + }, + }, + } + } + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: argsPerServer, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + h := newFlowControlTestHelper( + t, tc, "flow_control_integration_v2", /* testdata */ + kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */ + ) + h.init(kvflowcontrol.ApplyToAll) + defer h.close("send_queue_range_split_merge") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.enableVerboseRaftMsgLoggingForRange(desc.RangeID) + h.enableVerboseRaftMsgLoggingForRange(desc.RangeID + 1) + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + // Reset the token metrics, since a send queue may have instantly + // formed when adding one of the replicas, before being quickly + // drained. + h.resetV2TokenMetrics(ctx) + + h.comment(` +-- We will exhaust the tokens across all streams while admission is blocked on +-- n3, using a single 4 MiB (deduction, the write itself is small) write. Then, +-- we will write a 1 MiB put to the range, split it, write a 1 MiB put to the +-- RHS range, merge the ranges, and write a 1 MiB put to the merged range. We +-- expect that at each stage where a send queue develops n1->s3, the send queue +-- will be flushed by the range merge and range split range operations.`) + h.comment(` +-- Start by exhausting the tokens from n1->s3 and blocking admission on s3. 
+-- (Issuing 4x1MiB regular, 3x replicated write that's not admitted on s3.)`) + setTokenReturnEnabled(true /* enabled */, 0, 1) + setTokenReturnEnabled(false /* enabled */, 2) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */) + + h.comment(`(Sending 1 MiB put request to pre-split range)`) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.comment(`(Sent 1 MiB put request to pre-split range)`) + + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */ + testingMkFlowStream(0), testingMkFlowStream(1)) + h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */) + + h.comment(` +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`) + h.query(n1, flowSendQueueQueryStr) + h.comment(` +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + h.comment(` +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3.`) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + + h.comment(`-- (Splitting range.)`) + left, right := tc.SplitRangeOrFatal(t, k.Next()) + h.waitForConnectedStreams(ctx, left.RangeID, 3, 0 /* serverIdx */) + h.waitForConnectedStreams(ctx, right.RangeID, 3, 0 /* serverIdx */) + + h.comment(`-- Observe the newly split off replica, with its own three streams.`) + h.query(n1, ` + SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; +`, "range_id", "stream_count") + h.comment(` +-- Send queue and flow token metrics from n1, post-split. +-- We expect to see a force flush of the send queue for s3.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + + h.comment(`(Sending 1 MiB put request to post-split RHS range)`) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.comment(`(Sent 1 MiB put request to post-split RHS range)`) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */ + testingMkFlowStream(0), testingMkFlowStream(1)) + + h.comment(` +-- Send queue and flow token metrics from n1, post-split and 1 MiB put.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + + h.comment(`-- (Merging ranges.)`) + merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey()) + h.waitForConnectedStreams(ctx, merged.RangeID, 3, 0 /* serverIdx */) + h.waitForSendQueueSize(ctx, merged.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */) + + h.comment(` +-- Send queue and flow token metrics from n1, post-split-merge. +-- We expect to see a force flush of the send queue for s3 again.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) 
+ + h.comment(`(Sending 1 MiB put request to post-split-merge range)`) + h.put(ctx, k, 1, admissionpb.NormalPri) + h.comment(`(Sent 1 MiB put request to post-split-merge range)`) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */ + testingMkFlowStream(0), testingMkFlowStream(1)) + h.waitForSendQueueSize(ctx, merged.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */) + + h.comment(` +-- Send queue and flow token metrics from n1, post-split-merge and 1 MiB put. +-- We expect to see the send queue develop for s3 again.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + + h.comment(`-- (Allowing below-raft admission to proceed on [n1,n2,n3].)`) + setTokenReturnEnabled(true /* enabled */, 0, 1, 2) + + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) + h.comment(` +-- Send queue and flow token metrics from n1, all tokens should be returned.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) +} + type flowControlTestHelper struct { t testing.TB tc *testcluster.TestCluster diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index bd8eeb37623a..33e8a10dced9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -37,9 +37,9 @@ import ( // closed if the term changes. // // Almost none of the methods are called with Replica.mu held. The caller and -// callee should order their mutexes before Replica.mu. The one exception is -// HoldsSendTokensLocked, which holds both raftMu and Replica.mu. The callee -// must not acquire its own mutex. +// callee should order their mutexes before Replica.mu. The exceptions are +// HoldsSendTokensLocked, ForceFlushIndexChangedLocked, which hold both raftMu +// and Replica.mu. The callee must not acquire its own mutex. // // RangeController dynamically switches between push and pull mode based on // RaftEvent handling. In general, the code here is oblivious to the fact that @@ -100,6 +100,13 @@ type RangeController interface { // // Requires replica.raftMu to be held. SetLeaseholderRaftMuLocked(ctx context.Context, replica roachpb.ReplicaID) + // ForceFlushIndexChangedLocked sets the force flush index, i.e., the index + // (inclusive) up to which all replicas with a send-queue must be + // force-flushed in MsgAppPull mode. It may be rarely called with no change + // to the index. + // + // Requires replica.raftMu and replica.mu to be held. + ForceFlushIndexChangedLocked(ctx context.Context, index uint64) // CloseRaftMuLocked closes the range controller. // // Requires replica.raftMu to be held. @@ -573,6 +580,9 @@ type RangeControllerInitState struct { // NextRaftIndex is the first index that will appear in the next non-empty // RaftEvent.Entries handled by this RangeController. NextRaftIndex uint64 + // FirstFlushIndex is an index up to (and including) which the + // rangeController running in pull mode must force-flush all send streams. + ForceFlushIndex uint64 } // rangeController is tied to a single leader term. @@ -587,8 +597,9 @@ type rangeController struct { replicaSet ReplicaSet // leaseholder can be NoReplicaID or not be in ReplicaSet, i.e., it is // eventually consistent with the set of replicas. 
- leaseholder roachpb.ReplicaID - nextRaftIndex uint64 + leaseholder roachpb.ReplicaID + nextRaftIndex uint64 + forceFlushIndex uint64 mu struct { // All the fields in this struct are modified while holding raftMu and @@ -673,11 +684,12 @@ func NewRangeController( log.VInfof(ctx, 1, "r%v creating range controller", o.RangeID) } rc := &rangeController{ - opts: o, - term: init.Term, - leaseholder: init.Leaseholder, - nextRaftIndex: init.NextRaftIndex, - replicaMap: make(map[roachpb.ReplicaID]*replicaState), + opts: o, + term: init.Term, + leaseholder: init.Leaseholder, + nextRaftIndex: init.NextRaftIndex, + forceFlushIndex: init.ForceFlushIndex, + replicaMap: make(map[roachpb.ReplicaID]*replicaState), } rc.scheduledMu.replicas = make(map[roachpb.ReplicaID]struct{}) rc.mu.waiterSetRefreshCh = make(chan struct{}) @@ -893,6 +905,10 @@ type existingSendStreamState struct { indexToSend uint64 } +// infinityEntryIndex is an exclusive upper-bound on the index of an actual +// entry. +const infinityEntryIndex uint64 = math.MaxUint64 + // constructRaftEventForReplica is called iff latestFollowerStateInfo.State is // StateReplicate. // @@ -910,7 +926,7 @@ func constructRaftEventForReplica( logSnapshot raft.LogSnapshot, scratchSendingEntries []entryFCState, ) (_ raftEventForReplica, scratch []entryFCState) { - firstNewEntryIndex, lastNewEntryIndex := uint64(math.MaxUint64), uint64(math.MaxUint64) + firstNewEntryIndex, lastNewEntryIndex := infinityEntryIndex, infinityEntryIndex if n := len(raftEventAppendState.newEntries); n > 0 { firstNewEntryIndex = raftEventAppendState.newEntries[0].id.index lastNewEntryIndex = raftEventAppendState.newEntries[n-1].id.index + 1 @@ -1126,18 +1142,18 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra // the new entries. rs.scratchVoterStreamState = rs.computeReplicaStreamStateRaftMuLocked(ctx, needsTokens) if (rs.scratchVoterStreamState.noSendQ && rs.scratchVoterStreamState.hasSendTokens) || - rs.scratchVoterStreamState.forceFlushing { + rs.scratchVoterStreamState.forceFlushStopIndex.active() { if rs.desc.IsVoterOldConfig() { votersContributingToQuorum[0]++ - if rs.scratchVoterStreamState.forceFlushing && - !rs.scratchVoterStreamState.forceFlushingBecauseLeaseholder { + if rs.scratchVoterStreamState.forceFlushStopIndex.untilInfinity() && + !rs.scratchVoterStreamState.forceFlushBecauseLeaseholder { numOptionalForceFlushes[0]++ } } if numSets > 1 && rs.desc.IsVoterNewConfig() { votersContributingToQuorum[1]++ - if rs.scratchVoterStreamState.forceFlushing && - !rs.scratchVoterStreamState.forceFlushingBecauseLeaseholder { + if rs.scratchVoterStreamState.forceFlushStopIndex.untilInfinity() && + !rs.scratchVoterStreamState.forceFlushBecauseLeaseholder { // We never actually use numOptionalForceFlushes[1]. Just doing this // for symmetry. numOptionalForceFlushes[1]++ @@ -1186,8 +1202,22 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra // there is no adjustment needed to ensure quorum. ss = rs.computeReplicaStreamStateRaftMuLocked(ctx, needsTokens) } + // Make a final adjustment to start force-flushing due to + // rc.forceFlushIndex. We deliberately leave this until the end, since + // quorum or leaseholder requirements may already have ensured that this + // replica must force-flush. + // + // NB: Next is exclusive and the first entry that has not yet been sent. + // And forceFlushIndex is inclusive. 
Therefore, [Next, forceFlushIndex] + // needs to have been sent for force-flush to not be needed, and we + // check for non-emptiness of the interval below. + if rs.scratchEvent.replicaStateInfo.Next <= rc.forceFlushIndex && + ss.isReplicate && !ss.noSendQ && + (!ss.forceFlushStopIndex.active() || uint64(ss.forceFlushStopIndex) < rc.forceFlushIndex) { + ss.forceFlushStopIndex = forceFlushStopIndex(rc.forceFlushIndex) + } rd = replicaDirective{ - forceFlush: ss.forceFlushing, + forceFlushStopIndex: ss.forceFlushStopIndex, hasSendTokens: ss.hasSendTokens, preventSendQNoForceFlush: ss.preventSendQNoForceFlush, } @@ -1245,7 +1275,13 @@ func (rc *rangeController) computeVoterDirectives( // NB: this also includes probeRecentlyNoSendQ. continue } - if rs.scratchVoterStreamState.forceFlushingBecauseLeaseholder { + if rs.scratchVoterStreamState.forceFlushBecauseLeaseholder { + // No choice in whether to force-flush, so not added to any slices. + continue + } + if rs.scratchVoterStreamState.forceFlushStopIndex.active() && + !rs.scratchVoterStreamState.forceFlushStopIndex.untilInfinity() { + // No choice in whether to force-flush, so not added to any slices. continue } // INVARIANTS: @@ -1265,7 +1301,7 @@ func (rc *rangeController) computeVoterDirectives( bucketedTokensSend: bucketedSendTokens, tokensEval: rs.evalTokenCounter.tokens(admissionpb.ElasticWorkClass), } - if rs.scratchVoterStreamState.forceFlushing { + if rs.scratchVoterStreamState.forceFlushStopIndex.active() { forceFlushingScores = append(forceFlushingScores, score) } else if rs.scratchVoterStreamState.noSendQ { candidateDenySendQScores = append(candidateDenySendQScores, score) @@ -1309,7 +1345,7 @@ func (rc *rangeController) computeVoterDirectives( } // Since there is a single set, this must be a member. rs := rc.replicaMap[forceFlushingScores[i].replicaID] - rs.scratchVoterStreamState.forceFlushing = false + rs.scratchVoterStreamState.forceFlushStopIndex = 0 gap++ } } else if gap > 0 { @@ -1353,7 +1389,8 @@ func (rc *rangeController) computeVoterDirectives( if !isSetMember { continue } - rs.scratchVoterStreamState.forceFlushing = true + + rs.scratchVoterStreamState.forceFlushStopIndex = forceFlushStopIndex(infinityEntryIndex) rs.scratchVoterStreamState.preventSendQNoForceFlush = false gap-- if i == 0 && len(voterSets) > 1 && rs.desc.IsVoterNewConfig() { @@ -1493,6 +1530,11 @@ func (rc *rangeController) SetLeaseholderRaftMuLocked( rc.updateWaiterSetsRaftMuLocked() } +// ForceFlushIndexChangedLocked implements RangeController. +func (rc *rangeController) ForceFlushIndexChangedLocked(ctx context.Context, index uint64) { + rc.forceFlushIndex = index +} + // CloseRaftMuLocked closes the range controller. // // Requires replica.raftMu to be held. @@ -1856,8 +1898,6 @@ func (rc *rangeController) checkConsistencyRaftMuLocked(ctx context.Context) { // replicaState holds state for each replica. All methods are called with // raftMu held, hence it does not have its own mutex. -// -// TODO(sumeer): add mutex held assertions. type replicaState struct { parent *rangeController // stream aggregates across the streams for the same (tenant, store). This @@ -1886,14 +1926,27 @@ type replicaStreamState struct { // The remaining fields serve as output from replicaState and subsequent // input into replicaState. - // forceFlushing is true iff in StateReplicate and there is a send-queue and - // is being force-flushed. 
When provided as subsequent input, it should be - // interpreted as a directive that *may* change the current behavior, i.e., - // it may be asking the stream to start a force-flush or stop a force-flush. + // forceFlushStopIndex.active() is true iff in StateReplicate and there is a + // send-queue (!noSendQ) and is being force-flushed. When provided as + // subsequent input, it should be interpreted as a directive that *may* + // change the current behavior, i.e., it may be asking the stream to start a + // force-flush or stop a force-flush. + // + // INVARIANT: forceFlushStopIndex.active() => !noSendQ && !hasSendTokens. // - // INVARIANT: forceFlushing => !noSendQ && !hasSendTokens. - forceFlushing bool - forceFlushingBecauseLeaseholder bool + // When forceFlushStopIndex.active() is true and forceFlushStopIndex < + // infinityEntryIndex, the force-flush is being done due to the + // externally provided force-flush index. + // + // INVARIANT: forceFlushBecauseLeaseholder => + // forceFlushStopIndex==infinityEntryIndex. + forceFlushStopIndex forceFlushStopIndex + // A true value is always a directive, that is computed in the first-pass, + // in computeReplicaStreamStateRaftMuLocked. + forceFlushBecauseLeaseholder bool + // indexToSend is the state of the replicaSendStream. It is only populated + // in StateReplicate. + indexToSend uint64 // True only if noSendQ. When interpreted as a directive in subsequent // input, it may have been changed from false to true to prevent formation // of a send-queue. @@ -1911,8 +1964,8 @@ type replicaStreamState struct { // whether it has send tokens or should be force flushing. Only relevant for // pull mode. type replicaDirective struct { - forceFlush bool - hasSendTokens bool + forceFlushStopIndex forceFlushStopIndex + hasSendTokens bool // preventSendQNoForceFlush is only used for observability and debugging. preventSendQNoForceFlush bool } @@ -1936,9 +1989,6 @@ func NewReplicaState( // replicaSendStream maintains state for a replica to which we (typically) are // actively replicating. -// -// TODO(sumeer): assert that raftMu is held on the various methods that say it -// must be held. type replicaSendStream struct { parent *replicaState @@ -2034,23 +2084,23 @@ type replicaSendStream struct { // an index >= nextRaftIndexInitial and >= indexToSend. preciseSizeSum kvflowcontrol.Tokens - // tokenWatcherHandle, deductedForSchedulerTokens, forceFlushScheduled + // tokenWatcherHandle, deductedForSchedulerTokens, forceFlushStopIndex // can only be non-zero when connectedState == replicate, and the // send-queue is non-empty. // // INVARIANTS: // - // forceFlushScheduled => tokenWatcherHandle is zero and + // forceFlushStopIndex.active() => tokenWatcherHandle is zero and // deductedForSchedulerTokens == 0. // // tokenWatcherHandle is non-zero => deductedForSchedulerTokens == 0 and - // !forceFlushScheduled. + // !forceFlushStopIndex.active(). // // It follows from the above that: // // deductedForSchedulerTokens != 0 => tokenWatcherHandle is zero and - // !forceFlushScheduled. - forceFlushScheduled bool + // !forceFlushStopIndex.active() + forceFlushStopIndex forceFlushStopIndex tokenWatcherHandle SendTokenWatcherHandle deductedForSchedulerTokens kvflowcontrol.Tokens @@ -2239,13 +2289,15 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( ) replicaStreamState { rs.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld() if rs.sendStream == nil { + // This is the zero value of replicaStreamState. Listed for readability. 
return replicaStreamState{ - isReplicate: false, - noSendQ: false, - forceFlushing: false, - forceFlushingBecauseLeaseholder: false, - hasSendTokens: false, - preventSendQNoForceFlush: false, + isReplicate: false, + noSendQ: false, + forceFlushStopIndex: 0, + forceFlushBecauseLeaseholder: false, + indexToSend: 0, + hasSendTokens: false, + preventSendQNoForceFlush: false, } } rss := rs.sendStream @@ -2264,16 +2316,15 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( // for these two situations is more complicated, and we accept the // slight increase in latency when applying this behavior in the latter // situation. - noSendQ: true, - forceFlushing: false, - forceFlushingBecauseLeaseholder: false, - hasSendTokens: true, + noSendQ: true, + hasSendTokens: true, } } vss := replicaStreamState{ isReplicate: true, noSendQ: rss.isEmptySendQueueStreamLocked(), - forceFlushing: rss.mu.sendQueue.forceFlushScheduled, + forceFlushStopIndex: rss.mu.sendQueue.forceFlushStopIndex, + indexToSend: rss.mu.sendQueue.indexToSend, preventSendQNoForceFlush: false, } if rs.desc.ReplicaID == rs.parent.leaseholder { @@ -2283,8 +2334,8 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( } else { // The leaseholder may not be force-flushing yet, but this will start // force-flushing. - vss.forceFlushing = true - vss.forceFlushingBecauseLeaseholder = true + vss.forceFlushStopIndex = forceFlushStopIndex(infinityEntryIndex) + vss.forceFlushBecauseLeaseholder = true } return vss } @@ -2294,7 +2345,7 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( return vss } // Non-leaseholder and non-leader replica. - if vss.noSendQ && !vss.forceFlushing { + if vss.noSendQ { vss.hasSendTokens = true // If tokens are available, that is > 0, we decide we can send all the new // entries. This allows for a burst, but it is too complicated to make a @@ -2441,7 +2492,7 @@ func (rs *replicaState) scheduledRaftMuLocked( rss := rs.sendStream rss.mu.Lock() defer rss.mu.Unlock() - if !rss.mu.sendQueue.forceFlushScheduled && rss.mu.sendQueue.deductedForSchedulerTokens == 0 { + if !rss.mu.sendQueue.forceFlushStopIndex.active() && rss.mu.sendQueue.deductedForSchedulerTokens == 0 { // NB: it is possible mode != rss.mu.mode, and we will ignore the change // here. This is fine in that we will pick up the change in the next // RaftEvent. @@ -2461,7 +2512,7 @@ func (rs *replicaState) scheduledRaftMuLocked( // 4MB. Don't want to hog the scheduler thread for too long. const MaxBytesToSend kvflowcontrol.Tokens = 4 << 20 bytesToSend := MaxBytesToSend - if !rss.mu.sendQueue.forceFlushScheduled && + if !rss.mu.sendQueue.forceFlushStopIndex.active() && rss.mu.sendQueue.deductedForSchedulerTokens < bytesToSend { bytesToSend = rss.mu.sendQueue.deductedForSchedulerTokens } @@ -2512,8 +2563,17 @@ func (rs *replicaState) scheduledRaftMuLocked( return false, true } // Still have a send-queue. + if rss.mu.sendQueue.forceFlushStopIndex.active() && + uint64(rss.mu.sendQueue.forceFlushStopIndex) < rss.mu.sendQueue.indexToSend { + // It is possible that we don't have a quorum with no send-queue and we + // needed to rely on this force-flush until the send-queue was empty. That + // knowledge will become known in the next + // rangeController.handleRaftEvent*, which will happen at the next tick. + // We accept a latency hiccup in this case for now. 
+ rss.mu.sendQueue.forceFlushStopIndex = 0 + } watchForTokens := - !rss.mu.sendQueue.forceFlushScheduled && rss.mu.sendQueue.deductedForSchedulerTokens == 0 + !rss.mu.sendQueue.forceFlushStopIndex.active() && rss.mu.sendQueue.deductedForSchedulerTokens == 0 if watchForTokens { rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx) } @@ -2574,25 +2634,28 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld() rss.mu.AssertHeld() wasEmptySendQ := rss.isEmptySendQueueStreamLocked() - rss.tryHandleModeChangeRaftMuAndStreamLocked(ctx, event.mode, wasEmptySendQ, directive.forceFlush) + rss.tryHandleModeChangeRaftMuAndStreamLocked( + ctx, event.mode, wasEmptySendQ, directive.forceFlushStopIndex.active()) if event.mode == MsgAppPull { // MsgAppPull mode (i.e., followers). Populate sendingEntries. n := len(event.sendingEntries) if n != 0 { panic(errors.AssertionFailedf("pull mode must not have sending entries")) } - if directive.forceFlush { - if !rss.mu.sendQueue.forceFlushScheduled { + if directive.forceFlushStopIndex.active() { + if !rss.mu.sendQueue.forceFlushStopIndex.active() { // Must have a send-queue, so sendingEntries should stay empty // (these will be queued). - rss.startForceFlushRaftMuAndStreamLocked(ctx) + rss.startForceFlushRaftMuAndStreamLocked(ctx, directive.forceFlushStopIndex) + } else if rss.mu.sendQueue.forceFlushStopIndex != directive.forceFlushStopIndex { + rss.mu.sendQueue.forceFlushStopIndex = directive.forceFlushStopIndex } } else { - // INVARIANT: !directive.forceFlush. - if rss.mu.sendQueue.forceFlushScheduled { + // INVARIANT: !directive.forceFlushStopIndex.active() + if rss.mu.sendQueue.forceFlushStopIndex.active() { // Must have a send-queue, so sendingEntries should stay empty (these // will be queued). - rss.mu.sendQueue.forceFlushScheduled = false + rss.mu.sendQueue.forceFlushStopIndex = 0 rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Dec(1) rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx) if directive.hasSendTokens { @@ -2718,7 +2781,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( // NB: this will not do IO since everything here is in the unstable log // (see raft.LogSnapshot.unstable). 
slice, err := event.logSnapshot.LogSlice( - event.sendingEntries[0].id.index-1, event.sendingEntries[n-1].id.index, math.MaxInt64) + event.sendingEntries[0].id.index-1, event.sendingEntries[n-1].id.index, infinityEntryIndex) if err != nil { return false, err } @@ -2732,7 +2795,8 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( } hasEmptySendQ := rss.isEmptySendQueueStreamLocked() - if event.mode == MsgAppPull && wasEmptySendQ && !hasEmptySendQ && !rss.mu.sendQueue.forceFlushScheduled { + if event.mode == MsgAppPull && wasEmptySendQ && !hasEmptySendQ && + !rss.mu.sendQueue.forceFlushStopIndex.active() { rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx) } // NB: we don't special case to an empty send-queue in push mode, where Raft @@ -2787,11 +2851,13 @@ func (rss *replicaSendStream) tryHandleModeChangeRaftMuAndStreamLocked( } } -func (rss *replicaSendStream) startForceFlushRaftMuAndStreamLocked(ctx context.Context) { +func (rss *replicaSendStream) startForceFlushRaftMuAndStreamLocked( + ctx context.Context, forceFlushStopIndex forceFlushStopIndex, +) { rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld() rss.mu.AssertHeld() rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Inc(1) - rss.mu.sendQueue.forceFlushScheduled = true + rss.mu.sendQueue.forceFlushStopIndex = forceFlushStopIndex rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID) rss.stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(ctx, false) } @@ -2844,7 +2910,7 @@ func (rss *replicaSendStream) dequeueFromQueueAndSendRaftMuAndStreamLocked( rss.mu.sendQueue.entryTokensApproximator.addStats( approximatedNumEntries, approximatedNumActualTokens) } - if !rss.mu.sendQueue.forceFlushScheduled { + if !rss.mu.sendQueue.forceFlushStopIndex.active() { // Subtract from already deducted tokens. 
beforeDeductedTokens := rss.mu.sendQueue.deductedForSchedulerTokens rss.mu.sendQueue.deductedForSchedulerTokens -= tokensNeeded @@ -2868,7 +2934,7 @@ func (rss *replicaSendStream) dequeueFromQueueAndSendRaftMuAndStreamLocked( } if tokensNeeded > 0 { flag := AdjNormal - if rss.mu.sendQueue.forceFlushScheduled { + if rss.mu.sendQueue.forceFlushStopIndex.active() { flag = AdjForceFlush } rss.parent.sendTokenCounter.Deduct(ctx, admissionpb.ElasticWorkClass, tokensNeeded, flag) @@ -2911,7 +2977,7 @@ func (rss *replicaSendStream) changeToProbeRaftMuAndStreamLocked( if !rss.isEmptySendQueueStreamLocked() { panic(errors.AssertionFailedf("transitioning to probeRecentlyNoSendQ when have a send-queue")) } - if rss.mu.sendQueue.forceFlushScheduled { + if rss.mu.sendQueue.forceFlushStopIndex.active() { panic(errors.AssertionFailedf("no send-queue but force-flushing")) } if rss.mu.sendQueue.deductedForSchedulerTokens != 0 || @@ -2925,8 +2991,8 @@ func (rss *replicaSendStream) stopAttemptingToEmptySendQueueRaftMuAndStreamLocke ) { rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld() rss.mu.AssertHeld() - if rss.mu.sendQueue.forceFlushScheduled { - rss.mu.sendQueue.forceFlushScheduled = false + if rss.mu.sendQueue.forceFlushStopIndex.active() { + rss.mu.sendQueue.forceFlushStopIndex = 0 rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Dec(1) } rss.stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(ctx, disconnect) @@ -2976,7 +3042,7 @@ func (rss *replicaSendStream) startAttemptingToEmptySendQueueViaWatcherStreamLoc ctx context.Context, ) { rss.mu.AssertHeld() - if rss.mu.sendQueue.forceFlushScheduled { + if rss.mu.sendQueue.forceFlushStopIndex.active() { panic(errors.AssertionFailedf("already trying to empty send-queue using force-flush")) } if rss.mu.sendQueue.deductedForSchedulerTokens != 0 || @@ -3016,8 +3082,8 @@ func (rss *replicaSendStream) Notify(ctx context.Context) { queueSize = 4096 } flag := AdjNormal - if rss.mu.sendQueue.forceFlushScheduled { - flag = AdjForceFlush + if rss.mu.sendQueue.forceFlushStopIndex.active() { + panic(errors.AssertionFailedf("cannot be force-flushing")) } tokens := rss.parent.sendTokenCounter.TryDeduct(ctx, admissionpb.ElasticWorkClass, queueSize, flag) if tokens == 0 { @@ -3249,3 +3315,20 @@ func (a *entryTokensApproximator) meanTokensPerEntry() kvflowcontrol.Tokens { } return mean } + +// forceFlushStopIndex is the inclusive index to send before force-flush can +// stop. When set to infinityEntryIndex, force-flush must continue until the +// send-queue is empty. The zero value implies no force-flush, even though +// this index is inclusive, since index 0 is never used in CockroachDB's use +// of Raft (see stateloader.RaftInitialLogIndex). +type forceFlushStopIndex uint64 + +// active returns whether the stream is force-flushing. +func (i forceFlushStopIndex) active() bool { + return i != 0 +} + +// untilInfinity implies active. 
+func (i forceFlushStopIndex) untilInfinity() bool { + return uint64(i) == infinityEntryIndex +} diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 0ad8eb8ff378..e483c53cc29a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -9,7 +9,6 @@ import ( "cmp" "context" "fmt" - "math" "slices" "sort" "strconv" @@ -240,8 +239,12 @@ func (s *testingRCState) sendStreamString(rangeID roachpb.RangeID) string { testRepl.info.Match+1, rss.mu.sendQueue.indexToSend, rss.mu.sendQueue.indexToSend, rss.mu.sendQueue.nextRaftIndex, rss.mu.sendQueue.preciseSizeSum) - if rss.mu.sendQueue.forceFlushScheduled { - fmt.Fprintf(&b, " force-flushing") + if rss.mu.sendQueue.forceFlushStopIndex.active() { + var stopStr string + if !rss.mu.sendQueue.forceFlushStopIndex.untilInfinity() { + stopStr = fmt.Sprintf(" (stop=%d)", rss.mu.sendQueue.forceFlushStopIndex) + } + fmt.Fprintf(&b, " force-flushing%s", stopStr) } if rss.mu.sendQueue.deductedForSchedulerTokens > 0 { fmt.Fprintf(&b, " deducted=%v", rss.mu.sendQueue.deductedForSchedulerTokens) @@ -355,10 +358,11 @@ func (s *testingRCState) getOrInitRange( } init := RangeControllerInitState{ - Term: 1, - ReplicaSet: r.replicas(), - Leaseholder: r.localReplicaID, - NextRaftIndex: r.nextRaftIndex, + Term: 1, + ReplicaSet: r.replicas(), + Leaseholder: r.localReplicaID, + NextRaftIndex: r.nextRaftIndex, + ForceFlushIndex: r.forceFlushIndex, } options.ReplicaMutexAsserter.RaftMu.Lock() testRC.rc = NewRangeController(s.testCtx, options, init) @@ -545,11 +549,12 @@ func (r *testingRCRange) admit(ctx context.Context, storeID roachpb.StoreID, av } type testingRange struct { - rangeID roachpb.RangeID - tenantID roachpb.TenantID - localReplicaID roachpb.ReplicaID - nextRaftIndex uint64 - replicaSet map[roachpb.ReplicaID]testingReplica + rangeID roachpb.RangeID + tenantID roachpb.TenantID + localReplicaID roachpb.ReplicaID + nextRaftIndex uint64 + forceFlushIndex uint64 + replicaSet map[roachpb.ReplicaID]testingReplica } // Used by simulation test. @@ -1196,6 +1201,35 @@ func TestRangeController(t *testing.T) { }() return state.rangeStateString() + case "set_force_flush_index": + var rangeID int + d.ScanArgs(t, "range_id", &rangeID) + var index int + d.ScanArgs(t, "index", &index) + mode := MsgAppPull + if d.HasArg("push-mode") { + mode = MsgAppPush + } + testRC := state.ranges[roachpb.RangeID(rangeID)] + func() { + testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Lock() + defer testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock() + testRC.rc.opts.ReplicaMutexAsserter.ReplicaMu.Lock() + defer testRC.rc.opts.ReplicaMutexAsserter.ReplicaMu.Unlock() + testRC.rc.ForceFlushIndexChangedLocked(ctx, uint64(index)) + }() + // Send an empty raft event in order to trigger any potential changes. + event := testRC.makeRaftEventWithReplicasState() + event.MsgAppMode = mode + func() { + testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Lock() + defer testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock() + require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(ctx, event)) + }() + // Sleep for a bit to allow any timers to fire. 
+ time.Sleep(20 * time.Millisecond) + return state.sendStreamString(roachpb.RangeID(rangeID)) + case "close_rcs": for _, r := range state.ranges { func() { @@ -1313,7 +1347,7 @@ func TestRangeController(t *testing.T) { } else { fromIndex := event.sendingEntryRange[replicaID].fromIndex toIndex := event.sendingEntryRange[replicaID].toIndex - entries, err := testRC.raftLog.Entries(fromIndex, toIndex+1, math.MaxUint64) + entries, err := testRC.raftLog.Entries(fromIndex, toIndex+1, infinityEntryIndex) require.NoError(t, err) msgApp.Entries = entries } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index new file mode 100644 index 000000000000..2eeb748a43f9 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index @@ -0,0 +1,478 @@ +# Initialize a range with three replicas, none of which have send tokens. +init regular_init=0 elastic_init=0 +range_id=1 tenant_id=1 local_replica_id=1 next_raft_index=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=1 + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate next=1 + store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate next=1 +---- +r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3] +t1/s1: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +# Append five entries. Replica 1 has no send tokens, but is not allowed to +# form a send-queue since it is the leader. Replica 3 also has no send tokens, +# but is not allowed to form a send-queue to maintain quorum. +raft_event pull-mode +range_id=1 + entries + term=1 index=1 pri=NormalPri size=6MiB + term=1 index=2 pri=NormalPri size=6MiB + term=1 index=3 pri=NormalPri size=6MiB + term=1 index=4 pri=NormalPri size=6MiB + term=1 index=5 pri=NormalPri size=6MiB +---- +t1/s1: eval reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB + send reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=-30 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s3: eval reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB + send reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,6) precise_q_size=+30 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+30 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 3, lowPri: false entries: [1 2 3 4 5] +++++ + +# Force flush up to index 2. Replica 2 starts force-flushing. 
+set_force_flush_index range_id=1 index=2 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,6) precise_q_size=+30 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+30 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +schedule-controller-event-count: 1 +scheduled-replicas: 2 + +# Scheduler event. Entry at index 1 is popped from replica 2. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,2) send_queue=[2,6) precise_q_size=+24 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+24 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [1] +++++ +schedule-controller-event-count: 2 +scheduled-replicas: 2 + +# Scheduler event. Entry at index 2 is popped from replica 2, and force-flush +# stops. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,6) precise_q_size=+18 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [2] +++++ +schedule-controller-event-count: 2 + +# Force flush up to index 3. Replica 2 starts force-flushing. 
+set_force_flush_index range_id=1 index=3 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,6) precise_q_size=+18 MiB force-flushing (stop=3) +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +schedule-controller-event-count: 3 +scheduled-replicas: 2 + +# Scheduler event. Entry at index 3 is popped from replica 2, and force-flush +# stops. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[3,4) send_queue=[4,6) precise_q_size=+12 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [3] +++++ +schedule-controller-event-count: 3 + +# Force flush up to index 3, which is a noop. +set_force_flush_index range_id=1 index=3 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[3,4) send_queue=[4,6) precise_q_size=+12 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +schedule-controller-event-count: 3 + +# Force flush up to index 4. 
Replica 2 starts force-flushing. +set_force_flush_index range_id=1 index=4 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[3,4) send_queue=[4,6) precise_q_size=+12 MiB force-flushing (stop=4) +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +schedule-controller-event-count: 4 +scheduled-replicas: 2 + +# Transition replica 3 to StateSnapshot. Replica 2 needs to force-flush, but +# since it is already force-flushing, nothing changes. +set_replicas pull-mode +range_id=1 tenant_id=1 local_replica_id=1 next_raft_index=4 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=6 + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate next=4 + store_id=3 replica_id=3 type=VOTER_FULL state=StateSnapshot next=6 +---- +r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3] + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,4) send_queue=[4,6) precise_q_size=+12 MiB force-flushing (stop=4) +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n3,s3):3: closed +++++ +schedule-controller-event-count: 4 +scheduled-replicas: 2 + +# Scheduler event. Entry at index 4 is popped from replica 2, and force-flush +# stops. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[4,5) send_queue=[5,6) precise_q_size=+6.0 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 +++++ +(n3,s3):3: closed +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [4] +++++ +schedule-controller-event-count: 4 + +# Schedule a raft event. It will get replica 2 to start force-flushing again. 
+raft_event pull-mode +range_id=1 +---- +t1/s1: eval reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB + send reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=-30 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-24 MiB/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[4,5) send_queue=[5,6) precise_q_size=+6.0 MiB force-flushing +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 +++++ +(n3,s3):3: closed +++++ +schedule-controller-event-count: 5 +scheduled-replicas: 2 + +# Entry 5 is popped and force-flushing stops. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[5,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+0 B ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n3,s3):3: closed +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [5] +++++ +schedule-controller-event-count: 5 + +# Add another entry. Neither replica is allowed to form a send-queue. 
+raft_event pull-mode +range_id=1 + entries + term=1 index=6 pri=NormalPri size=6MiB +---- +t1/s1: eval reg=-36 MiB/+16 MiB ela=-36 MiB/+8.0 MiB + send reg=-36 MiB/+16 MiB ela=-36 MiB/+8.0 MiB +t1/s2: eval reg=-6.0 MiB/+16 MiB ela=-36 MiB/+8.0 MiB + send reg=-6.0 MiB/+16 MiB ela=-36 MiB/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[6,7) send_queue=[7,7) precise_q_size=+0 B +eval deducted: reg=+36 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 + term=1 index=6 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[6,7) send_queue=[7,7) precise_q_size=+0 B +eval deducted: reg=+6.0 MiB ela=+30 MiB +eval original in send-q: reg=+0 B ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +NormalPri: + term=1 index=6 tokens=6291456 +++++ +(n3,s3):3: closed +++++ +MsgApps sent in pull mode: + to: 2, lowPri: false entries: [6] +++++ +schedule-controller-event-count: 5 diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index_push_pull b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index_push_pull new file mode 100644 index 000000000000..785633082977 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index_push_pull @@ -0,0 +1,349 @@ +# Initialize a range with three replicas, none of which have send tokens. +init regular_init=0 elastic_init=0 +range_id=1 tenant_id=1 local_replica_id=1 next_raft_index=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=1 + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate next=1 + store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate next=1 +---- +r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3] +t1/s1: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +# Replica 1 is the leader so sending [1,1) is ignored and everything is +# considered sent (the leader does not see MsgApps for itself). +raft_event +range_id=1 + entries + term=1 index=1 pri=NormalPri size=6MiB + term=1 index=2 pri=NormalPri size=6MiB + term=1 index=3 pri=NormalPri size=6MiB + sending + replica_id=1 [1,1) + replica_id=2 [1,1) + replica_id=3 [1,1) +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s3: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +# Replicas 2 and 3 have a send-queue which is possible in push mode. 
+stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+18 MiB ela=+0 B +++++ + +# Force flush up to index 1. Still in push-mode so it has no effect. +set_force_flush_index range_id=1 index=1 push-mode +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+18 MiB ela=+0 B +++++ + +# Switch to pull mode. +raft_event pull-mode +range_id=1 +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +# Replica 3 starts force-flushing with no limit, since it was necessary for +# quorum. Replica 2 is force-flushing up to index 1. +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB force-flushing (stop=1) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB force-flushing +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +schedule-controller-event-count: 1 +scheduled-replicas: 2 3 + +# Update the force-flush index to 2. This propagates to replica 2. Also, since +# replica 2 is seen to be force-flushing, the second-pass decision (see code), +# decides replica 3 does not need to force-flush for quorum. So both have a +# stop index of 2. 
+set_force_flush_index range_id=1 index=2 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +schedule-controller-event-count: 1 +scheduled-replicas: 2 3 + +# Scheduler event. Entry at index 1 is popped from replicas 2, 3. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [1] + to: 3, lowPri: true entries: [1] +++++ +schedule-controller-event-count: 2 +scheduled-replicas: 2 3 + +# Switch back to push mode. Force-flushing stops. +raft_event +range_id=1 +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=-12 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-6.0 MiB/+8.0 MiB +t1/s3: eval reg=-12 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-6.0 MiB/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB +eval deducted: reg=+12 MiB ela=+6.0 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB +eval deducted: reg=+12 MiB ela=+6.0 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +schedule-controller-event-count: 2 +scheduled-replicas: 2 3 + +# Switch back to pull mode. Force-flushing resumes. Replica 3 needs to +# force-flush for quorum, so the stopping point is infinity. 
+raft_event pull-mode +range_id=1 +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-6.0 MiB/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-6.0 MiB/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB force-flushing +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +schedule-controller-event-count: 2 +scheduled-replicas: 2 3 + +# Scheduler event. Entry at index 2 is popped from replicas 2, 3. Replica 2 +# stops force-flushing. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+6.0 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+6.0 MiB force-flushing +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [2] + to: 3, lowPri: true entries: [2] +++++ +schedule-controller-event-count: 3 +scheduled-replicas: 3 + +# Switch back to push mode and then back to pull mode. Replica 2 should not be +# force-flushing and replica 3 should be force-flushing. 
+raft_event +range_id=1 +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=-6.0 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-12 MiB/+8.0 MiB +t1/s3: eval reg=-6.0 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-12 MiB/+8.0 MiB + +raft_event pull-mode +range_id=1 +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-12 MiB/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-12 MiB/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+6.0 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+6.0 MiB force-flushing +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +schedule-controller-event-count: 3 +scheduled-replicas: 3 + +# Replica 3 stops force-flushing. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+6.0 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[3,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+0 B ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 3, lowPri: true entries: [3] +++++ +schedule-controller-event-count: 3 diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index 10449c5ff2b7..d2fa094d124f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -113,10 +113,11 @@ type ACWorkQueue interface { } type rangeControllerInitState struct { - term uint64 - replicaSet rac2.ReplicaSet - leaseholder roachpb.ReplicaID - nextRaftIndex uint64 + term uint64 + replicaSet rac2.ReplicaSet + leaseholder roachpb.ReplicaID + nextRaftIndex uint64 + forceFlushIndex uint64 // These fields are required options for the RangeController specific to the // replica and range, rather than the store or node, so we pass them as part // of the range controller init state. 
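Editorial note: the testdata above exercises the per-replica "force-flushing (stop=N)" and "noop" cases, and the rangeControllerInitState hunk above threads the range's force-flush index into a newly created RangeController. Below is a minimal sketch, assuming hypothetical names (sendStream, maybeForceFlush, forceFlushStop), of the stop-index decision those cases illustrate; it is not the rac2 implementation.

package racexample

// sendStream is a hypothetical stand-in for a per-replica send stream. The
// send-queue holds entries in [nextToSend, nextRaftIndex).
type sendStream struct {
	nextToSend     uint64
	nextRaftIndex  uint64
	forceFlushStop uint64 // 0 when not force-flushing
}

// maybeForceFlush applies a new force-flush index (inclusive). Mirroring the
// testdata above: an index below the already-sent prefix is a no-op, while an
// index overlapping the send-queue starts (or retargets) force-flushing,
// which the test output renders as "force-flushing (stop=N)".
func (s *sendStream) maybeForceFlush(forceFlushIndex uint64) bool {
	hasSendQueue := s.nextToSend < s.nextRaftIndex
	if !hasSendQueue || forceFlushIndex < s.nextToSend {
		return false // nothing queued at or below the index
	}
	s.forceFlushStop = forceFlushIndex
	return true
}

Force-flushing with no stop index (shown in the testdata as plain "force-flushing", used when a replica is needed for quorum) is driven by quorum tracking rather than the force-flush index and is not sketched here.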
@@ -202,9 +203,10 @@ type SideChannelInfoUsingRaftMessageRequest struct { // We *strongly* prefer methods to be called without holding Replica.mu, since // then the callee (implementation of Processor) does not need to worry about // (a) deadlocks, since it sometimes needs to lock Replica.mu itself, (b) the -// amount of work it is doing under this critical section. There are three +// amount of work it is doing under this critical section. There are four // exceptions to this, due to difficulty in changing the calling code: -// InitRaftLocked, OnDescChangedLocked, HoldsSendTokensLocked. +// InitRaftLocked, OnDescChangedLocked, ForceFlushIndexChangedLocked, +// HoldsSendTokensLocked. type Processor interface { // InitRaftLocked is called when raft.RawNode is initialized for the // Replica. NB: can be called twice before the Replica is fully initialized. @@ -246,6 +248,14 @@ type Processor interface { OnDescChangedLocked( ctx context.Context, desc *roachpb.RangeDescriptor, tenantID roachpb.TenantID) + // ForceFlushIndexChangedLocked sets the force flush index, i.e., the index + // (inclusive) up to which all replicas with a send-queue must be + // force-flushed in MsgAppPull mode. It may be rarely called with no change + // to the index. + // + // Both Replica.raftMu and Replica.mu are held. + ForceFlushIndexChangedLocked(ctx context.Context, index uint64) + // HandleRaftReadyRaftMuLocked corresponds to processing that happens when // Replica.handleRaftReadyRaftMuLocked is called. It must be called even // if there was no Ready, since it can be used to advance Admitted, and do @@ -403,6 +413,8 @@ type processorImpl struct { // leaseholderID is the currently known leaseholder replica. leaseholderID roachpb.ReplicaID + forceFlushIndex uint64 + // State at a follower. follower struct { // isLeaderUsingV2Protocol is true when the leaderID indicated that it's @@ -596,6 +608,21 @@ func (p *processorImpl) OnDescChangedLocked( } } +// ForceFlushIndexChangedLocked implements Processor. +func (p *processorImpl) ForceFlushIndexChangedLocked(ctx context.Context, index uint64) { + if buildutil.CrdbTestBuild && p.forceFlushIndex > index { + panic(errors.AssertionFailedf("force-flush index decreased from %d to %d", + p.forceFlushIndex, index)) + } + p.forceFlushIndex = index + if p.leader.rc != nil { + p.leader.rc.ForceFlushIndexChangedLocked(ctx, index) + // Schedule ready processing so that the RangeController can act on the + // change. + p.opts.RaftScheduler.EnqueueRaftReady(p.opts.RangeID) + } +} + // makeStateConsistentRaftMuLocked uses the union of the latest state // retrieved from RaftNode and the p.desc.replicas set to initialize or update // the internal state of processorImpl. 
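Editorial note: the new Processor.ForceFlushIndexChangedLocked above is one of the few methods invoked with both Replica.raftMu and Replica.mu held; it may occasionally be called with an unchanged index, must never see the index regress (the test-build assertion panics otherwise), and enqueues Raft ready processing so the RangeController can react. A hedged sketch of a caller honoring that contract follows; replicaSketch and onForceFlushIndexApplied are hypothetical names, and only the one interface method is reproduced.

package racexample

import "context"

// forceFlushProcessor stands in for the slice of the Processor interface used
// here; the real interface is defined in the diff above.
type forceFlushProcessor interface {
	ForceFlushIndexChangedLocked(ctx context.Context, index uint64)
}

// replicaSketch is a hypothetical caller. Its raftMu and mu are elided; the
// assumption is that the caller already holds both, as the "Locked" suffix
// and the doc comment above require.
type replicaSketch struct {
	flowControl     forceFlushProcessor
	forceFlushIndex uint64 // in-memory copy of the applied force-flush index
}

// onForceFlushIndexApplied is a hypothetical hook run once a higher
// force-flush index has been applied below raft.
func (r *replicaSketch) onForceFlushIndexApplied(ctx context.Context, index uint64) {
	if index <= r.forceFlushIndex {
		// The method tolerates calls with no change, but skipping them avoids
		// needless Raft scheduler enqueues; never pass a smaller index.
		return
	}
	r.forceFlushIndex = index
	r.flowControl.ForceFlushIndexChangedLocked(ctx, index)
}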
@@ -720,16 +747,17 @@ func (p *processorImpl) createLeaderStateRaftMuLocked( } p.term = term rc := p.opts.RangeControllerFactory.New(ctx, rangeControllerInitState{ - term: term, - replicaSet: p.desc.replicas, - leaseholder: p.leaseholderID, - nextRaftIndex: nextUnstableIndex, - rangeID: p.opts.RangeID, - tenantID: p.desc.tenantID, - localReplicaID: p.opts.ReplicaID, - raftInterface: p.raftInterface, - msgAppSender: p.opts.MsgAppSender, - muAsserter: p.opts.ReplicaMutexAsserter, + term: term, + replicaSet: p.desc.replicas, + leaseholder: p.leaseholderID, + nextRaftIndex: nextUnstableIndex, + forceFlushIndex: p.forceFlushIndex, + rangeID: p.opts.RangeID, + tenantID: p.desc.tenantID, + localReplicaID: p.opts.ReplicaID, + raftInterface: p.raftInterface, + msgAppSender: p.opts.MsgAppSender, + muAsserter: p.opts.ReplicaMutexAsserter, }) func() { @@ -1232,10 +1260,11 @@ func (f RangeControllerFactoryImpl) New( Knobs: f.knobs, }, rac2.RangeControllerInitState{ - Term: state.term, - ReplicaSet: state.replicaSet, - Leaseholder: state.leaseholder, - NextRaftIndex: state.nextRaftIndex, + Term: state.term, + ReplicaSet: state.replicaSet, + Leaseholder: state.leaseholder, + NextRaftIndex: state.nextRaftIndex, + ForceFlushIndex: state.forceFlushIndex, }, ) } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 40c6d395a6a7..db6127f5f745 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -138,8 +138,9 @@ type testRangeControllerFactory struct { func (f *testRangeControllerFactory) New( ctx context.Context, state rangeControllerInitState, ) rac2.RangeController { - fmt.Fprintf(f.b, " RangeControllerFactory.New(replicaSet=%s, leaseholder=%s, nextRaftIndex=%d)\n", - state.replicaSet, state.leaseholder, state.nextRaftIndex) + fmt.Fprintf(f.b, + " RangeControllerFactory.New(replicaSet=%s, leaseholder=%s, nextRaftIndex=%d, forceFlushIndex=%d)\n", + state.replicaSet, state.leaseholder, state.nextRaftIndex, state.forceFlushIndex) rc := &testRangeController{b: f.b, waited: true} f.rcs = append(f.rcs, rc) return rc @@ -227,6 +228,10 @@ func (c *testRangeController) SetLeaseholderRaftMuLocked( fmt.Fprintf(c.b, " RangeController.SetLeaseholderRaftMuLocked(%s)\n", replica) } +func (c *testRangeController) ForceFlushIndexChangedLocked(ctx context.Context, index uint64) { + fmt.Fprintf(c.b, " RangeController.ForceFlushIndexChangedLocked(%d)\n", index) +} + func (c *testRangeController) CloseRaftMuLocked(ctx context.Context) { fmt.Fprintf(c.b, " RangeController.CloseRaftMuLocked\n") } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index 5f28570d93dc..69a39915014c 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -296,7 +296,7 @@ Raft: term: 52 leader: 5 leaseholder: 10 mark: {Term:51 Index:28} next-unstable: handle-raft-ready-and-admit entries=v1/i28/t46/pri0/time2/len100 leader-term=52 ---- HandleRaftReady: - RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=28) + RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=28, forceFlushIndex=0) RangeController.AdmitRaftMuLocked(5, term:52, admitted:[LowPri:26,NormalPri:26,AboveNormalPri:26,HighPri:26]) 
RangeController.HandleRaftEventRaftMuLocked([28]) ..... @@ -392,7 +392,7 @@ handle-raft-ready-and-admit ---- HandleRaftReady: RangeController.CloseRaftMuLocked - RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=29) + RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=29, forceFlushIndex=0) RangeController.HandleRaftEventRaftMuLocked([]) ..... @@ -500,7 +500,7 @@ LogTracker: mark:{Term:50 Index:25}, stable:24, admitted:[24 24 24 24] # RangeController is created. set-enabled-level enabled-level=v1-encoding ---- - RangeControllerFactory.New(replicaSet=[(n11,s11):11,(n13,s13):13], leaseholder=5, nextRaftIndex=26) + RangeControllerFactory.New(replicaSet=[(n11,s11):11,(n13,s13):13], leaseholder=5, nextRaftIndex=26, forceFlushIndex=0) set-raft-state log-term=50 log-index=26 next-unstable-index=27 ---- diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.go b/pkg/kv/kvserver/kvserverpb/proposer_kv.go index 3d494aef3739..fcfb85b23ae9 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.go +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.go @@ -70,5 +70,12 @@ func (r *ReplicatedEvalResult) IsTrivial() bool { allowlist.PrevLeaseProposal = nil allowlist.IsProbe = false // probes are trivial, they always get refused in CheckForcedErr allowlist.State = nil + // DoTimelyApplicationToAllReplicas is trivial since it can be combined in + // an apply.Batch -- this is done by using the Command index in + // apply.Batch.Stage to set the ForceFlushIndex. Different replicas can + // combine different sets of Commands in an apply.Batch, but since the + // Command index that specified DoTimelyApplicationToAllReplicas is the + // same, the state machine will have the same state. + allowlist.DoTimelyApplicationToAllReplicas = false return allowlist.IsZero() } diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index fa9a5aec521d..b72c7852c862 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -263,6 +263,16 @@ message ReplicatedEvalResult { LinkExternalSSTable link_external_sstable = 27 [(gogoproto.customname) = "LinkExternalSSTable"]; + // DoTimelyApplicationToAllReplicas is set to true when this proposal needs + // to be applied on all replicas in a timely manner for various reasons + // (currently for certain proposals during a split or merge or below-raft + // migration). + // + // Must only be set to true when the cluster version is + // V25_1_AddRangeForceFlushKey, since it causes below Raft code to write a + // replicated range-id local key. + bool do_timely_application_to_all_replicas = 28; + reserved 1, 5, 7, 9, 10, 14, 15, 16, 19, 10001 to 10013; } diff --git a/pkg/kv/kvserver/replica_app_batch.go b/pkg/kv/kvserver/replica_app_batch.go index 3758940b9b50..72744ac7ef80 100644 --- a/pkg/kv/kvserver/replica_app_batch.go +++ b/pkg/kv/kvserver/replica_app_batch.go @@ -158,7 +158,9 @@ func (b *replicaAppBatch) Stage( // non-trivial commands will be in their own batch, so delaying their // non-trivial ReplicatedState updates until later (without ever staging // them in the batch) is sufficient. 
- b.stageTrivialReplicatedEvalResult(ctx, cmd) + if err := b.stageTrivialReplicatedEvalResult(ctx, cmd); err != nil { + return nil, err + } b.ab.numEntriesProcessed++ size := len(cmd.Data) b.ab.numEntriesProcessedBytes += int64(size) @@ -538,7 +540,7 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly( // inspect the command's ReplicatedEvalResult. func (b *replicaAppBatch) stageTrivialReplicatedEvalResult( ctx context.Context, cmd *replicatedCmd, -) { +) error { b.state.RaftAppliedIndex = cmd.Index() b.state.RaftAppliedIndexTerm = kvpb.RaftTerm(cmd.Term) @@ -560,6 +562,19 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult( // serialize on the stats key. deltaStats := res.Delta.ToStats() b.state.Stats.Add(deltaStats) + + if res.DoTimelyApplicationToAllReplicas && !b.changeRemovesReplica { + // Update in-memory and persistent state. A later command accumulated in + // this batch may update these again. Also, a later command may set + // changeRemovesReplica to true and wipe out the state in the batch. These + // are all safe. + b.state.ForceFlushIndex = roachpb.ForceFlushIndex{Index: cmd.Entry.Index} + if err := b.r.raftMu.stateLoader.SetForceFlushIndex( + ctx, b.batch, b.state.Stats, &b.state.ForceFlushIndex); err != nil { + return err + } + } + return nil } // ApplyToStateMachine implements the apply.Batch interface. The method handles @@ -630,8 +645,13 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { logcrash.ReportOrPanic(ctx, &b.r.ClusterSettings().SV, "%v", err) } r.mu.closedTimestampSetter = b.closedTimestampSetter - closedTimestampUpdated := r.shMu.state.RaftClosedTimestamp.Forward(b.state.RaftClosedTimestamp) + + if b.state.ForceFlushIndex != r.shMu.state.ForceFlushIndex { + r.shMu.state.ForceFlushIndex = b.state.ForceFlushIndex + r.flowControlV2.ForceFlushIndexChangedLocked(ctx, b.state.ForceFlushIndex.Index) + } + prevStats := *r.shMu.state.Stats *r.shMu.state.Stats = *b.state.Stats diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 998f7258d3a5..e85dbb953c8f 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -58,6 +58,7 @@ func clearTrivialReplicatedEvalResultFields(r *kvserverpb.ReplicatedEvalResult) r.Delta = enginepb.MVCCStatsDelta{} // Rangefeeds have been disconnected prior to application. r.MVCCHistoryMutation = nil + r.DoTimelyApplicationToAllReplicas = false } // prepareLocalResult is performed after the command has been committed to the diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index c5744bf238a4..5a1944b06476 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -966,6 +966,16 @@ func (r *Replica) AdminMerge( } } +// waitForApplication is waiting for application at all replicas (voters or +// non-voters). This is an outlier in that the system is typically expected to +// function with only a quorum of voters being available. So it should be used +// extremely sparingly. +// +// IMPORTANT: if adding a call to this method, ensure that whatever command is +// needing this behavior sets +// ReplicatedEvalResult.DoTimelyApplicationToAllReplicas. That ensures that +// replication flow control will not arbitrarily delay application on a +// replica by maintaining a non-empty send-queue. 
func waitForApplication( ctx context.Context, dialer *nodedialer.Dialer, diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index b844415cde96..53eea2dcb115 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -285,6 +285,9 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(s kvstorage.LoadedReplicaState r.setStartKeyLocked(desc.StartKey) r.shMu.state = s.ReplState + if r.shMu.state.ForceFlushIndex != (roachpb.ForceFlushIndex{}) { + r.flowControlV2.ForceFlushIndexChangedLocked(context.TODO(), r.shMu.state.ForceFlushIndex.Index) + } r.shMu.lastIndexNotDurable = s.LastIndex r.shMu.lastTermNotDurable = invalidLastTerm diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 7c5283e8c2bc..4d45b50a642c 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -749,6 +749,9 @@ func (r *Replica) applySnapshot( // by r.leasePostApply, but we called those above, so now it's safe to // wholesale replace r.mu.state. r.shMu.state = state + if r.shMu.state.ForceFlushIndex != (roachpb.ForceFlushIndex{}) { + r.flowControlV2.ForceFlushIndexChangedLocked(ctx, r.shMu.state.ForceFlushIndex.Index) + } // Snapshots typically have fewer log entries than the leaseholder. The next // time we hold the lease, recompute the log size before making decisions. r.shMu.raftLogSizeTrusted = false diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_split_merge b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_split_merge new file mode 100644 index 000000000000..f0444f81fceb --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_split_merge @@ -0,0 +1,249 @@ +echo +---- +---- +-- We will exhaust the tokens across all streams while admission is blocked on +-- n3, using a single 4 MiB (deduction, the write itself is small) write. Then, +-- we will write a 1 MiB put to the range, split it, write a 1 MiB put to the +-- RHS range, merge the ranges, and write a 1 MiB put to the merged range. We +-- expect that at each stage where a send queue develops n1->s3, the send queue +-- will be flushed by the range merge and range split range operations. + + +-- Start by exhausting the tokens from n1->s3 and blocking admission on s3. +-- (Issuing 4x1MiB regular, 3x replicated write that's not admitted on s3.) + + +(Sending 1 MiB put request to pre-split range) + + +(Sent 1 MiB put request to pre-split range) + + +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B + + +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here. 
+SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 74 | 1 | 0 B + 74 | 2 | 0 B + 74 | 3 | 4.0 MiB + + +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3. +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -3.0 MiB | 0 B | -2.0 MiB + + +-- (Splitting range.) + + +-- Observe the newly split off replica, with its own three streams. +SELECT range_id, count(*) AS streams + FROM crdb_internal.kv_flow_control_handles_v2 +GROUP BY (range_id) +ORDER BY streams DESC; + + range_id | stream_count +-----------+--------------- + 74 | 3 + 75 | 3 + + +-- Send queue and flow token metrics from n1, post-split. +-- We expect to see a force flush of the send queue for s3. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 1.0 MiB + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -3.0 MiB | 0 B | -3.0 MiB + + +(Sending 1 MiB put request to post-split RHS range) + + +(Sent 1 MiB put request to post-split RHS range) + + +-- Send queue and flow token metrics from n1, post-split and 1 MiB put. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 1.0 MiB + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -4.0 MiB | 0 B | -3.0 MiB + + +-- (Merging ranges.) + + +-- Send queue and flow token metrics from n1, post-split-merge. +-- We expect to see a force flush of the send queue for s3 again. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 2.0 MiB + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -4.0 MiB | 0 B | -4.0 MiB + + +(Sending 1 MiB put request to post-split-merge range) + + +(Sent 1 MiB put request to post-split-merge range) + + +-- Send queue and flow token metrics from n1, post-split-merge and 1 MiB put. +-- We expect to see the send queue develop for s3 again. 
+SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 2.0 MiB + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -5.0 MiB | 0 B | -4.0 MiB + + +-- (Allowing below-raft admission to proceed on [n1,n2,n3].) + + +-- Send queue and flow token metrics from n1, all tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 2.0 MiB + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB +---- +---- + +# vim:ft=sql
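Editorial note: the integration test above drives its expectations off the kvflowcontrol send-queue metrics and the crdb_internal flow-control vtables. As a usage sketch, the same observation can be made from any SQL client; the Go program below runs the metrics query verbatim from the testdata. The connection string and the choice of the lib/pq driver are assumptions, not part of the test harness.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumption: any Postgres-compatible driver works
)

func main() {
	// Hypothetical local single-node connection string.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Same query the test uses to observe send-queue state.
	rows, err := db.Query(`
SELECT name, crdb_internal.humanize_bytes(value::INT8)
  FROM crdb_internal.node_metrics
 WHERE name LIKE '%kvflowcontrol%send_queue%'
   AND name != 'kvflowcontrol.send_queue.count'
 ORDER BY name ASC`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var name, humanized string
		if err := rows.Scan(&name, &humanized); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s\t%s\n", name, humanized)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}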