kvserver: add TestFlowControlSendQueueRangeSplitMerge test #136258

Open · wants to merge 3 commits into master
194 changes: 194 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -5460,6 +5460,200 @@ func TestFlowControlSendQueueRangeRelocate(t *testing.T) {
})
}

// TestFlowControlSendQueueRangeSplitMerge exercises send queue formation,
// prevention, and force flushing due to range split and merge operations. See
// the initial comment for an overview of the test structure.
func TestFlowControlSendQueueRangeSplitMerge(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
// below their defaults (8 MiB elastic, 16 MiB regular).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)

disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close("send_queue_range_split_merge")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID + 1)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`
-- We will exhaust the tokens across all streams while admission is blocked on
-- n3, using 4x1 MiB writes (1 MiB is the overridden deduction; the writes
-- themselves are small). Then, we will write a 1 MiB put to the range, split
-- it, write a 1 MiB put to the RHS range, merge the ranges, and write a 1 MiB
-- put to the merged range. We expect that at each stage where a send queue
-- develops n1->s3, the send queue will be flushed by the range split and
-- merge operations.`)
h.comment(`
-- Start by exhausting the tokens from n1->s3 and blocking admission on s3.
-- (Issuing 4x1 MiB regular writes, 3x replicated, that are not admitted on s3.)`)
setTokenReturnEnabled(true /* enabled */, 0, 1)
setTokenReturnEnabled(false /* enabled */, 2)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`(Sending 1 MiB put request to pre-split range)`)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to pre-split range)`)

h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, the send queue for s3 should have 1 MiB.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Splitting range.)`)
left, right := tc.SplitRangeOrFatal(t, k.Next())
h.waitForConnectedStreams(ctx, left.RangeID, 3, 0 /* serverIdx */)
h.waitForConnectedStreams(ctx, right.RangeID, 3, 0 /* serverIdx */)

h.comment(`-- Observe the newly split off replica, with its own three streams.`)
h.query(n1, `
SELECT range_id, count(*) AS streams
FROM crdb_internal.kv_flow_control_handles_v2
GROUP BY (range_id)
ORDER BY streams DESC;
`, "range_id", "stream_count")
h.comment(`
-- Send queue and flow token metrics from n1, post-split.
-- We expect to see a force flush of the send queue for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`(Sending 1 MiB put request to post-split LHS range)`)
h.put(ctx, roachpb.Key(left.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to post-split LHS range)`)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))

h.comment(`(Sending 1 MiB put request to post-split RHS range)`)
h.put(ctx, roachpb.Key(right.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to post-split RHS range)`)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))

h.comment(`
-- Send queue and flow token metrics from n1, post-split and 1 MiB put on
-- each side.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Merging ranges.)`)
merged := tc.MergeRangesOrFatal(t, left.StartKey.AsRawKey())
h.waitForConnectedStreams(ctx, merged.RangeID, 3, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, merged.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue and flow token metrics from n1, post-split-merge.
-- We expect to see a force flush of the send queue for s3 again.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`(Sending 1 MiB put request to post-split-merge range)`)
h.put(ctx, k, 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request to post-split-merge range)`)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, merged.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue and flow token metrics from n1, post-split-merge and 1 MiB put.
-- We expect to see the send queue develop for s3 again.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

h.comment(`-- (Allowing below-raft admission to proceed on [n1,n2,n3].)`)
setTokenReturnEnabled(true /* enabled */, 0, 1, 2)

h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.comment(`
-- Send queue and flow token metrics from n1, all tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}
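
(Aside, not part of this diff: a minimal, self-contained sketch of the token arithmetic the test above depends on, using the overridden values from the top of the test: 4 MiB of regular tokens per stream and a fixed 1 MiB deduction per write. Names and structure below are illustrative only; the real accounting lives in kvflowcontrol's token counters.)

package main

import "fmt"

func main() {
	const (
		regularTokensPerStream = 4 << 20 // mirrors the RegularTokensPerStream override in the test
		perWriteDeduction      = 1 << 20 // mirrors OverrideTokenDeduction in the test
	)
	available := regularTokensPerStream
	queued := 0
	// Five normal-priority puts while admission is blocked on s3: the first four
	// consume all regular tokens on the n1->s3 stream, the fifth finds no tokens
	// and sits in the send queue (1 MiB), matching waitForSendQueueSize above.
	for i := 1; i <= 5; i++ {
		if available >= perWriteDeduction {
			available -= perWriteDeduction
		} else {
			queued += perWriteDeduction
		}
		fmt.Printf("put %d: available=%d MiB, send queue=%d MiB\n", i, available>>20, queued>>20)
	}
}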

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
@@ -9,6 +9,7 @@ option go_package = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcont

import "gogoproto/gogo.proto";
import "roachpb/data.proto";
import "roachpb/metadata.proto";
import "kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto";

// ControllerRequest is used to inspect the state of the node-level
@@ -65,6 +66,9 @@ message Handle {
(gogoproto.customname) = "RangeID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"];
repeated ConnectedStream connected_streams = 2 [(gogoproto.nullable) = false];
// ForceFlushIndex is an index up to (and including) which the
// rangeController running in pull mode must force-flush all send streams.
roachpb.ForceFlushIndex force_flush_index = 3 [(gogoproto.nullable) = false];
}
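
(Aside, not part of this diff: a small sketch of the semantics described by the ForceFlushIndex comment above. The function below is hypothetical; the real decision is made inside the rac2 range controller's send streams.)

package main

import "fmt"

// mustForceFlush illustrates the stated rule: a range controller running in
// pull mode must force-flush (send from the send queue without waiting for
// send tokens) every entry up to and including the force-flush index.
func mustForceFlush(entryIndex, forceFlushIndex uint64) bool {
	return entryIndex <= forceFlushIndex
}

func main() {
	fmt.Println(mustForceFlush(10, 12)) // true: at or below the force-flush index
	fmt.Println(mustForceFlush(13, 12)) // false: beyond it, normal flow control applies
}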

// ConnectedStream represents the in-memory state of a connected stream,
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
@@ -39,6 +39,7 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//jsonpb",
],
)

45 changes: 45 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -399,6 +399,47 @@ type RaftEvent struct {
ReplicasStateInfo map[roachpb.ReplicaID]ReplicaStateInfo
}

func (re RaftEvent) String() string {
return redact.StringWithoutMarkers(re)
}

func (re RaftEvent) SafeFormat(w redact.SafePrinter, _ rune) {
entryString := func(entries []raftpb.Entry) string {
var b strings.Builder
for i, e := range entries {
if i > 0 {
b.WriteString(", ")
}
b.WriteString(fmt.Sprintf("(index=%d:term=%d:type=%v)",
e.Index, e.Term, e.Type))
}
return b.String()
}

msgString := func(msgs []raftpb.Message) string {
var b strings.Builder
for i, msg := range msgs {
if i > 0 {
b.WriteString(", ")
}
b.WriteString(fmt.Sprintf("(%v->%v term=%v index=%v commit=%v type=%v)",
msg.From, msg.To, msg.Term, msg.Index, msg.Commit, msg.Type))
}
return b.String()
}

msgAppString := func(msgApps map[roachpb.ReplicaID][]raftpb.Message) string {
var b strings.Builder
for replicaID, msgs := range msgApps {
b.WriteString(fmt.Sprintf("%v: %v", replicaID, msgString(msgs)))
}
return b.String()
}

w.Printf("msgAppMode=%v term=%v snap=%v entries=[%v] msgApps=%v replicasStateInfo=%v",
re.MsgAppMode, re.Term, re.Snap, entryString(re.Entries), msgAppString(re.MsgApps), re.ReplicasStateInfo)
}

// Scheduler abstracts the raftScheduler to allow the RangeController to
// schedule its own internal processing. This internal processing is to pop
// some entries from the send queue and send them in a MsgApp.
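
(Aside, not part of this diff: a rough sketch of the abstraction the Scheduler comment above describes. The interface and method below are assumptions for illustration, not necessarily the actual rac2 API.)

package main

import "fmt"

// scheduler lets a range controller ask to be run again later, so that popping
// entries off the send queue and packaging them into a MsgApp happens on a
// scheduler goroutine rather than inline.
type scheduler interface {
	ScheduleControllerEvent(rangeID uint64)
}

// recordingScheduler is a toy implementation that records scheduling requests.
type recordingScheduler struct{ scheduled []uint64 }

func (s *recordingScheduler) ScheduleControllerEvent(rangeID uint64) {
	s.scheduled = append(s.scheduled, rangeID)
}

func main() {
	var s scheduler = &recordingScheduler{}
	s.ScheduleControllerEvent(42) // the controller for r42 asks for another pass
	fmt.Println(s.(*recordingScheduler).scheduled)
}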
@@ -1064,6 +1105,8 @@ func constructRaftEventForReplica(
//
// Requires replica.raftMu to be held.
func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error {
log.Infof(ctx, "%v: HandleRaftEventRaftMuLocked: event=%v ff=%v",
rc.opts.RangeID, e, rc.forceFlushIndex)
// NB: e.Term can be empty when the RaftEvent was not constructed using a
// MsgStorageAppend. Hence, the assertion is gated on the conditions that
// ensure e.Term was initialized.
@@ -1537,6 +1580,7 @@ func (rc *rangeController) SetLeaseholderRaftMuLocked(
func (rc *rangeController) ForceFlushIndexChangedLocked(ctx context.Context, index uint64) {
rc.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
rc.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld()
log.Infof(ctx, "r%v force flush index changed from %d to %d", rc.opts.RangeID, rc.forceFlushIndex, index)
rc.forceFlushIndex = index
}

@@ -1614,6 +1658,7 @@ func (rc *rangeController) InspectRaftMuLocked(ctx context.Context) kvflowinspec
return kvflowinspectpb.Handle{
RangeID: rc.opts.RangeID,
ConnectedStreams: streams,
ForceFlushIndex: roachpb.ForceFlushIndex{Index: rc.forceFlushIndex},
}
}
