diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 2c7c902d3887..a56fceb4b70a 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1016,7 +1016,7 @@ func (s *Server) newConnExecutorWithTxn( // initialize the state. ex.machine = fsm.MakeMachine( BoundTxnStateTransitions, - stateOpen{ImplicitTxn: fsm.False}, + stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, &ex.state, ) ex.state.resetForNewSQLTxn( @@ -2209,8 +2209,7 @@ func (ex *connExecutor) updateTxnRewindPosMaybe( return nil } if advInfo.txnEvent.eventType == txnStart || - advInfo.txnEvent.eventType == txnRestart || - advInfo.txnEvent.eventType == txnUpgradeToExplicit { + advInfo.txnEvent.eventType == txnRestart { var nextPos CmdPos switch advInfo.code { case stayInPlace: diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 360f797e4d93..c2a4290fac00 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -790,6 +790,7 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) { var fooTableId uint32 testDB.Exec(t, "SET enable_implicit_transaction_for_batch_statements = true") + testDB.Exec(t, "CREATE TABLE bar (a INT PRIMARY KEY)") testDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)") testDB.QueryRow(t, "SELECT 'foo'::regclass::oid").Scan(&fooTableId) @@ -813,8 +814,14 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) { return nil }) - testDB.Exec(t, "SELECT 1; BEGIN; INSERT INTO foo VALUES(1); COMMIT;") + testDB.Exec(t, "INSERT INTO bar VALUES(2); BEGIN; INSERT INTO foo VALUES(1); COMMIT;") require.Equal(t, numToRetry+1, int(retryCount)) + + var x int + testDB.QueryRow(t, "select * from foo").Scan(&x) + require.Equal(t, 1, x) + testDB.QueryRow(t, "select * from bar").Scan(&x) + require.Equal(t, 2, x) } // This test ensures that when in an explicit transaction and statement diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index e2ca0bbdf92d..fea8ede7accc 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -49,7 +49,11 @@ func (stateNoTxn) String() string { } type stateOpen struct { + // ImplicitTxn is false if the txn included a BEGIN command. ImplicitTxn fsm.Bool + // WasUpgraded is true if the txn started as implicit, but a BEGIN made it + // become explicit. + WasUpgraded fsm.Bool } var _ fsm.State = &stateOpen{} @@ -60,7 +64,12 @@ func (stateOpen) String() string { // stateAborted is entered on errors (retriable and non-retriable). A ROLLBACK // TO SAVEPOINT can move the transaction back to stateOpen. -type stateAborted struct{} +type stateAborted struct { + // WasUpgraded is true if the txn started as implicit, but a BEGIN made it + // become explicit. This is needed so that when ROLLBACK TO SAVEPOINT moves + // to stateOpen, we keep tracking WasUpgraded correctly. + WasUpgraded fsm.Bool +} var _ fsm.State = &stateAborted{} @@ -231,7 +240,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ stateNoTxn{}: { eventTxnStart{fsm.Var("implicitTxn")}: { Description: "BEGIN, or before a statement running as an implicit txn", - Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}, + Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn"), WasUpgraded: fsm.False}, Action: noTxnToOpen, }, eventNonRetriableErr{IsCommit: fsm.Any}: { @@ -248,7 +257,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ }, /// Open - stateOpen{ImplicitTxn: fsm.Any}: { + stateOpen{ImplicitTxn: fsm.Any, WasUpgraded: fsm.Any}: { eventTxnFinishCommitted{}: { Description: "COMMIT, or after a statement running as an implicit txn", Next: stateNoTxn{}, @@ -279,17 +288,15 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Action: cleanupAndFinishOnError, }, }, - stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}: { + stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}: { // This is the case where we auto-retry. eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: { - // Rewind and auto-retry - the transaction should stay in the Open state. + // Rewind and auto-retry - the transaction should stay in the Open state Description: "Retriable err; will auto-retry", - Next: stateOpen{ImplicitTxn: fsm.Var("implicitTxn")}, + Next: stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}, Action: prepareTxnForRetryWithRewind, }, - }, - // Handle the errors in implicit txns. They move us to NoTxn. - stateOpen{ImplicitTxn: fsm.True}: { + // Handle the errors in implicit txns. They move us to NoTxn. eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { Next: stateNoTxn{}, Action: cleanupAndFinishOnError, @@ -298,8 +305,9 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Next: stateNoTxn{}, Action: cleanupAndFinishOnError, }, + // Handle a txn getting upgraded to an explicit txn. eventTxnUpgradeToExplicit{}: { - Next: stateOpen{ImplicitTxn: fsm.False}, + Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True}, Action: func(args fsm.Args) error { args.Extended.(*txnState).setAdvanceInfo( advanceOne, @@ -310,10 +318,10 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ }, }, }, - // Handle the errors in explicit txns. They move us to Aborted. - stateOpen{ImplicitTxn: fsm.False}: { + // Handle the errors in explicit txns. + stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}: { eventNonRetriableErr{IsCommit: fsm.False}: { - Next: stateAborted{}, + Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}, Action: func(args fsm.Args) error { ts := args.Extended.(*txnState) ts.setAdvanceInfo(skipBatch, noRewind, txnEvent{eventType: noEvent}) @@ -324,11 +332,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ // txnRestart output event. eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", - Next: stateOpen{ImplicitTxn: fsm.False}, + Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}, Action: prepareTxnForRetry, }, eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}: { - Next: stateAborted{}, + Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}, Action: func(args fsm.Args) error { args.Extended.(*txnState).setAdvanceInfo( skipBatch, @@ -338,6 +346,16 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ return nil }, }, + // This is the case where we auto-retry explicit transactions. + eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.Any}: { + // Rewind and auto-retry - the transaction should stay in the Open state. + // Retrying can cause the transaction to become implicit if we see that + // the transaction was previously upgraded. During the retry, BEGIN + // will be executed again and upgrade the transaction back to explicit. + Description: "Retriable err; will auto-retry", + Next: stateOpen{ImplicitTxn: fsm.Var("wasUpgraded"), WasUpgraded: fsm.False}, + Action: prepareTxnForRetryWithRewind, + }, eventTxnReleased{}: { Description: "RELEASE SAVEPOINT cockroach_restart", Next: stateCommitWait{}, @@ -362,7 +380,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ // // Note that we don't handle any error events here. Any statement but a // ROLLBACK (TO SAVEPOINT) is expected to not be passed to the state machine. - stateAborted{}: { + stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}: { eventTxnFinishAborted{}: { Description: "ROLLBACK", Next: stateNoTxn{}, @@ -378,7 +396,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventNonRetriableErr{IsCommit: fsm.False}: { // This event doesn't change state, but it returns a skipBatch code. Description: "any other statement", - Next: stateAborted{}, + Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}, Action: func(args fsm.Args) error { args.Extended.(*txnState).setAdvanceInfo( skipBatch, @@ -392,13 +410,13 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventNonRetriableErr{IsCommit: fsm.True}: { // This event doesn't change state, but it returns a skipBatch code. Description: "ConnExecutor closing", - Next: stateAborted{}, + Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}, Action: cleanupAndFinishOnError, }, // ROLLBACK TO SAVEPOINT success. eventSavepointRollback{}: { Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) success", - Next: stateOpen{ImplicitTxn: fsm.False}, + Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}, Action: func(args fsm.Args) error { args.Extended.(*txnState).setAdvanceInfo( advanceOne, @@ -412,7 +430,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ eventRetriableErr{CanAutoRetry: fsm.Any, IsCommit: fsm.Any}: { // This event doesn't change state, but it returns a skipBatch code. Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart", - Next: stateAborted{}, + Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")}, Action: func(args fsm.Args) error { args.Extended.(*txnState).setAdvanceInfo( skipBatch, @@ -425,7 +443,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ // ROLLBACK TO SAVEPOINT cockroach_restart. eventTxnRestart{}: { Description: "ROLLBACK TO SAVEPOINT cockroach_restart", - Next: stateOpen{ImplicitTxn: fsm.False}, + Next: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}, Action: prepareTxnForRetry, }, }, @@ -559,7 +577,7 @@ func prepareTxnForRetryWithRewind(args fsm.Args) error { // when running SQL inside a higher-level txn. It's a very limited state // machine: it doesn't allow starting or finishing txns, auto-retries, etc. var BoundTxnStateTransitions = fsm.Compile(fsm.Pattern{ - stateOpen{ImplicitTxn: fsm.False}: { + stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}: { // We accept eventNonRetriableErr with both IsCommit={True, fsm.False}, even // though this state machine does not support COMMIT statements because // connExecutor.close() sends an eventNonRetriableErr{IsCommit: fsm.True} event. diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index edc048da31fe..8f39a6d4bfb5 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -143,6 +143,9 @@ const ( // explicitTxn means that the txn was explicitly started with a BEGIN // statement. explicitTxn + // upgradedExplicitTxn means that the txn started as implicit, but a BEGIN + // in the middle of it caused it to become explicit. + upgradedExplicitTxn ) // resetForNewSQLTxn (re)initializes the txnState for a new transaction. diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index d8171a7a5e85..2eb5f80e7704 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -107,14 +107,17 @@ func (tc *testContext) createOpenState(typ txnType) (fsm.State, *txnState) { state := stateOpen{ ImplicitTxn: fsm.FromBool(typ == implicitTxn), + WasUpgraded: fsm.FromBool(typ == upgradedExplicitTxn), } return state, &ts } // createAbortedState returns a txnState initialized with an aborted txn. -func (tc *testContext) createAbortedState() (fsm.State, *txnState) { - _, ts := tc.createOpenState(explicitTxn) - return stateAborted{}, ts +func (tc *testContext) createAbortedState(typ txnType) (fsm.State, *txnState) { + _, ts := tc.createOpenState(typ) + return stateAborted{ + WasUpgraded: fsm.FromBool(typ == upgradedExplicitTxn), + }, ts } func (tc *testContext) createCommitWaitState() (fsm.State, *txnState, error) { @@ -298,7 +301,7 @@ func TestTransitions(t *testing.T) { ev: eventTxnStart{ImplicitTxn: fsm.True}, evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(), nil /* historicalTimestamp */, tranCtx, sessiondatapb.Normal), - expState: stateOpen{ImplicitTxn: fsm.True}, + expState: stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}, expAdv: expAdvance{ // We expect to stayInPlace; upon starting a txn the statement is // executed again, this time in state Open. @@ -323,7 +326,7 @@ func TestTransitions(t *testing.T) { ev: eventTxnStart{ImplicitTxn: fsm.False}, evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(), nil /* historicalTimestamp */, tranCtx, sessiondatapb.Normal), - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, expAdv: expAdvance{ expCode: advanceOne, expEv: txnStart, @@ -381,6 +384,27 @@ func TestTransitions(t *testing.T) { }, expTxn: nil, }, + { + // Finish an upgraded explicit txn. + name: "Open (upgraded explicit) -> NoTxn", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + // We commit the KV transaction, as that's done by the layer below + // txnState. + if err := ts.mu.txn.Commit(ts.Ctx); err != nil { + return nil, nil, emptyTxnID, err + } + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventTxnFinishCommitted{}, + evPayload: nil, + expState: stateNoTxn{}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: txnCommit, + }, + expTxn: nil, + }, { // Get a retriable error while we can auto-retry. name: "Open + auto-retry", @@ -395,7 +419,31 @@ func TestTransitions(t *testing.T) { } return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.False}, b }, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: rewind, + expEv: txnRestart, + }, + // Expect non-nil txn. + expTxn: &expKVTxn{}, + expAutoRetryCounter: 1, + expAutoRetryError: "test retriable err", + }, + { + // Get a retriable error while we can auto-retry. + name: "Open + auto-retry on upgraded txn", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) { + b := eventRetriableErrPayload{ + err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"), + rewCap: dummyRewCap, + } + return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.False}, b + }, + expState: stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}, expAdv: expAdvance{ expCode: rewind, expEv: txnRestart, @@ -421,7 +469,32 @@ func TestTransitions(t *testing.T) { } return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.True}, b }, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: rewind, + expEv: txnRestart, + }, + // Expect non-nil txn. + expTxn: &expKVTxn{}, + expAutoRetryCounter: 1, + }, + { + // Like the above test - get a retriable error while we can auto-retry, + // except this time the txn was upgraded to explicit. This shouldn't make + // any difference; we should still auto-retry like the above. + name: "Open + auto-retry (COMMIT) on upgraded txn", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) { + b := eventRetriableErrPayload{ + err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"), + rewCap: dummyRewCap, + } + return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.True}, b + }, + expState: stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}, expAdv: expAdvance{ expCode: rewind, expEv: txnRestart, @@ -445,7 +518,34 @@ func TestTransitions(t *testing.T) { } return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b }, - expState: stateAborted{}, + expState: stateAborted{ + WasUpgraded: fsm.False, + }, + expAdv: expAdvance{ + expCode: skipBatch, + expEv: noEvent, + }, + // Expect non-nil txn. + expTxn: &expKVTxn{}, + }, + { + // Get a retriable error when we can no longer auto-retry, but the client + // is doing client-side retries. + name: "Open (upgraded) + client retry", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) { + b := eventRetriableErrPayload{ + err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"), + rewCap: rewindCapability{}, + } + return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b + }, + expState: stateAborted{ + WasUpgraded: fsm.True, + }, expAdv: expAdvance{ expCode: skipBatch, expEv: noEvent, @@ -529,7 +629,27 @@ func TestTransitions(t *testing.T) { }, ev: eventNonRetriableErr{IsCommit: fsm.False}, evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")}, - expState: stateAborted{}, + expState: stateAborted{ + WasUpgraded: fsm.False, + }, + expAdv: expAdvance{ + expCode: skipBatch, + expEv: noEvent, + }, + expTxn: &expKVTxn{}, + }, + { + // We get a non-retriable error. + name: "Open (upgraded) + non-retriable error", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventNonRetriableErr{IsCommit: fsm.False}, + evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")}, + expState: stateAborted{ + WasUpgraded: fsm.True, + }, expAdv: expAdvance{ expCode: skipBatch, expEv: noEvent, @@ -561,7 +681,24 @@ func TestTransitions(t *testing.T) { return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnRestart{}, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: txnRestart, + }, + // We would like to test that the transaction's epoch bumped if the txn + // performed any operations, but it's not easy to do the test. + expTxn: &expKVTxn{}, + }, + { + // Restarting from Open via ROLLBACK TO SAVEPOINT on an upgraded txn. + name: "Open (upgraded) + restart", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createOpenState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventTxnRestart{}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True}, expAdv: expAdvance{ expCode: advanceOne, expEv: txnRestart, @@ -577,7 +714,22 @@ func TestTransitions(t *testing.T) { // The txn finished, such as after a ROLLBACK. name: "Aborted->NoTxn", init: func() (fsm.State, *txnState, uuid.UUID, error) { - s, ts := testCon.createAbortedState() + s, ts := testCon.createAbortedState(explicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventTxnFinishAborted{}, + expState: stateNoTxn{}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: txnRollback, + }, + expTxn: nil, + }, + { + // The upgraded txn finished, such as after a ROLLBACK. + name: "Aborted(upgraded)->NoTxn", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createAbortedState(upgradedExplicitTxn) return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnFinishAborted{}, @@ -592,11 +744,26 @@ func TestTransitions(t *testing.T) { // The txn is starting again (ROLLBACK TO SAVEPOINT while in Aborted). name: "Aborted->Open", init: func() (fsm.State, *txnState, uuid.UUID, error) { - s, ts := testCon.createAbortedState() + s, ts := testCon.createAbortedState(explicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventSavepointRollback{}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: noEvent, + }, + expTxn: &expKVTxn{}, + }, + { + // The upgraded txn is starting again (ROLLBACK TO SAVEPOINT while in Aborted). + name: "Aborted(upgraded)->Open", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createAbortedState(upgradedExplicitTxn) return s, ts, ts.mu.txn.ID(), nil }, ev: eventSavepointRollback{}, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True}, expAdv: expAdvance{ expCode: advanceOne, expEv: noEvent, @@ -607,11 +774,31 @@ func TestTransitions(t *testing.T) { // The txn is starting again (ROLLBACK TO SAVEPOINT cockroach_restart while in Aborted). name: "Aborted->Restart", init: func() (fsm.State, *txnState, uuid.UUID, error) { - s, ts := testCon.createAbortedState() + s, ts := testCon.createAbortedState(explicitTxn) return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnRestart{}, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: txnRestart, + }, + expTxn: &expKVTxn{ + userPriority: &pri, + writeTSNanos: &now.WallTime, + readTSNanos: &now.WallTime, + uncertaintyLimitNanos: &uncertaintyLimit.WallTime, + }, + }, + { + // The upgraded txn is starting again (ROLLBACK TO SAVEPOINT cockroach_restart while in Aborted). + name: "Aborted(upgraded)->Restart", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createAbortedState(upgradedExplicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventTxnRestart{}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True}, expAdv: expAdvance{ expCode: advanceOne, expEv: txnRestart, @@ -629,11 +816,30 @@ func TestTransitions(t *testing.T) { // to the expTxn. name: "Aborted->Starting (historical)", init: func() (fsm.State, *txnState, uuid.UUID, error) { - s, ts := testCon.createAbortedState() + s, ts := testCon.createAbortedState(explicitTxn) + return s, ts, ts.mu.txn.ID(), nil + }, + ev: eventTxnRestart{}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, + expAdv: expAdvance{ + expCode: advanceOne, + expEv: txnRestart, + }, + expTxn: &expKVTxn{ + writeTSNanos: proto.Int64(now.WallTime), + }, + }, + { + // The upgraded txn is starting again (e.g. ROLLBACK TO SAVEPOINT while in Aborted). + // Verify that the historical timestamp from the evPayload is propagated + // to the expTxn. + name: "Aborted(upgraded)->Starting (historical)", + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts := testCon.createAbortedState(upgradedExplicitTxn) return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnRestart{}, - expState: stateOpen{ImplicitTxn: fsm.False}, + expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.True}, expAdv: expAdvance{ expCode: advanceOne, expEv: txnRestart, diff --git a/pkg/sql/txnstatetransitions_diagram.gv b/pkg/sql/txnstatetransitions_diagram.gv index 64e31bc7d849..bdb0cf593d06 100644 --- a/pkg/sql/txnstatetransitions_diagram.gv +++ b/pkg/sql/txnstatetransitions_diagram.gv @@ -13,39 +13,58 @@ digraph finite_state_machine { qi -> "NoTxn{}"; node [shape = circle]; - "Aborted{}" -> "Aborted{}" [label = any other statement>] - "Aborted{}" -> "Aborted{}" [label = ConnExecutor closing>] - "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] - "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] - "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] - "Aborted{}" -> "Aborted{}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] - "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) success>] - "Aborted{}" -> "NoTxn{}" [label = ROLLBACK>] - "Aborted{}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = any other statement>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = ConnExecutor closing>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) success>] + "Aborted{WasUpgraded:false}" -> "NoTxn{}" [label = ROLLBACK>] + "Aborted{WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = any other statement>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = ConnExecutor closing>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart>] + "Aborted{WasUpgraded:true}" -> "Open{ImplicitTxn:false, WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT (not cockroach_restart) success>] + "Aborted{WasUpgraded:true}" -> "NoTxn{}" [label = ROLLBACK>] + "Aborted{WasUpgraded:true}" -> "Open{ImplicitTxn:false, WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] "CommitWait{}" -> "CommitWait{}" [label = any other statement>] "CommitWait{}" -> "CommitWait{}" [label = any other statement>] "CommitWait{}" -> "NoTxn{}" [label = COMMIT>] "NoTxn{}" -> "NoTxn{}" [label = anything but BEGIN or extended protocol command error>] "NoTxn{}" -> "NoTxn{}" [label = anything but BEGIN or extended protocol command error>] - "NoTxn{}" -> "Open{ImplicitTxn:false}" [label = BEGIN, or before a statement running as an implicit txn>] - "NoTxn{}" -> "Open{ImplicitTxn:true}" [label = BEGIN, or before a statement running as an implicit txn>] - "Open{ImplicitTxn:false}" -> "Aborted{}" [label = "NonRetriableErr{IsCommit:false}"] - "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] - "Open{ImplicitTxn:false}" -> "Aborted{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] - "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = Retriable err on COMMIT>] - "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = Retriable err; will auto-retry>] - "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = Retriable err; will auto-retry>] - "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] - "Open{ImplicitTxn:false}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] - "Open{ImplicitTxn:false}" -> "CommitWait{}" [label = RELEASE SAVEPOINT cockroach_restart>] - "Open{ImplicitTxn:false}" -> "Open{ImplicitTxn:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:false}"] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = Retriable err on COMMIT>] - "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] - "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:true}" [label = Retriable err; will auto-retry>] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] - "Open{ImplicitTxn:true}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] - "Open{ImplicitTxn:true}" -> "Open{ImplicitTxn:false}" [label = "TxnUpgradeToExplicit{}"] + "NoTxn{}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = BEGIN, or before a statement running as an implicit txn>] + "NoTxn{}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = BEGIN, or before a statement running as an implicit txn>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = "NonRetriableErr{IsCommit:false}"] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Aborted{WasUpgraded:false}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = Retriable err on COMMIT>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "CommitWait{}" [label = RELEASE SAVEPOINT cockroach_restart>] + "Open{ImplicitTxn:false, WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = "NonRetriableErr{IsCommit:false}"] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = Retriable err on COMMIT>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "CommitWait{}" [label = RELEASE SAVEPOINT cockroach_restart>] + "Open{ImplicitTxn:false, WasUpgraded:true}" -> "Open{ImplicitTxn:false, WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:false}"] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:true}"] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = "RetriableErr{CanAutoRetry:false, IsCommit:false}"] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = Retriable err on COMMIT>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "Open{ImplicitTxn:true, WasUpgraded:false}" [label = Retriable err; will auto-retry>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>] + "Open{ImplicitTxn:true, WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:true}" [label = "TxnUpgradeToExplicit{}"] } diff --git a/pkg/sql/txnstatetransitions_report.txt b/pkg/sql/txnstatetransitions_report.txt index e04d7d82582b..40bad130717f 100644 --- a/pkg/sql/txnstatetransitions_report.txt +++ b/pkg/sql/txnstatetransitions_report.txt @@ -1,6 +1,23 @@ // Code generated; DO NOT EDIT. -Aborted{} +Aborted{WasUpgraded:false} + handled events: + NonRetriableErr{IsCommit:false} + NonRetriableErr{IsCommit:true} + RetriableErr{CanAutoRetry:false, IsCommit:false} + RetriableErr{CanAutoRetry:false, IsCommit:true} + RetriableErr{CanAutoRetry:true, IsCommit:false} + RetriableErr{CanAutoRetry:true, IsCommit:true} + SavepointRollback{} + TxnFinishAborted{} + TxnRestart{} + missing events: + TxnFinishCommitted{} + TxnReleased{} + TxnStart{ImplicitTxn:false} + TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} +Aborted{WasUpgraded:true} handled events: NonRetriableErr{IsCommit:false} NonRetriableErr{IsCommit:true} @@ -51,7 +68,24 @@ NoTxn{} TxnReleased{} TxnRestart{} TxnUpgradeToExplicit{} -Open{ImplicitTxn:false} +Open{ImplicitTxn:false, WasUpgraded:false} + handled events: + NonRetriableErr{IsCommit:false} + NonRetriableErr{IsCommit:true} + RetriableErr{CanAutoRetry:false, IsCommit:false} + RetriableErr{CanAutoRetry:false, IsCommit:true} + RetriableErr{CanAutoRetry:true, IsCommit:false} + RetriableErr{CanAutoRetry:true, IsCommit:true} + TxnFinishAborted{} + TxnFinishCommitted{} + TxnReleased{} + TxnRestart{} + missing events: + SavepointRollback{} + TxnStart{ImplicitTxn:false} + TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} +Open{ImplicitTxn:false, WasUpgraded:true} handled events: NonRetriableErr{IsCommit:false} NonRetriableErr{IsCommit:true} @@ -68,7 +102,7 @@ Open{ImplicitTxn:false} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} TxnUpgradeToExplicit{} -Open{ImplicitTxn:true} +Open{ImplicitTxn:true, WasUpgraded:false} handled events: NonRetriableErr{IsCommit:false} NonRetriableErr{IsCommit:true} @@ -85,3 +119,21 @@ Open{ImplicitTxn:true} TxnRestart{} TxnStart{ImplicitTxn:false} TxnStart{ImplicitTxn:true} +Open{ImplicitTxn:true, WasUpgraded:true} + unreachable! + handled events: + NonRetriableErr{IsCommit:true} + RetriableErr{CanAutoRetry:false, IsCommit:true} + TxnFinishAborted{} + TxnFinishCommitted{} + missing events: + NonRetriableErr{IsCommit:false} + RetriableErr{CanAutoRetry:false, IsCommit:false} + RetriableErr{CanAutoRetry:true, IsCommit:false} + RetriableErr{CanAutoRetry:true, IsCommit:true} + SavepointRollback{} + TxnReleased{} + TxnRestart{} + TxnStart{ImplicitTxn:false} + TxnStart{ImplicitTxn:true} + TxnUpgradeToExplicit{} diff --git a/pkg/sql/txntype_string.go b/pkg/sql/txntype_string.go index c940af8d5d1c..2f50054ae2cb 100644 --- a/pkg/sql/txntype_string.go +++ b/pkg/sql/txntype_string.go @@ -10,11 +10,12 @@ func _() { var x [1]struct{} _ = x[implicitTxn-0] _ = x[explicitTxn-1] + _ = x[upgradedExplicitTxn-2] } -const _txnType_name = "implicitTxnexplicitTxn" +const _txnType_name = "implicitTxnexplicitTxnupgradedExplicitTxn" -var _txnType_index = [...]uint8{0, 11, 22} +var _txnType_index = [...]uint8{0, 11, 22, 41} func (i txnType) String() string { if i < 0 || i >= txnType(len(_txnType_index)-1) { diff --git a/pkg/util/fsm/debug.go b/pkg/util/fsm/debug.go index e1584bb82ca8..5a396c4fdd3a 100644 --- a/pkg/util/fsm/debug.go +++ b/pkg/util/fsm/debug.go @@ -44,7 +44,7 @@ func (di debugInfo) reachable(sName string) bool { func typeName(i interface{}) string { s := fmt.Sprintf("%#v", i) parts := strings.Split(s, ".") - return parts[len(parts)-1] + return strings.Join(parts[1:], ".") } func trimState(s string) string { return strings.TrimPrefix(s, "state") } func trimEvent(s string) string { return strings.TrimPrefix(s, "event") }