Skip to content

Commit

Permalink
Merge pull request #108988 from stevendanna/backport-release-23.1.9-r…
Browse files Browse the repository at this point in the history
…c-107334

release-23.1.9-rc: backupccl: possibly deflake TestBackupRestoreAppend
  • Loading branch information
stevendanna authored Aug 18, 2023
2 parents 7992305 + 66a402d commit dc298cc
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 70 deletions.
12 changes: 8 additions & 4 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ func TestBackupRestoreAppend(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderStressRace(t, "test is too large to run under stress race")
skip.UnderStress(t, "test is too large to run under stress")
skip.UnderRace(t, "test is too large to run under race")

const numAccounts = 1000
ctx := context.Background()
Expand Down Expand Up @@ -659,15 +660,18 @@ func TestBackupRestoreAppend(t *testing.T) {
sqlDB.ExpectErr(t, "A full backup cannot be written to \"/subdir\", a user defined subdirectory",
"BACKUP INTO $4 IN ($1, $2, $3) AS OF SYSTEM TIME "+tsBefore, append(test.collectionsWithSubdir, specifiedSubdir)...)

sqlDB.QueryRow(t, "UPDATE data.bank SET balance = 100 RETURNING cluster_logical_timestamp()").Scan(&ts1)
sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
return txn.QueryRow("UPDATE data.bank SET balance = 100 RETURNING cluster_logical_timestamp()").Scan(&ts1)
})
sqlDB.Exec(t, "BACKUP INTO LATEST IN ($1, $2, $3) AS OF SYSTEM TIME "+ts1, test.collections...)

// Append to latest again, just to prove we can append to an appended one and
// that appended didn't e.g. mess up LATEST.
sqlDB.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&ts1again)
sqlDB.Exec(t, "BACKUP INTO LATEST IN ($1, $2, $3) AS OF SYSTEM TIME "+ts1again, test.collections...)

sqlDB.QueryRow(t, "UPDATE data.bank SET balance = 200 RETURNING cluster_logical_timestamp()").Scan(&ts2)
sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
return txn.QueryRow("UPDATE data.bank SET balance = 200 RETURNING cluster_logical_timestamp()").Scan(&ts2)
})
rowsTS2 := sqlDB.QueryStr(t, "SELECT * from data.bank ORDER BY id")

// Start a new full-backup in the collection version.
Expand Down
32 changes: 23 additions & 9 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package backupccl

import (
"context"
gosql "database/sql"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -134,10 +135,19 @@ CREATE TABLE data2.foo (a int);
if util.RaceEnabled {
numUsers = 10
}
for i := 0; i < numUsers; i++ {
sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i))
sqlDB.Exec(t, fmt.Sprintf("ALTER USER maxroach%d CREATEDB", i))
}

sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
for i := 0; i < numUsers; i++ {
if _, err := txn.Exec(fmt.Sprintf("CREATE USER maxroach%d", i)); err != nil {
return err
}
if _, err := txn.Exec(fmt.Sprintf("ALTER USER maxroach%d CREATEDB", i)); err != nil {
return err
}
}
return nil
})

// Populate system.zones.
sqlDB.Exec(t, `ALTER TABLE data.bank CONFIGURE ZONE USING gc.ttlseconds = 3600`)
sqlDB.Exec(t, `ALTER TABLE defaultdb.foo CONFIGURE ZONE USING gc.ttlseconds = 45`)
Expand Down Expand Up @@ -621,10 +631,14 @@ func TestClusterRestoreFailCleanup(t *testing.T) {

// Setup the system systemTablesToVerify to ensure that they are copied to the new cluster.
// Populate system.users.
for i := 0; i < 1000; i++ {
sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i))
}

sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
for i := 0; i < 1000; i++ {
if _, err := txn.Exec(fmt.Sprintf("CREATE USER maxroach%d", i)); err != nil {
return err
}
}
return nil
})
sqlDB.Exec(t, `BACKUP TO 'nodelocal://1/missing-ssts'`)

// Bugger the backup by removing the SST files. (Note this messes up all of
Expand Down Expand Up @@ -1082,7 +1096,7 @@ func TestRestoreWithRecreatedDefaultDB(t *testing.T) {

sqlDB.Exec(t, `
DROP DATABASE defaultdb;
CREATE DATABASE defaultdb;
CREATE DATABASE defaultdb;
`)
sqlDB.Exec(t, `BACKUP TO $1`, localFoo)

Expand Down
60 changes: 3 additions & 57 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"os"
"path/filepath"
"reflect"
"regexp"
"runtime/pprof"
"sort"
"strings"
Expand Down Expand Up @@ -644,54 +643,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)

r, err := regexp.Compile("retry txn")
require.NoError(t, err)

executeWithRetriableTxn := func(db *gosql.DB, fn func(txn *gosql.Tx) error) error {
txn, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err != nil {
_ = txn.Rollback()
}

}()

_, err = txn.Exec("SAVEPOINT cockroach_restart")
if err != nil {
return err
}

maxRetries := 10
retryCount := 0
for {
err = fn(txn)
if err == nil {
_, err = txn.Exec("RELEASE SAVEPOINT cockroach_restart")
if err == nil {
return txn.Commit()
}
}

if !r.MatchString(err.Error()) {
return err
}

_, rollbackErr := txn.Exec("ROLLBACK TO SAVEPOINT cockroach_restart")
if rollbackErr != nil {
return errors.CombineErrors(rollbackErr, err)
}

retryCount++
if retryCount > maxRetries {
return errors.Wrap(err, "retries exhausted")
}
}
}

rts.sqlDB.MaxTxnRetries = 10
// Rollback a CANCEL.
{
txn, err := rts.outerDB.Begin()
Expand Down Expand Up @@ -729,7 +681,7 @@ func TestRegistryLifecycle(t *testing.T) {
}
// Now pause it for reals.
{
err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error {
rts.sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
if _, err := txn.Exec("PAUSE JOB $1", job.ID()); err != nil {
return err
}
Expand All @@ -738,9 +690,6 @@ func TestRegistryLifecycle(t *testing.T) {
rts.check(t, "")
return nil
})
if err != nil {
t.Fatal(err)
}
rts.check(t, jobs.StatusPaused)
}
// Rollback a RESUME.
Expand All @@ -759,7 +708,7 @@ func TestRegistryLifecycle(t *testing.T) {
}
// Commit a RESUME.
{
err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error {
rts.sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
if _, err := txn.Exec("RESUME JOB $1", job.ID()); err != nil {
return err
}
Expand All @@ -768,9 +717,6 @@ func TestRegistryLifecycle(t *testing.T) {
rts.check(t, "")
return nil
})
if err != nil {
t.Fatal(err)
}
}
rts.mu.e.ResumeStart = true
rts.check(t, jobs.StatusRunning)
Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/sqlutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_test(
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"@com_github_stretchr_testify//require",
Expand Down
73 changes: 73 additions & 0 deletions pkg/testutils/sqlutils/sql_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
type SQLRunner struct {
DB DBHandle
SucceedsSoonDuration time.Duration // defaults to testutils.DefaultSucceedsSoonDuration
MaxTxnRetries int // defaults to 0 for unlimited retries
}

// DBHandle is an interface that applies to *gosql.DB, *gosql.Conn, and
Expand Down Expand Up @@ -319,6 +320,78 @@ func (sr *SQLRunner) CheckQueryResultsRetry(t Fataler, query string, expected []
})
}

type beginner interface {
Begin() (*gosql.Tx, error)
}

// Begin starts a new transaction. It fails if the underlying DBHandle
// doesn't support starting transactions or if starting the
// transaction fails.
func (sr *SQLRunner) Begin(t Fataler) *gosql.Tx {
helperOrNoop(t)()
b, ok := sr.DB.(beginner)
if !ok {
t.Fatalf("Begin() not supported by this SQLRunner")
}

txn, err := b.Begin()
if err != nil {
t.Fatalf("%v", err)
}
return txn
}

// RunWithRetriableTxn starts a transaction and runs the given
// function in the context of that transaction. The transaction is
// commited when the given fuction returns and is automatically
// retried if it hits a retriable error.
func (sr *SQLRunner) RunWithRetriableTxn(t Fataler, fn func(*gosql.Tx) error) {
if err := sr.runWithRetriableTxnImpl(t, fn); err != nil {
t.Fatalf("%v", err)
}
}

const retryTxnErrorSubstring = "restart transaction"

func (sr *SQLRunner) runWithRetriableTxnImpl(t Fataler, fn func(*gosql.Tx) error) (err error) {
txn := sr.Begin(t)
defer func() {
if err != nil {
_ = txn.Rollback()
}

}()
_, err = txn.Exec("SAVEPOINT cockroach_restart")
if err != nil {
return err
}

retryCount := 0
for {
err = fn(txn)
if err == nil {
_, err = txn.Exec("RELEASE SAVEPOINT cockroach_restart")
if err == nil {
return txn.Commit()
}
}

if !strings.Contains(err.Error(), retryTxnErrorSubstring) {
return err
}

_, rollbackErr := txn.Exec("ROLLBACK TO SAVEPOINT cockroach_restart")
if rollbackErr != nil {
return errors.CombineErrors(rollbackErr, err)
}

retryCount++
if sr.MaxTxnRetries > 0 && retryCount > sr.MaxTxnRetries {
return errors.Wrapf(err, "%d retries exhausted", sr.MaxTxnRetries)
}
}
}

// RoundRobinDBHandle aggregates multiple DBHandles into a single one; each time
// a query is issued, a handle is selected in round-robin fashion.
type RoundRobinDBHandle struct {
Expand Down
49 changes: 49 additions & 0 deletions pkg/testutils/sqlutils/sql_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ package sqlutils_test

import (
"context"
gosql "database/sql"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// Test that the RowsToStrMatrix doesn't swallow errors.
func TestRowsToStrMatrixError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())
Expand All @@ -39,3 +44,47 @@ func TestRowsToStrMatrixError(t *testing.T) {
t.Fatalf("expected 'testing error', got: %v", err)
}
}

// TestRunWithRetriableTxn tests that we actually do retry the
// transaction.
func TestRunWithRetriableTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())

runner := sqlutils.MakeSQLRunner(db)
runner.Exec(t, "SET inject_retry_errors_enabled=true")

t.Run("retries transaction restart errors", func(t *testing.T) {
callCount := 0
runner.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
callCount++
_, err := txn.Exec("SELECT crdb_internal.force_retry('5ms')")
return err
})
require.GreaterOrEqual(t, callCount, 2)
})
t.Run("respects max retries", func(t *testing.T) {
callCount := 0
runner.MaxTxnRetries = 5
f := &mockFataler{}
runner.RunWithRetriableTxn(f, func(txn *gosql.Tx) error {
callCount++
_, err := txn.Exec("SELECT crdb_internal.force_retry('5h')")
return err
})
require.Equal(t, callCount, runner.MaxTxnRetries+1)
require.Contains(t, f.err, "restart transaction")
require.Contains(t, f.err, "retries exhausted")
})
}

type mockFataler struct {
err string
}

func (f *mockFataler) Fatalf(s string, args ...interface{}) {
f.err = fmt.Sprintf(s, args...)
}

0 comments on commit dc298cc

Please sign in to comment.