Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

asim: add randomized range generation #107354

Merged
merged 1 commit into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions pkg/kv/kvserver/asim/gen/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,34 +192,43 @@ type PlacementType int
const (
	// Even makes GetRangesInfo build range info with
	// state.RangesInfoEvenDistribution.
	Even PlacementType = iota
	// Skewed makes GetRangesInfo build range info with
	// state.RangesInfoSkewedDistribution.
	Skewed
	// Random makes GetRangesInfo build range info with
	// state.RangesInfoRandDistribution, which requires a random source.
	Random
	// WeightedRandom makes GetRangesInfo build range info with
	// state.RangesInfoWeightedRandDistribution, which requires a random source
	// and per-store weights.
	WeightedRandom
)

// BaseRanges provide fundamental range functionality and are embedded in
// specialized range structs. These structs implement the RangeGen interface
// which is then utilized to generate allocator simulation. Key structs that
// embed BaseRanges are: BasicRanges, RandomizedBasicRanges, and
// WeightedRandomizedBasicRanges.
type BaseRanges struct {
	// Ranges is forwarded to the state.RangesInfo* helpers as the number of
	// ranges to generate.
	Ranges int
	// KeySpace is forwarded to the state.RangesInfo* helpers as the keyspace
	// the generated ranges cover.
	KeySpace int
	// ReplicationFactor is forwarded to the state.RangesInfo* helpers as the
	// replication factor of the generated ranges.
	ReplicationFactor int
	// Bytes is forwarded to the state.RangesInfo* helpers as the range size and
	// is assigned to each range info's Size when loading range info.
	Bytes int64
}

// getRangesInfo generates and distributes ranges across stores based on
// GetRangesInfo generates and distributes ranges across stores based on
// PlacementType while using other BaseRanges fields for range configuration.
func (b BaseRanges) getRangesInfo(pType PlacementType, numOfStores int) state.RangesInfo {
func (b BaseRanges) GetRangesInfo(
pType PlacementType, numOfStores int, randSource *rand.Rand, weightedRandom []float64,
) state.RangesInfo {
switch pType {
case Even:
return state.RangesInfoEvenDistribution(numOfStores, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes)
case Skewed:
return state.RangesInfoSkewedDistribution(numOfStores, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes)
case Random:
return state.RangesInfoRandDistribution(randSource, numOfStores, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes)
case WeightedRandom:
return state.RangesInfoWeightedRandDistribution(randSource, weightedRandom, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes)
default:
panic(fmt.Sprintf("unexpected range placement type %v", pType))
}
}

// loadRangeInfo loads the given state with the specified rangesInfo.
func (b BaseRanges) loadRangeInfo(s state.State, rangesInfo state.RangesInfo) {
// LoadRangeInfo loads the given state with the specified rangesInfo.
func (b BaseRanges) LoadRangeInfo(s state.State, rangesInfo state.RangesInfo) {
for _, rangeInfo := range rangesInfo {
rangeInfo.Size = b.Bytes
}
Expand All @@ -239,8 +248,11 @@ type BasicRanges struct {
// Generate loads s with ranges placed according to br.PlacementType and
// returns the same state. BasicRanges carries neither a random source nor
// store weights, so it panics when asked for Random or WeightedRandom
// placement. The seed and settings arguments are not used here.
func (br BasicRanges) Generate(
	seed int64, settings *config.SimulationSettings, s state.State,
) state.State {
	if br.PlacementType == Random || br.PlacementType == WeightedRandom {
		panic("BasicRanges generate only uniform or skewed distributions")
	}
	// Even and Skewed placement never consult the random source or the
	// weights, so nil is passed for both.
	rangesInfo := br.GetRangesInfo(br.PlacementType, len(s.Stores()), nil, nil)
	br.LoadRangeInfo(s, rangesInfo)
	return s
}

Expand Down
161 changes: 161 additions & 0 deletions pkg/kv/kvserver/asim/state/new_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package state

import (
"fmt"
"math/rand"
"sort"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
Expand Down Expand Up @@ -65,6 +66,79 @@ func exactDistribution(counts []int) []float64 {
return distribution
}

// weighted handles weighted random index selection from an input array,
// weightedStores.
//
// For example, consider input weightedStores = [0.1, 0.2, 0.7].
//   - newWeighted constructs cumulative weights, creating cumulativeWeights
//     [0.1, 0.3, 1.0].
//   - rand then draws a number r from [0.0, 1.0) and finds which bucket
//     ([0.0, 0.1), [0.1, 0.3), [0.3, 1.0)) r falls into by locating the
//     smallest index whose cumulative weight is strictly greater than r.
//     Indices with greater weights cover a larger slice of [0.0, 1.0) and are
//     therefore more likely to be selected. For instance, if r is 0.5, rand
//     returns index 2 since 1.0 (at index 2) is the first cumulative weight
//     greater than 0.5.
type weighted struct {
	// cumulativeWeights[i] is the sum of weightedStores[0..i]. newWeighted
	// guarantees that the final entry is exactly 1.
	cumulativeWeights []float64
}

// newWeighted constructs cumulative weights that are used later to select a
// single random index from weightedStores based on the associated weights.
//
// It panics if weightedStores is empty or if the weights do not sum up to one.
// The sum is compared against one with a small tolerance because decimal
// weights rarely add up exactly in floating point (ten weights of 0.1 sum to
// 0.9999999999999999).
func newWeighted(weightedStores []float64) weighted {
	const sumTolerance = 1e-9
	if len(weightedStores) == 0 {
		panic("weightedStores must be non-empty in order to construct cumulative weights")
	}
	cumulativeWeights := make([]float64, len(weightedStores))
	prefixSumWeight := float64(0)
	for i, item := range weightedStores {
		prefixSumWeight += item
		cumulativeWeights[i] = prefixSumWeight
	}
	// The negated form also rejects a NaN sum.
	if diff := prefixSumWeight - 1; !(diff <= sumTolerance && diff >= -sumTolerance) {
		panic(fmt.Sprintf("total cumulative weights for all stores should sum up to one but got %.2f\n",
			prefixSumWeight))
	}
	// Pin every entry that already holds the full sum to exactly 1. rand draws
	// from [0.0, 1.0), so this guarantees a bucket is always found even when
	// rounding left the sum slightly below one, and it keeps trailing
	// zero-weight stores from gaining a sliver of probability.
	for i := len(cumulativeWeights) - 1; i >= 0 && cumulativeWeights[i] == prefixSumWeight; i-- {
		cumulativeWeights[i] = 1
	}
	return weighted{cumulativeWeights: cumulativeWeights}
}

// rand randomly picks an index from weightedStores based on the associated
// weights.
//
// Index i is chosen when the draw falls into the half-open bucket
// [cumulativeWeights[i-1], cumulativeWeights[i]). Using a strict comparison
// means a store with zero weight has an empty bucket and is never selected,
// even when the draw is exactly 0.
func (w weighted) rand(randSource *rand.Rand) int {
	r := randSource.Float64()
	return sort.Search(len(w.cumulativeWeights), func(i int) bool { return w.cumulativeWeights[i] > r })
}

// weightedRandDistribution generates a weighted random distribution across
// stores. It draws ten weighted random store indices in total, tallies how
// often each index was drawn, and converts the tallies into a distribution via
// exactDistribution. The returned slice has one entry per weightedStores
// element.
func weightedRandDistribution(randSource *rand.Rand, weightedStores []float64) []float64 {
	const sampleCount = 10
	picker := newWeighted(weightedStores)
	tally := make([]int, len(weightedStores))
	for drawn := 0; drawn < sampleCount; drawn++ {
		tally[picker.rand(randSource)]++
	}
	return exactDistribution(tally)
}

// randDistribution generates a random distribution across stores. It achieves
// this by creating an array of size n, selecting random numbers from [0, 10)
// for each index, and returning each number's share of the total.
//
// If every draw happens to be zero the shares would be 0/0 (NaN), so in that
// case the distribution falls back to an even split of 1/n per store. This is
// not a rare corner case for small n: with a single store it happens for one
// draw in ten.
func randDistribution(randSource *rand.Rand, n int) []float64 {
	total := float64(0)
	distribution := make([]float64, n)
	for i := range distribution {
		num := float64(randSource.Intn(10))
		distribution[i] = num
		total += num
	}

	if total == 0 {
		// All draws were zero (or n is zero); dividing would yield NaN.
		for i := range distribution {
			distribution[i] = 1 / float64(n)
		}
		return distribution
	}

	for i := range distribution {
		distribution[i] /= total
	}
	return distribution
}

// RangesInfoWithDistribution returns a RangesInfo, where the stores given are
// initialized with the specified % of the replicas. This is done on a best
// effort basis, given the replication factor. It may be impossible to satisfy
Expand Down Expand Up @@ -250,6 +324,61 @@ func RangesInfoEvenDistribution(
int64(MinKey), int64(keyspace), rangeSize)
}

// RangesInfoWeightedRandDistribution returns a RangesInfo, where ranges are
// generated with a weighted random distribution across stores. One store is
// assumed per weightedStores entry. The function panics when randSource is nil
// or weightedStores is empty.
func RangesInfoWeightedRandDistribution(
	randSource *rand.Rand,
	weightedStores []float64,
	ranges int,
	keyspace int,
	replicationFactor int,
	rangeSize int64,
) RangesInfo {
	missingInput := randSource == nil || len(weightedStores) == 0
	if missingInput {
		panic("randSource cannot be nil and weightedStores must be non-empty in order to generate weighted random range info")
	}

	// The same sampled shares are supplied for both distribution arguments of
	// RangesInfoWithDistribution.
	shares := weightedRandDistribution(randSource, weightedStores)
	storeList := makeStoreList(len(weightedStores))

	// Copy the default span config and override only the replica counts.
	spanCfg := defaultSpanConfig
	spanCfg.NumReplicas = int32(replicationFactor)
	spanCfg.NumVoters = int32(replicationFactor)

	return RangesInfoWithDistribution(
		storeList, shares, shares, ranges, spanCfg,
		int64(MinKey), int64(keyspace), rangeSize)
}

// RangesInfoRandDistribution returns a RangesInfo, where ranges are generated
// with a random distribution across the given number of stores. The function
// panics when randSource is nil.
func RangesInfoRandDistribution(
	randSource *rand.Rand,
	stores int,
	ranges int,
	keyspace int,
	replicationFactor int,
	rangeSize int64,
) RangesInfo {
	if randSource == nil {
		panic("randSource cannot be nil in order to generate random range info")
	}

	// The same random shares are supplied for both distribution arguments of
	// RangesInfoWithDistribution.
	shares := randDistribution(randSource, stores)
	storeList := makeStoreList(stores)

	// Copy the default span config and override only the replica counts.
	spanCfg := defaultSpanConfig
	spanCfg.NumReplicas = int32(replicationFactor)
	spanCfg.NumVoters = int32(replicationFactor)

	return RangesInfoWithDistribution(
		storeList,
		shares,
		shares,
		ranges,
		spanCfg,
		int64(MinKey),
		int64(keyspace),
		rangeSize,
	)
}

// NewStateWithDistribution returns a State where the stores given are
// initialized with the specified % of the replicas. This is done on a best
// effort basis, given the replication factor. It may be impossible to satisfy
Expand Down Expand Up @@ -320,3 +449,35 @@ func NewStateSkewedDistribution(
rangesInfo := RangesInfoSkewedDistribution(stores, ranges, keyspace, replicationFactor, 0 /* rangeSize */)
return LoadConfig(clusterInfo, rangesInfo, settings)
}

// NewStateRandDistribution returns a new State where the replica count per
// store is randomized. The random source is seeded from seed, the cluster is
// built with one store per node, and ranges are created with a range size of
// zero.
func NewStateRandDistribution(
	seed int64,
	stores int,
	ranges int,
	keyspace int,
	replicationFactor int,
	settings *config.SimulationSettings,
) State {
	rng := rand.New(rand.NewSource(seed))
	cluster := ClusterInfoWithStoreCount(stores, 1 /* storesPerNode */)
	return LoadConfig(
		cluster,
		RangesInfoRandDistribution(rng, stores, ranges, keyspace, replicationFactor, 0 /* rangeSize */),
		settings,
	)
}

// NewStateWeightedRandDistribution returns a new State where the replica count
// per store is weighted randomized based on weightedStores. The random source
// is seeded from seed, the cluster gets one single-store node per
// weightedStores entry, and ranges are created with a range size of zero.
func NewStateWeightedRandDistribution(
	seed int64,
	weightedStores []float64,
	ranges int,
	keyspace int,
	replicationFactor int,
	settings *config.SimulationSettings,
) State {
	rng := rand.New(rand.NewSource(seed))
	cluster := ClusterInfoWithStoreCount(len(weightedStores), 1 /* storesPerNode */)
	return LoadConfig(
		cluster,
		RangesInfoWeightedRandDistribution(rng, weightedStores, ranges, keyspace, replicationFactor, 0 /* rangeSize */),
		settings,
	)
}
50 changes: 49 additions & 1 deletion pkg/kv/kvserver/asim/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,20 @@ func TestOrderedStateLists(t *testing.T) {
// Test a skewed distribution with 100 stores, 10k ranges and 1m keyspace.
s = NewStateSkewedDistribution(100, 10000, 3, 1000000, settings)
assertListsOrdered(s)

const defaultSeed = 42
s = NewStateRandDistribution(defaultSeed, 7, 1400, 10000, 3, settings)
assertListsOrdered(s)

s = NewStateWeightedRandDistribution(defaultSeed, []float64{0.0, 0.1, 0.3, 0.6}, 1400, 10000, 3, settings)
assertListsOrdered(s)
}

// TestNewStateDeterministic asserts that the state returned from the new state
// utility functions is deterministic.
func TestNewStateDeterministic(t *testing.T) {
settings := config.DefaultSimulationSettings()

const defaultSeed = 42
testCases := []struct {
desc string
newStateFn func() State
Expand All @@ -409,6 +416,18 @@ func TestNewStateDeterministic(t *testing.T) {
return NewStateWithDistribution([]float64{0.2, 0.2, 0.2, 0.2, 0.2}, 5, 3, 10000, settings)
},
},
{
desc: "rand distribution ",
newStateFn: func() State {
return NewStateRandDistribution(defaultSeed, 7, 1400, 10000, 3, settings)
},
},
{
desc: "weighted rand distribution ",
newStateFn: func() State {
return NewStateWeightedRandDistribution(defaultSeed, []float64{0.0, 0.1, 0.3, 0.6}, 1400, 10000, 3, settings)
},
},
}

for _, tc := range testCases {
Expand All @@ -421,6 +440,35 @@ func TestNewStateDeterministic(t *testing.T) {
}
}

// TestRandDistribution asserts that the distribution returned from
// randDistribution and weightedRandDistribution sum up to 1.
func TestRandDistribution(t *testing.T) {
	const defaultSeed = 42
	// The shares are floating-point fractions, so their sum is compared to one
	// within a small tolerance instead of requiring bit-exact equality.
	const epsilon = 1e-9
	randSource := rand.New(rand.NewSource(defaultSeed))
	testCases := []struct {
		desc         string
		distribution []float64
	}{
		{
			desc:         "random distribution",
			distribution: randDistribution(randSource, 7),
		},
		{
			desc:         "weighted random distribution",
			distribution: weightedRandDistribution(randSource, []float64{0.0, 0.1, 0.3, 0.6}),
		},
	}
	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			total := float64(0)
			for _, share := range tc.distribution {
				total += share
			}
			require.InDelta(t, float64(1), total, epsilon)
		})
	}
}

// TestSplitRangeDeterministic asserts that range splits are deterministic.
func TestSplitRangeDeterministic(t *testing.T) {
settings := config.DefaultSimulationSettings()
Expand Down
23 changes: 22 additions & 1 deletion pkg/kv/kvserver/asim/tests/default_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func defaultLoadGen() gen.BasicLoad {
const (
defaultRanges = 1
defaultPlacementType = gen.Even
defaultReplicationFactor = 1
defaultReplicationFactor = 3
defaultBytes = 0
)

Expand Down Expand Up @@ -108,3 +108,24 @@ func defaultPlotSettings() plotSettings {
width: defaultWidth,
}
}

// rangeGenSettings bundles the options used for range generation.
// NOTE(review): field meanings below are inferred from names only; confirm
// against the code that consumes this struct.
type rangeGenSettings struct {
	// rangeKeyGenType presumably selects the generator used for the number of
	// ranges — TODO confirm.
	rangeKeyGenType generatorType
	// keySpaceGenType presumably selects the generator used for the keyspace
	// size — TODO confirm.
	keySpaceGenType generatorType
	// weightedRand presumably holds per-store weights for weighted random
	// placement — TODO confirm.
	weightedRand []float64
}

// Default generator types returned by defaultRangeGenSettings.
const (
	defaultRangeKeyGenType = uniformGenerator
	defaultKeySpaceGenType = uniformGenerator
)

// defaultWeightedRand is the default weightedRand value returned by
// defaultRangeGenSettings; it is left as a nil slice.
var defaultWeightedRand []float64

// defaultRangeGenSettings returns a rangeGenSettings populated with the
// package defaults for every field.
func defaultRangeGenSettings() rangeGenSettings {
	var settings rangeGenSettings
	settings.rangeKeyGenType = defaultRangeKeyGenType
	settings.keySpaceGenType = defaultKeySpaceGenType
	settings.weightedRand = defaultWeightedRand
	return settings
}
Loading