kvserver: [dnm] add TestFlowControlSendQueueRangeSplitMerge test (picked) #136495

Closed
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -401,4 +401,4 @@ trace.span_registry.enabled boolean false if set, ongoing traces can be seen at
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. application
ui.database_locality_metadata.enabled boolean true if enabled shows extended locality data about databases and tables in DB Console which can be expensive to compute application
ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application
version version 1000024.3-upgrading-to-1000025.1-step-004 set the active cluster version in the format '<major>.<minor>' application
version version 1000024.3-upgrading-to-1000025.1-step-006 set the active cluster version in the format '<major>.<minor>' application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -360,6 +360,6 @@
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-database-locality-metadata-enabled" class="anchored"><code>ui.database_locality_metadata.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled shows extended locality data about databases and tables in DB Console which can be expensive to compute</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-display-timezone" class="anchored"><code>ui.display_timezone</code></div></td><td>enumeration</td><td><code>etc/utc</code></td><td>the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000024.3-upgrading-to-1000025.1-step-004</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000024.3-upgrading-to-1000025.1-step-006</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
7 changes: 6 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
@@ -238,6 +238,10 @@ const (
// V25_1_AddJobsTables added new jobs tables.
V25_1_AddJobsTables

// V25_1_AddRangeForceFlushKey adds the RangeForceFlushKey, a replicated
// range-ID local key, which is written below raft.
V25_1_AddRangeForceFlushKey

// *************************************************
// Step (1) Add new versions above this comment.
// Do not add new versions to a patch release.
@@ -293,7 +297,8 @@ var versionTable = [numKeys]roachpb.Version{
// v25.1 versions. Internal versions must be even.
V25_1_Start: {Major: 24, Minor: 3, Internal: 2},

V25_1_AddJobsTables: {Major: 24, Minor: 3, Internal: 4},
V25_1_AddJobsTables: {Major: 24, Minor: 3, Internal: 4},
V25_1_AddRangeForceFlushKey: {Major: 24, Minor: 3, Internal: 6},

// *************************************************
// Step (2): Add new versions above this comment.
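The change above follows the usual two-step pattern for a new cluster version: declare the V25_1_AddRangeForceFlushKey key, then map it to an even internal version ({Major: 24, Minor: 3, Internal: 6}). The hunks below gate the new replicated field on this version so it is only set once every node in a mixed-version cluster can interpret it. Below is a minimal, standalone sketch of that gating pattern; the types are hypothetical stand-ins, not the real clusterversion or kvserverpb APIs.

package main

import "fmt"

// versionGate is a hypothetical stand-in for the IsActive check: it reports
// whether the cluster has finalized a given internal version.
type versionGate struct{ activeInternal int }

func (g versionGate) isActive(internal int) bool { return g.activeInternal >= internal }

// replicatedResult is a hypothetical stand-in for the replicated eval result
// that carries DoTimelyApplicationToAllReplicas in this PR.
type replicatedResult struct{ doTimelyApplicationToAllReplicas bool }

func main() {
    const addRangeForceFlushKeyInternal = 6 // mirrors {Major: 24, Minor: 3, Internal: 6}

    for _, gate := range []versionGate{{activeInternal: 4}, {activeInternal: 6}} {
        var pd replicatedResult
        // Mirrors the gated assignments in splitTriggerHelper, mergeTrigger,
        // Migrate, and Subsume: only set the new below-raft field once the
        // whole cluster is on a version that knows about it.
        if gate.isActive(addRangeForceFlushKeyInternal) {
            pd.doTimelyApplicationToAllReplicas = true
        }
        fmt.Printf("cluster at internal version %d -> flag set: %t\n",
            gate.activeInternal, pd.doTimelyApplicationToAllReplicas)
    }
}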
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
@@ -57,6 +57,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/build",
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/abortspan",
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
@@ -1363,6 +1364,14 @@ func splitTriggerHelper(
// hand side range (i.e. it goes from zero to its stats).
RHSDelta: *h.AbsPostSplitRight(),
}
// Set DoTimelyApplicationToAllReplicas since splits that are not applied on
// all replicas eventually cause snapshots for the RHS to be sent to
// replicas that already have the unsplit range, *and* these snapshots are
// rejected (which is very wasteful). See the long comment in
// split_delay_helper.go for more details.
if rec.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) {
pd.Replicated.DoTimelyApplicationToAllReplicas = true
}

pd.Local.Metrics = &result.Metrics{
SplitsWithEstimatedStats: h.splitsWithEstimates,
@@ -1459,6 +1468,15 @@ func mergeTrigger(
pd.Replicated.Merge = &kvserverpb.Merge{
MergeTrigger: *merge,
}
// Set DoTimelyApplicationToAllReplicas so that merges are applied on all
// replicas. This is not technically necessary since even though
// Replica.AdminMerge calls waitForApplication, that call happens earlier in
// the merge distributed txn, when sending a kvpb.SubsumeRequest. But since
// we have force-flushed once during the merge txn anyway, we choose to
// complete the merge story and finish the merge on all replicas.
if rec.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) {
pd.Replicated.DoTimelyApplicationToAllReplicas = true
}

{
// If we have GC hints populated that means we are trying to perform
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
@@ -9,6 +9,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
@@ -81,6 +82,13 @@ func Migrate(
// as all below-raft migrations (the only users of Migrate) were introduced
// after it.
pd.Replicated.State.Version = &migrationVersion
// Set DoTimelyApplicationToAllReplicas so that migrates are applied on all
// replicas. This is done since MigrateRequests trigger a call to
// waitForApplication (see Replica.executeWriteBatch).
if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) {
pd.Replicated.DoTimelyApplicationToAllReplicas = true
}

return pd, nil
}

10 changes: 9 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -10,6 +10,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
@@ -156,5 +157,12 @@ func Subsume(
reply.ReadSummary = &priorReadSum
reply.ClosedTimestamp = cArgs.EvalCtx.GetCurrentClosedTimestamp(ctx)

return result.Result{}, nil
var pd result.Result
// Set DoTimelyApplicationToAllReplicas so that merges are applied on all
// replicas. This is needed since Replica.AdminMerge calls
// waitForApplication when sending a kvpb.SubsumeRequest.
if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) {
pd.Replicated.DoTimelyApplicationToAllReplicas = true
}
return pd, nil
}
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/batcheval/result/result.go
@@ -353,6 +353,11 @@ func (p *Result) MergeAndDestroy(q Result) error {
}
q.Replicated.IsProbe = false

if q.Replicated.DoTimelyApplicationToAllReplicas {
p.Replicated.DoTimelyApplicationToAllReplicas = true
}
q.Replicated.DoTimelyApplicationToAllReplicas = false

if p.Local.EncounteredIntents == nil {
p.Local.EncounteredIntents = q.Local.EncounteredIntents
} else {
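The MergeAndDestroy hunk above ORs the new boolean from the source result into the destination and then clears it on the source, the same fold-and-consume treatment the other replicated fields receive. A small standalone sketch of that fold, using hypothetical structs in place of result.Result:

package main

import "fmt"

// Hypothetical stand-ins for result.Result and its Replicated payload.
type replicated struct{ doTimelyApplicationToAllReplicas bool }
type result struct{ replicated replicated }

// mergeAndDestroy mirrors the new lines in MergeAndDestroy: OR the flag from
// q into p, then clear it on q as the source result is consumed.
func (p *result) mergeAndDestroy(q *result) {
    if q.replicated.doTimelyApplicationToAllReplicas {
        p.replicated.doTimelyApplicationToAllReplicas = true
    }
    q.replicated.doTimelyApplicationToAllReplicas = false
}

func main() {
    var p, q result
    q.replicated.doTimelyApplicationToAllReplicas = true
    p.mergeAndDestroy(&q)
    // Matches the expectations added below in result_test.go: the flag is
    // false on the destination before the merge and true after.
    fmt.Println(p.replicated.doTimelyApplicationToAllReplicas) // true
    fmt.Println(q.replicated.doTimelyApplicationToAllReplicas) // false
}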
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/batcheval/result/result_test.go
@@ -86,4 +86,10 @@ func TestMergeAndDestroy(t *testing.T) {
ForceFlushIndex: roachpb.ForceFlushIndex{Index: 3},
}
require.ErrorContains(t, r0.MergeAndDestroy(r3), "must not specify ForceFlushIndex")

var r4 Result
r4.Replicated.DoTimelyApplicationToAllReplicas = true
require.False(t, r0.Replicated.DoTimelyApplicationToAllReplicas)
require.NoError(t, r0.MergeAndDestroy(r4))
require.True(t, r0.Replicated.DoTimelyApplicationToAllReplicas)
}
187 changes: 187 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -5460,6 +5460,193 @@ func TestFlowControlSendQueueRangeRelocate(t *testing.T) {
})
}

// TestFlowControlSendQueueRangeSplitMerge exercises the send queue formation,
// prevention and force flushing due to range split and merge operations. See
// the initial comment for an overview of the test structure.
func TestFlowControlSendQueueRangeSplitMerge(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
// lower (8 and 16 MiB default).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)

disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close("send_queue_range_split_merge")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID + 1)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`
-- We will exhaust the tokens across all streams while admission is blocked on
-- n3, using a single 4 MiB (deduction, the write itself is small) write. Then,
-- we will write a 1 MiB put to the range, split it, write a 1 MiB put to the
-- RHS range, merge the ranges, and write a 1 MiB put to the merged range. We
-- expect that at each stage where a send queue develops n1->s3, the send queue
-- will be flushed by the range split and range merge operations.`)
h.comment(`
-- Start by exhausting the tokens from n1->s3 and blocking admission on s3.
-- (Issuing 4x1MiB regular, 3x replicated write that's not admitted on s3.)`)
setTokenReturnEnabled(true /* enabled */, 0, 1)
setTokenReturnEnabled(false /* enabled */, 2)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`(Sending 1 MiB put request to pre-split range)`)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to pre-split range)`)

h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Splitting range.)`)
left, right := tc.SplitRangeOrFatal(t, k.Next())
h.waitForConnectedStreams(ctx, left.RangeID, 3, 0 /* serverIdx */)
h.waitForConnectedStreams(ctx, right.RangeID, 3, 0 /* serverIdx */)

h.comment(`-- Observe the newly split off replica, with its own three streams.`)
h.query(n1, `
SELECT range_id, count(*) AS streams
FROM crdb_internal.kv_flow_control_handles_v2
GROUP BY (range_id)
ORDER BY streams DESC;
`, "range_id", "stream_count")
h.comment(`
-- Send queue and flow token metrics from n1, post-split.
-- We expect to see a force flush of the send queue for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`(Sending 1 MiB put request to post-split RHS range)`)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to post-split RHS range)`)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))

h.comment(`
-- Send queue and flow token metrics from n1, post-split and 1 MiB put.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Merging ranges.)`)
merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey())
h.waitForConnectedStreams(ctx, merged.RangeID, 3, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, merged.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue and flow token metrics from n1, post-split-merge.
-- We expect to see a force flush of the send queue for s3 again.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`(Sending 1 MiB put request to post-split-merge range)`)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to post-split-merge range)`)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, merged.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue and flow token metrics from n1, post-split-merge and 1 MiB put.
-- We expect to see the send queue develop for s3 again.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Allowing below-raft admission to proceed on [n1,n2,n3].)`)
setTokenReturnEnabled(true /* enabled */, 0, 1, 2)

h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.comment(`
-- Send queue and flow token metrics from n1, all tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
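For reference, the token arithmetic the new test relies on: the overrides near the top of the test cap regular tokens at 4 MiB per stream (elastic at 2 MiB) and force every write to be deducted as exactly 1 MiB, so four normal-priority puts exhaust the n1->s3 stream while below-raft admission on s3 is blocked, and the next 1 MiB put forms the send queue that the split and merge are then expected to flush. A tiny standalone restatement of that arithmetic, under those assumptions:

package main

import "fmt"

func main() {
    const (
        miB                    = 1 << 20
        regularTokensPerStream = 4 * miB // kvflowcontrol.RegularTokensPerStream override in the test
        deductionPerWrite      = 1 * miB // OverrideTokenDeduction: every write costs exactly 1 MiB
    )

    // Four normal-pri puts while below-raft admission on s3 is disabled: each
    // deducts 1 MiB and nothing is returned, exhausting the n1->s3 stream.
    available := regularTokensPerStream
    for i := 0; i < 4; i++ {
        available -= deductionPerWrite
    }
    fmt.Printf("regular tokens left for s3 after 4 puts: %d MiB\n", available/miB) // 0

    // The next 1 MiB put can no longer send to s3 and queues instead, which
    // is what waitForSendQueueSize(..., 1<<20, ...) asserts before the split.
    sendQueue := deductionPerWrite
    fmt.Printf("expected send queue toward s3: %d MiB\n", sendQueue/miB) // 1
}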