From 03d96f1073987b9f2da2a966eb48cba76cc569c5 Mon Sep 17 00:00:00 2001 From: richardjcai Date: Tue, 7 Jun 2022 17:25:06 -0400 Subject: [PATCH] sql: put connExecutor's AutoRetry fields into txnState's mutex Auto retry variables could be used outside the connExecutor's goroutine in calls to serialize. If this is the case, the field should be in txnState's mutex struct. Release note: None --- pkg/sql/conn_executor.go | 35 +++++++++-------------------------- pkg/sql/conn_executor_exec.go | 15 +++++++-------- pkg/sql/conn_fsm.go | 8 ++++++++ pkg/sql/txn_state.go | 10 ++++++++++ pkg/sql/txn_state_test.go | 20 ++++++++++++++++++-- 5 files changed, 52 insertions(+), 36 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index cde74ea25512..67833022f3fc 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -932,9 +932,6 @@ func (s *Server) newConnExecutor( ex.transitionCtx.sessionTracing = &ex.sessionTracing ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{} - - ex.extraTxnState.atomicAutoRetryCounter = new(int32) - ex.extraTxnState.createdSequences = make(map[descpb.ID]struct{}) ex.initPlanner(ctx, &ex.planner) @@ -1222,7 +1219,11 @@ type connExecutor struct { // ex.state, above. The rule of thumb is that, if the state influences state // transitions, it should live in state, otherwise it can live here. // This is only used in the Open state. extraTxnState is reset whenever a - // transaction finishes or gets retried. + // transaction finishes or gets retried. Additionally, if the field is + // accessed outside the connExecutor's goroutine, it should + // be added to the mu struct in connExecutor's txnState. Notably if + // the field is accessed in connExecutor's serialize function, it should be + // added to txnState behind the mutex. extraTxnState struct { // descCollection collects descriptors used by the current transaction. descCollection descs.Collection @@ -1240,16 +1241,6 @@ type connExecutor struct { // transaction and it is cleared after the transaction is committed. schemaChangeJobRecords map[descpb.ID]*jobs.Record - // atomicAutoRetryCounter keeps track of the which iteration of a transaction - // auto-retry we're currently in. It's 0 whenever the transaction state is not - // stateOpen. - atomicAutoRetryCounter *int32 - - // autoRetryReason records the error causing an auto-retryable error event if - // the current transaction is being automatically retried. This is used in - // statement traces to give more information in statement diagnostic bundles. - autoRetryReason error - // firstStmtExecuted indicates that the first statement inside this // transaction has been executed. firstStmtExecuted bool @@ -2537,9 +2528,6 @@ func (ex *connExecutor) makeErrEvent(err error, stmt tree.Statement) (fsm.Event, if ex.implicitTxn() || !ex.sessionData().InjectRetryErrorsEnabled { rc, canAutoRetry = ex.getRewindTxnCapability() } - if canAutoRetry { - ex.extraTxnState.autoRetryReason = err - } ev := eventRetriableErr{ IsCommit: fsm.FromBool(isCommit(stmt)), @@ -2735,7 +2723,7 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn, // to the point just before our failed read to ensure we don't try to read // data which may be after the schema change when we retry. var minTSErr *roachpb.MinTimestampBoundUnsatisfiableError - if err := ex.extraTxnState.autoRetryReason; err != nil && errors.As(err, &minTSErr) { + if err := ex.state.mu.autoRetryReason; err != nil && errors.As(err, &minTSErr) { nextMax := minTSErr.MinTimestampBound ex.extraTxnState.descCollection.SetMaxTimestampBound(nextMax) evalCtx.AsOfSystemTime.MaxTimestampBound = nextMax @@ -2842,9 +2830,6 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( } advInfo := ex.state.consumeAdvanceInfo() - if advInfo.code == rewind { - atomic.AddInt32(ex.extraTxnState.atomicAutoRetryCounter, 1) - } // If we had an error from DDL statement execution due to the presence of // other concurrent schema changes when attempting a schema change, wait for @@ -2872,8 +2857,6 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( } } case txnStart: - atomic.StoreInt32(ex.extraTxnState.atomicAutoRetryCounter, 0) - ex.extraTxnState.autoRetryReason = nil ex.extraTxnState.firstStmtExecuted = false ex.recordTransactionStart(advInfo.txnEvent.txnID) // Start of the transaction, so no statements were executed earlier. @@ -3047,8 +3030,8 @@ func (ex *connExecutor) serialize() serverpb.Session { var autoRetryReasonStr string - if ex.extraTxnState.autoRetryReason != nil { - autoRetryReasonStr = ex.extraTxnState.autoRetryReason.Error() + if ex.state.mu.autoRetryReason != nil { + autoRetryReasonStr = ex.state.mu.autoRetryReason.Error() } if txn != nil { @@ -3058,7 +3041,7 @@ func (ex *connExecutor) serialize() serverpb.Session { Start: ex.state.mu.txnStart, NumStatementsExecuted: int32(ex.state.mu.stmtCount), NumRetries: int32(txn.Epoch()), - NumAutoRetries: atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter), + NumAutoRetries: ex.state.mu.autoRetryCounter, TxnDescription: txn.String(), Implicit: ex.implicitTxn(), AllocBytes: ex.state.mon.AllocBytes(), diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 592f47ec5681..556ab7b234c4 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -18,7 +18,6 @@ import ( "runtime/pprof" "strconv" "strings" - "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -1060,7 +1059,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( planner.maybeLogStatement( ctx, ex.executorType, - int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), + int(ex.state.mu.autoRetryCounter), ex.extraTxnState.txnCounter, res.RowsAffected(), res.Err(), @@ -1138,9 +1137,9 @@ func (ex *connExecutor) dispatchToExecutionEngine( planner.curPlan.flags.Set(planFlagNotDistributed) } - ex.sessionTracing.TraceRetryInformation(ctx, int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), ex.extraTxnState.autoRetryReason) - if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && ex.extraTxnState.autoRetryReason != nil { - ex.server.cfg.TestingKnobs.OnTxnRetry(ex.extraTxnState.autoRetryReason, planner.EvalContext()) + ex.sessionTracing.TraceRetryInformation(ctx, int(ex.state.mu.autoRetryCounter), ex.state.mu.autoRetryReason) + if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && ex.state.mu.autoRetryReason != nil { + ex.server.cfg.TestingKnobs.OnTxnRetry(ex.state.mu.autoRetryReason, planner.EvalContext()) } distribute := DistributionType(DistributionTypeNone) if distributePlan.WillDistribute() { @@ -1175,7 +1174,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( // plan has not been closed earlier. ex.recordStatementSummary( ctx, planner, - int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), res.RowsAffected(), res.Err(), stats, + int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats, ) if ex.server.cfg.TestingKnobs.AfterExecute != nil { ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err()) @@ -2145,7 +2144,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) { ex.extraTxnState.rowsWritten = 0 if ex.server.cfg.TestingKnobs.BeforeRestart != nil { - ex.server.cfg.TestingKnobs.BeforeRestart(ctx, ex.extraTxnState.autoRetryReason) + ex.server.cfg.TestingKnobs.BeforeRestart(ctx, ex.state.mu.autoRetryReason) } } } @@ -2249,7 +2248,7 @@ func (ex *connExecutor) recordTransactionFinish( TransactionTimeSec: txnTime.Seconds(), Committed: ev.eventType == txnCommit, ImplicitTxn: implicit, - RetryCount: int64(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), + RetryCount: int64(ex.state.mu.autoRetryCounter), StatementFingerprintIDs: ex.extraTxnState.transactionStatementFingerprintIDs, ServiceLatency: txnServiceLat, RetryLatency: txnRetryLat, diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index cbfbeb21e6a2..0570a993ff2f 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -465,6 +465,11 @@ func noTxnToOpen(args fsm.Args) error { payload := args.Payload.(eventTxnStartPayload) ts := args.Extended.(*txnState) + ts.mu.Lock() + ts.mu.autoRetryCounter = 0 + ts.mu.autoRetryReason = nil + ts.mu.Unlock() + txnTyp := explicitTxn advCode := advanceOne if ev.ImplicitTxn.Get() { @@ -530,9 +535,12 @@ func prepareTxnForRetry(args fsm.Args) error { } func prepareTxnForRetryWithRewind(args fsm.Args) error { + pl := args.Payload.(eventRetriableErrPayload) ts := args.Extended.(*txnState) ts.mu.Lock() ts.mu.txn.PrepareForRetry(ts.Ctx) + ts.mu.autoRetryReason = pl.err + ts.mu.autoRetryCounter += 1 ts.mu.Unlock() // The caller will call rewCap.rewindAndUnlock(). ts.setAdvanceInfo( diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 253b756795d8..2abfaefa3b45 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -60,6 +60,16 @@ type txnState struct { // stmtCount keeps track of the number of statements that the transaction // has executed. stmtCount int + + // autoRetryReason records the error causing an auto-retryable error event if + // the current transaction is being automatically retried. This is used in + // statement traces to give more information in statement diagnostic bundles. + autoRetryReason error + + // autoRetryCounter keeps track of the which iteration of a transaction + // auto-retry we're currently in. It's 0 whenever the transaction state is not + // stateOpen. + autoRetryCounter int32 } // connCtx is the connection's context. This is the parent of Ctx. diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index bf4ebfe1f731..f4cb814812ce 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" ) var noRewindExpected = CmdPos(-1) @@ -279,6 +280,12 @@ func TestTransitions(t *testing.T) { // If nil, the kv txn is expected to be nil. Otherwise, the txn's fields are // compared. expTxn *expKVTxn + + // The expected value of AutoRetryError after fsm transition. + expAutoRetryError string + + // The expected value of AutoRetryCounter after fsm transition. + expAutoRetryCounter int32 } tests := []test{ // @@ -397,7 +404,9 @@ func TestTransitions(t *testing.T) { expEv: txnRestart, }, // Expect non-nil txn. - expTxn: &expKVTxn{}, + expTxn: &expKVTxn{}, + expAutoRetryCounter: 1, + expAutoRetryError: "test retriable err", }, { // Like the above test - get a retriable error while we can auto-retry, @@ -421,7 +430,8 @@ func TestTransitions(t *testing.T) { expEv: txnRestart, }, // Expect non-nil txn. - expTxn: &expKVTxn{}, + expTxn: &expKVTxn{}, + expAutoRetryCounter: 1, }, { // Get a retriable error when we can no longer auto-retry, but the client @@ -720,6 +730,12 @@ func TestTransitions(t *testing.T) { t.Fatal(err) } } + + // Check that AutoRetry information is in the expected state. + require.Equal(t, tc.expAutoRetryCounter, ts.mu.autoRetryCounter) + if tc.expAutoRetryError != "" { + strings.Contains(ts.mu.autoRetryReason.Error(), tc.expAutoRetryError) + } }) } }