diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index f0d149848862..076f6eaa492b 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2181,7 +2181,9 @@ func (ex *connExecutor) updateTxnRewindPosMaybe( if _, ok := ex.machine.CurState().(stateOpen); !ok { return nil } - if advInfo.txnEvent.eventType == txnStart || advInfo.txnEvent.eventType == txnRestart { + if advInfo.txnEvent.eventType == txnStart || + advInfo.txnEvent.eventType == txnRestart || + advInfo.txnEvent.eventType == txnUpgradeToExplicit { var nextPos CmdPos switch advInfo.code { case stayInPlace: @@ -2859,7 +2861,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( // Handle transaction events which cause updates to txnState. switch advInfo.txnEvent.eventType { - case noEvent: + case noEvent, txnUpgradeToExplicit: _, nextStateIsAborted := ex.machine.CurState().(stateAborted) // Update the deadline on the transaction based on the collections, // if the transaction is currently open. If the next state is aborted diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index e9dc940b7529..568417f825a6 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -727,6 +727,58 @@ func TestRetriableErrorDuringPrepare(t *testing.T) { defer func() { _ = stmt.Close() }() } +// TestRetriableErrorDuringUpgradedTransaction ensures that a retriable error +// that happens during a transaction that was upgraded from an implicit +// transaction into an explicit transaction does not cause the BEGIN to be +// re-executed. +func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + var retryCount int64 + const numToRetry = 2 // only fail on the first two attempts + filter := newDynamicRequestFilter() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: filter.filter, + }, + }, + }) + defer s.Stopper().Stop(context.Background()) + + conn, err := sqlDB.Conn(context.Background()) + require.NoError(t, err) + testDB := sqlutils.MakeSQLRunner(conn) + + var fooTableId uint32 + testDB.Exec(t, "SET enable_implicit_transaction_for_batch_statements = true") + testDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)") + testDB.QueryRow(t, "SELECT 'foo'::regclass::oid").Scan(&fooTableId) + + // Inject an error that will happen during execution. + filter.setFilter(func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error { + if ba.Txn == nil { + return nil + } + if req, ok := ba.GetArg(roachpb.ConditionalPut); ok { + put := req.(*roachpb.ConditionalPutRequest) + _, tableID, err := keys.SystemSQLCodec.DecodeTablePrefix(put.Key) + if err != nil || tableID != fooTableId { + return nil + } + if atomic.AddInt64(&retryCount, 1) <= numToRetry { + return roachpb.NewErrorWithTxn( + roachpb.NewTransactionRetryError(roachpb.RETRY_REASON_UNKNOWN, "injected retry error"), ba.Txn, + ) + } + } + return nil + }) + + testDB.Exec(t, "SELECT 1; BEGIN; INSERT INTO foo VALUES(1); COMMIT;") + require.Equal(t, numToRetry+1, int(retryCount)) +} + // This test ensures that when in an explicit transaction and statement // preparation uses the user's transaction, errors during those planning queries // are handled correctly. diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index cbfbeb21e6a2..177e43af8ae1 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -303,7 +303,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ args.Extended.(*txnState).setAdvanceInfo( advanceOne, noRewind, - txnEvent{eventType: noEvent}, + txnEvent{eventType: txnUpgradeToExplicit}, ) return nil }, diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 2b2ac13d08d7..3c1428fae1f3 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -402,6 +402,11 @@ const ( // the transaction. This allows such savepoints to reset more state than other // savepoints. txnRestart + // txnUpgradeToExplicit means that the current implicit transaction was + // upgraded to an explicit one. This happens when BEGIN is executed during the + // extended protocol or as part of a batch of statements. It's used to + // indicate that the transaction rewind position should be updated. + txnUpgradeToExplicit ) // advanceInfo represents instructions for the connExecutor about what statement diff --git a/pkg/sql/txneventtype_string.go b/pkg/sql/txneventtype_string.go index 91e7607ca6e7..8b03b37454a8 100644 --- a/pkg/sql/txneventtype_string.go +++ b/pkg/sql/txneventtype_string.go @@ -13,11 +13,12 @@ func _() { _ = x[txnCommit-2] _ = x[txnRollback-3] _ = x[txnRestart-4] + _ = x[txnUpgradeToExplicit-5] } -const _txnEventType_name = "noEventtxnStarttxnCommittxnRollbacktxnRestart" +const _txnEventType_name = "noEventtxnStarttxnCommittxnRollbacktxnRestarttxnUpgradeToExplicit" -var _txnEventType_index = [...]uint8{0, 7, 15, 24, 35, 45} +var _txnEventType_index = [...]uint8{0, 7, 15, 24, 35, 45, 65} func (i txnEventType) String() string { if i < 0 || i >= txnEventType(len(_txnEventType_index)-1) {