diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
index 8fc38396ae17..6d23a98ebcba 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
@@ -40,6 +40,7 @@ go_test(
         "log_tracker_test.go",
         "priority_test.go",
         "range_controller_test.go",
+        "simulation_test.go",
         "store_stream_test.go",
         "token_counter_test.go",
         "token_tracker_test.go",
@@ -58,16 +59,20 @@ go_test(
         "//pkg/settings/cluster",
         "//pkg/testutils/datapathutils",
         "//pkg/util/admission/admissionpb",
+        "//pkg/util/asciitsdb",
         "//pkg/util/hlc",
         "//pkg/util/humanizeutil",
         "//pkg/util/leaktest",
         "//pkg/util/log",
+        "//pkg/util/metric",
         "//pkg/util/protoutil",
         "//pkg/util/stop",
         "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_dustin_go_humanize//:go-humanize",
+        "@com_github_guptarohit_asciigraph//:asciigraph",
+        "@com_github_mkungla_bexp_v3//:bexp",
         "@com_github_stretchr_testify//require",
     ],
 )
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
index 077cb2996c46..93e162e6c62c 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -451,6 +451,12 @@ func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) {
 	rc.mu.voterSets = nil
 	close(rc.mu.waiterSetRefreshCh)
 	rc.mu.waiterSetRefreshCh = nil
+
+	for _, rs := range rc.replicaMap {
+		if rs.sendStream != nil {
+			rs.closeSendStream(ctx)
+		}
+	}
 }
 
 func (rc *rangeController) updateReplicaSet(ctx context.Context, newSet ReplicaSet) {
@@ -458,6 +464,9 @@ func (rc *rangeController) updateReplicaSet(ctx context.Context, newSet ReplicaS
 	for r := range prevSet {
 		desc, ok := newSet[r]
 		if !ok {
+			if rs := rc.replicaMap[r]; rs.sendStream != nil {
+				rs.closeSendStream(ctx)
+			}
 			delete(rc.replicaMap, r)
 		} else {
 			rs := rc.replicaMap[r]
@@ -724,10 +733,7 @@ func (rs *replicaState) handleReadyState(
 		defer rs.sendStream.mu.Unlock()
 		return rs.sendStream.mu.connectedState
 	}() {
-	case replicate:
-		rs.sendStream.changeToStateSnapshot(ctx)
-		shouldWaitChange = true
-	case probeRecentlyReplicate:
+	case replicate, probeRecentlyReplicate:
 		rs.closeSendStream(ctx)
 		shouldWaitChange = true
 	case snapshot:
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
index 319b131c71c8..46ea9e4278f4 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
@@ -52,7 +52,7 @@ type testingRCState struct {
 	stopper               *stop.Stopper
 	ts                    *timeutil.ManualTime
 	clock                 *hlc.Clock
-	ssTokenCounter        *StreamTokenCounterProvider
+	streamTokenProvider   *StreamTokenCounterProvider
 	probeToCloseScheduler ProbeToCloseTimerScheduler
 	evalMetrics           *EvalWaitMetrics
 	// ranges contains the controllers for each range. It is the main state being
@@ -72,7 +72,7 @@ func (s *testingRCState) init(t *testing.T, ctx context.Context) {
 	s.stopper = stop.NewStopper()
 	s.ts = timeutil.NewManualTime(timeutil.UnixEpoch)
 	s.clock = hlc.NewClockForTesting(s.ts)
-	s.ssTokenCounter = NewStreamTokenCounterProvider(s.settings, s.clock)
+	s.streamTokenProvider = NewStreamTokenCounterProvider(s.settings, s.clock)
 	s.probeToCloseScheduler = &testingProbeToCloseTimerScheduler{state: s}
 	s.evalMetrics = NewEvalWaitMetrics()
 	s.ranges = make(map[roachpb.RangeID]*testingRCRange)
@@ -140,7 +140,7 @@ func (s *testingRCState) rangeStateString() string {
 func (s *testingRCState) tokenCountsString() string {
 	var b strings.Builder
 	var streams []kvflowcontrol.Stream
-	s.ssTokenCounter.evalCounters.Range(func(k kvflowcontrol.Stream, v *tokenCounter) bool {
+	s.streamTokenProvider.evalCounters.Range(func(k kvflowcontrol.Stream, v *tokenCounter) bool {
 		streams = append(streams, k)
 		return true
 	})
@@ -148,7 +148,7 @@ func (s *testingRCState) tokenCountsString() string {
 		return streams[i].StoreID < streams[j].StoreID
 	})
 	for _, stream := range streams {
-		fmt.Fprintf(&b, "%v: %v\n", stream, s.ssTokenCounter.Eval(stream))
+		fmt.Fprintf(&b, "%v: %v\n", stream, s.streamTokenProvider.Eval(stream))
 	}
 	return b.String()
 }
@@ -210,15 +210,15 @@ func (s *testingRCState) maybeSetInitialTokens(r testingRange) {
 		if _, ok := s.setTokenCounters[stream]; !ok {
 			s.setTokenCounters[stream] = struct{}{}
 			if s.initialRegularTokens != -1 {
-				s.ssTokenCounter.Eval(stream).testingSetTokens(s.testCtx,
+				s.streamTokenProvider.Eval(stream).testingSetTokens(s.testCtx,
 					admissionpb.RegularWorkClass, s.initialRegularTokens)
-				s.ssTokenCounter.Send(stream).testingSetTokens(s.testCtx,
+				s.streamTokenProvider.Send(stream).testingSetTokens(s.testCtx,
 					admissionpb.RegularWorkClass, s.initialRegularTokens)
 			}
 			if s.initialElasticTokens != -1 {
-				s.ssTokenCounter.Eval(stream).testingSetTokens(s.testCtx,
+				s.streamTokenProvider.Eval(stream).testingSetTokens(s.testCtx,
 					admissionpb.ElasticWorkClass, s.initialElasticTokens)
-				s.ssTokenCounter.Send(stream).testingSetTokens(s.testCtx,
+				s.streamTokenProvider.Send(stream).testingSetTokens(s.testCtx,
 					admissionpb.ElasticWorkClass, s.initialElasticTokens)
 			}
 		}
@@ -231,11 +231,13 @@ func (s *testingRCState) getOrInitRange(r testingRange) *testingRCRange {
 		testRC = &testingRCRange{}
 		testRC.mu.r = r
 		testRC.mu.evals = make(map[string]*testingRCEval)
+		testRC.mu.outstandingReturns = make(map[roachpb.ReplicaID]kvflowcontrol.Tokens)
+		testRC.mu.quorumPosition = kvflowcontrolpb.RaftLogPosition{Term: 1, Index: 0}
 		options := RangeControllerOptions{
 			RangeID:             r.rangeID,
 			TenantID:            r.tenantID,
 			LocalReplicaID:      r.localReplicaID,
-			SSTokenCounter:      s.ssTokenCounter,
+			SSTokenCounter:      s.streamTokenProvider,
 			RaftInterface:       testRC,
 			Clock:               s.clock,
 			CloseTimerScheduler: s.probeToCloseScheduler,
@@ -264,10 +266,23 @@ type testingRCEval struct {
 type testingRCRange struct {
 	rc *rangeController
+	// snapshots contain snapshots of the tracker state for different replicas,
+	// at various points in time. It is used in TestUsingSimulation.
+	snapshots []testingTrackerSnapshot
 
 	mu struct {
 		syncutil.Mutex
-		r testingRange
+		r testingRange
+		// outstandingReturns is used in TestUsingSimulation to track token
+		// returns. Likewise, for quorumPosition. It is not used in
+		// TestRangeController.
+		outstandingReturns map[roachpb.ReplicaID]kvflowcontrol.Tokens
+		quorumPosition     kvflowcontrolpb.RaftLogPosition
+		// evals is used in TestRangeController for WaitForEval goroutine
+		// callbacks. It is not used in TestUsingSimulation, as the simulation test
+		// requires determinism on a smaller timescale than calling WaitForEval via
+		// multiple goroutines would allow. See testingNonBlockingAdmit for how
+		// WaitForEval is done in simulation tests.
 		evals map[string]*testingRCEval
 	}
 }
@@ -325,6 +340,29 @@ type testingRange struct {
 	replicaSet map[roachpb.ReplicaID]testingReplica
 }
 
+func makeSingleVoterTestingRange(
+	rangeID roachpb.RangeID,
+	tenantID roachpb.TenantID,
+	localNodeID roachpb.NodeID,
+	localStoreID roachpb.StoreID,
+) testingRange {
+	return testingRange{
+		rangeID:        rangeID,
+		tenantID:       tenantID,
+		localReplicaID: 1,
+		replicaSet: map[roachpb.ReplicaID]testingReplica{
+			1: {
+				desc: roachpb.ReplicaDescriptor{
+					NodeID:    localNodeID,
+					StoreID:   localStoreID,
+					ReplicaID: 1,
+					Type:      roachpb.VOTER_FULL,
+				},
+			},
+		},
+	}
+}
+
 func (t testingRange) replicas() ReplicaSet {
 	replicas := make(ReplicaSet, len(t.replicaSet))
 	for i, replica := range t.replicaSet {
@@ -584,12 +622,13 @@ func (t *testingProbeToCloseTimerScheduler) ScheduleSendStreamCloseRaftMuLocked(
 //
 // - close_rcs: Closes all range controllers.
 //
-// - admit: Admits the given store to the given range.
+// - admit: Admits up to to_index for the given store, part of the given
+//   range.
 //   range_id=<range_id>
 //     store_id=<store_id> term=<term> to_index=<to_index> pri=<pri>
 //     ...
 //
-// - raft_event: Simulates a raft event on the given rangeStateProbe, calling
+// - raft_event: Simulates a raft event on the given range, calling
 //   HandleRaftEvent.
 //   range_id=<range_id>
 //     term=<term> index=<index> pri=<pri> size=<size>
@@ -692,7 +731,7 @@ func TestRangeController(t *testing.T) {
 			tokens, err := humanizeutil.ParseBytes(tokenString)
 			require.NoError(t, err)
 
-			state.ssTokenCounter.Eval(kvflowcontrol.Stream{
+			state.streamTokenProvider.Eval(kvflowcontrol.Stream{
 				StoreID:  roachpb.StoreID(store),
 				TenantID: roachpb.SystemTenantID,
 			}).adjust(ctx,
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go
new file mode 100644
index 000000000000..66a6d0213087
--- /dev/null
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go
@@ -0,0 +1,1181 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package rac2
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
+	"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
+	"github.com/cockroachdb/cockroach/pkg/raft/tracker"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
+	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
+	"github.com/cockroachdb/cockroach/pkg/util/asciitsdb"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/metric"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/datadriven"
+	"github.com/dustin/go-humanize"
+	"github.com/guptarohit/asciigraph"
+	"github.com/mkungla/bexp/v3"
+	"github.com/stretchr/testify/require"
+)
+
+// TestUsingSimulation is a data-driven test for the RangeController. It allows
+// package authors to understand how flow tokens are maintained for individual
+// replication streams, how write bandwidth is shaped by these tokens, and how
+// requests queue/dequeue internally. The test provides the following commands:
+//
+//   - "init"
+//     [handle=<string>]
+//     ...
+//     Initialize the test simulator. Optionally, initialize range controllers
+//     for a given handle (mapping to a range_id), for subsequent use.
+//
+//   - "timeline"
+//     t=[<duration>,<duration>) class={regular,elastic} \
+//     stream=t<int>/s<int> adjust={+,-}<bytes>/s rate=<int>/s \
+//     [deduction-delay=<duration>]                                    (A)
+//     ....
+//     t=[<duration>,<duration>) handle=<string> class={regular,elastic} \
+//     adjust={+,-}<bytes>/s rate=<int>/s [stream=t<int>/s<int>] \
+//     [deduction-delay=<duration>]                                    (B)
+//     ....
+//     t=<duration> handle=<string> op=connect stream=t<int>/s<int> \
+//     log-position=<int>/<int>                                        (C)
+//     ....
+//     t=<duration> handle=<string> op=disconnect stream=t<int>/s<int> (D)
+//     ....
+//     t=<duration> handle=<string> op={snapshot,close}                (E)
+//     ....
+//
+//     Set up timelines to simulate. There are a few forms:
+//
+//     A. Creates a "thread" that operates during the given time range
+//     t=[tstart,tend), issuing the specified 'rate' of requests of the given
+//     work 'class', over the given 'stream', where the flow tokens are
+//     {deducted,returned} with the given bandwidth. The 'rate' controls the
+//     granularity of token adjustment, i.e. if adjust=+100bytes/s and
+//     rate=5/s, then each return adjusts by +100/5 = +20bytes. If flow
+//     tokens are being deducted (-ve 'adjust'), they go through Admit()
+//     followed by DeductTokens(). If they're being returned (+ve 'adjust'),
+//     they simply go through ReturnTokens(). The optional 'deduction-delay'
+//     parameter controls the number of ticks between each request being
+//     granted admission and it deducting the corresponding flow tokens.
+//
+//     B. Similar to A except using a given handle instead, which internally
+//     deducts tokens from all connected streams or if returning tokens, does so
+//     for the named stream. Token deductions from a range's RangeController
+//     are tied to monotonically increasing raft log positions starting from
+//     the position the underlying stream was connected to (using C). When
+//     returning tokens, we translate the byte token value to the corresponding
+//     raft log prefix (token returns with handle are in terms of raft log
+//     positions).
+//
+//     C. Connects the handle's RangeController to the specific stream,
+//     starting at the given log position. Subsequent token deductions using
+//     the RangeController will deduct from the given stream.
+//
+//     D. Disconnects the specific stream from the handle's RangeController.
+//     All deducted flow tokens (using the given handle) from that specific
+//     stream are released. Future token deductions/returns (when using the
+//     given handle) don't deduct from/return to the stream.
+//
+//     E. Close or snapshot the named handle. When closing a RangeController,
+//     all deducted flow tokens are released and subsequent operations are
+//     noops. Snapshots record the internal state (what tokens have been
+//     deducted-but-not-returned, and for what log positions). This can later
+//     be rendered using the "snapshot" directive.
+//
+//   - "simulate" [t=[<duration>,<duration>)]
+//     Simulate timelines until the optionally specified timestamp.
+//
+//   - "plot" [height=<int>] [width=<int>] [precision=<int>] \
+//     [t=[<duration>,<duration>)]
+//     <selector> unit=<string> [rate=true]
+//     ....
+//     Plot the flow controller specified metrics (and optionally its rate of
+//     change) with the specified units. The following metrics are supported:
+//     a. kvflowcontrol.tokens.{eval,send}.{regular,elastic}.{available,deducted,returned}
+//     b. kvflowcontrol.streams.{eval,send}.{regular,elastic}.{blocked,total}_count
+//     c. kvflowcontrol.eval_wait.{regular,elastic}.requests.{waiting,admitted,errored}
+//     d. kvflowcontrol.eval_wait.{regular,elastic}.duration
+//     To overlay metrics onto the same plot, the selector supports curly brace
+//     expansion. If the unit is one of {MiB,MB,KB,KiB,s,ms,us,μs}, or the
+//     bandwidth equivalents (<unit>/s), the y-axis is automatically
+//     converted.
+//
+//   - "snapshots" handle=<string>
+//     Render any captured "snapshots" for the given handle.
+//
+// This test uses a non-blocking interface for the RangeController to support
+// deterministic simulation.
+func TestUsingSimulation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	datadriven.Walk(t, datapathutils.TestDataPath(t, "simulation"), func(t *testing.T, path string) {
+		var (
+			sim *simulator
+			// Used to map named handles to their respective range IDs.
+			rangeIDSeq      roachpb.RangeID
+			handleToRangeID map[string]roachpb.RangeID
+		)
+		ctx := context.Background()
+
+		datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
+			switch d.Cmd {
+			case "init":
+				sim = &simulator{}
+				sim.init(t, ctx)
+				defer sim.state.stopper.Stop(ctx)
+				handleToRangeID = make(map[string]roachpb.RangeID)
+
+				for _, line := range strings.Split(d.Input, "\n") {
+					handle := strings.TrimPrefix(line, "handle=")
+					rangeIDSeq++
+					rangeID := rangeIDSeq
+					handleToRangeID[handle] = rangeID
+					sim.state.getOrInitRange(makeSingleVoterTestingRange(
+						rangeID, testingLocalTenantID, testingLocalNodeID, testingLocalStoreID))
+				}
+
+			case "timeline":
+				require.NotNil(t, sim, "uninitialized simulator (did you use 'init'?)")
+				for _, line := range strings.Split(d.Input, "\n") {
+					parts := strings.Fields(line)
+
+					var tl timeline
+					for i := range parts {
+						parts[i] = strings.TrimSpace(parts[i])
+						inner := strings.Split(parts[i], "=")
+						require.Len(t, inner, 2)
+						arg := strings.TrimSpace(inner[1])
+
+						switch {
+						case strings.HasPrefix(parts[i], "t="):
+							// Parse t={<duration>,[<duration>,<duration>)}.
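+							// e.g. t=1s gives a single instant, while
+							// t=[0s,5s) gives a half-open time range.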
+							ranged := strings.HasPrefix(arg, "[")
+							if ranged {
+								arg = strings.TrimSuffix(strings.TrimPrefix(arg, "["), ")")
+								args := strings.Split(arg, ",")
+								dur, err := time.ParseDuration(args[0])
+								require.NoError(t, err)
+								tl.tstart = sim.state.ts.Now().Add(dur)
+								dur, err = time.ParseDuration(args[1])
+								require.NoError(t, err)
+								tl.tend = sim.state.ts.Now().Add(dur)
+							} else {
+								dur, err := time.ParseDuration(arg)
+								require.NoError(t, err)
+								tl.tstart = sim.state.ts.Now().Add(dur)
+							}
+
+						case strings.HasPrefix(parts[i], "class="):
+							// Parse class={regular,elastic}.
+							switch arg {
+							case "regular":
+								tl.pri = admissionpb.NormalPri
+							case "elastic":
+								tl.pri = admissionpb.BulkNormalPri
+							default:
+								t.Fatalf("unexpected class: %s", parts[i])
+							}
+
+						case strings.HasPrefix(parts[i], "stream="):
+							// Parse stream=t<int>/s<int>.
+							inner := strings.Split(arg, "/")
+							require.Len(t, inner, 2)
+							ti, err := strconv.Atoi(strings.TrimPrefix(inner[0], "t"))
+							require.NoError(t, err)
+							si, err := strconv.Atoi(strings.TrimPrefix(inner[1], "s"))
+							require.NoError(t, err)
+							tl.stream = kvflowcontrol.Stream{
+								TenantID: roachpb.MustMakeTenantID(uint64(ti)),
+								StoreID:  roachpb.StoreID(si),
+							}
+
+						case strings.HasPrefix(parts[i], "adjust="):
+							// Parse adjust={+,-}<bytes>/s.
+							isPositive := strings.Contains(arg, "+")
+							arg = strings.TrimPrefix(arg, "+")
+							arg = strings.TrimPrefix(arg, "-")
+							bytes, err := humanize.ParseBytes(strings.TrimSuffix(arg, "/s"))
+							require.NoError(t, err)
+							tl.delta = kvflowcontrol.Tokens(int64(bytes))
+							if !isPositive {
+								tl.delta = -tl.delta
+							}
+
+						case strings.HasPrefix(parts[i], "rate="):
+							// Parse rate=<int>/s.
+							var err error
+							tl.rate, err = strconv.Atoi(strings.TrimSuffix(arg, "/s"))
+							require.NoError(t, err)
+
+						case strings.HasPrefix(parts[i], "deduction-delay="):
+							// Parse deduction-delay=<duration>.
+							dur, err := time.ParseDuration(arg)
+							require.NoError(t, err)
+							tl.deductionDelay = int(dur.Nanoseconds() / tick.Nanoseconds())
+
+						case strings.HasPrefix(parts[i], "handle="):
+							// Parse handle=<string>.
+							var ok bool
+							rangeID, ok := handleToRangeID[arg]
+							require.True(t, ok, "expected to find handle %q, was it initialized?", arg)
+							tl.handle = sim.state.ranges[roachpb.RangeID(rangeID)]
+
+						case strings.HasPrefix(parts[i], "op="):
+							// Parse op=<string>.
+							require.True(t, arg == "connect" || arg == "disconnect" ||
+								arg == "close" || arg == "snapshot")
+							tl.handleOp = arg
+
+						case strings.HasPrefix(parts[i], "log-position="):
+							// Parse log-position=<int>/<int>.
+							inner := strings.Split(arg, "/")
+							require.Len(t, inner, 2)
+							term, err := strconv.Atoi(inner[0])
+							require.NoError(t, err)
+							index, err := strconv.Atoi(inner[1])
+							require.NoError(t, err)
+							tl.position = kvflowcontrolpb.RaftLogPosition{
+								Term:  uint64(term),
+								Index: uint64(index),
+							}
+
+						default:
+							t.Fatalf("unrecognized prefix: %s", parts[i])
+						}
+					}
+
+					sim.timeline(tl)
+				}
+
+			case "simulate":
+				require.NotNilf(t, sim, "uninitialized simulator (did you use 'init'?)")
+				var end time.Time
+				if d.HasArg("t") {
+					// Parse t=[<duration>,<duration>), but ignoring the
+					// start time.
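+					// e.g. t=[0s,28s) simulates up to (but not including)
+					// t=28s.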
+					var tstr string
+					d.ScanArgs(t, "t", &tstr)
+					tstr = strings.TrimSuffix(strings.TrimPrefix(tstr, "["), ")")
+					args := strings.Split(tstr, ",")
+					dur, err := time.ParseDuration(args[1])
+					require.NoError(t, err)
+					end = sim.state.ts.Now().Add(dur)
+				}
+				sim.simulate(ctx, end)
+				return ""
+
+			case "metric_names":
+				var buf strings.Builder
+				for _, name := range sim.tsdb.RegisteredMetricNames() {
+					buf.WriteString(fmt.Sprintf("%s\n", name))
+				}
+				return buf.String()
+
+			case "plot":
+				var h, w, p = 15, 40, 1
+				if d.HasArg("height") {
+					d.ScanArgs(t, "height", &h)
+				}
+				if d.HasArg("width") {
+					d.ScanArgs(t, "width", &w)
+				}
+				if d.HasArg("precision") {
+					d.ScanArgs(t, "precision", &p)
+				}
+
+				var buf strings.Builder
+				for i, line := range strings.Split(d.Input, "\n") {
+					var (
+						selector, unit string
+						rated          bool
+					)
+					parts := strings.Fields(line)
+					for i, part := range parts {
+						part = strings.TrimSpace(part)
+						if i == 0 {
+							selector = part
+							continue
+						}
+
+						if strings.HasPrefix(part, "rate=") {
+							var err error
+							rated, err = strconv.ParseBool(strings.TrimPrefix(part, "rate="))
+							require.NoError(t, err)
+						}
+
+						if strings.HasPrefix(part, "unit=") {
+							unit = strings.TrimPrefix(part, "unit=")
+						}
+					}
+
+					caption := strings.TrimPrefix(selector, "kvflowcontrol.")
+					if rated {
+						caption = fmt.Sprintf("rate(%s)", caption)
+					}
+					caption = fmt.Sprintf("%s (%s)", caption, unit)
+
+					options := []asciitsdb.Option{
+						asciitsdb.WithGraphOptions(
+							asciigraph.Height(h),
+							asciigraph.Width(w),
+							asciigraph.Precision(uint(p)),
+							asciigraph.Caption(caption),
+						),
+					}
+					if rated {
+						options = append(options, asciitsdb.WithRate(int(time.Second/metricTick)))
+					}
+					switch unit {
+					case "μs", "us", "microseconds":
+						options = append(options, asciitsdb.WithDivisor(float64(time.Microsecond.Nanoseconds())) /* ns => μs conversion */)
+					case "ms", "milliseconds":
+						options = append(options, asciitsdb.WithDivisor(float64(time.Millisecond.Nanoseconds())) /* ns => ms conversion */)
+					case "s", "seconds":
+						options = append(options, asciitsdb.WithDivisor(float64(time.Second.Nanoseconds())) /* ns => s conversion */)
+					case "MiB", "MiB/s":
+						options = append(options, asciitsdb.WithDivisor(humanize.MiByte) /* 1 MiB */)
+					case "MB", "MB/s":
+						options = append(options, asciitsdb.WithDivisor(humanize.MByte) /* 1 MB */)
+					case "KiB", "KiB/s":
+						options = append(options, asciitsdb.WithDivisor(humanize.KiByte) /* 1 KiB */)
+					case "KB", "KB/s":
+						options = append(options, asciitsdb.WithDivisor(humanize.KByte) /* 1 KB */)
+					default:
+					}
+
+					if d.HasArg("t") {
+						// Parse t=[<duration>,<duration>).
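+						// e.g. t=[0s,10s) plots only the first 10s of
+						// scraped data, by translating the window into an
+						// offset and limit over 100ms scrape intervals.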
+						var tstr string
+						d.ScanArgs(t, "t", &tstr)
+						tstr = strings.TrimSuffix(strings.TrimPrefix(tstr, "["), ")")
+						args := strings.Split(tstr, ",")
+
+						dur, err := time.ParseDuration(args[0])
+						require.NoError(t, err)
+						start := tzero.Add(dur)
+						options = append(options, asciitsdb.WithOffset(start.Sub(tzero).Nanoseconds()/metricTick.Nanoseconds()))
+
+						dur, err = time.ParseDuration(args[1])
+						require.NoError(t, err)
+						end := tzero.Add(dur)
+						options = append(options, asciitsdb.WithLimit(end.Sub(start).Nanoseconds()/metricTick.Nanoseconds()))
+					}
+					if i > 0 {
+						buf.WriteString("\n\n\n")
+					}
+					metrics := bexp.Parse(strings.TrimSpace(selector))
+					buf.WriteString(sim.tsdb.Plot(metrics, options...))
+				}
+				return buf.String()
+
+			case "snapshots":
+				var name string
+				d.ScanArgs(t, "handle", &name)
+				rangeID, ok := handleToRangeID[name]
+				require.True(t, ok, "expected to find handle %q, was it initialized?", name)
+				handle := sim.state.ranges[roachpb.RangeID(rangeID)]
+				var buf strings.Builder
+				for i, s := range handle.snapshots {
+					if i > 0 {
+						buf.WriteString("\n")
+					}
+					buf.WriteString(fmt.Sprintf("t=%s stream=%s\n", s.time.Sub(tzero), s.stream))
+					buf.WriteString(fmt.Sprintf(" %s", s.data))
+				}
+				return buf.String()
+
+			default:
+				return fmt.Sprintf("unknown command: %s", d.Cmd)
+			}
+			return ""
+		})
+	})
+}
+
+const (
+	// testingLocalNodeID is the default node ID used for testing when none is specified.
+	testingLocalNodeID = roachpb.NodeID(1)
+	// testingLocalStoreID is the default store ID used for testing when none is specified.
+	testingLocalStoreID = roachpb.StoreID(1)
+	// tick is the smallest time interval that we simulate (1ms).
+	tick = time.Millisecond
+	// metricTick is the time interval over which we scrape metrics for ASCII plots.
+	metricTick = 100 * tick
+)
+
+var (
+	// tzero represents t=0, the earliest possible time. All other
+	// t={<duration>,[<duration>,<duration>)} is relative to this time.
+	tzero = timeutil.Unix(0, 0)
+	// testingLocalTenantID is the default tenant ID used for testing when none is specified.
+	testingLocalTenantID = roachpb.TenantID{InternalValue: 1}
+)
+
+// simulator represents the simulation environment for the RangeController.
+type simulator struct {
+	state    *testingRCState
+	registry *metric.Registry
+	tsdb     *asciitsdb.TSDB
+	ticker   []ticker
+}
+
+func (s *simulator) init(t *testing.T, ctx context.Context) {
+	s.state = &testingRCState{}
+	s.state.init(t, ctx)
+	s.state.initialRegularTokens = kvflowcontrol.Tokens(kvflowcontrol.RegularTokensPerStream.Get(&s.state.settings.SV))
+	s.state.initialElasticTokens = kvflowcontrol.Tokens(kvflowcontrol.ElasticTokensPerStream.Get(&s.state.settings.SV))
+	s.registry = metric.NewRegistry()
+	s.tsdb = asciitsdb.New(t, s.registry)
+	s.registry.AddMetricStruct(s.state.evalMetrics)
+	s.registry.AddMetricStruct(s.state.streamTokenProvider.Metrics())
+	s.tsdb.Register(s.state.evalMetrics)
+	s.tsdb.Register(s.state.streamTokenProvider.Metrics())
+}
+
+// timeline is a sequence of events being simulated. It comes in the following
+// forms:
+//
+//	A. t=[<duration>,<duration>) class={regular,elastic} \
+//	   stream=t<int>/s<int> adjust={+,-}<bytes>/s rate=<int>/s \
+//	   [deduction-delay=<duration>]
+//	B. t=[<duration>,<duration>) class={regular,elastic} handle=<string> \
+//	   adjust={+,-}<bytes>/s rate=<int>/s [deduction-delay=<duration>]
+//	C. t=<duration> handle=<string> op=connect stream=t<int>/s<int> \
+//	   log-position=<int>/<int>
+//	D. t=<duration> handle=<string> op=disconnect stream=t<int>/s<int>
+//	E. t=<duration> handle=<string> op={snapshot,close}
+type timeline struct {
+	// Start and (optional) end time for the action being simulated.
+	tstart, tend time.Time
+	// Priority (if applicable) of work on behalf of which we're
+	// deducting/returning flow tokens through kvflowcontrol.Controller or
+	// kvflowcontrol.Handle.
+	pri admissionpb.WorkPriority
+	// Stream over which we're deducting/returning flow tokens (form A) or the
+	// stream we're connecting to/disconnecting from a given
+	// kvflowcontrol.Handle.
+	stream kvflowcontrol.Stream
+	// The number of tokens either being deducted or returned over
+	// [tstart,tend). Only applicable to forms A and B.
+	delta kvflowcontrol.Tokens
+	// The rate at which we adjust flow tokens, controlling the granularity at
+	// which 'delta' is adjusted. Only applicable to forms A and B.
+	rate int
+	// The # of ticks post-Admit() when we actually deduct tokens. Only
+	// applicable to forms A and B.
+	deductionDelay int
+	// Scoped RangeController handle. Only applicable to forms B-E, when we're
+	// not dealing with tokenCounters (streams) directly.
+	handle *testingRCRange
+	// The specific operation to run on a kvflowcontrol.Handle. Only applicable
+	// to forms C-E.
+	handleOp string
+	// The log position at which we start issuing writes/deducting tokens (form
+	// B) or the position at which we connect a given stream (form C).
+	position kvflowcontrolpb.RaftLogPosition
+}
+
+func (s *simulator) timeline(tl timeline) {
+	// See commentary on the timeline type for its various forms; we do the
+	// validation here, in-line. For each timeline, we construct an appropriate
+	// ticker that's ticked during the simulation.
+
+	if tl.handle == nil {
+		// Form A, interacting with the TokenCounters directly.
+
+		if tl.rate == 0 {
+			return // nothing to do
+		}
+
+		require.NotZero(s.state.t, tl.tend)
+		require.NotZero(s.state.t, tl.stream)
+		require.LessOrEqual(s.state.t, tl.rate, 1000)
+
+		s.ticker = append(s.ticker, &streamsTicker{
+			t:           s.state.t,
+			evalTC:      s.state.streamTokenProvider.Eval(tl.stream),
+			sendTC:      s.state.streamTokenProvider.Send(tl.stream),
+			evalMetrics: s.state.evalMetrics,
+
+			tstart: tl.tstart,
+			tend:   tl.tend,
+			pri:    tl.pri,
+			stream: tl.stream,
+
+			deductionDelay:  tl.deductionDelay,
+			deduct:          make(map[time.Time][]func()),
+			waitingRequests: make(map[time.Time]waitingRequest),
+			// Using the parameters above, we figure out two things:
+			// - On which ticks do we adjust flow tokens?
+			// - How much by, each time?
+			//
+			// If the request rate we're simulating is:
+			// - 1000/sec, we adjust flow tokens every tick(=1ms).
+			// - 500/sec, we adjust flow tokens every 2 ticks (=2ms).
+			// - ....
+			//
+			// How much do we adjust by each time? Given we're making 'rate' requests
+			// per second, and have to deduct 'delta' tokens per second, each request
+			// just deducts delta/rate.
+			mod:   int(time.Second/tick) / tl.rate,
+			delta: kvflowcontrol.Tokens(int(tl.delta) / tl.rate),
+		})
+		return
+	}
+
+	// Forms B-E, using the kvflowcontrol.Handle instead.
+	require.NotNil(s.state.t, tl.handle)
+
+	if tl.handleOp != "" {
+		// Forms C-E, where we're either connecting/disconnecting a named
+		// stream, or closing/snapshotting a handle.
+		require.Zero(s.state.t, tl.tend)
+		if tl.handleOp == "connect" {
+			// Form C.
+			require.NotZero(s.state.t, tl.stream)
+			require.NotZero(s.state.t, tl.position)
+		}
+		if tl.handleOp == "disconnect" {
+			// Form D.
+			require.NotZero(s.state.t, tl.stream)
+		}
+
+		s.ticker = append(s.ticker, &rangeOpTicker{
+			t:        s.state.t,
+			tstart:   tl.tstart,
+			handle:   tl.handle,
+			op:       tl.handleOp,
+			stream:   tl.stream,
+			position: tl.position,
+		})
+		return
+	}
+
+	// Form B, where we're deducting/returning flow tokens using RangeController.
+	if tl.rate == 0 {
+		return // nothing to do
+	}
+
+	require.NotZero(s.state.t, tl.tend)
+	require.Zero(s.state.t, tl.position)
+	s.ticker = append(s.ticker, &rangeTicker{
+		t: s.state.t,
+
+		tstart: tl.tstart,
+		tend:   tl.tend,
+		pri:    tl.pri,
+		handle: tl.handle,
+		stream: tl.stream,
+
+		deductionDelay: tl.deductionDelay,
+		deduct:         make(map[time.Time][]func()),
+
+		// See commentary on the streamsTicker construction above.
+		mod:   int(time.Second/tick) / tl.rate,
+		delta: kvflowcontrol.Tokens(int(tl.delta) / tl.rate),
+	})
+}
+
+func (s *simulator) simulate(ctx context.Context, tend time.Time) {
+	s.state.ts.Backwards(s.state.ts.Since(tzero)) // reset to tzero
+	s.tsdb.Clear()
+	for i := range s.ticker {
+		s.ticker[i].reset()
+		if s.ticker[i].end().After(tend) {
+			tend = s.ticker[i].end()
+		}
+	}
+
+	for {
+		t := s.state.ts.Now()
+		if t.After(tend) || t.Equal(tend) {
+			break
+		}
+		for i := range s.ticker {
+			s.ticker[i].tick(ctx, t)
+		}
+		if t.UnixNano()%metricTick.Nanoseconds() == 0 {
+			s.state.streamTokenProvider.UpdateMetricGauges()
+			s.tsdb.Scrape(ctx)
+		}
+		s.state.ts.Advance(tick)
+	}
+}
+
+type ticker interface {
+	tick(ctx context.Context, t time.Time)
+	reset()
+	end() time.Time
+}
+
+type streamsTicker struct {
+	t              *testing.T
+	tstart, tend   time.Time
+	pri            admissionpb.WorkPriority
+	stream         kvflowcontrol.Stream
+	delta          kvflowcontrol.Tokens
+	evalMetrics    *EvalWaitMetrics
+	evalTC, sendTC *tokenCounter
+	mod, ticks     int // used to control the ticks at which we interact with the controller
+	deductionDelay int
+
+	deduct          map[time.Time][]func()
+	waitingRequests map[time.Time]waitingRequest
+}
+
+var _ ticker = &streamsTicker{}
+
+type waitingRequest struct {
+	ctx      context.Context
+	signaled func() bool
+	admit    func() bool
+}
+
+func (st *streamsTicker) tick(ctx context.Context, t time.Time) {
+	wc := admissionpb.WorkClassFromPri(st.pri)
+	if ds, ok := st.deduct[t]; ok {
+		for _, deduct := range ds {
+			deduct()
+		}
+		delete(st.deduct, t)
+	}
+	for key, waitingRequest := range st.waitingRequests {
+		// Process all waiting requests from earlier. Do this even if t >
+		// st.tend since these requests could've been generated earlier.
+		if waitingRequest.ctx.Err() != nil {
+			delete(st.waitingRequests, key)
+			continue
+		}
+		if !waitingRequest.signaled() {
+			continue
+		}
+		if waitingRequest.admit() {
+			// Request admitted; proceed with token deductions.
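+			// The deduction either happens immediately, or is scheduled
+			// 'deductionDelay' ticks into the future.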
+			if st.deductionDelay == 0 {
+				st.evalTC.adjust(ctx, wc, st.delta)
+				st.sendTC.adjust(ctx, wc, st.delta)
+			} else {
+				future := t.Add(tick * time.Duration(st.deductionDelay))
+				st.deduct[future] = append(st.deduct[future], func() {
+					st.evalTC.adjust(ctx, wc, st.delta)
+					st.sendTC.adjust(ctx, wc, st.delta)
+				})
+			}
+			delete(st.waitingRequests, key)
+			return
+		}
+	}
+
+	if t.Before(st.tstart) || (t.After(st.tend) || t.Equal(st.tend)) {
+		return // we're outside our [st.tstart, st.tend), there's nothing left to do
+	}
+
+	defer func() { st.ticks += 1 }()
+	if st.ticks%st.mod != 0 {
+		return // nothing to do in this tick
+	}
+
+	if st.delta >= 0 { // return tokens
+		st.evalTC.adjust(ctx, wc, st.delta)
+		st.sendTC.adjust(ctx, wc, st.delta)
+		return
+	}
+
+	admitted, signaled, admit := st.evalTC.testingNonBlockingAdmit(ctx, st.pri, st.evalMetrics)
+	if admitted {
+		// Request admitted; proceed with token deductions.
+		if st.deductionDelay == 0 {
+			// TODO(kvoli): We are assuming here (and throughout) that send tokens
+			// are deducted immediately regardless of their availability. When the
+			// send queue is added, we should extend this test to account for that.
+			st.evalTC.adjust(ctx, wc, st.delta)
+			st.sendTC.adjust(ctx, wc, st.delta)
+		} else {
+			future := t.Add(tick * time.Duration(st.deductionDelay))
+			st.deduct[future] = append(st.deduct[future], func() {
+				st.evalTC.adjust(ctx, wc, st.delta)
+				st.sendTC.adjust(ctx, wc, st.delta)
+			})
+		}
+		return
+	}
+	// Track waiting request.
+	st.waitingRequests[t] = waitingRequest{
+		ctx:      ctx,
+		signaled: signaled,
+		admit:    admit,
+	}
+}
+
+func (st *streamsTicker) reset() {
+	st.ticks = 0
+	st.deduct = make(map[time.Time][]func())
+	st.waitingRequests = make(map[time.Time]waitingRequest)
+}
+
+func (st *streamsTicker) end() time.Time {
+	return st.tend
+}
+
+// waitingRequestInRC represents a request waiting for admission (due to
+// unavailable flow tokens) when interacting directly with the
+// rac2.RangeController.
+type waitingRequestInRC struct {
+	ctx      context.Context
+	signaled []func() bool // whether the request has been signaled (for each underlying stream)
+	admit    []func() bool // invoked once signaled, returns whether the request has been admitted (for each underlying stream)
+}
+
+func (w *waitingRequestInRC) remove(i int) {
+	w.signaled = append(w.signaled[:i], w.signaled[i+1:]...)
+	w.admit = append(w.admit[:i], w.admit[i+1:]...)
+}
+
+type rangeTicker struct {
+	t              *testing.T
+	tstart, tend   time.Time
+	pri            admissionpb.WorkPriority
+	delta          kvflowcontrol.Tokens
+	handle         *testingRCRange
+	stream         kvflowcontrol.Stream
+	mod, ticks     int // used to control the ticks at which we interact with the handle
+	deductionDelay int
+
+	deduct          map[time.Time][]func()
+	waitingRequests map[time.Time]waitingRequestInRC
+}
+
+var _ ticker = &rangeTicker{}
+
+func (rt *rangeTicker) tick(ctx context.Context, t time.Time) {
+	if ds, ok := rt.deduct[t]; ok {
+		for _, deduct := range ds {
+			deduct()
+		}
+		delete(rt.deduct, t)
+	}
+	for key, waitingRequest := range rt.waitingRequests {
+		// Process all waiting requests from earlier. Do this even if t >
+		// rt.tend since these requests could've been generated earlier.
+		if waitingRequest.ctx.Err() != nil {
+			delete(rt.waitingRequests, key)
+			continue
+		}
+		for i := range waitingRequest.signaled {
+			if !waitingRequest.signaled[i]() {
+				continue
+			}
+			if !waitingRequest.admit[i]() {
+				continue
+			}
+
+			// Specific stream is unblocked (either because tokens were
+			// available, or it disconnected). Stop tracking it.
+			waitingRequest.remove(i)
+			break
+
+			// TODO(irfansharif): Are we introducing non-determinism in this
+			// test by potentially allowing multiple (i) streams of a single
+			// request, and (ii) requests getting admitted depending on
+			// (non-deterministic) channel delivery?
+		}
+
+		if len(waitingRequest.signaled) == 0 {
+			// All underlying streams have been unblocked; proceed with token
+			// deductions.
+			if rt.deductionDelay == 0 {
+				rt.handle.testingDeductTokens(rt.t, ctx, rt.pri, -rt.delta)
+			} else {
+				future := t.Add(tick * time.Duration(rt.deductionDelay))
+				rt.deduct[future] = append(rt.deduct[future], func() {
+					rt.handle.testingDeductTokens(rt.t, ctx, rt.pri, -rt.delta)
+				})
+			}
+			delete(rt.waitingRequests, key)
+		}
+	}
+
+	if t.Before(rt.tstart) || (t.After(rt.tend) || t.Equal(rt.tend)) {
+		return // we're outside our [rt.tstart, rt.tend), there's nothing left to do
+	}
+
+	defer func() { rt.ticks += 1 }()
+	if rt.ticks%rt.mod != 0 {
+		return // nothing to do in this tick
+	}
+
+	if rt.delta >= 0 { // return tokens
+		rt.handle.testingReturnTokens(ctx, rt.pri, rt.delta, rt.stream)
+		return
+	}
+
+	admitted, signaled, admit := rt.handle.rc.testingNonBlockingAdmit(ctx, rt.pri)
+	if admitted {
+		// Request admitted; proceed with token deductions.
+		if rt.deductionDelay == 0 {
+			rt.handle.testingDeductTokens(rt.t, ctx, rt.pri, -rt.delta)
+		} else {
+			future := t.Add(tick * time.Duration(rt.deductionDelay))
+			rt.deduct[future] = append(rt.deduct[future], func() {
+				rt.handle.testingDeductTokens(rt.t, ctx, rt.pri, -rt.delta)
+			})
+		}
+		return
+	}
+
+	// Track waiting request.
+	rt.waitingRequests[t] = waitingRequestInRC{
+		ctx:      ctx,
+		signaled: signaled,
+		admit:    admit,
+	}
+}
+
+func (rt *rangeTicker) reset() {
+	rt.ticks = 0
+	rt.deduct = make(map[time.Time][]func())
+	rt.waitingRequests = make(map[time.Time]waitingRequestInRC)
+}
+
+func (rt *rangeTicker) end() time.Time {
+	return rt.tend
+}
+
+type rangeOpTicker struct {
+	t        *testing.T
+	tstart   time.Time
+	handle   *testingRCRange
+	position kvflowcontrolpb.RaftLogPosition
+	stream   kvflowcontrol.Stream
+	op       string
+
+	done bool
+}
+
+var _ ticker = &rangeOpTicker{}
+
+// tick is part of the ticker interface.
+func (rot *rangeOpTicker) tick(ctx context.Context, t time.Time) {
+	if rot.done || t.Before(rot.tstart) {
+		return // nothing to do
+	}
+	switch rot.op {
+	case "close":
+		rot.handle.rc.CloseRaftMuLocked(ctx)
+		rot.handle.rc = nil
+	case "disconnect":
+		rot.handle.testingDisconnectStream(rot.t, ctx, rot.stream)
+	case "connect":
+		rot.handle.testingConnectStream(rot.t, ctx, rot.stream, rot.position)
+	case "snapshot":
+		var data string
+		rid := rot.handle.testingFindReplStreamOrFatal(ctx, rot.stream)
+		rs := rot.handle.rc.replicaMap[rid]
+		if rs.sendStream != nil {
+			rs.sendStream.mu.Lock()
+			data = rs.sendStream.mu.tracker.testingString()
+			rs.sendStream.mu.Unlock()
+		}
+		rot.handle.snapshots = append(rot.handle.snapshots, testingTrackerSnapshot{
+			time:   t,
+			stream: rot.stream,
+			data:   data,
+		})
+	}
+	rot.done = true
+}
+
+type testingTrackerSnapshot struct {
+	time   time.Time
+	stream kvflowcontrol.Stream
+	data   string
+}
+
+// reset is part of the ticker interface.
+func (rot *rangeOpTicker) reset() {}
+
+// end is part of the ticker interface.
+func (rot *rangeOpTicker) end() time.Time {
+	return rot.tstart
+}
+
+func (t *tokenCounter) testingSignalChannel(wc admissionpb.WorkClass) {
+	t.mu.counters[wc].signal()
+}
+
+func (t *tokenCounter) testingSignaled(wc admissionpb.WorkClass) func() bool {
+	return func() bool {
+		select {
+		case <-t.mu.counters[wc].signalCh:
+			return true
+		default:
+			return false
+		}
+	}
+}
+
+func (t *tokenCounter) testingNonBlockingAdmit(
+	ctx context.Context, pri admissionpb.WorkPriority, metrics *EvalWaitMetrics,
+) (admitted bool, signaled func() bool, admit func() bool) {
+	wc := admissionpb.WorkClassFromPri(pri)
+	tstart := t.clock.PhysicalTime()
+	if metrics != nil {
+		metrics.OnWaiting(wc)
+	}
+
+	admit = func() bool {
+		tokens := t.tokens(wc)
+		if tokens <= 0 {
+			return false
+		}
+		t.testingSignalChannel(wc)
+		if metrics != nil {
+			metrics.OnAdmitted(wc, t.clock.PhysicalTime().Sub(tstart))
+		}
+		return true
+	}
+
+	if admit() {
+		return true, nil, nil
+	}
+
+	return false, t.testingSignaled(wc), admit
+}
+
+func (rc *rangeController) testingNonBlockingAdmit(
+	ctx context.Context, pri admissionpb.WorkPriority,
+) (admitted bool, signaled []func() bool, admit []func() bool) {
+	rc.mu.Lock()
+	vss := rc.mu.voterSets
+	rc.mu.Unlock()
+
+	// We are matching the behavior of the existing kvflowsimulator test here,
+	// where we are abstractly dealing with streams representing replicas,
+	// connected to the leader. Expect there to be exactly one voter set, i.e.
+	// no joint configuration (multiple voter sets).
+	if len(vss) != 1 {
+		log.Fatalf(ctx, "expected exactly one voter set, found %d", len(vss))
+	}
+	// Similar to the tokenCounter non-blocking admit, we also don't care about
+	// replica types, or waiting for only a quorum. We just wait for all
+	// non-closed streams to have available tokens, regardless of work class.
+	//
+	// TODO(kvoli): When we introduce the send queue, we will want to extend the
+	// simulation testing. However, we will need to diverge from the existing
+	// kvflowsimulator test to do so. For now, match the testing behavior as
+	// closely as possible.
+	vs := vss[0]
+	tstart := rc.opts.Clock.PhysicalTime()
+	wc := admissionpb.WorkClassFromPri(pri)
+	rc.opts.EvalWaitMetrics.OnWaiting(wc)
+
+	admitted = true
+	for _, v := range vs {
+		// We don't pass in metrics here, to avoid duplicate updates to the
+		// eval_wait metrics; those are done for a stream in
+		// tokenCounter.testingNonBlockingAdmit.
+		vAdmitted, vSignaled, vAdmit := v.evalTokenCounter.testingNonBlockingAdmit(ctx, pri, nil /* metrics */)
+		if vAdmitted {
+			continue
+		}
+		admit = append(admit, func() bool {
+			if vAdmit() {
+				rc.opts.EvalWaitMetrics.OnAdmitted(wc, rc.opts.Clock.PhysicalTime().Sub(tstart))
+				return true
+			}
+			return false
+		})
+		signaled = append(signaled, vSignaled)
+		admitted = false
+	}
+	if admitted {
+		rc.opts.EvalWaitMetrics.OnAdmitted(wc, rc.opts.Clock.PhysicalTime().Sub(tstart))
+	}
+
+	return admitted, signaled, admit
+}
+
+func (r *testingRCRange) testingDeductTokens(
+	t *testing.T, ctx context.Context, pri admissionpb.WorkPriority, tokens kvflowcontrol.Tokens,
+) {
+	if r.rc == nil {
+		return
+	}
+	r.mu.Lock()
+	r.mu.quorumPosition.Index++
+	r.mu.Unlock()
+
+	info := entryInfo{
+		term:   r.mu.quorumPosition.Term,
+		index:  r.mu.quorumPosition.Index,
+		enc:    raftlog.EntryEncodingStandardWithACAndPriority,
+		tokens: tokens,
+		pri:    AdmissionToRaftPriority(pri),
+	}
+
+	r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{
+		Term:    info.term,
+		Entries: []raftpb.Entry{testingCreateEntry(t, info)},
+	})
+}
+
+func (r *testingRCRange) testingReturnTokens(
+	ctx context.Context,
+	pri admissionpb.WorkPriority,
+	tokens kvflowcontrol.Tokens,
+	stream kvflowcontrol.Stream,
+) {
+	if r.rc == nil {
+		return
+	}
+	// Find the replica corresponding to the given stream.
+	var rs *replicaState
+	for _, r := range r.rc.replicaMap {
+		if r.desc.StoreID == stream.StoreID {
+			rs = r
+			break
+		}
+	}
+	if rs == nil || rs.sendStream == nil {
+		log.Fatalf(ctx, "expected to find non-closed replica send stream for %v", stream)
+	}
+	rid := rs.desc.ReplicaID
+
+	// We need to determine the index at which we're returning tokens via
+	// AdmittedVector. We do this by iterating over the outstanding returns.
+	raftPri := AdmissionToRaftPriority(pri)
+	returnIndex := uint64(0)
+
+	r.mu.Lock()
+	r.mu.outstandingReturns[rid] += tokens
+	func() {
+		rs.sendStream.mu.Lock()
+		defer rs.sendStream.mu.Unlock()
+		for _, deduction := range rs.sendStream.mu.tracker.tracked[raftPri] {
+			if r.mu.outstandingReturns[rid]-deduction.tokens >= 0 {
+				r.mu.outstandingReturns[rid] -= deduction.tokens
+				returnIndex = deduction.index
+			}
+		}
+	}()
+	r.mu.Unlock()
+
+	if returnIndex != 0 {
+		av := AdmittedVector{Term: r.mu.quorumPosition.Term}
+		av.Admitted[raftPri] = returnIndex
+		r.mu.Lock()
+		repl := r.mu.r.replicaSet[rid]
+		repl.info.Match = r.mu.quorumPosition.Index
+		repl.info.Next = r.mu.quorumPosition.Index + 1
+		r.mu.r.replicaSet[rid] = repl
+		r.mu.Unlock()
+		r.rc.AdmitRaftMuLocked(ctx, rs.desc.ReplicaID, av)
+		r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{})
+	}
+}
+
+func (r *testingRCRange) testingFindReplStreamOrFatal(
+	ctx context.Context, stream kvflowcontrol.Stream,
+) roachpb.ReplicaID {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	for _, rs := range r.rc.replicaMap {
+		if rs.desc.StoreID == stream.StoreID {
+			return rs.desc.ReplicaID
+		}
+	}
+	log.Fatalf(ctx, "expected to find replica for stream %v", stream)
+	return 0
+}
+
+func (r *testingRCRange) testingConnectStream(
+	t *testing.T,
+	ctx context.Context,
+	stream kvflowcontrol.Stream,
+	position kvflowcontrolpb.RaftLogPosition,
+) {
+	if r.rc == nil {
+		return
+	}
+	r.mu.Lock()
+	r.mu.quorumPosition = position
+	r.mu.r.replicaSet[roachpb.ReplicaID(stream.StoreID)] = testingReplica{
+		desc: roachpb.ReplicaDescriptor{
+			// We aren't testing multiple stores per node.
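+			// The node and replica IDs are derived directly from the
+			// stream's store ID, giving a 1:1:1 mapping.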
+			NodeID:    roachpb.NodeID(stream.StoreID),
+			StoreID:   stream.StoreID,
+			ReplicaID: roachpb.ReplicaID(stream.StoreID),
+			Type:      roachpb.VOTER_FULL,
+		},
+		info: FollowerStateInfo{
+			State: tracker.StateReplicate,
+			Match: position.Index,
+			Next:  position.Index + 1,
+		},
+	}
+	r.mu.Unlock()
+	require.NoError(t, r.rc.SetReplicasRaftMuLocked(ctx, r.mu.r.replicas()))
+	// Send an empty raft event in order to trigger state changes.
+	require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
+}
+
+func (r *testingRCRange) testingDisconnectStream(
+	t *testing.T, ctx context.Context, stream kvflowcontrol.Stream,
+) {
+	if r.rc == nil {
+		return
+	}
+	rid := r.testingFindReplStreamOrFatal(ctx, stream)
+	r.mu.Lock()
+	rs := r.mu.r.replicaSet[rid]
+	rs.info.State = tracker.StateSnapshot
+	r.mu.r.replicaSet[rid] = rs
+	r.mu.Unlock()
+	// Send an empty raft event in order to trigger state changes.
+	require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
+}
+
+func (t *Tracker) testingString() string {
+	var buf strings.Builder
+	for pri, deductions := range t.tracked {
+		if len(deductions) == 0 {
+			continue
+		}
+		buf.WriteString(fmt.Sprintf("pri=%s\n", RaftToAdmissionPriority(raftpb.Priority(pri))))
+		for _, deduction := range deductions {
+			buf.WriteString(fmt.Sprintf(" tokens=%s log-position=%v/%v\n",
+				testingPrintTrimmedTokens(deduction.tokens), deduction.term, deduction.index))
+		}
+	}
+
+	return buf.String()
+}
+
+func testingPrintTrimmedTokens(t kvflowcontrol.Tokens) string {
+	return strings.TrimPrefix(strings.ReplaceAll(t.String(), " ", ""), "+")
+}
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/metric_names b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/metric_names
new file mode 100644
index 000000000000..0720d15453f7
--- /dev/null
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/metric_names
@@ -0,0 +1,43 @@
+# This test prints out the scraped metric names for convenient use in other
+# tests. When new metrics are added to the structs in rac2/metrics.go, they
+# will be reflected here when re-running the test with --rewrite (or cause this
+# test to fail).
+init
+----
+
+metric_names
+----
+kvflowcontrol.eval_wait.elastic.duration
+kvflowcontrol.eval_wait.elastic.requests.admitted
+kvflowcontrol.eval_wait.elastic.requests.bypassed
+kvflowcontrol.eval_wait.elastic.requests.errored
+kvflowcontrol.eval_wait.elastic.requests.waiting
+kvflowcontrol.eval_wait.regular.duration
+kvflowcontrol.eval_wait.regular.requests.admitted
+kvflowcontrol.eval_wait.regular.requests.bypassed
+kvflowcontrol.eval_wait.regular.requests.errored
+kvflowcontrol.eval_wait.regular.requests.waiting
+kvflowcontrol.streams.eval.elastic.blocked_count
+kvflowcontrol.streams.eval.elastic.total_count
+kvflowcontrol.streams.eval.regular.blocked_count
+kvflowcontrol.streams.eval.regular.total_count
+kvflowcontrol.streams.send.elastic.blocked_count
+kvflowcontrol.streams.send.elastic.total_count
+kvflowcontrol.streams.send.regular.blocked_count
+kvflowcontrol.streams.send.regular.total_count
+kvflowcontrol.tokens.eval.elastic.available
+kvflowcontrol.tokens.eval.elastic.deducted
+kvflowcontrol.tokens.eval.elastic.returned
+kvflowcontrol.tokens.eval.elastic.unaccounted
+kvflowcontrol.tokens.eval.regular.available
+kvflowcontrol.tokens.eval.regular.deducted
+kvflowcontrol.tokens.eval.regular.returned
+kvflowcontrol.tokens.eval.regular.unaccounted
+kvflowcontrol.tokens.send.elastic.available
+kvflowcontrol.tokens.send.elastic.deducted
+kvflowcontrol.tokens.send.elastic.returned
+kvflowcontrol.tokens.send.elastic.unaccounted
+kvflowcontrol.tokens.send.regular.available
+kvflowcontrol.tokens.send.regular.deducted
+kvflowcontrol.tokens.send.regular.returned
+kvflowcontrol.tokens.send.regular.unaccounted
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_overview b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_overview
new file mode 100644
index 000000000000..8ad6e564be21
--- /dev/null
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_overview
@@ -0,0 +1,279 @@
+# Walk through the basics of the datadriven syntax for handle tests.
+
+# Initialize testing components, including a named handle h.
+init
+handle=h
+----
+
+# Connect the handle to three replication streams (like the leader+leaseholder
+# for a triply replicated range would).
+# - Start writing to the range at 2MiB/s across 10 reqs/s.
+# - Grab s1 snapshots at t=2s and t=3s, to compare how things evolve.
+# - Partway through, at t=4s, disconnect s2.
+# - From t=[10s,18s), start returning tokens from s1 and s3.
+# - Grab s1 snapshots at t=16s, t=17s and t=19s to compare how things evolve.
+# - Start writing to the range again at t=[20s,24s) at 2MiB/s across 10 reqs/s,
+#   grabbing one more s1 snapshot at t=21s.
+# - At t=25s, close the handle.
+timeline
+t=0s handle=h op=connect stream=t1/s1 log-position=4/0
+t=0s handle=h op=connect stream=t1/s2 log-position=4/0
+t=0s handle=h op=connect stream=t1/s3 log-position=4/0
+t=2s handle=h op=snapshot stream=t1/s1
+t=3s handle=h op=snapshot stream=t1/s1
+t=4s handle=h op=disconnect stream=t1/s2
+t=[0s,8s) handle=h class=regular adjust=-2MiB/s rate=10/s
+t=[10s,18s) handle=h class=regular adjust=+2MiB/s rate=10/s stream=t1/s1
+t=[10s,18s) handle=h class=regular adjust=+2MiB/s rate=10/s stream=t1/s3
+t=16s handle=h op=snapshot stream=t1/s1
+t=17s handle=h op=snapshot stream=t1/s1
+t=19s handle=h op=snapshot stream=t1/s1
+t=[20s,24s) handle=h class=regular adjust=-2MiB/s rate=10/s
+t=21s handle=h op=snapshot stream=t1/s1
+t=25s handle=h op=close
+----
+
+
+simulate t=[0s,28s)
+----
+
+# We should observe a few things.
+# - We've lazily instantiated three streams for regular traffic. Even when the
+#   handle disconnects the stream from itself, it's still tracked by the
+#   controller.
+# - Given there are three streams, we start off with 3*16MiB = 48MiB of
+#   regular tokens.
+# - When three streams are connected, quorum writes at 2MiB/s translates to
+#   token deductions at 3*2MiB/s = 6MiB/s.
+# - When s2 is disconnected from h:
+#   - All s2-specific flow tokens deducted by h are returned. By t=4s, this
+#     translates to 4s * 2MiB/s = 8MiB. We see this in both
+#     regular_tokens_available and the 8MiB/s spike in
+#     rate(regular_tokens_returned).
+#   - The rate of token deductions decreases to 2*2MiB/s = 4MiB/s.
+# - By the time quorum writes are blocked, the available regular tokens are
+#   16MiB, corresponding to s2's fully available regular tokens.
+plot t=[0s,10s)
+kvflowcontrol.streams.eval.regular.total_count unit=streams
+kvflowcontrol.tokens.eval.regular.available unit=MiB
+kvflowcontrol.tokens.eval.regular.{deducted,returned} unit=MiB/s rate=true
+----
+----
+ 3.0 ┼───────────────────────────────────────
+      streams.eval.regular.total_count (streams)
+
+
+ 47.4 ┼╮
+ 45.3 ┤╰╮
+ 43.2 ┤ ╰╮
+ 41.1 ┤ ╰─╮
+ 39.0 ┤ ╰╮
+ 36.9 ┤ ╰─╮
+ 34.8 ┤ ╰╮
+ 32.7 ┤ ╰╮
+ 30.7 ┤ ╰─╮ ╭╮
+ 28.6 ┤ ╰╮ │╰──╮
+ 26.5 ┤ ╰╮ │ ╰─╮
+ 24.4 ┤ ╰─╯ ╰─╮
+ 22.3 ┤ ╰─╮
+ 20.2 ┤ ╰─╮
+ 18.1 ┤ ╰─╮
+ 16.0 ┤ ╰─────────
+       tokens.eval.regular.available (MiB)
+
+
+ 8.0 ┤ ╭───╮
+ 7.5 ┤ │ │
+ 6.9 ┤ │ │
+ 6.4 ┤ │ │
+ 5.9 ┤ ╭───────────│╮ │
+ 5.3 ┤ │ │╰╮ │
+ 4.8 ┤ │ │ ╰╮│
+ 4.3 ┤ │ │ ╰│───────────╮
+ 3.7 ┤ │ │ │ │
+ 3.2 ┤ │ │ │ ╰╮
+ 2.7 ┤ │ │ │ │
+ 2.1 ┤ │ │ │ ╰╮
+ 1.6 ┤ │ │ │ │
+ 1.1 ┤ │ │ │ ╰╮
+ 0.5 ┤ │ │ │ │
+ 0.0 ┼───────────────╯ ╰───────────────────
+      rate(tokens.eval.regular.{deducted,returned}) (MiB/s)
+----
+----
+
+
+# This period corresponds to:
+#   t=[20s,24s) handle=h class=regular adjust=-2MiB/s rate=10/s
+#   t=25s handle=h op=close
+# Where the handle is connected to t1/s1 and t1/s3. Note how when the handle is
+# closed, all (2+2)MiB/s*4s = 16MiB tokens for s1 and s3 are returned.
+#
+# NB: There's some degree of smoothing happening in the rate plots below, which
+# is why we see a slow ascent/descent to/from 4MiB/s.
+plot t=[19s,28s)
+kvflowcontrol.tokens.eval.regular.available unit=MiB
+kvflowcontrol.tokens.eval.regular.{deducted,returned} unit=MiB/s rate=true
+----
+----
+ 48.0 ┼────╮ ╭────────────
+ 46.9 ┤ ╰╮ │
+ 45.9 ┤ ╰╮ │
+ 44.8 ┤ ╰─╮ │
+ 43.7 ┤ ╰╮ │
+ 42.7 ┤ ╰╮ │
+ 41.6 ┤ ╰╮ │
+ 40.5 ┤ ╰╮ │
+ 39.5 ┤ ╰╮ │
+ 38.4 ┤ ╰─╮ │
+ 37.3 ┤ ╰╮ ╭╯
+ 36.3 ┤ ╰╮ │
+ 35.2 ┤ ╰╮ │
+ 34.1 ┤ ╰╮ │
+ 33.1 ┤ ╰╮ │
+ 32.0 ┤ ╰────╯
+       tokens.eval.regular.available (MiB)
+
+
+ 16.0 ┤ ╭───╮
+ 14.9 ┤ │ │
+ 13.9 ┤ │ │
+ 12.8 ┤ │ │
+ 11.7 ┤ │ │
+ 10.7 ┤ │ │
+ 9.6 ┤ │ │
+ 8.5 ┤ │ │
+ 7.5 ┤ │ │
+ 6.4 ┤ │ │
+ 5.3 ┤ ╭╯ │
+ 4.3 ┤ ╭────────────╮ │ │
+ 3.2 ┤ ╭─╯ ╰╮ │ │
+ 2.1 ┤ ╭╯ ╰─╮│ │
+ 1.1 ┤ ╭╯ ╰│ │
+ 0.0 ┼─────────────────────────╯────╰────────
+      rate(tokens.eval.regular.{deducted,returned}) (MiB/s)
+----
+----
+
+# Observe captured snapshots.
+# - At 10 reqs/s, by t=2s we have 10*2=20 tracked tokens from log positions
+#   4/1 to 4/20. We add 10 more by t=3s, going up to 4/30.
+# - At t=16s we're tracking tokens from log positions 4/62 to 4/80. We
+#   return tokens in 10 increments of 205KiB over the next second, so at t=17s
+#   we've reduced the remaining tracked tokens by 10, now starting at 4/72.
+# - At t=19s we have no outstanding tokens being tracked -- we've returned
+#   everything.
+# - At t=21s we've tracked a few more tokens; tokens we haven't returned yet.
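+# (Each write deducts 2MiB/s / 10 reqs/s = 204.8KiB ≈ 205KiB, which is why
+# every tracked deduction below reads 205KiB.)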
+#
+# TODO(irfansharif): Support filtering this output by stream and time range.
+snapshots handle=h
+----
+----
+t=2s stream=t1/s1
+ pri=normal-pri
+ tokens=205KiB log-position=4/1
+ tokens=205KiB log-position=4/2
+ tokens=205KiB log-position=4/3
+ tokens=205KiB log-position=4/4
+ tokens=205KiB log-position=4/5
+ tokens=205KiB log-position=4/6
+ tokens=205KiB log-position=4/7
+ tokens=205KiB log-position=4/8
+ tokens=205KiB log-position=4/9
+ tokens=205KiB log-position=4/10
+ tokens=205KiB log-position=4/11
+ tokens=205KiB log-position=4/12
+ tokens=205KiB log-position=4/13
+ tokens=205KiB log-position=4/14
+ tokens=205KiB log-position=4/15
+ tokens=205KiB log-position=4/16
+ tokens=205KiB log-position=4/17
+ tokens=205KiB log-position=4/18
+ tokens=205KiB log-position=4/19
+ tokens=205KiB log-position=4/20
+
+t=3s stream=t1/s1
+ pri=normal-pri
+ tokens=205KiB log-position=4/1
+ tokens=205KiB log-position=4/2
+ tokens=205KiB log-position=4/3
+ tokens=205KiB log-position=4/4
+ tokens=205KiB log-position=4/5
+ tokens=205KiB log-position=4/6
+ tokens=205KiB log-position=4/7
+ tokens=205KiB log-position=4/8
+ tokens=205KiB log-position=4/9
+ tokens=205KiB log-position=4/10
+ tokens=205KiB log-position=4/11
+ tokens=205KiB log-position=4/12
+ tokens=205KiB log-position=4/13
+ tokens=205KiB log-position=4/14
+ tokens=205KiB log-position=4/15
+ tokens=205KiB log-position=4/16
+ tokens=205KiB log-position=4/17
+ tokens=205KiB log-position=4/18
+ tokens=205KiB log-position=4/19
+ tokens=205KiB log-position=4/20
+ tokens=205KiB log-position=4/21
+ tokens=205KiB log-position=4/22
+ tokens=205KiB log-position=4/23
+ tokens=205KiB log-position=4/24
+ tokens=205KiB log-position=4/25
+ tokens=205KiB log-position=4/26
+ tokens=205KiB log-position=4/27
+ tokens=205KiB log-position=4/28
+ tokens=205KiB log-position=4/29
+ tokens=205KiB log-position=4/30
+
+t=16s stream=t1/s1
+ pri=normal-pri
+ tokens=205KiB log-position=4/62
+ tokens=205KiB log-position=4/63
+ tokens=205KiB log-position=4/64
+ tokens=205KiB log-position=4/65
+ tokens=205KiB log-position=4/66
+ tokens=205KiB log-position=4/67
+ tokens=205KiB log-position=4/68
+ tokens=205KiB log-position=4/69
+ tokens=205KiB log-position=4/70
+ tokens=205KiB log-position=4/71
+ tokens=205KiB log-position=4/72
+ tokens=205KiB log-position=4/73
+ tokens=205KiB log-position=4/74
+ tokens=205KiB log-position=4/75
+ tokens=205KiB log-position=4/76
+ tokens=205KiB log-position=4/77
+ tokens=205KiB log-position=4/78
+ tokens=205KiB log-position=4/79
+ tokens=205KiB log-position=4/80
+
+t=17s stream=t1/s1
+ pri=normal-pri
+ tokens=205KiB log-position=4/72
+ tokens=205KiB log-position=4/73
+ tokens=205KiB log-position=4/74
+ tokens=205KiB log-position=4/75
+ tokens=205KiB log-position=4/76
+ tokens=205KiB log-position=4/77
+ tokens=205KiB log-position=4/78
+ tokens=205KiB log-position=4/79
+ tokens=205KiB log-position=4/80
+
+t=19s stream=t1/s1
+
+t=21s stream=t1/s1
+ pri=normal-pri
+ tokens=205KiB log-position=4/81
+ tokens=205KiB log-position=4/82
+ tokens=205KiB log-position=4/83
+ tokens=205KiB log-position=4/84
+ tokens=205KiB log-position=4/85
+ tokens=205KiB log-position=4/86
+ tokens=205KiB log-position=4/87
+ tokens=205KiB log-position=4/88
+ tokens=205KiB log-position=4/89
+ tokens=205KiB log-position=4/90
+ tokens=205KiB log-position=4/91
+----
+----
+
+# vim:ft=conf
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_shared_stream b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_shared_stream
new file mode 100644
index 000000000000..7725e3a998fe
--- /dev/null
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_shared_stream
@@ -0,0 +1,111 @@
+# Demonstrate the behavior when a single stream is shared across two handles h1
+# and h2.
+init
+handle=h1
+handle=h2
+----
+
+# Set up two handles that connect only to s1, and issue writes at 2MiB/s each.
+# Set up token returns for the stream at 1MiB/s.
+timeline
+t=0s handle=h1 op=connect stream=t1/s1 log-position=4/0
+t=0s handle=h2 op=connect stream=t1/s1 log-position=4/0
+t=[0s,10s) handle=h1 class=regular adjust=-2MiB/s rate=10/s
+t=[0s,10s) handle=h2 class=regular adjust=-2MiB/s rate=10/s
+t=[0s,10s) stream=t1/s1 class=regular adjust=+1MiB/s rate=200/s
+----
+
+simulate t=[0s,10s)
+----
+
+# We expect the tokens to get depleted and then eventually observe that the
+# aggregate bandwidth of token deductions across the two handles matches the
+# token return rate. The admission rate goes from 2*10 = 20reqs/s (unthrottled)
+# to 5 reqs/s, which is 1/4th of 20. The 1/4th comes from the 1MiB/s return
+# rate being 1/4th of the 4MiB/s demand.
+plot
+
+kvflowcontrol.streams.eval.regular.{blocked,total}_count unit=streams
+kvflowcontrol.tokens.eval.regular.available unit=MiB
+kvflowcontrol.tokens.eval.regular.{deducted,returned} unit=MiB/s rate=true
+kvflowcontrol.eval_wait.regular.requests.{admitted,waiting} unit=reqs/s rate=true
+----
+----
+ 1.0 ┼───────────────────────────────────────
+ 0.9 ┤ ╭╮ ╭╮ │
+ 0.9 ┤ ││ ││ ╭╮ │
+ 0.8 ┤ ││ ││ ╭╮ ││ │
+ 0.7 ┤ ││ ││ ││ ││ │
+ 0.7 ┤ ╭╮ ││ ││ ╭╯│ ││ │
+ 0.6 ┤ ││ ││ ││ │ │ │╰╮ │
+ 0.5 ┤ ││ │╰╮╭╯│ │ │ │ │ │
+ 0.5 ┤ ││ │ ││ │ │ │ │ │╭╯
+ 0.4 ┤ ││╭╯ ││ ╰╮│ │ │ ││
+ 0.3 ┤ │││ ││ ││ │╭╯ ││
+ 0.3 ┤ │││ ││ ││ ││ ││
+ 0.2 ┤ │││ ││ ││ ╰╯ ││
+ 0.1 ┤ │╰╯ ││ ╰╯ ││
+ 0.1 ┤ │ ││ ╰╯
+ 0.0 ┼────────────────────╯ ╰╯
+      streams.eval.regular.{blocked,total}_count (streams)
+
+
+ 15.6 ┼╮
+ 14.6 ┤╰╮
+ 13.5 ┤ ╰─╮
+ 12.5 ┤ ╰╮
+ 11.4 ┤ ╰─╮
+ 10.4 ┤ ╰╮
+ 9.3 ┤ ╰╮
+ 8.3 ┤ ╰─╮
+ 7.2 ┤ ╰╮
+ 6.2 ┤ ╰╮
+ 5.1 ┤ ╰─╮
+ 4.1 ┤ ╰╮
+ 3.0 ┤ ╰─╮
+ 2.0 ┤ ╰╮
+ 1.0 ┤ ╰╮
+ -0.1 ┤ ╰───────────────────
+       tokens.eval.regular.available (MiB)
+
+
+ 4.0 ┤ ╭────────────────╮
+ 3.7 ┤ │ ╰╮
+ 3.5 ┤ │ │
+ 3.2 ┤ │ │
+ 2.9 ┤ │ ╰╮
+ 2.7 ┤ │ │
+ 2.4 ┤ │ │
+ 2.1 ┤ │ ╰╮
+ 1.9 ┤ │ │
+ 1.6 ┤ │ │
+ 1.3 ┤ │ ╰╮
+ 1.1 ┤ ╭───────────────────────────────────
+ 0.8 ┤ │
+ 0.5 ┤ │
+ 0.3 ┤ │
+ 0.0 ┼───╯
+      rate(tokens.eval.regular.{deducted,returned}) (MiB/s)
+
+
+ 20.0 ┤ ╭────────────────╮
+ 18.7 ┤ │ ╰╮
+ 17.3 ┤ │ │
+ 16.0 ┤ │ │
+ 14.7 ┤ │ ╰╮ ╭──────────────
+ 13.3 ┤ │ │╭╯
+ 12.0 ┤ │ ││
+ 10.7 ┤ │ ╰│
+ 9.3 ┤ │ ╭╯
+ 8.0 ┤ │ ││
+ 6.7 ┤ │ │╰╮
+ 5.3 ┤ │ ╭╯ ╰──────────────
+ 4.0 ┤ │ │
+ 2.7 ┤ │ │
+ 1.3 ┤ │ ╭╯
+ 0.0 ┼────────────────────╯
+       rate(eval_wait.regular.requests.{admitted,waiting}) (reqs/s)
+----
+----
+
+# vim:ft=conf
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_single_slow_stream b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_single_slow_stream
new file mode 100644
index 000000000000..a8f19c3bffa3
--- /dev/null
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_single_slow_stream
@@ -0,0 +1,125 @@
+# Demonstrate that a single slow stream (with lower admission/token return
+# rate) will end up pacing quorum writes (through the handle) to just the
+# slowest rate.
+init
+handle=h
+----
+
+# Set up a triply connected handle (to s1, s2, s3) and start issuing writes at
+# 1MiB/s. For two of the streams, return tokens at exactly the rate it's being
For two of the streams, return tokens at exactly the rate it's being +# deducted (1MiB/s). For the third stream (s3), we return flow tokens at only +# 0.5MiB/s. +timeline +t=0s handle=h op=connect stream=t1/s1 log-position=1/0 +t=0s handle=h op=connect stream=t1/s2 log-position=1/0 +t=0s handle=h op=connect stream=t1/s3 log-position=1/0 +t=[0s,50s) handle=h class=regular adjust=-1MiB/s rate=10/s +t=[0.2s,50s) handle=h class=regular adjust=+1MiB/s rate=10/s stream=t1/s1 +t=[0.2s,50s) handle=h class=regular adjust=+1MiB/s rate=10/s stream=t1/s2 +t=[0.2s,50s) handle=h class=regular adjust=+0.5MiB/s rate=10/s stream=t1/s3 +---- + + +simulate +---- + +# Observe: +# - Total available tokens flatline at 32MiB since flow tokens for s3 +# eventually deplete and later bounce off of 0MiB. We initially have +# 3*16MiB = 48MiB worth of flow tokens, and end up at 48MiB-16MiB = 32MiB. +# - Initially the rate of token deductions (3*1MiB/s = 3MiB/s) is higher than +# the token returns (1MiB/s+1MiB/s+0.5MiB/s = 2.5MiB/s), but after we start +# shaping it to the slowest stream, they end up matching at (0.5MiB/s*3 = +# 1.5MiB/s). +# - The blocked stream count bounces between 0 and 1 as the s3 stream gets +# blocked/unblocked as tokens are deducted/returned. The demand for tokens +# (1MiB/s) is higher than the replenishment rate (0.5MiB/s). +# - The overall admission rate through the handle is reduced from 10 reqs/s +# (corresponding to 1MiB/s) to 5 reqs/s (corresponding to 0.5MiB/s), the +# difference now found in the +5 reqs/s accumulating in the wait queue. +plot + +kvflowcontrol.tokens.eval.regular.available unit=MiB +kvflowcontrol.tokens.eval.regular.{deducted,returned} unit=MiB/s rate=true +kvflowcontrol.streams.eval.regular.blocked_count unit=streams +kvflowcontrol.eval_wait.regular.requests.{admitted,waiting} unit=reqs/s rate=true +---- +---- + 47.7 ┼╮ + 46.6 ┤╰─╮ + 45.6 ┤ ╰─╮ + 44.5 ┤ ╰╮ + 43.5 ┤ ╰─╮ + 42.4 ┤ ╰╮ + 41.4 ┤ ╰─╮ + 40.3 ┤ ╰─╮ + 39.3 ┤ ╰╮ + 38.2 ┤ ╰─╮ + 37.2 ┤ ╰─╮ + 36.1 ┤ ╰╮ + 35.1 ┤ ╰─╮ + 34.0 ┤ ╰─╮ + 33.0 ┤ ╰╮ + 31.9 ┤ ╰─────────────── + tokens.eval.regular.available (MiB) + + + 3.0 ┤╭───────────────────────╮ + 2.8 ┤│ │ + 2.6 ┤╭────────────────────────╮ + 2.4 ┤│ ╰│ + 2.2 ┤│ │ + 2.0 ┤│ │ + 1.8 ┤│ │ + 1.6 ┤│ ╰───────────── + 1.4 ┤│ + 1.2 ┤│ + 1.0 ┤│ + 0.8 ┤│ + 0.6 ┤│ + 0.4 ┤│ + 0.2 ┤│ + 0.0 ┼╯ + rate(tokens.eval.regular.{deducted,returned}) (MiB/s) + + + 1.0 ┤ ╭╮ ╭ + 0.9 ┤ ╭╮ ││ │ + 0.9 ┤ ││ ││ │ + 0.8 ┤ ││ ││ │ + 0.7 ┤ ││ ││ │ + 0.7 ┤ ╭╮ ││╭╮ ││ │ + 0.6 ┤ ││ ││││ ││╭─╮│ + 0.5 ┤ │╰╮│││╰╮│││ ││ + 0.5 ┤ │ ││││ ││││ ││ + 0.4 ┤ │ ││││ ││││ ││ + 0.3 ┤ │ ││││ ││││ ││ + 0.3 ┤ │ ╰╯││ ││││ ││ + 0.2 ┤ │ ││ ╰╯╰╯ ╰╯ + 0.1 ┤ ╭╯ ╰╯ + 0.1 ┤ │ + 0.0 ┼────────────────────────╯ + streams.eval.regular.blocked_count (streams) + + + 10.0 ┤╭───────────────────────╮ + 9.3 ┤│ │ + 8.7 ┤│ │ + 8.0 ┤│ ╰╮ + 7.3 ┤│ │ + 6.7 ┤│ │ + 6.0 ┤│ │ + 5.3 ┤│ ╭───────────── + 4.7 ┤│ │ + 4.0 ┤│ │ + 3.3 ┤│ │ + 2.7 ┤│ │ + 2.0 ┤│ ╭╯ + 1.3 ┤│ │ + 0.7 ┤│ │ + 0.0 ┼────────────────────────╯ + rate(eval_wait.regular.requests.{admitted,waiting}) (reqs/s) +---- +---- + +# vim:ft=conf diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_stream_connection b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_stream_connection new file mode 100644 index 000000000000..4ce66306cfe3 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_stream_connection @@ -0,0 +1,93 @@ +# Demonstrate the effects of a stream being added partway through the process.
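Editor's aside, before the directives below: a minimal sketch of the token accounting this scenario exercises, assuming a provider that lazily seeds each newly seen stream with the full 16MiB regular budget. The names here (provider, eval) are illustrative only, not the rac2 API.

package main

import "fmt"

const initialRegularTokens = 16 << 20 // 16MiB per stream, per the plots in this file

// provider hands out per-stream token counts, seeding new streams with the
// full initial budget. This is why "available" jumps by 16MiB when s3
// connects at t=5s.
type provider struct{ counters map[string]int64 }

func (p *provider) eval(stream string) int64 {
	if _, ok := p.counters[stream]; !ok {
		p.counters[stream] = initialRegularTokens
	}
	return p.counters[stream]
}

func (p *provider) total() (sum int64) {
	for _, tokens := range p.counters {
		sum += tokens
	}
	return sum
}

func main() {
	p := &provider{counters: map[string]int64{}}
	p.eval("t1/s1")
	p.eval("t1/s2")
	fmt.Println(p.total() >> 20) // 32 (MiB), the two initially connected streams
	p.eval("t1/s3")              // s3 connects
	fmt.Println(p.total() >> 20) // 48 (MiB), the +16MiB step in the plot
}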
+init +handle=h +---- + +# Set up a doubly connected handle (to s1, s2) that later connects to s3 at +# t=5s. We issue writes at 1MiB/s and return tokens for each stream at the rate +# of deduction, only 200ms later. +timeline +t=0s handle=h op=connect stream=t1/s1 log-position=4/0 +t=0s handle=h op=connect stream=t1/s2 log-position=4/0 +t=5s handle=h op=connect stream=t1/s3 log-position=4/50 +t=[0s,10s) handle=h class=regular adjust=-1MiB/s rate=10/s +t=[0.2s,10.2s) handle=h class=regular adjust=+1MiB/s rate=10/s stream=t1/s1 +t=[0.2s,10.2s) handle=h class=regular adjust=+1MiB/s rate=10/s stream=t1/s2 +t=[5.2s,10.2s) handle=h class=regular adjust=+1MiB/s rate=10/s stream=t1/s3 +---- + +simulate t=[0s,11s) +---- + +# We observe: +# - Increase in regular stream count once s3 is added. +# - Regular available tokens increasing by 16MiB, corresponding to the +# newly created stream. +# - Token deductions increasing from 2MiB/s (for two connected +# streams) to 3MiB/s. +plot + +kvflowcontrol.streams.eval.regular.total_count unit=streams +kvflowcontrol.tokens.eval.regular.available unit=MiB +kvflowcontrol.tokens.eval.regular.{deducted,returned} unit=MiB/s rate=true +---- +---- + 3.0 ┤ ╭───────────────────── + 2.9 ┤ │ + 2.9 ┤ │ + 2.8 ┤ │ + 2.7 ┤ │ + 2.7 ┤ │ + 2.6 ┤ │ + 2.5 ┤ │ + 2.5 ┤ │ + 2.4 ┤ │ + 2.3 ┤ │ + 2.3 ┤ │ + 2.2 ┤ │ + 2.1 ┤ │ + 2.1 ┤ │ + 2.0 ┼─────────────────╯ + streams.eval.regular.total_count (streams) + + + 48.0 ┤ ╭─── + 46.9 ┤ ╭─────────────────╯ + 45.8 ┤ │ + 44.7 ┤ │ + 43.6 ┤ │ + 42.5 ┤ │ + 41.4 ┤ │ + 40.3 ┤ │ + 39.3 ┤ │ + 38.2 ┤ │ + 37.1 ┤ │ + 36.0 ┤ │ + 34.9 ┤ │ + 33.8 ┤ │ + 32.7 ┤ │ + 31.6 ┼─────────────────╯ + tokens.eval.regular.available (MiB) + + + 3.0 ┤ ╭╭──────────────╮ + 2.8 ┤ ╭╯ ││ + 2.6 ┤ ╭│ ╰│ + 2.4 ┤ ╭╭╯ │ + 2.2 ┤ ╭╭╯ ╰╮ + 2.0 ┤ ╭──────────────╯ ││ + 1.8 ┤ │ ││ + 1.6 ┤ │ ╰│ + 1.4 ┤ │ ╰╮ + 1.2 ┤ │ ││ + 1.0 ┤ │ ││ + 0.8 ┤ │ ╰│ + 0.6 ┤ │ ╰ + 0.4 ┤ │ │ + 0.2 ┤ │ │ + 0.0 ┼───╯ ╰ + rate(tokens.eval.regular.{deducted,returned}) (MiB/s) +---- +---- + +# vim:ft=conf diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_stream_disconnection b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_stream_disconnection new file mode 100644 index 000000000000..0688ee79068f --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/range_controller_stream_disconnection @@ -0,0 +1,114 @@ +# Demonstrate the effects of a slow stream (that's causing a buildup of +# waiting requests) being disconnected. This test is similar to the one in +# range_controller_single_slow_stream, except the slow stream is eventually +# disconnected. + +init +handle=h +---- + +# Set up a triply connected handle (to s1, s2, s3) and start issuing writes at +# 1MiB/s. For two of the streams, return tokens at exactly the rate it's being +# deducted (1MiB/s). For the third stream (s3), we return flow tokens at only +# 0.5MiB/s. At t=35s, disconnect the slow stream.
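Editor's aside: a sketch of the disconnect accounting this scenario exercises, under the assumption (spelled out in the comments further down) that closing a stream returns all still-tracked tokens in one shot and releases its waiters. This is a hypothetical shape, not the actual rac2 close path.

package main

import "fmt"

// stream models just enough state to show the two disconnect effects seen
// below: a 16MiB token return and a burst of admitted waiters.
type stream struct {
	available int64 // tokens currently available
	tracked   int64 // tokens deducted but not yet returned
	waiting   int   // requests blocked waiting on this stream
}

// disconnect returns every tracked token at once and releases all waiters.
func (s *stream) disconnect() (returned int64, released int) {
	returned, s.available = s.tracked, s.available+s.tracked
	s.tracked = 0
	released, s.waiting = s.waiting, 0
	return returned, released
}

func main() {
	s := &stream{available: 0, tracked: 16 << 20, waiting: 25}
	returned, released := s.disconnect()
	fmt.Printf("returned %dMiB, released %d waiters\n", returned>>20, released)
}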
+timeline +t=0s handle=h op=connect stream=t1/s1 log-position=4/0 +t=0s handle=h op=connect stream=t1/s2 log-position=4/0 +t=0s handle=h op=connect stream=t1/s3 log-position=4/0 +t=[0s,50s) handle=h class=regular adjust=-1MiB/s rate=10/s +t=[0s,50s) handle=h class=regular adjust=+1MiB/s rate=10/s stream=t1/s1 +t=[0s,50s) handle=h class=regular adjust=+1MiB/s rate=10/s stream=t1/s2 +t=[0s,35s) handle=h class=regular adjust=+0.5MiB/s rate=10/s stream=t1/s3 +t=35s handle=h op=disconnect stream=t1/s3 +---- + +simulate +---- + +# Zoom in near the point where writes are being shaped by the slowest stream +# (s3, at 0.5MiB/s). We see the blocked stream count bouncing between 0 and 1 +# as tokens get depleted and replenished (demand here is higher than the +# replenishment rate). +# +# As soon as s3 is disconnected, we see a release of 16MiB of held tokens back +# into the node-level controller (32MiB -> 48MiB). We see a burst in the number +# of stream-specific/controller-level requests bypassing Admit() due to the +# stream disconnecting. At the handle-level this just appears as a burst in +# admitted requests. After s3 disconnects, the handle-level admission rate goes +# back to what it was before traffic was shaped by s3. +# +# TODO(irfansharif): The post-stream disconnection burst might lead to +# severe over-admission since it may have been long since we observed available +# tokens for the still connected streams. In fact, many requests that started +# waiting on the soon-to-be-disconnected-stream are in the same boat, all of +# which will now get admitted. One thing we could do is to try and observe +# available tokens again for still-connected streams. +plot t=[30s,40s) +kvflowcontrol.streams.eval.regular.blocked_count unit=streams +kvflowcontrol.tokens.eval.regular.available unit=MiB +kvflowcontrol.eval_wait.elastic.requests.bypassed unit=reqs/s rate=true +kvflowcontrol.eval_wait.regular.requests.{admitted,waiting} unit=reqs/s rate=true +---- +---- + 1.0 ┤ ╭╮ + 0.9 ┤ ││ + 0.9 ┤ ╭╮ ││ ╭╮ + 0.8 ┤ ││ ││ ││ + 0.7 ┤ ││ ││ ││ + 0.7 ┤ ││ ││ ││ + 0.6 ┤ │╰╮ ││ ╭╯│ + 0.5 ┤ │ │ ││ │ │ + 0.5 ┤ │ │╭╯╰╮│ │ + 0.4 ┤ │ ││ ││ │ + 0.3 ┤ ╭╯ ││ ││ ╰╮ + 0.3 ┤ │ ││ ││ │ + 0.2 ┤ │ ││ ││ ╰╮ + 0.1 ┤ │ ││ ││ │ + 0.1 ┤ │ ╰╯ ╰╯ │ + 0.0 ┼───────╯ ╰─────────────────── + streams.eval.regular.blocked_count (streams) + + + 48.0 ┤ ╭─────────────────── + 46.9 ┤ │ + 45.9 ┤ │ + 44.8 ┤ │ + 43.7 ┤ │ + 42.6 ┤ │ + 41.6 ┤ │ + 40.5 ┤ │ + 39.4 ┤ │ + 38.3 ┤ │ + 37.3 ┤ │ + 36.2 ┤ │ + 35.1 ┤ │ + 34.0 ┤ │ + 33.0 ┼─╮ │ + 31.9 ┤ ╰─────────────────╯ + tokens.eval.regular.available (MiB) + + + 0.0 ┼─────────────────────────────────────── + rate(eval_wait.elastic.requests.bypassed) (reqs/s) + + + 25.0 ┤ ╭─╮ + 22.3 ┤ ╭╯ │ + 19.7 ┤ │ │ + 17.0 ┤ ╭╯ │ + 14.3 ┤ │ │ + 11.7 ┼───────╮ │ ╰─────────────── + 9.0 ┤ ╰─╮ │ + 6.3 ┤ ╰╭────────╮ + 3.7 ┤ ╭─╯ │ + 1.0 ┼────────╯ │ ╭─────────────── + -1.7 ┤ │ │ + -4.3 ┤ │ │ + -7.0 ┤ ╰╮ │ + -9.7 ┤ │ │ + -12.3 ┤ ╰─╮│ + -15.0 ┤ ╰╯ + rate(eval_wait.regular.requests.{admitted,waiting}) (reqs/s) +---- +---- + +# vim:ft=conf diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_admitting_waiting_choppy b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_admitting_waiting_choppy new file mode 100644 index 000000000000..34b9eb5a5919 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_admitting_waiting_choppy @@ -0,0 +1,199 @@ +# Show how waiting work ends up getting admitted choppily if the flow tokens +# being returned are being done so in a similarly 
choppy way. This shows +# that we're shaping incoming writes to exactly the rate of flow token returns, +# i.e. we're controlling the flow tightly. +init +---- + +# Set up an open-loop thread issuing 2MiB/s of regular writes from t=0s to +# t=25s. +timeline +t=[0s,25s) class=regular stream=t1/s1 adjust=-2MiB/s rate=10/s +---- + +# Set up choppy flow token returns starting at t=15s. The average rate of +# returns is lower than 2MiB/s, so we should always have some waiting work. +timeline +t=[15s,16s) class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +t=[16s,17s) class=regular stream=t1/s1 adjust=+2.1MiB/s rate=10/s +t=[17s,18s) class=regular stream=t1/s1 adjust=+0.1MiB/s rate=10/s +t=[18s,19s) class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +t=[19s,20s) class=regular stream=t1/s1 adjust=+2.1MiB/s rate=10/s +t=[20s,21s) class=regular stream=t1/s1 adjust=+0.1MiB/s rate=10/s +t=[21s,22s) class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +t=[22s,23s) class=regular stream=t1/s1 adjust=+2.1MiB/s rate=10/s +t=[23s,24s) class=regular stream=t1/s1 adjust=+0.1MiB/s rate=10/s +t=[24s,25s) class=regular stream=t1/s1 adjust=+0.9MiB/s rate=10/s +---- + +simulate +---- + +# Observe the initial smooth rate of flow token deductions, and later the +# choppier rate of flow token returns which we induced above. Notice that the +# rate of flow token deductions exactly mirrors the flow token returns, so +# traffic shaping is happening. +plot + +kvflowcontrol.tokens.eval.regular.{deducted,returned} unit=MiB/s rate=true +kvflowcontrol.tokens.eval.regular.{deducted,returned} unit=MiB +kvflowcontrol.tokens.eval.regular.available unit=MiB +---- +---- + 2.0 ┤ ╭──────────╮ ╭╮ ╭╮ + 1.9 ┤ │ │ ││ ╭╮ + 1.7 ┤ │ │ ╭╮╮ ││ ││ + 1.6 ┤ │ │ │││ ││ ││ + 1.5 ┤ │ ╰╮ │╰╮ ││ ╭╯│ + 1.3 ┤ │ │ │ │ ││ │╯│ + 1.2 ┤ │ │ │ │ ╭╯│╮ │ │ + 1.1 ┤ │ │ │ │ │ ╰╮ │ │ + 0.9 ┤ │ │ ╭╯ │ │ │ │ │╮╭ + 0.8 ┤ │ │ │╯ │ │ │╭╯ │││ + 0.7 ┤ │ │ │ │ │ ││ ╰╮│ + 0.5 ┤ │ │ │ │╭╯ ││ ││ + 0.4 ┤ │ │ ╭╯ ││╯ ││ ╰╯ + 0.3 ┤ │ │ │╯ ││ ╰╯ ╰╯ + 0.1 ┤ │ ╰╮ │ ╰╯ ╰╯ + 0.0 ┼───────────────────────╯ + rate(tokens.eval.regular.{deducted,returned}) (MiB/s) + + + 26.2 ┤ ╭── + 24.5 ┤ ╭─╯ + 22.7 ┤ ╭──╯ + 21.0 ┤ ╭─╯ + 19.2 ┤ ╭──╯ + 17.5 ┤ ╭─╯ + 15.7 ┤ ╭────────────╯ + 14.0 ┤ ╭╯ + 12.2 ┤ ╭─╯ + 10.5 ┤ ╭╯ ╭─ + 8.7 ┤ ╭─╯ ╭──╯ + 7.0 ┤ ╭╯ ╭──╯ + 5.2 ┤ ╭╯ ╭─╯ + 3.5 ┤ ╭─╯ ╭──╯ + 1.7 ┤╭╯ ╭─╯ + 0.0 ┼────────────────────────╯ + tokens.eval.regular.{deducted,returned} (MiB) + + + 15.8 ┼╮ + 14.7 ┤╰╮ + 13.7 ┤ │ + 12.6 ┤ ╰╮ + 11.5 ┤ ╰╮ + 10.5 ┤ ╰╮ + 9.4 ┤ ╰╮ + 8.3 ┤ ╰╮ + 7.3 ┤ │ + 6.2 ┤ ╰╮ + 5.1 ┤ ╰╮ + 4.1 ┤ ╰╮ + 3.0 ┤ ╰╮ + 1.9 ┤ ╰╮ + 0.9 ┤ │ + -0.2 ┤ ╰─────────────────────────── + tokens.eval.regular.available (MiB) +---- +---- + + +# Zoom into the more interesting second half of the graph, where flow tokens +# are being returned choppily. Given the average rate of returns is lower +# than what's being requested (2MiB/s), the total flow tokens available bounce +# off of zero. +plot t=[15s,30s) +kvflowcontrol.tokens.eval.regular.available unit=MiB +---- + 0.2 ┤ ╭╮ + 0.2 ┤ ╭╮ ╭╯│ + 0.1 ┤ ╭╯│ ╭╯ │ + 0.1 ┤ ╭╯ │ ╭╯ │ + 0.1 ┤ ╭╯ │ ╭╮ ╭╮ │ │ + 0.1 ┤ │ │ ││╭╯│ │ │ + 0.0 ┤ │ │ ╭╮ │╰╯ │ │ │ + 0.0 ┤ │ │ ╭╯│ │ │ ╭╮│ │ ╭╮ + -0.0 ┤╭╮ │ │ │ │ │ │ ╭──╯││ │ ││ ╭ + -0.0 ┤││╭╯ │ │ ╰╮│ │ ╭╯ ││ │ ││╭╯ + -0.1 ┤│╰╯ │ │ ││ │╭╯ ╰╯ │ │╰╯ + -0.1 ┤│ │ │ ╰╯ ╰╯ │ │ + -0.1 ┼╯ │ ╭╯ │ ╭╯ + -0.1 ┤ │ ╭╯ │ ╭╯ + -0.2 ┤ │╭╯ │╭╯ + -0.2 ┤ ╰╯ ╰╯ + tokens.eval.regular.available (MiB) + + +# Note again the mirroring: token returns, which immediately allow admission, +# are followed by the corresponding token deductions.
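Editor's aside: the rate=true series here and throughout are derived from cumulative counters. A sketch of that derivation, assuming one sample per second as a first difference (the real asciitsdb rate helper may window or smooth differently):

package main

import "fmt"

// rate converts cumulative counter samples into per-second rates by taking
// first differences over a window of `window` samples.
func rate(points []float64, window int) []float64 {
	out := make([]float64, len(points))
	for i := window; i < len(points); i++ {
		out[i] = (points[i] - points[i-window]) / float64(window)
	}
	return out
}

func main() {
	// Cumulative MiB returned, sampled once per second: choppy returns.
	cumulative := []float64{0, 0.9, 3.0, 3.1, 4.0, 6.1, 6.2}
	for _, r := range rate(cumulative, 1) {
		fmt.Printf("%.1f ", r) // 0.0 0.9 2.1 0.1 0.9 2.1 0.1 -- the choppiness
	}
	fmt.Println()
}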
+plot t=[15s,30s) +kvflowcontrol.tokens.eval.regular.{deducted,returned} unit=MiB/s rate=true +---- + 2.1 ┤ ╭╮ + 2.0 ┤ ╭╮│ ╭╮ ╭╮╮ + 1.9 ┤ │╰╮ ││╮ │││ + 1.7 ┤ ╭╯╯│ ╭╯╰╮ ╭╯╰╮ + 1.6 ┤ ││ │╮ │╯ │ │╯ │ + 1.4 ┤ │╯ ││ ╭╯ │ ╭╯ │╮ + 1.3 ┤ ╭╯ ╰╮ │ ╰╮ │╯ ││ + 1.1 ┤ │╯ │╮ ╭╯ │ ╭╯ ╰╮ + 1.0 ┤ ╭╯ ││ │╯ │ │╯ │╮ + 0.9 ┤ │╯ ╰╮ ╭╯ │╮ ╭╯ ││ ╭ + 0.7 ┤ ╭╯ │ │ ╰╮ ╭╯ ││ ╭╯ + 0.6 ┤ ╭╯╯ │╮ ╭╯ │ │╯ ╰╮ │╯ + 0.4 ┤ │╯ ││╭╯ │ ╭╯ │╮╭╯ + 0.3 ┤╭╯ ╰─╯│ ╰─╯ │╭╯ + 0.1 ┼╯╯ │╭╯ ╰╯ ╰╯╯ +-0.0 ┼╯ ╰╯ + rate(tokens.eval.regular.{deducted,returned}) (MiB/s) + + +# So we're still admitting work choppily, and we observe corresponding dips +# in the waiting request rate. But given the open-loop thread above, the # of +# waiting requests is still growing unboundedly. +plot t=[15s,30s) +kvflowcontrol.eval_wait.regular.requests.{admitted,waiting} unit=reqs/s rate=true +kvflowcontrol.eval_wait.regular.requests.waiting unit=reqs +---- +---- + 10.7 ┤ ╭╮ + 9.9 ┼╮ ││ ╭╮ ╭╮ ╭╮ + 9.2 ┤╰╮ ╭╯│ │╰╮ ╭─╮ ╭╮ ╭╯│ ││ + 8.4 ┤ │ │ ╰╮ │ │ ╭╯ │ ╭╯╰╮ ╭╯ │ │╰╮ + 7.7 ┤ │ ╭╯ │ │ ╰╮ │ │ │ │ │ ╰╮╭╯ │ + 6.9 ┤ ╰╮ │ │╭╯ │ ╭╯ ╰╮│ ╰╮ ╭╯ ││ ╰╮ + 6.1 ┤ ╰╮╭╯ ││ ╰╮│ ╭╯ │ │ ││ ╰ + 5.4 ┤ ││ ╰│ │╯ │ ╰╮╯ ╭╯ + 4.6 ┤ ╰╮ ╭╯ ╰╮ │ ╭╰╮ │╮ + 3.9 ┤ ╭╰╮ ││ ╭╯│ │╮ │ │ ││ ╭ + 3.1 ┤ ╭╯ ╰╮ │╰╮ │ ╰╮ ╭╯│ ╭╯ ╰╮ ╭╯│ ╭╯ + 2.3 ┤ ╭╯ │ │ │ ╭╯ ╰╮ │ │ ╭╯ ╰╮ │ ╰╮╭╯ + 1.6 ┤ │ ╰╮╭╯ │ │ │ │ ╰╮│ │ │ ││ + 0.8 ┤╭╯ ││ │╭╯ ╰─╯ ╰╯ ╰╮│ ╰╯ + 0.1 ┼╯ ││ ╰╯ ╰╯ + -0.7 ┤ ╰╯ + rate(eval_wait.regular.requests.{admitted,waiting}) (reqs/s) + + + 118 ┤ ╭ + 115 ┤ ╭──╯ + 112 ┤ ╭╯ + 108 ┤ ╭╯ + 105 ┤ ╭─────╯ + 102 ┤ ╭─╯ + 99 ┤ ╭─╯ + 96 ┤ ╭─╯ + 92 ┤ ╭╯ + 89 ┤ ╭─────╯ + 86 ┤ ╭─╯ + 83 ┤ ╭─╯ + 80 ┤ ╭╯ + 76 ┤ ╭╯ + 73 ┤ ╭──────╯ + 70 ┼─╯ + eval_wait.regular.requests.waiting (reqs) +---- +---- + +# vim:ft=conf diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_over_admission b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_over_admission new file mode 100644 index 000000000000..25591049d74f --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_over_admission @@ -0,0 +1,97 @@ +# Demonstrate how any delay in token deduction after being admitted can lead to +# over-admission. +init +---- + +# Take away all 16MiB of regular flow tokens; we want a buildup of waiting +# requests to over-admit from. +timeline +t=[0s,1s) class=regular stream=t1/s1 adjust=-16MiB/s rate=1/s +---- + +# Queue 10/s*10s=100 requests for admission, asking for 10*4MiB=40MiB of +# tokens. For these requests, induce a 2s delay between Admit() and +# DeductTokens(). +timeline +t=[10s,20s) class=regular stream=t1/s1 adjust=-4MiB/s rate=10/s deduction-delay=2s +---- + +# Return 1KiB of flow tokens at t=30s. +timeline +t=[30s,31s) class=regular stream=t1/s1 adjust=+1KiB/s rate=1/s +---- + +simulate t=[0s,40s) +---- + +# Observe how the single 1KiB flow token return ends up admitting all 100 +# waiting requests, over-admitting by 40MiB. +# +# TODO(kvoli,sumeerbhola): Introduce a "tentative deducted counter" on a per-stream +# basis, to prevent this kind of over-admission. It's likely to occur any time +# there's AC queueing due to CPU control, waiting on locks/latches, etc.
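Editor's aside: to make the TODO above concrete, here's one possible shape a "tentative deducted counter" could take. This is entirely hypothetical -- nothing like it exists in this patch -- but it shows why reserving tokens at Admit() time keeps a single 1KiB return from waving through the whole wait queue.

package main

import "fmt"

// counter tracks real tokens plus tentative reservations made at Admit()
// time, before the deferred DeductTokens() lands.
type counter struct {
	available int64
	tentative int64
}

// tryAdmit admits only if tokens remain after accounting for reservations.
func (c *counter) tryAdmit(tokens int64) bool {
	if c.available-c.tentative <= 0 {
		return false
	}
	c.tentative += tokens
	return true
}

// deduct converts a reservation into a real deduction.
func (c *counter) deduct(tokens int64) {
	c.tentative -= tokens
	c.available -= tokens
}

func main() {
	c := &counter{available: 1 << 10} // the single 1KiB return at t=30s
	admitted := 0
	for i := 0; i < 100; i++ { // the 100 waiting requests, ~0.4MiB each
		if c.tryAdmit(400 << 10) {
			admitted++
		}
	}
	fmt.Println(admitted) // 1, instead of the 100 admitted in the plots below
}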
+plot t=[0s,40s) +kvflowcontrol.tokens.eval.regular.available unit=MiB +kvflowcontrol.eval_wait.regular.requests.admitted unit=reqs/s rate=true +kvflowcontrol.eval_wait.regular.requests.waiting unit=reqs/s +---- +---- + 0.0 ┼───────────────────────────────╮ + -2.7 ┤ │ + -5.3 ┤ │ + -8.0 ┤ │ + -10.7 ┤ │ + -13.3 ┤ │ + -16.0 ┤ │ + -18.7 ┤ │ + -21.3 ┤ │ + -24.0 ┤ │ + -26.7 ┤ │ + -29.3 ┤ │ + -32.0 ┤ │ + -34.7 ┤ │ + -37.3 ┤ │ + -40.0 ┤ ╰─────── + tokens.eval.regular.available (MiB) + + + 100.0 ┤ ╭╮ + 93.3 ┤ ││ + 86.7 ┤ ││ + 80.0 ┤ ││ + 73.3 ┤ ││ + 66.7 ┤ ││ + 60.0 ┤ ││ + 53.3 ┤ ││ + 46.7 ┤ ││ + 40.0 ┤ ││ + 33.3 ┤ ││ + 26.7 ┤ ││ + 20.0 ┤ ││ + 13.3 ┤ ││ + 6.7 ┤ ││ + 0.0 ┼─────────────────────────────╯╰──────── + rate(eval_wait.regular.requests.admitted) (reqs/s) + + + 100.0 ┤ ╭─────────╮ + 93.3 ┤ ╭╯ │ + 86.7 ┤ ╭╯ │ + 80.0 ┤ │ │ + 73.3 ┤ ╭╯ │ + 66.7 ┤ ╭╯ │ + 60.0 ┤ │ │ + 53.3 ┤ ╭╯ │ + 46.7 ┤ ╭╯ │ + 40.0 ┤ │ │ + 33.3 ┤ ╭╯ │ + 26.7 ┤ ╭╯ │ + 20.0 ┤ │ │ + 13.3 ┤ ╭╯ │ + 6.7 ┤ │ │ + 0.0 ┼──────────╯ ╰───────── + eval_wait.regular.requests.waiting (reqs/s) +---- +---- + +# vim:ft=conf diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_overview b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_overview new file mode 100644 index 000000000000..2ab35c374920 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_overview @@ -0,0 +1,188 @@ +# Walk through the basics of the datadriven syntax for flow controller tests. +init +---- + +# Set up a worker deducting flow tokens at 2MiB/s over 100 reqs/s for regular +# write work to two stores, s1 and s2. With 16MiB of regular tokens, we expect +# them to get depleted in 8s. The remaining 8s of incoming work ends up getting +# queued. +timeline +t=[0s,16s) class=regular stream=t1/s1 adjust=-2MiB/s rate=100/s +t=[0s,16s) class=regular stream=t1/s2 adjust=-2MiB/s rate=100/s +---- + +# Schedule the return of flow tokens at t=20s in similar increments. +timeline +t=[20s,36s) class=regular stream=t1/s1 adjust=+2MiB/s rate=100/s +t=[20s,36s) class=regular stream=t1/s2 adjust=+2MiB/s rate=100/s +---- + +simulate t=[0s,40s) +---- + +# There are two replication streams, so we initially have 16*2=32MiB of regular +# flow tokens and 8*2=16MiB of elastic flow tokens. Since we're not returning +# flow tokens until t=20s, we deplete them at t=8s. Also note that despite this +# being regular traffic, we deduct elastic flow tokens for coarse intra-tenant +# prioritization -- those tokens get depleted at t=4s. Both are deducted at +# 4MiB/s (2MiB/s for each stream) until we have 0MiB regular tokens and -16MiB +# elastic tokens. +plot t=[0s,20s) +kvflowcontrol.tokens.eval.{regular,elastic}.available unit=MiB +kvflowcontrol.tokens.eval.{regular,elastic}.deducted unit=MiB/s rate=true +---- +---- + 32.0 ┼╮ + 28.8 ┤╰─╮ + 25.6 ┤ ╰╮ + 22.4 ┤ ╰─╮ + 19.2 ┤ ╰─╮ + 16.0 ┼╮ ╰╮ + 12.8 ┤╰─╮ ╰─╮ + 9.6 ┤ ╰╮ ╰╮ + 6.4 ┤ ╰─╮ ╰─╮ + 3.2 ┤ ╰─╮ ╰╮ + -0.0 ┤ ╰╮ ╰──────────────────────── + -3.2 ┤ ╰─╮ + -6.4 ┤ ╰╮ + -9.6 ┤ ╰─╮ + -12.8 ┤ ╰╮ + -16.0 ┤ ╰──────────────────────── + tokens.eval.{regular,elastic}.available (MiB) + + + 4.0 ┤ ╭─────────────╮ + 3.7 ┤ │ │ + 3.5 ┤ │ ╰╮ + 3.2 ┤ │ │ + 2.9 ┤ │ │ + 2.7 ┤ │ │ + 2.4 ┤ │ │ + 2.1 ┤ │ │ + 1.9 ┤ │ │ + 1.6 ┤ │ │ + 1.3 ┤ │ ╰╮ + 1.1 ┤ │ │ + 0.8 ┤ │ │ + 0.5 ┤ │ │ + 0.3 ┤ │ │ + 0.0 ┼─╯ ╰───────────────────── + rate(tokens.eval.{regular,elastic}.deducted) (MiB/s) +---- +---- + +# Plot the rates at which we (i) admit work when there are flow tokens +# available, and (ii) enqueue work when there are none.
Since we're generating +# 100 requests/s for two streams, we observe an aggregate admission rate of +# ~200/s, and then later 200/s of growth in waiting requests. There are no +# errors. +plot t=[0s,20s) +kvflowcontrol.eval_wait.regular.requests.{admitted,waiting} unit=reqs/s rate=true +kvflowcontrol.eval_wait.{regular,elastic}.requests.errored unit=reqs/s rate=true +---- +---- + 200 ┤ ╭─────────────╮ ╭─────────────╮ + 187 ┤ │ │ │ │ + 173 ┤ │ ╰╮│ │ + 160 ┤ │ ││ │ + 147 ┤ │ ││ │ + 133 ┤ │ ╭╯ ╰╮ + 120 ┤ │ │ │ + 107 ┤ │ │ │ + 93 ┤ │ │ │ + 80 ┤ │ │ │ + 67 ┤ │ │╮ │ + 53 ┤ │ ││ │ + 40 ┤ │ ││ │ + 27 ┤ │ ╭╯│ ╰╮ + 13 ┤ │ │ │ │ + 0 ┼───────────────╯ ╰───────────────╰───── + rate(eval_wait.regular.requests.{admitted,waiting}) (reqs/s) + + + 0.0 ┼─────────────────────────────────────── + rate(eval_wait.{regular,elastic}.requests.errored) (reqs/s) +---- +---- + +# Confirm that there are two streams underneath, both of which eventually get +# blocked for {regular,elastic} traffic, with the latter happening first. +plot t=[0s,20s) +kvflowcontrol.streams.eval.regular.total_count unit=streams +kvflowcontrol.streams.eval.{regular,elastic}.blocked_count unit=streams +---- +---- + 2.0 ┼─────────────────────────────────────── + streams.eval.regular.total_count (streams) + + + 2.0 ┤ ╭─────────────────────────────── + 1.9 ┤ │ │ + 1.7 ┤ │ │ + 1.6 ┤ │ │ + 1.5 ┤ │ │ + 1.3 ┤ │ │ + 1.2 ┤ │ │ + 1.1 ┤ │ │ + 0.9 ┤ │ │ + 0.8 ┤ │ │ + 0.7 ┤ │ │ + 0.5 ┤ │ │ + 0.4 ┤ │ │ + 0.3 ┤ │ │ + 0.1 ┤ │ │ + 0.0 ┼───────╯───────╯ + streams.eval.{regular,elastic}.blocked_count (streams) +---- +---- + +# Observe what happens once flow tokens are returned -- we start admitting work +# at ~200/s, which matches the rate at which we're reducing the number of +# waiting requests. By t=36s we'll have returned all {regular,elastic} flow +# tokens, including for the requests that had to wait for admission. +plot t=[18s,40s) +kvflowcontrol.eval_wait.regular.requests.{admitted,waiting} unit=reqs/s rate=true +kvflowcontrol.tokens.eval.{regular,elastic}.available unit=MiB +---- +---- + 200 ┤ ╭───────────╮ + 175 ┤ │ ╰╮ + 150 ┤ ╭╯ │ + 125 ┤ │ │ + 100 ┤ │ │ + 75 ┤ │ │ + 50 ┤ ╭╯ ╰╮ + 25 ┤ │ │ + 0 ┼───╮ ╭─────────────────── + -25 ┤ │ │ + -50 ┤ ╰╮ ╭╯ + -75 ┤ │ │ + -100 ┤ │ │ + -125 ┤ │ │ + -150 ┤ ╰╮ │ + -175 ┤ │ ╭╯ + -200 ┤ ╰───────────╯ + rate(eval_wait.regular.requests.{admitted,waiting}) (reqs/s) + + + 32.0 ┤ ╭─────── + 28.8 ┤ ╭─╯ + 25.6 ┤ ╭╯ + 22.4 ┤ ╭╯ + 19.2 ┤ ╭─╯ + 16.0 ┤ ╭╯ ╭─────── + 12.8 ┤ ╭─╯ ╭─╯ + 9.6 ┤ ╭╯ ╭╯ + 6.4 ┤ ╭─╯ ╭╯ + 3.2 ┤ ╭╯ ╭─╯ + -0.0 ┼──────────────────╯ ╭╯ + -3.2 ┤ ╭─╯ + -6.4 ┤ ╭╯ + -9.6 ┤ ╭─╯ + -12.8 ┤ ╭╯ + -16.0 ┼──────────────────╯ + tokens.eval.{regular,elastic}.available (MiB) +---- +---- + +# vim:ft=conf diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_regular_elastic_prioritization b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_regular_elastic_prioritization new file mode 100644 index 000000000000..48a0ae26b738 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/simulation/streams_regular_elastic_prioritization @@ -0,0 +1,119 @@ +# Show that elastic work can get completely starved out by regular work, but +# not the other way around. +init +---- + +# Kick off two threads, one for each work class, each writing at 1MiB/s over +# 100 reqs/s.
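Editor's aside, before the timeline directives: a sketch of the cross-class accounting this scenario relies on, namely that regular work deducts from both buckets (coarse intra-tenant prioritization) while elastic work deducts only from the elastic bucket. Illustrative only, not the rac2 implementation.

package main

import "fmt"

type buckets struct{ regular, elastic int64 }

// deduct applies the cross-class rule: all work consumes elastic tokens,
// but only regular work consumes regular tokens.
func (b *buckets) deduct(class string, tokens int64) {
	b.elastic -= tokens
	if class == "regular" {
		b.regular -= tokens
	}
}

func main() {
	b := &buckets{regular: 16 << 20, elastic: 8 << 20} // a single stream, t1/s1
	for t := 0; t < 4; t++ {                           // one second per iteration
		b.deduct("regular", 1<<20)
		b.deduct("elastic", 1<<20)
	}
	// After 4s: the elastic bucket is empty (8MiB drained at 2MiB/s) while
	// regular still has 12MiB left (16MiB drained at 1MiB/s) -- matching the
	// depletion points described in this file.
	fmt.Printf("regular=%dMiB elastic=%dMiB\n", b.regular>>20, b.elastic>>20)
}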
+timeline +t=[0s,16s) class=regular stream=t1/s1 adjust=-1MiB/s rate=100/s +t=[0s,8s) class=elastic stream=t1/s1 adjust=-1MiB/s rate=100/s +---- + +simulate +---- + +# Observe that initially elastic tokens deplete faster than regular tokens +# (while elastic tokens > 0MiB), since regular work deducts from both but +# elastic work only deducts from the elastic bucket. Eventually the rate of +# elastic token deductions slows down since elastic requests stop being +# admitted (and thus deducting tokens) once elastic tokens <= 0 MiB. So it's +# only regular request deductions from that point on. See +# TestFlowTokenAdjustment for more details. +plot +kvflowcontrol.tokens.eval.{regular,elastic}.available unit=MiB +---- + 16.0 ┼╮ + 14.1 ┤╰────╮ + 12.3 ┤ ╰───╮ + 10.4 ┤ ╰────╮ + 8.6 ┤ ╰───╮ + 6.7 ┼─╮ ╰────╮ + 4.8 ┤ ╰──╮ ╰───╮ + 3.0 ┤ ╰─╮ ╰────╮ + 1.1 ┤ ╰─╮ ╰───╮ + -0.7 ┤ ╰───╮ ╰── + -2.6 ┤ ╰───╮ + -4.5 ┤ ╰────╮ + -6.3 ┤ ╰───╮ + -8.2 ┤ ╰────╮ + -10.0 ┤ ╰───╮ + -11.9 ┤ ╰──── + tokens.eval.{regular,elastic}.available (MiB) + + +# Confirm that all throughout we're able to admit regular requests at their +# incoming rate of 100/s. But for elastic requests, once we're out of elastic +# flow tokens, we stop admitting and start waiting instead. We run out of +# elastic tokens faster since there are fewer of them (8MiB instead of 16MiB), +# and also they're deducted by both regular and elastic work, compared to +# regular tokens that are deducted only by regular work. +plot +kvflowcontrol.eval_wait.regular.requests.{admitted,waiting} unit=reqs/s rate=true +kvflowcontrol.eval_wait.elastic.requests.{admitted,waiting} unit=reqs/s rate=true +---- +---- + 100.0 ┤ ╭──────────────────────────────────── + 93.3 ┤ │ + 86.7 ┤ │ + 80.0 ┤ │ + 73.3 ┤ │ + 66.7 ┤ │ + 60.0 ┤ │ + 53.3 ┤ │ + 46.7 ┤ │ + 40.0 ┤ │ + 33.3 ┤ │ + 26.7 ┤ │ + 20.0 ┤ │ + 13.3 ┤ │ + 6.7 ┤ │ + 0.0 ┼─────────────────────────────────────── + rate(eval_wait.regular.requests.{admitted,waiting}) (reqs/s) + + + 100.0 ┤ ╭──────╮ ╭──────╮ + 93.3 ┤ │ ╰╮╭╯ │ + 86.7 ┤ │ ││ ╰╮ + 80.0 ┤ │ ││ │ + 73.3 ┤ │ ││ │ + 66.7 ┤ │ ││ │ + 60.0 ┤ │ ││ │ + 53.3 ┤ │ ╰│ │ + 46.7 ┤ │ ╭╯ │ + 40.0 ┤ │ ││ ╰╮ + 33.3 ┤ │ ││ │ + 26.7 ┤ │ ││ │ + 20.0 ┤ │ ││ │ + 13.3 ┤ │ ││ │ + 6.7 ┤ │ ╭╯╰╮ │ + 0.0 ┼─────────╯ ╰────────╰───────────────── + rate(eval_wait.elastic.requests.{admitted,waiting}) (reqs/s) +---- +---- + +# Confirm the above -- when both regular and elastic work gets admitted, we're +# deducting elastic tokens at 2MiB/s, and at t=4s when elastic work gets +# blocked, we start deducting at 1MiB/s.
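Editor's aside: a numeric restatement of the comment above, using only the rates and budgets stated in this file (illustrative arithmetic, not simulator code). Once the elastic bucket is non-positive, elastic work stops being admitted, so the elastic deduction rate halves -- the knee in the plot that follows.

package main

import "fmt"

func main() {
	const regularRate, elasticRate = 1.0, 1.0 // MiB/s per work class
	elasticTokens := 8.0                      // MiB, the elastic budget for t1/s1
	for t := 1; t <= 6; t++ {
		rate := regularRate // regular work always deducts elastic tokens
		if elasticTokens > 0 {
			rate += elasticRate // elastic work is admitted only while tokens > 0
		}
		elasticTokens -= rate
		fmt.Printf("t=%ds: deducting %.0fMiB/s, elastic tokens=%.0fMiB\n",
			t, rate, elasticTokens)
	}
	// Deductions run at 2MiB/s through t=4s, then drop to 1MiB/s.
}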
+plot +kvflowcontrol.tokens.eval.elastic.deducted unit=MiB/s rate=true +---- + 2.0 ┤ ╭──────╮ + 1.9 ┤ │ ╰╮ + 1.7 ┤ │ │ + 1.6 ┤ │ │ + 1.5 ┤ │ ╰╮ + 1.3 ┤ │ │ + 1.2 ┤ │ │ + 1.1 ┤ │ ╰─────────────────────────── + 0.9 ┤ │ + 0.8 ┤ │ + 0.7 ┤ │ + 0.5 ┤ │ + 0.4 ┤ │ + 0.3 ┤ │ + 0.1 ┤ │ + 0.0 ┼──╯ + rate(tokens.eval.elastic.deducted) (MiB/s) + +# vim:ft=conf diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel index 94a9f34bf2da..f171a779f5d1 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel @@ -39,6 +39,7 @@ go_test( "admission_test.go", "close_scheduler_test.go", "processor_test.go", + "simulation_test.go", ], data = glob(["testdata/**"]), embed = [":replica_rac2"], diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/simulation_test.go new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/pkg/util/asciitsdb/asciitsdb.go b/pkg/util/asciitsdb/asciitsdb.go index ec8c282e065f..374e8de400a3 100644 --- a/pkg/util/asciitsdb/asciitsdb.go +++ b/pkg/util/asciitsdb/asciitsdb.go @@ -15,6 +15,8 @@ import ( "fmt" "math" "reflect" + "sort" + "strings" "testing" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -92,7 +94,6 @@ func (t *TSDB) Register(mstruct interface{}) { // registered metrics. func (t *TSDB) Scrape(ctx context.Context) { // TB: This code is cargo culted entirely from the TSDB scraper. - t.mu.Lock() defer t.mu.Unlock() t.mu.scraped = true @@ -156,6 +157,9 @@ func (t *TSDB) Plot(metrics []string, options ...Option) string { for _, metric := range metrics { points, ok := t.read(metric) require.Truef(t.t, ok, "%s not found", metric) + if len(points) == 0 { + // Fail loudly, dumping the full TSDB contents; the (expensive) debug + // string is only built when we're about to fail. + require.NotEqual(t.t, len(points), 0, "no data points for %s:\n%s", metric, t.String()) + } if c.rate != 0 { points = rate(points, c.rate) } @@ -187,6 +191,42 @@ func (t *TSDB) read(metric string) ([]float64, bool) { return points, ok } +// TODO(kvoli): Delete this after debugging. +func (t *TSDB) String() string { + var buf strings.Builder + t.mu.Lock() + defer t.mu.Unlock() + + for metric, points := range t.mu.points { + buf.WriteString(fmt.Sprintf("%s: [", metric)) + for i, point := range points { + if i > 0 { + buf.WriteString(", ") + } + buf.WriteString(fmt.Sprintf("%.2f", point)) + } + buf.WriteString("]\n") + } + + return buf.String() +} + +// RegisteredMetricNames returns the names of all metrics known to the TSDB, +// sorted for deterministic output. +func (t *TSDB) RegisteredMetricNames() []string { + var names []string + t.mu.Lock() + defer t.mu.Unlock() + + for metric := range t.mu.points { + names = append(names, metric) + } + + // For deterministic output. + sort.Strings(names) + + return names +} + +func (t *TSDB) registerMetricValue(val reflect.Value, name string, skipNil bool) { + if val.Kind() == reflect.Ptr && val.IsNil() { + if skipNil {