Skip to content

Commit

Permalink
Merge 'Schema generation support' from Pekka
Browse files Browse the repository at this point in the history
"This pull request implements schema generation in the Gemini tool. Previously,
the schema was configured via an external file, schema.json. This adds a --schema
command-line option, which users can use for the old behavior. If no schema
file is configured, the tool generates one.

Fixes #9."

Reviewed-by: Henrik Johansson <[email protected]>

* origin/schema-gen:
  schema: Separate value range generation to genValueRange
  schema: Rename generateValue to genValue
  schema: Schema generation
  schema: Fix error handling in generateValue
  • Loading branch information
penberg committed Mar 6, 2019
2 parents 6b62ff1 + c458b61 commit c6b421f
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 76 deletions.
44 changes: 24 additions & 20 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
var (
testClusterHost string
oracleClusterHost string
schemaFile string
maxTests int
concurrency int
pkNumberPerThread int
Expand All @@ -27,7 +28,6 @@ var (
)

const (
confFile = "schema.json"
writeMode = "write"
readMode = "read"
mixedMode = "mixed"
Expand All @@ -45,12 +45,7 @@ type Results interface {
Print()
}

type jsonSchema struct {
Keyspace gemini.Keyspace `json:"keyspace"`
Tables []gemini.Table `json:"tables"`
}

type testJob func(gemini.Schema, gemini.Table, *gemini.Session, gemini.PartitionRange, chan Status, string)
type testJob func(*gemini.Schema, gemini.Table, *gemini.Session, gemini.PartitionRange, chan Status, string)

func (r *Status) Merge(sum *Status) Status {
sum.WriteOps += r.WriteOps
Expand All @@ -72,14 +67,13 @@ func (r Status) String() string {
return fmt.Sprintf("write ops: %v | read ops: %v | write errors: %v | read errors: %v", r.WriteOps, r.ReadOps, r.WriteErrors, r.ReadErrors)
}

func createSchema() (gemini.Schema, error) {
func readSchema(confFile string) (*gemini.Schema, error) {
byteValue, err := ioutil.ReadFile(confFile)
if err != nil {
return nil, err
}
fmt.Printf("Schema: %v", string(byteValue))

var shm jsonSchema
var shm gemini.Schema

err = json.Unmarshal(byteValue, &shm)
if err != nil {
Expand All @@ -103,12 +97,21 @@ func run(cmd *cobra.Command, args []string) {
fmt.Printf("Test cluster: %s\n", testClusterHost)
fmt.Printf("Oracle cluster: %s\n", oracleClusterHost)

schema, err := createSchema()
if err != nil {
fmt.Printf("cannot create schema: %v", err)
return
var schema *gemini.Schema
if len(schemaFile) > 0 {
var err error
schema, err = readSchema(schemaFile)
if err != nil {
fmt.Printf("cannot create schema: %v", err)
return
}
} else {
schema = gemini.GenSchema()
}

jsonSchema, _ := json.MarshalIndent(schema, "", " ")
fmt.Printf("Schema: %v\n", string(jsonSchema))

session := gemini.NewSession(testClusterHost, oracleClusterHost)
defer session.Close()

Expand Down Expand Up @@ -136,20 +139,20 @@ func run(cmd *cobra.Command, args []string) {
runJob(Job, schema, session, mode)
}

func runJob(f testJob, schema gemini.Schema, s *gemini.Session, mode string) {
func runJob(f testJob, schema *gemini.Schema, s *gemini.Session, mode string) {
c := make(chan Status)
minRange := 0
maxRange := pkNumberPerThread

for _, table := range schema.Tables() {
for _, table := range schema.Tables {
for i := 0; i < concurrency; i++ {
p := gemini.PartitionRange{Min: minRange + i*maxRange, Max: maxRange + i*maxRange}
go f(schema, table, s, p, c, mode)
}
}

var testRes Status
for i := 0; i < concurrency*len(schema.Tables()); i++ {
for i := 0; i < concurrency*len(schema.Tables); i++ {
res := <-c
testRes = res.Merge(&testRes)
if testRes.ReadErrors > 0 {
Expand All @@ -162,7 +165,7 @@ func runJob(f testJob, schema gemini.Schema, s *gemini.Session, mode string) {
testRes.PrintResult()
}

func mutationJob(schema gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status) {
func mutationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status) {
mutateStmt := schema.GenMutateStmt(table, &p)
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values()
Expand All @@ -176,7 +179,7 @@ func mutationJob(schema gemini.Schema, table gemini.Table, s *gemini.Session, p
}
}

func validationJob(schema gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status) {
func validationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status) {
checkStmt := schema.GenCheckStmt(table, &p)
checkQuery := checkStmt.Query
checkValues := checkStmt.Values()
Expand All @@ -194,7 +197,7 @@ func validationJob(schema gemini.Schema, table gemini.Table, s *gemini.Session,
}
}

func Job(schema gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, c chan Status, mode string) {
func Job(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, c chan Status, mode string) {
testStatus := Status{}

for i := 0; i < maxTests; i++ {
Expand Down Expand Up @@ -240,6 +243,7 @@ func init() {
rootCmd.MarkFlagRequired("test-cluster")
rootCmd.Flags().StringVarP(&oracleClusterHost, "oracle-cluster", "o", "", "Host name of the oracle cluster that provides correct answers")
rootCmd.MarkFlagRequired("oracle-cluster")
rootCmd.Flags().StringVarP(&schemaFile, "schema", "", "", "Schema JSON config file")
rootCmd.Flags().StringVarP(&mode, "mode", "m", mixedMode, "Query operation mode. Mode options: write, read, mixed (default)")
rootCmd.Flags().IntVarP(&maxTests, "max-tests", "n", 100, "Maximum number of test iterations to run")
rootCmd.Flags().IntVarP(&concurrency, "concurrency", "c", 10, "Number of threads per table to run concurrently")
Expand Down
Loading

0 comments on commit c6b421f

Please sign in to comment.