Skip to content

Commit

Permalink
workload/schemachange: allow transaction retry errors
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
jayshrivastava committed Nov 25, 2020
1 parent 0bce752 commit 868a286
Showing 1 changed file with 49 additions and 11 deletions.
60 changes: 49 additions & 11 deletions pkg/workload/schemachange/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,20 +268,43 @@ func (w *schemaChangeWorker) runInTxn(tx *pgx.Tx) (string, error) {

if _, err = tx.Exec(op); err != nil {
if w.opGen.screenForExecErrors {
if w.opGen.expectedExecErrors.empty() {
log.WriteString(fmt.Sprintf("***FAIL; Expected no errors, but got %v\n", err))
// If the error not an instance of pgx.PgError, then it is unexpected.
pgErr := pgx.PgError{}
if !errors.As(err, &pgErr) {
log.WriteString(fmt.Sprintf("***FAIL; Non pg error %v\n", err))
return log.String(), errors.Mark(
errors.Wrap(err, "***UNEXPECTED ERROR"),
errRunInTxnFatalSentinel,
)
} else if pgErr := (pgx.PgError{}); !errors.As(err, &pgErr) || errors.As(err, &pgErr) && !w.opGen.expectedExecErrors.empty() && !w.opGen.expectedExecErrors.contains(pgcode.MakeCode(pgErr.Code)) {
log.WriteString(fmt.Sprintf("***FAIL; Expected one of SQLSTATES %s, but got %v\n", w.opGen.expectedExecErrors.string(), err))
}

// Transaction retry errors are acceptable. Allow the transaction
// to rollback.
if pgcode.MakeCode(pgErr.Code) == pgcode.SerializationFailure {
log.WriteString(fmt.Sprintf("ROLLBACK; %v\n", err))
w.recordInHist(timeutil.Since(start), txnRollback)
return log.String(), errors.Mark(
err,
errRunInTxnRbkSentinel,
)
}

// Screen for any unexpected errors.
if w.opGen.expectedExecErrors.empty() || !w.opGen.expectedExecErrors.contains(pgcode.MakeCode(pgErr.Code)) {
var logMsg string
if w.opGen.expectedExecErrors.empty() {
logMsg = fmt.Sprintf("***FAIL; Expected no errors, but got %v\n", err)
} else {
logMsg = fmt.Sprintf("***FAIL; Expected one of SQLSTATES %s, but got %v\n", w.opGen.expectedExecErrors.string(), err)
}
log.WriteString(logMsg)
return log.String(), errors.Mark(
errors.Wrap(err, "***UNEXPECTED ERROR"),
errRunInTxnFatalSentinel,
)
}

// Rollback because the error was anticipated.
log.WriteString(fmt.Sprintf("ROLLBACK; expected SQLSTATE(S) %s, and got %v\n", w.opGen.expectedExecErrors.string(), err))
w.recordInHist(timeutil.Since(start), txnRollback)
return log.String(), errors.Mark(
Expand Down Expand Up @@ -348,14 +371,29 @@ func (w *schemaChangeWorker) run(_ context.Context) error {
}
}

// If there were no errors commit the txn.
histBin := txnOk
cmtErrMsg := ""
if err = tx.Commit(); err != nil {
histBin = txnCommitError
cmtErrMsg = fmt.Sprintf("***FAIL: %v", err)
// If the error not an instance of pgx.PgError, then it is unexpected.
pgErr := pgx.PgError{}
if !errors.As(err, &pgErr) {
w.recordInHist(timeutil.Since(start), txnCommitError)
logs = logs + fmt.Sprintf("***FAIL; Non pg error %v\n", err)
return errors.Mark(
errors.Wrap(err, "***UNEXPECTED ERROR"),
errRunInTxnFatalSentinel,
)
}

// Transaction retry errors are acceptable. Allow the transaction
// to rollback.
if pgcode.MakeCode(pgErr.Code) == pgcode.SerializationFailure {
w.recordInHist(timeutil.Since(start), txnCommitError)
logs = logs + fmt.Sprintf("COMMIT; %v\n", err)
return nil
}
}
w.recordInHist(timeutil.Since(start), histBin)
logs = logs + fmt.Sprintf("COMMIT; %s\n", cmtErrMsg)

// If there were no errors while committing the txn.
w.recordInHist(timeutil.Since(start), txnOk)
logs = logs + "COMMIT; \n"
return nil
}

0 comments on commit 868a286

Please sign in to comment.