From c9ed8840c533d6695cdb0e353c132e6bdc3ac9b6 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Mon, 10 Jul 2023 11:55:16 +0100 Subject: [PATCH 1/5] kv,sql: testing knob to randomly inject retryable errors Occasionally, we see bugs caused by mismangement of state inside a function that will be retried. Namely, the functions `(*kv.DB).Txn` and `(*sql.InternalExecutor).Txn` take funcs that must be suitably idempotent. This is testing knob provides cheap way to help us hunt down such bugs. In the long run, it would be nice to turn this on by default, but we should probably make sure TestServer runs reliably with this turned on before flipping the default. I also considered retrying _every_ transaction once rather than relying on randomness. However, that would require adding more state in kv.Txn, which I wasn't sure I really wanted to do for testing purposes. Informs #106417 Epic: none Release note: None --- pkg/ccl/backupccl/datadriven_test.go | 8 ++++++++ pkg/kv/BUILD.bazel | 1 + pkg/kv/db.go | 8 +++++++- pkg/kv/kvclient/kvcoord/testing_knobs.go | 4 ++++ pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 22 +++++++++++++++++++++ pkg/kv/mock_transactional_sender.go | 5 +++++ pkg/kv/sender.go | 7 +++++++ pkg/kv/txn.go | 22 +++++++++++++++++++++ pkg/sql/internal.go | 10 ++++++++++ 9 files changed, 86 insertions(+), 1 deletion(-) diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go index c18980c4ca5c..a78d4a1d29d4 100644 --- a/pkg/ccl/backupccl/datadriven_test.go +++ b/pkg/ccl/backupccl/datadriven_test.go @@ -142,6 +142,7 @@ type clusterCfg struct { beforeVersion string testingKnobCfg string defaultTestTenant base.DefaultTestTenantOptions + randomTxnRetries bool } func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error { @@ -156,6 +157,9 @@ func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error { // they get in CREATE TENANT. EnableTenantIDReuse: true, }, + KVClient: &kvcoord.ClientTestingKnobs{ + EnableRandomTransactionRetryErrors: cfg.randomTxnRetries, + }, } settings := cluster.MakeTestingClusterSettings() @@ -513,6 +517,9 @@ func runTestDataDriven(t *testing.T, testFilePathFromWorkspace string) { defaultTestTenant = base.TODOTestTenantDisabled } + // TODO(ssd): Once TestServer starts up reliably enough: + // randomTxnRetries := !d.HasArg("disable-txn-retries") + randomTxnRetries := false lastCreatedCluster = name cfg := clusterCfg{ name: name, @@ -524,6 +531,7 @@ func runTestDataDriven(t *testing.T, testFilePathFromWorkspace string) { beforeVersion: beforeVersion, testingKnobCfg: testingKnobCfg, defaultTestTenant: defaultTestTenant, + randomTxnRetries: randomTxnRetries, } err := ds.addCluster(t, cfg) if err != nil { diff --git a/pkg/kv/BUILD.bazel b/pkg/kv/BUILD.bazel index 4fd83f2ddf47..eb87eecdb7fe 100644 --- a/pkg/kv/BUILD.bazel +++ b/pkg/kv/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/protoutil", + "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/stop", "//pkg/util/syncutil", diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 31543e46826f..80edc2df0870 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -994,7 +994,13 @@ func (db *DB) TxnRootKV(ctx context.Context, retryable func(context.Context, *Tx // runTxn runs the given retryable transaction function using the given *Txn. func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn) error) error { err := txn.exec(ctx, func(ctx context.Context, txn *Txn) error { - return retryable(ctx, txn) + if err := retryable(ctx, txn); err != nil { + return err + } + if txn.TestingShouldReturnRandomRetry() { + return txn.GenerateForcedRetryableErr(ctx, "randomized retriable error") + } + return nil }) if err != nil { if rollbackErr := txn.Rollback(ctx); rollbackErr != nil { diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index a48297fad032..06b29afd299b 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -57,6 +57,10 @@ type ClientTestingKnobs struct { // error which, if non-nil, becomes the result of the batch. Otherwise, execution // continues. OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error + + // EnableRandomTransactionRetryErrors allows transaction retry + // loops to randomly inject retriable errors. + EnableRandomTransactionRetryErrors bool } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index d2b695011432..5d213ad1cd19 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -19,6 +19,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -34,6 +36,16 @@ const ( OpTxnCoordSender = "txn coordinator send" ) +// DisableCommitSanityCheck allows opting out of a fatal assertion error that was observed in the wild +// and for which a root cause is not yet available. +// +// See: https://github.com/cockroachdb/cockroach/pull/73512. +var DisableCommitSanityCheck = envutil.EnvOrDefaultBool("COCKROACH_DISABLE_COMMIT_SANITY_CHECK", false) + +// forceTxnRetries enables random transaction retries for test builds +// even if they aren't enabled via testing knobs. +var forceTxnRetries = envutil.EnvOrDefaultBool("COCKROACH_FORCE_RANDOM_TXN_RETRIES", false) + // txnState represents states relating to whether an EndTxn request needs // to be sent. // @@ -1531,6 +1543,16 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool { return tc.hasPerformedWritesLocked() } +func (tc *TxnCoordSender) TestingRandomRetryableErrorsEnabled() bool { + if tc.testingKnobs.EnableRandomTransactionRetryErrors { + return true + } + if forceTxnRetries && buildutil.CrdbTestBuild { + return true + } + return false +} + func (tc *TxnCoordSender) hasPerformedReadsLocked() bool { return !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() } diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 038b6e72d868..3a3a33a97266 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -251,6 +251,11 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool { panic("unimplemented") } +// TestingRandomRetryableErrorsEnabled is part of TxnSenderFactory. +func (m *MockTransactionalSender) TestingRandomRetryableErrorsEnabled() bool { + return false +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, *kvpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 9a308c64a41e..3134ac134e23 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -343,6 +343,13 @@ type TxnSender interface { // HasPerformedWrites returns true if a write has been performed. HasPerformedWrites() bool + + // TestingRandomRetriableErrorsEnabled returns true if + // transaction retry errors should be randomly returned to + // callers. Note, it is the responsibility of (*kv.DB).Txn() + // to return the retries. This lives here since the TxnSender + // is what has access to the server's testing knobs. + TestingRandomRetryableErrorsEnabled() bool } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 4e0a3bb83cc7..75d83cfc8d91 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -1430,6 +1431,27 @@ func (txn *Txn) GenerateForcedRetryableErr(ctx context.Context, msg redact.Redac return txn.mu.sender.GenerateForcedRetryableErr(ctx, now.ToTimestamp(), msg) } +var ( + randRetryRngSource, _ = randutil.NewPseudoRand() + randomTxnRetryProbability = 0.1 +) + +// TestingShouldReturnRandomRetry returns true if we should generate a +// random, retriable error for this transaction. +func (txn *Txn) TestingShouldReturnRandomRetry() bool { + txn.mu.Lock() + defer txn.mu.Unlock() + + if !txn.mu.sender.TestingRandomRetryableErrorsEnabled() { + return false + } + + if txn.mu.sender.ClientFinalized() { + return false + } + return randRetryRngSource.Float64() < randomTxnRetryProbability +} + // IsSerializablePushAndRefreshNotPossible returns true if the transaction is // serializable, its timestamp has been pushed and there's no chance that // refreshing the read spans will succeed later (thus allowing the transaction diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index c6ba10aae04b..c270f960da39 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -1764,6 +1764,16 @@ func (ief *InternalDB) txn( if err != nil { return err } + // We check this testing condition here since + // a random retry cannot be generated after a + // successful commit. Since we commit below, + // this is our last chance to generate a + // random retry for users of + // (*InternalDB).Txn. + if kvTxn.TestingShouldReturnRandomRetry() { + return kvTxn.GenerateForcedRetryableErr(ctx, "randomized retriable error") + } + return commitTxnFn(ctx) }); descs.IsTwoVersionInvariantViolationError(err) { continue From 173a6db0eacdcc0de72fd2b8542a9c6662deda5e Mon Sep 17 00:00:00 2001 From: Evan Wall Date: Wed, 9 Aug 2023 11:03:25 -0400 Subject: [PATCH 2/5] testing: replace EnableRandomTransactionRetryErrors with TransactionRetryFilter Adds TransactionRetryFilter which allows the retry behavior to be customized. EnableRandomTransactionRetryErrors has been refactored to a TransactionRetryFilter implementation that randomly retries. Release note: None --- pkg/ccl/backupccl/BUILD.bazel | 1 + pkg/ccl/backupccl/datadriven_test.go | 7 +++++- pkg/kv/BUILD.bazel | 1 - pkg/kv/db.go | 4 ++-- pkg/kv/kvclient/kvcoord/BUILD.bazel | 1 + pkg/kv/kvclient/kvcoord/testing_knobs.go | 7 +++--- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 13 ++++++----- pkg/kv/mock_transactional_sender.go | 4 ++-- pkg/kv/sender.go | 4 ++-- pkg/kv/txn.go | 17 +++++---------- pkg/sql/internal.go | 13 +++++------ pkg/testutils/kvclientutils/BUILD.bazel | 2 ++ pkg/testutils/kvclientutils/txn_restart.go | 24 +++++++++++++++++++++ 13 files changed, 62 insertions(+), 36 deletions(-) create mode 100644 pkg/testutils/kvclientutils/txn_restart.go diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 563739d89607..0d807c8c64aa 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -291,6 +291,7 @@ go_test( "//pkg/testutils/distsqlutils", "//pkg/testutils/fingerprintutils", "//pkg/testutils/jobutils", + "//pkg/testutils/kvclientutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go index a78d4a1d29d4..72e6231bf88c 100644 --- a/pkg/ccl/backupccl/datadriven_test.go +++ b/pkg/ccl/backupccl/datadriven_test.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" + "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -150,6 +151,10 @@ func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error { params.ServerArgs.ExternalIODirConfig = cfg.ioConf params.ServerArgs.DefaultTestTenant = cfg.defaultTestTenant + var transactionRetryFilter func(*kv.Txn) bool + if cfg.randomTxnRetries { + transactionRetryFilter = kvclientutils.RandomTransactionRetryFilter() + } params.ServerArgs.Knobs = base.TestingKnobs{ JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), TenantTestingKnobs: &sql.TenantTestingKnobs{ @@ -158,7 +163,7 @@ func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error { EnableTenantIDReuse: true, }, KVClient: &kvcoord.ClientTestingKnobs{ - EnableRandomTransactionRetryErrors: cfg.randomTxnRetries, + TransactionRetryFilter: transactionRetryFilter, }, } diff --git a/pkg/kv/BUILD.bazel b/pkg/kv/BUILD.bazel index eb87eecdb7fe..4fd83f2ddf47 100644 --- a/pkg/kv/BUILD.bazel +++ b/pkg/kv/BUILD.bazel @@ -34,7 +34,6 @@ go_library( "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/protoutil", - "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/stop", "//pkg/util/syncutil", diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 80edc2df0870..2b1a9820edcd 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -997,8 +997,8 @@ func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn) if err := retryable(ctx, txn); err != nil { return err } - if txn.TestingShouldReturnRandomRetry() { - return txn.GenerateForcedRetryableErr(ctx, "randomized retriable error") + if txn.TestingShouldRetry() { + return txn.GenerateForcedRetryableErr(ctx, "injected retriable error") } return nil }) diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index ce457bf1386f..948ea31ebd6c 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -78,6 +78,7 @@ go_library( "//pkg/util/metric", "//pkg/util/pprofutil", "//pkg/util/quotapool", + "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/shuffle", "//pkg/util/startup", diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index 06b29afd299b..22f2318a89df 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -12,6 +12,7 @@ package kvcoord import ( "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" ) @@ -58,9 +59,9 @@ type ClientTestingKnobs struct { // continues. OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error - // EnableRandomTransactionRetryErrors allows transaction retry - // loops to randomly inject retriable errors. - EnableRandomTransactionRetryErrors bool + // TransactionRetryFilter allows transaction retry loops to inject retriable + // errors. + TransactionRetryFilter func(*kv.Txn) bool } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 5d213ad1cd19..c57bbb839271 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -1382,9 +1383,9 @@ func (tc *TxnCoordSender) TestingCloneTxn() *roachpb.Transaction { func (tc *TxnCoordSender) Step(ctx context.Context) error { // TODO(nvanbenschoten): it should be possible to make this assertion, but // the API is currently misused by the connExecutor. See #86162. - //if tc.typ != kv.RootTxn { + // if tc.typ != kv.RootTxn { // return errors.AssertionFailedf("cannot step in non-root txn") - //} + // } tc.mu.Lock() defer tc.mu.Unlock() if tc.shouldStepReadTimestampLocked() { @@ -1543,12 +1544,14 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool { return tc.hasPerformedWritesLocked() } -func (tc *TxnCoordSender) TestingRandomRetryableErrorsEnabled() bool { - if tc.testingKnobs.EnableRandomTransactionRetryErrors { +var randRetryRngSource, _ = randutil.NewLockedPseudoRand() + +func (tc *TxnCoordSender) TestingShouldRetry(txn *kv.Txn) bool { + if filter := tc.testingKnobs.TransactionRetryFilter; filter != nil && filter(txn) { return true } if forceTxnRetries && buildutil.CrdbTestBuild { - return true + return randRetryRngSource.Float64() < kv.RandomTxnRetryProbability } return false } diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 3a3a33a97266..0be57d7c89ea 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -251,8 +251,8 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool { panic("unimplemented") } -// TestingRandomRetryableErrorsEnabled is part of TxnSenderFactory. -func (m *MockTransactionalSender) TestingRandomRetryableErrorsEnabled() bool { +// TestingShouldRetry is part of TxnSenderFactory. +func (m *MockTransactionalSender) TestingShouldRetry(*Txn) bool { return false } diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 3134ac134e23..9211e94c9c34 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -344,12 +344,12 @@ type TxnSender interface { // HasPerformedWrites returns true if a write has been performed. HasPerformedWrites() bool - // TestingRandomRetriableErrorsEnabled returns true if + // TestingShouldRetry returns true if // transaction retry errors should be randomly returned to // callers. Note, it is the responsibility of (*kv.DB).Txn() // to return the retries. This lives here since the TxnSender // is what has access to the server's testing knobs. - TestingRandomRetryableErrorsEnabled() bool + TestingShouldRetry(*Txn) bool } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 75d83cfc8d91..2eec2199a107 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -27,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -1431,25 +1430,19 @@ func (txn *Txn) GenerateForcedRetryableErr(ctx context.Context, msg redact.Redac return txn.mu.sender.GenerateForcedRetryableErr(ctx, now.ToTimestamp(), msg) } -var ( - randRetryRngSource, _ = randutil.NewPseudoRand() - randomTxnRetryProbability = 0.1 -) +const RandomTxnRetryProbability = 0.1 -// TestingShouldReturnRandomRetry returns true if we should generate a +// TestingShouldRetry returns true if we should generate a // random, retriable error for this transaction. -func (txn *Txn) TestingShouldReturnRandomRetry() bool { +func (txn *Txn) TestingShouldRetry() bool { txn.mu.Lock() defer txn.mu.Unlock() - if !txn.mu.sender.TestingRandomRetryableErrorsEnabled() { - return false - } - if txn.mu.sender.ClientFinalized() { return false } - return randRetryRngSource.Float64() < randomTxnRetryProbability + + return txn.mu.sender.TestingShouldRetry(txn) } // IsSerializablePushAndRefreshNotPossible returns true if the transaction is diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index c270f960da39..66ad6691e149 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -1764,14 +1764,11 @@ func (ief *InternalDB) txn( if err != nil { return err } - // We check this testing condition here since - // a random retry cannot be generated after a - // successful commit. Since we commit below, - // this is our last chance to generate a - // random retry for users of - // (*InternalDB).Txn. - if kvTxn.TestingShouldReturnRandomRetry() { - return kvTxn.GenerateForcedRetryableErr(ctx, "randomized retriable error") + // We check this testing condition here since a retry cannot be generated + // after a successful commit. Since we commit below, this is our last + // chance to generate a retry for users of (*InternalDB).Txn. + if kvTxn.TestingShouldRetry() { + return kvTxn.GenerateForcedRetryableErr(ctx, "injected retriable error") } return commitTxnFn(ctx) diff --git a/pkg/testutils/kvclientutils/BUILD.bazel b/pkg/testutils/kvclientutils/BUILD.bazel index a64084828166..c0678adcdde2 100644 --- a/pkg/testutils/kvclientutils/BUILD.bazel +++ b/pkg/testutils/kvclientutils/BUILD.bazel @@ -6,6 +6,7 @@ go_library( srcs = [ "api.go", "txn_recovery.go", + "txn_restart.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils", visibility = ["//visibility:public"], @@ -14,6 +15,7 @@ go_library( "//pkg/kv/kvpb", "//pkg/roachpb", "//pkg/util/hlc", + "//pkg/util/randutil", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/testutils/kvclientutils/txn_restart.go b/pkg/testutils/kvclientutils/txn_restart.go new file mode 100644 index 000000000000..f7c14b2f6fc0 --- /dev/null +++ b/pkg/testutils/kvclientutils/txn_restart.go @@ -0,0 +1,24 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvclientutils + +import ( + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +var randRetryRngSource, _ = randutil.NewLockedPseudoRand() + +func RandomTransactionRetryFilter() func(*kv.Txn) bool { + return func(*kv.Txn) bool { + return randRetryRngSource.Float64() < kv.RandomTxnRetryProbability + } +} From d9554e0ca5410320ed378877c8897d97f00410a8 Mon Sep 17 00:00:00 2001 From: Evan Wall Date: Mon, 14 Aug 2023 11:50:00 -0400 Subject: [PATCH 3/5] testing: add comments to create_as_test.go Release note: None --- pkg/sql/create_as_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/sql/create_as_test.go b/pkg/sql/create_as_test.go index 45fd63de9570..5a1aad668398 100644 --- a/pkg/sql/create_as_test.go +++ b/pkg/sql/create_as_test.go @@ -122,6 +122,8 @@ func TestCreateAsVTable(t *testing.T) { waitForJobsSuccess(t, sqlRunner) } +// TestCreateAsVTable verifies that SHOW commands can be used as the source of +// CREATE TABLE AS and CREATE MATERIALIZED VIEW AS. func TestCreateAsShow(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -323,6 +325,7 @@ AND status != 'succeeded'` sqlRunner.CheckQueryResultsRetry(t, query, [][]string{}) } +// TestFormat verifies the statement in the schema change job description. func TestFormat(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) From 0e0528fe385b59227cf3fad0b8fc6a9eee0258a3 Mon Sep 17 00:00:00 2001 From: Evan Wall Date: Mon, 14 Aug 2023 11:58:58 -0400 Subject: [PATCH 4/5] testing: replace StartNewTestCluster with StartServer in create_as_test.go Release note: None --- pkg/sql/create_as_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/sql/create_as_test.go b/pkg/sql/create_as_test.go index 5a1aad668398..12d744ea895f 100644 --- a/pkg/sql/create_as_test.go +++ b/pkg/sql/create_as_test.go @@ -358,9 +358,9 @@ func TestFormat(t *testing.T) { } ctx := context.Background() - testCluster := serverutils.StartCluster(t, 1, base.TestClusterArgs{}) - defer testCluster.Stopper().Stop(ctx) - sqlRunner := sqlutils.MakeSQLRunner(testCluster.ServerConn(0)) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + sqlRunner := sqlutils.MakeSQLRunner(db) var p parser.Parser for _, tc := range testCases { From 514b44d506884fd698c58fb76ad4354bcb6e2770 Mon Sep 17 00:00:00 2001 From: Evan Wall Date: Tue, 1 Aug 2023 07:11:00 -0400 Subject: [PATCH 5/5] schema: allow transaction retry in schema changer backfill Fixes #106696 This change adds a test to simulate transaction retries during the schema changer backfill. Some retries caused `ingested key collides with an existing one` errors during addSSTable because the schema change's source data changed. For example: `CREATE TABLE t AS SELECT nextval('seq');` - `nextval` is non-transactional so each time it is selected, the value changes. This change also allows the schema changer to specify that addSSTable is allowed to shadow (replace) keys' values if they are different to handle this case. Release note: None --- pkg/kv/txn.go | 6 +-- pkg/sql/BUILD.bazel | 1 + pkg/sql/create_as_test.go | 54 ++++++++++++++++++++++ pkg/sql/schema_changer.go | 24 ++++++++++ pkg/testutils/kvclientutils/BUILD.bazel | 2 + pkg/testutils/kvclientutils/txn_restart.go | 34 ++++++++++++++ 6 files changed, 118 insertions(+), 3 deletions(-) diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 2eec2199a107..480653fd1f3a 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -361,10 +361,10 @@ func (txn *Txn) SetDebugName(name string) { func (txn *Txn) DebugName() string { txn.mu.Lock() defer txn.mu.Unlock() - return txn.debugNameLocked() + return txn.DebugNameLocked() } -func (txn *Txn) debugNameLocked() string { +func (txn *Txn) DebugNameLocked() string { return fmt.Sprintf("%s (id: %s)", txn.mu.debugName, txn.mu.ID) } @@ -1035,7 +1035,7 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) error { retryErr, "PrepareForRetry() called on leaf txn"), ctx) } log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s", - txn.debugNameLocked(), retryErr) + txn.DebugNameLocked(), retryErr) txn.resetDeadlineLocked() if txn.mu.ID != retryErr.TxnID { diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 59384e64b197..ac2e901d367c 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -866,6 +866,7 @@ go_test( "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/jobutils", + "//pkg/testutils/kvclientutils", "//pkg/testutils/pgtest", "//pkg/testutils/serverutils", "//pkg/testutils/skip", diff --git a/pkg/sql/create_as_test.go b/pkg/sql/create_as_test.go index 12d744ea895f..f274d3a11663 100644 --- a/pkg/sql/create_as_test.go +++ b/pkg/sql/create_as_test.go @@ -17,9 +17,11 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -393,3 +395,55 @@ AND description LIKE 'CREATE%%%s%%'`, }) } } + +// TestTransactionRetryError tests that the schema changer succeeds if there is +// a retryable transaction error. +func TestTransactionRetryError(t *testing.T) { + defer leaktest.AfterTest(t)() + + testCases := []struct { + desc string + setup string + query string + verifyResults func(sqlutils.Fataler, *sqlutils.SQLRunner) + }{ + { + desc: "CREATE TABLE AS", + setup: "CREATE SEQUENCE seq", + query: "CREATE TABLE t AS SELECT nextval('seq')", + verifyResults: func(t sqlutils.Fataler, sqlRunner *sqlutils.SQLRunner) { + // Result should be 2 but is 3 because of this bug + // https://github.com/cockroachdb/cockroach/issues/78457. + sqlRunner.CheckQueryResults(t, "SELECT * FROM t", [][]string{{"3"}}) + }, + }, + { + desc: "CREATE MATERIALIZED VIEW AS", + setup: "CREATE SEQUENCE seq", + query: "CREATE MATERIALIZED VIEW v AS SELECT nextval('seq')", + verifyResults: func(t sqlutils.Fataler, sqlRunner *sqlutils.SQLRunner) { + sqlRunner.CheckQueryResults(t, "SELECT * FROM v", [][]string{{"2"}}) + }, + }, + } + + ctx := context.Background() + for _, testCase := range testCases { + t.Run(testCase.desc, func(t *testing.T) { + filterFunc, verifyFunc := kvclientutils.PrefixTransactionRetryFilter(t, schemaChangerBackfillTxnDebugName, 1) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransactionRetryFilter: filterFunc, + }, + }, + }) + defer s.Stopper().Stop(ctx) + sqlRunner := sqlutils.MakeSQLRunner(db) + sqlRunner.Exec(t, testCase.setup) + sqlRunner.Exec(t, testCase.query) + verifyFunc() + testCase.verifyResults(t, sqlRunner) + }) + } +} diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 2a24bc6d9396..39e4ade3d2c0 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -293,6 +293,8 @@ func (sc *SchemaChanger) refreshMaterializedView( return sc.backfillQueryIntoTable(ctx, tableToRefresh, table.GetViewQuery(), refresh.AsOf(), "refreshView") } +const schemaChangerBackfillTxnDebugName = "schemaChangerBackfill" + func (sc *SchemaChanger) backfillQueryIntoTable( ctx context.Context, table catalog.TableDescriptor, query string, ts hlc.Timestamp, desc string, ) error { @@ -302,7 +304,12 @@ func (sc *SchemaChanger) backfillQueryIntoTable( } } + isTxnRetry := false return sc.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + defer func() { + isTxnRetry = true + }() + txn.KV().SetDebugName(schemaChangerBackfillTxnDebugName) if err := txn.KV().SetFixedTimestamp(ctx, ts); err != nil { return err } @@ -322,6 +329,23 @@ func (sc *SchemaChanger) backfillQueryIntoTable( defer cleanup() localPlanner := p.(*planner) + + // Delete existing span before ingestion to prevent key collisions. + // BulkRowWriter adds SSTables non-transactionally so the writes are not + // rolled back. + if isTxnRetry { + request := kvpb.BatchRequest{ + Header: kvpb.Header{ + Timestamp: ts, + }, + } + tableSpan := table.TableSpan(localPlanner.EvalContext().Codec) + request.Add(kvpb.NewDeleteRange(tableSpan.Key, tableSpan.EndKey, false)) + if _, err := localPlanner.execCfg.DB.NonTransactionalSender().Send(ctx, &request); err != nil { + return err.GoError() + } + } + stmt, err := parser.ParseOne(query) if err != nil { return err diff --git a/pkg/testutils/kvclientutils/BUILD.bazel b/pkg/testutils/kvclientutils/BUILD.bazel index c0678adcdde2..71a206ccb9dc 100644 --- a/pkg/testutils/kvclientutils/BUILD.bazel +++ b/pkg/testutils/kvclientutils/BUILD.bazel @@ -14,8 +14,10 @@ go_library( "//pkg/kv", "//pkg/kv/kvpb", "//pkg/roachpb", + "//pkg/testutils", "//pkg/util/hlc", "//pkg/util/randutil", + "//pkg/util/syncutil", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/testutils/kvclientutils/txn_restart.go b/pkg/testutils/kvclientutils/txn_restart.go index f7c14b2f6fc0..8cebd76255a0 100644 --- a/pkg/testutils/kvclientutils/txn_restart.go +++ b/pkg/testutils/kvclientutils/txn_restart.go @@ -11,8 +11,12 @@ package kvclientutils import ( + "strings" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) var randRetryRngSource, _ = randutil.NewLockedPseudoRand() @@ -22,3 +26,33 @@ func RandomTransactionRetryFilter() func(*kv.Txn) bool { return randRetryRngSource.Float64() < kv.RandomTxnRetryProbability } } + +func PrefixTransactionRetryFilter( + t testutils.TestErrorer, prefix string, maxCount int, +) (func(*kv.Txn) bool, func()) { + var count int + var mu syncutil.Mutex + verifyFunc := func() { + mu.Lock() + defer mu.Unlock() + if count == 0 { + t.Errorf("expected at least 1 transaction to match prefix %q", prefix) + } + } + filterFunc := func(txn *kv.Txn) bool { + // Use DebugNameLocked because txn is locked by the caller. + if !strings.HasPrefix(txn.DebugNameLocked(), prefix) { + return false + } + + mu.Lock() + defer mu.Unlock() + if count < maxCount { + count++ + return true + } + return false + } + + return filterFunc, verifyFunc +}