diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 563739d89607..0d807c8c64aa 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -291,6 +291,7 @@ go_test( "//pkg/testutils/distsqlutils", "//pkg/testutils/fingerprintutils", "//pkg/testutils/jobutils", + "//pkg/testutils/kvclientutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go index c18980c4ca5c..72e6231bf88c 100644 --- a/pkg/ccl/backupccl/datadriven_test.go +++ b/pkg/ccl/backupccl/datadriven_test.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" + "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -142,6 +143,7 @@ type clusterCfg struct { beforeVersion string testingKnobCfg string defaultTestTenant base.DefaultTestTenantOptions + randomTxnRetries bool } func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error { @@ -149,6 +151,10 @@ func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error { params.ServerArgs.ExternalIODirConfig = cfg.ioConf params.ServerArgs.DefaultTestTenant = cfg.defaultTestTenant + var transactionRetryFilter func(*kv.Txn) bool + if cfg.randomTxnRetries { + transactionRetryFilter = kvclientutils.RandomTransactionRetryFilter() + } params.ServerArgs.Knobs = base.TestingKnobs{ JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), TenantTestingKnobs: &sql.TenantTestingKnobs{ @@ -156,6 +162,9 @@ func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error { // they get in CREATE TENANT. EnableTenantIDReuse: true, }, + KVClient: &kvcoord.ClientTestingKnobs{ + TransactionRetryFilter: transactionRetryFilter, + }, } settings := cluster.MakeTestingClusterSettings() @@ -513,6 +522,9 @@ func runTestDataDriven(t *testing.T, testFilePathFromWorkspace string) { defaultTestTenant = base.TODOTestTenantDisabled } + // TODO(ssd): Once TestServer starts up reliably enough: + // randomTxnRetries := !d.HasArg("disable-txn-retries") + randomTxnRetries := false lastCreatedCluster = name cfg := clusterCfg{ name: name, @@ -524,6 +536,7 @@ func runTestDataDriven(t *testing.T, testFilePathFromWorkspace string) { beforeVersion: beforeVersion, testingKnobCfg: testingKnobCfg, defaultTestTenant: defaultTestTenant, + randomTxnRetries: randomTxnRetries, } err := ds.addCluster(t, cfg) if err != nil { diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 31543e46826f..2b1a9820edcd 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -994,7 +994,13 @@ func (db *DB) TxnRootKV(ctx context.Context, retryable func(context.Context, *Tx // runTxn runs the given retryable transaction function using the given *Txn. func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn) error) error { err := txn.exec(ctx, func(ctx context.Context, txn *Txn) error { - return retryable(ctx, txn) + if err := retryable(ctx, txn); err != nil { + return err + } + if txn.TestingShouldRetry() { + return txn.GenerateForcedRetryableErr(ctx, "injected retriable error") + } + return nil }) if err != nil { if rollbackErr := txn.Rollback(ctx); rollbackErr != nil { diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index ce457bf1386f..948ea31ebd6c 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -78,6 +78,7 @@ go_library( "//pkg/util/metric", "//pkg/util/pprofutil", "//pkg/util/quotapool", + "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/shuffle", "//pkg/util/startup", diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index a48297fad032..22f2318a89df 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -12,6 +12,7 @@ package kvcoord import ( "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" ) @@ -57,6 +58,10 @@ type ClientTestingKnobs struct { // error which, if non-nil, becomes the result of the batch. Otherwise, execution // continues. OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error + + // TransactionRetryFilter allows transaction retry loops to inject retriable + // errors. + TransactionRetryFilter func(*kv.Txn) bool } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index d2b695011432..c57bbb839271 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -19,8 +19,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -34,6 +37,16 @@ const ( OpTxnCoordSender = "txn coordinator send" ) +// DisableCommitSanityCheck allows opting out of a fatal assertion error that was observed in the wild +// and for which a root cause is not yet available. +// +// See: https://github.com/cockroachdb/cockroach/pull/73512. +var DisableCommitSanityCheck = envutil.EnvOrDefaultBool("COCKROACH_DISABLE_COMMIT_SANITY_CHECK", false) + +// forceTxnRetries enables random transaction retries for test builds +// even if they aren't enabled via testing knobs. +var forceTxnRetries = envutil.EnvOrDefaultBool("COCKROACH_FORCE_RANDOM_TXN_RETRIES", false) + // txnState represents states relating to whether an EndTxn request needs // to be sent. // @@ -1370,9 +1383,9 @@ func (tc *TxnCoordSender) TestingCloneTxn() *roachpb.Transaction { func (tc *TxnCoordSender) Step(ctx context.Context) error { // TODO(nvanbenschoten): it should be possible to make this assertion, but // the API is currently misused by the connExecutor. See #86162. - //if tc.typ != kv.RootTxn { + // if tc.typ != kv.RootTxn { // return errors.AssertionFailedf("cannot step in non-root txn") - //} + // } tc.mu.Lock() defer tc.mu.Unlock() if tc.shouldStepReadTimestampLocked() { @@ -1531,6 +1544,18 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool { return tc.hasPerformedWritesLocked() } +var randRetryRngSource, _ = randutil.NewLockedPseudoRand() + +func (tc *TxnCoordSender) TestingShouldRetry(txn *kv.Txn) bool { + if filter := tc.testingKnobs.TransactionRetryFilter; filter != nil && filter(txn) { + return true + } + if forceTxnRetries && buildutil.CrdbTestBuild { + return randRetryRngSource.Float64() < kv.RandomTxnRetryProbability + } + return false +} + func (tc *TxnCoordSender) hasPerformedReadsLocked() bool { return !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() } diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 038b6e72d868..0be57d7c89ea 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -251,6 +251,11 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool { panic("unimplemented") } +// TestingShouldRetry is part of TxnSenderFactory. +func (m *MockTransactionalSender) TestingShouldRetry(*Txn) bool { + return false +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, *kvpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 9a308c64a41e..9211e94c9c34 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -343,6 +343,13 @@ type TxnSender interface { // HasPerformedWrites returns true if a write has been performed. HasPerformedWrites() bool + + // TestingShouldRetry returns true if + // transaction retry errors should be randomly returned to + // callers. Note, it is the responsibility of (*kv.DB).Txn() + // to return the retries. This lives here since the TxnSender + // is what has access to the server's testing knobs. + TestingShouldRetry(*Txn) bool } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 4e0a3bb83cc7..480653fd1f3a 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -361,10 +361,10 @@ func (txn *Txn) SetDebugName(name string) { func (txn *Txn) DebugName() string { txn.mu.Lock() defer txn.mu.Unlock() - return txn.debugNameLocked() + return txn.DebugNameLocked() } -func (txn *Txn) debugNameLocked() string { +func (txn *Txn) DebugNameLocked() string { return fmt.Sprintf("%s (id: %s)", txn.mu.debugName, txn.mu.ID) } @@ -1035,7 +1035,7 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) error { retryErr, "PrepareForRetry() called on leaf txn"), ctx) } log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s", - txn.debugNameLocked(), retryErr) + txn.DebugNameLocked(), retryErr) txn.resetDeadlineLocked() if txn.mu.ID != retryErr.TxnID { @@ -1430,6 +1430,21 @@ func (txn *Txn) GenerateForcedRetryableErr(ctx context.Context, msg redact.Redac return txn.mu.sender.GenerateForcedRetryableErr(ctx, now.ToTimestamp(), msg) } +const RandomTxnRetryProbability = 0.1 + +// TestingShouldRetry returns true if we should generate a +// random, retriable error for this transaction. +func (txn *Txn) TestingShouldRetry() bool { + txn.mu.Lock() + defer txn.mu.Unlock() + + if txn.mu.sender.ClientFinalized() { + return false + } + + return txn.mu.sender.TestingShouldRetry(txn) +} + // IsSerializablePushAndRefreshNotPossible returns true if the transaction is // serializable, its timestamp has been pushed and there's no chance that // refreshing the read spans will succeed later (thus allowing the transaction diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 59384e64b197..ac2e901d367c 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -866,6 +866,7 @@ go_test( "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/jobutils", + "//pkg/testutils/kvclientutils", "//pkg/testutils/pgtest", "//pkg/testutils/serverutils", "//pkg/testutils/skip", diff --git a/pkg/sql/create_as_test.go b/pkg/sql/create_as_test.go index 45fd63de9570..f274d3a11663 100644 --- a/pkg/sql/create_as_test.go +++ b/pkg/sql/create_as_test.go @@ -17,9 +17,11 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -122,6 +124,8 @@ func TestCreateAsVTable(t *testing.T) { waitForJobsSuccess(t, sqlRunner) } +// TestCreateAsVTable verifies that SHOW commands can be used as the source of +// CREATE TABLE AS and CREATE MATERIALIZED VIEW AS. func TestCreateAsShow(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -323,6 +327,7 @@ AND status != 'succeeded'` sqlRunner.CheckQueryResultsRetry(t, query, [][]string{}) } +// TestFormat verifies the statement in the schema change job description. func TestFormat(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -355,9 +360,9 @@ func TestFormat(t *testing.T) { } ctx := context.Background() - testCluster := serverutils.StartCluster(t, 1, base.TestClusterArgs{}) - defer testCluster.Stopper().Stop(ctx) - sqlRunner := sqlutils.MakeSQLRunner(testCluster.ServerConn(0)) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + sqlRunner := sqlutils.MakeSQLRunner(db) var p parser.Parser for _, tc := range testCases { @@ -390,3 +395,55 @@ AND description LIKE 'CREATE%%%s%%'`, }) } } + +// TestTransactionRetryError tests that the schema changer succeeds if there is +// a retryable transaction error. +func TestTransactionRetryError(t *testing.T) { + defer leaktest.AfterTest(t)() + + testCases := []struct { + desc string + setup string + query string + verifyResults func(sqlutils.Fataler, *sqlutils.SQLRunner) + }{ + { + desc: "CREATE TABLE AS", + setup: "CREATE SEQUENCE seq", + query: "CREATE TABLE t AS SELECT nextval('seq')", + verifyResults: func(t sqlutils.Fataler, sqlRunner *sqlutils.SQLRunner) { + // Result should be 2 but is 3 because of this bug + // https://github.com/cockroachdb/cockroach/issues/78457. + sqlRunner.CheckQueryResults(t, "SELECT * FROM t", [][]string{{"3"}}) + }, + }, + { + desc: "CREATE MATERIALIZED VIEW AS", + setup: "CREATE SEQUENCE seq", + query: "CREATE MATERIALIZED VIEW v AS SELECT nextval('seq')", + verifyResults: func(t sqlutils.Fataler, sqlRunner *sqlutils.SQLRunner) { + sqlRunner.CheckQueryResults(t, "SELECT * FROM v", [][]string{{"2"}}) + }, + }, + } + + ctx := context.Background() + for _, testCase := range testCases { + t.Run(testCase.desc, func(t *testing.T) { + filterFunc, verifyFunc := kvclientutils.PrefixTransactionRetryFilter(t, schemaChangerBackfillTxnDebugName, 1) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransactionRetryFilter: filterFunc, + }, + }, + }) + defer s.Stopper().Stop(ctx) + sqlRunner := sqlutils.MakeSQLRunner(db) + sqlRunner.Exec(t, testCase.setup) + sqlRunner.Exec(t, testCase.query) + verifyFunc() + testCase.verifyResults(t, sqlRunner) + }) + } +} diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index c6ba10aae04b..66ad6691e149 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -1764,6 +1764,13 @@ func (ief *InternalDB) txn( if err != nil { return err } + // We check this testing condition here since a retry cannot be generated + // after a successful commit. Since we commit below, this is our last + // chance to generate a retry for users of (*InternalDB).Txn. + if kvTxn.TestingShouldRetry() { + return kvTxn.GenerateForcedRetryableErr(ctx, "injected retriable error") + } + return commitTxnFn(ctx) }); descs.IsTwoVersionInvariantViolationError(err) { continue diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 2a24bc6d9396..39e4ade3d2c0 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -293,6 +293,8 @@ func (sc *SchemaChanger) refreshMaterializedView( return sc.backfillQueryIntoTable(ctx, tableToRefresh, table.GetViewQuery(), refresh.AsOf(), "refreshView") } +const schemaChangerBackfillTxnDebugName = "schemaChangerBackfill" + func (sc *SchemaChanger) backfillQueryIntoTable( ctx context.Context, table catalog.TableDescriptor, query string, ts hlc.Timestamp, desc string, ) error { @@ -302,7 +304,12 @@ func (sc *SchemaChanger) backfillQueryIntoTable( } } + isTxnRetry := false return sc.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + defer func() { + isTxnRetry = true + }() + txn.KV().SetDebugName(schemaChangerBackfillTxnDebugName) if err := txn.KV().SetFixedTimestamp(ctx, ts); err != nil { return err } @@ -322,6 +329,23 @@ func (sc *SchemaChanger) backfillQueryIntoTable( defer cleanup() localPlanner := p.(*planner) + + // Delete existing span before ingestion to prevent key collisions. + // BulkRowWriter adds SSTables non-transactionally so the writes are not + // rolled back. + if isTxnRetry { + request := kvpb.BatchRequest{ + Header: kvpb.Header{ + Timestamp: ts, + }, + } + tableSpan := table.TableSpan(localPlanner.EvalContext().Codec) + request.Add(kvpb.NewDeleteRange(tableSpan.Key, tableSpan.EndKey, false)) + if _, err := localPlanner.execCfg.DB.NonTransactionalSender().Send(ctx, &request); err != nil { + return err.GoError() + } + } + stmt, err := parser.ParseOne(query) if err != nil { return err diff --git a/pkg/testutils/kvclientutils/BUILD.bazel b/pkg/testutils/kvclientutils/BUILD.bazel index a64084828166..71a206ccb9dc 100644 --- a/pkg/testutils/kvclientutils/BUILD.bazel +++ b/pkg/testutils/kvclientutils/BUILD.bazel @@ -6,6 +6,7 @@ go_library( srcs = [ "api.go", "txn_recovery.go", + "txn_restart.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils", visibility = ["//visibility:public"], @@ -13,7 +14,10 @@ go_library( "//pkg/kv", "//pkg/kv/kvpb", "//pkg/roachpb", + "//pkg/testutils", "//pkg/util/hlc", + "//pkg/util/randutil", + "//pkg/util/syncutil", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/testutils/kvclientutils/txn_restart.go b/pkg/testutils/kvclientutils/txn_restart.go new file mode 100644 index 000000000000..8cebd76255a0 --- /dev/null +++ b/pkg/testutils/kvclientutils/txn_restart.go @@ -0,0 +1,58 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvclientutils + +import ( + "strings" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +var randRetryRngSource, _ = randutil.NewLockedPseudoRand() + +func RandomTransactionRetryFilter() func(*kv.Txn) bool { + return func(*kv.Txn) bool { + return randRetryRngSource.Float64() < kv.RandomTxnRetryProbability + } +} + +func PrefixTransactionRetryFilter( + t testutils.TestErrorer, prefix string, maxCount int, +) (func(*kv.Txn) bool, func()) { + var count int + var mu syncutil.Mutex + verifyFunc := func() { + mu.Lock() + defer mu.Unlock() + if count == 0 { + t.Errorf("expected at least 1 transaction to match prefix %q", prefix) + } + } + filterFunc := func(txn *kv.Txn) bool { + // Use DebugNameLocked because txn is locked by the caller. + if !strings.HasPrefix(txn.DebugNameLocked(), prefix) { + return false + } + + mu.Lock() + defer mu.Unlock() + if count < maxCount { + count++ + return true + } + return false + } + + return filterFunc, verifyFunc +}