Skip to content

Commit

Permalink
sql: fix connExecutor extraTxnState autoRetryCounter race condition
Browse files Browse the repository at this point in the history
Previously, a race condition could happen when calling serialize()
which read the ExtraTxnState.autoRetryCounter and the counter
being incremented.

One case this can happen is when autoRetryCounter is incremented
while querying crdb_internal.node_sessions.

Release note: None
  • Loading branch information
RichardJCai committed Feb 3, 2022
1 parent ecab739 commit dc5f8a8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
12 changes: 7 additions & 5 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,8 @@ func (s *Server) newConnExecutor(

ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}

ex.extraTxnState.atomicAutoRetryCounter = new(int32)

ex.initPlanner(ctx, &ex.planner)

return ex
Expand Down Expand Up @@ -1164,10 +1166,10 @@ type connExecutor struct {
// transaction and it is cleared after the transaction is committed.
schemaChangeJobRecords map[descpb.ID]*jobs.Record

// autoRetryCounter keeps track of the which iteration of a transaction
// atomicAutoRetryCounter keeps track of the which iteration of a transaction
// auto-retry we're currently in. It's 0 whenever the transaction state is not
// stateOpen.
autoRetryCounter int
atomicAutoRetryCounter *int32

// autoRetryReason records the error causing an auto-retryable error event if
// the current transaction is being automatically retried. This is used in
Expand Down Expand Up @@ -2660,7 +2662,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(

advInfo := ex.state.consumeAdvanceInfo()
if advInfo.code == rewind {
ex.extraTxnState.autoRetryCounter++
atomic.AddInt32(ex.extraTxnState.atomicAutoRetryCounter, 1)
}

// If we had an error from DDL statement execution due to the presence of
Expand Down Expand Up @@ -2689,7 +2691,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}
}
case txnStart:
ex.extraTxnState.autoRetryCounter = 0
atomic.StoreInt32(ex.extraTxnState.atomicAutoRetryCounter, 0)
ex.extraTxnState.autoRetryReason = nil
ex.recordTransactionStart()
// Bump the txn counter for logging.
Expand Down Expand Up @@ -2859,7 +2861,7 @@ func (ex *connExecutor) serialize() serverpb.Session {
Start: ex.state.mu.txnStart,
NumStatementsExecuted: int32(ex.state.mu.stmtCount),
NumRetries: int32(txn.Epoch()),
NumAutoRetries: int32(ex.extraTxnState.autoRetryCounter),
NumAutoRetries: atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter),
TxnDescription: txn.String(),
Implicit: ex.implicitTxn(),
AllocBytes: ex.state.mon.AllocBytes(),
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"runtime/pprof"
"strings"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -1004,7 +1005,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
planner.maybeLogStatement(
ctx,
ex.executorType,
ex.extraTxnState.autoRetryCounter,
int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)),
ex.extraTxnState.txnCounter,
res.RowsAffected(),
res.Err(),
Expand Down Expand Up @@ -1080,7 +1081,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
planner.curPlan.flags.Set(planFlagNotDistributed)
}

ex.sessionTracing.TraceRetryInformation(ctx, ex.extraTxnState.autoRetryCounter, ex.extraTxnState.autoRetryReason)
ex.sessionTracing.TraceRetryInformation(ctx, int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), ex.extraTxnState.autoRetryReason)
if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && ex.extraTxnState.autoRetryReason != nil {
ex.server.cfg.TestingKnobs.OnTxnRetry(ex.extraTxnState.autoRetryReason, planner.EvalContext())
}
Expand Down Expand Up @@ -1113,7 +1114,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
// plan has not been closed earlier.
ex.recordStatementSummary(
ctx, planner,
ex.extraTxnState.autoRetryCounter, res.RowsAffected(), res.Err(), stats,
int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), res.RowsAffected(), res.Err(), stats,
)
if ex.server.cfg.TestingKnobs.AfterExecute != nil {
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err())
Expand Down Expand Up @@ -2014,7 +2015,7 @@ func (ex *connExecutor) recordTransaction(
TransactionTimeSec: txnTime.Seconds(),
Committed: ev == txnCommit,
ImplicitTxn: implicit,
RetryCount: int64(ex.extraTxnState.autoRetryCounter),
RetryCount: int64(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)),
StatementFingerprintIDs: ex.extraTxnState.transactionStatementFingerprintIDs,
ServiceLatency: txnServiceLat,
RetryLatency: txnRetryLat,
Expand Down

0 comments on commit dc5f8a8

Please sign in to comment.