Skip to content

Commit

Permalink
schema: ddl jobs added
Browse files Browse the repository at this point in the history
Mutation jobs now also emit DDL jobs to allow testing with schema
changes. Currently only column add and drop are supported, due to
https://issues.apache.org/jira/browse/CASSANDRA-12443, and the
frequency of the schema changes is not configurable.

The synchronization of the internal Table struct is handled with
explicit locking which may not be ideal but a message passing
solution would require extensive refactoring.
  • Loading branch information
Henrik Johansson committed Jun 20, 2019
1 parent 53f824a commit dc1ec17
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- DDL statements are now emitted with low frequency if `--cql-features` is set to at
least the `"all"` level.
- Data sizes are configurable through a CLI argument `--dataset-size` and the currently
supported values are "small" and "large".
- CLI toggle `--cql-features` added to let the user select which type of CQL features
Expand Down
69 changes: 57 additions & 12 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func interactive() bool {
return !nonInteractive
}

type testJob func(context.Context, <-chan heartBeat, *sync.WaitGroup, *gemini.Schema, *gemini.Table, store.Store, gemini.PartitionRange, chan Status, string, *os.File, time.Duration)
// testJob is the signature of a worker function that runJob starts in its
// own goroutine (Job has this signature). The *gemini.SchemaConfig argument
// is passed through so that jobs can consult the schema configuration, e.g.
// ddlJob checks its CQLFeature level.
type testJob func(context.Context, <-chan heartBeat, *sync.WaitGroup, *gemini.Schema, *gemini.SchemaConfig, *gemini.Table, store.Store, gemini.PartitionRange, chan Status, string, *os.File, time.Duration)

func (r *Status) Merge(sum *Status) Status {
sum.WriteOps += r.WriteOps
Expand Down Expand Up @@ -350,7 +350,7 @@ func runJob(f testJob, schema *gemini.Schema, schemaConfig *gemini.SchemaConfig,
MaxStringLength: schemaConfig.MaxStringLength,
MinStringLength: schemaConfig.MinStringLength,
}
go f(workerCtx, pump.ch, &workers, schema, table, s, p, c, mode, out, warmup)
go f(workerCtx, pump.ch, &workers, schema, schemaConfig, table, s, p, c, mode, out, warmup)
}
}

Expand Down Expand Up @@ -389,7 +389,48 @@ func sampleResults(p *Pump, c chan Status, sp *spinner.Spinner) Status {
return testRes
}

func mutationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File, deletes bool) {
// ddlJob generates one schema change for table, executes its statements
// against the store and, only when every statement succeeded, applies the
// corresponding change to the in-memory table definition.
//
// The job does nothing unless sc.CQLFeature is gemini.CQL_FEATURE_ALL.
// The table's write lock is held from statement generation until the
// in-memory definition has been updated, so statement generators that take
// the table's read lock never observe a half-applied schema change.
//
// Results are recorded in testStatus: WriteOps is incremented for each
// successful statement; a failed statement increments WriteErrors and is
// appended to Errors. Execution stops at the first failure and the in-memory
// table is left untouched, so it does not drift away from the schema that
// actually exists in the database.
func ddlJob(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status) {
	if sc.CQLFeature != gemini.CQL_FEATURE_ALL {
		if verbose {
			fmt.Println("ddl statements disabled")
		}
		return
	}
	table.Lock()
	defer table.Unlock()
	ddlStmts, postStmtHook, err := schema.GenDDLStmt(table, &p, sc)
	if err != nil {
		fmt.Printf("Failed! DDL statement generation failed: '%v'\n", err)
		testStatus.WriteErrors++
		return
	}
	// Dump the schema on every exit path below. Because postStmtHook is
	// called inline (not deferred), the dump reflects the applied change.
	defer func() {
		if verbose {
			// Best-effort debug output: a marshalling error only results
			// in an empty dump, so it is deliberately ignored.
			jsonSchema, _ := json.MarshalIndent(schema, "", " ")
			fmt.Printf("Schema: %v\n", string(jsonSchema))
		}
	}()
	for _, ddlStmt := range ddlStmts {
		if verbose {
			fmt.Println(ddlStmt.PrettyCQL())
		}
		if err := s.Mutate(ctx, ddlStmt.Query); err != nil {
			testStatus.Errors = append(testStatus.Errors, JobError{
				Timestamp: time.Now(),
				Message:   "DDL failed: " + err.Error(),
				Query:     ddlStmt.PrettyCQL(),
			})
			testStatus.WriteErrors++
			// Do not run later statements (they may depend on this one)
			// and do not apply the in-memory change.
			return
		}
		testStatus.WriteOps++
	}
	// Every statement succeeded: bring the in-memory table in line with
	// the database.
	postStmtHook()
}

func mutationJob(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File, deletes bool) {
mutateStmt, err := schema.GenMutateStmt(table, &p, deletes)
if err != nil {
fmt.Printf("Failed! Mutation statement generation failed: '%v'\n", err)
Expand All @@ -414,7 +455,7 @@ func mutationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table
}
}

func validationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) {
func validationJob(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) {
checkStmt := schema.GenCheckStmt(table, &p)
checkQuery := checkStmt.Query
checkValues := checkStmt.Values()
Expand Down Expand Up @@ -444,7 +485,7 @@ func (hb heartBeat) await() {
time.Sleep(hb.sleep)
}
}
func Job(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, c chan Status, mode string, out *os.File, warmup time.Duration) {
func Job(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaConfig *gemini.SchemaConfig, table *gemini.Table, s store.Store, p gemini.PartitionRange, c chan Status, mode string, out *os.File, warmup time.Duration) {
defer wg.Done()
testStatus := Status{}
var i int
Expand All @@ -457,7 +498,7 @@ warmup:
case <-warmupTimer.C:
break warmup
default:
mutationJob(ctx, schema, table, s, p, &testStatus, out, false)
mutationJob(ctx, schema, schemaConfig, table, s, p, &testStatus, out, false)
if i%1000 == 0 {
c <- testStatus
testStatus = Status{}
Expand All @@ -469,15 +510,19 @@ warmup:
hb.await()
switch mode {
case writeMode:
mutationJob(ctx, schema, table, s, p, &testStatus, out, true)
mutationJob(ctx, schema, schemaConfig, table, s, p, &testStatus, out, true)
case readMode:
validationJob(ctx, schema, table, s, p, &testStatus, out)
validationJob(ctx, schema, schemaConfig, table, s, p, &testStatus, out)
default:
ind := p.Rand.Intn(100000) % 2
if ind == 0 {
mutationJob(ctx, schema, table, s, p, &testStatus, out, true)
ind := p.Rand.Intn(1000000)
if ind%2 == 0 {
if ind%100000 == 0 {
ddlJob(ctx, schema, schemaConfig, table, s, p, &testStatus)
} else {
mutationJob(ctx, schema, schemaConfig, table, s, p, &testStatus, out, true)
}
} else {
validationJob(ctx, schema, table, s, p, &testStatus, out)
validationJob(ctx, schema, schemaConfig, table, s, p, &testStatus, out)
}
}

Expand Down
148 changes: 147 additions & 1 deletion schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package gemini

import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"

"github.com/pkg/errors"
"github.com/scylladb/gocqlx/qb"
)

Expand Down Expand Up @@ -109,9 +110,23 @@ type Table struct {
Indexes []IndexDef `json:"indexes"`
MaterializedViews []MaterializedView `json:"materialized_views"`
KnownIssues map[string]bool `json:"known_issues"`

// mu protects the table during schema changes
mu sync.RWMutex
}

// Lock acquires the table's write lock (t.mu). ddlJob holds it while a
// schema change is generated and executed.
func (t *Table) Lock() {
t.mu.Lock()
}

// Unlock releases the table's write lock (t.mu) previously taken with Lock.
func (t *Table) Unlock() {
t.mu.Unlock()
}

func (t *Table) GetCreateTable(ks Keyspace) string {
t.mu.RLock()
defer t.mu.RUnlock()

var (
partitionKeys []string
clusteringKeys []string
Expand Down Expand Up @@ -143,6 +158,9 @@ func (t *Table) GetCreateTable(ks Keyspace) string {
}

func (t *Table) GetCreateTypes(keyspace Keyspace) []string {
t.mu.RLock()
defer t.mu.RUnlock()

var stmts []string
for _, column := range t.Columns {
switch c := column.Type.(type) {
Expand All @@ -158,6 +176,93 @@ func (t *Table) GetCreateTypes(keyspace Keyspace) []string {
return stmts
}

// AlterTableBuilder wraps a ready-made CQL statement so that it can be used
// wherever a query builder with a ToCql method is expected, such as the
// Query field of a Stmt.
type AlterTableBuilder struct {
	// stmt is the complete CQL text; it contains no bind markers.
	stmt string
}

// ToCql returns the wrapped statement unchanged together with a nil list of
// bind-parameter names.
func (b *AlterTableBuilder) ToCql() (string, []string) {
	var names []string
	return b.stmt, names
}

// addColumn builds the statements needed to add one new, randomly typed
// column to the table in the given keyspace.
//
// If the generated column type is a UDTType, a CREATE TYPE statement for it
// is emitted first, followed by the ALTER TABLE ... ADD statement. The
// returned hook appends the new column to t.Columns; the caller is expected
// to invoke it once the statements have been executed. The returned error
// is currently always nil.
//
// No locking is done here: the caller must hold the table's write lock
// across this call and the hook (ddlJob does so via table.Lock).
func (t *Table) addColumn(keyspace string, sc *SchemaConfig) ([]*Stmt, func(), error) {
	// After a drop, len(t.Columns)+1 can map to a name that is still in use
	// (e.g. [col0 col2 col4] would generate col4 again), so advance the
	// index until the generated name is free.
	inUse := make(map[string]struct{}, len(t.Columns))
	for i := range t.Columns {
		inUse[t.Columns[i].Name] = struct{}{}
	}
	n := len(t.Columns) + 1
	for {
		if _, taken := inUse[genColumnName("col", n)]; !taken {
			break
		}
		n++
	}
	column := ColumnDef{Name: genColumnName("col", n), Type: genColumnType(n, sc)}

	var stmts []*Stmt
	if c, ok := column.Type.(UDTType); ok {
		// The user-defined type has to exist before a column can use it.
		createType := "CREATE TYPE %s.%s (%s);"
		var typs []string
		for name, typ := range c.Types {
			typs = append(typs, name+" "+typ.CQLDef())
		}
		stmt := fmt.Sprintf(createType, keyspace, c.TypeName, strings.Join(typs, ","))
		stmts = append(stmts, &Stmt{
			Query: &AlterTableBuilder{
				stmt: stmt,
			},
			Values: func() []interface{} {
				return nil
			},
		})
	}
	stmt := "ALTER TABLE " + keyspace + "." + t.Name + " ADD " + column.Name + " " + column.Type.CQLDef()
	stmts = append(stmts, &Stmt{
		Query: &AlterTableBuilder{
			stmt: stmt,
		},
		Values: func() []interface{} {
			return nil
		},
	})
	return stmts, func() {
		t.Columns = append(t.Columns, column)
	}, nil
}

// alterColumn builds an ALTER TABLE ... ALTER ... TYPE statement that
// changes a randomly chosen column to a randomly chosen compatible type.
//
// Only columns of a SimpleType that has an entry in compatibleColumnTypes
// can be altered; for any other column, or for a table without columns, an
// error is returned together with a no-op hook. On success the returned hook
// replaces the column definition in t.Columns with the new type; the caller
// is expected to invoke it once the statement has been executed.
//
// No locking is done here: the caller must hold the table's write lock
// across this call and the hook.
func (t *Table) alterColumn(keyspace string) ([]*Stmt, func(), error) {
	if len(t.Columns) == 0 {
		// rand.Intn panics for n <= 0.
		return nil, func() {}, errors.New("table has no columns that can be altered")
	}
	idx := rand.Intn(len(t.Columns))
	column := t.Columns[idx]
	oldType, isSimpleType := column.Type.(SimpleType)
	if !isSimpleType {
		return nil, func() {}, errors.Errorf("complex type=%s cannot be altered", column.Name)
	}
	compatTypes, ok := compatibleColumnTypes[oldType]
	if !ok || len(compatTypes) == 0 {
		return nil, func() {}, errors.Errorf("simple type=%s has no compatible types so it cannot be altered", column.Name)
	}
	newType := compatTypes[rand.Intn(len(compatTypes))]
	newColumn := ColumnDef{Name: column.Name, Type: newType}
	// The statement must name the NEW type; using the column's current type
	// would make the ALTER a no-op while the hook still records the change.
	stmt := "ALTER TABLE " + keyspace + "." + t.Name + " ALTER " + column.Name + " TYPE " + newColumn.Type.CQLDef()
	stmts := []*Stmt{{
		Query: &AlterTableBuilder{
			stmt: stmt,
		},
		Values: func() []interface{} {
			return nil
		},
	}}
	return stmts, func() {
		t.Columns[idx] = newColumn
	}, nil
}

// dropColumn builds an ALTER TABLE ... DROP statement for a randomly chosen
// column of the table in the given keyspace.
//
// If the table has no columns, an error and a no-op hook are returned.
// Otherwise the returned hook removes the chosen column from t.Columns; the
// caller is expected to invoke it once the statement has been executed.
//
// No locking is done here: the caller must hold the table's write lock
// across this call and the hook (ddlJob does so via table.Lock), which also
// keeps the captured index valid until the hook runs.
func (t *Table) dropColumn(keyspace string) ([]*Stmt, func(), error) {
	if len(t.Columns) == 0 {
		// rand.Intn panics for n <= 0; this is reachable after repeated drops.
		return nil, func() {}, errors.New("table has no columns that can be dropped")
	}
	idx := rand.Intn(len(t.Columns))
	column := t.Columns[idx]
	stmt := "ALTER TABLE " + keyspace + "." + t.Name + " DROP " + column.Name
	stmts := []*Stmt{{
		Query: &AlterTableBuilder{
			stmt: stmt,
		},
		Values: func() []interface{} {
			return nil
		},
	}}
	return stmts, func() {
		t.Columns = append(t.Columns[:idx], t.Columns[idx+1:]...)
	}, nil
}

type MaterializedView struct {
Name string `json:"name"`
PartitionKeys Columns `json:"partition_keys"`
Expand All @@ -173,6 +278,9 @@ type Stmt struct {
func (s *Stmt) PrettyCQL() string {
var replaced int
query, _ := s.Query.ToCql()
if len(s.Values()) == 0 {
return query
}
values := s.Values()
for _, typ := range s.Types {
query, replaced = typ.CQLPretty(query, values)
Expand Down Expand Up @@ -376,6 +484,9 @@ func (s *Schema) GetCreateSchema() []string {
}

func (s *Schema) GenInsertStmt(t *Table, p *PartitionRange) (*Stmt, error) {
t.mu.RLock()
defer t.mu.RUnlock()

var (
typs []Type
)
Expand Down Expand Up @@ -411,6 +522,9 @@ func (s *Schema) GenInsertStmt(t *Table, p *PartitionRange) (*Stmt, error) {
}

func (s *Schema) GenInsertJsonStmt(t *Table, p *PartitionRange) (*Stmt, error) {
t.mu.RLock()
defer t.mu.RUnlock()

values := make(map[string]interface{})
values = t.PartitionKeys.ToJSONMap(values, p)
values = t.ClusteringKeys.ToJSONMap(values, p)
Expand All @@ -432,6 +546,9 @@ func (s *Schema) GenInsertJsonStmt(t *Table, p *PartitionRange) (*Stmt, error) {
}

func (s *Schema) GenDeleteRows(t *Table, p *PartitionRange) (*Stmt, error) {
t.mu.RLock()
defer t.mu.RUnlock()

var (
values []interface{}
typs []Type
Expand All @@ -457,7 +574,21 @@ func (s *Schema) GenDeleteRows(t *Table, p *PartitionRange) (*Stmt, error) {
}, nil
}

// GenDDLStmt picks a random schema change for table t and returns the
// statements that implement it, a hook that applies the same change to the
// in-memory table, and an error if no statement could be generated.
//
// p.Rand.Intn(3) selects the change: 1 drops a column, 0 and 2 add one.
// Altering a column's type (t.alterColumn) is intentionally never selected
// because it is not supported in Cassandra from 3.0.11.
//
// This method does not lock t itself; ddlJob calls it with the table's
// write lock held.
func (s *Schema) GenDDLStmt(t *Table, p *PartitionRange, sc *SchemaConfig) ([]*Stmt, func(), error) {
	if p.Rand.Intn(3) == 1 {
		// Drop column.
		return t.dropColumn(s.Keyspace.Name)
	}
	// Add column.
	return t.addColumn(s.Keyspace.Name, sc)
}

func (s *Schema) GenMutateStmt(t *Table, p *PartitionRange, deletes bool) (*Stmt, error) {
t.mu.RLock()
defer t.mu.RUnlock()

if !deletes {
return s.GenInsertStmt(t, p)
}
Expand Down Expand Up @@ -500,6 +631,9 @@ func (s *Schema) GenCheckStmt(t *Table, p *PartitionRange) *Stmt {
}

func (s *Schema) genSinglePartitionQuery(t *Table, p *PartitionRange) *Stmt {
t.mu.RLock()
defer t.mu.RUnlock()

tableName := t.Name
partitionKeys := t.PartitionKeys
if len(t.MaterializedViews) > 0 && p.Rand.Int()%2 == 0 {
Expand All @@ -525,6 +659,9 @@ func (s *Schema) genSinglePartitionQuery(t *Table, p *PartitionRange) *Stmt {
}

func (s *Schema) genMultiplePartitionQuery(t *Table, p *PartitionRange) *Stmt {
t.mu.RLock()
defer t.mu.RUnlock()

var (
values []interface{}
typs []Type
Expand Down Expand Up @@ -558,6 +695,9 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, p *PartitionRange) *Stmt {
}

func (s *Schema) genClusteringRangeQuery(t *Table, p *PartitionRange) *Stmt {
t.mu.RLock()
defer t.mu.RUnlock()

var (
values []interface{}
typs []Type
Expand Down Expand Up @@ -599,6 +739,9 @@ func (s *Schema) genClusteringRangeQuery(t *Table, p *PartitionRange) *Stmt {
}

func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, p *PartitionRange) *Stmt {
t.mu.RLock()
defer t.mu.RUnlock()

var (
values []interface{}
typs []Type
Expand Down Expand Up @@ -646,6 +789,9 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, p *Partition
}

func (s *Schema) genSingleIndexQuery(t *Table, p *PartitionRange) *Stmt {
t.mu.RLock()
defer t.mu.RUnlock()

var (
values []interface{}
typs []Type
Expand Down
Loading

0 comments on commit dc1ec17

Please sign in to comment.