Skip to content

Commit

Permalink
Merge #107954
Browse files Browse the repository at this point in the history
107954: schema: allow transaction retry in schema changer backfill r=fqazi a=ecwall

Fixes #106696

This change adds a test to simulate transaction retries during the schema
changer backfill.

Some retries caused `ingested key collides with an existing one` errors
during addSSTable because the schema change's source data changed.
For example: `CREATE TABLE t AS SELECT nextval('seq');` - `nextval` is
non-transactional so each time it is selected, the value changes.

This change also allows the schema changer to specify that addSSTable is
allowed to shadow (replace) keys' values if they are different to handle
this case.

Release note: None

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Evan Wall <[email protected]>
  • Loading branch information
3 people committed Aug 15, 2023
2 parents a08e6c8 + 514b44d commit aba50b9
Show file tree
Hide file tree
Showing 15 changed files with 238 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ go_test(
"//pkg/testutils/distsqlutils",
"//pkg/testutils/fingerprintutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/kvclientutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/backupccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -142,20 +143,28 @@ type clusterCfg struct {
beforeVersion string
testingKnobCfg string
defaultTestTenant base.DefaultTestTenantOptions
randomTxnRetries bool
}

func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error {
params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODirConfig = cfg.ioConf

params.ServerArgs.DefaultTestTenant = cfg.defaultTestTenant
var transactionRetryFilter func(*kv.Txn) bool
if cfg.randomTxnRetries {
transactionRetryFilter = kvclientutils.RandomTransactionRetryFilter()
}
params.ServerArgs.Knobs = base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
TenantTestingKnobs: &sql.TenantTestingKnobs{
// The tests in this package are particular about the tenant IDs
// they get in CREATE TENANT.
EnableTenantIDReuse: true,
},
KVClient: &kvcoord.ClientTestingKnobs{
TransactionRetryFilter: transactionRetryFilter,
},
}

settings := cluster.MakeTestingClusterSettings()
Expand Down Expand Up @@ -513,6 +522,9 @@ func runTestDataDriven(t *testing.T, testFilePathFromWorkspace string) {
defaultTestTenant = base.TODOTestTenantDisabled
}

// TODO(ssd): Once TestServer starts up reliably enough:
// randomTxnRetries := !d.HasArg("disable-txn-retries")
randomTxnRetries := false
lastCreatedCluster = name
cfg := clusterCfg{
name: name,
Expand All @@ -524,6 +536,7 @@ func runTestDataDriven(t *testing.T, testFilePathFromWorkspace string) {
beforeVersion: beforeVersion,
testingKnobCfg: testingKnobCfg,
defaultTestTenant: defaultTestTenant,
randomTxnRetries: randomTxnRetries,
}
err := ds.addCluster(t, cfg)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,13 @@ func (db *DB) TxnRootKV(ctx context.Context, retryable func(context.Context, *Tx
// runTxn runs the given retryable transaction function using the given *Txn.
func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn) error) error {
err := txn.exec(ctx, func(ctx context.Context, txn *Txn) error {
return retryable(ctx, txn)
if err := retryable(ctx, txn); err != nil {
return err
}
if txn.TestingShouldRetry() {
return txn.GenerateForcedRetryableErr(ctx, "injected retriable error")
}
return nil
})
if err != nil {
if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/pprofutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/shuffle",
"//pkg/util/startup",
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvcoord

import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
)

Expand Down Expand Up @@ -57,6 +58,10 @@ type ClientTestingKnobs struct {
// error which, if non-nil, becomes the result of the batch. Otherwise, execution
// continues.
OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error

// TransactionRetryFilter allows transaction retry loops to inject retriable
// errors.
TransactionRetryFilter func(*kv.Txn) bool
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
Expand Down
29 changes: 27 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand All @@ -34,6 +37,16 @@ const (
OpTxnCoordSender = "txn coordinator send"
)

// DisableCommitSanityCheck allows opting out of a fatal assertion error that was observed in the wild
// and for which a root cause is not yet available.
//
// See: https://github.com/cockroachdb/cockroach/pull/73512.
var DisableCommitSanityCheck = envutil.EnvOrDefaultBool("COCKROACH_DISABLE_COMMIT_SANITY_CHECK", false)

// forceTxnRetries enables random transaction retries for test builds
// even if they aren't enabled via testing knobs.
var forceTxnRetries = envutil.EnvOrDefaultBool("COCKROACH_FORCE_RANDOM_TXN_RETRIES", false)

// txnState represents states relating to whether an EndTxn request needs
// to be sent.
//
Expand Down Expand Up @@ -1370,9 +1383,9 @@ func (tc *TxnCoordSender) TestingCloneTxn() *roachpb.Transaction {
func (tc *TxnCoordSender) Step(ctx context.Context) error {
// TODO(nvanbenschoten): it should be possible to make this assertion, but
// the API is currently misused by the connExecutor. See #86162.
//if tc.typ != kv.RootTxn {
// if tc.typ != kv.RootTxn {
// return errors.AssertionFailedf("cannot step in non-root txn")
//}
// }
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.shouldStepReadTimestampLocked() {
Expand Down Expand Up @@ -1531,6 +1544,18 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool {
return tc.hasPerformedWritesLocked()
}

var randRetryRngSource, _ = randutil.NewLockedPseudoRand()

func (tc *TxnCoordSender) TestingShouldRetry(txn *kv.Txn) bool {
if filter := tc.testingKnobs.TransactionRetryFilter; filter != nil && filter(txn) {
return true
}
if forceTxnRetries && buildutil.CrdbTestBuild {
return randRetryRngSource.Float64() < kv.RandomTxnRetryProbability
}
return false
}

func (tc *TxnCoordSender) hasPerformedReadsLocked() bool {
return !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty()
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool {
panic("unimplemented")
}

// TestingShouldRetry is part of TxnSenderFactory.
func (m *MockTransactionalSender) TestingShouldRetry(*Txn) bool {
return false
}

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
senderFunc func(context.Context, *roachpb.Transaction, *kvpb.BatchRequest) (
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,13 @@ type TxnSender interface {

// HasPerformedWrites returns true if a write has been performed.
HasPerformedWrites() bool

// TestingShouldRetry returns true if
// transaction retry errors should be randomly returned to
// callers. Note, it is the responsibility of (*kv.DB).Txn()
// to return the retries. This lives here since the TxnSender
// is what has access to the server's testing knobs.
TestingShouldRetry(*Txn) bool
}

// SteppingMode is the argument type to ConfigureStepping.
Expand Down
21 changes: 18 additions & 3 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,10 @@ func (txn *Txn) SetDebugName(name string) {
func (txn *Txn) DebugName() string {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.debugNameLocked()
return txn.DebugNameLocked()
}

func (txn *Txn) debugNameLocked() string {
func (txn *Txn) DebugNameLocked() string {
return fmt.Sprintf("%s (id: %s)", txn.mu.debugName, txn.mu.ID)
}

Expand Down Expand Up @@ -1035,7 +1035,7 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) error {
retryErr, "PrepareForRetry() called on leaf txn"), ctx)
}
log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s",
txn.debugNameLocked(), retryErr)
txn.DebugNameLocked(), retryErr)
txn.resetDeadlineLocked()

if txn.mu.ID != retryErr.TxnID {
Expand Down Expand Up @@ -1430,6 +1430,21 @@ func (txn *Txn) GenerateForcedRetryableErr(ctx context.Context, msg redact.Redac
return txn.mu.sender.GenerateForcedRetryableErr(ctx, now.ToTimestamp(), msg)
}

const RandomTxnRetryProbability = 0.1

// TestingShouldRetry returns true if we should generate a
// random, retriable error for this transaction.
func (txn *Txn) TestingShouldRetry() bool {
txn.mu.Lock()
defer txn.mu.Unlock()

if txn.mu.sender.ClientFinalized() {
return false
}

return txn.mu.sender.TestingShouldRetry(txn)
}

// IsSerializablePushAndRefreshNotPossible returns true if the transaction is
// serializable, its timestamp has been pushed and there's no chance that
// refreshing the read spans will succeed later (thus allowing the transaction
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,7 @@ go_test(
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/kvclientutils",
"//pkg/testutils/pgtest",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
Expand Down
63 changes: 60 additions & 3 deletions pkg/sql/create_as_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -122,6 +124,8 @@ func TestCreateAsVTable(t *testing.T) {
waitForJobsSuccess(t, sqlRunner)
}

// TestCreateAsVTable verifies that SHOW commands can be used as the source of
// CREATE TABLE AS and CREATE MATERIALIZED VIEW AS.
func TestCreateAsShow(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -323,6 +327,7 @@ AND status != 'succeeded'`
sqlRunner.CheckQueryResultsRetry(t, query, [][]string{})
}

// TestFormat verifies the statement in the schema change job description.
func TestFormat(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -355,9 +360,9 @@ func TestFormat(t *testing.T) {
}

ctx := context.Background()
testCluster := serverutils.StartCluster(t, 1, base.TestClusterArgs{})
defer testCluster.Stopper().Stop(ctx)
sqlRunner := sqlutils.MakeSQLRunner(testCluster.ServerConn(0))
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
sqlRunner := sqlutils.MakeSQLRunner(db)
var p parser.Parser

for _, tc := range testCases {
Expand Down Expand Up @@ -390,3 +395,55 @@ AND description LIKE 'CREATE%%%s%%'`,
})
}
}

// TestTransactionRetryError tests that the schema changer succeeds if there is
// a retryable transaction error.
func TestTransactionRetryError(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
desc string
setup string
query string
verifyResults func(sqlutils.Fataler, *sqlutils.SQLRunner)
}{
{
desc: "CREATE TABLE AS",
setup: "CREATE SEQUENCE seq",
query: "CREATE TABLE t AS SELECT nextval('seq')",
verifyResults: func(t sqlutils.Fataler, sqlRunner *sqlutils.SQLRunner) {
// Result should be 2 but is 3 because of this bug
// https://github.com/cockroachdb/cockroach/issues/78457.
sqlRunner.CheckQueryResults(t, "SELECT * FROM t", [][]string{{"3"}})
},
},
{
desc: "CREATE MATERIALIZED VIEW AS",
setup: "CREATE SEQUENCE seq",
query: "CREATE MATERIALIZED VIEW v AS SELECT nextval('seq')",
verifyResults: func(t sqlutils.Fataler, sqlRunner *sqlutils.SQLRunner) {
sqlRunner.CheckQueryResults(t, "SELECT * FROM v", [][]string{{"2"}})
},
},
}

ctx := context.Background()
for _, testCase := range testCases {
t.Run(testCase.desc, func(t *testing.T) {
filterFunc, verifyFunc := kvclientutils.PrefixTransactionRetryFilter(t, schemaChangerBackfillTxnDebugName, 1)
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
TransactionRetryFilter: filterFunc,
},
},
})
defer s.Stopper().Stop(ctx)
sqlRunner := sqlutils.MakeSQLRunner(db)
sqlRunner.Exec(t, testCase.setup)
sqlRunner.Exec(t, testCase.query)
verifyFunc()
testCase.verifyResults(t, sqlRunner)
})
}
}
7 changes: 7 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1764,6 +1764,13 @@ func (ief *InternalDB) txn(
if err != nil {
return err
}
// We check this testing condition here since a retry cannot be generated
// after a successful commit. Since we commit below, this is our last
// chance to generate a retry for users of (*InternalDB).Txn.
if kvTxn.TestingShouldRetry() {
return kvTxn.GenerateForcedRetryableErr(ctx, "injected retriable error")
}

return commitTxnFn(ctx)
}); descs.IsTwoVersionInvariantViolationError(err) {
continue
Expand Down
Loading

0 comments on commit aba50b9

Please sign in to comment.