diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index a312245fd419..dfa1aa84913f 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -18,6 +18,7 @@ import ( "os" "path/filepath" "reflect" + "regexp" "runtime/pprof" "sort" "strings" @@ -624,6 +625,53 @@ func TestRegistryLifecycle(t *testing.T) { rts.resumeCheckCh <- struct{}{} rts.check(t, jobs.StatusRunning) + r, err := regexp.Compile("retry txn") + require.NoError(t, err) + + executeWithRetriableTxn := func(db *gosql.DB, fn func(txn *gosql.Tx) error) error { + txn, err := db.Begin() + if err != nil { + return err + } + defer func() { + if err != nil { + _ = txn.Rollback() + } + + }() + + _, err = txn.Exec("SAVEPOINT cockroach_restart") + if err != nil { + return err + } + + maxRetries := 10 + retryCount := 0 + for { + err = fn(txn) + if err == nil { + _, err = txn.Exec("RELEASE SAVEPOINT cockroach_restart") + if err == nil { + return txn.Commit() + } + } + + if !r.MatchString(err.Error()) { + return err + } + + _, rollbackErr := txn.Exec("ROLLBACK TO SAVEPOINT cockroach_restart") + if rollbackErr != nil { + return errors.CombineErrors(rollbackErr, err) + } + + retryCount++ + if retryCount > maxRetries { + return errors.Wrap(err, "retries exhausted") + } + } + } + // Rollback a CANCEL. { txn, err := rts.outerDB.Begin() @@ -661,19 +709,18 @@ func TestRegistryLifecycle(t *testing.T) { } // Now pause it for reals. { - txn, err := rts.outerDB.Begin() + err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error { + if _, err := txn.Exec("PAUSE JOB $1", job.ID()); err != nil { + return err + } + // Not committed yet, so state shouldn't have changed. + // Don't check status in txn. + rts.check(t, "") + return nil + }) if err != nil { t.Fatal(err) } - if _, err := txn.Exec("PAUSE JOB $1", job.ID()); err != nil { - t.Fatal(err) - } - // Not committed yet, so state shouldn't have changed. - // Don't check status in txn. - rts.check(t, "") - if err := txn.Commit(); err != nil { - t.Fatal(err) - } rts.check(t, jobs.StatusPaused) } // Rollback a RESUME. @@ -692,19 +739,18 @@ func TestRegistryLifecycle(t *testing.T) { } // Commit a RESUME. { - txn, err := rts.outerDB.Begin() + err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error { + if _, err := txn.Exec("RESUME JOB $1", job.ID()); err != nil { + return err + } + // Not committed yet, so state shouldn't have changed. + // Don't check status in txn. + rts.check(t, "") + return nil + }) if err != nil { t.Fatal(err) } - if _, err := txn.Exec("RESUME JOB $1", job.ID()); err != nil { - t.Fatal(err) - } - // Not committed yet, so state shouldn't have changed. - // Don't check status in txn. - rts.check(t, "") - if err := txn.Commit(); err != nil { - t.Fatal(err) - } } rts.mu.e.ResumeStart = true rts.check(t, jobs.StatusRunning)