Skip to content

Commit

Permalink
Merge #95685
Browse files Browse the repository at this point in the history
95685: asim: make rebalancing more accurate r=alextalks a=kvoli

This series of commits resolves some inaccuracies in the allocator simulator. With this PR the simulated runs are now reasonably correct for replicate queue replica rebalancing, the store rebalancer (all) and load based splitting.

Notably the replicate queue simulation does not handle lease transfers nor any other action than rebalancing.

resolves: #83989
resolves: #83990

Release note: None

Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
craig[bot] and kvoli committed Jan 28, 2023
2 parents 78fe59d + 6185c28 commit 502acb0
Show file tree
Hide file tree
Showing 15 changed files with 117 additions and 93 deletions.
30 changes: 14 additions & 16 deletions pkg/kv/kvserver/asim/asim.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,18 @@ type Simulator struct {

// NewSimulator constructs a valid Simulator.
func NewSimulator(
start, end time.Time,
duration time.Duration,
interval, bgInterval time.Duration,
wgs []workload.Generator,
initialState state.State,
changer state.Changer,
settings *config.SimulationSettings,
metrics *MetricsTracker,
) *Simulator {
pacers := make(map[state.StoreID]ReplicaPacer)
rqs := make(map[state.StoreID]queue.RangeQueue)
sqs := make(map[state.StoreID]queue.RangeQueue)
srs := make(map[state.StoreID]storerebalancer.StoreRebalancer)
changer := state.NewReplicaChanger()
controllers := make(map[state.StoreID]op.Controller)
for _, store := range initialState.Stores() {
storeID := store.StoreID()
Expand All @@ -91,14 +91,14 @@ func NewSimulator(
settings.ReplicaChangeDelayFn(),
allocator,
storePool,
start,
settings.StartTime,
)
sqs[storeID] = queue.NewSplitQueue(
storeID,
changer,
settings.RangeSplitDelayFn(),
settings.RangeSizeSplitThreshold,
start,
settings.StartTime,
)
pacers[storeID] = NewScannerReplicaPacer(
initialState.NextReplicasFn(storeID),
Expand All @@ -114,7 +114,7 @@ func NewSimulator(
settings,
)
srs[storeID] = storerebalancer.NewStoreRebalancer(
start,
settings.StartTime,
storeID,
controllers[storeID],
allocator,
Expand All @@ -125,8 +125,8 @@ func NewSimulator(
}

return &Simulator{
curr: start,
end: end,
curr: settings.StartTime,
end: settings.StartTime.Add(duration),
interval: interval,
bgInterval: bgInterval,
generators: wgs,
Expand Down Expand Up @@ -210,15 +210,13 @@ func (s *Simulator) RunSim(ctx context.Context) {

// tickWorkload gets the next workload events and applies them to state.
func (s *Simulator) tickWorkload(ctx context.Context, tick time.Time) {
if !s.bgLastTick.Add(s.bgInterval).After(tick) {
s.shuffler(
len(s.generators),
func(i, j int) { s.generators[i], s.generators[j] = s.generators[j], s.generators[i] },
)
for _, generator := range s.generators {
event := generator.Tick(tick)
s.state.ApplyLoad(event)
}
s.shuffler(
len(s.generators),
func(i, j int) { s.generators[i], s.generators[j] = s.generators[j], s.generators[i] },
)
for _, generator := range s.generators {
event := generator.Tick(tick)
s.state.ApplyLoad(event)
}
}

Expand Down
46 changes: 9 additions & 37 deletions pkg/kv/kvserver/asim/asim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"
"fmt"
"math"
"math/rand"
"os"
"testing"
"time"
Expand All @@ -32,40 +31,17 @@ import (
func TestRunAllocatorSimulator(t *testing.T) {
ctx := context.Background()
settings := config.DefaultSimulationSettings()
start := state.TestingStartTime()
end := start.Add(1000 * time.Second)
duration := 1000 * time.Second
interval := 10 * time.Second
rwg := make([]workload.Generator, 1)
rwg[0] = testCreateWorkloadGenerator(start, 1, 10)
rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, settings.StartTime, 1, 10)
m := asim.NewMetricsTracker(os.Stdout)
changer := state.NewReplicaChanger()
s := state.LoadConfig(state.ComplexConfig)

sim := asim.NewSimulator(start, end, interval, interval, rwg, s, changer, settings, m)
sim := asim.NewSimulator(duration, interval, interval, rwg, s, settings, m)
sim.RunSim(ctx)
}

// testCreateWorkloadGenerator creates a simple uniform workload generator that
// will generate load events at a rate of 500 per store. The read ratio is
// fixed to 0.95.
func testCreateWorkloadGenerator(start time.Time, stores int, keySpan int64) workload.Generator {
readRatio := 0.95
minWriteSize := 128
maxWriteSize := 256
workloadRate := float64(stores * 500)
r := rand.New(rand.NewSource(state.TestingWorkloadSeed()))

return workload.NewRandomGenerator(
start,
state.TestingWorkloadSeed(),
workload.NewUniformKeyGen(keySpan, r),
workloadRate,
readRatio,
maxWriteSize,
minWriteSize,
)
}

// TestAllocatorSimulatorSpeed tests that the simulation runs at a rate of at
// least 1.67 simulated minutes per wall clock second (1:100) for a 32 node
// cluster, with 32000 replicas. The workload is generating 16000 keys per
Expand All @@ -78,11 +54,10 @@ func TestAllocatorSimulatorSpeed(t *testing.T) {
skip.UnderStressRace(t, skipString)
skip.UnderRace(t, skipString)

start := state.TestingStartTime()
settings := config.DefaultSimulationSettings()

// Run each simulation for 5 minutes.
end := start.Add(5 * time.Minute)
duration := 5 * time.Minute
bgInterval := 10 * time.Second
interval := 2 * time.Second

Expand All @@ -99,8 +74,7 @@ func TestAllocatorSimulatorSpeed(t *testing.T) {

sample := func() int64 {
rwg := make([]workload.Generator, 1)
rwg[0] = testCreateWorkloadGenerator(start, stores, int64(keyspace))
changer := state.NewReplicaChanger()
rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, settings.StartTime, stores, int64(keyspace))
m := asim.NewMetricsTracker() // no output
replicaDistribution := make([]float64, stores)

Expand All @@ -116,7 +90,7 @@ func TestAllocatorSimulatorSpeed(t *testing.T) {
}

s := state.NewTestStateReplDistribution(replicaDistribution, ranges, replsPerRange, keyspace)
sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, changer, settings, m)
sim := asim.NewSimulator(duration, interval, bgInterval, rwg, s, settings, m)

startTime := timeutil.Now()
sim.RunSim(ctx)
Expand Down Expand Up @@ -148,11 +122,10 @@ func TestAllocatorSimulatorSpeed(t *testing.T) {

func TestAllocatorSimulatorDeterministic(t *testing.T) {

start := state.TestingStartTime()
settings := config.DefaultSimulationSettings()

runs := 3
end := start.Add(15 * time.Minute)
duration := 15 * time.Minute
bgInterval := 10 * time.Second
interval := 2 * time.Second

Expand All @@ -171,8 +144,7 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) {

for run := 0; run < runs; run++ {
rwg := make([]workload.Generator, 1)
rwg[0] = testCreateWorkloadGenerator(start, stores, int64(keyspace))
changer := state.NewReplicaChanger()
rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, settings.StartTime, stores, int64(keyspace))
m := asim.NewMetricsTracker() // no output
replicaDistribution := make([]float64, stores)

Expand All @@ -188,7 +160,7 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) {
}

s := state.NewTestStateReplDistribution(replicaDistribution, ranges, replsPerRange, keyspace)
sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, changer, settings, m)
sim := asim.NewSimulator(duration, interval, bgInterval, rwg, s, settings, m)

ctx := context.Background()
sim.RunSim(ctx)
Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/asim/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,19 @@ const (
defaultLBRebalancingDimension = 0 // QPS
)

var (
// defaultStartTime is used as the default beginning time for simulation
// runs. It isn't necessarily meaningful other than for logging and having
// "some" start time for components taking a time.Time.
defaultStartTime = time.Date(2022, 03, 21, 11, 0, 0, 0, time.UTC)
)

// SimulationSettings controls
// WIP: Thread these settings through to each of the sim parts.
type SimulationSettings struct {
// StartTime is the time to start the simulation at. This is also used to
// init the shared state simulation clock.
StartTime time.Time
// Seed is the random source that will be used for any simulator components
// that accept a seed.
Seed int64
Expand Down Expand Up @@ -102,6 +112,7 @@ type SimulationSettings struct {
// DefaultSimulationSettings returns a set of default settings for simulation.
func DefaultSimulationSettings() *SimulationSettings {
return &SimulationSettings{
StartTime: defaultStartTime,
Seed: defaultSeed,
ReplicaChangeBaseDelay: defaultReplicaChangeBaseDelay,
ReplicaAddRate: defaultReplicaAddDelayFactor,
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/asim/gossip/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/stretchr/testify/require"
)
Expand All @@ -30,7 +29,7 @@ func TestFixedDelayExchange(t *testing.T) {
}

settings := config.DefaultSimulationSettings()
tick := state.TestingStartTime()
tick := settings.StartTime
exchange := fixedDelayExchange{pending: []exchangeInfo{}, settings: settings}

// There should be no updates initially.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/asim/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
func TestGossip(t *testing.T) {
settings := config.DefaultSimulationSettings()

tick := state.TestingStartTime()
tick := settings.StartTime
s := state.NewTestStateEvenDistribution(3, 100, 3, 1000)
details := map[state.StoreID]*map[roachpb.StoreID]*storepool.StoreDetail{}

Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/asim/metrics_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,16 @@ func Example_rebalance() {

func Example_workload() {
ctx := context.Background()
start := state.TestingStartTime()
settings := config.DefaultSimulationSettings()
end := start.Add(200 * time.Second)
duration := 200 * time.Second
interval := 10 * time.Second
rwg := make([]workload.Generator, 1)
rwg[0] = testCreateWorkloadGenerator(start, 10, 10000)
rwg[0] = workload.TestCreateWorkloadGenerator(settings.Seed, settings.StartTime, 10, 10000)
m := asim.NewMetricsTracker(os.Stdout)

changer := state.NewReplicaChanger()
s := state.LoadConfig(state.ComplexConfig)

sim := asim.NewSimulator(start, end, interval, interval, rwg, s, changer, settings, m)
sim := asim.NewSimulator(duration, interval, interval, rwg, s, settings, m)
sim.RunSim(ctx)
// WIP: non deterministic
// Output:
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/asim/op/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,8 @@ func TestLeaseTransferOp(t *testing.T) {
}

func TestRelocateRangeOp(t *testing.T) {
start := state.TestingStartTime()

settings := config.DefaultSimulationSettings()
start := settings.StartTime
settings.ReplicaAddRate = 1
settings.ReplicaChangeBaseDelay = 5 * time.Second
settings.StateExchangeInterval = 1 * time.Second
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/asim/state/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ go_library(
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/workload",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/replicastats",
"//pkg/kv/kvserver/load",
"//pkg/kv/kvserver/split",
"//pkg/roachpb",
"//pkg/settings/cluster",
Expand Down Expand Up @@ -53,6 +53,7 @@ go_test(
deps = [
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/workload",
"//pkg/kv/kvserver/load",
"//pkg/roachpb",
"@com_github_stretchr_testify//require",
],
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/asim/state/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -57,7 +58,7 @@ func TestingSetRangeQPS(s State, rangeID RangeID, qps float64) bool {
}

rlc := s.ReplicaLoad(rangeID, store.StoreID()).(*ReplicaLoadCounter)
rlc.QPS.SetMeanRateForTesting(qps)
rlc.loadStats.TestingSetStat(load.Queries, qps)
return true
}

Expand Down Expand Up @@ -265,7 +266,7 @@ func TestDistributeQPSCounts(s State, qpsCounts []float64) {
for _, rng := range lhs {
rl := s.ReplicaLoad(rng.RangeID(), storeID)
rlc := rl.(*ReplicaLoadCounter)
rlc.QPS.SetMeanRateForTesting(qpsPerRange)
rlc.loadStats.TestingSetStat(load.Queries, qpsPerRange)
}
}
}
18 changes: 14 additions & 4 deletions pkg/kv/kvserver/asim/state/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func newState(settings *config.SimulationSettings) *state {
nodes: make(map[NodeID]*node),
stores: make(map[StoreID]*store),
loadsplits: make(map[StoreID]LoadSplitter),
clock: &ManualSimClock{nanos: 0},
clock: &ManualSimClock{nanos: settings.StartTime.UnixNano()},
ranges: newRMap(),
usageInfo: newClusterUsageInfo(),
settings: config.DefaultSimulationSettings(),
Expand Down Expand Up @@ -713,10 +713,20 @@ func (s *state) ReplicaLoad(rangeID RangeID, storeID StoreID) ReplicaLoad {
// currently on the store given. Otherwise, return an empty, zero counter
// value.
store, ok := s.LeaseholderStore(rangeID)
if ok && store.StoreID() == storeID {
return s.load[rangeID]
if !ok {
panic(fmt.Sprintf("no leaseholder store found for range %d", storeID))
}

// TODO(kvoli): The requested storeID is not the leaseholder. Non
// leaseholder load tracking is not currently supported but is checked by
// other components such as hot ranges. In this case, ignore it but we
// should also track non leaseholder load. See load.go for more. Return an
// empty initialized load counter here.
if store.StoreID() != storeID {
return NewReplicaLoadCounter(s.clock)
}
return &ReplicaLoadCounter{}

return s.load[rangeID]
}

// ClusterUsageInfo returns the usage information for the Range with ID
Expand Down
Loading

0 comments on commit 502acb0

Please sign in to comment.