Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: clean up randomized retryable error injection #108828

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error {
params.ServerArgs.ExternalIODirConfig = cfg.ioConf

params.ServerArgs.DefaultTestTenant = cfg.defaultTestTenant
var transactionRetryFilter func(*kv.Txn) bool
var transactionRetryFilter func(roachpb.Transaction) bool
if cfg.randomTxnRetries {
transactionRetryFilter = kvclientutils.RandomTransactionRetryFilter()
}
Expand Down
10 changes: 1 addition & 9 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,15 +993,7 @@ func (db *DB) TxnRootKV(ctx context.Context, retryable func(context.Context, *Tx

// runTxn runs the given retryable transaction function using the given *Txn.
func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn) error) error {
err := txn.exec(ctx, func(ctx context.Context, txn *Txn) error {
if err := retryable(ctx, txn); err != nil {
return err
}
if txn.TestingShouldRetry() {
return txn.GenerateForcedRetryableErr(ctx, "injected retriable error")
}
return nil
})
err := txn.exec(ctx, retryable)
if err != nil {
if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
log.Eventf(ctx, "failure aborting transaction: %s; abort caused by: %s", rollbackErr, err)
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ go_library(
"//pkg/util/metric",
"//pkg/util/pprofutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/shuffle",
"//pkg/util/startup",
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ package kvcoord

import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// ClientTestingKnobs contains testing options that dictate the behavior
Expand Down Expand Up @@ -61,7 +61,7 @@ type ClientTestingKnobs struct {

// TransactionRetryFilter allows transaction retry loops to inject retriable
// errors.
TransactionRetryFilter func(*kv.Txn) bool
TransactionRetryFilter func(roachpb.Transaction) bool
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
Expand Down
22 changes: 10 additions & 12 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvcoord

import (
"context"
"math/rand"
"runtime/debug"

"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -23,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand All @@ -37,12 +37,6 @@ const (
OpTxnCoordSender = "txn coordinator send"
)

// DisableCommitSanityCheck allows opting out of a fatal assertion error that was observed in the wild
// and for which a root cause is not yet available.
//
// See: https://github.com/cockroachdb/cockroach/pull/73512.
var DisableCommitSanityCheck = envutil.EnvOrDefaultBool("COCKROACH_DISABLE_COMMIT_SANITY_CHECK", false)

// forceTxnRetries enables random transaction retries for test builds
// even if they aren't enabled via testing knobs.
var forceTxnRetries = envutil.EnvOrDefaultBool("COCKROACH_FORCE_RANDOM_TXN_RETRIES", false)
Expand Down Expand Up @@ -1569,14 +1563,18 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool {
return tc.hasPerformedWritesLocked()
}

var randRetryRngSource, _ = randutil.NewLockedPseudoRand()

func (tc *TxnCoordSender) TestingShouldRetry(txn *kv.Txn) bool {
if filter := tc.testingKnobs.TransactionRetryFilter; filter != nil && filter(txn) {
// TestingShouldRetry is part of the TxnSender interface.
func (tc *TxnCoordSender) TestingShouldRetry() bool {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.mu.txnState == txnFinalized {
return false
}
if filter := tc.testingKnobs.TransactionRetryFilter; filter != nil && filter(tc.mu.txn) {
return true
}
if forceTxnRetries && buildutil.CrdbTestBuild {
return randRetryRngSource.Float64() < kv.RandomTxnRetryProbability
return rand.Float64() < kv.RandomTxnRetryProbability
}
return false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool {
}

// TestingShouldRetry is part of TxnSenderFactory.
func (m *MockTransactionalSender) TestingShouldRetry(*Txn) bool {
func (m *MockTransactionalSender) TestingShouldRetry() bool {
return false
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,11 @@ type TxnSender interface {
// HasPerformedWrites returns true if a write has been performed.
HasPerformedWrites() bool

// TestingShouldRetry returns true if
// transaction retry errors should be randomly returned to
// callers. Note, it is the responsibility of (*kv.DB).Txn()
// to return the retries. This lives here since the TxnSender
// is what has access to the server's testing knobs.
TestingShouldRetry(*Txn) bool
// TestingShouldRetry returns true if transaction retry errors should be
// randomly returned to callers. Note that it is the responsibility of
// (*kv.DB).Txn() to return the retries. This lives here since the
// TxnSender is what has access to the server's client testing knobs.
TestingShouldRetry() bool
}

// SteppingMode is the argument type to ConfigureStepping.
Expand Down
28 changes: 16 additions & 12 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,10 @@ func (txn *Txn) SetDebugName(name string) {
func (txn *Txn) DebugName() string {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.DebugNameLocked()
return txn.debugNameLocked()
}

func (txn *Txn) DebugNameLocked() string {
func (txn *Txn) debugNameLocked() string {
return fmt.Sprintf("%s (id: %s)", txn.mu.debugName, txn.mu.ID)
}

Expand Down Expand Up @@ -958,6 +958,11 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
}
err = fn(ctx, txn)

// Optionally inject retryable errors for testing.
if err == nil && txn.TestingShouldRetry() {
err = txn.GenerateForcedRetryableErr(ctx, "injected retriable error")
}

// Commit on success, unless the txn has already been committed or rolled
// back by the closure. We allow that, as the closure might want to run 1PC
// transactions or might want to rollback on certain conditions.
Expand Down Expand Up @@ -1039,7 +1044,7 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) error {
}

log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s",
txn.DebugNameLocked(), retryErr)
txn.debugNameLocked(), retryErr)
txn.resetDeadlineLocked()

if !retryErr.TxnMustRestartFromBeginning() {
Expand Down Expand Up @@ -1096,7 +1101,7 @@ func (txn *Txn) PrepareForPartialRetry(ctx context.Context) error {
}

log.VEventf(ctx, 2, "partially retrying transaction: %s because of a retryable error: %s",
txn.DebugNameLocked(), retryErr)
txn.debugNameLocked(), retryErr)

txn.mu.sender.ClearRetryableErr(ctx)
return nil
Expand Down Expand Up @@ -1488,19 +1493,18 @@ func (txn *Txn) GenerateForcedRetryableErr(ctx context.Context, msg redact.Redac
return txn.mu.sender.GenerateForcedRetryableErr(ctx, now.ToTimestamp(), false /* mustRestart */, msg)
}

// RandomTxnRetryProbability is the probability that a transaction will inject a
// retryable error when either the RandomTransactionRetryFilter testing filter
// is installed or the COCKROACH_FORCE_RANDOM_TXN_RETRIES environment variable
// is set.
const RandomTxnRetryProbability = 0.1

// TestingShouldRetry returns true if we should generate a
// random, retriable error for this transaction.
// TestingShouldRetry returns true if we should generate a random, retryable
// error for this transaction.
func (txn *Txn) TestingShouldRetry() bool {
txn.mu.Lock()
defer txn.mu.Unlock()

if txn.mu.sender.ClientFinalized() {
return false
}

return txn.mu.sender.TestingShouldRetry(txn)
return txn.mu.sender.TestingShouldRetry()
}

// IsSerializablePushAndRefreshNotPossible returns true if the transaction is
Expand Down
1 change: 0 additions & 1 deletion pkg/testutils/kvclientutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"//pkg/roachpb",
"//pkg/testutils",
"//pkg/util/hlc",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
Expand Down
18 changes: 8 additions & 10 deletions pkg/testutils/kvclientutils/txn_restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,24 @@
package kvclientutils

import (
"math/rand"
"strings"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

var randRetryRngSource, _ = randutil.NewLockedPseudoRand()

func RandomTransactionRetryFilter() func(*kv.Txn) bool {
return func(*kv.Txn) bool {
return randRetryRngSource.Float64() < kv.RandomTxnRetryProbability
func RandomTransactionRetryFilter() func(roachpb.Transaction) bool {
return func(roachpb.Transaction) bool {
return rand.Float64() < kv.RandomTxnRetryProbability
}
}

func PrefixTransactionRetryFilter(
t testutils.TestErrorer, prefix string, maxCount int,
) (func(*kv.Txn) bool, func()) {
) (func(roachpb.Transaction) bool, func()) {
var count int
var mu syncutil.Mutex
verifyFunc := func() {
Expand All @@ -39,9 +38,8 @@ func PrefixTransactionRetryFilter(
t.Errorf("expected at least 1 transaction to match prefix %q", prefix)
}
}
filterFunc := func(txn *kv.Txn) bool {
// Use DebugNameLocked because txn is locked by the caller.
if !strings.HasPrefix(txn.DebugNameLocked(), prefix) {
filterFunc := func(txn roachpb.Transaction) bool {
if !strings.HasPrefix(txn.Name, prefix) {
return false
}

Expand Down