From 7f2459dd12188280c2ec4fa3c9094428b98e2221 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 19 Dec 2022 22:25:43 +0000 Subject: [PATCH 1/5] asim: move test workload gen into workload pkg Previously the testing utility to generate a simple workload existed within the `asim` pkg. This patch moves it into the `workload` pkg so that it can be depended upon by additional pkgs, such as metrics. Release note: None --- pkg/kv/kvserver/asim/asim_test.go | 28 +++----------------- pkg/kv/kvserver/asim/metrics_tracker_test.go | 2 +- pkg/kv/kvserver/asim/workload/workload.go | 21 +++++++++++++++ 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/pkg/kv/kvserver/asim/asim_test.go b/pkg/kv/kvserver/asim/asim_test.go index bdb9fe2f5c40..759a3019e27c 100644 --- a/pkg/kv/kvserver/asim/asim_test.go +++ b/pkg/kv/kvserver/asim/asim_test.go @@ -14,7 +14,6 @@ import ( "context" "fmt" "math" - "math/rand" "os" "testing" "time" @@ -36,7 +35,7 @@ func TestRunAllocatorSimulator(t *testing.T) { end := start.Add(1000 * time.Second) interval := 10 * time.Second rwg := make([]workload.Generator, 1) - rwg[0] = testCreateWorkloadGenerator(start, 1, 10) + rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, start, 1, 10) m := asim.NewMetricsTracker(os.Stdout) changer := state.NewReplicaChanger() s := state.LoadConfig(state.ComplexConfig) @@ -45,27 +44,6 @@ func TestRunAllocatorSimulator(t *testing.T) { sim.RunSim(ctx) } -// testCreateWorkloadGenerator creates a simple uniform workload generator that -// will generate load events at a rate of 500 per store. The read ratio is -// fixed to 0.95. -func testCreateWorkloadGenerator(start time.Time, stores int, keySpan int64) workload.Generator { - readRatio := 0.95 - minWriteSize := 128 - maxWriteSize := 256 - workloadRate := float64(stores * 500) - r := rand.New(rand.NewSource(state.TestingWorkloadSeed())) - - return workload.NewRandomGenerator( - start, - state.TestingWorkloadSeed(), - workload.NewUniformKeyGen(keySpan, r), - workloadRate, - readRatio, - maxWriteSize, - minWriteSize, - ) -} - // TestAllocatorSimulatorSpeed tests that the simulation runs at a rate of at // least 1.67 simulated minutes per wall clock second (1:100) for a 32 node // cluster, with 32000 replicas. The workload is generating 16000 keys per @@ -99,7 +77,7 @@ func TestAllocatorSimulatorSpeed(t *testing.T) { sample := func() int64 { rwg := make([]workload.Generator, 1) - rwg[0] = testCreateWorkloadGenerator(start, stores, int64(keyspace)) + rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, start, stores, int64(keyspace)) changer := state.NewReplicaChanger() m := asim.NewMetricsTracker() // no output replicaDistribution := make([]float64, stores) @@ -171,7 +149,7 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) { for run := 0; run < runs; run++ { rwg := make([]workload.Generator, 1) - rwg[0] = testCreateWorkloadGenerator(start, stores, int64(keyspace)) + rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, start, stores, int64(keyspace)) changer := state.NewReplicaChanger() m := asim.NewMetricsTracker() // no output replicaDistribution := make([]float64, stores) diff --git a/pkg/kv/kvserver/asim/metrics_tracker_test.go b/pkg/kv/kvserver/asim/metrics_tracker_test.go index f10428528b7c..9e9d6cf00936 100644 --- a/pkg/kv/kvserver/asim/metrics_tracker_test.go +++ b/pkg/kv/kvserver/asim/metrics_tracker_test.go @@ -110,7 +110,7 @@ func Example_workload() { end := start.Add(200 * time.Second) interval := 10 * time.Second rwg := make([]workload.Generator, 1) - rwg[0] = testCreateWorkloadGenerator(start, 10, 10000) + rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, start, 10, 10000) m := asim.NewMetricsTracker(os.Stdout) changer := state.NewReplicaChanger() diff --git a/pkg/kv/kvserver/asim/workload/workload.go b/pkg/kv/kvserver/asim/workload/workload.go index 9c57e1cd5d54..fe901ab7cc83 100644 --- a/pkg/kv/kvserver/asim/workload/workload.go +++ b/pkg/kv/kvserver/asim/workload/workload.go @@ -230,3 +230,24 @@ func (g *zipfianGenerator) readKey() int64 { func (g *zipfianGenerator) rand() *rand.Rand { return g.random } + +// TestCreateWorkloadGenerator creates a simple uniform workload generator that +// will generate load events at the rate given. The read ratio is fixed to +// 0.95. +func TestCreateWorkloadGenerator(seed int64, start time.Time, rate int, keySpan int64) Generator { + readRatio := 0.95 + minWriteSize := 128 + maxWriteSize := 256 + workloadRate := float64(rate) + r := rand.New(rand.NewSource(seed)) + + return NewRandomGenerator( + start, + seed, + NewUniformKeyGen(keySpan, r), + workloadRate, + readRatio, + maxWriteSize, + minWriteSize, + ) +} From 86fc7707df2d2c77fbd3aff0577abd4ab3a91124 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Fri, 20 Jan 2023 00:15:34 +0000 Subject: [PATCH 2/5] asim: tick workload in foreground Previously, the workload generator was ticked every background interval tick to reduce by enabling greater batching. This is problematic however when the workload ticks infrequently and the values are rated over a short period. An example problem is that it can lead to extremely high rated values for queries-per-second when a batch is applied for 10s of load in only a single tick. Release note: None --- pkg/kv/kvserver/asim/asim.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/kv/kvserver/asim/asim.go b/pkg/kv/kvserver/asim/asim.go index 6166ef0fce7b..c4a45a775c3e 100644 --- a/pkg/kv/kvserver/asim/asim.go +++ b/pkg/kv/kvserver/asim/asim.go @@ -210,15 +210,13 @@ func (s *Simulator) RunSim(ctx context.Context) { // tickWorkload gets the next workload events and applies them to state. func (s *Simulator) tickWorkload(ctx context.Context, tick time.Time) { - if !s.bgLastTick.Add(s.bgInterval).After(tick) { - s.shuffler( - len(s.generators), - func(i, j int) { s.generators[i], s.generators[j] = s.generators[j], s.generators[i] }, - ) - for _, generator := range s.generators { - event := generator.Tick(tick) - s.state.ApplyLoad(event) - } + s.shuffler( + len(s.generators), + func(i, j int) { s.generators[i], s.generators[j] = s.generators[j], s.generators[i] }, + ) + for _, generator := range s.generators { + event := generator.Tick(tick) + s.state.ApplyLoad(event) } } From cd43b72493480136ddf82f816de75413f002247b Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Fri, 20 Jan 2023 00:21:44 +0000 Subject: [PATCH 3/5] asim: use replica load in place of replicastats Previously, the allocator simulator used replica stats to track the rated stats of a replica. Replica load is a group of stats objects that is better suited. This patch replaces replica stats with replica load. Release note: None --- pkg/kv/kvserver/asim/state/BUILD.bazel | 3 ++- pkg/kv/kvserver/asim/state/helpers.go | 5 ++-- pkg/kv/kvserver/asim/state/impl.go | 16 ++++++++--- pkg/kv/kvserver/asim/state/load.go | 34 +++++++++++++----------- pkg/kv/kvserver/asim/state/state_test.go | 5 ++-- 5 files changed, 39 insertions(+), 24 deletions(-) diff --git a/pkg/kv/kvserver/asim/state/BUILD.bazel b/pkg/kv/kvserver/asim/state/BUILD.bazel index 031415c2256a..83861b16ff17 100644 --- a/pkg/kv/kvserver/asim/state/BUILD.bazel +++ b/pkg/kv/kvserver/asim/state/BUILD.bazel @@ -25,7 +25,7 @@ go_library( "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/workload", "//pkg/kv/kvserver/liveness/livenesspb", - "//pkg/kv/kvserver/replicastats", + "//pkg/kv/kvserver/load", "//pkg/kv/kvserver/split", "//pkg/roachpb", "//pkg/settings/cluster", @@ -53,6 +53,7 @@ go_test( deps = [ "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/workload", + "//pkg/kv/kvserver/load", "//pkg/roachpb", "@com_github_stretchr_testify//require", ], diff --git a/pkg/kv/kvserver/asim/state/helpers.go b/pkg/kv/kvserver/asim/state/helpers.go index 24f589dc2688..e9fbf064f70b 100644 --- a/pkg/kv/kvserver/asim/state/helpers.go +++ b/pkg/kv/kvserver/asim/state/helpers.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -57,7 +58,7 @@ func TestingSetRangeQPS(s State, rangeID RangeID, qps float64) bool { } rlc := s.ReplicaLoad(rangeID, store.StoreID()).(*ReplicaLoadCounter) - rlc.QPS.SetMeanRateForTesting(qps) + rlc.loadStats.TestingSetStat(load.Queries, qps) return true } @@ -265,7 +266,7 @@ func TestDistributeQPSCounts(s State, qpsCounts []float64) { for _, rng := range lhs { rl := s.ReplicaLoad(rng.RangeID(), storeID) rlc := rl.(*ReplicaLoadCounter) - rlc.QPS.SetMeanRateForTesting(qpsPerRange) + rlc.loadStats.TestingSetStat(load.Queries, qpsPerRange) } } } diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index d306afc8965a..b597aa0ce5bb 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -713,10 +713,20 @@ func (s *state) ReplicaLoad(rangeID RangeID, storeID StoreID) ReplicaLoad { // currently on the store given. Otherwise, return an empty, zero counter // value. store, ok := s.LeaseholderStore(rangeID) - if ok && store.StoreID() == storeID { - return s.load[rangeID] + if !ok { + panic(fmt.Sprintf("no leaseholder store found for range %d", storeID)) + } + + // TODO(kvoli): The requested storeID is not the leaseholder. Non + // leaseholder load tracking is not currently supported but is checked by + // other components such as hot ranges. In this case, ignore it but we + // should also track non leaseholder load. See load.go for more. Return an + // empty initialized load counter here. + if store.StoreID() != storeID { + return NewReplicaLoadCounter(s.clock) } - return &ReplicaLoadCounter{} + + return s.load[rangeID] } // ClusterUsageInfo returns the usage information for the Range with ID diff --git a/pkg/kv/kvserver/asim/state/load.go b/pkg/kv/kvserver/asim/state/load.go index 21201b238e27..9021755820a6 100644 --- a/pkg/kv/kvserver/asim/state/load.go +++ b/pkg/kv/kvserver/asim/state/load.go @@ -13,8 +13,9 @@ package state import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // ReplicaLoad defines the methods a datastructure is required to perform in @@ -50,14 +51,14 @@ type ReplicaLoadCounter struct { ReadKeys int64 ReadBytes int64 clock *ManualSimClock - QPS *replicastats.ReplicaStats + loadStats *load.ReplicaLoad } // NewReplicaLoadCounter returns a new replica load counter. func NewReplicaLoadCounter(clock *ManualSimClock) *ReplicaLoadCounter { return &ReplicaLoadCounter{ - clock: clock, - QPS: replicastats.NewReplicaStats(clock.Now(), nil), + clock: clock, + loadStats: load.NewReplicaLoad(hlc.NewClock(clock, 0), nil), } } @@ -67,20 +68,23 @@ func (rl *ReplicaLoadCounter) ApplyLoad(le workload.LoadEvent) { rl.ReadKeys += le.Reads rl.WriteBytes += le.WriteSize rl.WriteKeys += le.Writes - rl.QPS.RecordCount(rl.clock.Now(), LoadEventQPS(le), 0) + + rl.loadStats.RecordBatchRequests(LoadEventQPS(le), 0) + rl.loadStats.RecordRequests(LoadEventQPS(le)) + rl.loadStats.RecordReadKeys(float64(le.Reads)) + rl.loadStats.RecordReadBytes(float64(le.ReadSize)) + rl.loadStats.RecordWriteKeys(float64(le.Writes)) + rl.loadStats.RecordWriteBytes(float64(le.WriteSize)) } // Load translates the recorded key accesses and size into range usage // information. func (rl *ReplicaLoadCounter) Load() allocator.RangeUsageInfo { - qps := 0.0 - if rl.QPS != nil { - qps, _ = rl.QPS.AverageRatePerSecond(rl.clock.Now()) - } + stats := rl.loadStats.Stats() return allocator.RangeUsageInfo{ LogicalBytes: rl.WriteBytes, - QueriesPerSecond: qps, + QueriesPerSecond: stats.QueriesPerSecond, WritesPerSecond: float64(rl.WriteKeys), } } @@ -88,9 +92,7 @@ func (rl *ReplicaLoadCounter) Load() allocator.RangeUsageInfo { // ResetLoad resets the load of the ReplicaLoad. This only affects rated // counters. func (rl *ReplicaLoadCounter) ResetLoad() { - if rl.QPS != nil { - rl.QPS.ResetRequestCounts(rl.clock.Now()) - } + rl.loadStats.Reset() } // Split halves the load of the ReplicaLoad this method is called on and @@ -101,16 +103,16 @@ func (rl *ReplicaLoadCounter) Split() ReplicaLoad { rl.ReadKeys /= 2 rl.ReadBytes /= 2 - otherQPS := replicastats.NewReplicaStats(rl.clock.Now(), nil) - rl.QPS.SplitRequestCounts(otherQPS) + otherLoadStats := load.NewReplicaLoad(hlc.NewClock(rl.clock, 0), nil) + rl.loadStats.Split(otherLoadStats) return &ReplicaLoadCounter{ WriteKeys: rl.WriteKeys, WriteBytes: rl.WriteBytes, ReadKeys: rl.ReadKeys, ReadBytes: rl.ReadBytes, - QPS: otherQPS, clock: rl.clock, + loadStats: otherLoadStats, } } diff --git a/pkg/kv/kvserver/asim/state/state_test.go b/pkg/kv/kvserver/asim/state/state_test.go index 86c168dc772f..20e31db49674 100644 --- a/pkg/kv/kvserver/asim/state/state_test.go +++ b/pkg/kv/kvserver/asim/state/state_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/stretchr/testify/require" ) @@ -65,8 +66,8 @@ func TestRangeSplit(t *testing.T) { // Assert that the lhs now has half the previous load counters. lhsLoad := s.load[lhs.RangeID()].(*ReplicaLoadCounter) rhsLoad := s.load[rhs.RangeID()].(*ReplicaLoadCounter) - lhsQPS, _ := lhsLoad.QPS.Sum() - rhsQPS, _ := rhsLoad.QPS.Sum() + lhsQPS := lhsLoad.loadStats.TestingGetSum(load.Queries) + rhsQPS := rhsLoad.loadStats.TestingGetSum(load.Queries) require.Equal(t, int64(50), lhsLoad.ReadKeys) require.Equal(t, int64(50), lhsLoad.WriteKeys) require.Equal(t, int64(50), lhsLoad.WriteBytes) From 8bd1556d03713e92acab9e454c1b89ceea2d2b02 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Fri, 20 Jan 2023 00:24:07 +0000 Subject: [PATCH 4/5] asim: update local store after lease transfer Previously, the simulator relied on a bug where gossip shared a reference to each store descriptor in the cluster. This was fixed in #93945. This commit updates the simulator code to update the storepool state like the real code. Release note: None --- .../kvserver/asim/storerebalancer/store_rebalancer.go | 11 +++++++++++ .../asim/storerebalancer/store_rebalancer_test.go | 1 - 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go index 23d5345d04fc..fb997e93baad 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go +++ b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go @@ -73,6 +73,7 @@ type storeRebalancerControl struct { allocator allocatorimpl.Allocator controller op.Controller + storepool storepool.AllocatorStorePool } // NewStoreRebalancer returns a new simulator store rebalancer. @@ -104,6 +105,8 @@ func newStoreRebalancerControl( getRaftStatusFn, ) + sr.AddLogTag("s", storeID) + return &storeRebalancerControl{ sr: sr, settings: settings, @@ -112,6 +115,7 @@ func newStoreRebalancerControl( }, storeID: storeID, allocator: allocator, + storepool: storePool, controller: controller, } @@ -142,6 +146,8 @@ func (src *storeRebalancerControl) checkPendingTicket() (done bool, next time.Ti } func (src *storeRebalancerControl) Tick(ctx context.Context, tick time.Time, state state.State) { + src.sr.AddLogTag("tick", tick) + ctx = src.sr.ResetAndAnnotateCtx(ctx) switch src.rebalancerState.phase { case rebalancerSleeping: src.phaseSleep(ctx, tick, state) @@ -202,6 +208,11 @@ func (src *storeRebalancerControl) checkPendingLeaseRebalance(ctx context.Contex } if err == nil { + src.storepool.UpdateLocalStoresAfterLeaseTransfer( + roachpb.StoreID(src.storeID), + src.rebalancerState.pendingTransferTarget.StoreID, + src.rebalancerState.pendingTransfer.RangeUsageInfo(), + ) // The transfer has completed without error, update the local // state to reflect it's success. src.sr.PostLeaseRebalance( diff --git a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go index cdfc697335df..edb8ccfcc5d4 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go +++ b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go @@ -201,7 +201,6 @@ func TestStoreRebalancer(t *testing.T) { s.TickClock(state.OffsetTick(start, tick)) changer.Tick(state.OffsetTick(start, tick), s) controller.Tick(ctx, state.OffsetTick(start, tick), s) - gossip.Tick(ctx, state.OffsetTick(start, tick), s) src.Tick(ctx, state.OffsetTick(start, tick), s) resultsPhase = append(resultsPhase, src.rebalancerState.phase) storeQPS := testingGetStoreQPS(s) From 6185c28a33a4b0fc3dc989b93c4dd8b1139fef06 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Fri, 20 Jan 2023 23:59:11 +0000 Subject: [PATCH 5/5] asim: add start time setting and init clocks Previously, the clock that is shared among all simulated stores was initialized to zero. The clock would then jump to the start time after simulator ticking begun. This is problematic for rated components such as replica stats which rely on the clock. This commit initializes the state clock with a passed in argument defining the start time. This commit also updates the simulation settings to use a start time and pass in a duration when running the simulation, as opposed to passing a start and end time. Release note: None --- pkg/kv/kvserver/asim/asim.go | 14 +++++------ pkg/kv/kvserver/asim/asim_test.go | 24 +++++++------------ pkg/kv/kvserver/asim/config/settings.go | 11 +++++++++ pkg/kv/kvserver/asim/gossip/exchange_test.go | 3 +-- pkg/kv/kvserver/asim/gossip/gossip_test.go | 2 +- pkg/kv/kvserver/asim/metrics_tracker_test.go | 8 +++---- pkg/kv/kvserver/asim/op/controller_test.go | 3 +-- pkg/kv/kvserver/asim/state/impl.go | 2 +- pkg/kv/kvserver/asim/state/state_test.go | 5 ++-- .../storerebalancer/store_rebalancer_test.go | 4 ++-- 10 files changed, 39 insertions(+), 37 deletions(-) diff --git a/pkg/kv/kvserver/asim/asim.go b/pkg/kv/kvserver/asim/asim.go index c4a45a775c3e..971679482287 100644 --- a/pkg/kv/kvserver/asim/asim.go +++ b/pkg/kv/kvserver/asim/asim.go @@ -64,11 +64,10 @@ type Simulator struct { // NewSimulator constructs a valid Simulator. func NewSimulator( - start, end time.Time, + duration time.Duration, interval, bgInterval time.Duration, wgs []workload.Generator, initialState state.State, - changer state.Changer, settings *config.SimulationSettings, metrics *MetricsTracker, ) *Simulator { @@ -76,6 +75,7 @@ func NewSimulator( rqs := make(map[state.StoreID]queue.RangeQueue) sqs := make(map[state.StoreID]queue.RangeQueue) srs := make(map[state.StoreID]storerebalancer.StoreRebalancer) + changer := state.NewReplicaChanger() controllers := make(map[state.StoreID]op.Controller) for _, store := range initialState.Stores() { storeID := store.StoreID() @@ -91,14 +91,14 @@ func NewSimulator( settings.ReplicaChangeDelayFn(), allocator, storePool, - start, + settings.StartTime, ) sqs[storeID] = queue.NewSplitQueue( storeID, changer, settings.RangeSplitDelayFn(), settings.RangeSizeSplitThreshold, - start, + settings.StartTime, ) pacers[storeID] = NewScannerReplicaPacer( initialState.NextReplicasFn(storeID), @@ -114,7 +114,7 @@ func NewSimulator( settings, ) srs[storeID] = storerebalancer.NewStoreRebalancer( - start, + settings.StartTime, storeID, controllers[storeID], allocator, @@ -125,8 +125,8 @@ func NewSimulator( } return &Simulator{ - curr: start, - end: end, + curr: settings.StartTime, + end: settings.StartTime.Add(duration), interval: interval, bgInterval: bgInterval, generators: wgs, diff --git a/pkg/kv/kvserver/asim/asim_test.go b/pkg/kv/kvserver/asim/asim_test.go index 759a3019e27c..6857eb5e3359 100644 --- a/pkg/kv/kvserver/asim/asim_test.go +++ b/pkg/kv/kvserver/asim/asim_test.go @@ -31,16 +31,14 @@ import ( func TestRunAllocatorSimulator(t *testing.T) { ctx := context.Background() settings := config.DefaultSimulationSettings() - start := state.TestingStartTime() - end := start.Add(1000 * time.Second) + duration := 1000 * time.Second interval := 10 * time.Second rwg := make([]workload.Generator, 1) - rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, start, 1, 10) + rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, settings.StartTime, 1, 10) m := asim.NewMetricsTracker(os.Stdout) - changer := state.NewReplicaChanger() s := state.LoadConfig(state.ComplexConfig) - sim := asim.NewSimulator(start, end, interval, interval, rwg, s, changer, settings, m) + sim := asim.NewSimulator(duration, interval, interval, rwg, s, settings, m) sim.RunSim(ctx) } @@ -56,11 +54,10 @@ func TestAllocatorSimulatorSpeed(t *testing.T) { skip.UnderStressRace(t, skipString) skip.UnderRace(t, skipString) - start := state.TestingStartTime() settings := config.DefaultSimulationSettings() // Run each simulation for 5 minutes. - end := start.Add(5 * time.Minute) + duration := 5 * time.Minute bgInterval := 10 * time.Second interval := 2 * time.Second @@ -77,8 +74,7 @@ func TestAllocatorSimulatorSpeed(t *testing.T) { sample := func() int64 { rwg := make([]workload.Generator, 1) - rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, start, stores, int64(keyspace)) - changer := state.NewReplicaChanger() + rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, settings.StartTime, stores, int64(keyspace)) m := asim.NewMetricsTracker() // no output replicaDistribution := make([]float64, stores) @@ -94,7 +90,7 @@ func TestAllocatorSimulatorSpeed(t *testing.T) { } s := state.NewTestStateReplDistribution(replicaDistribution, ranges, replsPerRange, keyspace) - sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, changer, settings, m) + sim := asim.NewSimulator(duration, interval, bgInterval, rwg, s, settings, m) startTime := timeutil.Now() sim.RunSim(ctx) @@ -126,11 +122,10 @@ func TestAllocatorSimulatorSpeed(t *testing.T) { func TestAllocatorSimulatorDeterministic(t *testing.T) { - start := state.TestingStartTime() settings := config.DefaultSimulationSettings() runs := 3 - end := start.Add(15 * time.Minute) + duration := 15 * time.Minute bgInterval := 10 * time.Second interval := 2 * time.Second @@ -149,8 +144,7 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) { for run := 0; run < runs; run++ { rwg := make([]workload.Generator, 1) - rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, start, stores, int64(keyspace)) - changer := state.NewReplicaChanger() + rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, settings.StartTime, stores, int64(keyspace)) m := asim.NewMetricsTracker() // no output replicaDistribution := make([]float64, stores) @@ -166,7 +160,7 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) { } s := state.NewTestStateReplDistribution(replicaDistribution, ranges, replsPerRange, keyspace) - sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, changer, settings, m) + sim := asim.NewSimulator(duration, interval, bgInterval, rwg, s, settings, m) ctx := context.Background() sim.RunSim(ctx) diff --git a/pkg/kv/kvserver/asim/config/settings.go b/pkg/kv/kvserver/asim/config/settings.go index b1f051cf32c5..59a7d637af85 100644 --- a/pkg/kv/kvserver/asim/config/settings.go +++ b/pkg/kv/kvserver/asim/config/settings.go @@ -33,9 +33,19 @@ const ( defaultLBRebalancingDimension = 0 // QPS ) +var ( + // defaultStartTime is used as the default beginning time for simulation + // runs. It isn't necessarily meaningful other than for logging and having + // "some" start time for components taking a time.Time. + defaultStartTime = time.Date(2022, 03, 21, 11, 0, 0, 0, time.UTC) +) + // SimulationSettings controls // WIP: Thread these settings through to each of the sim parts. type SimulationSettings struct { + // StartTime is the time to start the simulation at. This is also used to + // init the shared state simulation clock. + StartTime time.Time // Seed is the random source that will be used for any simulator components // that accept a seed. Seed int64 @@ -102,6 +112,7 @@ type SimulationSettings struct { // DefaultSimulationSettings returns a set of default settings for simulation. func DefaultSimulationSettings() *SimulationSettings { return &SimulationSettings{ + StartTime: defaultStartTime, Seed: defaultSeed, ReplicaChangeBaseDelay: defaultReplicaChangeBaseDelay, ReplicaAddRate: defaultReplicaAddDelayFactor, diff --git a/pkg/kv/kvserver/asim/gossip/exchange_test.go b/pkg/kv/kvserver/asim/gossip/exchange_test.go index a52550197b1e..3fc7ca057b5b 100644 --- a/pkg/kv/kvserver/asim/gossip/exchange_test.go +++ b/pkg/kv/kvserver/asim/gossip/exchange_test.go @@ -14,7 +14,6 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/stretchr/testify/require" ) @@ -30,7 +29,7 @@ func TestFixedDelayExchange(t *testing.T) { } settings := config.DefaultSimulationSettings() - tick := state.TestingStartTime() + tick := settings.StartTime exchange := fixedDelayExchange{pending: []exchangeInfo{}, settings: settings} // There should be no updates initially. diff --git a/pkg/kv/kvserver/asim/gossip/gossip_test.go b/pkg/kv/kvserver/asim/gossip/gossip_test.go index cf8598e44504..80a6c794dbaf 100644 --- a/pkg/kv/kvserver/asim/gossip/gossip_test.go +++ b/pkg/kv/kvserver/asim/gossip/gossip_test.go @@ -24,7 +24,7 @@ import ( func TestGossip(t *testing.T) { settings := config.DefaultSimulationSettings() - tick := state.TestingStartTime() + tick := settings.StartTime s := state.NewTestStateEvenDistribution(3, 100, 3, 1000) details := map[state.StoreID]*map[roachpb.StoreID]*storepool.StoreDetail{} diff --git a/pkg/kv/kvserver/asim/metrics_tracker_test.go b/pkg/kv/kvserver/asim/metrics_tracker_test.go index 9e9d6cf00936..8a254002d103 100644 --- a/pkg/kv/kvserver/asim/metrics_tracker_test.go +++ b/pkg/kv/kvserver/asim/metrics_tracker_test.go @@ -105,18 +105,16 @@ func Example_rebalance() { func Example_workload() { ctx := context.Background() - start := state.TestingStartTime() settings := config.DefaultSimulationSettings() - end := start.Add(200 * time.Second) + duration := 200 * time.Second interval := 10 * time.Second rwg := make([]workload.Generator, 1) - rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, start, 10, 10000) + rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, settings.StartTime, 10, 10000) m := asim.NewMetricsTracker(os.Stdout) - changer := state.NewReplicaChanger() s := state.LoadConfig(state.ComplexConfig) - sim := asim.NewSimulator(start, end, interval, interval, rwg, s, changer, settings, m) + sim := asim.NewSimulator(duration, interval, interval, rwg, s, settings, m) sim.RunSim(ctx) // WIP: non deterministic // Output: diff --git a/pkg/kv/kvserver/asim/op/controller_test.go b/pkg/kv/kvserver/asim/op/controller_test.go index d264fbd63123..d12c9a266e84 100644 --- a/pkg/kv/kvserver/asim/op/controller_test.go +++ b/pkg/kv/kvserver/asim/op/controller_test.go @@ -139,9 +139,8 @@ func TestLeaseTransferOp(t *testing.T) { } func TestRelocateRangeOp(t *testing.T) { - start := state.TestingStartTime() - settings := config.DefaultSimulationSettings() + start := settings.StartTime settings.ReplicaAddRate = 1 settings.ReplicaChangeBaseDelay = 5 * time.Second settings.StateExchangeInterval = 1 * time.Second diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index b597aa0ce5bb..d108ae31011d 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -61,7 +61,7 @@ func newState(settings *config.SimulationSettings) *state { nodes: make(map[NodeID]*node), stores: make(map[StoreID]*store), loadsplits: make(map[StoreID]LoadSplitter), - clock: &ManualSimClock{nanos: 0}, + clock: &ManualSimClock{nanos: settings.StartTime.UnixNano()}, ranges: newRMap(), usageInfo: newClusterUsageInfo(), settings: config.DefaultSimulationSettings(), diff --git a/pkg/kv/kvserver/asim/state/state_test.go b/pkg/kv/kvserver/asim/state/state_test.go index 20e31db49674..bdaa49b42985 100644 --- a/pkg/kv/kvserver/asim/state/state_test.go +++ b/pkg/kv/kvserver/asim/state/state_test.go @@ -290,8 +290,9 @@ func TestWorkloadApply(t *testing.T) { // TestReplicaLoadQPS asserts that the rated replica load accounting maintains // the average per second corresponding to the tick clock. func TestReplicaLoadQPS(t *testing.T) { - s := NewState(config.DefaultSimulationSettings()) - start := TestingStartTime() + settings := config.DefaultSimulationSettings() + s := NewState(settings) + start := settings.StartTime n1 := s.AddNode() k1 := Key(100) diff --git a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go index edb8ccfcc5d4..6f11cf4ed094 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go +++ b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go @@ -36,9 +36,9 @@ func testingGetStoreQPS(s state.State) map[state.StoreID]float64 { } func TestStoreRebalancer(t *testing.T) { - start := state.TestingStartTime() testingStore := state.StoreID(1) testSettings := config.DefaultSimulationSettings() + start := testSettings.StartTime testSettings.ReplicaChangeBaseDelay = 5 * time.Second testSettings.StateExchangeDelay = 0 @@ -215,9 +215,9 @@ func TestStoreRebalancer(t *testing.T) { } func TestStoreRebalancerBalances(t *testing.T) { - start := state.TestingStartTime() testingStore := state.StoreID(1) testSettings := config.DefaultSimulationSettings() + start := testSettings.StartTime testSettings.ReplicaAddRate = 1 testSettings.ReplicaChangeBaseDelay = 1 * time.Second testSettings.StateExchangeInterval = 1 * time.Second