Skip to content

Commit

Permalink
sql: fix auto-retries for upgraded transactions
Browse files Browse the repository at this point in the history
This is implemented by adding more state to the conn executor state
machine. Now, it tracks if the open transaction was upgraded from an
implicit to an explicit txn. If so, when doing an auto retry, the
transaction is marked as an implicit again, so that the statements in
the transaction can upgrade it to explicit.

No release note is needed for v22.2, but we should use the following
 note when backporting to v22.1.

Placeholder note (bug fix): Fixed a bug where some statements in a batch
would not get executed if the following conditions were met:
- A batch of statements is sent in a single string.
- A BEGIN statement appears in the middle of the batch.
- The enable_implicit_transaction_for_batch_statements session variable
  is set to true. (This defaults to false in v22.1.x)

This bug was introduced in v22.1.2.

Release note: None
  • Loading branch information
rafiss committed Jul 14, 2022
1 parent 76c45dc commit e95ed1a
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 79 deletions.
5 changes: 2 additions & 3 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ func (s *Server) newConnExecutorWithTxn(
// initialize the state.
ex.machine = fsm.MakeMachine(
BoundTxnStateTransitions,
stateOpen{ImplicitTxn: fsm.False},
stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False},
&ex.state,
)
ex.state.resetForNewSQLTxn(
Expand Down Expand Up @@ -2209,8 +2209,7 @@ func (ex *connExecutor) updateTxnRewindPosMaybe(
return nil
}
if advInfo.txnEvent.eventType == txnStart ||
advInfo.txnEvent.eventType == txnRestart ||
advInfo.txnEvent.eventType == txnUpgradeToExplicit {
advInfo.txnEvent.eventType == txnRestart {
var nextPos CmdPos
switch advInfo.code {
case stayInPlace:
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) {

var fooTableId uint32
testDB.Exec(t, "SET enable_implicit_transaction_for_batch_statements = true")
testDB.Exec(t, "CREATE TABLE bar (a INT PRIMARY KEY)")
testDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)")
testDB.QueryRow(t, "SELECT 'foo'::regclass::oid").Scan(&fooTableId)

Expand All @@ -813,8 +814,14 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) {
return nil
})

testDB.Exec(t, "SELECT 1; BEGIN; INSERT INTO foo VALUES(1); COMMIT;")
testDB.Exec(t, "INSERT INTO bar VALUES(2); BEGIN; INSERT INTO foo VALUES(1); COMMIT;")
require.Equal(t, numToRetry+1, int(retryCount))

var x int
testDB.QueryRow(t, "select * from foo").Scan(&x)
require.Equal(t, 1, x)
testDB.QueryRow(t, "select * from bar").Scan(&x)
require.Equal(t, 2, x)
}

// This test ensures that when in an explicit transaction and statement
Expand Down
62 changes: 40 additions & 22 deletions pkg/sql/conn_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ func (stateNoTxn) String() string {
}

type stateOpen struct {
// ImplicitTxn is false if the txn included a BEGIN command.
ImplicitTxn fsm.Bool
// WasUpgraded is true if the txn started as implicit, but a BEGIN made it
// become explicit.
WasUpgraded fsm.Bool
}

var _ fsm.State = &stateOpen{}
Expand All @@ -60,7 +64,12 @@ func (stateOpen) String() string {

// stateAborted is entered on errors (retriable and non-retriable). A ROLLBACK
// TO SAVEPOINT can move the transaction back to stateOpen.
type stateAborted struct{}
type stateAborted struct {
// WasUpgraded is true if the txn started as implicit, but a BEGIN made it
// become explicit. This is needed so that when ROLLBACK TO SAVEPOINT moves
// to stateOpen, we keep tracking WasUpgraded correctly.
WasUpgraded fsm.Bool
}

var _ fsm.State = &stateAborted{}

Expand Down Expand Up @@ -231,7 +240,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
stateNoTxn{}: {
eventTxnStart{fsm.Var("implicitTxn")}: {
Description: "BEGIN, or before a statement running as an implicit txn",
Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")},
Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn"), WasUpgraded: fsm.False},
Action: noTxnToOpen,
},
eventNonRetriableErr{IsCommit: fsm.Any}: {
Expand All @@ -248,7 +257,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
},

/// Open
stateOpen{ImplicitTxn: fsm.Any}: {
stateOpen{ImplicitTxn: fsm.Any, WasUpgraded: fsm.Any}: {
eventTxnFinishCommitted{}: {
Description: "COMMIT, or after a statement running as an implicit txn",
Next: stateNoTxn{},
Expand Down Expand Up @@ -279,17 +288,15 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Action: cleanupAndFinishOnError,
},
},
stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}: {
stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}: {
// This is the case where we auto-retry.
eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: {
// Rewind and auto-retry - the transaction should stay in the Open state.
// Rewind and auto-retry - the transaction should stay in the Open state
Description: "Retriable err; will auto-retry",
Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")},
Next: stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False},
Action: prepareTxnForRetryWithRewind,
},
},
// Handle the errors in implicit txns. They move us to NoTxn.
stateOpen{ImplicitTxn: fsm.True}: {
// Handle the errors in implicit txns. They move us to NoTxn.
eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: {
Next: stateNoTxn{},
Action: cleanupAndFinishOnError,
Expand All @@ -298,8 +305,9 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Next: stateNoTxn{},
Action: cleanupAndFinishOnError,
},
// Handle a txn getting upgraded to an explicit txn.
eventTxnUpgradeToExplicit{}: {
Next: stateOpen{ImplicitTxn: fsm.False},
Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True},
Action: func(args fsm.Args) error {
args.Extended.(*txnState).setAdvanceInfo(
advanceOne,
Expand All @@ -310,10 +318,10 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
},
},
},
// Handle the errors in explicit txns. They move us to Aborted.
stateOpen{ImplicitTxn: fsm.False}: {
// Handle the errors in explicit txns.
stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}: {
eventNonRetriableErr{IsCommit: fsm.False}: {
Next: stateAborted{},
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
Action: func(args fsm.Args) error {
ts := args.Extended.(*txnState)
ts.setAdvanceInfo(skipBatch, noRewind, txnEvent{eventType: noEvent})
Expand All @@ -324,11 +332,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
// txnRestart output event.
eventTxnRestart{}: {
Description: "ROLLBACK TO SAVEPOINT cockroach_restart",
Next: stateOpen{ImplicitTxn: fsm.False},
Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")},
Action: prepareTxnForRetry,
},
eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: {
Next: stateAborted{},
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
Action: func(args fsm.Args) error {
args.Extended.(*txnState).setAdvanceInfo(
skipBatch,
Expand All @@ -338,6 +346,16 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
return nil
},
},
// This is the case where we auto-retry explicit transactions.
eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: {
// Rewind and auto-retry - the transaction should stay in the Open state.
// Retrying can cause the transaction to become implicit if we see that
// the transaction was previously upgraded. During the retry, BEGIN
// will be executed again and upgrade the transaction back to explicit.
Description: "Retriable err; will auto-retry",
Next: stateOpen{ImplicitTxn: fsm.Var("wasUpgraded"), WasUpgraded: fsm.False},
Action: prepareTxnForRetryWithRewind,
},
eventTxnReleased{}: {
Description: "RELEASE SAVEPOINT cockroach_restart",
Next: stateCommitWait{},
Expand All @@ -362,7 +380,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
//
// Note that we don't handle any error events here. Any statement but a
// ROLLBACK (TO SAVEPOINT) is expected to not be passed to the state machine.
stateAborted{}: {
stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}: {
eventTxnFinishAborted{}: {
Description: "ROLLBACK",
Next: stateNoTxn{},
Expand All @@ -378,7 +396,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
eventNonRetriableErr{IsCommit: fsm.False}: {
// This event doesn't change state, but it returns a skipBatch code.
Description: "any other statement",
Next: stateAborted{},
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
Action: func(args fsm.Args) error {
args.Extended.(*txnState).setAdvanceInfo(
skipBatch,
Expand All @@ -392,13 +410,13 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
eventNonRetriableErr{IsCommit: fsm.True}: {
// This event doesn't change state, but it returns a skipBatch code.
Description: "ConnExecutor closing",
Next: stateAborted{},
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
Action: cleanupAndFinishOnError,
},
// ROLLBACK TO SAVEPOINT <not cockroach_restart> success.
eventSavepointRollback{}: {
Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) success",
Next: stateOpen{ImplicitTxn: fsm.False},
Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")},
Action: func(args fsm.Args) error {
args.Extended.(*txnState).setAdvanceInfo(
advanceOne,
Expand All @@ -412,7 +430,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
eventRetriableErr{CanAutoRetry: fsm.Any, IsCommit: fsm.Any}: {
// This event doesn't change state, but it returns a skipBatch code.
Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart",
Next: stateAborted{},
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
Action: func(args fsm.Args) error {
args.Extended.(*txnState).setAdvanceInfo(
skipBatch,
Expand All @@ -425,7 +443,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
// ROLLBACK TO SAVEPOINT cockroach_restart.
eventTxnRestart{}: {
Description: "ROLLBACK TO SAVEPOINT cockroach_restart",
Next: stateOpen{ImplicitTxn: fsm.False},
Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")},
Action: prepareTxnForRetry,
},
},
Expand Down Expand Up @@ -559,7 +577,7 @@ func prepareTxnForRetryWithRewind(args fsm.Args) error {
// when running SQL inside a higher-level txn. It's a very limited state
// machine: it doesn't allow starting or finishing txns, auto-retries, etc.
var BoundTxnStateTransitions = fsm.Compile(fsm.Pattern{
stateOpen{ImplicitTxn: fsm.False}: {
stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}: {
// We accept eventNonRetriableErr with both IsCommit={True, fsm.False}, even
// though this state machine does not support COMMIT statements because
// connExecutor.close() sends an eventNonRetriableErr{IsCommit: fsm.True} event.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/txn_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ const (
// explicitTxn means that the txn was explicitly started with a BEGIN
// statement.
explicitTxn
// upgradedExplicitTxn means that the txn started as implicit, but a BEGIN
// in the middle of it caused it to become explicit.
upgradedExplicitTxn
)

// resetForNewSQLTxn (re)initializes the txnState for a new transaction.
Expand Down
Loading

0 comments on commit e95ed1a

Please sign in to comment.