diff --git a/CHANGELOG.md b/CHANGELOG.md index ae338e24..9b0271de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- DDL statements are now emitted with low frequency if the `--cql-features` is set to at + least `"all"` level. - Data sizes are configurable though a CLI argument `--dataset-size` and the currently supported values are "small" and "large". - CLI toggle `--cql-features` added to let the user select which type of CQL features diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index 062478b9..abe09f6c 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -80,7 +80,7 @@ func interactive() bool { return !nonInteractive } -type testJob func(context.Context, <-chan heartBeat, *sync.WaitGroup, *gemini.Schema, *gemini.Table, store.Store, gemini.PartitionRange, chan Status, string, *os.File, time.Duration) +type testJob func(context.Context, <-chan heartBeat, *sync.WaitGroup, *gemini.Schema, *gemini.SchemaConfig, *gemini.Table, store.Store, gemini.PartitionRange, chan Status, string, *os.File, time.Duration) func (r *Status) Merge(sum *Status) Status { sum.WriteOps += r.WriteOps @@ -350,7 +350,7 @@ func runJob(f testJob, schema *gemini.Schema, schemaConfig *gemini.SchemaConfig, MaxStringLength: schemaConfig.MaxStringLength, MinStringLength: schemaConfig.MinStringLength, } - go f(workerCtx, pump.ch, &workers, schema, table, s, p, c, mode, out, warmup) + go f(workerCtx, pump.ch, &workers, schema, schemaConfig, table, s, p, c, mode, out, warmup) } } @@ -389,7 +389,48 @@ func sampleResults(p *Pump, c chan Status, sp *spinner.Spinner) Status { return testRes } -func mutationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File, deletes bool) { +func ddlJob(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status) { + if sc.CQLFeature != gemini.CQL_FEATURE_ALL { + if verbose { + fmt.Println("ddl statements disabled") + } + return + } + table.Lock() + defer table.Unlock() + ddlStmts, postStmtHook, err := schema.GenDDLStmt(table, &p, sc) + if err != nil { + fmt.Printf("Failed! Mutation statement generation failed: '%v'\n", err) + testStatus.WriteErrors++ + return + } + defer postStmtHook() + defer func() { + if verbose { + jsonSchema, _ := json.MarshalIndent(schema, "", " ") + fmt.Printf("Schema: %v\n", string(jsonSchema)) + } + }() + for _, ddlStmt := range ddlStmts { + ddlQuery := ddlStmt.Query + if verbose { + fmt.Println(ddlStmt.PrettyCQL()) + } + if err := s.Mutate(ctx, ddlQuery); err != nil { + e := JobError{ + Timestamp: time.Now(), + Message: "DDL failed: " + err.Error(), + Query: ddlStmt.PrettyCQL(), + } + testStatus.Errors = append(testStatus.Errors, e) + testStatus.WriteErrors++ + } else { + testStatus.WriteOps++ + } + } +} + +func mutationJob(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File, deletes bool) { mutateStmt, err := schema.GenMutateStmt(table, &p, deletes) if err != nil { fmt.Printf("Failed! Mutation statement generation failed: '%v'\n", err) @@ -414,7 +455,7 @@ func mutationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table } } -func validationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) { +func validationJob(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) { checkStmt := schema.GenCheckStmt(table, &p) checkQuery := checkStmt.Query checkValues := checkStmt.Values() @@ -444,7 +485,7 @@ func (hb heartBeat) await() { time.Sleep(hb.sleep) } } -func Job(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, c chan Status, mode string, out *os.File, warmup time.Duration) { +func Job(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaConfig *gemini.SchemaConfig, table *gemini.Table, s store.Store, p gemini.PartitionRange, c chan Status, mode string, out *os.File, warmup time.Duration) { defer wg.Done() testStatus := Status{} var i int @@ -457,7 +498,7 @@ warmup: case <-warmupTimer.C: break warmup default: - mutationJob(ctx, schema, table, s, p, &testStatus, out, false) + mutationJob(ctx, schema, schemaConfig, table, s, p, &testStatus, out, false) if i%1000 == 0 { c <- testStatus testStatus = Status{} @@ -469,15 +510,19 @@ warmup: hb.await() switch mode { case writeMode: - mutationJob(ctx, schema, table, s, p, &testStatus, out, true) + mutationJob(ctx, schema, schemaConfig, table, s, p, &testStatus, out, true) case readMode: - validationJob(ctx, schema, table, s, p, &testStatus, out) + validationJob(ctx, schema, schemaConfig, table, s, p, &testStatus, out) default: - ind := p.Rand.Intn(100000) % 2 - if ind == 0 { - mutationJob(ctx, schema, table, s, p, &testStatus, out, true) + ind := p.Rand.Intn(1000000) + if ind%2 == 0 { + if ind%100000 == 0 { + ddlJob(ctx, schema, schemaConfig, table, s, p, &testStatus) + } else { + mutationJob(ctx, schema, schemaConfig, table, s, p, &testStatus, out, true) + } } else { - validationJob(ctx, schema, table, s, p, &testStatus, out) + validationJob(ctx, schema, schemaConfig, table, s, p, &testStatus, out) } } diff --git a/schema.go b/schema.go index 4902cb17..a43f486b 100644 --- a/schema.go +++ b/schema.go @@ -2,12 +2,13 @@ package gemini import ( "encoding/json" - "errors" "fmt" "math/rand" "strconv" "strings" + "sync" + "github.com/pkg/errors" "github.com/scylladb/gocqlx/qb" ) @@ -109,9 +110,23 @@ type Table struct { Indexes []IndexDef `json:"indexes"` MaterializedViews []MaterializedView `json:"materialized_views"` KnownIssues map[string]bool `json:"known_issues"` + + // mu protects the table during schema changes + mu sync.RWMutex +} + +func (t *Table) Lock() { + t.mu.Lock() +} + +func (t *Table) Unlock() { + t.mu.Unlock() } func (t *Table) GetCreateTable(ks Keyspace) string { + t.mu.RLock() + defer t.mu.RUnlock() + var ( partitionKeys []string clusteringKeys []string @@ -143,6 +158,9 @@ func (t *Table) GetCreateTable(ks Keyspace) string { } func (t *Table) GetCreateTypes(keyspace Keyspace) []string { + t.mu.RLock() + defer t.mu.RUnlock() + var stmts []string for _, column := range t.Columns { switch c := column.Type.(type) { @@ -158,6 +176,93 @@ func (t *Table) GetCreateTypes(keyspace Keyspace) []string { return stmts } +type AlterTableBuilder struct { + stmt string +} + +func (atb *AlterTableBuilder) ToCql() (string, []string) { + return atb.stmt, nil +} + +func (t *Table) addColumn(keyspace string, sc *SchemaConfig) ([]*Stmt, func(), error) { + var stmts []*Stmt + column := ColumnDef{Name: genColumnName("col", len(t.Columns)+1), Type: genColumnType(len(t.Columns)+1, sc)} + if c, ok := column.Type.(UDTType); ok { + createType := "CREATE TYPE %s.%s (%s);" + var typs []string + for name, typ := range c.Types { + typs = append(typs, name+" "+typ.CQLDef()) + } + stmt := fmt.Sprintf(createType, keyspace, c.TypeName, strings.Join(typs, ",")) + stmts = append(stmts, &Stmt{ + Query: &AlterTableBuilder{ + stmt: stmt, + }, + Values: func() []interface{} { + return nil + }, + }) + } + stmt := "ALTER TABLE " + keyspace + "." + t.Name + " ADD " + column.Name + " " + column.Type.CQLDef() + stmts = append(stmts, &Stmt{ + Query: &AlterTableBuilder{ + stmt: stmt, + }, + Values: func() []interface{} { + return nil + }, + }) + return stmts, func() { + t.Columns = append(t.Columns, column) + }, nil +} + +func (t *Table) alterColumn(keyspace string) ([]*Stmt, func(), error) { + var stmts []*Stmt + idx := rand.Intn(len(t.Columns)) + column := t.Columns[idx] + oldType, isSimpleType := column.Type.(SimpleType) + if !isSimpleType { + return nil, func() {}, errors.Errorf("complex type=%s cannot be altered", column.Name) + } + if compatTypes, ok := compatibleColumnTypes[oldType]; ok { + newType := compatTypes[rand.Intn(len(compatTypes))] + newColumn := ColumnDef{Name: column.Name, Type: newType} + stmt := "ALTER TABLE " + keyspace + "." + t.Name + " ALTER " + column.Name + " TYPE " + column.Type.CQLDef() + stmts = append(stmts, &Stmt{ + Query: &AlterTableBuilder{ + stmt: stmt, + }, + Values: func() []interface{} { + return nil + }, + }) + fmt.Println(stmt) + return stmts, func() { + t.Columns[idx] = newColumn + }, nil + } + return nil, func() {}, errors.Errorf("simple type=%s has no compatible types so it cannot be altered", column.Name) +} + +func (t *Table) dropColumn(keyspace string) ([]*Stmt, func(), error) { + var stmts []*Stmt + idx := rand.Intn(len(t.Columns)) + column := t.Columns[idx] + stmt := "ALTER TABLE " + keyspace + "." + t.Name + " DROP " + column.Name + stmts = append(stmts, &Stmt{ + Query: &AlterTableBuilder{ + stmt: stmt, + }, + Values: func() []interface{} { + return nil + }, + }) + return stmts, func() { + t.Columns = append(t.Columns[:idx], t.Columns[idx+1:]...) + }, nil +} + type MaterializedView struct { Name string `json:"name"` PartitionKeys Columns `json:"partition_keys"` @@ -173,6 +278,9 @@ type Stmt struct { func (s *Stmt) PrettyCQL() string { var replaced int query, _ := s.Query.ToCql() + if len(s.Values()) == 0 { + return query + } values := s.Values() for _, typ := range s.Types { query, replaced = typ.CQLPretty(query, values) @@ -376,6 +484,9 @@ func (s *Schema) GetCreateSchema() []string { } func (s *Schema) GenInsertStmt(t *Table, p *PartitionRange) (*Stmt, error) { + t.mu.RLock() + defer t.mu.RUnlock() + var ( typs []Type ) @@ -411,6 +522,9 @@ func (s *Schema) GenInsertStmt(t *Table, p *PartitionRange) (*Stmt, error) { } func (s *Schema) GenInsertJsonStmt(t *Table, p *PartitionRange) (*Stmt, error) { + t.mu.RLock() + defer t.mu.RUnlock() + values := make(map[string]interface{}) values = t.PartitionKeys.ToJSONMap(values, p) values = t.ClusteringKeys.ToJSONMap(values, p) @@ -432,6 +546,9 @@ func (s *Schema) GenInsertJsonStmt(t *Table, p *PartitionRange) (*Stmt, error) { } func (s *Schema) GenDeleteRows(t *Table, p *PartitionRange) (*Stmt, error) { + t.mu.RLock() + defer t.mu.RUnlock() + var ( values []interface{} typs []Type @@ -457,7 +574,21 @@ func (s *Schema) GenDeleteRows(t *Table, p *PartitionRange) (*Stmt, error) { }, nil } +func (s *Schema) GenDDLStmt(t *Table, p *PartitionRange, sc *SchemaConfig) ([]*Stmt, func(), error) { + switch n := p.Rand.Intn(3); n { + //case 0: // Alter column not supported in Cassandra from 3.0.11 + // return t.alterColumn(s.Keyspace.Name) + case 1: // Delete column + return t.dropColumn(s.Keyspace.Name) + default: // Alter column + return t.addColumn(s.Keyspace.Name, sc) + } +} + func (s *Schema) GenMutateStmt(t *Table, p *PartitionRange, deletes bool) (*Stmt, error) { + t.mu.RLock() + defer t.mu.RUnlock() + if !deletes { return s.GenInsertStmt(t, p) } @@ -500,6 +631,9 @@ func (s *Schema) GenCheckStmt(t *Table, p *PartitionRange) *Stmt { } func (s *Schema) genSinglePartitionQuery(t *Table, p *PartitionRange) *Stmt { + t.mu.RLock() + defer t.mu.RUnlock() + tableName := t.Name partitionKeys := t.PartitionKeys if len(t.MaterializedViews) > 0 && p.Rand.Int()%2 == 0 { @@ -525,6 +659,9 @@ func (s *Schema) genSinglePartitionQuery(t *Table, p *PartitionRange) *Stmt { } func (s *Schema) genMultiplePartitionQuery(t *Table, p *PartitionRange) *Stmt { + t.mu.RLock() + defer t.mu.RUnlock() + var ( values []interface{} typs []Type @@ -558,6 +695,9 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, p *PartitionRange) *Stmt { } func (s *Schema) genClusteringRangeQuery(t *Table, p *PartitionRange) *Stmt { + t.mu.RLock() + defer t.mu.RUnlock() + var ( values []interface{} typs []Type @@ -599,6 +739,9 @@ func (s *Schema) genClusteringRangeQuery(t *Table, p *PartitionRange) *Stmt { } func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, p *PartitionRange) *Stmt { + t.mu.RLock() + defer t.mu.RUnlock() + var ( values []interface{} typs []Type @@ -646,6 +789,9 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, p *Partition } func (s *Schema) genSingleIndexQuery(t *Table, p *PartitionRange) *Stmt { + t.mu.RLock() + defer t.mu.RUnlock() + var ( values []interface{} typs []Type diff --git a/types.go b/types.go index f7a81ab0..83eede9f 100644 --- a/types.go +++ b/types.go @@ -42,8 +42,50 @@ const ( // TODO: Add support for time when gocql bug is fixed. var ( - pkTypes = []SimpleType{TYPE_ASCII, TYPE_BIGINT, TYPE_BLOB, TYPE_DATE, TYPE_DECIMAL, TYPE_DOUBLE, TYPE_FLOAT, TYPE_INET, TYPE_INT, TYPE_SMALLINT, TYPE_TEXT /*TYPE_TIME,*/, TYPE_TIMESTAMP, TYPE_TIMEUUID, TYPE_TINYINT, TYPE_UUID, TYPE_VARCHAR, TYPE_VARINT} - types = append(append([]SimpleType{}, pkTypes...), TYPE_BOOLEAN, TYPE_DURATION) + pkTypes = []SimpleType{TYPE_ASCII, TYPE_BIGINT, TYPE_BLOB, TYPE_DATE, TYPE_DECIMAL, TYPE_DOUBLE, TYPE_FLOAT, TYPE_INET, TYPE_INT, TYPE_SMALLINT, TYPE_TEXT /*TYPE_TIME,*/, TYPE_TIMESTAMP, TYPE_TIMEUUID, TYPE_TINYINT, TYPE_UUID, TYPE_VARCHAR, TYPE_VARINT} + types = append(append([]SimpleType{}, pkTypes...), TYPE_BOOLEAN, TYPE_DURATION) + compatibleColumnTypes = map[SimpleType][]SimpleType{ + TYPE_ASCII: { + TYPE_TEXT, + TYPE_BLOB, + }, + TYPE_BIGINT: { + TYPE_BLOB, + }, + TYPE_BOOLEAN: { + TYPE_BLOB, + }, + TYPE_DECIMAL: { + TYPE_BLOB, + }, + TYPE_FLOAT: { + TYPE_BLOB, + }, + TYPE_INET: { + TYPE_BLOB, + }, + TYPE_INT: { + TYPE_VARINT, + TYPE_BLOB, + }, + TYPE_TIMESTAMP: { + TYPE_BLOB, + }, + TYPE_TIMEUUID: { + TYPE_UUID, + TYPE_BLOB, + }, + TYPE_UUID: { + TYPE_BLOB, + }, + TYPE_VARCHAR: { + TYPE_TEXT, + TYPE_BLOB, + }, + TYPE_VARINT: { + TYPE_BLOB, + }, + } ) type SimpleType string diff --git a/types_test.go b/types_test.go index 05826f8c..aaa2be11 100644 --- a/types_test.go +++ b/types_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "gopkg.in/inf.v0" ) @@ -293,6 +294,11 @@ func TestMarshalUnmarshal(t *testing.T) { }, } + opts := cmp.Options{ + cmp.AllowUnexported(Table{}), + cmpopts.IgnoreUnexported(Table{}), + } + b, err := json.MarshalIndent(s1, " ", " ") if err != nil { t.Fatalf("unable to marshal json, error=%s\n", err) @@ -303,7 +309,7 @@ func TestMarshalUnmarshal(t *testing.T) { t.Fatalf("unable to unmarshal json, error=%s\n", err) } - if diff := cmp.Diff(s1, s2); diff != "" { + if diff := cmp.Diff(s1, s2, opts); diff != "" { t.Errorf("schema not the same after marshal/unmarshal, diff=%s", diff) } }