Skip to content

Commit

Permalink
kv: clean up randomized retryable error injection
Browse files Browse the repository at this point in the history
This commit includes a collection of kv-level cleanups for the
randomized retryable error injection functionality recently introduced
in #107954. Notably, it:
- removes the dependency from `kvcoord.TxnCoordSender` on `kv.Txn` by adjusting
  the parameter of `ClientTestingKnobs.TransactionRetryFilter`.
- unexports `kv.Txn.DebugNameLocked`
- deletes `COCKROACH_DISABLE_COMMIT_SANITY_CHECK` again, which was unused
- removes rng sources in favor of the global rand, for simplicity
- cleans up and adds some comments

Epic: None
Release note: None
  • Loading branch information
nvanbenschoten committed Aug 16, 2023
1 parent c167946 commit 2e79ed8
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 54 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (d *datadrivenTestState) addCluster(t *testing.T, cfg clusterCfg) error {
params.ServerArgs.ExternalIODirConfig = cfg.ioConf

params.ServerArgs.DefaultTestTenant = cfg.defaultTestTenant
var transactionRetryFilter func(*kv.Txn) bool
var transactionRetryFilter func(*roachpb.Transaction) bool
if cfg.randomTxnRetries {
transactionRetryFilter = kvclientutils.RandomTransactionRetryFilter()
}
Expand Down
10 changes: 1 addition & 9 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,15 +993,7 @@ func (db *DB) TxnRootKV(ctx context.Context, retryable func(context.Context, *Tx

// runTxn runs the given retryable transaction function using the given *Txn.
func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn) error) error {
err := txn.exec(ctx, func(ctx context.Context, txn *Txn) error {
if err := retryable(ctx, txn); err != nil {
return err
}
if txn.TestingShouldRetry() {
return txn.GenerateForcedRetryableErr(ctx, "injected retriable error")
}
return nil
})
err := txn.exec(ctx, retryable)
if err != nil {
if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
log.Eventf(ctx, "failure aborting transaction: %s; abort caused by: %s", rollbackErr, err)
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ go_library(
"//pkg/util/metric",
"//pkg/util/pprofutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/shuffle",
"//pkg/util/startup",
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ package kvcoord

import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// ClientTestingKnobs contains testing options that dictate the behavior
Expand Down Expand Up @@ -61,7 +61,7 @@ type ClientTestingKnobs struct {

// TransactionRetryFilter allows transaction retry loops to inject retriable
// errors.
TransactionRetryFilter func(*kv.Txn) bool
TransactionRetryFilter func(*roachpb.Transaction) bool
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
Expand Down
22 changes: 10 additions & 12 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvcoord

import (
"context"
"math/rand"
"runtime/debug"

"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -23,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand All @@ -37,12 +37,6 @@ const (
OpTxnCoordSender = "txn coordinator send"
)

// DisableCommitSanityCheck allows opting out of a fatal assertion error that was observed in the wild
// and for which a root cause is not yet available.
//
// See: https://github.com/cockroachdb/cockroach/pull/73512.
var DisableCommitSanityCheck = envutil.EnvOrDefaultBool("COCKROACH_DISABLE_COMMIT_SANITY_CHECK", false)

// forceTxnRetries enables random transaction retries for test builds
// even if they aren't enabled via testing knobs.
var forceTxnRetries = envutil.EnvOrDefaultBool("COCKROACH_FORCE_RANDOM_TXN_RETRIES", false)
Expand Down Expand Up @@ -1544,14 +1538,18 @@ func (tc *TxnCoordSender) HasPerformedWrites() bool {
return tc.hasPerformedWritesLocked()
}

var randRetryRngSource, _ = randutil.NewLockedPseudoRand()

func (tc *TxnCoordSender) TestingShouldRetry(txn *kv.Txn) bool {
if filter := tc.testingKnobs.TransactionRetryFilter; filter != nil && filter(txn) {
// TestingShouldRetry is part of the TxnSender interface.
func (tc *TxnCoordSender) TestingShouldRetry() bool {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.mu.txnState == txnFinalized {
return false
}
if filter := tc.testingKnobs.TransactionRetryFilter; filter != nil && filter(tc.mu.txn.Clone()) {
return true
}
if forceTxnRetries && buildutil.CrdbTestBuild {
return randRetryRngSource.Float64() < kv.RandomTxnRetryProbability
return rand.Float64() < kv.RandomTxnRetryProbability
}
return false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (m *MockTransactionalSender) HasPerformedWrites() bool {
}

// TestingShouldRetry is part of TxnSenderFactory.
func (m *MockTransactionalSender) TestingShouldRetry(*Txn) bool {
func (m *MockTransactionalSender) TestingShouldRetry() bool {
return false
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,11 @@ type TxnSender interface {
// HasPerformedWrites returns true if a write has been performed.
HasPerformedWrites() bool

// TestingShouldRetry returns true if
// transaction retry errors should be randomly returned to
// callers. Note, it is the responsibility of (*kv.DB).Txn()
// to return the retries. This lives here since the TxnSender
// is what has access to the server's testing knobs.
TestingShouldRetry(*Txn) bool
// TestingShouldRetry returns true if transaction retry errors should be
// randomly returned to callers. Note, it is the responsibility of
// (*kv.DB).Txn() to return the retries. This lives here since the
// TxnSender is what has access to the server's client testing knobs.
TestingShouldRetry() bool
}

// SteppingMode is the argument type to ConfigureStepping.
Expand Down
26 changes: 15 additions & 11 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,10 @@ func (txn *Txn) SetDebugName(name string) {
func (txn *Txn) DebugName() string {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.DebugNameLocked()
return txn.debugNameLocked()
}

func (txn *Txn) DebugNameLocked() string {
func (txn *Txn) debugNameLocked() string {
return fmt.Sprintf("%s (id: %s)", txn.mu.debugName, txn.mu.ID)
}

Expand Down Expand Up @@ -958,6 +958,11 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
}
err = fn(ctx, txn)

// Optionally inject retryable errors for testing.
if err == nil && txn.TestingShouldRetry() {
err = txn.GenerateForcedRetryableErr(ctx, "injected retriable error")
}

// Commit on success, unless the txn has already been committed or rolled
// back by the closure. We allow that, as the closure might want to run 1PC
// transactions or might want to rollback on certain conditions.
Expand Down Expand Up @@ -1035,7 +1040,7 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) error {
retryErr, "PrepareForRetry() called on leaf txn"), ctx)
}
log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s",
txn.DebugNameLocked(), retryErr)
txn.debugNameLocked(), retryErr)
txn.resetDeadlineLocked()

if txn.mu.ID != retryErr.TxnID {
Expand Down Expand Up @@ -1430,19 +1435,18 @@ func (txn *Txn) GenerateForcedRetryableErr(ctx context.Context, msg redact.Redac
return txn.mu.sender.GenerateForcedRetryableErr(ctx, now.ToTimestamp(), msg)
}

// RandomTxnRetryProbability is the probability that a transaction will inject a
// retryable error when either the RandomTransactionRetryFilter testing filter
// is installed or the COCKROACH_FORCE_RANDOM_TXN_RETRIES environment variable
// is set.
const RandomTxnRetryProbability = 0.1

// TestingShouldRetry returns true if we should generate a
// random, retriable error for this transaction.
// TestingShouldRetry returns true if we should generate a random, retryable
// error for this transaction.
func (txn *Txn) TestingShouldRetry() bool {
txn.mu.Lock()
defer txn.mu.Unlock()

if txn.mu.sender.ClientFinalized() {
return false
}

return txn.mu.sender.TestingShouldRetry(txn)
return txn.mu.sender.TestingShouldRetry()
}

// IsSerializablePushAndRefreshNotPossible returns true if the transaction is
Expand Down
1 change: 0 additions & 1 deletion pkg/testutils/kvclientutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"//pkg/roachpb",
"//pkg/testutils",
"//pkg/util/hlc",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
Expand Down
18 changes: 8 additions & 10 deletions pkg/testutils/kvclientutils/txn_restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,24 @@
package kvclientutils

import (
"math/rand"
"strings"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

var randRetryRngSource, _ = randutil.NewLockedPseudoRand()

func RandomTransactionRetryFilter() func(*kv.Txn) bool {
return func(*kv.Txn) bool {
return randRetryRngSource.Float64() < kv.RandomTxnRetryProbability
func RandomTransactionRetryFilter() func(*roachpb.Transaction) bool {
return func(*roachpb.Transaction) bool {
return rand.Float64() < kv.RandomTxnRetryProbability
}
}

func PrefixTransactionRetryFilter(
t testutils.TestErrorer, prefix string, maxCount int,
) (func(*kv.Txn) bool, func()) {
) (func(*roachpb.Transaction) bool, func()) {
var count int
var mu syncutil.Mutex
verifyFunc := func() {
Expand All @@ -39,9 +38,8 @@ func PrefixTransactionRetryFilter(
t.Errorf("expected at least 1 transaction to match prefix %q", prefix)
}
}
filterFunc := func(txn *kv.Txn) bool {
// Use DebugNameLocked because txn is locked by the caller.
if !strings.HasPrefix(txn.DebugNameLocked(), prefix) {
filterFunc := func(txn *roachpb.Transaction) bool {
if !strings.HasPrefix(txn.Name, prefix) {
return false
}

Expand Down

0 comments on commit 2e79ed8

Please sign in to comment.