Merge pull request #80 from scylladb/executable_printed_statements
gemini: pretty print executable CQL queries
Henrik Johansson authored May 2, 2019
2 parents 4ebf960 + dde7562 commit 68ddb47
Showing 6 changed files with 382 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## Unreleased

- Printing executable CQL statements when logging errors or in verbose mode.
- JSON schema definition file has simpler index definition.

## [0.9.2] - 2019-04-18
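To make the new changelog entry concrete: for a hypothetical table ks1.table1 (an int partition key pk0, an int clustering key ck0 and a text column col0), the verbose output changes roughly as follows. The exact literal formatting (for example the quoting of text values) is decided by the per-type CQLPretty implementations, which are part of this commit but not included in the excerpt below.

Before: INSERT INTO ks1.table1 (pk0,ck0,col0) VALUES (?,?,?) (values=[1805 10 hello])
After:  INSERT INTO ks1.table1 (pk0,ck0,col0) VALUES (1805,10,'hello')

The point of the change is that the "After" line can be pasted straight into cqlsh and executed, which is what the commit title means by executable statements.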
27 changes: 18 additions & 9 deletions cmd/gemini/root.go
@@ -43,11 +43,11 @@ const (
)

type Status struct {
WriteOps int `json:"write_ops"`
WriteErrors int `json:"write_errors"`
ReadOps int `json:"read_ops"`
ReadErrors int `json:"read_errors"`
Errors []error `json:"errors"`
WriteOps int `json:"write_ops"`
WriteErrors int `json:"write_errors"`
ReadOps int `json:"read_ops"`
ReadErrors int `json:"read_errors"`
Errors []gemini.JobError `json:"errors,omitempty"`
}

type Results interface {
Expand Down Expand Up @@ -237,6 +237,7 @@ func runJob(f testJob, schema *gemini.Schema, s *gemini.Session, mode string, ou
}
if testRes.ReadErrors > 0 {
testRes.PrintResult(out)
fmt.Println(testRes.Errors)
if failFast {
fmt.Println("Error in data validation. Exiting.")
cancelWorkers()
@@ -262,11 +263,15 @@ func mutationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values()
if verbose {
fmt.Printf("%s (values=%v)\n", mutateQuery, mutateValues)
fmt.Println(mutateStmt.PrettyCQL())
}
testStatus.WriteOps++
if err := s.Mutate(mutateQuery, mutateValues...); err != nil {
testStatus.Errors = append(testStatus.Errors, errors.Wrapf(err, "Failed! Mutation '%s' (values=%v) caused an error: '%v'\n", mutateQuery, mutateValues))
e := gemini.JobError{
Message: "Mutation failed: " + err.Error(),
Query: mutateStmt.PrettyCQL(),
}
testStatus.Errors = append(testStatus.Errors, e)
testStatus.WriteErrors++
}
}
@@ -276,15 +281,19 @@ func validationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session,
checkQuery := checkStmt.Query
checkValues := checkStmt.Values()
if verbose {
fmt.Printf("%s (values=%v)\n", checkQuery, checkValues)
fmt.Println(checkStmt.PrettyCQL())
}
err := s.Check(table, checkQuery, checkValues...)
if err == nil {
testStatus.ReadOps++
} else {
if err != gemini.ErrReadNoDataReturned {
// De-duplication needed?
testStatus.Errors = append(testStatus.Errors, errors.Wrapf(err, "Failed! Check '%s' (values=%v)\n%s\n", checkQuery, checkValues))
e := gemini.JobError{
Message: "Validation failed: " + err.Error(),
Query: checkStmt.PrettyCQL(),
}
testStatus.Errors = append(testStatus.Errors, e)
testStatus.ReadErrors++
}
}
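The new fmt.Println(testRes.Errors) call in runJob prints the collected gemini.JobError values with Go's default struct formatting. A minimal, self-contained sketch of what that looks like, using a local stand-in for JobError and a made-up query:

package main

import "fmt"

// Local stand-in for gemini.JobError; it only mirrors the two fields added in
// session.go so the default formatting can be shown in isolation.
type JobError struct {
	Message string
	Query   string
}

func main() {
	errs := []JobError{{
		Message: "Validation failed: read timed out", // example text only
		Query:   "SELECT * FROM ks1.table1 WHERE pk0 = 1805",
	}}
	fmt.Println(errs)
	// Prints: [{Validation failed: read timed out SELECT * FROM ks1.table1 WHERE pk0 = 1805}]
}

Because the pretty-printed query is stored on the error itself, the failure report now carries a statement that can be re-run against the cluster directly.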
51 changes: 49 additions & 2 deletions schema.go
@@ -24,6 +24,7 @@ type Type interface {
Name() string
CQLDef() string
CQLHolder() string
CQLPretty(string, []interface{}) (string, int)
GenValue(*PartitionRange) []interface{}
GenValueRange(p *PartitionRange) ([]interface{}, []interface{})
Indexable() bool
@@ -79,6 +80,22 @@ type Table struct {
type Stmt struct {
Query string
Values func() []interface{}
Types []Type
}

func (s *Stmt) PrettyCQL() string {
var replaced int
query := s.Query
values := s.Values()
for _, typ := range s.Types {
query, replaced = typ.CQLPretty(query, values)
if len(values) >= replaced {
values = values[replaced:]
} else {
break
}
}
return query
}

type Schema struct {
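PrettyCQL above delegates the actual placeholder substitution to the new CQLPretty method on Type; the concrete implementations are part of this commit but live in a file not shown in this excerpt. The sketch below is therefore only an illustration, using a hypothetical intType and a trimmed-down copy of Stmt, under the assumption that each CQLPretty call replaces its leading placeholder(s) with literals and returns how many values it consumed:

package main

import (
	"fmt"
	"strings"
)

// Hypothetical type used only for this demonstration; the real gemini types
// implement CQLPretty elsewhere in this commit.
type intType struct{}

// CQLPretty replaces the first remaining placeholder with the literal value
// and reports that exactly one value was consumed.
func (intType) CQLPretty(query string, values []interface{}) (string, int) {
	return strings.Replace(query, "?", fmt.Sprintf("%d", values[0]), 1), 1
}

// Trimmed-down copies of the Stmt struct and PrettyCQL method from the diff above.
type prettyType interface {
	CQLPretty(string, []interface{}) (string, int)
}

type Stmt struct {
	Query  string
	Values func() []interface{}
	Types  []prettyType
}

func (s *Stmt) PrettyCQL() string {
	var replaced int
	query := s.Query
	values := s.Values()
	for _, typ := range s.Types {
		query, replaced = typ.CQLPretty(query, values)
		if len(values) >= replaced {
			values = values[replaced:]
		} else {
			break
		}
	}
	return query
}

func main() {
	stmt := &Stmt{
		Query:  "SELECT * FROM ks1.table1 WHERE pk0 = ? AND ck0 > ? AND ck0 < ?",
		Values: func() []interface{} { return []interface{}{1805, 10, 20} },
		Types:  []prettyType{intType{}, intType{}, intType{}},
	}
	fmt.Println(stmt.PrettyCQL())
	// Prints: SELECT * FROM ks1.table1 WHERE pk0 = 1805 AND ck0 > 10 AND ck0 < 20
}

Note that a range relation consumes two values of the same type, which is why the generators below append the clustering key's type twice when they emit col > ? AND col < ?.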
@@ -200,29 +217,34 @@ func (s *Schema) GenInsertStmt(t Table, p *PartitionRange) (*Stmt, error) {
var (
columns []string
placeholders []string
typs []Type
)
values := make([]interface{}, 0)
for _, pk := range t.PartitionKeys {
columns = append(columns, pk.Name)
placeholders = append(placeholders, pk.Type.CQLHolder())
values = appendValue(pk.Type, p, values)
typs = append(typs, pk.Type)
}
for _, ck := range t.ClusteringKeys {
columns = append(columns, ck.Name)
placeholders = append(placeholders, ck.Type.CQLHolder())
values = appendValue(ck.Type, p, values)
typs = append(typs, ck.Type)
}
for _, cdef := range t.Columns {
columns = append(columns, cdef.Name)
placeholders = append(placeholders, cdef.Type.CQLHolder())
values = appendValue(cdef.Type, p, values)
typs = append(typs, cdef.Type)
}
query := fmt.Sprintf("INSERT INTO %s.%s (%s) VALUES (%s)", s.Keyspace.Name, t.Name, strings.Join(columns, ","), strings.Join(placeholders, ","))
return &Stmt{
Query: query,
Values: func() []interface{} {
return values
},
Types: typs,
}, nil
}

@@ -242,22 +264,26 @@ func (s *Schema) GenInsertJsonStmt(t Table, p *PartitionRange) (*Stmt, error) {
Values: func() []interface{} {
return []interface{}{string(jsonString)}
},
Types: []Type{TYPE_TEXT},
}, nil
}

func (s *Schema) GenDeleteRows(t Table, p *PartitionRange) (*Stmt, error) {
var (
relations []string
values []interface{}
typs []Type
)
for _, pk := range t.PartitionKeys {
relations = append(relations, fmt.Sprintf("%s = ?", pk.Name))
values = appendValue(pk.Type, p, values)
typs = append(typs, pk.Type)
}
if len(t.ClusteringKeys) == 1 {
for _, ck := range t.ClusteringKeys {
relations = append(relations, fmt.Sprintf("%s >= ? AND %s <= ?", ck.Name, ck.Name))
values = appendValueRange(ck.Type, p, values)
typs = append(typs, ck.Type, ck.Type)
}
}
query := fmt.Sprintf("DELETE FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND "))
@@ -266,6 +292,7 @@ func (s *Schema) GenDeleteRows(t Table, p *PartitionRange) (*Stmt, error) {
Values: func() []interface{} {
return values
},
Types: typs,
}, nil
}

@@ -311,23 +338,27 @@ func (s *Schema) GenCheckStmt(t Table, p *PartitionRange) *Stmt {
func (s *Schema) genSinglePartitionQuery(t Table, p *PartitionRange) *Stmt {
var relations []string
values := make([]interface{}, 0)
typs := make([]Type, 0, 10)
for _, pk := range t.PartitionKeys {
relations = append(relations, fmt.Sprintf("%s = ?", pk.Name))
values = appendValue(pk.Type, p, values)
typs = append(typs, pk.Type)
}
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND "))
return &Stmt{
Query: query,
Values: func() []interface{} {
return values
},
Types: typs,
}
}

func (s *Schema) genMultiplePartitionQuery(t Table, p *PartitionRange) *Stmt {
var (
relations []string
values []interface{}
typs []Type
)
pkNum := p.Rand.Intn(len(t.PartitionKeys))
if pkNum == 0 {
@@ -337,6 +368,7 @@ func (s *Schema) genMultiplePartitionQuery(t Table, p *PartitionRange) *Stmt {
relations = append(relations, fmt.Sprintf("%s IN (%s)", pk.Name, strings.TrimRight(strings.Repeat("?,", pkNum), ",")))
for i := 0; i < pkNum; i++ {
values = appendValue(pk.Type, p, values)
typs = append(typs, pk.Type)
}
}
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND "))
@@ -345,41 +377,48 @@ func (s *Schema) genMultiplePartitionQuery(t Table, p *PartitionRange) *Stmt {
Values: func() []interface{} {
return values
},
Types: typs,
}
}

func (s *Schema) genClusteringRangeQuery(t Table, p *PartitionRange) *Stmt {
var (
relations []string
values []interface{}
typs []Type
)
for _, pk := range t.PartitionKeys {
relations = append(relations, fmt.Sprintf("%s = ?", pk.Name))
values = appendValue(pk.Type, p, values)
typs = append(typs, pk.Type)
}
maxClusteringRels := 0
if len(t.ClusteringKeys) > 1 {
maxClusteringRels = p.Rand.Intn(len(t.ClusteringKeys) - 1)
for i := 0; i < maxClusteringRels; i++ {
relations = append(relations, fmt.Sprintf("%s = ?", t.ClusteringKeys[i].Name))
values = appendValue(t.ClusteringKeys[i].Type, p, values)
typs = append(typs, t.ClusteringKeys[i].Type)
}
}
relations = append(relations, fmt.Sprintf("%s > ? AND %s < ?", t.ClusteringKeys[maxClusteringRels].Name, t.ClusteringKeys[maxClusteringRels].Name))
values = appendValueRange(t.ClusteringKeys[maxClusteringRels].Type, p, values)
typs = append(typs, t.ClusteringKeys[maxClusteringRels].Type, t.ClusteringKeys[maxClusteringRels].Type)
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND "))
return &Stmt{
Query: query,
Values: func() []interface{} {
return values
},
Types: typs,
}
}

func (s *Schema) genMultiplePartitionClusteringRangeQuery(t Table, p *PartitionRange) *Stmt {
var (
relations []string
values []interface{}
typs []Type
)
pkNum := p.Rand.Intn(len(t.PartitionKeys))
if pkNum == 0 {
@@ -389,6 +428,7 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t Table, p *PartitionR
relations = append(relations, fmt.Sprintf("%s IN (%s)", pk.Name, strings.TrimRight(strings.Repeat("?,", pkNum), ",")))
for i := 0; i < pkNum; i++ {
values = appendValue(pk.Type, p, values)
typs = append(typs, pk.Type)
}
}
maxClusteringRels := 0
@@ -397,23 +437,27 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t Table, p *PartitionR
for i := 0; i < maxClusteringRels; i++ {
relations = append(relations, fmt.Sprintf("%s = ?", t.ClusteringKeys[i].Name))
values = appendValue(t.ClusteringKeys[i].Type, p, values)
typs = append(typs, t.ClusteringKeys[i].Type)
}
}
relations = append(relations, fmt.Sprintf("%s > ? AND %s < ?", t.ClusteringKeys[maxClusteringRels].Name, t.ClusteringKeys[maxClusteringRels].Name))
values = appendValueRange(t.ClusteringKeys[maxClusteringRels].Type, p, values)
typs = append(typs, t.ClusteringKeys[maxClusteringRels].Type, t.ClusteringKeys[maxClusteringRels].Type)
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.Keyspace.Name, t.Name, strings.Join(relations, " AND "))
return &Stmt{
Query: query,
Values: func() []interface{} {
return values
},
Types: typs,
}
}

func (s *Schema) genSingleIndexQuery(t Table, p *PartitionRange) *Stmt {
var (
relations []string
values []interface{}
typs []Type
)

if len(t.Indexes) == 0 {
@@ -424,19 +468,22 @@ func (s *Schema) genSingleIndexQuery(t Table, p *PartitionRange) *Stmt {
pkNum = 1
}
for _, pk := range t.PartitionKeys {
relations = append(relations, fmt.Sprintf("%s IN (%s)", pk.Name, strings.TrimRight(strings.Repeat("?,", pkNum), ",")))
for i := 0; i < pkNum; i++ {
relations = append(relations, fmt.Sprintf("%s = ?", pk.Name))
values = appendValue(pk.Type, p, values)
typs = append(typs, pk.Type)
}
}
idx := p.Rand.Intn(len(t.Indexes))
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s AND %s=?", s.Keyspace.Name, t.Name, strings.Join(relations, " AND "), t.Indexes[idx].Column)
values = appendValue(t.Columns[idx].Type, p, nil)
values = appendValue(t.Columns[idx].Type, p, values)
typs = append(typs, t.Columns[idx].Type)
return &Stmt{
Query: query,
Values: func() []interface{} {
return values
},
Types: typs,
}
}

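Because PrettyCQL consumes Types and Values strictly in placeholder order, each generator above appends a type once per question mark it emits: twice for a range relation, pkNum times for an IN list. A small helper of the following shape, hypothetical and not part of gemini, is the kind of invariant a test could assert to catch a mismatch (it assumes it sits in the same package as the Type interface, where strings is already imported):

// placeholdersMatchTypes reports whether every '?' in a generated query has
// exactly one recorded type, which PrettyCQL relies on to substitute values.
func placeholdersMatchTypes(query string, types []Type) bool {
	return strings.Count(query, "?") == len(types)
}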
5 changes: 5 additions & 0 deletions session.go
@@ -25,6 +25,11 @@ var (
ErrReadNoDataReturned = errors.New("read: no data returned")
)

type JobError struct {
Message string `json:"message"`
Query string `json:"query"`
}

func NewSession(testClusterHost string, oracleClusterHost string) *Session {
testCluster := gocql.NewCluster(testClusterHost)
testCluster.Timeout = 5 * time.Second
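JobError carries JSON tags, so the Status report in cmd/gemini/root.go serializes it directly. A self-contained sketch of the resulting report, with local copies of the two structs and made-up numbers and query text:

package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of gemini.JobError (above) and the Status struct from
// cmd/gemini/root.go, reproduced here only to show the JSON shape.
type JobError struct {
	Message string `json:"message"`
	Query   string `json:"query"`
}

type Status struct {
	WriteOps    int        `json:"write_ops"`
	WriteErrors int        `json:"write_errors"`
	ReadOps     int        `json:"read_ops"`
	ReadErrors  int        `json:"read_errors"`
	Errors      []JobError `json:"errors,omitempty"`
}

func main() {
	st := Status{
		WriteOps:    100,
		WriteErrors: 1,
		ReadOps:     99,
		ReadErrors:  1,
		Errors: []JobError{{
			Message: "Mutation failed: write timed out", // example text only
			Query:   "INSERT INTO ks1.table1 (pk0,ck0,col0) VALUES (1805,10,'hello')",
		}},
	}
	b, _ := json.MarshalIndent(st, "", "  ")
	fmt.Println(string(b))
	// When no errors were collected, the "errors" key is omitted from the
	// report entirely because of the omitempty option.
}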