diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index cde74ea25512..67833022f3fc 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -932,9 +932,6 @@ func (s *Server) newConnExecutor( ex.transitionCtx.sessionTracing = &ex.sessionTracing ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{} - - ex.extraTxnState.atomicAutoRetryCounter = new(int32) - ex.extraTxnState.createdSequences = make(map[descpb.ID]struct{}) ex.initPlanner(ctx, &ex.planner) @@ -1222,7 +1219,11 @@ type connExecutor struct { // ex.state, above. The rule of thumb is that, if the state influences state // transitions, it should live in state, otherwise it can live here. // This is only used in the Open state. extraTxnState is reset whenever a - // transaction finishes or gets retried. + // transaction finishes or gets retried. Additionally, if the field is + // accessed outside the connExecutor's goroutine, it should + // be added to the mu struct in connExecutor's txnState. Notably if + // the field is accessed in connExecutor's serialize function, it should be + // added to txnState behind the mutex. extraTxnState struct { // descCollection collects descriptors used by the current transaction. descCollection descs.Collection @@ -1240,16 +1241,6 @@ type connExecutor struct { // transaction and it is cleared after the transaction is committed. schemaChangeJobRecords map[descpb.ID]*jobs.Record - // atomicAutoRetryCounter keeps track of the which iteration of a transaction - // auto-retry we're currently in. It's 0 whenever the transaction state is not - // stateOpen. - atomicAutoRetryCounter *int32 - - // autoRetryReason records the error causing an auto-retryable error event if - // the current transaction is being automatically retried. This is used in - // statement traces to give more information in statement diagnostic bundles. - autoRetryReason error - // firstStmtExecuted indicates that the first statement inside this // transaction has been executed. 
firstStmtExecuted bool @@ -2537,9 +2528,6 @@ func (ex *connExecutor) makeErrEvent(err error, stmt tree.Statement) (fsm.Event, if ex.implicitTxn() || !ex.sessionData().InjectRetryErrorsEnabled { rc, canAutoRetry = ex.getRewindTxnCapability() } - if canAutoRetry { - ex.extraTxnState.autoRetryReason = err - } ev := eventRetriableErr{ IsCommit: fsm.FromBool(isCommit(stmt)), @@ -2735,7 +2723,7 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn, // to the point just before our failed read to ensure we don't try to read // data which may be after the schema change when we retry. var minTSErr *roachpb.MinTimestampBoundUnsatisfiableError - if err := ex.extraTxnState.autoRetryReason; err != nil && errors.As(err, &minTSErr) { + if err := ex.state.mu.autoRetryReason; err != nil && errors.As(err, &minTSErr) { nextMax := minTSErr.MinTimestampBound ex.extraTxnState.descCollection.SetMaxTimestampBound(nextMax) evalCtx.AsOfSystemTime.MaxTimestampBound = nextMax @@ -2842,9 +2830,6 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( } advInfo := ex.state.consumeAdvanceInfo() - if advInfo.code == rewind { - atomic.AddInt32(ex.extraTxnState.atomicAutoRetryCounter, 1) - } // If we had an error from DDL statement execution due to the presence of // other concurrent schema changes when attempting a schema change, wait for @@ -2872,8 +2857,6 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( } } case txnStart: - atomic.StoreInt32(ex.extraTxnState.atomicAutoRetryCounter, 0) - ex.extraTxnState.autoRetryReason = nil ex.extraTxnState.firstStmtExecuted = false ex.recordTransactionStart(advInfo.txnEvent.txnID) // Start of the transaction, so no statements were executed earlier. 
@@ -3047,8 +3030,8 @@ func (ex *connExecutor) serialize() serverpb.Session { var autoRetryReasonStr string - if ex.extraTxnState.autoRetryReason != nil { - autoRetryReasonStr = ex.extraTxnState.autoRetryReason.Error() + if ex.state.mu.autoRetryReason != nil { + autoRetryReasonStr = ex.state.mu.autoRetryReason.Error() } if txn != nil { @@ -3058,7 +3041,7 @@ func (ex *connExecutor) serialize() serverpb.Session { Start: ex.state.mu.txnStart, NumStatementsExecuted: int32(ex.state.mu.stmtCount), NumRetries: int32(txn.Epoch()), - NumAutoRetries: atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter), + NumAutoRetries: ex.state.mu.autoRetryCounter, TxnDescription: txn.String(), Implicit: ex.implicitTxn(), AllocBytes: ex.state.mon.AllocBytes(), diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 592f47ec5681..556ab7b234c4 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -18,7 +18,6 @@ import ( "runtime/pprof" "strconv" "strings" - "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -1060,7 +1059,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( planner.maybeLogStatement( ctx, ex.executorType, - int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), + int(ex.state.mu.autoRetryCounter), ex.extraTxnState.txnCounter, res.RowsAffected(), res.Err(), @@ -1138,9 +1137,9 @@ func (ex *connExecutor) dispatchToExecutionEngine( planner.curPlan.flags.Set(planFlagNotDistributed) } - ex.sessionTracing.TraceRetryInformation(ctx, int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), ex.extraTxnState.autoRetryReason) - if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && ex.extraTxnState.autoRetryReason != nil { - ex.server.cfg.TestingKnobs.OnTxnRetry(ex.extraTxnState.autoRetryReason, planner.EvalContext()) + ex.sessionTracing.TraceRetryInformation(ctx, int(ex.state.mu.autoRetryCounter), ex.state.mu.autoRetryReason) + if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && 
ex.state.mu.autoRetryReason != nil { + ex.server.cfg.TestingKnobs.OnTxnRetry(ex.state.mu.autoRetryReason, planner.EvalContext()) } distribute := DistributionType(DistributionTypeNone) if distributePlan.WillDistribute() { @@ -1175,7 +1174,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( // plan has not been closed earlier. ex.recordStatementSummary( ctx, planner, - int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), res.RowsAffected(), res.Err(), stats, + int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats, ) if ex.server.cfg.TestingKnobs.AfterExecute != nil { ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err()) @@ -2145,7 +2144,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) { ex.extraTxnState.rowsWritten = 0 if ex.server.cfg.TestingKnobs.BeforeRestart != nil { - ex.server.cfg.TestingKnobs.BeforeRestart(ctx, ex.extraTxnState.autoRetryReason) + ex.server.cfg.TestingKnobs.BeforeRestart(ctx, ex.state.mu.autoRetryReason) } } } @@ -2249,7 +2248,7 @@ func (ex *connExecutor) recordTransactionFinish( TransactionTimeSec: txnTime.Seconds(), Committed: ev.eventType == txnCommit, ImplicitTxn: implicit, - RetryCount: int64(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), + RetryCount: int64(ex.state.mu.autoRetryCounter), StatementFingerprintIDs: ex.extraTxnState.transactionStatementFingerprintIDs, ServiceLatency: txnServiceLat, RetryLatency: txnRetryLat, diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index cbfbeb21e6a2..f99232219c47 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -530,14 +530,17 @@ func prepareTxnForRetry(args fsm.Args) error { } func prepareTxnForRetryWithRewind(args fsm.Args) error { + pl := args.Payload.(eventRetriableErrPayload) ts := args.Extended.(*txnState) ts.mu.Lock() ts.mu.txn.PrepareForRetry(ts.Ctx) + ts.mu.autoRetryReason = pl.err + ts.mu.autoRetryCounter++ ts.mu.Unlock() // The caller will call rewCap.rewindAndUnlock(). 
ts.setAdvanceInfo( rewind, - args.Payload.(eventRetriableErrPayload).rewCap, + pl.rewCap, txnEvent{eventType: txnRestart}, ) return nil diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 253b756795d8..7f32fb19bdd0 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -60,6 +60,16 @@ type txnState struct { // stmtCount keeps track of the number of statements that the transaction // has executed. stmtCount int + + // autoRetryReason records the error causing an auto-retryable error event if + // the current transaction is being automatically retried. This is used in + // statement traces to give more information in statement diagnostic bundles. + autoRetryReason error + + // autoRetryCounter keeps track of which iteration of a transaction + // auto-retry we're currently in. It's 0 whenever the transaction state is not + // stateOpen. + autoRetryCounter int32 } // connCtx is the connection's context. This is the parent of Ctx. @@ -215,6 +225,8 @@ func (ts *txnState) resetForNewSQLTxn( txnID = ts.mu.txn.ID() sp.SetTag("txn", attribute.StringValue(ts.mu.txn.ID().String())) ts.mu.txnStart = timeutil.Now() + ts.mu.autoRetryCounter = 0 + ts.mu.autoRetryReason = nil ts.mu.Unlock() if historicalTimestamp != nil { if err := ts.setHistoricalTimestamp(ts.Ctx, *historicalTimestamp); err != nil { diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index bf4ebfe1f731..f4cb814812ce 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" ) var noRewindExpected = CmdPos(-1) @@ -279,6 +280,12 @@ func TestTransitions(t *testing.T) { // If nil, the kv txn is expected to be nil. Otherwise, the txn's fields are // compared. expTxn *expKVTxn + + // The expected value of AutoRetryError after fsm transition.
+ expAutoRetryError string + + // The expected value of AutoRetryCounter after fsm transition. + expAutoRetryCounter int32 } tests := []test{ // @@ -397,7 +404,9 @@ func TestTransitions(t *testing.T) { expEv: txnRestart, }, // Expect non-nil txn. - expTxn: &expKVTxn{}, + expTxn: &expKVTxn{}, + expAutoRetryCounter: 1, + expAutoRetryError: "test retriable err", }, { // Like the above test - get a retriable error while we can auto-retry, @@ -421,7 +430,8 @@ func TestTransitions(t *testing.T) { expEv: txnRestart, }, // Expect non-nil txn. - expTxn: &expKVTxn{}, + expTxn: &expKVTxn{}, + expAutoRetryCounter: 1, }, { // Get a retriable error when we can no longer auto-retry, but the client @@ -720,6 +730,13 @@ func TestTransitions(t *testing.T) { t.Fatal(err) } } + + // Check that AutoRetry information is in the expected state. + require.Equal(t, tc.expAutoRetryCounter, ts.mu.autoRetryCounter) + if tc.expAutoRetryError != "" { + require.Error(t, ts.mu.autoRetryReason) + require.Contains(t, ts.mu.autoRetryReason.Error(), tc.expAutoRetryError) + } }) } }