Skip to content

Commit

Permalink
kvserver: fix flaky flow token return tests
Browse files Browse the repository at this point in the history
Epic: none
Release note: none
  • Loading branch information
pav-kv committed Oct 10, 2024
1 parent a82fea1 commit dc46773
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 197 deletions.
128 changes: 65 additions & 63 deletions pkg/kv/kvserver/flow_control_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -3282,10 +3284,10 @@ func TestFlowControlClassPrioritizationV2(t *testing.T) {
})
}

// TestFlowControlQuiescedRangeV2 tests flow token behavior when ranges are
// quiesced. It ensures that we have timely returns of flow tokens even when
// there's no raft traffic to piggyback token returns on top of.
func TestFlowControlQuiescedRangeV2(t *testing.T) {
// TestFlowControlTokenReturnsPiggybackedV2 tests that flow tokens are returned
// by the piggybacking and the fallback dispatch mechanisms. It also ensures
// that the range does not quiesce until all deducted flow tokens are returned.
func TestFlowControlTokenReturnsPiggybackedV2(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand All @@ -3295,8 +3297,10 @@ func TestFlowControlQuiescedRangeV2(t *testing.T) {
}, func(t *testing.T, v2EnabledWhenLeaderLevel kvflowcontrol.V2EnabledWhenLeaderLevel) {
ctx := context.Background()
var disableWorkQueueGranting atomic.Bool
var disablePiggybackTokenDispatch atomic.Bool
var disableFallbackTokenDispatch atomic.Bool
disableWorkQueueGranting.Store(true)
disablePiggybackTokenDispatch.Store(true)
disableFallbackTokenDispatch.Store(true)

settings := cluster.MakeTestingClusterSettings()
Expand Down Expand Up @@ -3339,8 +3343,7 @@ func TestFlowControlQuiescedRangeV2(t *testing.T) {
return disableFallbackTokenDispatch.Load()
},
DisablePiggyBackedFlowTokenDispatch: func() bool {
// We'll only test using the fallback token mechanism.
return true
return disablePiggybackTokenDispatch.Load()
},
},
},
Expand Down Expand Up @@ -3376,32 +3379,34 @@ func TestFlowControlQuiescedRangeV2(t *testing.T) {
require.NotNil(t, leader)
require.False(t, leader.IsQuiescent())

h.dropAdmissionsFromRaftMessages(desc.RangeID)
h.comment(`
-- (Allow below-raft admission to proceed. We've disabled the fallback token
-- dispatch mechanism so no tokens are returned yet -- quiesced ranges don't
-- use the piggy-backed token return mechanism since there's no raft traffic.)`)
-- (Allow below-raft admission to proceed, and enable piggybacking. All tokens
-- are returned via the piggybacking mechanism.)`)
disablePiggybackTokenDispatch.Store(false)
disableWorkQueueGranting.Store(false)

h.comment(`
-- Flow token metrics from n1 after work gets admitted but fallback token
-- dispatch mechanism is disabled. Deducted elastic tokens from remote stores
-- are yet to be returned. Tokens for the local store are.
`)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 2<<20 /* 2*1MiB=2MiB */, 0 /* serverIdx */)
testutils.SucceedsSoon(t, func() error {
return h.checkAllTokensReturned(ctx, 3, 0 /* serverIdx */)
})
h.query(n1, v2FlowTokensQueryStr)

h.comment(`-- (Enable the fallback token dispatch mechanism.)`)
disableFallbackTokenDispatch.Store(false)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.comment(`-- (Issuing another 1x1MiB write, 3x replicated. Not admitted.)`)
disableWorkQueueGranting.Store(true)
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri)
h.query(n1, v2FlowTokensQueryStr)

h.comment(`
-- Flow token metrics from n1 after work gets admitted and all elastic tokens
-- are returned through the fallback mechanism.
`)
-- (Allow below-raft admission to proceed, and only enable the fallback dispatch
-- mechanism. All tokens are returned via the fallback mechanism.)`)
disablePiggybackTokenDispatch.Store(true)
disableFallbackTokenDispatch.Store(false)
disableWorkQueueGranting.Store(false)
testutils.SucceedsSoon(t, func() error {
return h.checkAllTokensReturned(ctx, 3, 0 /* serverIdx */)
})
h.query(n1, v2FlowTokensQueryStr)

// The range eventually quiesces because all the tokens have been returned.
h.comment(`-- (Wait for range to quiesce.)`)
h.comment(`-- (Now the range can quiesce. Wait for it.)`)
testutils.SucceedsSoon(t, func() error {
if !leader.IsQuiescent() {
return errors.Errorf("%s not quiescent", leader)
Expand All @@ -3411,11 +3416,11 @@ func TestFlowControlQuiescedRangeV2(t *testing.T) {
})
}

// TestFlowControlUnquiescedRangeV2 tests flow token behavior when ranges are
// unquiesced. It's a sort of roundabout test to ensure that flow tokens are
// returned through the raft transport piggybacking mechanism, piggybacking on
// raft heartbeats.
func TestFlowControlUnquiescedRangeV2(t *testing.T) {
// TestFlowControlTokenReturnsV2 tests that flow tokens are reliably returned
// via the normal flow of MsgApp and MsgAppResp messages, with MsgApp pings if
// the admissions are lagging. It also ensures that the range does not quiesce
// until all deducted flow tokens are returned.
func TestFlowControlTokenReturnsV2(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand All @@ -3425,9 +3430,7 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) {
}, func(t *testing.T, v2EnabledWhenLeaderLevel kvflowcontrol.V2EnabledWhenLeaderLevel) {
ctx := context.Background()
var disableWorkQueueGranting atomic.Bool
var disablePiggybackTokenDispatch atomic.Bool
disableWorkQueueGranting.Store(true)
disablePiggybackTokenDispatch.Store(true)

settings := cluster.MakeTestingClusterSettings()
// Override metamorphism to allow range quiescence.
Expand Down Expand Up @@ -3466,13 +3469,10 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) {
},
},
RaftTransport: &kvserver.RaftTransportTestingKnobs{
DisableFallbackFlowTokenDispatch: func() bool {
// We'll only test using the piggy-back token mechanism.
return true
},
DisablePiggyBackedFlowTokenDispatch: func() bool {
return disablePiggybackTokenDispatch.Load()
},
// Test only the MsgApp / MsgAppResp flow, with the piggybacked
// token return channel fully disabled.
DisableFallbackFlowTokenDispatch: func() bool { return true },
DisablePiggyBackedFlowTokenDispatch: func() bool { return true },
},
},
},
Expand Down Expand Up @@ -3508,37 +3508,16 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) {
require.False(t, leader.IsQuiescent())

h.comment(`
-- (Allow below-raft admission to proceed. We've disabled the fallback token
-- dispatch mechanism so no tokens are returned yet -- quiesced ranges don't
-- use the piggy-backed token return mechanism since there's no raft traffic.)`)
-- (Allow below-raft admission to proceed. We've disabled the piggybacked token
-- return mechanism so no tokens are returned via this path. But the tokens will
-- be returned anyway because the range is not quiesced and keeps pinging.)`)
disableWorkQueueGranting.Store(false)

h.comment(`
-- Flow token metrics from n1 after work gets admitted but fallback token
-- dispatch mechanism is disabled. Deducted elastic tokens from remote stores
-- are yet to be returned. Tokens for the local store are.
`)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 2<<20 /* 2*1MiB=2MiB */, 0 /* serverIdx */)
h.query(n1, v2FlowTokensQueryStr)

h.comment(`-- (Enable the piggyback token dispatch mechanism.)`)
disablePiggybackTokenDispatch.Store(false)

h.comment(`-- (Unquiesce the range.)`)
testutils.SucceedsSoon(t, func() error {
_, err := tc.GetRaftLeader(t, roachpb.RKey(k)).MaybeUnquiesceAndPropose()
require.NoError(t, err)
return h.checkAllTokensReturned(ctx, 3, 0 /* serverIdx */)
})

h.comment(`
-- Flow token metrics from n1 after work gets admitted and all elastic tokens
-- are returned through the piggyback mechanism.
`)
h.query(n1, v2FlowTokensQueryStr)

// The range eventually quiesces because all the tokens have been returned.
h.comment(`-- (Wait for range to quiesce.)`)
h.comment(`-- (Now the range can quiesce. Wait for it.)`)
testutils.SucceedsSoon(t, func() error {
if !leader.IsQuiescent() {
return errors.Errorf("%s not quiescent", leader)
Expand Down Expand Up @@ -4800,6 +4779,29 @@ func (h *flowControlTestHelper) enableVerboseRaftMsgLoggingForRange(rangeID roac
}
}

// dropAdmissionsFromRaftMessages registers, on every server of the test
// cluster, an unreliableRaftHandler as the incoming raft message listener for
// that server's first store. The handler is configured with the given rangeID
// and wraps the store itself as the underlying message handler. Its request
// hook resets the AdmittedState of each raft message request it is shown to
// the zero value. All three hooks (requests, heartbeats, responses) report
// false, i.e. none of them asks for a message to be dropped. Can be used to
// exercise reliability of the admitted vector piggybacking mechanism.
func (h *flowControlTestHelper) dropAdmissionsFromRaftMessages(rangeID roachpb.RangeID) {
	// The hooks are stateless, so a single set is built once and copied by
	// value into each per-server handler.
	hooks := unreliableRaftHandlerFuncs{
		dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
			// Wipe the admitted vector in place, but let the message through.
			req.AdmittedState = kvflowcontrolpb.AdmittedState{}
			return false
		},
		dropHB:   func(*kvserverpb.RaftHeartbeat) bool { return false },
		dropResp: func(*kvserverpb.RaftMessageResponse) bool { return false },
	}

	for i, srv := range h.tc.Servers {
		stores := h.tc.Server(i).GetStores().(*kvserver.Stores)
		firstStoreID := h.tc.Server(i).GetFirstStoreID()
		store, err := stores.GetStore(firstStoreID)
		require.NoError(h.t, err)

		transport := srv.RaftTransport().(*kvserver.RaftTransport)
		handler := &unreliableRaftHandler{
			rangeID:                    rangeID,
			IncomingRaftMessageHandler: store,
			unreliableRaftHandlerFuncs: hooks,
		}
		transport.ListenIncomingRaftMessages(store.StoreID(), handler)
	}
}

// makeV2EnabledTestFileName is a utility function which returns an updated
// filename for the testdata file based on the v2EnabledWhenLeaderLevel.
func makeV2EnabledTestFileName(
Expand Down
62 changes: 42 additions & 20 deletions pkg/kv/kvserver/testdata/flow_control_integration_v2/quiesced_range
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,28 @@ ORDER BY name ASC;
kvflowcontrol.tokens.send.regular.unaccounted | 0 B


-- (Allow below-raft admission to proceed. We've disabled the fallback token
-- dispatch mechanism so no tokens are returned yet -- quiesced ranges don't
-- use the piggy-backed token return mechanism since there's no raft traffic.)


-- Flow token metrics from n1 after work gets admitted but fallback token
-- dispatch mechanism is disabled. Deducted elastic tokens from remote stores
-- are yet to be returned. Tokens for the local store are.
-- (Allow below-raft admission to proceed, and enable piggybacking. All tokens
-- are returned via the piggybacking mechanism.)
SELECT name, crdb_internal.humanize_bytes(value::INT8)
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvflowcontrol%tokens%'
ORDER BY name ASC;

kvflowcontrol.tokens.eval.elastic.available | 22 MiB
kvflowcontrol.tokens.eval.elastic.available | 24 MiB
kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB
kvflowcontrol.tokens.eval.elastic.returned | 1.0 MiB
kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB
kvflowcontrol.tokens.eval.elastic.returned.disconnect | 0 B
kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B
kvflowcontrol.tokens.eval.regular.available | 48 MiB
kvflowcontrol.tokens.eval.regular.deducted | 0 B
kvflowcontrol.tokens.eval.regular.returned | 0 B
kvflowcontrol.tokens.eval.regular.returned.disconnect | 0 B
kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
kvflowcontrol.tokens.send.elastic.available | 22 MiB
kvflowcontrol.tokens.send.elastic.available | 24 MiB
kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB
kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B
kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B
kvflowcontrol.tokens.send.elastic.returned | 1.0 MiB
kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB
kvflowcontrol.tokens.send.elastic.returned.disconnect | 0 B
kvflowcontrol.tokens.send.elastic.unaccounted | 0 B
kvflowcontrol.tokens.send.regular.available | 48 MiB
Expand All @@ -75,19 +69,47 @@ ORDER BY name ASC;
kvflowcontrol.tokens.send.regular.unaccounted | 0 B


-- (Enable the fallback token dispatch mechanism.)
-- (Issuing another 1x1MiB write, 3x replicated. Not admitted.)
SELECT name, crdb_internal.humanize_bytes(value::INT8)
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvflowcontrol%tokens%'
ORDER BY name ASC;

kvflowcontrol.tokens.eval.elastic.available | 21 MiB
kvflowcontrol.tokens.eval.elastic.deducted | 6.0 MiB
kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB
kvflowcontrol.tokens.eval.elastic.returned.disconnect | 0 B
kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B
kvflowcontrol.tokens.eval.regular.available | 48 MiB
kvflowcontrol.tokens.eval.regular.deducted | 0 B
kvflowcontrol.tokens.eval.regular.returned | 0 B
kvflowcontrol.tokens.eval.regular.returned.disconnect | 0 B
kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
kvflowcontrol.tokens.send.elastic.available | 21 MiB
kvflowcontrol.tokens.send.elastic.deducted | 6.0 MiB
kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B
kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B
kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB
kvflowcontrol.tokens.send.elastic.returned.disconnect | 0 B
kvflowcontrol.tokens.send.elastic.unaccounted | 0 B
kvflowcontrol.tokens.send.regular.available | 48 MiB
kvflowcontrol.tokens.send.regular.deducted | 0 B
kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B
kvflowcontrol.tokens.send.regular.returned | 0 B
kvflowcontrol.tokens.send.regular.returned.disconnect | 0 B
kvflowcontrol.tokens.send.regular.unaccounted | 0 B


-- Flow token metrics from n1 after work gets admitted and all elastic tokens
-- are returned through the fallback mechanism.
-- (Allow below-raft admission to proceed, and only enable the fallback dispatch
-- mechanism. All tokens are returned via the fallback mechanism.)
SELECT name, crdb_internal.humanize_bytes(value::INT8)
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvflowcontrol%tokens%'
ORDER BY name ASC;

kvflowcontrol.tokens.eval.elastic.available | 24 MiB
kvflowcontrol.tokens.eval.elastic.deducted | 3.0 MiB
kvflowcontrol.tokens.eval.elastic.returned | 3.0 MiB
kvflowcontrol.tokens.eval.elastic.deducted | 6.0 MiB
kvflowcontrol.tokens.eval.elastic.returned | 6.0 MiB
kvflowcontrol.tokens.eval.elastic.returned.disconnect | 0 B
kvflowcontrol.tokens.eval.elastic.unaccounted | 0 B
kvflowcontrol.tokens.eval.regular.available | 48 MiB
Expand All @@ -96,10 +118,10 @@ ORDER BY name ASC;
kvflowcontrol.tokens.eval.regular.returned.disconnect | 0 B
kvflowcontrol.tokens.eval.regular.unaccounted | 0 B
kvflowcontrol.tokens.send.elastic.available | 24 MiB
kvflowcontrol.tokens.send.elastic.deducted | 3.0 MiB
kvflowcontrol.tokens.send.elastic.deducted | 6.0 MiB
kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B
kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B
kvflowcontrol.tokens.send.elastic.returned | 3.0 MiB
kvflowcontrol.tokens.send.elastic.returned | 6.0 MiB
kvflowcontrol.tokens.send.elastic.returned.disconnect | 0 B
kvflowcontrol.tokens.send.elastic.unaccounted | 0 B
kvflowcontrol.tokens.send.regular.available | 48 MiB
Expand All @@ -110,7 +132,7 @@ ORDER BY name ASC;
kvflowcontrol.tokens.send.regular.unaccounted | 0 B


-- (Wait for range to quiesce.)
-- (Now the range can quiesce. Wait for it.)
----
----

Expand Down
Loading

0 comments on commit dc46773

Please sign in to comment.