diff --git a/CHANGELOG.md b/CHANGELOG.md index 72aaba3f..5f4699d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Unreleased +- Ensure proper termination when errors happen. - Fix mutation timestamps to match on system under test and test oracle. - Gemini now tries to perform mutation on both systems regardless of if one of them fail. diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index d5af82cb..90d584d1 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -185,7 +185,7 @@ func run(cmd *cobra.Command, args []string) { if verbose { fmt.Println(stmt) } - if err := store.Mutate(createBuilder{stmt: stmt}); err != nil { + if err := store.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil { fmt.Printf("%v", err) return } @@ -195,7 +195,7 @@ func run(cmd *cobra.Command, args []string) { if verbose { fmt.Println(stmt) } - if err := store.Mutate(createBuilder{stmt: stmt}); err != nil { + if err := store.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil { fmt.Printf("%v", err) return } @@ -244,12 +244,12 @@ func runJob(f testJob, schema *gemini.Schema, s store.Store, mode string, out *o for { select { case <-timer.C: + cancelWorkers() + testRes = drain(c, testRes) testRes.PrintResult(out) fmt.Println("Test run completed. Exiting.") - cancelWorkers() return case <-reporterCtx.Done(): - testRes.PrintResult(out) return case res := <-c: testRes = res.Merge(&testRes) @@ -257,24 +257,26 @@ func runJob(f testJob, schema *gemini.Schema, s store.Store, mode string, out *o sp.Suffix = fmt.Sprintf(" Running Gemini... %v", testRes) } if testRes.ReadErrors > 0 { - testRes.PrintResult(out) - fmt.Println(testRes.Errors) if failFast { fmt.Println("Error in data validation. Exiting.") cancelWorkers() + testRes = drain(c, testRes) + testRes.PrintResult(out) return } + testRes.PrintResult(out) } } } }(duration) workers.Wait() + close(c) cancelReporter() reporter.Wait() } -func mutationJob(schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) { +func mutationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) { mutateStmt, err := schema.GenMutateStmt(table, &p) if err != nil { fmt.Printf("Failed! Mutation statement generation failed: '%v'\n", err) @@ -286,7 +288,7 @@ func mutationJob(schema *gemini.Schema, table *gemini.Table, s store.Store, p ge if verbose { fmt.Println(mutateStmt.PrettyCQL()) } - if err := s.Mutate(mutateQuery, mutateValues...); err != nil { + if err := s.Mutate(ctx, mutateQuery, mutateValues...); err != nil { e := JobError{ Timestamp: time.Now(), Message: "Mutation failed: " + err.Error(), @@ -299,14 +301,14 @@ func mutationJob(schema *gemini.Schema, table *gemini.Table, s store.Store, p ge } } -func validationJob(schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) { +func validationJob(ctx context.Context, schema *gemini.Schema, table *gemini.Table, s store.Store, p gemini.PartitionRange, testStatus *Status, out *os.File) { checkStmt := schema.GenCheckStmt(table, &p) checkQuery := checkStmt.Query checkValues := checkStmt.Values() if verbose { fmt.Println(checkStmt.PrettyCQL()) } - if err := s.Check(table, checkQuery, checkValues...); err != nil { + if err := s.Check(ctx, table, checkQuery, checkValues...); err != nil { // De-duplication needed? e := JobError{ Timestamp: time.Now(), @@ -333,15 +335,15 @@ func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table * } switch mode { case writeMode: - mutationJob(schema, table, s, p, &testStatus, out) + mutationJob(ctx, schema, table, s, p, &testStatus, out) case readMode: - validationJob(schema, table, s, p, &testStatus, out) + validationJob(ctx, schema, table, s, p, &testStatus, out) default: ind := p.Rand.Intn(100000) % 2 if ind == 0 { - mutationJob(schema, table, s, p, &testStatus, out) + mutationJob(ctx, schema, table, s, p, &testStatus, out) } else { - validationJob(schema, table, s, p, &testStatus, out) + validationJob(ctx, schema, table, s, p, &testStatus, out) } } @@ -349,7 +351,7 @@ func Job(ctx context.Context, wg *sync.WaitGroup, schema *gemini.Schema, table * c <- testStatus testStatus = Status{} } - if failFast && testStatus.ReadErrors > 0 { + if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) { break } i++ @@ -405,3 +407,10 @@ func printSetup() error { tw.Flush() return nil } + +func drain(ch chan Status, testRes Status) Status { + for res := range ch { + testRes = res.Merge(&testRes) + } + return testRes +} diff --git a/store/cqlstore.go b/store/cqlstore.go index b05940df..05db1f3c 100644 --- a/store/cqlstore.go +++ b/store/cqlstore.go @@ -1,6 +1,7 @@ package store import ( + "context" "time" "github.com/gocql/gocql" @@ -15,24 +16,24 @@ type cqlStore struct { schema *gemini.Schema } -func (cs *cqlStore) mutate(builder qb.Builder, ts time.Time, values ...interface{}) error { +func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, ts time.Time, values ...interface{}) error { query, _ := builder.ToCql() var tsUsec int64 = ts.UnixNano() / 1000 - if err := cs.session.Query(query, values...).WithTimestamp(tsUsec).Exec(); err != nil { + if err := cs.session.Query(query, values...).WithContext(ctx).WithTimestamp(tsUsec).Exec(); !ignore(err) { return errors.Errorf("%v [cluster = test, query = '%s']", err, query) } return nil } -func (cs *cqlStore) load(builder qb.Builder, values []interface{}) (result []map[string]interface{}, err error) { +func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values []interface{}) (result []map[string]interface{}, err error) { query, _ := builder.ToCql() - testIter := cs.session.Query(query, values...).Iter() - oracleIter := cs.session.Query(query, values...).Iter() + testIter := cs.session.Query(query, values...).WithContext(ctx).Iter() + oracleIter := cs.session.Query(query, values...).WithContext(ctx).Iter() defer func() { - if e := testIter.Close(); e != nil { + if e := testIter.Close(); !ignore(e) { err = multierr.Append(err, errors.Errorf("test system failed: %s", e.Error())) } - if e := oracleIter.Close(); e != nil { + if e := oracleIter.Close(); !ignore(e) { err = multierr.Append(err, errors.Errorf("oracle failed: %s", e.Error())) } }() @@ -54,3 +55,15 @@ func newSession(hosts []string) *gocql.Session { } return session } + +func ignore(err error) bool { + if err == nil { + return true + } + switch err { + case context.Canceled, context.DeadlineExceeded: + return true + default: + return false + } +} diff --git a/store/store.go b/store/store.go index 6cdd52cd..74c9e2ee 100644 --- a/store/store.go +++ b/store/store.go @@ -1,6 +1,7 @@ package store import ( + "context" "fmt" "math/big" "sort" @@ -18,11 +19,11 @@ import ( ) type loader interface { - load(qb.Builder, []interface{}) ([]map[string]interface{}, error) + load(context.Context, qb.Builder, []interface{}) ([]map[string]interface{}, error) } type storer interface { - mutate(qb.Builder, time.Time, ...interface{}) error + mutate(context.Context, qb.Builder, time.Time, ...interface{}) error } type storeLoader interface { @@ -32,8 +33,8 @@ type storeLoader interface { } type Store interface { - Mutate(qb.Builder, ...interface{}) error - Check(*gemini.Table, qb.Builder, ...interface{}) error + Mutate(context.Context, qb.Builder, ...interface{}) error + Check(context.Context, *gemini.Table, qb.Builder, ...interface{}) error Close() error } @@ -55,23 +56,23 @@ type delegatingStore struct { testStore storeLoader } -func (ds delegatingStore) Mutate(builder qb.Builder, values ...interface{}) error { +func (ds delegatingStore) Mutate(ctx context.Context, builder qb.Builder, values ...interface{}) error { ts := time.Now() - if err := ds.testStore.mutate(builder, ts, values...); err != nil { + if err := ds.testStore.mutate(ctx, builder, ts, values...); err != nil { return errors.Wrapf(err, "unable to apply mutations to the test store") } - if err := ds.oracleStore.mutate(builder, ts, values...); err != nil { + if err := ds.oracleStore.mutate(ctx, builder, ts, values...); err != nil { return errors.Wrapf(err, "unable to apply mutations to the oracle store") } return nil } -func (ds delegatingStore) Check(table *gemini.Table, builder qb.Builder, values ...interface{}) error { - testRows, err := load(ds.testStore, builder, values) +func (ds delegatingStore) Check(ctx context.Context, table *gemini.Table, builder qb.Builder, values ...interface{}) error { + testRows, err := load(ctx, ds.testStore, builder, values) if err != nil { return errors.Wrapf(err, "unable to load check data from the test store") } - oracleRows, err := load(ds.oracleStore, builder, values) + oracleRows, err := load(ctx, ds.oracleStore, builder, values) if err != nil { return errors.Wrapf(err, "unable to load check data from the test store") } @@ -109,8 +110,8 @@ func (ds delegatingStore) Check(table *gemini.Table, builder qb.Builder, values return nil } -func load(l loader, builder qb.Builder, values []interface{}) ([]map[string]interface{}, error) { - return l.load(builder, values) +func load(ctx context.Context, l loader, builder qb.Builder, values []interface{}) ([]map[string]interface{}, error) { + return l.load(ctx, builder, values) } func (ds delegatingStore) Close() (err error) {