Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(jobs): make them finish test properly when it is stopped #363

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 50 additions & 41 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ var (
warmup = job{name: warmupName, function: warmupJob}
validate = job{name: validateName, function: validationJob}
mutate = job{name: mutateName, function: mutationJob}

errorJobTerminated = errors.New("job terminated")
)

type List struct {
Expand Down Expand Up @@ -183,21 +181,17 @@ func mutationJob(
return nil
case hb := <-pump:
time.Sleep(hb)
ind := r.Intn(1000000)
if ind%100000 == 0 {
if err := ddl(ctx, schema, schemaConfig, table, s, r, p, globalStatus, logger, verbose); err != nil {
return err
}
} else {
if err := mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, true, logger); err != nil {
return err
}
}
if failFast && globalStatus.HasErrors() {
return errorJobTerminated
}
}

ind := r.Intn(1000000)
if ind%100000 == 0 {
_ = ddl(ctx, schema, schemaConfig, table, s, r, p, globalStatus, logger, verbose)
} else {
_ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, true, logger)
}
if failFast && globalStatus.HasErrors() {
stopFlag.SetSoft()
return nil
}
}
}

Expand Down Expand Up @@ -234,26 +228,31 @@ func validationJob(
return nil
case hb := <-pump:
time.Sleep(hb)
stmt := GenCheckStmt(schema, table, g, r, p)
if stmt == nil {
logger.Info("Validation. No statement generated from GenCheckStmt.")
continue
}
}
stmt := GenCheckStmt(schema, table, g, r, p)
if stmt == nil {
logger.Info("Validation. No statement generated from GenCheckStmt.")
continue
}

if err := validation(ctx, schemaConfig, table, s, stmt, g, globalStatus, logger); err != nil {
globalStatus.AddReadError(&joberror.JobError{
Timestamp: time.Now(),
StmtType: stmt.QueryType.ToString(),
Message: "Validation failed: " + err.Error(),
Query: stmt.PrettyCQL(),
})
} else {
globalStatus.ReadOps.Add(1)
}
err := validation(ctx, schemaConfig, table, s, stmt, g, globalStatus, logger)
switch {
case err == nil:
globalStatus.ReadOps.Add(1)
case errors.Is(err, context.Canceled):
return nil
default:
globalStatus.AddReadError(&joberror.JobError{
Timestamp: time.Now(),
StmtType: stmt.QueryType.ToString(),
Message: "Validation failed: " + err.Error(),
Query: stmt.PrettyCQL(),
})
}

if failFast && globalStatus.HasErrors() {
return errorJobTerminated
}
if failFast && globalStatus.HasErrors() {
stopFlag.SetSoft()
return nil
}
}
}
Expand Down Expand Up @@ -283,20 +282,19 @@ func warmupJob(
}()
for {
if stopFlag.IsHardOrSoft() {
logger.Debug("warmup job terminated")
return nil
}
select {
case <-ctx.Done():
logger.Debug("warmup job terminated")
return nil

default:
// Do we care about errors during warmup?
_ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, false, logger)
if failFast && globalStatus.HasErrors() {
return errorJobTerminated
}
}
// Do we care about errors during warmup?
_ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, false, logger)
if failFast && globalStatus.HasErrors() {
stopFlag.SetSoft()
return nil
}
}
}
Expand Down Expand Up @@ -340,6 +338,9 @@ func ddl(
w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL()))
}
if err = s.Mutate(ctx, ddlStmt.Query); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
globalStatus.AddWriteError(&joberror.JobError{
Timestamp: time.Now(),
StmtType: ddlStmts.QueryType.ToString(),
Expand Down Expand Up @@ -394,6 +395,9 @@ func mutation(
w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
}
if err = s.Mutate(ctx, mutateQuery, mutateValues...); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
globalStatus.AddWriteError(&joberror.JobError{
Timestamp: time.Now(),
StmtType: mutateStmt.QueryType.ToString(),
Expand Down Expand Up @@ -447,6 +451,11 @@ func validation(
}
return nil
}
if errors.Is(err, context.Canceled) {
// When context is canceled it means that test was commanded to stop
// to skip logging part it is returned here
return err
}
if attempt == maxAttempts {
break
}
Expand Down
26 changes: 1 addition & 25 deletions pkg/store/cqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/scylladb/gocqlx/v2/qb"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/scylladb/gemini/pkg/typedef"
Expand Down Expand Up @@ -76,16 +75,6 @@ func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Ti
query = query.WithTimestamp(ts.UnixNano() / 1000)
}

/*
key, _ := query.GetRoutingKey()
if len(values) >= 2 {
v := values[:2]
s := strings.TrimRight(strings.Repeat("%v,", 2), ",")
format := fmt.Sprintf("{\nvalues: []interface{}{%s},\nwant: createOne(\"%s\"),\n},\n", s, hex.EncodeToString(key))
fmt.Printf(format, v...)
}
if err := query.Exec(); err != nil {
*/
if err := query.Exec(); err != nil {
if errs.Is(err, context.DeadlineExceeded) {
if w := cs.logger.Check(zap.DebugLevel, "deadline exceeded for mutation query"); w != nil {
Expand All @@ -103,20 +92,7 @@ func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values []inter
query, _ := builder.ToCql()
iter := cs.session.Query(query, values...).WithContext(ctx).Iter()
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
defer func() {
if e := iter.Close(); e != nil {
if errs.Is(e, context.DeadlineExceeded) {
if w := cs.logger.Check(zap.DebugLevel, "deadline exceeded for load query"); w != nil {
w.Write(zap.String("system", cs.system), zap.String("query", query), zap.Error(e))
}
}
if !ignore(e) {
err = multierr.Append(err, errors.Errorf("system failed: %s", e.Error()))
}
}
}()
result = loadSet(iter)
return
return loadSet(iter), iter.Close()
}

func (cs cqlStore) close() error {
Expand Down
8 changes: 2 additions & 6 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,11 @@ func mutate(ctx context.Context, s storeLoader, ts time.Time, builder qb.Builder
}

func (ds delegatingStore) Check(ctx context.Context, table *typedef.Table, builder qb.Builder, values ...interface{}) error {
testRows, err := load(ctx, ds.testStore, builder, values)
testRows, err := ds.testStore.load(ctx, builder, values)
if err != nil {
return errors.Wrapf(err, "unable to load check data from the test store")
}
oracleRows, err := load(ctx, ds.oracleStore, builder, values)
oracleRows, err := ds.oracleStore.load(ctx, builder, values)
if err != nil {
return errors.Wrapf(err, "unable to load check data from the oracle store")
}
Expand Down Expand Up @@ -223,10 +223,6 @@ func (ds delegatingStore) Check(ctx context.Context, table *typedef.Table, build
return nil
}

func load(ctx context.Context, l loader, builder qb.Builder, values []interface{}) ([]map[string]interface{}, error) {
return l.load(ctx, builder, values)
}

func (ds delegatingStore) Close() (err error) {
err = multierr.Append(err, ds.testStore.close())
err = multierr.Append(err, ds.oracleStore.close())
Expand Down