Skip to content

Commit

Permalink
concurrency and results collection
Browse files Browse the repository at this point in the history
  • Loading branch information
larisau committed Aug 20, 2018
1 parent d7a2d90 commit f5b638f
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 47 deletions.
87 changes: 72 additions & 15 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,46 @@ package main

import (
"fmt"
"math/rand"

"github.com/scylladb/gemini"
"github.com/spf13/cobra"
"math/rand"
)

// Package-level settings for the gemini command. Except where noted, each is
// bound to a command-line flag in init.
var (
// NOTE(review): presumably the host of the cluster under test; its flag
// registration is not visible in this listing — confirm.
testClusterHost string
// Host name of the oracle cluster that provides correct answers.
oracleClusterHost string
// Maximum number of test iterations to run (per MixedJob invocation).
maxTests int
// Number of jobs runJob starts concurrently.
threads int
// Maximum number of partition keys per thread; runJob uses it as the width
// of each PartitionRange.
pkNumberPerThread int
// PRNG seed value passed to rand.Seed.
seed int
// Drop schema before starting the tests run.
dropSchema bool
// Verbose output during the test run.
verbose bool
)

// Status holds the operation counters accumulated while a job runs.
type Status struct {
	WriteOps    int // write operations counted
	WriteErrors int // write operations that reported an error
	ReadOps     int // read operations counted
	ReadErrors  int // read operations that reported an error
}

// collectResults returns the field-by-field sum of one and sum.
//
// Both arguments are received by value, so the caller's copies are left
// untouched; the caller must use the returned Status.
func collectResults(one Status, sum Status) Status {
	return Status{
		WriteOps:    sum.WriteOps + one.WriteOps,
		WriteErrors: sum.WriteErrors + one.WriteErrors,
		ReadOps:     sum.ReadOps + one.ReadOps,
		ReadErrors:  sum.ReadErrors + one.ReadErrors,
	}
}

// printResults writes a summary of r to standard output: a "Results:" heading
// followed by one tab-indented line per counter, in the order write ops,
// write errors, read ops, read errors.
func printResults(r Status) {
	fmt.Println("Results:")

	// One entry per output line; the format strings carry the labels.
	rows := []struct {
		format string
		count  int
	}{
		{"\twrite ops: %v\n", r.WriteOps},
		{"\twrite errors: %v\n", r.WriteErrors},
		{"\tread ops: %v\n", r.ReadOps},
		{"\tread errors: %v\n", r.ReadErrors},
	}
	for _, row := range rows {
		fmt.Printf(row.format, row.count)
	}
}

func run(cmd *cobra.Command, args []string) {
rand.Seed(int64(seed))
fmt.Printf("Seed: %d\n", seed)
Expand All @@ -34,21 +60,21 @@ func run(cmd *cobra.Command, args []string) {
schemaBuilder.Table(gemini.Table{
Name: "data",
PartitionKeys: []gemini.ColumnDef{
gemini.ColumnDef{
{
Name: "pk",
Type: "int",
},
},
ClusteringKeys: []gemini.ColumnDef{
gemini.ColumnDef{
{
Name: "ck",
Type: "int",
},
},
Columns: []gemini.ColumnDef{
gemini.ColumnDef{
{
Name: "n",
Type: "int",
Type: "blob",
},
},
})
Expand All @@ -74,33 +100,62 @@ func run(cmd *cobra.Command, args []string) {
}
}

nrPassedTests := 0
runJob(MixedJob, schema, session)
}

// runJob starts `threads` goroutines running f, gives each its own
// PartitionRange, then adds up the Status each one reports and prints the
// total with printResults.
//
// Worker i receives the range Min = i*pkNumberPerThread,
// Max = (i+1)*pkNumberPerThread. The result channel is unbuffered and runJob
// receives exactly `threads` values from it, so every invocation of f has to
// send one Status or runJob blocks.
func runJob(f func(gemini.Schema, *gemini.Session, gemini.PartitionRange, chan Status), schema gemini.Schema, s *gemini.Session) {
	results := make(chan Status)
	width := pkNumberPerThread

	for worker := 0; worker < threads; worker++ {
		offset := worker * width
		go f(schema, s, gemini.PartitionRange{Min: offset, Max: width + offset}, results)
	}

	total := Status{}
	for received := 0; received < threads; received++ {
		total = collectResults(<-results, total)
	}

	printResults(total)
}

// MixedJob runs up to maxTests iterations, each issuing one generated mutation
// followed by one generated check through s, both restricted to partition
// range p. Outcomes are tallied in a Status that is sent on c after the loop.
//
// NOTE(review): this listing comes from a rendered diff whose +/- markers were
// lost. Statements that use `session`, `nrPassedTests`, a bare `return`, or
// call GenMutateStmt()/GenCheckStmt() without an argument appear to be the
// removed pre-commit lines sitting next to their replacements — confirm
// against the repository before relying on this text as compilable code.
func MixedJob(schema gemini.Schema, s *gemini.Session, p gemini.PartitionRange, c chan Status) {
testStatus := Status{}

for i := 0; i < maxTests; i++ {
mutateStmt := schema.GenMutateStmt()
mutateStmt := schema.GenMutateStmt(&p)
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values()
if verbose {
fmt.Printf("%s (values=%v)\n", mutateQuery, mutateValues)
}
if err := session.Mutate(mutateQuery, mutateValues...); err != nil {
// Incremented before the mutation is issued, so WriteOps counts attempts,
// including ones that then fail.
testStatus.WriteOps++
if err := s.Mutate(mutateQuery, mutateValues...); err != nil {
fmt.Printf("Failed! Mutation '%s' (values=%v) caused an error: '%v'\n", mutateQuery, mutateValues, err)
return
testStatus.WriteErrors++
}

checkStmt := schema.GenCheckStmt()
checkStmt := schema.GenCheckStmt(&p)
checkQuery := checkStmt.Query
checkValues := checkStmt.Values()
if verbose {
fmt.Printf("%s (values=%v)\n", checkQuery, checkValues)
}
if diff := session.Check(checkQuery, checkValues...); diff != "" {
fmt.Printf("Failed! Check '%s' (values=%v) rows differ (-oracle +test)\n%s", checkQuery, checkValues, diff)
return
err := s.Check(checkQuery, checkValues...)
// ReadOps is incremented only when the check returns no error.
if err == nil {
testStatus.ReadOps++
} else {
// gemini.ErrReadNoDataReturned is counted neither as a read op nor as a
// read error; any other error is printed and counted.
if err != gemini.ErrReadNoDataReturned {
fmt.Printf("Failed! Check '%s' (values=%v)\n%s\n", checkQuery, checkValues, err)
testStatus.ReadErrors++
}
}
nrPassedTests++
}
fmt.Printf("OK, passed %d tests.\n", nrPassedTests)

// Report this job's counters to the receiver (runJob reads one value per job).
c <- testStatus
}

var rootCmd = &cobra.Command{
Expand All @@ -118,6 +173,8 @@ func init() {
rootCmd.Flags().StringVarP(&oracleClusterHost, "oracle-cluster", "o", "", "Host name of the oracle cluster that provides correct answers")
rootCmd.MarkFlagRequired("oracle-cluster")
rootCmd.Flags().IntVarP(&maxTests, "max-tests", "m", 100, "Maximum number of test iterations to run")
rootCmd.Flags().IntVarP(&threads, "threads", "c", 10, "Number of threads to run concurrently")
rootCmd.Flags().IntVarP(&pkNumberPerThread, "max-pk-per-thread", "p", 50, "Maximum number of partition keys per thread")
rootCmd.Flags().IntVarP(&seed, "seed", "s", 1, "PRNG seed value")
rootCmd.Flags().BoolVarP(&dropSchema, "drop-schema", "d", false, "Drop schema before starting tests run")
rootCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output during test run")
Expand Down
64 changes: 38 additions & 26 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"math/rand"
"strings"

"github.com/google/uuid"
)

type Keyspace struct {
Expand All @@ -25,8 +27,8 @@ type Table struct {
// Schema produces the statements used to set up a schema and to exercise it.
//
// NOTE(review): this listing comes from a rendered diff whose +/- markers were
// lost; the GenMutateStmt/GenCheckStmt signatures without a parameter appear
// to be the removed versions of the ones taking *PartitionRange — confirm.
type Schema interface {
// GetDropSchema returns the statements that drop the schema.
GetDropSchema() []string
// GetCreateSchema returns the statements that create the schema.
GetCreateSchema() []string
GenMutateStmt() *Stmt
GenCheckStmt() *Stmt
// GenMutateStmt returns a mutation statement whose values are drawn from
// the given partition range.
GenMutateStmt(*PartitionRange) *Stmt
// GenCheckStmt returns a read statement whose values are drawn from the
// given partition range.
GenCheckStmt(*PartitionRange) *Stmt
}

type Stmt struct {
Expand All @@ -39,6 +41,15 @@ type schema struct {
table Table
}

// PartitionRange bounds the integer values generated for one worker.
//
// The statement generators draw values with randRange(Min, Max), so Min is
// inclusive and Max is exclusive.
//
// The `default` struct tags use the conventional key:"value" form so that
// reflect.StructTag.Get can read them; the previous unquoted form
// (`default:0`) is malformed and always yields an empty string.
// NOTE(review): nothing in the visible code reads these tags — confirm
// whether a defaults library is expected to consume them.
type PartitionRange struct {
	Min int `default:"0"`
	Max int `default:"100"`
}

// randRange returns a pseudo-random int in the half-open interval [min, max),
// drawn from the shared math/rand source.
//
// If the interval is empty (max <= min) it returns min. rand.Intn panics when
// given a non-positive argument, which the unguarded expression would hit for
// an empty PartitionRange (for example when the per-thread key count is 0).
func randRange(min int, max int) int {
	span := max - min
	if span <= 0 {
		return min
	}
	return rand.Intn(span) + min
}

func (s *schema) GetDropSchema() []string {
return []string{
fmt.Sprintf("DROP KEYSPACE IF EXISTS %s", s.keyspace.Name),
Expand Down Expand Up @@ -68,7 +79,7 @@ func (s *schema) GetCreateSchema() []string {
}
}

func (s *schema) GenMutateStmt() *Stmt {
func (s *schema) GenMutateStmt(p *PartitionRange) *Stmt {
columns := []string{}
values := []string{}
for _, pk := range s.table.PartitionKeys {
Expand All @@ -89,34 +100,35 @@ func (s *schema) GenMutateStmt() *Stmt {
Values: func() []interface{} {
values := make([]interface{}, 0)
for _, _ = range s.table.PartitionKeys {
values = append(values, rand.Intn(100))
values = append(values, randRange(p.Min, p.Max))
}
for _, _ = range s.table.ClusteringKeys {
values = append(values, rand.Intn(100))
values = append(values, randRange(p.Min, p.Max))
}
for _, _ = range s.table.Columns {
values = append(values, rand.Intn(100))
r, _ := uuid.NewRandom()
values = append(values, r.String())
}
return values
},
}
}

// GenCheckStmt returns a read statement built by one of four query generators
// (single partition, multiple partitions, clustering range, complex clustering
// range), selected by rand.Intn(4). The partition range p is forwarded to the
// chosen generator.
//
// NOTE(review): this listing comes from a rendered diff whose +/- markers were
// lost; the signature and generator calls without p appear to be the removed
// versions of the lines that pass p — confirm against the repository.
func (s *schema) GenCheckStmt() *Stmt {
func (s *schema) GenCheckStmt(p *PartitionRange) *Stmt {
switch n := rand.Intn(4); n {
case 0:
return s.genSinglePartitionQuery()
return s.genSinglePartitionQuery(p)
case 1:
return s.genMultiplePartitionQuery()
return s.genMultiplePartitionQuery(p)
case 2:
return s.genClusteringRangeQuery()
return s.genClusteringRangeQuery(p)
case 3:
return s.genClusteringRangeQueryComplex()
return s.genClusteringRangeQueryComplex(p)
}
// Reached only if n matches none of the cases above.
return nil
}

func (s *schema) genSinglePartitionQuery() *Stmt {
func (s *schema) genSinglePartitionQuery(p *PartitionRange) *Stmt {
relations := []string{}
for _, pk := range s.table.PartitionKeys {
relations = append(relations, fmt.Sprintf("%s = ?", pk.Name))
Expand All @@ -125,7 +137,7 @@ func (s *schema) genSinglePartitionQuery() *Stmt {
values := func() []interface{} {
values := make([]interface{}, 0)
for _, _ = range s.table.PartitionKeys {
values = append(values, rand.Intn(100))
values = append(values, randRange(p.Min, p.Max))
}
return values
}
Expand All @@ -135,7 +147,7 @@ func (s *schema) genSinglePartitionQuery() *Stmt {
}
}

func (s *schema) genMultiplePartitionQuery() *Stmt {
func (s *schema) genMultiplePartitionQuery(p *PartitionRange) *Stmt {
relations := []string{}
for _, pk := range s.table.PartitionKeys {
relations = append(relations, fmt.Sprintf("%s IN (?)", pk.Name))
Expand All @@ -146,7 +158,7 @@ func (s *schema) genMultiplePartitionQuery() *Stmt {
for _, _ = range s.table.PartitionKeys {
keys := []int{}
for i := 0; i < rand.Intn(10); i++ {
keys = append(keys, rand.Intn(100))
keys = append(keys, randRange(p.Min, p.Max))
}
values = append(values, keys)
}
Expand All @@ -158,7 +170,7 @@ func (s *schema) genMultiplePartitionQuery() *Stmt {
}
}

func (s *schema) genClusteringRangeQuery() *Stmt {
func (s *schema) genClusteringRangeQuery(p *PartitionRange) *Stmt {
relations := []string{}
for _, pk := range s.table.PartitionKeys {
relations = append(relations, fmt.Sprintf("%s = ?", pk.Name))
Expand All @@ -170,11 +182,11 @@ func (s *schema) genClusteringRangeQuery() *Stmt {
values := func() []interface{} {
values := make([]interface{}, 0)
for _, _ = range s.table.PartitionKeys {
values = append(values, rand.Intn(100))
values = append(values, randRange(p.Min, p.Max))
}
for _, _ = range s.table.ClusteringKeys {
start := rand.Intn(100)
end := start + rand.Intn(100)
start := randRange(p.Min, p.Max)
end := start + randRange(p.Min, p.Max)
values = append(values, start)
values = append(values, end)
}
Expand All @@ -186,27 +198,27 @@ func (s *schema) genClusteringRangeQuery() *Stmt {
}
}

func (s *schema) genClusteringRangeQueryComplex() *Stmt {
func (s *schema) genClusteringRangeQueryComplex(p *PartitionRange) *Stmt {
relations := []string{}
for _, pk := range s.table.PartitionKeys {
relations = append(relations, fmt.Sprintf("%s = ?", pk.Name))
}
for _, ck := range s.table.ClusteringKeys {
relations = append(relations, fmt.Sprintf("%s > ? AND %s < ? AND %s > ? and %s < ?", ck.Name, ck.Name, ck.Name, ck.Name))
relations = append(relations, fmt.Sprintf("%s > ? AND %s < ? OR %s > ? AND %s < ?", ck.Name, ck.Name, ck.Name, ck.Name))
}
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.keyspace.Name, s.table.Name, strings.Join(relations, " AND "))
values := func() []interface{} {
values := make([]interface{}, 0)
for _, _ = range s.table.PartitionKeys {
values = append(values, rand.Intn(100))
values = append(values, randRange(p.Min, p.Max))
}
for _, _ = range s.table.ClusteringKeys {
start := rand.Intn(100)
end := start + rand.Intn(100)
start := randRange(p.Min, p.Max)
end := start + randRange(p.Min, p.Max)
values = append(values, start)
values = append(values, end)
start = rand.Intn(100)
end = start + rand.Intn(100)
start = randRange(p.Min, p.Max)
end = start + randRange(p.Min, p.Max)
values = append(values, start)
values = append(values, end)
}
Expand Down
18 changes: 12 additions & 6 deletions session.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package gemini

import (
"errors"
"fmt"
"time"

"github.com/gocql/gocql"
"github.com/google/go-cmp/cmp"

"fmt"
)

// Session pairs two gocql sessions: one to the cluster under test and one to
// the oracle cluster.
type Session struct {
// testSession is the gocql session for the test cluster.
testSession *gocql.Session
// oracleSession is the gocql session for the oracle cluster.
oracleSession *gocql.Session
}

var (
// ErrReadNoDataReturned is a sentinel error with the message
// "read: no data returned".
// NOTE(review): it appears to be what Session.Check returns when its row
// loop ends without having compared a row; callers compare against it to
// tell "nothing to compare" apart from a real failure — confirm against
// the full Check implementation.
ErrReadNoDataReturned = errors.New("read: no data returned")
)

func NewSession(testClusterHost string, oracleClusterHost string) *Session {
testCluster := gocql.NewCluster(testClusterHost)
testCluster.Timeout = 5 * time.Second
Expand Down Expand Up @@ -50,7 +54,7 @@ func (s *Session) Mutate(query string, values ...interface{}) error {
return nil
}

func (s *Session) Check(query string, values ...interface{}) string {
func (s *Session) Check(query string, values ...interface{}) error {
testIter := s.testSession.Query(query, values...).Iter()
oracleIter := s.oracleSession.Query(query, values...).Iter()
for {
Expand All @@ -62,9 +66,11 @@ func (s *Session) Check(query string, values ...interface{}) string {
if !oracleIter.MapScan(oracleRow) {
break
}
if diff := cmp.Diff(oracleRow, testRow); diff != "" {
return diff
diff := cmp.Diff(oracleRow, testRow)
if diff != "" {
return fmt.Errorf("rows differ (-%v +%v): %v", oracleRow, testRow, diff)
}
return nil
}
return ""
return ErrReadNoDataReturned
}

0 comments on commit f5b638f

Please sign in to comment.