Skip to content

Commit

Permalink
Merge pull request #84593 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.1-84389
  • Loading branch information
rafiss authored Jul 19, 2022
2 parents ed20c8b + d43e68f commit 3466754
Show file tree
Hide file tree
Showing 9 changed files with 387 additions and 79 deletions.
5 changes: 2 additions & 3 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ func (s *Server) newConnExecutorWithTxn(
// initialize the state.
ex.machine = fsm.MakeMachine(
BoundTxnStateTransitions,
stateOpen{ImplicitTxn: fsm.False},
stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False},
&ex.state,
)
ex.state.resetForNewSQLTxn(
Expand Down Expand Up @@ -2183,8 +2183,7 @@ func (ex *connExecutor) updateTxnRewindPosMaybe(
return nil
}
if advInfo.txnEvent.eventType == txnStart ||
advInfo.txnEvent.eventType == txnRestart ||
advInfo.txnEvent.eventType == txnUpgradeToExplicit {
advInfo.txnEvent.eventType == txnRestart {
var nextPos CmdPos
switch advInfo.code {
case stayInPlace:
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,7 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) {

var fooTableId uint32
testDB.Exec(t, "SET enable_implicit_transaction_for_batch_statements = true")
testDB.Exec(t, "CREATE TABLE bar (a INT PRIMARY KEY)")
testDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)")
testDB.QueryRow(t, "SELECT 'foo'::regclass::oid").Scan(&fooTableId)

Expand All @@ -792,8 +793,14 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) {
return nil
})

testDB.Exec(t, "SELECT 1; BEGIN; INSERT INTO foo VALUES(1); COMMIT;")
testDB.Exec(t, "INSERT INTO bar VALUES(2); BEGIN; INSERT INTO foo VALUES(1); COMMIT;")
require.Equal(t, numToRetry+1, int(retryCount))

var x int
testDB.QueryRow(t, "select * from foo").Scan(&x)
require.Equal(t, 1, x)
testDB.QueryRow(t, "select * from bar").Scan(&x)
require.Equal(t, 2, x)
}

// This test ensures that when in an explicit transaction and statement
Expand Down
68 changes: 46 additions & 22 deletions pkg/sql/conn_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ func (stateNoTxn) String() string {
}

type stateOpen struct {
// ImplicitTxn is false if the txn included a BEGIN command.
ImplicitTxn fsm.Bool
// WasUpgraded is true if the txn started as implicit, but a BEGIN made it
// become explicit.
WasUpgraded fsm.Bool
}

var _ fsm.State = &stateOpen{}
Expand All @@ -59,7 +63,12 @@ func (stateOpen) String() string {

// stateAborted is entered on errors (retriable and non-retriable). A ROLLBACK
// TO SAVEPOINT can move the transaction back to stateOpen.
type stateAborted struct{}
type stateAborted struct {
// WasUpgraded is true if the txn started as implicit, but a BEGIN made it
// become explicit. This is needed so that when ROLLBACK TO SAVEPOINT moves
// to stateOpen, we keep tracking WasUpgraded correctly.
WasUpgraded fsm.Bool
}

var _ fsm.State = &stateAborted{}

Expand Down Expand Up @@ -230,7 +239,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
stateNoTxn{}: {
eventTxnStart{fsm.Var("implicitTxn")}: {
Description: "BEGIN, or before a statement running as an implicit txn",
Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")},
Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn"), WasUpgraded: fsm.False},
Action: noTxnToOpen,
},
eventNonRetriableErr{IsCommit: fsm.Any}: {
Expand All @@ -247,7 +256,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
},

/// Open
stateOpen{ImplicitTxn: fsm.Any}: {
stateOpen{ImplicitTxn: fsm.Any, WasUpgraded: fsm.Any}: {
eventTxnFinishCommitted{}: {
Description: "COMMIT, or after a statement running as an implicit txn",
Next: stateNoTxn{},
Expand Down Expand Up @@ -278,17 +287,15 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Action: cleanupAndFinishOnError,
},
},
stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}: {
stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}: {
// This is the case where we auto-retry.
eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: {
// Rewind and auto-retry - the transaction should stay in the Open state.
// Rewind and auto-retry - the transaction should stay in the Open state
Description: "Retriable err; will auto-retry",
Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")},
Next: stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False},
Action: prepareTxnForRetryWithRewind,
},
},
// Handle the errors in implicit txns. They move us to NoTxn.
stateOpen{ImplicitTxn: fsm.True}: {
// Handle the errors in implicit txns. They move us to NoTxn.
eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: {
Next: stateNoTxn{},
Action: cleanupAndFinishOnError,
Expand All @@ -297,8 +304,9 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Next: stateNoTxn{},
Action: cleanupAndFinishOnError,
},
// Handle a txn getting upgraded to an explicit txn.
eventTxnUpgradeToExplicit{}: {
Next: stateOpen{ImplicitTxn: fsm.False},
Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True},
Action: func(args fsm.Args) error {
args.Extended.(*txnState).setAdvanceInfo(
advanceOne,
Expand All @@ -309,10 +317,10 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
},
},
},
// Handle the errors in explicit txns. They move us to Aborted.
stateOpen{ImplicitTxn: fsm.False}: {
stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}: {
// Handle the errors in explicit txns.
eventNonRetriableErr{IsCommit: fsm.False}: {
Next: stateAborted{},
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
Action: func(args fsm.Args) error {
ts := args.Extended.(*txnState)
ts.setAdvanceInfo(skipBatch, noRewind, txnEvent{eventType: noEvent})
Expand All @@ -321,13 +329,15 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
},
// ROLLBACK TO SAVEPOINT cockroach. There's not much to do other than generating a
// txnRestart output event.
// The next state must be an explicit txn, since the SAVEPOINT
// creation can only happen in an explicit txn.
eventTxnRestart{}: {
Description: "ROLLBACK TO SAVEPOINT cockroach_restart",
Next: stateOpen{ImplicitTxn: fsm.False},
Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")},
Action: prepareTxnForRetry,
},
eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: {
Next: stateAborted{},
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
Action: func(args fsm.Args) error {
args.Extended.(*txnState).setAdvanceInfo(
skipBatch,
Expand All @@ -337,6 +347,16 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
return nil
},
},
// This is the case where we auto-retry explicit transactions.
eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: {
// Rewind and auto-retry - the transaction should stay in the Open state.
// Retrying can cause the transaction to become implicit if we see that
// the transaction was previously upgraded. During the retry, BEGIN
// will be executed again and upgrade the transaction back to explicit.
Description: "Retriable err; will auto-retry",
Next: stateOpen{ImplicitTxn: fsm.Var("wasUpgraded"), WasUpgraded: fsm.False},
Action: prepareTxnForRetryWithRewind,
},
eventTxnReleased{}: {
Description: "RELEASE SAVEPOINT cockroach_restart",
Next: stateCommitWait{},
Expand All @@ -359,7 +379,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
//
// Note that we don't handle any error events here. Any statement but a
// ROLLBACK (TO SAVEPOINT) is expected to not be passed to the state machine.
stateAborted{}: {
stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}: {
eventTxnFinishAborted{}: {
Description: "ROLLBACK",
Next: stateNoTxn{},
Expand All @@ -375,7 +395,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
eventNonRetriableErr{IsCommit: fsm.False}: {
// This event doesn't change state, but it returns a skipBatch code.
Description: "any other statement",
Next: stateAborted{},
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
Action: func(args fsm.Args) error {
args.Extended.(*txnState).setAdvanceInfo(
skipBatch,
Expand All @@ -389,13 +409,15 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
eventNonRetriableErr{IsCommit: fsm.True}: {
// This event doesn't change state, but it returns a skipBatch code.
Description: "ConnExecutor closing",
Next: stateAborted{},
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
Action: cleanupAndFinishOnError,
},
// ROLLBACK TO SAVEPOINT <not cockroach_restart> success.
// The next state must be an explicit txn, since the SAVEPOINT
// creation can only happen in an explicit txn.
eventSavepointRollback{}: {
Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) success",
Next: stateOpen{ImplicitTxn: fsm.False},
Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")},
Action: func(args fsm.Args) error {
args.Extended.(*txnState).setAdvanceInfo(
advanceOne,
Expand All @@ -409,7 +431,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
eventRetriableErr{CanAutoRetry: fsm.Any, IsCommit: fsm.Any}: {
// This event doesn't change state, but it returns a skipBatch code.
Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart",
Next: stateAborted{},
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
Action: func(args fsm.Args) error {
args.Extended.(*txnState).setAdvanceInfo(
skipBatch,
Expand All @@ -420,9 +442,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
},
},
// ROLLBACK TO SAVEPOINT cockroach_restart.
// The next state must be an explicit txn, since the SAVEPOINT
// creation can only happen in an explicit txn.
eventTxnRestart{}: {
Description: "ROLLBACK TO SAVEPOINT cockroach_restart",
Next: stateOpen{ImplicitTxn: fsm.False},
Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")},
Action: prepareTxnForRetry,
},
},
Expand Down Expand Up @@ -547,7 +571,7 @@ func prepareTxnForRetryWithRewind(args fsm.Args) error {
// when running SQL inside a higher-level txn. It's a very limited state
// machine: it doesn't allow starting or finishing txns, auto-retries, etc.
var BoundTxnStateTransitions = fsm.Compile(fsm.Pattern{
stateOpen{ImplicitTxn: fsm.False}: {
stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}: {
// We accept eventNonRetriableErr with both IsCommit={True, fsm.False}, even
// though this state machine does not support COMMIT statements because
// connExecutor.close() sends an eventNonRetriableErr{IsCommit: fsm.True} event.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/txn_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ const (
// explicitTxn means that the txn was explicitly started with a BEGIN
// statement.
explicitTxn
// upgradedExplicitTxn means that the txn started as implicit, but a BEGIN
// in the middle of it caused it to become explicit.
upgradedExplicitTxn
)

// resetForNewSQLTxn (re)initializes the txnState for a new transaction.
Expand Down
Loading

0 comments on commit 3466754

Please sign in to comment.