diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index 2ff9840f5d1d..9ef242369945 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -590,7 +590,8 @@ func TestBackupRestoreAppend(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	skip.UnderStressRace(t, "test is too large to run under stress race")
+	skip.UnderStress(t, "test is too large to run under stress")
+	skip.UnderRace(t, "test is too large to run under race")
 
 	const numAccounts = 1000
 	ctx := context.Background()
@@ -659,15 +660,18 @@ func TestBackupRestoreAppend(t *testing.T) {
 		sqlDB.ExpectErr(t, "A full backup cannot be written to \"/subdir\", a user defined subdirectory",
 			"BACKUP INTO $4 IN ($1, $2, $3) AS OF SYSTEM TIME "+tsBefore, append(test.collectionsWithSubdir, specifiedSubdir)...)
 
-		sqlDB.QueryRow(t, "UPDATE data.bank SET balance = 100 RETURNING cluster_logical_timestamp()").Scan(&ts1)
+		sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
+			return txn.QueryRow("UPDATE data.bank SET balance = 100 RETURNING cluster_logical_timestamp()").Scan(&ts1)
+		})
 		sqlDB.Exec(t, "BACKUP INTO LATEST IN ($1, $2, $3) AS OF SYSTEM TIME "+ts1, test.collections...)
 
 		// Append to latest again, just to prove we can append to an appended one and
 		// that appended didn't e.g. mess up LATEST.
 		sqlDB.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&ts1again)
 		sqlDB.Exec(t, "BACKUP INTO LATEST IN ($1, $2, $3) AS OF SYSTEM TIME "+ts1again, test.collections...)
-
-		sqlDB.QueryRow(t, "UPDATE data.bank SET balance = 200 RETURNING cluster_logical_timestamp()").Scan(&ts2)
+		sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
+			return txn.QueryRow("UPDATE data.bank SET balance = 200 RETURNING cluster_logical_timestamp()").Scan(&ts2)
+		})
 		rowsTS2 := sqlDB.QueryStr(t, "SELECT * from data.bank ORDER BY id")
 
 		// Start a new full-backup in the collection version.
diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
index 2326eceb2023..ef2dda7daf9c 100644
--- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
+++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -10,6 +10,7 @@ package backupccl
 
 import (
 	"context"
+	gosql "database/sql"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -134,10 +135,19 @@ CREATE TABLE data2.foo (a int);
 	if util.RaceEnabled {
 		numUsers = 10
 	}
-	for i := 0; i < numUsers; i++ {
-		sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i))
-		sqlDB.Exec(t, fmt.Sprintf("ALTER USER maxroach%d CREATEDB", i))
-	}
+
+	sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
+		for i := 0; i < numUsers; i++ {
+			if _, err := txn.Exec(fmt.Sprintf("CREATE USER maxroach%d", i)); err != nil {
+				return err
+			}
+			if _, err := txn.Exec(fmt.Sprintf("ALTER USER maxroach%d CREATEDB", i)); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
 	// Populate system.zones.
 	sqlDB.Exec(t, `ALTER TABLE data.bank CONFIGURE ZONE USING gc.ttlseconds = 3600`)
 	sqlDB.Exec(t, `ALTER TABLE defaultdb.foo CONFIGURE ZONE USING gc.ttlseconds = 45`)
@@ -621,10 +631,14 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
 
 	// Setup the system systemTablesToVerify to ensure that they are copied to the new cluster.
 	// Populate system.users.
-	for i := 0; i < 1000; i++ {
-		sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i))
-	}
-
+	sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
+		for i := 0; i < 1000; i++ {
+			if _, err := txn.Exec(fmt.Sprintf("CREATE USER maxroach%d", i)); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
 	sqlDB.Exec(t, `BACKUP TO 'nodelocal://1/missing-ssts'`)
 
 	// Bugger the backup by removing the SST files. (Note this messes up all of
@@ -1082,7 +1096,7 @@ func TestRestoreWithRecreatedDefaultDB(t *testing.T) {
 
 	sqlDB.Exec(t, `
 DROP DATABASE defaultdb;
-CREATE DATABASE defaultdb; 
+CREATE DATABASE defaultdb;
 `)
 
 	sqlDB.Exec(t, `BACKUP TO $1`, localFoo)
diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go
index 5f3386ecf33d..7ac6954a1cd9 100644
--- a/pkg/jobs/jobs_test.go
+++ b/pkg/jobs/jobs_test.go
@@ -18,7 +18,6 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
-	"regexp"
 	"runtime/pprof"
 	"sort"
 	"strings"
@@ -644,54 +643,7 @@ func TestRegistryLifecycle(t *testing.T) {
 		rts.mu.e.ResumeStart = true
 		rts.resumeCheckCh <- struct{}{}
 		rts.check(t, jobs.StatusRunning)
-
-		r, err := regexp.Compile("retry txn")
-		require.NoError(t, err)
-
-		executeWithRetriableTxn := func(db *gosql.DB, fn func(txn *gosql.Tx) error) error {
-			txn, err := db.Begin()
-			if err != nil {
-				return err
-			}
-			defer func() {
-				if err != nil {
-					_ = txn.Rollback()
-				}
-
-			}()
-
-			_, err = txn.Exec("SAVEPOINT cockroach_restart")
-			if err != nil {
-				return err
-			}
-
-			maxRetries := 10
-			retryCount := 0
-			for {
-				err = fn(txn)
-				if err == nil {
-					_, err = txn.Exec("RELEASE SAVEPOINT cockroach_restart")
-					if err == nil {
-						return txn.Commit()
-					}
-				}
-
-				if !r.MatchString(err.Error()) {
-					return err
-				}
-
-				_, rollbackErr := txn.Exec("ROLLBACK TO SAVEPOINT cockroach_restart")
-				if rollbackErr != nil {
-					return errors.CombineErrors(rollbackErr, err)
-				}
-
-				retryCount++
-				if retryCount > maxRetries {
-					return errors.Wrap(err, "retries exhausted")
-				}
-			}
-		}
-
+		rts.sqlDB.MaxTxnRetries = 10
 		// Rollback a CANCEL.
 		{
 			txn, err := rts.outerDB.Begin()
@@ -729,7 +681,7 @@ func TestRegistryLifecycle(t *testing.T) {
 		}
 		// Now pause it for reals.
 		{
-			err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error {
+			rts.sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
 				if _, err := txn.Exec("PAUSE JOB $1", job.ID()); err != nil {
 					return err
 				}
@@ -738,9 +690,6 @@ func TestRegistryLifecycle(t *testing.T) {
 				rts.check(t, "")
 				return nil
 			})
-			if err != nil {
-				t.Fatal(err)
-			}
 			rts.check(t, jobs.StatusPaused)
 		}
 		// Rollback a RESUME.
@@ -759,7 +708,7 @@ func TestRegistryLifecycle(t *testing.T) {
 		}
 		// Commit a RESUME.
 		{
-			err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error {
+			rts.sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
 				if _, err := txn.Exec("RESUME JOB $1", job.ID()); err != nil {
 					return err
 				}
@@ -768,9 +717,6 @@ func TestRegistryLifecycle(t *testing.T) {
 				rts.check(t, "")
 				return nil
 			})
-			if err != nil {
-				t.Fatal(err)
-			}
 		}
 		rts.mu.e.ResumeStart = true
 		rts.check(t, jobs.StatusRunning)
diff --git a/pkg/testutils/sqlutils/BUILD.bazel b/pkg/testutils/sqlutils/BUILD.bazel
index eb82add311a7..1c11e2c08f52 100644
--- a/pkg/testutils/sqlutils/BUILD.bazel
+++ b/pkg/testutils/sqlutils/BUILD.bazel
@@ -58,6 +58,7 @@ go_test(
         "//pkg/testutils/serverutils",
         "//pkg/testutils/testcluster",
        "//pkg/util/leaktest",
+        "//pkg/util/log",
        "//pkg/util/protoutil",
        "//pkg/util/randutil",
        "@com_github_stretchr_testify//require",
diff --git a/pkg/testutils/sqlutils/sql_runner.go b/pkg/testutils/sqlutils/sql_runner.go
index b9bcb48289d8..4847309dffdc 100644
--- a/pkg/testutils/sqlutils/sql_runner.go
+++ b/pkg/testutils/sqlutils/sql_runner.go
@@ -31,6 +31,7 @@ import (
 type SQLRunner struct {
 	DB                   DBHandle
 	SucceedsSoonDuration time.Duration // defaults to testutils.DefaultSucceedsSoonDuration
+	MaxTxnRetries        int           // defaults to 0 for unlimited retries
 }
 
 // DBHandle is an interface that applies to *gosql.DB, *gosql.Conn, and
@@ -319,6 +320,78 @@ func (sr *SQLRunner) CheckQueryResultsRetry(t Fataler, query string, expected []
 	})
 }
 
+type beginner interface {
+	Begin() (*gosql.Tx, error)
+}
+
+// Begin starts a new transaction. It fails if the underlying DBHandle
+// doesn't support starting transactions or if starting the
+// transaction fails.
+func (sr *SQLRunner) Begin(t Fataler) *gosql.Tx {
+	helperOrNoop(t)()
+	b, ok := sr.DB.(beginner)
+	if !ok {
+		t.Fatalf("Begin() not supported by this SQLRunner")
+	}
+
+	txn, err := b.Begin()
+	if err != nil {
+		t.Fatalf("%v", err)
+	}
+	return txn
+}
+
+// RunWithRetriableTxn starts a transaction and runs the given
+// function in the context of that transaction. The transaction is
+// committed when the given function returns and is automatically
+// retried if it hits a retriable error.
+func (sr *SQLRunner) RunWithRetriableTxn(t Fataler, fn func(*gosql.Tx) error) {
+	if err := sr.runWithRetriableTxnImpl(t, fn); err != nil {
+		t.Fatalf("%v", err)
+	}
+}
+
+const retryTxnErrorSubstring = "restart transaction"
+
+func (sr *SQLRunner) runWithRetriableTxnImpl(t Fataler, fn func(*gosql.Tx) error) (err error) {
+	txn := sr.Begin(t)
+	defer func() {
+		if err != nil {
+			_ = txn.Rollback()
+		}
+
+	}()
+	_, err = txn.Exec("SAVEPOINT cockroach_restart")
+	if err != nil {
+		return err
+	}
+
+	retryCount := 0
+	for {
+		err = fn(txn)
+		if err == nil {
+			_, err = txn.Exec("RELEASE SAVEPOINT cockroach_restart")
+			if err == nil {
+				return txn.Commit()
+			}
+		}
+
+		if !strings.Contains(err.Error(), retryTxnErrorSubstring) {
+			return err
+		}
+
+		_, rollbackErr := txn.Exec("ROLLBACK TO SAVEPOINT cockroach_restart")
+		if rollbackErr != nil {
+			return errors.CombineErrors(rollbackErr, err)
+		}
+
+		retryCount++
+		if sr.MaxTxnRetries > 0 && retryCount > sr.MaxTxnRetries {
+			return errors.Wrapf(err, "%d retries exhausted", sr.MaxTxnRetries)
+		}
+	}
+}
+
 // RoundRobinDBHandle aggregates multiple DBHandles into a single one; each time
 // a query is issued, a handle is selected in round-robin fashion.
 type RoundRobinDBHandle struct {
diff --git a/pkg/testutils/sqlutils/sql_runner_test.go b/pkg/testutils/sqlutils/sql_runner_test.go
index c469c09cd83e..55d20585cf95 100644
--- a/pkg/testutils/sqlutils/sql_runner_test.go
+++ b/pkg/testutils/sqlutils/sql_runner_test.go
@@ -12,6 +12,8 @@ package sqlutils_test
 
 import (
 	"context"
+	gosql "database/sql"
+	"fmt"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -19,11 +21,14 @@
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/stretchr/testify/require"
 )
 
 // Test that the RowsToStrMatrix doesn't swallow errors.
 func TestRowsToStrMatrixError(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
 	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
 	defer s.Stopper().Stop(context.Background())
 
@@ -39,3 +44,47 @@ func TestRowsToStrMatrixError(t *testing.T) {
 		t.Fatalf("expected 'testing error', got: %v", err)
 	}
 }
+
+// TestRunWithRetriableTxn tests that we actually do retry the
+// transaction.
+func TestRunWithRetriableTxn(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
+	defer s.Stopper().Stop(context.Background())
+
+	runner := sqlutils.MakeSQLRunner(db)
+	runner.Exec(t, "SET inject_retry_errors_enabled=true")
+
+	t.Run("retries transaction restart errors", func(t *testing.T) {
+		callCount := 0
+		runner.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
+			callCount++
+			_, err := txn.Exec("SELECT crdb_internal.force_retry('5ms')")
+			return err
+		})
+		require.GreaterOrEqual(t, callCount, 2)
+	})
+	t.Run("respects max retries", func(t *testing.T) {
+		callCount := 0
+		runner.MaxTxnRetries = 5
+		f := &mockFataler{}
+		runner.RunWithRetriableTxn(f, func(txn *gosql.Tx) error {
+			callCount++
+			_, err := txn.Exec("SELECT crdb_internal.force_retry('5h')")
+			return err
+		})
+		require.Equal(t, callCount, runner.MaxTxnRetries+1)
+		require.Contains(t, f.err, "restart transaction")
+		require.Contains(t, f.err, "retries exhausted")
+	})
+}
+
+type mockFataler struct {
+	err string
+}
+
+func (f *mockFataler) Fatalf(s string, args ...interface{}) {
+	f.err = fmt.Sprintf(s, args...)
+}
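
Note (not part of the patch): a minimal sketch of how the SQLRunner.RunWithRetriableTxn helper added above is meant to be used from a test. The helper, MaxTxnRetries, and the savepoint-based retry behavior come from the diff; the function name, table, and statement below are illustrative only, borrowed from the backup tests.

	// Illustrative usage sketch; assumes a running test server, a *testing.T,
	// and a *gosql.DB (imports: gosql "database/sql", "testing",
	// "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils").
	func updateBalanceWithRetry(t *testing.T, db *gosql.DB) string {
		var ts string
		runner := sqlutils.MakeSQLRunner(db)
		runner.MaxTxnRetries = 10 // optional cap; the default of 0 retries indefinitely
		runner.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
			// The closure runs inside SAVEPOINT cockroach_restart; on a
			// "restart transaction" error the runner rolls back to the
			// savepoint and invokes the closure again.
			return txn.QueryRow(
				"UPDATE data.bank SET balance = 100 RETURNING cluster_logical_timestamp()",
			).Scan(&ts)
		})
		return ts
	}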