From 5a1e02a34a167fe61510d92b15f9f3fca8abe0c4 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Wed, 13 Jul 2022 20:40:34 +0000 Subject: [PATCH] sql: fix auto-retries for upgraded transactions This is implemented by adding more state to the conn executor state machine. Now, it tracks if the open transaction was upgraded from an implicit to an explicit txn. If so, when doing an auto retry, the transaction is marked as an implicit again, so that the statements in the transaction can upgrade it to explicit. Release note (bug fix): Fixed a bug where some statements in a batch would not get executed if the following conditions were met: - A batch of statements is sent in a single string. - A BEGIN statement appears in the middle of the batch. - The enable_implicit_transaction_for_batch_statements session variable is set to true. (This defaults to false in v22.1.x) This bug was introduced in v22.1.2. --- pkg/sql/conn_executor.go | 5 +- pkg/sql/conn_executor_test.go | 9 +- pkg/sql/conn_fsm.go | 68 ++++--- pkg/sql/txn_state.go | 3 + pkg/sql/txn_state_test.go | 240 +++++++++++++++++++++++-- pkg/sql/txnstatetransitions_diagram.gv | 79 ++++---- pkg/sql/txnstatetransitions_report.txt | 58 +++++- pkg/sql/txntype_string.go | 5 +- pkg/util/fsm/debug.go | 2 +- 9 files changed, 390 insertions(+), 79 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 704e10beb216..b54e16bd8ef9 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -987,7 +987,7 @@ func (s *Server) newConnExecutorWithTxn( // initialize the state. ex.machine = fsm.MakeMachine( BoundTxnStateTransitions, - stateOpen{ImplicitTxn: fsm.False}, + stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, &ex.state, ) ex.state.resetForNewSQLTxn( @@ -2172,8 +2172,7 @@ func (ex *connExecutor) updateTxnRewindPosMaybe( return nil } if advInfo.txnEvent.eventType == txnStart || - advInfo.txnEvent.eventType == txnRestart || - advInfo.txnEvent.eventType == txnUpgradeToExplicit { + advInfo.txnEvent.eventType == txnRestart { var nextPos CmdPos switch advInfo.code { case stayInPlace: diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 0cc96c1b727e..da12716d8dad 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -769,6 +769,7 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) { var fooTableId uint32 testDB.Exec(t, "SET enable_implicit_transaction_for_batch_statements = true") + testDB.Exec(t, "CREATE TABLE bar (a INT PRIMARY KEY)") testDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)") testDB.QueryRow(t, "SELECT 'foo'::regclass::oid").Scan(&fooTableId) @@ -792,8 +793,14 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) { return nil }) - testDB.Exec(t, "SELECT 1; BEGIN; INSERT INTO foo VALUES(1); COMMIT;") + testDB.Exec(t, "INSERT INTO bar VALUES(2); BEGIN; INSERT INTO foo VALUES(1); COMMIT;") require.Equal(t, numToRetry+1, int(retryCount)) + + var x int + testDB.QueryRow(t, "select * from foo").Scan(&x) + require.Equal(t, 1, x) + testDB.QueryRow(t, "select * from bar").Scan(&x) + require.Equal(t, 2, x) } // This test ensures that when in an explicit transaction and statement diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index 177e43af8ae1..e733ad027573 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -48,7 +48,11 @@ func (stateNoTxn) String() string { } type stateOpen struct { + // ImplicitTxn is false if the txn included a BEGIN command. ImplicitTxn fsm.Bool + // WasUpgraded is true if the txn started as implicit, but a BEGIN made it + // become explicit. + WasUpgraded fsm.Bool } var _ fsm.State = &stateOpen{} @@ -59,7 +63,12 @@ func (stateOpen) String() string { // stateAborted is entered on errors (retriable and non-retriable). A ROLLBACK // TO SAVEPOINT can move the transaction back to stateOpen. -type stateAborted struct{} +type stateAborted struct { + // WasUpgraded is true if the txn started as implicit, but a BEGIN made it + // become explicit. This is needed so that when ROLLBACK TO SAVEPOINT moves + // to stateOpen, we keep tracking WasUpgraded correctly. + WasUpgraded fsm.Bool +} var _ fsm.State = &stateAborted{} @@ -230,7 +239,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ stateNoTxn{}: { eventTxnStart{fsm.Var("implicitTxn")}: { Description: "BEGIN, or before a statement running as an implicit txn", - Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}, + Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn"), WasUpgraded: fsm.False}, Action: noTxnToOpen, }, eventNonRetriableErr{IsCommit: fsm.Any}: { @@ -247,7 +256,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ }, /// Open - stateOpen{ImplicitTxn: fsm.Any}: { + stateOpen{ImplicitTxn: fsm.Any, WasUpgraded: fsm.Any}: { eventTxnFinishCommitted{}: { Description: "COMMIT, or after a statement running as an implicit txn", Next: stateNoTxn{}, @@ -278,17 +287,15 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Action: cleanupAndFinishOnError, }, }, - stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}: { + stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}: { // This is the case where we auto-retry. eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: { - // Rewind and auto-retry - the transaction should stay in the Open state. + // Rewind and auto-retry - the transaction should stay in the Open state Description: "Retriable err; will auto-retry", - Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}, + Next: stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}, Action: prepareTxnForRetryWithRewind, }, - }, - // Handle the errors in implicit txns. They move us to NoTxn. - stateOpen{ImplicitTxn: fsm.True}: { + // Handle the errors in implicit txns. They move us to NoTxn. eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { Next: stateNoTxn{}, Action: cleanupAndFinishOnError, @@ -297,8 +304,9 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Next: stateNoTxn{}, Action: cleanupAndFinishOnError, }, + // Handle a txn getting upgraded to an explicit txn. eventTxnUpgradeToExplicit{}: { - Next: stateOpen{ImplicitTxn: fsm.False}, + Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True}, Action: func(args fsm.Args) error { args.Extended.(*txnState).setAdvanceInfo( advanceOne, @@ -309,10 +317,10 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ }, }, }, - // Handle the errors in explicit txns. They move us to Aborted. - stateOpen{ImplicitTxn: fsm.False}: { + stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}: { + // Handle the errors in explicit txns. eventNonRetriableErr{IsCommit: fsm.False}: { - Next: stateAborted{}, + Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}, Action: func(args fsm.Args) error { ts := args.Extended.(*txnState) ts.setAdvanceInfo(skipBatch, noRewind, txnEvent{eventType: noEvent}) @@ -321,13 +329,15 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ }, // ROLLBACK TO SAVEPOINT cockroach. There's not much to do other than generating a // txnRestart output event. + // The next state must be an explicit txn, since the SAVEPOINT + // creation can only happen in an explicit txn. eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", - Next: stateOpen{ImplicitTxn: fsm.False}, + Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}, Action: prepareTxnForRetry, }, eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { - Next: stateAborted{}, + Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}, Action: func(args fsm.Args) error { args.Extended.(*txnState).setAdvanceInfo( skipBatch, @@ -337,6 +347,16 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, + // This is the case where we auto-retry explicit transactions. + eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: { + // Rewind and auto-retry - the transaction should stay in the Open state. + // Retrying can cause the transaction to become implicit if we see that + // the transaction was previously upgraded. During the retry, BEGIN + // will be executed again and upgrade the transaction back to explicit. + Description: "Retriable err; will auto-retry", + Next: stateOpen{ImplicitTxn: fsm.Var("wasUpgraded"), WasUpgraded: fsm.False}, + Action: prepareTxnForRetryWithRewind, + }, eventTxnReleased{}: { Description: "RELEASE SAVEPOINT cockroach_restart", Next: stateCommitWait{}, @@ -359,7 +379,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ // // Note that we don't handle any error events here. Any statement but a // ROLLBACK (TO SAVEPOINT) is expected to not be passed to the state machine. - stateAborted{}: { + stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}: { eventTxnFinishAborted{}: { Description: "ROLLBACK", Next: stateNoTxn{}, @@ -375,7 +395,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventNonRetriableErr{IsCommit: fsm.False}: { // This event doesn't change state, but it returns a skipBatch code. Description: "any other statement", - Next: stateAborted{}, + Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}, Action: func(args fsm.Args) error { args.Extended.(*txnState).setAdvanceInfo( skipBatch, @@ -389,13 +409,15 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventNonRetriableErr{IsCommit: fsm.True}: { // This event doesn't change state, but it returns a skipBatch code. Description: "ConnExecutor closing", - Next: stateAborted{}, + Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}, Action: cleanupAndFinishOnError, }, // ROLLBACK TO SAVEPOINT success. + // The next state must be an explicit txn, since the SAVEPOINT + // creation can only happen in an explicit txn. eventSavepointRollback{}: { Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) success", - Next: stateOpen{ImplicitTxn: fsm.False}, + Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}, Action: func(args fsm.Args) error { args.Extended.(*txnState).setAdvanceInfo( advanceOne, @@ -409,7 +431,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventRetriableErr{CanAutoRetry: fsm.Any, IsCommit: fsm.Any}: { // This event doesn't change state, but it returns a skipBatch code. Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart", - Next: stateAborted{}, + Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}, Action: func(args fsm.Args) error { args.Extended.(*txnState).setAdvanceInfo( skipBatch, @@ -420,9 +442,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ }, }, // ROLLBACK TO SAVEPOINT cockroach_restart. + // The next state must be an explicit txn, since the SAVEPOINT + // creation can only happen in an explicit txn. eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", - Next: stateOpen{ImplicitTxn: fsm.False}, + Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}, Action: prepareTxnForRetry, }, }, @@ -547,7 +571,7 @@ func prepareTxnForRetryWithRewind(args fsm.Args) error { // when running SQL inside a higher-level txn. It's a very limited state // machine: it doesn't allow starting or finishing txns, auto-retries, etc. var BoundTxnStateTransitions = fsm.Compile(fsm.Pattern{ - stateOpen{ImplicitTxn: fsm.False}: { + stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}: { // We accept eventNonRetriableErr with both IsCommit={True, fsm.False}, even // though this state machine does not support COMMIT statements because // connExecutor.close() sends an eventNonRetriableErr{IsCommit: fsm.True} event. diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 7ace0366a092..cd7c727c8ac8 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -132,6 +132,9 @@ const ( // explicitTxn means that the txn was explicitly started with a BEGIN // statement. explicitTxn + // upgradedExplicitTxn means that the txn started as implicit, but a BEGIN + // in the middle of it caused it to become explicit. + upgradedExplicitTxn ) // resetForNewSQLTxn (re)initializes the txnState for a new transaction. diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index 1e88bc1c2754..e81052df089d 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -109,14 +109,17 @@ func (tc *testContext) createOpenState(typ txnType) (fsm.State, *txnState) { state := stateOpen{ ImplicitTxn: fsm.FromBool(typ == implicitTxn), + WasUpgraded: fsm.FromBool(typ == upgradedExplicitTxn), } return state, &ts } // createAbortedState returns a txnState initialized with an aborted txn. -func (tc *testContext) createAbortedState() (fsm.State, *txnState) { - _, ts := tc.createOpenState(explicitTxn) - return stateAborted{}, ts +func (tc *testContext) createAbortedState(typ txnType) (fsm.State, *txnState) { + _, ts := tc.createOpenState(typ) + return stateAborted{ + WasUpgraded: fsm.FromBool(typ == upgradedExplicitTxn), + }, ts } func (tc *testContext) createCommitWaitState() (fsm.State, *txnState, error) { @@ -294,7 +297,7 @@ func TestTransitions(t *testing.T) { ev: eventTxnStart{ImplicitTxn: fsm.True}, evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(), nil /* historicalTimestamp */, tranCtx, sessiondatapb.Normal), - expState: stateOpen{ImplicitTxn: fsm.True}, + expState: stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}, expAdv: expAdvance{ // We expect to stayInPlace; upon starting a txn the statement is // executed again, this time in state Open. @@ -319,7 +322,7 @@ func TestTransitions(t *testing.T) { ev: eventTxnStart{ImplicitTxn: fsm.False}, evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(), nil /* historicalTimestamp */, tranCtx, sessiondatapb.Normal), - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, expAdv: expAdvance{ expCode: advanceOne, expEv: txnStart, @@ -377,6 +380,27 @@ func TestTransitions(t *testing.T) { }, expTxn: nil, }, + { + // Finish an upgraded explicit txn. + name: "Open (upgraded explicit) -> NoTxn", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + // We commit the KV transaction, as that's done by the layer below + // txnState. + if err := ts.mu.txn.Commit(ts.Ctx); err != nil { + return nil, nil, emptyTxnID, err + } + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventTxnFinishCommitted{}, + evPayload: nil, + expState: stateNoTxn{}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: txnCommit, + }, + expTxn: nil, + }, { // Get a retriable error while we can auto-retry. name: "Open + auto-retry", @@ -391,7 +415,31 @@ func TestTransitions(t *testing.T) { } return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.False}, b }, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: rewind, + expEv: txnRestart, + }, + // Expect non-nil txn. + expTxn: &expKVTxn{}, + expAutoRetryCounter: 1, + expAutoRetryError: "test retriable err", + }, + { + // Get a retriable error while we can auto-retry. + name: "Open + auto-retry on upgraded txn", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) { + b := eventRetriableErrPayload{ + err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"), + rewCap: dummyRewCap, + } + return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.False}, b + }, + expState: stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}, expAdv: expAdvance{ expCode: rewind, expEv: txnRestart, @@ -415,7 +463,32 @@ func TestTransitions(t *testing.T) { } return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.True}, b }, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: rewind, + expEv: txnRestart, + }, + // Expect non-nil txn. + expTxn: &expKVTxn{}, + expAutoRetryCounter: 1, + }, + { + // Like the above test - get a retriable error while we can auto-retry, + // except this time the txn was upgraded to explicit. This shouldn't make + // any difference; we should still auto-retry like the above. + name: "Open + auto-retry (COMMIT) on upgraded txn", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) { + b := eventRetriableErrPayload{ + err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"), + rewCap: dummyRewCap, + } + return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.True}, b + }, + expState: stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}, expAdv: expAdvance{ expCode: rewind, expEv: txnRestart, @@ -438,7 +511,34 @@ func TestTransitions(t *testing.T) { } return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b }, - expState: stateAborted{}, + expState: stateAborted{ + WasUpgraded: fsm.False, + }, + expAdv: expAdvance{ + expCode: skipBatch, + expEv: noEvent, + }, + // Expect non-nil txn. + expTxn: &expKVTxn{}, + }, + { + // Get a retriable error when we can no longer auto-retry, but the client + // is doing client-side retries. + name: "Open (upgraded) + client retry", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) { + b := eventRetriableErrPayload{ + err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"), + rewCap: rewindCapability{}, + } + return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b + }, + expState: stateAborted{ + WasUpgraded: fsm.True, + }, expAdv: expAdvance{ expCode: skipBatch, expEv: noEvent, @@ -522,7 +622,27 @@ func TestTransitions(t *testing.T) { }, ev: eventNonRetriableErr{IsCommit: fsm.False}, evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")}, - expState: stateAborted{}, + expState: stateAborted{ + WasUpgraded: fsm.False, + }, + expAdv: expAdvance{ + expCode: skipBatch, + expEv: noEvent, + }, + expTxn: &expKVTxn{}, + }, + { + // We get a non-retriable error. + name: "Open (upgraded) + non-retriable error", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventNonRetriableErr{IsCommit: fsm.False}, + evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")}, + expState: stateAborted{ + WasUpgraded: fsm.True, + }, expAdv: expAdvance{ expCode: skipBatch, expEv: noEvent, @@ -554,7 +674,24 @@ func TestTransitions(t *testing.T) { return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnRestart{}, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: txnRestart, + }, + // We would like to test that the transaction's epoch bumped if the txn + // performed any operations, but it's not easy to do the test. + expTxn: &expKVTxn{}, + }, + { + // Restarting from Open via ROLLBACK TO SAVEPOINT on an upgraded txn. + name: "Open (upgraded) + restart", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventTxnRestart{}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True}, expAdv: expAdvance{ expCode: advanceOne, expEv: txnRestart, @@ -570,7 +707,22 @@ func TestTransitions(t *testing.T) { // The txn finished, such as after a ROLLBACK. name: "Aborted->NoTxn", init: func() (fsm.State, *txnState, uuid.UUID, error) { - s, ts := testCon.createAbortedState() + s, ts := testCon.createAbortedState(explicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventTxnFinishAborted{}, + expState: stateNoTxn{}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: txnRollback, + }, + expTxn: nil, + }, + { + // The upgraded txn finished, such as after a ROLLBACK. + name: "Aborted(upgraded)->NoTxn", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createAbortedState(upgradedExplicitTxn) return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnFinishAborted{}, @@ -585,11 +737,26 @@ func TestTransitions(t *testing.T) { // The txn is starting again (ROLLBACK TO SAVEPOINT while in Aborted). name: "Aborted->Open", init: func() (fsm.State, *txnState, uuid.UUID, error) { - s, ts := testCon.createAbortedState() + s, ts := testCon.createAbortedState(explicitTxn) return s, ts, ts.mu.txn.ID(), nil }, ev: eventSavepointRollback{}, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: noEvent, + }, + expTxn: &expKVTxn{}, + }, + { + // The upgraded txn is starting again (ROLLBACK TO SAVEPOINT while in Aborted). + name: "Aborted(upgraded)->Open", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createAbortedState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventSavepointRollback{}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True}, expAdv: expAdvance{ expCode: advanceOne, expEv: noEvent, @@ -600,11 +767,31 @@ func TestTransitions(t *testing.T) { // The txn is starting again (ROLLBACK TO SAVEPOINT cockroach_restart while in Aborted). name: "Aborted->Restart", init: func() (fsm.State, *txnState, uuid.UUID, error) { - s, ts := testCon.createAbortedState() + s, ts := testCon.createAbortedState(explicitTxn) return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnRestart{}, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: txnRestart, + }, + expTxn: &expKVTxn{ + userPriority: &pri, + writeTSNanos: &now.WallTime, + readTSNanos: &now.WallTime, + uncertaintyLimitNanos: &uncertaintyLimit.WallTime, + }, + }, + { + // The upgraded txn is starting again (ROLLBACK TO SAVEPOINT cockroach_restart while in Aborted). + name: "Aborted(upgraded)->Restart", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createAbortedState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventTxnRestart{}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True}, expAdv: expAdvance{ expCode: advanceOne, expEv: txnRestart, @@ -622,11 +809,30 @@ func TestTransitions(t *testing.T) { // to the expTxn. name: "Aborted->Starting (historical)", init: func() (fsm.State, *txnState, uuid.UUID, error) { - s, ts := testCon.createAbortedState() + s, ts := testCon.createAbortedState(explicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventTxnRestart{}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: txnRestart, + }, + expTxn: &expKVTxn{ + writeTSNanos: proto.Int64(now.WallTime), + }, + }, + { + // The upgraded txn is starting again (e.g. ROLLBACK TO SAVEPOINT while in Aborted). + // Verify that the historical timestamp from the evPayload is propagated + // to the expTxn. + name: "Aborted(upgraded)->Starting (historical)", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createAbortedState(upgradedExplicitTxn) return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnRestart{}, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True}, expAdv: expAdvance{ expCode: advanceOne, expEv: txnRestart, diff --git a/pkg/sql/txnstatetransitions_diagram.gv b/pkg/sql/txnstatetransitions_diagram.gv index 64e31bc7d849..bdb0cf593d06 100644 --- a/pkg/sql/txnstatetransitions_diagram.gv +++ b/pkg/sql/txnstatetransitions_diagram.gv @@ -13,39 +13,58 @@ digraph finite_state_machine { qi -> "NoTxn{}"; node [shape = circle]; - "Aborted{}" -> "Aborted{}" [label = any other statement>] - "Aborted{}" -> "Aborted{}" [label = ConnExecutor closing>] - "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] - "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] - "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] - "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] - "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) success>] - "Aborted{}" -> "NoTxn{}" [label = ROLLBACK>] - "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = any other statement>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = ConnExecutor closing>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) success>] + "Aborted{WasUpgraded:false}" -> "NoTxn{}" [label = ROLLBACK>] + "Aborted{WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = any other statement>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = ConnExecutor closing>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:true}" -> "Open{ImplicitTxn:false, WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) success>] + "Aborted{WasUpgraded:true}" -> "NoTxn{}" [label = ROLLBACK>] + "Aborted{WasUpgraded:true}" -> "Open{ImplicitTxn:false, WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] "CommitWait{}" -> "CommitWait{}" [label = any other statement>] "CommitWait{}" -> "CommitWait{}" [label = any other statement>] "CommitWait{}" -> "NoTxn{}" [label = COMMIT>] "NoTxn{}" -> "NoTxn{}" [label = anything but BEGIN or extended protocol command error>] "NoTxn{}" -> "NoTxn{}" [label = anything but BEGIN or extended protocol command error>] - "NoTxn{}" -> "Open{ImplicitTxn:false}" [label = BEGIN, or before a statement running as an implicit txn>] - "NoTxn{}" -> "Open{ImplicitTxn:true}" [label = BEGIN, or before a statement running as an implicit txn>] - "Open{ImplicitTxn:false}" -> "Aborted{}" [label = "NonRetriableErr{IsCommit:false}"] - "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] - "Open{ImplicitTxn:false}" -> "Aborted{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] - "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = Retriable err on COMMIT>] - "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = Retriable err; will auto-retry>] - "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = Retriable err; will auto-retry>] - "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] - "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] - "Open{ImplicitTxn:false}" -> "CommitWait{}" [label = RELEASE SAVEPOINT cockroach_restart>] - "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:false}"] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = Retriable err on COMMIT>] - "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] - "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] - "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:false}" [label = "TxnUpgradeToExplicit{}"] + "NoTxn{}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = BEGIN, or before a statement running as an implicit txn>] + "NoTxn{}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = BEGIN, or before a statement running as an implicit txn>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = "NonRetriableErr{IsCommit:false}"] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = Retriable err on COMMIT>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "CommitWait{}" [label = RELEASE SAVEPOINT cockroach_restart>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = "NonRetriableErr{IsCommit:false}"] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = Retriable err on COMMIT>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "CommitWait{}" [label = RELEASE SAVEPOINT cockroach_restart>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Open{ImplicitTxn:false, WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:false}"] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = Retriable err on COMMIT>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:true}" [label = "TxnUpgradeToExplicit{}"] } diff --git a/pkg/sql/txnstatetransitions_report.txt b/pkg/sql/txnstatetransitions_report.txt index e04d7d82582b..40bad130717f 100644 --- a/pkg/sql/txnstatetransitions_report.txt +++ b/pkg/sql/txnstatetransitions_report.txt @@ -1,6 +1,23 @@ // Code generated; DO NOT EDIT. -Aborted{} +Aborted{WasUpgraded:false} + handled events: + NonRetriableErr{IsCommit:false} + NonRetriableErr{IsCommit:true} + RetriableErr{CanAutoRetry:false, IsCommit:false} + RetriableErr{CanAutoRetry:false, IsCommit:true} + RetriableErr{CanAutoRetry:true, IsCommit:false} + RetriableErr{CanAutoRetry:true, IsCommit:true} + SavepointRollback{} + TxnFinishAborted{} + TxnRestart{} + missing events: + TxnFinishCommitted{} + TxnReleased{} + TxnStart{ImplicitTxn:false} + TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} +Aborted{WasUpgraded:true} handled events: NonRetriableErr{IsCommit:false} NonRetriableErr{IsCommit:true} @@ -51,7 +68,24 @@ NoTxn{} TxnReleased{} TxnRestart{} TxnUpgradeToExplicit{} -Open{ImplicitTxn:false} +Open{ImplicitTxn:false, WasUpgraded:false} + handled events: + NonRetriableErr{IsCommit:false} + NonRetriableErr{IsCommit:true} + RetriableErr{CanAutoRetry:false, IsCommit:false} + RetriableErr{CanAutoRetry:false, IsCommit:true} + RetriableErr{CanAutoRetry:true, IsCommit:false} + RetriableErr{CanAutoRetry:true, IsCommit:true} + TxnFinishAborted{} + TxnFinishCommitted{} + TxnReleased{} + TxnRestart{} + missing events: + SavepointRollback{} + TxnStart{ImplicitTxn:false} + TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} +Open{ImplicitTxn:false, WasUpgraded:true} handled events: NonRetriableErr{IsCommit:false} NonRetriableErr{IsCommit:true} @@ -68,7 +102,7 @@ Open{ImplicitTxn:false} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} TxnUpgradeToExplicit{} -Open{ImplicitTxn:true} +Open{ImplicitTxn:true, WasUpgraded:false} handled events: NonRetriableErr{IsCommit:false} NonRetriableErr{IsCommit:true} @@ -85,3 +119,21 @@ Open{ImplicitTxn:true} TxnRestart{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} +Open{ImplicitTxn:true, WasUpgraded:true} + unreachable! + handled events: + NonRetriableErr{IsCommit:true} + RetriableErr{CanAutoRetry:false, IsCommit:true} + TxnFinishAborted{} + TxnFinishCommitted{} + missing events: + NonRetriableErr{IsCommit:false} + RetriableErr{CanAutoRetry:false, IsCommit:false} + RetriableErr{CanAutoRetry:true, IsCommit:false} + RetriableErr{CanAutoRetry:true, IsCommit:true} + SavepointRollback{} + TxnReleased{} + TxnRestart{} + TxnStart{ImplicitTxn:false} + TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} diff --git a/pkg/sql/txntype_string.go b/pkg/sql/txntype_string.go index c940af8d5d1c..2f50054ae2cb 100644 --- a/pkg/sql/txntype_string.go +++ b/pkg/sql/txntype_string.go @@ -10,11 +10,12 @@ func _() { var x [1]struct{} _ = x[implicitTxn-0] _ = x[explicitTxn-1] + _ = x[upgradedExplicitTxn-2] } -const _txnType_name = "implicitTxnexplicitTxn" +const _txnType_name = "implicitTxnexplicitTxnupgradedExplicitTxn" -var _txnType_index = [...]uint8{0, 11, 22} +var _txnType_index = [...]uint8{0, 11, 22, 41} func (i txnType) String() string { if i < 0 || i >= txnType(len(_txnType_index)-1) { diff --git a/pkg/util/fsm/debug.go b/pkg/util/fsm/debug.go index e1584bb82ca8..5a396c4fdd3a 100644 --- a/pkg/util/fsm/debug.go +++ b/pkg/util/fsm/debug.go @@ -44,7 +44,7 @@ func (di debugInfo) reachable(sName string) bool { func typeName(i interface{}) string { s := fmt.Sprintf("%#v", i) parts := strings.Split(s, ".") - return parts[len(parts)-1] + return strings.Join(parts[1:], ".") } func trimState(s string) string { return strings.TrimPrefix(s, "state") } func trimEvent(s string) string { return strings.TrimPrefix(s, "event") }