Skip to content

Commit

Permalink
backupccl,sqlutils: retry explicit transaction to avoid test failure
Browse files Browse the repository at this point in the history
In recent CI runs, we've been seeing TestFullClusterBackup fail with:

    full_cluster_backup_restore_test.go:143: error executing 'COMMIT':
    pq: restart transaction: TransactionRetryWithProtoRefreshError:
    TransactionAbortedError(ABORT_REASON_ABORTED_RECORD_FOUND): "sql
    txn" meta={id=ec1de683 key=/Table/4/1/"maxroach0"/0
    iso=Serializable pri=0.03058324 epo=0 ts=1686254742.907452755,1
    min=1686254688.329813751,0 seq=8023} lock=true stat=ABORTED
    rts=1686254742.907452755,1 wto=false gul=1686254688.829813751,0
    int=2006

This looks to be the result of a recent change to issue all of the
CREATE USER statements in a single transaction. Since explicit
transactions are only automatically retried in specific circumstances,
we now have to deal with the retry error.

Here, we introduce a small helper that retries anything that looks
like a restart error.

Note that I haven't been able to reproduce the above locally as the
test is very heavyweight and difficult to stress.

Epic: none

Release note: None
  • Loading branch information
stevendanna committed Aug 17, 2023
1 parent 7395d13 commit 761822d
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 68 deletions.
31 changes: 20 additions & 11 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package backupccl

import (
"context"
gosql "database/sql"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -135,12 +136,17 @@ CREATE TABLE data2.foo (a int);
numUsers = 10
}

sqlDB.Exec(t, "BEGIN")
for i := 0; i < numUsers; i++ {
sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i))
sqlDB.Exec(t, fmt.Sprintf("ALTER USER maxroach%d CREATEDB", i))
}
sqlDB.Exec(t, "COMMIT")
sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
for i := 0; i < numUsers; i++ {
if _, err := txn.Exec(fmt.Sprintf("CREATE USER maxroach%d", i)); err != nil {
return err
}
if _, err := txn.Exec(fmt.Sprintf("ALTER USER maxroach%d CREATEDB", i)); err != nil {
return err
}
}
return nil
})

// Populate system.zones.
sqlDB.Exec(t, `ALTER TABLE data.bank CONFIGURE ZONE USING gc.ttlseconds = 3600`)
Expand Down Expand Up @@ -625,11 +631,14 @@ func TestClusterRestoreFailCleanup(t *testing.T) {

// Setup the system systemTablesToVerify to ensure that they are copied to the new cluster.
// Populate system.users.
sqlDB.Exec(t, "BEGIN")
for i := 0; i < 1000; i++ {
sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i))
}
sqlDB.Exec(t, "COMMIT")
sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
for i := 0; i < 1000; i++ {
if _, err := txn.Exec(fmt.Sprintf("CREATE USER maxroach%d", i)); err != nil {
return err
}
}
return nil
})
sqlDB.Exec(t, `BACKUP TO 'nodelocal://1/missing-ssts'`)

// Bugger the backup by removing the SST files. (Note this messes up all of
Expand Down
60 changes: 3 additions & 57 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"os"
"path/filepath"
"reflect"
"regexp"
"runtime/pprof"
"sort"
"strings"
Expand Down Expand Up @@ -643,54 +642,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)

r, err := regexp.Compile("retry txn")
require.NoError(t, err)

executeWithRetriableTxn := func(db *gosql.DB, fn func(txn *gosql.Tx) error) error {
txn, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err != nil {
_ = txn.Rollback()
}

}()

_, err = txn.Exec("SAVEPOINT cockroach_restart")
if err != nil {
return err
}

maxRetries := 10
retryCount := 0
for {
err = fn(txn)
if err == nil {
_, err = txn.Exec("RELEASE SAVEPOINT cockroach_restart")
if err == nil {
return txn.Commit()
}
}

if !r.MatchString(err.Error()) {
return err
}

_, rollbackErr := txn.Exec("ROLLBACK TO SAVEPOINT cockroach_restart")
if rollbackErr != nil {
return errors.CombineErrors(rollbackErr, err)
}

retryCount++
if retryCount > maxRetries {
return errors.Wrap(err, "retries exhausted")
}
}
}

rts.sqlDB.MaxTxnRetries = 10
// Rollback a CANCEL.
{
txn, err := rts.outerDB.Begin()
Expand Down Expand Up @@ -728,7 +680,7 @@ func TestRegistryLifecycle(t *testing.T) {
}
// Now pause it for reals.
{
err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error {
rts.sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
if _, err := txn.Exec("PAUSE JOB $1", job.ID()); err != nil {
return err
}
Expand All @@ -737,9 +689,6 @@ func TestRegistryLifecycle(t *testing.T) {
rts.check(t, "")
return nil
})
if err != nil {
t.Fatal(err)
}
rts.check(t, jobs.StatusPaused)
}
// Rollback a RESUME.
Expand All @@ -758,7 +707,7 @@ func TestRegistryLifecycle(t *testing.T) {
}
// Commit a RESUME.
{
err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error {
rts.sqlDB.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
if _, err := txn.Exec("RESUME JOB $1", job.ID()); err != nil {
return err
}
Expand All @@ -767,9 +716,6 @@ func TestRegistryLifecycle(t *testing.T) {
rts.check(t, "")
return nil
})
if err != nil {
t.Fatal(err)
}
}
rts.mu.e.ResumeStart = true
rts.check(t, jobs.StatusRunning)
Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/sqlutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_test(
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"@com_github_stretchr_testify//require",
Expand Down
73 changes: 73 additions & 0 deletions pkg/testutils/sqlutils/sql_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
type SQLRunner struct {
DB DBHandle
SucceedsSoonDuration time.Duration // defaults to testutils.DefaultSucceedsSoonDuration
MaxTxnRetries int // defaults to 0 for unlimited retries
}

// DBHandle is an interface that applies to *gosql.DB, *gosql.Conn, and
Expand Down Expand Up @@ -319,6 +320,78 @@ func (sr *SQLRunner) CheckQueryResultsRetry(t Fataler, query string, expected []
})
}

type beginner interface {
Begin() (*gosql.Tx, error)
}

// Begin starts a new transaction. It fails if the underlying DBHandle
// doesn't support starting transactions or if starting the
// transaction fails.
func (sr *SQLRunner) Begin(t Fataler) *gosql.Tx {
helperOrNoop(t)()
b, ok := sr.DB.(beginner)
if !ok {
t.Fatalf("Begin() not supported by this SQLRunner")
}

txn, err := b.Begin()
if err != nil {
t.Fatalf("%v", err)
}
return txn
}

// RunWithRetriableTxn starts a transaction and runs the given
// function in the context of that transaction. The transaction is
// commited when the given fuction returns and is automatically
// retried if it hits a retriable error.
func (sr *SQLRunner) RunWithRetriableTxn(t Fataler, fn func(*gosql.Tx) error) {
if err := sr.runWithRetriableTxnImpl(t, fn); err != nil {
t.Fatalf("%v", err)
}
}

const retryTxnErrorSubstring = "restart transaction"

func (sr *SQLRunner) runWithRetriableTxnImpl(t Fataler, fn func(*gosql.Tx) error) (err error) {
txn := sr.Begin(t)
defer func() {
if err != nil {
_ = txn.Rollback()
}

}()
_, err = txn.Exec("SAVEPOINT cockroach_restart")
if err != nil {
return err
}

retryCount := 0
for {
err = fn(txn)
if err == nil {
_, err = txn.Exec("RELEASE SAVEPOINT cockroach_restart")
if err == nil {
return txn.Commit()
}
}

if !strings.Contains(err.Error(), retryTxnErrorSubstring) {
return err
}

_, rollbackErr := txn.Exec("ROLLBACK TO SAVEPOINT cockroach_restart")
if rollbackErr != nil {
return errors.CombineErrors(rollbackErr, err)
}

retryCount++
if sr.MaxTxnRetries > 0 && retryCount > sr.MaxTxnRetries {
return errors.Wrapf(err, "%d retries exhausted", sr.MaxTxnRetries)
}
}
}

// RoundRobinDBHandle aggregates multiple DBHandles into a single one; each time
// a query is issued, a handle is selected in round-robin fashion.
type RoundRobinDBHandle struct {
Expand Down
49 changes: 49 additions & 0 deletions pkg/testutils/sqlutils/sql_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ package sqlutils_test

import (
"context"
gosql "database/sql"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// Test that the RowsToStrMatrix doesn't swallow errors.
func TestRowsToStrMatrixError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())
Expand All @@ -39,3 +44,47 @@ func TestRowsToStrMatrixError(t *testing.T) {
t.Fatalf("expected 'testing error', got: %v", err)
}
}

// TestRunWithRetriableTxn tests that we actually do retry the
// transaction.
func TestRunWithRetriableTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())

runner := sqlutils.MakeSQLRunner(db)
runner.Exec(t, "SET inject_retry_errors_enabled=true")

t.Run("retries transaction restart errors", func(t *testing.T) {
callCount := 0
runner.RunWithRetriableTxn(t, func(txn *gosql.Tx) error {
callCount++
_, err := txn.Exec("SELECT crdb_internal.force_retry('5ms')")
return err
})
require.GreaterOrEqual(t, callCount, 2)
})
t.Run("respects max retries", func(t *testing.T) {
callCount := 0
runner.MaxTxnRetries = 5
f := &mockFataler{}
runner.RunWithRetriableTxn(f, func(txn *gosql.Tx) error {
callCount++
_, err := txn.Exec("SELECT crdb_internal.force_retry('5h')")
return err
})
require.Equal(t, callCount, runner.MaxTxnRetries+1)
require.Contains(t, f.err, "restart transaction")
require.Contains(t, f.err, "retries exhausted")
})
}

type mockFataler struct {
err string
}

func (f *mockFataler) Fatalf(s string, args ...interface{}) {
f.err = fmt.Sprintf(s, args...)
}

0 comments on commit 761822d

Please sign in to comment.