Skip to content

Commit

Permalink
Merge pull request #2 from scylladb/mode_options
Browse files Browse the repository at this point in the history
read schema from json file
  • Loading branch information
larisau authored Oct 2, 2018
2 parents 45dc6ea + 5daad67 commit 7309af4
Show file tree
Hide file tree
Showing 3 changed files with 325 additions and 192 deletions.
168 changes: 99 additions & 69 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"

"github.com/scylladb/gemini"
Expand All @@ -19,6 +21,14 @@ var (
seed int
dropSchema bool
verbose bool
mode string
)

const (
confFile = "schema.json"
writeMode = "write"
readMode = "read"
mixedMode = "mixed"
)

type Status struct {
Expand All @@ -28,58 +38,72 @@ type Status struct {
ReadErrors int
}

func collectResults(one Status, sum Status) Status {
sum.WriteOps += one.WriteOps
sum.WriteErrors += one.WriteErrors
sum.ReadOps += one.ReadOps
sum.ReadErrors += one.ReadErrors
return sum
type Results interface {
Merge(*Status) Status
Print()
}

type jsonSchema struct {
Keyspace gemini.Keyspace `json:"keyspace"`
Tables []gemini.Table `json:"tables"`
}

func printResults(r Status) {
type testJob func(gemini.Schema, gemini.Table, *gemini.Session, gemini.PartitionRange, chan Status, string)

func (r *Status) Merge(sum *Status) Status {
sum.WriteOps += r.WriteOps
sum.WriteErrors += r.WriteErrors
sum.ReadOps += r.ReadOps
sum.ReadErrors += r.ReadErrors
return *sum
}

func (r *Status) Print() {
fmt.Println("Results:")
fmt.Printf("\twrite ops: %v\n", r.WriteOps)
fmt.Printf("\twrite errors: %v\n", r.WriteErrors)
fmt.Printf("\tread ops: %v\n", r.ReadOps)
fmt.Printf("\tread errors: %v\n", r.ReadErrors)
}

func createSchema() (gemini.Schema, error) {
byteValue, err := ioutil.ReadFile(confFile)
if err != nil {
return nil, err
}
fmt.Printf("Schema: %v", string(byteValue))

var shm jsonSchema

err = json.Unmarshal(byteValue, &shm)
if err != nil {
return nil, err
}

schemaBuilder := gemini.NewSchemaBuilder()
schemaBuilder.Keyspace(shm.Keyspace)
for _, tbl := range shm.Tables {
schemaBuilder.Table(tbl)
}
return schemaBuilder.Build(), nil
}

func run(cmd *cobra.Command, args []string) {
rand.Seed(int64(seed))
fmt.Printf("Seed: %d\n", seed)
fmt.Printf("Test cluster: %s\n", testClusterHost)
fmt.Printf("Oracle cluster: %s\n", oracleClusterHost)

schema, err := createSchema()
if err != nil {
fmt.Printf("cannot create schema: %v", err)
return
}

session := gemini.NewSession(testClusterHost, oracleClusterHost)
defer session.Close()

schemaBuilder := gemini.NewSchemaBuilder()
schemaBuilder.Keyspace(gemini.Keyspace{
Name: "gemini",
})
schemaBuilder.Table(gemini.Table{
Name: "data",
PartitionKeys: []gemini.ColumnDef{
{
Name: "pk",
Type: "int",
},
},
ClusteringKeys: []gemini.ColumnDef{
{
Name: "ck",
Type: "int",
},
},
Columns: []gemini.ColumnDef{
{
Name: "n",
Type: "blob",
},
},
})
schema := schemaBuilder.Build()
if dropSchema {
if dropSchema && mode != readMode {
for _, stmt := range schema.GetDropSchema() {
if verbose {
fmt.Println(stmt)
Expand All @@ -100,57 +124,62 @@ func run(cmd *cobra.Command, args []string) {
}
}

runJob(MixedJob, schema, session)
runJob(Job, schema, session, mode)
}

func runJob(f func(gemini.Schema, *gemini.Session, gemini.PartitionRange, chan Status), schema gemini.Schema, s *gemini.Session) {
testRes := Status{}
func runJob(f testJob, schema gemini.Schema, s *gemini.Session, mode string) {
c := make(chan Status)
minRange := 0
maxRange := pkNumberPerThread

for i := 0; i < threads; i++ {
p := gemini.PartitionRange{Min: minRange + i*maxRange, Max: maxRange + i*maxRange}
go f(schema, s, p, c)
for _, table := range schema.Tables() {
for i := 0; i < threads; i++ {
p := gemini.PartitionRange{Min: minRange + i*maxRange, Max: maxRange + i*maxRange}
go f(schema, table, s, p, c, mode)
}
}

for i := 0; i < threads; i++ {
var testRes Status
for i := 0; i < threads*len(schema.Tables()); i++ {
res := <-c
testRes = collectResults(res, testRes)
testRes = res.Merge(&testRes)
}

printResults(testRes)
testRes.Print()
}

func MixedJob(schema gemini.Schema, s *gemini.Session, p gemini.PartitionRange, c chan Status) {
func Job(schema gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, c chan Status, mode string) {
testStatus := Status{}

for i := 0; i < maxTests; i++ {
mutateStmt := schema.GenMutateStmt(&p)
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values()
if verbose {
fmt.Printf("%s (values=%v)\n", mutateQuery, mutateValues)
}
testStatus.WriteOps++
if err := s.Mutate(mutateQuery, mutateValues...); err != nil {
fmt.Printf("Failed! Mutation '%s' (values=%v) caused an error: '%v'\n", mutateQuery, mutateValues, err)
testStatus.WriteErrors++
}

checkStmt := schema.GenCheckStmt(&p)
checkQuery := checkStmt.Query
checkValues := checkStmt.Values()
if verbose {
fmt.Printf("%s (values=%v)\n", checkQuery, checkValues)
if mode == writeMode || mode == mixedMode {
mutateStmt := schema.GenMutateStmt(table, &p)
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values()
if verbose {
fmt.Printf("%s (values=%v)\n", mutateQuery, mutateValues)
}
testStatus.WriteOps++
if err := s.Mutate(mutateQuery, mutateValues...); err != nil {
fmt.Printf("Failed! Mutation '%s' (values=%v) caused an error: '%v'\n", mutateQuery, mutateValues, err)
testStatus.WriteErrors++
}
}
err := s.Check(checkQuery, checkValues...)
if err == nil {
testStatus.ReadOps++
} else {
if err != gemini.ErrReadNoDataReturned {
fmt.Printf("Failed! Check '%s' (values=%v)\n%s\n", checkQuery, checkValues, err)
testStatus.ReadErrors++
if mode == readMode || mode == mixedMode {
checkStmt := schema.GenCheckStmt(table, &p)
checkQuery := checkStmt.Query
checkValues := checkStmt.Values()
if verbose {
fmt.Printf("%s (values=%v)\n", checkQuery, checkValues)
}
err := s.Check(checkQuery, checkValues...)
if err == nil {
testStatus.ReadOps++
} else {
if err != gemini.ErrReadNoDataReturned {
fmt.Printf("Failed! Check '%s' (values=%v)\n%s\n", checkQuery, checkValues, err)
testStatus.ReadErrors++
}
}
}
}
Expand All @@ -172,7 +201,8 @@ func init() {
rootCmd.MarkFlagRequired("test-cluster")
rootCmd.Flags().StringVarP(&oracleClusterHost, "oracle-cluster", "o", "", "Host name of the oracle cluster that provides correct answers")
rootCmd.MarkFlagRequired("oracle-cluster")
rootCmd.Flags().IntVarP(&maxTests, "max-tests", "m", 100, "Maximum number of test iterations to run")
rootCmd.Flags().StringVarP(&mode, "mode", "m", "mixed", "Query operation mode. Mode options: write, read, mixed (default)")
rootCmd.Flags().IntVarP(&maxTests, "max-tests", "n", 100, "Maximum number of test iterations to run")
rootCmd.Flags().IntVarP(&threads, "threads", "c", 10, "Number of threads to run concurrently")
rootCmd.Flags().IntVarP(&pkNumberPerThread, "max-pk-per-thread", "p", 50, "Maximum number of partition keys per thread")
rootCmd.Flags().IntVarP(&seed, "seed", "s", 1, "PRNG seed value")
Expand Down
77 changes: 77 additions & 0 deletions cmd/gemini/schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"keyspace": {
"name": "gemini"
},
"tables": [
{
"name": "data1",
"partition_keys": [
{
"name": "pk",
"type": "int"
}
],
"clustering_keys": [
{
"name": "ck",
"type": "int"
}
],
"columns": [
{
"name": "n",
"type": "uuid"
},
{
"name": "t",
"type": "timestamp"
}
]
},
{
"name": "data2",
"partition_keys": [
{
"name": "pk1",
"type": "int"
},
{
"name": "pk2",
"type": "int"
}
],
"clustering_keys": [
{
"name": "ck1",
"type": "int"
},
{
"name": "ck2",
"type": "int"
}
],
"columns": [
{
"name": "a",
"type": "bigint"
},
{
"name": "b",
"type": "blob"
},
{
"name": "c",
"type": "text"
},
{
"name": "d",
"type": "date"
},
{
"name": "e",
"type": "varchar"
}
]
}
]
}
Loading

0 comments on commit 7309af4

Please sign in to comment.