Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema: allow transaction retry in schema changer backfill #107954

Merged
merged 5 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ go_test(
"//pkg/testutils/distsqlutils",
"//pkg/testutils/fingerprintutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/kvclientutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/backupccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -142,20 +143,28 @@ type clusterCfg struct {
beforeVersion string
testingKnobCfg string
defaultTestTenant base.DefaultTestTenantOptions
randomTxnRetries bool
}

func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error {
params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODirConfig = cfg.ioConf

params.ServerArgs.DefaultTestTenant = cfg.defaultTestTenant
var transactionRetryFilter func(*kv.Txn) bool
if cfg.randomTxnRetries {
transactionRetryFilter = kvclientutils.RandomTransactionRetryFilter()
}
params.ServerArgs.Knobs = base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
TenantTestingKnobs: &sql.TenantTestingKnobs{
// The tests in this package are particular about the tenant IDs
// they get in CREATE TENANT.
EnableTenantIDReuse: true,
},
KVClient: &kvcoord.ClientTestingKnobs{
TransactionRetryFilter: transactionRetryFilter,
},
}

settings := cluster.MakeTestingClusterSettings()
Expand Down Expand Up @@ -513,6 +522,9 @@ func runTestDataDriven(t *testing.T, testFilePathFromWorkspace string) {
defaultTestTenant = base.TODOTestTenantDisabled
}

// TODO(ssd): Once TestServer starts up reliably enough:
// randomTxnRetries := !d.HasArg("disable-txn-retries")
randomTxnRetries := false
lastCreatedCluster = name
cfg := clusterCfg{
name: name,
Expand All @@ -524,6 +536,7 @@ func runTestDataDriven(t *testing.T, testFilePathFromWorkspace string) {
beforeVersion: beforeVersion,
testingKnobCfg: testingKnobCfg,
defaultTestTenant: defaultTestTenant,
randomTxnRetries: randomTxnRetries,
}
err := ds.addCluster(t, cfg)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,13 @@ func (db *DB) TxnRootKV(ctx context.Context, retryable func(context.Context, *Tx
// runTxn runs the given retryable transaction function using the given *Txn.
func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn) error) error {
err := txn.exec(ctx, func(ctx context.Context, txn *Txn) error {
return retryable(ctx, txn)
if err := retryable(ctx, txn); err != nil {
return err
}
if txn.TestingShouldRetry() {
return txn.GenerateForcedRetryableErr(ctx, "injected retriable error")
}
return nil
})
if err != nil {
if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/pprofutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/shuffle",
"//pkg/util/startup",
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvcoord

import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
)

Expand Down Expand Up @@ -57,6 +58,10 @@ type ClientTestingKnobs struct {
// error which, if non-nil, becomes the result of the batch. Otherwise, execution
// continues.
OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error

// TransactionRetryFilter allows transaction retry loops to inject retriable
// errors.
TransactionRetryFilter func(*kv.Txn) bool
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
Expand Down
29 changes: 27 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand All @@ -34,6 +37,16 @@ const (
OpTxnCoordSender = "txn coordinator send"
)

// DisableCommitSanityCheck allows opting out of a fatal assertion error that was observed in the wild
// and for which a root cause is not yet available.
//
// See: https://github.com/cockroachdb/cockroach/pull/73512.
var DisableCommitSanityCheck = envutil.EnvOrDefaultBool("COCKROACH_DISABLE_COMMIT_SANITY_CHECK", false)

// forceTxnRetries enables random transaction retries for test builds
// even if they aren't enabled via testing knobs.
var forceTxnRetries = envutil.EnvOrDefaultBool("COCKROACH_FORCE_RANDOM_TXN_RETRIES", false)

// txnState represents states relating to whether an EndTxn request needs
// to be sent.
//
Expand Down Expand Up @@ -1370,9 +1383,9 @@ func (tc *TxnCoordSender) TestingCloneTxn() *roachpb.Transaction {
func (tc *TxnCoordSender) Step(ctx context.Context) error {
// TODO(nvanbenschoten): it should be possible to make this assertion, but
// the API is currently misused by the connExecutor. See #86162.
//if tc.typ != kv.RootTxn {
// if tc.typ != kv.RootTxn {
// return errors.AssertionFailedf("cannot step in non-root txn")
//}
// }
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.shouldStepReadTimestampLocked() {
Expand Down Expand Up @@ -1531,6 +1544,18 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool {
return tc.hasPerformedWritesLocked()
}

var randRetryRngSource, _ = randutil.NewLockedPseudoRand()

func (tc *TxnCoordSender) TestingShouldRetry(txn *kv.Txn) bool {
if filter := tc.testingKnobs.TransactionRetryFilter; filter != nil && filter(txn) {
return true
}
if forceTxnRetries && buildutil.CrdbTestBuild {
return randRetryRngSource.Float64() < kv.RandomTxnRetryProbability
}
return false
}

func (tc *TxnCoordSender) hasPerformedReadsLocked() bool {
return !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty()
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool {
panic("unimplemented")
}

// TestingShouldRetry is part of TxnSenderFactory.
func (m *MockTransactionalSender) TestingShouldRetry(*Txn) bool {
return false
}

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
senderFunc func(context.Context, *roachpb.Transaction, *kvpb.BatchRequest) (
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,13 @@ type TxnSender interface {

// HasPerformedWrites returns true if a write has been performed.
HasPerformedWrites() bool

// TestingShouldRetry returns true if
// transaction retry errors should be randomly returned to
// callers. Note, it is the responsibility of (*kv.DB).Txn()
// to return the retries. This lives here since the TxnSender
// is what has access to the server's testing knobs.
TestingShouldRetry(*Txn) bool
}

// SteppingMode is the argument type to ConfigureStepping.
Expand Down
21 changes: 18 additions & 3 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,10 @@ func (txn *Txn) SetDebugName(name string) {
func (txn *Txn) DebugName() string {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.debugNameLocked()
return txn.DebugNameLocked()
}

func (txn *Txn) debugNameLocked() string {
func (txn *Txn) DebugNameLocked() string {
return fmt.Sprintf("%s (id: %s)", txn.mu.debugName, txn.mu.ID)
}

Expand Down Expand Up @@ -1035,7 +1035,7 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) error {
retryErr, "PrepareForRetry() called on leaf txn"), ctx)
}
log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s",
txn.debugNameLocked(), retryErr)
txn.DebugNameLocked(), retryErr)
txn.resetDeadlineLocked()

if txn.mu.ID != retryErr.TxnID {
Expand Down Expand Up @@ -1430,6 +1430,21 @@ func (txn *Txn) GenerateForcedRetryableErr(ctx context.Context, msg redact.Redac
return txn.mu.sender.GenerateForcedRetryableErr(ctx, now.ToTimestamp(), msg)
}

const RandomTxnRetryProbability = 0.1

// TestingShouldRetry returns true if we should generate a
// random, retriable error for this transaction.
func (txn *Txn) TestingShouldRetry() bool {
txn.mu.Lock()
defer txn.mu.Unlock()

if txn.mu.sender.ClientFinalized() {
return false
}

return txn.mu.sender.TestingShouldRetry(txn)
}

// IsSerializablePushAndRefreshNotPossible returns true if the transaction is
// serializable, its timestamp has been pushed and there's no chance that
// refreshing the read spans will succeed later (thus allowing the transaction
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,7 @@ go_test(
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/kvclientutils",
"//pkg/testutils/pgtest",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
Expand Down
63 changes: 60 additions & 3 deletions pkg/sql/create_as_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -122,6 +124,8 @@ func TestCreateAsVTable(t *testing.T) {
waitForJobsSuccess(t, sqlRunner)
}

// TestCreateAsVTable verifies that SHOW commands can be used as the source of
// CREATE TABLE AS and CREATE MATERIALIZED VIEW AS.
func TestCreateAsShow(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -323,6 +327,7 @@ AND status != 'succeeded'`
sqlRunner.CheckQueryResultsRetry(t, query, [][]string{})
}

// TestFormat verifies the statement in the schema change job description.
func TestFormat(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -355,9 +360,9 @@ func TestFormat(t *testing.T) {
}

ctx := context.Background()
testCluster := serverutils.StartCluster(t, 1, base.TestClusterArgs{})
defer testCluster.Stopper().Stop(ctx)
sqlRunner := sqlutils.MakeSQLRunner(testCluster.ServerConn(0))
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
sqlRunner := sqlutils.MakeSQLRunner(db)
var p parser.Parser

for _, tc := range testCases {
Expand Down Expand Up @@ -390,3 +395,55 @@ AND description LIKE 'CREATE%%%s%%'`,
})
}
}

// TestTransactionRetryError tests that the schema changer succeeds if there is
// a retryable transaction error.
func TestTransactionRetryError(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
desc string
setup string
query string
verifyResults func(sqlutils.Fataler, *sqlutils.SQLRunner)
}{
{
desc: "CREATE TABLE AS",
setup: "CREATE SEQUENCE seq",
query: "CREATE TABLE t AS SELECT nextval('seq')",
verifyResults: func(t sqlutils.Fataler, sqlRunner *sqlutils.SQLRunner) {
// Result should be 2 but is 3 because of this bug
// https://github.com/cockroachdb/cockroach/issues/78457.
sqlRunner.CheckQueryResults(t, "SELECT * FROM t", [][]string{{"3"}})
},
},
{
desc: "CREATE MATERIALIZED VIEW AS",
setup: "CREATE SEQUENCE seq",
query: "CREATE MATERIALIZED VIEW v AS SELECT nextval('seq')",
verifyResults: func(t sqlutils.Fataler, sqlRunner *sqlutils.SQLRunner) {
sqlRunner.CheckQueryResults(t, "SELECT * FROM v", [][]string{{"2"}})
},
},
}

ctx := context.Background()
for _, testCase := range testCases {
t.Run(testCase.desc, func(t *testing.T) {
filterFunc, verifyFunc := kvclientutils.PrefixTransactionRetryFilter(t, schemaChangerBackfillTxnDebugName, 1)
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
TransactionRetryFilter: filterFunc,
},
},
})
defer s.Stopper().Stop(ctx)
sqlRunner := sqlutils.MakeSQLRunner(db)
sqlRunner.Exec(t, testCase.setup)
sqlRunner.Exec(t, testCase.query)
verifyFunc()
testCase.verifyResults(t, sqlRunner)
})
}
}
7 changes: 7 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1764,6 +1764,13 @@ func (ief *InternalDB) txn(
if err != nil {
return err
}
// We check this testing condition here since a retry cannot be generated
// after a successful commit. Since we commit below, this is our last
// chance to generate a retry for users of (*InternalDB).Txn.
if kvTxn.TestingShouldRetry() {
return kvTxn.GenerateForcedRetryableErr(ctx, "injected retriable error")
}

return commitTxnFn(ctx)
}); descs.IsTwoVersionInvariantViolationError(err) {
continue
Expand Down
Loading