Skip to content

Commit

Permalink
rac2: add simulation test
Browse files Browse the repository at this point in the history
...

Resolves: cockroachdb#130186
Release note: None
  • Loading branch information
kvoli committed Sep 13, 2024
1 parent a463cb3 commit 7904428
Show file tree
Hide file tree
Showing 17 changed files with 2,658 additions and 18 deletions.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ go_test(
"log_tracker_test.go",
"priority_test.go",
"range_controller_test.go",
"simulation_test.go",
"store_stream_test.go",
"token_counter_test.go",
"token_tracker_test.go",
Expand All @@ -58,16 +59,20 @@ go_test(
"//pkg/settings/cluster",
"//pkg/testutils/datapathutils",
"//pkg/util/admission/admissionpb",
"//pkg/util/asciitsdb",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_guptarohit_asciigraph//:asciigraph",
"@com_github_mkungla_bexp_v3//:bexp",
"@com_github_stretchr_testify//require",
],
)
14 changes: 10 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,22 @@ func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) {
rc.mu.voterSets = nil
close(rc.mu.waiterSetRefreshCh)
rc.mu.waiterSetRefreshCh = nil

for _, rs := range rc.replicaMap {
if rs.sendStream != nil {
rs.closeSendStream(ctx)
}
}
}

func (rc *rangeController) updateReplicaSet(ctx context.Context, newSet ReplicaSet) {
prevSet := rc.replicaSet
for r := range prevSet {
desc, ok := newSet[r]
if !ok {
if rs := rc.replicaMap[r]; rs.sendStream != nil {
rs.closeSendStream(ctx)
}
delete(rc.replicaMap, r)
} else {
rs := rc.replicaMap[r]
Expand Down Expand Up @@ -724,10 +733,7 @@ func (rs *replicaState) handleReadyState(
defer rs.sendStream.mu.Unlock()
return rs.sendStream.mu.connectedState
}() {
case replicate:
rs.sendStream.changeToStateSnapshot(ctx)
shouldWaitChange = true
case probeRecentlyReplicate:
case replicate, probeRecentlyReplicate:
rs.closeSendStream(ctx)
shouldWaitChange = true
case snapshot:
Expand Down
65 changes: 52 additions & 13 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type testingRCState struct {
stopper *stop.Stopper
ts *timeutil.ManualTime
clock *hlc.Clock
ssTokenCounter *StreamTokenCounterProvider
streamTokenProvider *StreamTokenCounterProvider
probeToCloseScheduler ProbeToCloseTimerScheduler
evalMetrics *EvalWaitMetrics
// ranges contains the controllers for each range. It is the main state being
Expand All @@ -72,7 +72,7 @@ func (s *testingRCState) init(t *testing.T, ctx context.Context) {
s.stopper = stop.NewStopper()
s.ts = timeutil.NewManualTime(timeutil.UnixEpoch)
s.clock = hlc.NewClockForTesting(s.ts)
s.ssTokenCounter = NewStreamTokenCounterProvider(s.settings, s.clock)
s.streamTokenProvider = NewStreamTokenCounterProvider(s.settings, s.clock)
s.probeToCloseScheduler = &testingProbeToCloseTimerScheduler{state: s}
s.evalMetrics = NewEvalWaitMetrics()
s.ranges = make(map[roachpb.RangeID]*testingRCRange)
Expand Down Expand Up @@ -140,15 +140,15 @@ func (s *testingRCState) rangeStateString() string {
func (s *testingRCState) tokenCountsString() string {
var b strings.Builder
var streams []kvflowcontrol.Stream
s.ssTokenCounter.evalCounters.Range(func(k kvflowcontrol.Stream, v *tokenCounter) bool {
s.streamTokenProvider.evalCounters.Range(func(k kvflowcontrol.Stream, v *tokenCounter) bool {
streams = append(streams, k)
return true
})
sort.Slice(streams, func(i, j int) bool {
return streams[i].StoreID < streams[j].StoreID
})
for _, stream := range streams {
fmt.Fprintf(&b, "%v: %v\n", stream, s.ssTokenCounter.Eval(stream))
fmt.Fprintf(&b, "%v: %v\n", stream, s.streamTokenProvider.Eval(stream))
}
return b.String()
}
Expand Down Expand Up @@ -210,15 +210,15 @@ func (s *testingRCState) maybeSetInitialTokens(r testingRange) {
if _, ok := s.setTokenCounters[stream]; !ok {
s.setTokenCounters[stream] = struct{}{}
if s.initialRegularTokens != -1 {
s.ssTokenCounter.Eval(stream).testingSetTokens(s.testCtx,
s.streamTokenProvider.Eval(stream).testingSetTokens(s.testCtx,
admissionpb.RegularWorkClass, s.initialRegularTokens)
s.ssTokenCounter.Send(stream).testingSetTokens(s.testCtx,
s.streamTokenProvider.Send(stream).testingSetTokens(s.testCtx,
admissionpb.RegularWorkClass, s.initialRegularTokens)
}
if s.initialElasticTokens != -1 {
s.ssTokenCounter.Eval(stream).testingSetTokens(s.testCtx,
s.streamTokenProvider.Eval(stream).testingSetTokens(s.testCtx,
admissionpb.ElasticWorkClass, s.initialElasticTokens)
s.ssTokenCounter.Send(stream).testingSetTokens(s.testCtx,
s.streamTokenProvider.Send(stream).testingSetTokens(s.testCtx,
admissionpb.ElasticWorkClass, s.initialElasticTokens)
}
}
Expand All @@ -231,11 +231,13 @@ func (s *testingRCState) getOrInitRange(r testingRange) *testingRCRange {
testRC = &testingRCRange{}
testRC.mu.r = r
testRC.mu.evals = make(map[string]*testingRCEval)
testRC.mu.outstandingReturns = make(map[roachpb.ReplicaID]kvflowcontrol.Tokens)
testRC.mu.quorumPosition = kvflowcontrolpb.RaftLogPosition{Term: 1, Index: 0}
options := RangeControllerOptions{
RangeID: r.rangeID,
TenantID: r.tenantID,
LocalReplicaID: r.localReplicaID,
SSTokenCounter: s.ssTokenCounter,
SSTokenCounter: s.streamTokenProvider,
RaftInterface: testRC,
Clock: s.clock,
CloseTimerScheduler: s.probeToCloseScheduler,
Expand Down Expand Up @@ -264,10 +266,23 @@ type testingRCEval struct {

type testingRCRange struct {
rc *rangeController
// snapshots contain snapshots of the tracker state for different replicas,
// at various points in time. It is used in TestUsingSimulation.
snapshots []testingTrackerSnapshot

mu struct {
syncutil.Mutex
r testingRange
r testingRange
// outstandingReturns is used in TestUsingSimulation to track token
// returns. Likewise, for quorumPosition. It is not used in
// TestRangeController.
outstandingReturns map[roachpb.ReplicaID]kvflowcontrol.Tokens
quorumPosition kvflowcontrolpb.RaftLogPosition
// evals is used in TestRangeController for WaitForEval goroutine
// callbacks. It is not used in TestUsingSimulation, as the simulation test
// requires determinism on a smaller timescale than calling WaitForEval via
// multiple goroutines would allow. See testingNonBlockingAdmit to see how
// WaitForEval is done in simulation tests.
evals map[string]*testingRCEval
}
}
Expand Down Expand Up @@ -325,6 +340,29 @@ type testingRange struct {
replicaSet map[roachpb.ReplicaID]testingReplica
}

func makeSingleVoterTestingRange(
rangeID roachpb.RangeID,
tenantID roachpb.TenantID,
localNodeID roachpb.NodeID,
localStoreID roachpb.StoreID,
) testingRange {
return testingRange{
rangeID: rangeID,
tenantID: tenantID,
localReplicaID: 1,
replicaSet: map[roachpb.ReplicaID]testingReplica{
1: {
desc: roachpb.ReplicaDescriptor{
NodeID: localNodeID,
StoreID: localStoreID,
ReplicaID: 1,
Type: roachpb.VOTER_FULL,
},
},
},
}
}

func (t testingRange) replicas() ReplicaSet {
replicas := make(ReplicaSet, len(t.replicaSet))
for i, replica := range t.replicaSet {
Expand Down Expand Up @@ -584,12 +622,13 @@ func (t *testingProbeToCloseTimerScheduler) ScheduleSendStreamCloseRaftMuLocked(
//
// - close_rcs: Closes all range controllers.
//
// - admit: Admits the given store to the given range.
// - admit: Admits up to to_index for the given store, part of the given
// range.
// range_id=<range_id>
// store_id=<store_id> term=<term> to_index=<to_index> pri=<pri>
// ...
//
// - raft_event: Simulates a raft event on the given rangeStateProbe, calling
// - raft_event: Simulates a raft event on the given range, calling
// HandleRaftEvent.
// range_id=<range_id>
// term=<term> index=<index> pri=<pri> size=<size>
Expand Down Expand Up @@ -692,7 +731,7 @@ func TestRangeController(t *testing.T) {
tokens, err := humanizeutil.ParseBytes(tokenString)
require.NoError(t, err)

state.ssTokenCounter.Eval(kvflowcontrol.Stream{
state.streamTokenProvider.Eval(kvflowcontrol.Stream{
StoreID: roachpb.StoreID(store),
TenantID: roachpb.SystemTenantID,
}).adjust(ctx,
Expand Down
Loading

0 comments on commit 7904428

Please sign in to comment.