From 420a85cada05ae2542a96d2aca74d5e189f6a75b Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Sun, 16 Jul 2023 13:42:58 -0400 Subject: [PATCH] fix(generators): address problem when size ot pk is too small When pk is too small gemini had number of problems: 1. If it is one byte size, then generators are iterating over 0-255, while most of them in flights, load threads are getting stuck on waiting for valies, while generators can't provide new one 2. Murmur to get 100% partition coverage as result some partitions are not getting data while load threads still target them making treads stuck forever --- cmd/gemini/generators.go | 12 +++++-- cmd/gemini/root.go | 48 ++++++++++++++++++--------- pkg/generators/generator.go | 59 +++++++++++++++++++++++++++------ pkg/generators/partition.go | 31 ++++++++++++------ pkg/realrandom/source.go | 65 +++++++++++++++++++++++++++++++++++++ pkg/typedef/bag.go | 12 +++++-- pkg/typedef/columns.go | 9 +++++ pkg/typedef/interfaces.go | 10 ++++++ pkg/typedef/schema.go | 21 ++++++++++++ pkg/typedef/simple_type.go | 47 +++++++++++++++++++++++++-- pkg/typedef/tuple.go | 9 +++++ pkg/typedef/types.go | 19 ++++++++++- pkg/typedef/udt.go | 9 +++++ 13 files changed, 307 insertions(+), 44 deletions(-) create mode 100644 pkg/realrandom/source.go diff --git a/cmd/gemini/generators.go b/cmd/gemini/generators.go index b1e188d9..9b4f51de 100644 --- a/cmd/gemini/generators.go +++ b/cmd/gemini/generators.go @@ -31,20 +31,26 @@ func createGenerators( var gs []*generators.Generator for id := range schema.Tables { table := schema.Tables[id] + pkVariations := table.PartitionKeys.ValueVariationsNumber(&partitionRangeConfig) - distFunc, err := createDistributionFunc(partitionKeyDistribution, partitionCount, seed, stdDistMean, oneStdDev) + distFunc, err := createDistributionFunc(partitionKeyDistribution, distributionSize, seed, stdDistMean, oneStdDev) if err != nil { return nil, err } - gCfg := &generators.Config{ + tablePartConfig := &generators.Config{ PartitionsRangeConfig: partitionRangeConfig, PartitionsCount: distributionSize, PartitionsDistributionFunc: distFunc, Seed: seed, PkUsedBufferSize: pkBufferReuseSize, } - g := generators.NewGenerator(table, gCfg, logger.Named("generators")) + g := generators.NewGenerator(table, tablePartConfig, logger.Named("generators")) + if pkVariations < 2^32 { + // Low partition key variation can lead to having staled partitions + // Let's detect and mark them before running test + g.FindAndMarkStalePartitions() + } gs = append(gs, g) } return gs, nil diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index e59111df..3893c680 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -15,7 +15,6 @@ package main import ( - "encoding/binary" "encoding/json" "fmt" "log" @@ -32,6 +31,7 @@ import ( "github.com/scylladb/gemini/pkg/builders" "github.com/scylladb/gemini/pkg/generators" "github.com/scylladb/gemini/pkg/jobs" + "github.com/scylladb/gemini/pkg/realrandom" "github.com/scylladb/gemini/pkg/replication" "github.com/scylladb/gemini/pkg/store" "github.com/scylladb/gemini/pkg/typedef" @@ -40,8 +40,6 @@ import ( "github.com/scylladb/gemini/pkg/status" "github.com/scylladb/gemini/pkg/stop" - crand "crypto/rand" - "github.com/gocql/gocql" "github.com/hailocab/go-hostpool" "github.com/pkg/errors" @@ -188,10 +186,6 @@ func run(_ *cobra.Command, _ []string) error { }() } - if err = printSetup(intSeed, intSchemaSeed); err != nil { - return errors.Wrapf(err, "unable to print setup") - } - outFile, err := createFile(outFileArg, os.Stdout) if err != nil { return err @@ -209,10 +203,15 @@ func run(_ *cobra.Command, _ []string) error { return errors.Wrap(err, "cannot create schema") } } else { - schema = generators.GenSchema(schemaConfig, intSchemaSeed) + schema, intSchemaSeed, err = generateSchema(logger, schemaConfig, schemaSeed) + if err != nil { + return errors.Wrapf(err, "failed to create schema for seed %s", schemaSeed) + } } jsonSchema, _ := json.MarshalIndent(schema, "", " ") + + printSetup(intSeed, intSchemaSeed) fmt.Printf("Schema: %v\n", string(jsonSchema)) testCluster, oracleCluster := createClusters(cons, testHostSelectionPolicy, oracleHostSelectionPolicy, logger) @@ -534,7 +533,7 @@ func init() { rootCmd.Flags().IntVarP(&maxErrorsToStore, "max-errors-to-store", "", 1000, "Maximum number of errors to store and output at the end") } -func printSetup(seed, schemaSeed uint64) error { +func printSetup(seed, schemaSeed uint64) { tw := new(tabwriter.Writer) tw.Init(os.Stdout, 0, 8, 2, '\t', tabwriter.AlignRight) fmt.Fprintf(tw, "Seed:\t%d\n", seed) @@ -550,16 +549,10 @@ func printSetup(seed, schemaSeed uint64) error { fmt.Fprintf(tw, "Output file:\t%s\n", outFileArg) } tw.Flush() - return nil } func RealRandom() uint64 { - var b [8]byte - _, err := crand.Read(b[:]) - if err != nil { - return uint64(time.Now().Nanosecond() * time.Now().Second()) - } - return binary.LittleEndian.Uint64(b[:]) + return rand.New(realrandom.Source).Uint64() } func validateSeed(seed string) error { @@ -577,3 +570,26 @@ func seedFromString(seed string) uint64 { val, _ := strconv.ParseUint(seed, 10, 64) return val } + +// generateSchema generates schema, if schema seed is random and schema did not pass validation it regenerates it +func generateSchema(logger *zap.Logger, sc typedef.SchemaConfig, schemaSeed string) (schema *typedef.Schema, intSchemaSeed uint64, err error) { + intSchemaSeed = seedFromString(schemaSeed) + schema = generators.GenSchema(sc, intSchemaSeed) + err = schema.Validate(partitionCount) + if err == nil { + return schema, intSchemaSeed, nil + } + if schemaSeed != "random" { + // If user provided schema, allow to run it, but log warning + logger.Warn(errors.Wrap(err, "validation failed, running this test could end up in error or stale gemini").Error()) + return schema, intSchemaSeed, nil + } + + for err != nil { + intSchemaSeed = seedFromString(schemaSeed) + schema = generators.GenSchema(sc, intSchemaSeed) + err = schema.Validate(partitionCount) + } + + return schema, intSchemaSeed, nil +} diff --git a/pkg/generators/generator.go b/pkg/generators/generator.go index 0277b9ab..500fc759 100644 --- a/pkg/generators/generator.go +++ b/pkg/generators/generator.go @@ -96,30 +96,43 @@ func NewGenerator(table *typedef.Table, config *Config, logger *zap.Logger) *Gen } func (g *Generator) Get() *typedef.ValueWithToken { - return g.partitions.GetPartitionForToken(g.idxFunc()).get() + targetPart := g.GetPartitionForToken(g.idxFunc()) + for targetPart.Stale() { + targetPart = g.GetPartitionForToken(g.idxFunc()) + } + out := targetPart.get() + return out +} + +func (g *Generator) GetPartitionForToken(token TokenIndex) *Partition { + return g.partitions[g.shardOf(uint64(token))] } // GetOld returns a previously used value and token or a new if // the old queue is empty. func (g *Generator) GetOld() *typedef.ValueWithToken { - return g.partitions.GetPartitionForToken(g.idxFunc()).getOld() + targetPart := g.GetPartitionForToken(g.idxFunc()) + for targetPart.Stale() { + targetPart = g.GetPartitionForToken(g.idxFunc()) + } + return targetPart.getOld() } // GiveOld returns the supplied value for later reuse unless func (g *Generator) GiveOld(v *typedef.ValueWithToken) { - g.partitions.GetPartitionForToken(TokenIndex(v.Token)).giveOld(v) + g.GetPartitionForToken(TokenIndex(v.Token)).giveOld(v) } -// GiveOlds returns the supplied value for later reuse unless -func (g *Generator) GiveOlds(v []*typedef.ValueWithToken) { - for _, token := range v { - g.partitions.GetPartitionForToken(TokenIndex(token.Token)).giveOld(token) +// GiveOlds returns the supplied values for later reuse unless +func (g *Generator) GiveOlds(tokens []*typedef.ValueWithToken) { + for _, token := range tokens { + g.GiveOld(token) } } // ReleaseToken removes the corresponding token from the in-flight tracking. func (g *Generator) ReleaseToken(token uint64) { - g.partitions.GetPartitionForToken(TokenIndex(token)).releaseToken(token) + g.GetPartitionForToken(TokenIndex(token)).releaseToken(token) } func (g *Generator) Start(stopFlag *stop.Flag) { @@ -140,14 +153,36 @@ func (g *Generator) Start(stopFlag *stop.Flag) { }() } +func (g *Generator) FindAndMarkStalePartitions() { + r := rand.New(rand.NewSource(10)) + nonStale := make([]bool, g.partitionCount) + for n := uint64(0); n < g.partitionCount*100; n++ { + values := CreatePartitionKeyValues(g.table, r, &g.partitionsConfig) + token, err := g.routingKeyCreator.GetHash(g.table, values) + if err != nil { + g.logger.Panic(errors.Wrap(err, "failed to get primary key hash").Error()) + } + nonStale[g.shardOf(token)] = true + } + + for idx, val := range nonStale { + if !val { + g.partitions[idx].MarkStale() + } + } +} + // fillAllPartitions guarantees that each partition was tested to be full // at least once since the function started and before it ended. // In other words no partition will be starved. func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) { pFilled := make([]bool, len(g.partitions)) allFilled := func() bool { - for _, filled := range pFilled { + for idx, filled := range pFilled { if !filled { + if g.partitions[idx].Stale() { + continue + } return false } } @@ -162,7 +197,7 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) { g.cntCreated++ idx := token % g.partitionCount partition := g.partitions[idx] - if partition.inFlight.Has(token) { + if partition.Stale() || partition.inFlight.Has(token) { continue } select { @@ -178,3 +213,7 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) { } } } + +func (g *Generator) shardOf(token uint64) int { + return int(token % g.partitionCount) +} diff --git a/pkg/generators/partition.go b/pkg/generators/partition.go index 11603976..e70d46c2 100644 --- a/pkg/generators/partition.go +++ b/pkg/generators/partition.go @@ -28,6 +28,16 @@ type Partition struct { wakeUpSignal chan<- struct{} // wakes up generator closed bool lock sync.RWMutex + isStale bool +} + +func (s *Partition) MarkStale() { + s.isStale = true + s.Close() +} + +func (s *Partition) Stale() bool { + return s.isStale } // get returns a new value and ensures that it's corresponding token @@ -103,18 +113,23 @@ func (s *Partition) safelyGetOldValuesChannel() chan *typedef.ValueWithToken { return s.oldValues } -func (s *Partition) safelyCloseOldValuesChannel() { +func (s *Partition) Close() { + s.lock.RLock() + if s.closed { + s.lock.RUnlock() + return + } + s.lock.RUnlock() s.lock.Lock() + if s.closed { + return + } s.closed = true + close(s.values) close(s.oldValues) s.lock.Unlock() } -func (s *Partition) Close() { - close(s.values) - s.safelyCloseOldValuesChannel() -} - type Partitions []*Partition func (p Partitions) CloseAll() { @@ -123,10 +138,6 @@ func (p Partitions) CloseAll() { } } -func (p Partitions) GetPartitionForToken(token TokenIndex) *Partition { - return p[uint64(token)%uint64(len(p))] -} - func NewPartitions(count, pkBufferSize int, wakeUpSignal chan struct{}) Partitions { partitions := make(Partitions, count) for i := 0; i < len(partitions); i++ { diff --git a/pkg/realrandom/source.go b/pkg/realrandom/source.go new file mode 100644 index 00000000..40631552 --- /dev/null +++ b/pkg/realrandom/source.go @@ -0,0 +1,65 @@ +// Copyright 2019 ScyllaDB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package realrandom + +import ( + crand "crypto/rand" + "encoding/binary" + "math/bits" + "time" + + "golang.org/x/exp/rand" +) + +var Source rand.Source + +type crandSource struct{} + +func (c *crandSource) Uint64() uint64 { + var out [8]byte + _, _ = crand.Read(out[:]) + return binary.LittleEndian.Uint64(out[:]) +} + +func (c *crandSource) Seed(_ uint64) {} + +type TimeSource struct { + source rand.Source +} + +func NewTimeSource() *TimeSource { + now := time.Now() + return &TimeSource{ + source: rand.NewSource(uint64(now.Nanosecond() * now.Second())), + } +} + +func (c *TimeSource) Uint64() uint64 { + now := time.Now() + val := c.source.Uint64() + return bits.RotateLeft64(val^uint64(now.Nanosecond()*now.Second()), -int(val>>58)) +} + +func (c *TimeSource) Seed(_ uint64) {} + +func init() { + var b [8]byte + _, err := crand.Read(b[:]) + if err == nil { + Source = &crandSource{} + } else { + Source = NewTimeSource() + } +} diff --git a/pkg/typedef/bag.go b/pkg/typedef/bag.go index 3a05d525..9178c3cd 100644 --- a/pkg/typedef/bag.go +++ b/pkg/typedef/bag.go @@ -16,11 +16,14 @@ package typedef import ( "fmt" + "math" "reflect" "strings" "github.com/gocql/gocql" "golang.org/x/exp/rand" + + "github.com/scylladb/gemini/pkg/utils" ) type BagType struct { @@ -79,7 +82,7 @@ func (ct *BagType) CQLPretty(query string, value []interface{}) (string, int) { } func (ct *BagType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []interface{} { - count := r.Intn(9) + 1 + count := utils.RandInt2(r, 1, maxBagSize+1) out := make([]interface{}, count) for i := 0; i < count; i++ { out[i] = ct.ValueType.GenValue(r, p)[0] @@ -88,7 +91,7 @@ func (ct *BagType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []interface{} } func (ct *BagType) GenJSONValue(r *rand.Rand, p *PartitionRangeConfig) interface{} { - count := r.Intn(9) + 1 + count := utils.RandInt2(r, 1, maxBagSize+1) out := make([]interface{}, count) for i := 0; i < count; i++ { out[i] = ct.ValueType.GenJSONValue(r, p) @@ -103,3 +106,8 @@ func (ct *BagType) LenValue() int { func (ct *BagType) Indexable() bool { return false } + +// ValueVariationsNumber returns number of bytes generated value holds +func (ct *BagType) ValueVariationsNumber(p *PartitionRangeConfig) float64 { + return math.Pow(ct.ValueType.ValueVariationsNumber(p), maxBagSize) +} diff --git a/pkg/typedef/columns.go b/pkg/typedef/columns.go index cb00f9ea..1333a41e 100644 --- a/pkg/typedef/columns.go +++ b/pkg/typedef/columns.go @@ -146,6 +146,15 @@ func (c Columns) NonCounters() Columns { return out } +// ValueVariationsNumber returns number of bytes generated value holds +func (c Columns) ValueVariationsNumber(p *PartitionRangeConfig) float64 { + var out float64 + for _, col := range c { + out *= col.Type.ValueVariationsNumber(p) + } + return out +} + func GetMapTypeColumn(data map[string]interface{}) (out *ColumnDef, err error) { st := struct { Type map[string]interface{} diff --git a/pkg/typedef/interfaces.go b/pkg/typedef/interfaces.go index c362ce0c..ee05b42d 100644 --- a/pkg/typedef/interfaces.go +++ b/pkg/typedef/interfaces.go @@ -27,6 +27,8 @@ type Type interface { GenValue(*rand.Rand, *PartitionRangeConfig) []interface{} GenJSONValue(*rand.Rand, *PartitionRangeConfig) interface{} LenValue() int + // ValueVariationsNumber returns number of bytes generated value holds + ValueVariationsNumber(*PartitionRangeConfig) float64 Indexable() bool CQLType() gocql.TypeInfo } @@ -40,3 +42,11 @@ func (l Types) LenValue() int { } return out } + +func (l Types) ValueVariationsNumber(p *PartitionRangeConfig) float64 { + var out float64 + for _, t := range l { + out *= t.ValueVariationsNumber(p) + } + return out +} diff --git a/pkg/typedef/schema.go b/pkg/typedef/schema.go index b29ab480..32f36350 100644 --- a/pkg/typedef/schema.go +++ b/pkg/typedef/schema.go @@ -18,6 +18,8 @@ import ( "encoding/json" "strconv" + "github.com/pkg/errors" + "github.com/scylladb/gemini/pkg/murmur" ) @@ -43,6 +45,25 @@ func (s *Schema) GetHash() string { return strconv.FormatUint(uint64(murmur.Murmur3H1(out)), 16) } +func (s *Schema) Validate(distributionSize uint64) error { + prConfig := s.Config.GetPartitionRangeConfig() + for _, table := range s.Tables { + pkVariations := table.PartitionKeys.ValueVariationsNumber(&prConfig) + if pkVariations < 2^24 { + // On small pk size gemini stuck due to the multiple reasons, all the values could stack in inflights + // or partitions can become stale due to the murmur hash not hitting them, since pk low resolution + return errors.Errorf("pk size %d is less than gemini can handle", uint64(pkVariations)) + } + if float64(distributionSize*100) > pkVariations { + // With low partition variations there is a chance that partition can become stale due to the + // murmur hash not hitting it + // To avoid this scenario we need to make sure that every given partition could hold at least 100 values + return errors.Errorf("pk size %d is less than --token-range-slices multiplied by 100", uint64(pkVariations)) + } + } + return nil +} + func (m *MaterializedView) HaveNonPrimaryKey() bool { return m.NonPrimaryKey != nil } diff --git a/pkg/typedef/simple_type.go b/pkg/typedef/simple_type.go index 1a650312..f8ca0105 100644 --- a/pkg/typedef/simple_type.go +++ b/pkg/typedef/simple_type.go @@ -17,6 +17,7 @@ package typedef import ( "encoding/hex" "fmt" + "math" "math/big" "net" "strings" @@ -202,14 +203,56 @@ func (st SimpleType) genValue(r *rand.Rand, p *PartitionRangeConfig) interface{} case TYPE_INT: return r.Int31() case TYPE_SMALLINT: - return int16(r.Uint64n(65535)) + return int16(r.Uint64n(65536)) case TYPE_TIMEUUID, TYPE_UUID: return utils.UUIDFromTime(r) case TYPE_TINYINT: - return int8(r.Uint64n(255)) + return int8(r.Uint64n(256)) case TYPE_VARINT: return big.NewInt(r.Int63()) default: panic(fmt.Sprintf("generate value: not supported type %s", st)) } } + +// ValueVariationsNumber returns number of bytes generated value holds +func (st SimpleType) ValueVariationsNumber(p *PartitionRangeConfig) float64 { + switch st { + case TYPE_ASCII, TYPE_TEXT, TYPE_VARCHAR: + return math.Pow(2, float64(p.MaxStringLength)) + case TYPE_BLOB: + return math.Pow(2, float64(p.MaxBlobLength)) + case TYPE_BIGINT: + return 2 ^ 64 + case TYPE_BOOLEAN: + return 2 + case TYPE_DATE: + return 10000*365 + 2000*4 + case TYPE_TIME: + return 86400000000000 + case TYPE_TIMESTAMP: + return 2 ^ 64 + case TYPE_DECIMAL: + return 2 ^ 64 + case TYPE_DOUBLE: + return 2 ^ 64 + case TYPE_DURATION: + return 2 ^ 64 + case TYPE_FLOAT: + return 2 ^ 64 + case TYPE_INET: + return 2 ^ 32 + case TYPE_INT: + return 2 ^ 32 + case TYPE_SMALLINT: + return 2 ^ 16 + case TYPE_TIMEUUID, TYPE_UUID: + return 2 ^ 64 + case TYPE_TINYINT: + return 2 ^ 8 + case TYPE_VARINT: + return 2 ^ 64 + default: + panic(fmt.Sprintf("generate value: not supported type %s", st)) + } +} diff --git a/pkg/typedef/tuple.go b/pkg/typedef/tuple.go index 75e2cd7f..72c5c6b1 100644 --- a/pkg/typedef/tuple.go +++ b/pkg/typedef/tuple.go @@ -98,3 +98,12 @@ func (t *TupleType) LenValue() int { } return out } + +// ValueVariationsNumber returns number of bytes generated value holds +func (t *TupleType) ValueVariationsNumber(p *PartitionRangeConfig) float64 { + var out float64 + for _, tp := range t.ValueTypes { + out *= out * tp.ValueVariationsNumber(p) + } + return out +} diff --git a/pkg/typedef/types.go b/pkg/typedef/types.go index 478190ea..8ac6f2e6 100644 --- a/pkg/typedef/types.go +++ b/pkg/typedef/types.go @@ -16,6 +16,7 @@ package typedef import ( "fmt" + "math" "reflect" "strings" "sync/atomic" @@ -59,6 +60,11 @@ const ( TYPE_VARINT = SimpleType("varint") ) +const ( + maxMapSize = 10 + maxBagSize = 10 +) + var ( TypesMapKeyBlacklist = map[SimpleType]struct{}{ TYPE_BLOB: {}, @@ -159,7 +165,7 @@ func (mt *MapType) GenJSONValue(r *rand.Rand, p *PartitionRangeConfig) interface } func (mt *MapType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []interface{} { - count := r.Intn(9) + 1 + count := utils.RandInt2(r, 1, maxMapSize+1) vals := reflect.MakeMap(reflect.MapOf(reflect.TypeOf(mt.KeyType.GenValue(r, p)[0]), reflect.TypeOf(mt.ValueType.GenValue(r, p)[0]))) for i := 0; i < count; i++ { vals.SetMapIndex(reflect.ValueOf(mt.KeyType.GenValue(r, p)[0]), reflect.ValueOf(mt.ValueType.GenValue(r, p)[0])) @@ -182,6 +188,11 @@ func (mt *MapType) Indexable() bool { return false } +// ValueVariationsNumber returns number of bytes generated value holds +func (mt *MapType) ValueVariationsNumber(p *PartitionRangeConfig) float64 { + return math.Pow(mt.KeyType.ValueVariationsNumber(p)*mt.ValueType.ValueVariationsNumber(p), maxMapSize) +} + type CounterType struct { Value int64 } @@ -227,3 +238,9 @@ func (ct *CounterType) CQLDef() string { func (ct *CounterType) Indexable() bool { return false } + +// ValueVariationsNumber returns number of bytes generated value holds +func (ct *CounterType) ValueVariationsNumber(_ *PartitionRangeConfig) float64 { + // As a type, counters are a 64-bit signed integer + return 2 ^ 64 +} diff --git a/pkg/typedef/udt.go b/pkg/typedef/udt.go index 387641c9..6f4b7b4f 100644 --- a/pkg/typedef/udt.go +++ b/pkg/typedef/udt.go @@ -93,3 +93,12 @@ func (t *UDTType) GenValue(r *rand.Rand, p *PartitionRangeConfig) []interface{} func (t *UDTType) LenValue() int { return 1 } + +// ValueVariationsNumber returns number of bytes generated value holds +func (t *UDTType) ValueVariationsNumber(p *PartitionRangeConfig) float64 { + var out float64 + for _, tp := range t.ValueTypes { + out *= tp.ValueVariationsNumber(p) + } + return out +}