Skip to content

Commit

Permalink
rac2: add kvflowsimulator equivalent simulation test
Browse files Browse the repository at this point in the history
Add a new simulation test, `TestUsingSimulation`. This test mirrors
`kvflowsimulator.TestUsingSimulation` and its existing datadriven test
cases.

When comparing the output from each, no behavioral differences show up,
only intentionally different metric names.

Note that, like `kvflowsimulator.TestUsingSimulation`, the simulation
test needs to be deterministic and therefore only uses a single
goroutine, to avoid goroutine scheduling causing non-determinism. As
such, helper methods have been added to facilitate waiting for available
tokens directly on a stream's token counter and on a range in a
non-blocking manner (`testingNonBlockingAdmit`). These added methods
are simplified versions of `WaitForEval`, which always wait for all
connected streams to have available tokens before admission.

Resolves: cockroachdb#130186
Release note: None
  • Loading branch information
kvoli committed Sep 13, 2024
1 parent 259cde8 commit c95cf4e
Show file tree
Hide file tree
Showing 15 changed files with 2,635 additions and 17 deletions.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_test(
"log_tracker_test.go",
"priority_test.go",
"range_controller_test.go",
"simulation_test.go",
"store_stream_test.go",
"token_counter_test.go",
"token_tracker_test.go",
Expand All @@ -61,17 +62,21 @@ go_test(
"//pkg/testutils/datapathutils",
"//pkg/testutils/echotest",
"//pkg/util/admission/admissionpb",
"//pkg/util/asciitsdb",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_guptarohit_asciigraph//:asciigraph",
"@com_github_mkungla_bexp_v3//:bexp",
"@com_github_stretchr_testify//require",
],
)
14 changes: 10 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,12 @@ func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) {
rc.mu.nonVoterSet = nil
close(rc.mu.waiterSetRefreshCh)
rc.mu.waiterSetRefreshCh = nil

for _, rs := range rc.replicaMap {
if rs.sendStream != nil {
rs.closeSendStream(ctx)
}
}
}

// InspectRaftMuLocked returns a handle containing the state of the range
Expand Down Expand Up @@ -498,6 +504,9 @@ func (rc *rangeController) updateReplicaSet(ctx context.Context, newSet ReplicaS
for r := range prevSet {
desc, ok := newSet[r]
if !ok {
if rs := rc.replicaMap[r]; rs.sendStream != nil {
rs.closeSendStream(ctx)
}
delete(rc.replicaMap, r)
} else {
rs := rc.replicaMap[r]
Expand Down Expand Up @@ -764,10 +773,7 @@ func (rs *replicaState) handleReadyState(
defer rs.sendStream.mu.Unlock()
return rs.sendStream.mu.connectedState
}() {
case replicate:
rs.sendStream.changeToStateSnapshot(ctx)
shouldWaitChange = true
case probeRecentlyReplicate:
case replicate, probeRecentlyReplicate:
rs.closeSendStream(ctx)
shouldWaitChange = true
case snapshot:
Expand Down
65 changes: 52 additions & 13 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type testingRCState struct {
stopper *stop.Stopper
ts *timeutil.ManualTime
clock *hlc.Clock
ssTokenCounter *StreamTokenCounterProvider
streamTokenProvider *StreamTokenCounterProvider
probeToCloseScheduler ProbeToCloseTimerScheduler
evalMetrics *EvalWaitMetrics
// ranges contains the controllers for each range. It is the main state being
Expand All @@ -73,7 +73,7 @@ func (s *testingRCState) init(t *testing.T, ctx context.Context) {
s.stopper = stop.NewStopper()
s.ts = timeutil.NewManualTime(timeutil.UnixEpoch)
s.clock = hlc.NewClockForTesting(s.ts)
s.ssTokenCounter = NewStreamTokenCounterProvider(s.settings, s.clock)
s.streamTokenProvider = NewStreamTokenCounterProvider(s.settings, s.clock)
s.probeToCloseScheduler = &testingProbeToCloseTimerScheduler{state: s}
s.evalMetrics = NewEvalWaitMetrics()
s.ranges = make(map[roachpb.RangeID]*testingRCRange)
Expand Down Expand Up @@ -141,15 +141,15 @@ func (s *testingRCState) rangeStateString() string {
func (s *testingRCState) tokenCountsString() string {
var b strings.Builder
var streams []kvflowcontrol.Stream
s.ssTokenCounter.evalCounters.Range(func(k kvflowcontrol.Stream, v *tokenCounter) bool {
s.streamTokenProvider.evalCounters.Range(func(k kvflowcontrol.Stream, v *tokenCounter) bool {
streams = append(streams, k)
return true
})
sort.Slice(streams, func(i, j int) bool {
return streams[i].StoreID < streams[j].StoreID
})
for _, stream := range streams {
fmt.Fprintf(&b, "%v: %v\n", stream, s.ssTokenCounter.Eval(stream))
fmt.Fprintf(&b, "%v: %v\n", stream, s.streamTokenProvider.Eval(stream))
}
return b.String()
}
Expand Down Expand Up @@ -211,15 +211,15 @@ func (s *testingRCState) maybeSetInitialTokens(r testingRange) {
if _, ok := s.setTokenCounters[stream]; !ok {
s.setTokenCounters[stream] = struct{}{}
if s.initialRegularTokens != -1 {
s.ssTokenCounter.Eval(stream).testingSetTokens(s.testCtx,
s.streamTokenProvider.Eval(stream).testingSetTokens(s.testCtx,
admissionpb.RegularWorkClass, s.initialRegularTokens)
s.ssTokenCounter.Send(stream).testingSetTokens(s.testCtx,
s.streamTokenProvider.Send(stream).testingSetTokens(s.testCtx,
admissionpb.RegularWorkClass, s.initialRegularTokens)
}
if s.initialElasticTokens != -1 {
s.ssTokenCounter.Eval(stream).testingSetTokens(s.testCtx,
s.streamTokenProvider.Eval(stream).testingSetTokens(s.testCtx,
admissionpb.ElasticWorkClass, s.initialElasticTokens)
s.ssTokenCounter.Send(stream).testingSetTokens(s.testCtx,
s.streamTokenProvider.Send(stream).testingSetTokens(s.testCtx,
admissionpb.ElasticWorkClass, s.initialElasticTokens)
}
}
Expand All @@ -232,11 +232,13 @@ func (s *testingRCState) getOrInitRange(r testingRange) *testingRCRange {
testRC = &testingRCRange{}
testRC.mu.r = r
testRC.mu.evals = make(map[string]*testingRCEval)
testRC.mu.outstandingReturns = make(map[roachpb.ReplicaID]kvflowcontrol.Tokens)
testRC.mu.quorumPosition = kvflowcontrolpb.RaftLogPosition{Term: 1, Index: 0}
options := RangeControllerOptions{
RangeID: r.rangeID,
TenantID: r.tenantID,
LocalReplicaID: r.localReplicaID,
SSTokenCounter: s.ssTokenCounter,
SSTokenCounter: s.streamTokenProvider,
RaftInterface: testRC,
Clock: s.clock,
CloseTimerScheduler: s.probeToCloseScheduler,
Expand Down Expand Up @@ -265,10 +267,23 @@ type testingRCEval struct {

type testingRCRange struct {
rc *rangeController
// snapshots contain snapshots of the tracker state for different replicas,
// at various points in time. It is used in TestUsingSimulation.
snapshots []testingTrackerSnapshot

mu struct {
syncutil.Mutex
r testingRange
r testingRange
// outstandingReturns is used in TestUsingSimulation to track token
// returns. Likewise, for quorumPosition. It is not used in
// TestRangeController.
outstandingReturns map[roachpb.ReplicaID]kvflowcontrol.Tokens
quorumPosition kvflowcontrolpb.RaftLogPosition
// evals is used in TestRangeController for WaitForEval goroutine
// callbacks. It is not used in TestUsingSimulation, as the simulation test
// requires determinism on a smaller timescale than calling WaitForEval via
// multiple goroutines would allow. See testingNonBlockingAdmit to see how
// WaitForEval is done in simulation tests.
evals map[string]*testingRCEval
}
}
Expand Down Expand Up @@ -326,6 +341,29 @@ type testingRange struct {
replicaSet map[roachpb.ReplicaID]testingReplica
}

// makeSingleVoterTestingRange builds a testingRange for the given range and
// tenant whose replica set contains exactly one replica: replica 1, a
// VOTER_FULL located on the supplied node and store. Replica 1 is also
// recorded as the range's local replica.
func makeSingleVoterTestingRange(
	rangeID roachpb.RangeID,
	tenantID roachpb.TenantID,
	localNodeID roachpb.NodeID,
	localStoreID roachpb.StoreID,
) testingRange {
	// The sole replica doubles as the local replica, so a single ID is used
	// for the local replica ID, the map key and the descriptor.
	const soleReplicaID = 1

	soleVoter := testingReplica{
		desc: roachpb.ReplicaDescriptor{
			NodeID:    localNodeID,
			StoreID:   localStoreID,
			ReplicaID: soleReplicaID,
			Type:      roachpb.VOTER_FULL,
		},
	}

	return testingRange{
		rangeID:        rangeID,
		tenantID:       tenantID,
		localReplicaID: soleReplicaID,
		replicaSet: map[roachpb.ReplicaID]testingReplica{
			soleReplicaID: soleVoter,
		},
	}
}

func (t testingRange) replicas() ReplicaSet {
replicas := make(ReplicaSet, len(t.replicaSet))
for i, replica := range t.replicaSet {
Expand Down Expand Up @@ -585,12 +623,13 @@ func (t *testingProbeToCloseTimerScheduler) ScheduleSendStreamCloseRaftMuLocked(
//
// - close_rcs: Closes all range controllers.
//
// - admit: Admits the given store to the given range.
// - admit: Admits up to to_index for the given store, part of the given
// range.
// range_id=<range_id>
// store_id=<store_id> term=<term> to_index=<to_index> pri=<pri>
// ...
//
// - raft_event: Simulates a raft event on the given rangeStateProbe, calling
// - raft_event: Simulates a raft event on the given range, calling
// HandleRaftEvent.
// range_id=<range_id>
// term=<term> index=<index> pri=<pri> size=<size>
Expand Down Expand Up @@ -705,7 +744,7 @@ func TestRangeController(t *testing.T) {
tokens, err := humanizeutil.ParseBytes(tokenString)
require.NoError(t, err)

state.ssTokenCounter.Eval(kvflowcontrol.Stream{
state.streamTokenProvider.Eval(kvflowcontrol.Stream{
StoreID: roachpb.StoreID(store),
TenantID: roachpb.SystemTenantID,
}).adjust(ctx,
Expand Down
Loading

0 comments on commit c95cf4e

Please sign in to comment.