Skip to content

Commit

Permalink
Merge 'gemini: terminating properly upon an error' from Henrik
Browse files Browse the repository at this point in the history
"We ensure that we drain the remaining work after a quit signal is
detected. This ensures that no workers get blocked on trying to
send a status message after the receiving end has stopped listening.

We also send down the termination context to the driver to allow
it to abort what it is doing as fast as possible.

Fixes: #106"
  • Loading branch information
penberg authored May 20, 2019
2 parents 5f072a4 + c8b2425 commit 8a1b07d
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Unreleased

- Ensure proper termination when errors happen.
- Fix mutation timestamps to match on system under test and test oracle.
- Gemini now tries to perform mutations on both systems regardless of
whether one of them fails.
Expand Down
39 changes: 24 additions & 15 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func run(cmd *cobra.Command, args []string) {
if verbose {
fmt.Println(stmt)
}
if err := store.Mutate(createBuilder{stmt: stmt}); err != nil {
if err := store.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
fmt.Printf("%v", err)
return
}
Expand All @@ -195,7 +195,7 @@ func run(cmd *cobra.Command, args []string) {
if verbose {
fmt.Println(stmt)
}
if err := store.Mutate(createBuilder{stmt: stmt}); err != nil {
if err := store.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
fmt.Printf("%v", err)
return
}
Expand Down Expand Up @@ -244,37 +244,39 @@ func runJob(f testJob, schema *gemini.Schema, s store.Store, mode string, out *o
for {
select {
case <-timer.C:
cancelWorkers()
testRes = drain(c, testRes)
testRes.PrintResult(out)
fmt.Println("Test run completed. Exiting.")
cancelWorkers()
return
case <-reporterCtx.Done():
testRes.PrintResult(out)
return
case res := <-c:
testRes = res.Merge(&testRes)
if sp != nil {
sp.Suffix = fmt.Sprintf(" Running Gemini... %v", testRes)
}
if testRes.ReadErrors > 0 {
testRes.PrintResult(out)
fmt.Println(testRes.Errors)
if failFast {
fmt.Println("Error in data validation. Exiting.")
cancelWorkers()
testRes = drain(c, testRes)
testRes.PrintResult(out)
return
}
testRes.PrintResult(out)
}
}
}
}(duration)

workers.Wait()
close(c)
cancelReporter()
reporter.Wait()
}

func mutationJob(schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) {
func mutationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) {
mutateStmt, err := schema.GenMutateStmt(table, &p)
if err != nil {
fmt.Printf("Failed! Mutation statement generation failed: '%v'\n", err)
Expand All @@ -286,7 +288,7 @@ func mutationJob(schema *gemini.Schema, table *gemini.Table, s store.Store, p ge
if verbose {
fmt.Println(mutateStmt.PrettyCQL())
}
if err := s.Mutate(mutateQuery, mutateValues...); err != nil {
if err := s.Mutate(ctx, mutateQuery, mutateValues...); err != nil {
e := JobError{
Timestamp: time.Now(),
Message: "Mutation failed: " + err.Error(),
Expand All @@ -299,14 +301,14 @@ func mutationJob(schema *gemini.Schema, table *gemini.Table, s store.Store, p ge
}
}

func validationJob(schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) {
func validationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) {
checkStmt := schema.GenCheckStmt(table, &p)
checkQuery := checkStmt.Query
checkValues := checkStmt.Values()
if verbose {
fmt.Println(checkStmt.PrettyCQL())
}
if err := s.Check(table, checkQuery, checkValues...); err != nil {
if err := s.Check(ctx, table, checkQuery, checkValues...); err != nil {
// De-duplication needed?
e := JobError{
Timestamp: time.Now(),
Expand All @@ -333,23 +335,23 @@ func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table *
}
switch mode {
case writeMode:
mutationJob(schema, table, s, p, &testStatus, out)
mutationJob(ctx, schema, table, s, p, &testStatus, out)
case readMode:
validationJob(schema, table, s, p, &testStatus, out)
validationJob(ctx, schema, table, s, p, &testStatus, out)
default:
ind := p.Rand.Intn(100000) % 2
if ind == 0 {
mutationJob(schema, table, s, p, &testStatus, out)
mutationJob(ctx, schema, table, s, p, &testStatus, out)
} else {
validationJob(schema, table, s, p, &testStatus, out)
validationJob(ctx, schema, table, s, p, &testStatus, out)
}
}

if i%1000 == 0 {
c <- testStatus
testStatus = Status{}
}
if failFast && testStatus.ReadErrors > 0 {
if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) {
break
}
i++
Expand Down Expand Up @@ -405,3 +407,10 @@ func printSetup() error {
tw.Flush()
return nil
}

// drain receives every Status still arriving on ch, merging each one into
// testRes, and returns the accumulated result. It blocks until ch has been
// closed, so it only returns once the sending side is done.
func drain(ch chan Status, testRes Status) Status {
	for {
		next, open := <-ch
		if !open {
			// Channel closed and fully consumed: nothing left to merge.
			return testRes
		}
		testRes = next.Merge(&testRes)
	}
}
27 changes: 20 additions & 7 deletions store/cqlstore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"context"
"time"

"github.com/gocql/gocql"
Expand All @@ -15,24 +16,24 @@ type cqlStore struct {
schema *gemini.Schema
}

func (cs *cqlStore) mutate(builder qb.Builder, ts time.Time, values ...interface{}) error {
func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, ts time.Time, values ...interface{}) error {
query, _ := builder.ToCql()
var tsUsec int64 = ts.UnixNano() / 1000
if err := cs.session.Query(query, values...).WithTimestamp(tsUsec).Exec(); err != nil {
if err := cs.session.Query(query, values...).WithContext(ctx).WithTimestamp(tsUsec).Exec(); !ignore(err) {
return errors.Errorf("%v [cluster = test, query = '%s']", err, query)
}
return nil
}

func (cs *cqlStore) load(builder qb.Builder, values []interface{}) (result []map[string]interface{}, err error) {
func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values []interface{}) (result []map[string]interface{}, err error) {
query, _ := builder.ToCql()
testIter := cs.session.Query(query, values...).Iter()
oracleIter := cs.session.Query(query, values...).Iter()
testIter := cs.session.Query(query, values...).WithContext(ctx).Iter()
oracleIter := cs.session.Query(query, values...).WithContext(ctx).Iter()
defer func() {
if e := testIter.Close(); e != nil {
if e := testIter.Close(); !ignore(e) {
err = multierr.Append(err, errors.Errorf("test system failed: %s", e.Error()))
}
if e := oracleIter.Close(); e != nil {
if e := oracleIter.Close(); !ignore(e) {
err = multierr.Append(err, errors.Errorf("oracle failed: %s", e.Error()))
}
}()
Expand All @@ -54,3 +55,15 @@ func newSession(hosts []string) *gocql.Session {
}
return session
}

// ignore reports whether err may be disregarded by the caller. It returns
// true for a nil error and for the context.Canceled and
// context.DeadlineExceeded sentinel values, and false for any other error.
// The comparison is by identity, so an error that merely wraps one of the
// context sentinels is not ignored.
func ignore(err error) bool {
	if err == context.Canceled || err == context.DeadlineExceeded {
		return true
	}
	return err == nil
}
25 changes: 13 additions & 12 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"context"
"fmt"
"math/big"
"sort"
Expand All @@ -18,11 +19,11 @@ import (
)

type loader interface {
load(qb.Builder, []interface{}) ([]map[string]interface{}, error)
load(context.Context, qb.Builder, []interface{}) ([]map[string]interface{}, error)
}

type storer interface {
mutate(qb.Builder, time.Time, ...interface{}) error
mutate(context.Context, qb.Builder, time.Time, ...interface{}) error
}

type storeLoader interface {
Expand All @@ -32,8 +33,8 @@ type storeLoader interface {
}

type Store interface {
Mutate(qb.Builder, ...interface{}) error
Check(*gemini.Table, qb.Builder, ...interface{}) error
Mutate(context.Context, qb.Builder, ...interface{}) error
Check(context.Context, *gemini.Table, qb.Builder, ...interface{}) error
Close() error
}

Expand All @@ -55,23 +56,23 @@ type delegatingStore struct {
testStore storeLoader
}

func (ds delegatingStore) Mutate(builder qb.Builder, values ...interface{}) error {
func (ds delegatingStore) Mutate(ctx context.Context, builder qb.Builder, values ...interface{}) error {
ts := time.Now()
if err := ds.testStore.mutate(builder, ts, values...); err != nil {
if err := ds.testStore.mutate(ctx, builder, ts, values...); err != nil {
return errors.Wrapf(err, "unable to apply mutations to the test store")
}
if err := ds.oracleStore.mutate(builder, ts, values...); err != nil {
if err := ds.oracleStore.mutate(ctx, builder, ts, values...); err != nil {
return errors.Wrapf(err, "unable to apply mutations to the oracle store")
}
return nil
}

func (ds delegatingStore) Check(table *gemini.Table, builder qb.Builder, values ...interface{}) error {
testRows, err := load(ds.testStore, builder, values)
func (ds delegatingStore) Check(ctx context.Context, table *gemini.Table, builder qb.Builder, values ...interface{}) error {
testRows, err := load(ctx, ds.testStore, builder, values)
if err != nil {
return errors.Wrapf(err, "unable to load check data from the test store")
}
oracleRows, err := load(ds.oracleStore, builder, values)
oracleRows, err := load(ctx, ds.oracleStore, builder, values)
if err != nil {
return errors.Wrapf(err, "unable to load check data from the test store")
}
Expand Down Expand Up @@ -109,8 +110,8 @@ func (ds delegatingStore) Check(table *gemini.Table, builder qb.Builder, values
return nil
}

func load(l loader, builder qb.Builder, values []interface{}) ([]map[string]interface{}, error) {
return l.load(builder, values)
func load(ctx context.Context, l loader, builder qb.Builder, values []interface{}) ([]map[string]interface{}, error) {
return l.load(ctx, builder, values)
}

func (ds delegatingStore) Close() (err error) {
Expand Down

0 comments on commit 8a1b07d

Please sign in to comment.