Skip to content

Commit

Permalink
sql: add inject_retry_errors_on_commit_enabled session var
Browse files Browse the repository at this point in the history
Release note (sql change): The session variable
inject_retry_errors_on_commit_enabled has been added. When this is true, any
COMMIT statement will return a transaction retry
error if it is run inside of an explicit transaction. The errors will
continue until inject_retry_errors_on_commit_enabled
is set to false. The purpose of this setting is to allow developers to
test their transaction retry logic.
  • Loading branch information
Eric.Yang authored and rafiss committed Feb 28, 2023
1 parent e828ba0 commit 1567139
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 4 deletions.
17 changes: 13 additions & 4 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// numTxnRetryErrors is the number of times an error will be injected if
// the transaction is retried using SAVEPOINTs.
const numTxnRetryErrors = 3

// execStmt executes one statement by dispatching according to the current
// state. Returns an Event to be passed to the state machine, or nil if no
// transition is needed. If nil is returned, then the cursor is supposed to
Expand Down Expand Up @@ -935,7 +939,15 @@ func (ex *connExecutor) commitSQLTransaction(
) (fsm.Event, fsm.EventPayload) {
ex.extraTxnState.idleLatency += ex.statsCollector.PhaseTimes().
GetIdleLatency(ex.statsCollector.PreviousPhaseTimes())

if ex.sessionData().InjectRetryErrorsOnCommitEnabled && ast.StatementTag() == "COMMIT" {
if ex.planner.Txn().Epoch() < ex.state.lastEpoch+numTxnRetryErrors {
retryErr := ex.state.mu.txn.GenerateForcedRetryableError(
ctx, "injected by `inject_retry_errors_on_commit_enabled` session variable")
return ex.makeErrEvent(retryErr, ast)
} else {
ex.state.lastEpoch = ex.planner.Txn().Epoch()
}
}
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionStartTransactionCommit, timeutil.Now())
if err := commitFn(ctx); err != nil {
if descs.IsTwoVersionInvariantViolationError(err) {
Expand Down Expand Up @@ -1265,9 +1277,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ctx, planner, stmt.AST.StatementReturnType(), res, distribute, progAtomic,
)
if res.Err() == nil {
// numTxnRetryErrors is the number of times an error will be injected if
// the transaction is retried using SAVEPOINTs.
const numTxnRetryErrors = 3
isSetOrShow := stmt.AST.StatementTag() == "SET" || stmt.AST.StatementTag() == "SHOW"
if ex.sessionData().InjectRetryErrorsEnabled && !isSetOrShow &&
planner.Txn().Sender().TxnStatus() == roachpb.PENDING {
Expand Down
46 changes: 46 additions & 0 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,52 @@ func TestInjectRetryErrors(t *testing.T) {
})
}

func TestInjectRetryOnCommitErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
params := base.TestServerArgs{}
s, db, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
defer db.Close()

_, err := db.Exec("SET inject_retry_errors_on_commit_enabled = 'true'")
require.NoError(t, err)

t.Run("test_injection_failure_on_commit_without_savepoints", func(t *testing.T) {
var txRes int
for attemptCount := 1; attemptCount <= 10; attemptCount++ {
tx, err := db.BeginTx(ctx, nil)
require.NoError(t, err)

// Verify that SHOW is exempt from error injection.
var s string
err = tx.QueryRow("SHOW inject_retry_errors_on_commit_enabled").Scan(&s)
require.NoError(t, err)

if attemptCount == 5 {
_, err = tx.Exec("SET inject_retry_errors_on_commit_enabled = 'false'")
require.NoError(t, err)
}

err = tx.QueryRow("SELECT $1::int8", attemptCount).Scan(&txRes)
require.NoError(t, err)
err = tx.Commit()
if attemptCount >= 5 {
require.NoError(t, err)
break
} else {
pqErr := (*pq.Error)(nil)
require.ErrorAs(t, err, &pqErr)
require.Equal(t, "40001", string(pqErr.Code), "expected a transaction retry error code. got %v", pqErr)
// We should not expect a rollback on commit errors.
}
}
require.Equal(t, 5, txRes)
})
}

func TestTrackOnlyUserOpenTransactionsAndActiveStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3416,6 +3416,10 @@ func (m *sessionDataMutator) SetOptimizerUseImprovedSplitDisjunctionForJoins(val
m.data.OptimizerUseImprovedSplitDisjunctionForJoins = val
}

func (m *sessionDataMutator) SetInjectRetryErrorsOnCommitEnabled(val bool) {
m.data.InjectRetryErrorsOnCommitEnabled = val
}

// Utility functions related to scrubbing sensitive information on SQL Stats.

// quantizeCounts ensures that the Count field in the
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -4975,6 +4975,7 @@ idle_session_timeout 0
index_join_streamer_batch_size 8.0 MiB
index_recommendations_enabled off
inject_retry_errors_enabled off
inject_retry_errors_on_commit_enabled off
integer_datetimes on
intervalstyle postgres
intervalstyle_enabled on
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -2616,6 +2616,7 @@ idle_session_timeout 0 NULL
index_join_streamer_batch_size 8.0 MiB NULL NULL NULL string
index_recommendations_enabled off NULL NULL NULL string
inject_retry_errors_enabled off NULL NULL NULL string
inject_retry_errors_on_commit_enabled off NULL NULL NULL string
integer_datetimes on NULL NULL NULL string
intervalstyle postgres NULL NULL NULL string
is_superuser on NULL NULL NULL string
Expand Down Expand Up @@ -2761,6 +2762,7 @@ idle_session_timeout 0 NULL
index_join_streamer_batch_size 8.0 MiB NULL user NULL 8.0 MiB 8.0 MiB
index_recommendations_enabled off NULL user NULL on on
inject_retry_errors_enabled off NULL user NULL off off
inject_retry_errors_on_commit_enabled off NULL user NULL off off
integer_datetimes on NULL user NULL on on
intervalstyle postgres NULL user NULL postgres postgres
is_superuser on NULL user NULL on on
Expand Down Expand Up @@ -2904,6 +2906,7 @@ idle_session_timeout NULL NULL NULL
index_join_streamer_batch_size NULL NULL NULL NULL NULL
index_recommendations_enabled NULL NULL NULL NULL NULL
inject_retry_errors_enabled NULL NULL NULL NULL NULL
inject_retry_errors_on_commit_enabled NULL NULL NULL NULL NULL
integer_datetimes NULL NULL NULL NULL NULL
intervalstyle NULL NULL NULL NULL NULL
is_superuser NULL NULL NULL NULL NULL
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ idle_session_timeout 0
index_join_streamer_batch_size 8.0 MiB
index_recommendations_enabled off
inject_retry_errors_enabled off
inject_retry_errors_on_commit_enabled off
integer_datetimes on
intervalstyle postgres
is_superuser on
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ message LocalOnlySessionData {
// inner, semi, and anti joins will be split. If false, only disjunctions
// potentially containing an equijoin condition will be split.
bool optimizer_use_improved_split_disjunction_for_joins = 91;
// InjectRetryErrorsOnCommitEnabled causes statements inside an explicit
// transaction to return a transaction retry error just before transcation commit.
// It is intended for developers to test their app's retry logic.
bool inject_retry_errors_on_commit_enabled = 92;

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
Expand Down
18 changes: 18 additions & 0 deletions pkg/sql/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,24 @@ var varGen = map[string]sessionVar{
GlobalDefault: globalFalse,
},

// CockroachDB extension. Allows for testing of transaction retry logic
// using the cockroach_restart savepoint.
`inject_retry_errors_on_commit_enabled`: {
GetStringVal: makePostgresBoolGetStringValFn(`inject_retry_errors_on_commit_enabled`),
Set: func(_ context.Context, m sessionDataMutator, s string) error {
b, err := paramparse.ParseBoolVar("inject_retry_errors_on_commit_enabled", s)
if err != nil {
return err
}
m.SetInjectRetryErrorsOnCommitEnabled(b)
return nil
},
Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
return formatBoolAsPostgresSetting(evalCtx.SessionData().InjectRetryErrorsOnCommitEnabled), nil
},
GlobalDefault: globalFalse,
},

// CockroachDB extension.
`join_reader_ordering_strategy_batch_size`: {
Set: func(_ context.Context, m sessionDataMutator, s string) error {
Expand Down

0 comments on commit 1567139

Please sign in to comment.