Skip to content

Commit

Permalink
gemini: separate store interface
Browse files Browse the repository at this point in the history
The session is removed and a store interface is introduced.
This store interface is small and isolated and allows for the
caller to generate arbitrary queries using the builder.

The store encapsulates the two internal stores and performs
checks using data from these two instances.

Currently the internal stores are both cql based but any
backing storage can be implemented without affecting the
surrounding calling code.
  • Loading branch information
Henrik Johansson committed May 16, 2019
1 parent 2feef06 commit 959c209
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 198 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- A new Store abstraction is introduced in preparation to enable
implementations such as an in-memory store.
- Gemini now uses github.com/scylladb/gocqlx/qb builder.
- Gemini ensures that primary key buckets do not overflow int32.
- Gemini now accepts a list of node host names or IPs for the test
and Oracle clusters.
Expand Down
41 changes: 24 additions & 17 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"text/tabwriter"
"time"

"github.com/scylladb/gemini/store"

"github.com/briandowns/spinner"
"github.com/pkg/errors"
"github.com/scylladb/gemini"
Expand Down Expand Up @@ -44,11 +46,16 @@ const (
)

type Status struct {
WriteOps int `json:"write_ops"`
WriteErrors int `json:"write_errors"`
ReadOps int `json:"read_ops"`
ReadErrors int `json:"read_errors"`
Errors []gemini.JobError `json:"errors,omitempty"`
WriteOps int `json:"write_ops"`
WriteErrors int `json:"write_errors"`
ReadOps int `json:"read_ops"`
ReadErrors int `json:"read_errors"`
Errors []JobError `json:"errors,omitempty"`
}

type JobError struct {
Message string `json:"message"`
Query string `json:"query"`
}

type Results interface {
Expand All @@ -60,7 +67,7 @@ func interactive() bool {
return !nonInteractive
}

type testJob func(context.Context, *sync.WaitGroup, *gemini.Schema, gemini.Table, *gemini.Session, gemini.PartitionRange, chan Status, string, *os.File)
type testJob func(context.Context, *sync.WaitGroup, *gemini.Schema, *gemini.Table, store.Store, gemini.PartitionRange, chan Status, string, *os.File)

func (r *Status) Merge(sum *Status) Status {
sum.WriteOps += r.WriteOps
Expand Down Expand Up @@ -170,15 +177,15 @@ func run(cmd *cobra.Command, args []string) {
jsonSchema, _ := json.MarshalIndent(schema, "", " ")
fmt.Printf("Schema: %v\n", string(jsonSchema))

session := gemini.NewSession(testClusterHost, oracleClusterHost)
defer session.Close()
store := store.New(schema, testClusterHost, oracleClusterHost)
defer store.Close()

if dropSchema && mode != readMode {
for _, stmt := range schema.GetDropSchema() {
if verbose {
fmt.Println(stmt)
}
if err := session.Mutate(createBuilder{stmt: stmt}); err != nil {
if err := store.Mutate(createBuilder{stmt: stmt}); err != nil {
fmt.Printf("%v", err)
return
}
Expand All @@ -188,16 +195,16 @@ func run(cmd *cobra.Command, args []string) {
if verbose {
fmt.Println(stmt)
}
if err := session.Mutate(createBuilder{stmt: stmt}); err != nil {
if err := store.Mutate(createBuilder{stmt: stmt}); err != nil {
fmt.Printf("%v", err)
return
}
}

runJob(Job, schema, session, mode, outFile)
runJob(Job, schema, store, mode, outFile)
}

func runJob(f testJob, schema *gemini.Schema, s *gemini.Session, mode string, out *os.File) {
func runJob(f testJob, schema *gemini.Schema, s store.Store, mode string, out *os.File) {
c := make(chan Status)
minRange := 0
maxRange := pkNumberPerThread
Expand Down Expand Up @@ -267,7 +274,7 @@ func runJob(f testJob, schema *gemini.Schema, s *gemini.Session, mode string, ou
reporter.Wait()
}

func mutationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status, out *os.File) {
func mutationJob(schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) {
mutateStmt, err := schema.GenMutateStmt(table, &p)
if err != nil {
fmt.Printf("Failed! Mutation statement generation failed: '%v'\n", err)
Expand All @@ -280,7 +287,7 @@ func mutationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p
fmt.Println(mutateStmt.PrettyCQL())
}
if err := s.Mutate(mutateQuery, mutateValues...); err != nil {
e := gemini.JobError{
e := JobError{
Message: "Mutation failed: " + err.Error(),
Query: mutateStmt.PrettyCQL(),
}
Expand All @@ -291,7 +298,7 @@ func mutationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p
}
}

func validationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, testStatus *Status, out *os.File) {
func validationJob(schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) {
checkStmt := schema.GenCheckStmt(table, &p)
checkQuery := checkStmt.Query
checkValues := checkStmt.Values()
Expand All @@ -300,7 +307,7 @@ func validationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session,
}
if err := s.Check(table, checkQuery, checkValues...); err != nil {
// De-duplication needed?
e := gemini.JobError{
e := JobError{
Message: "Validation failed: " + err.Error(),
Query: checkStmt.PrettyCQL(),
}
Expand All @@ -311,7 +318,7 @@ func validationJob(schema *gemini.Schema, table gemini.Table, s *gemini.Session,
}
}

func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table gemini.Table, s *gemini.Session, p gemini.PartitionRange, c chan Status, mode string, out *os.File) {
func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, c chan Status, mode string, out *os.File) {
defer wg.Done()
testStatus := Status{}

Expand Down
30 changes: 15 additions & 15 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *Stmt) PrettyCQL() string {

type Schema struct {
Keyspace Keyspace `json:"keyspace"`
Tables []Table `json:"tables"`
Tables []*Table `json:"tables"`
}

type PartitionRange struct {
Expand Down Expand Up @@ -202,7 +202,7 @@ func GenSchema() *Schema {
KnownIssuesJsonWithTuples: true,
},
}
builder.Table(table)
builder.Table(&table)
return builder.Build()
}

Expand Down Expand Up @@ -288,7 +288,7 @@ func (s *Schema) GetCreateSchema() []string {
return stmts
}

func (s *Schema) GenInsertStmt(t Table, p *PartitionRange) (*Stmt, error) {
func (s *Schema) GenInsertStmt(t *Table, p *PartitionRange) (*Stmt, error) {
var (
typs []Type
)
Expand Down Expand Up @@ -323,7 +323,7 @@ func (s *Schema) GenInsertStmt(t Table, p *PartitionRange) (*Stmt, error) {
}, nil
}

func (s *Schema) GenInsertJsonStmt(t Table, p *PartitionRange) (*Stmt, error) {
func (s *Schema) GenInsertJsonStmt(t *Table, p *PartitionRange) (*Stmt, error) {
values := make(map[string]interface{})
values = t.PartitionKeys.ToJSONMap(values, p)
values = t.ClusteringKeys.ToJSONMap(values, p)
Expand All @@ -344,7 +344,7 @@ func (s *Schema) GenInsertJsonStmt(t Table, p *PartitionRange) (*Stmt, error) {
}, nil
}

func (s *Schema) GenDeleteRows(t Table, p *PartitionRange) (*Stmt, error) {
func (s *Schema) GenDeleteRows(t *Table, p *PartitionRange) (*Stmt, error) {
var (
values []interface{}
typs []Type
Expand All @@ -370,7 +370,7 @@ func (s *Schema) GenDeleteRows(t Table, p *PartitionRange) (*Stmt, error) {
}, nil
}

func (s *Schema) GenMutateStmt(t Table, p *PartitionRange) (*Stmt, error) {
func (s *Schema) GenMutateStmt(t *Table, p *PartitionRange) (*Stmt, error) {
switch n := p.Rand.Intn(1000); n {
case 10, 100:
return s.GenDeleteRows(t, p)
Expand All @@ -387,7 +387,7 @@ func (s *Schema) GenMutateStmt(t Table, p *PartitionRange) (*Stmt, error) {
}
}

func (s *Schema) GenCheckStmt(t Table, p *PartitionRange) *Stmt {
func (s *Schema) GenCheckStmt(t *Table, p *PartitionRange) *Stmt {
var n int
if len(t.Indexes) > 0 {
n = p.Rand.Intn(5)
Expand All @@ -409,7 +409,7 @@ func (s *Schema) GenCheckStmt(t Table, p *PartitionRange) *Stmt {
return nil
}

func (s *Schema) genSinglePartitionQuery(t Table, p *PartitionRange) *Stmt {
func (s *Schema) genSinglePartitionQuery(t *Table, p *PartitionRange) *Stmt {
tableName := t.Name
partitionKeys := t.PartitionKeys
if len(t.MaterializedViews) > 0 && p.Rand.Int()%2 == 0 {
Expand All @@ -434,7 +434,7 @@ func (s *Schema) genSinglePartitionQuery(t Table, p *PartitionRange) *Stmt {
}
}

func (s *Schema) genMultiplePartitionQuery(t Table, p *PartitionRange) *Stmt {
func (s *Schema) genMultiplePartitionQuery(t *Table, p *PartitionRange) *Stmt {
var (
values []interface{}
typs []Type
Expand Down Expand Up @@ -467,7 +467,7 @@ func (s *Schema) genMultiplePartitionQuery(t Table, p *PartitionRange) *Stmt {
}
}

func (s *Schema) genClusteringRangeQuery(t Table, p *PartitionRange) *Stmt {
func (s *Schema) genClusteringRangeQuery(t *Table, p *PartitionRange) *Stmt {
var (
values []interface{}
typs []Type
Expand Down Expand Up @@ -508,7 +508,7 @@ func (s *Schema) genClusteringRangeQuery(t Table, p *PartitionRange) *Stmt {
}
}

func (s *Schema) genMultiplePartitionClusteringRangeQuery(t Table, p *PartitionRange) *Stmt {
func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, p *PartitionRange) *Stmt {
var (
values []interface{}
typs []Type
Expand Down Expand Up @@ -555,7 +555,7 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t Table, p *PartitionR
}
}

func (s *Schema) genSingleIndexQuery(t Table, p *PartitionRange) *Stmt {
func (s *Schema) genSingleIndexQuery(t *Table, p *PartitionRange) *Stmt {
var (
values []interface{}
typs []Type
Expand Down Expand Up @@ -591,21 +591,21 @@ func (s *Schema) genSingleIndexQuery(t Table, p *PartitionRange) *Stmt {

type SchemaBuilder interface {
Keyspace(Keyspace) SchemaBuilder
Table(Table) SchemaBuilder
Table(*Table) SchemaBuilder
Build() *Schema
}

type schemaBuilder struct {
keyspace Keyspace
tables []Table
tables []*Table
}

func (s *schemaBuilder) Keyspace(keyspace Keyspace) SchemaBuilder {
s.keyspace = keyspace
return s
}

func (s *schemaBuilder) Table(table Table) SchemaBuilder {
func (s *schemaBuilder) Table(table *Table) SchemaBuilder {
s.tables = append(s.tables, table)
return s
}
Expand Down
Loading

0 comments on commit 959c209

Please sign in to comment.