Skip to content

Commit

Permalink
sql: put connExecutor's AutoRetry fields into txnState's mutex
Browse files Browse the repository at this point in the history
Auto retry variables could be used outside the connExecutor's goroutine in calls to
serialize. If this is the case, the field should be in txnState's mutex struct.

Release note: None
  • Loading branch information
RichardJCai committed Jun 8, 2022
1 parent abc8afd commit 03d96f1
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 36 deletions.
35 changes: 9 additions & 26 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,9 +932,6 @@ func (s *Server) newConnExecutor(
ex.transitionCtx.sessionTracing = &ex.sessionTracing

ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}

ex.extraTxnState.atomicAutoRetryCounter = new(int32)

ex.extraTxnState.createdSequences = make(map[descpb.ID]struct{})

ex.initPlanner(ctx, &ex.planner)
Expand Down Expand Up @@ -1222,7 +1219,11 @@ type connExecutor struct {
// ex.state, above. The rule of thumb is that, if the state influences state
// transitions, it should live in state, otherwise it can live here.
// This is only used in the Open state. extraTxnState is reset whenever a
// transaction finishes or gets retried.
// transaction finishes or gets retried. Additionally, if the field is
// accessed outside the connExecutor's goroutine, it should
// be added to the mu struct in connExecutor's txnState. Notably if
// the field is accessed in connExecutor's serialize function, it should be
// added to txnState behind the mutex.
extraTxnState struct {
// descCollection collects descriptors used by the current transaction.
descCollection descs.Collection
Expand All @@ -1240,16 +1241,6 @@ type connExecutor struct {
// transaction and it is cleared after the transaction is committed.
schemaChangeJobRecords map[descpb.ID]*jobs.Record

// atomicAutoRetryCounter keeps track of the which iteration of a transaction
// auto-retry we're currently in. It's 0 whenever the transaction state is not
// stateOpen.
atomicAutoRetryCounter *int32

// autoRetryReason records the error causing an auto-retryable error event if
// the current transaction is being automatically retried. This is used in
// statement traces to give more information in statement diagnostic bundles.
autoRetryReason error

// firstStmtExecuted indicates that the first statement inside this
// transaction has been executed.
firstStmtExecuted bool
Expand Down Expand Up @@ -2537,9 +2528,6 @@ func (ex *connExecutor) makeErrEvent(err error, stmt tree.Statement) (fsm.Event,
if ex.implicitTxn() || !ex.sessionData().InjectRetryErrorsEnabled {
rc, canAutoRetry = ex.getRewindTxnCapability()
}
if canAutoRetry {
ex.extraTxnState.autoRetryReason = err
}

ev := eventRetriableErr{
IsCommit: fsm.FromBool(isCommit(stmt)),
Expand Down Expand Up @@ -2735,7 +2723,7 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn,
// to the point just before our failed read to ensure we don't try to read
// data which may be after the schema change when we retry.
var minTSErr *roachpb.MinTimestampBoundUnsatisfiableError
if err := ex.extraTxnState.autoRetryReason; err != nil && errors.As(err, &minTSErr) {
if err := ex.state.mu.autoRetryReason; err != nil && errors.As(err, &minTSErr) {
nextMax := minTSErr.MinTimestampBound
ex.extraTxnState.descCollection.SetMaxTimestampBound(nextMax)
evalCtx.AsOfSystemTime.MaxTimestampBound = nextMax
Expand Down Expand Up @@ -2842,9 +2830,6 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}

advInfo := ex.state.consumeAdvanceInfo()
if advInfo.code == rewind {
atomic.AddInt32(ex.extraTxnState.atomicAutoRetryCounter, 1)
}

// If we had an error from DDL statement execution due to the presence of
// other concurrent schema changes when attempting a schema change, wait for
Expand Down Expand Up @@ -2872,8 +2857,6 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}
}
case txnStart:
atomic.StoreInt32(ex.extraTxnState.atomicAutoRetryCounter, 0)
ex.extraTxnState.autoRetryReason = nil
ex.extraTxnState.firstStmtExecuted = false
ex.recordTransactionStart(advInfo.txnEvent.txnID)
// Start of the transaction, so no statements were executed earlier.
Expand Down Expand Up @@ -3047,8 +3030,8 @@ func (ex *connExecutor) serialize() serverpb.Session {

var autoRetryReasonStr string

if ex.extraTxnState.autoRetryReason != nil {
autoRetryReasonStr = ex.extraTxnState.autoRetryReason.Error()
if ex.state.mu.autoRetryReason != nil {
autoRetryReasonStr = ex.state.mu.autoRetryReason.Error()
}

if txn != nil {
Expand All @@ -3058,7 +3041,7 @@ func (ex *connExecutor) serialize() serverpb.Session {
Start: ex.state.mu.txnStart,
NumStatementsExecuted: int32(ex.state.mu.stmtCount),
NumRetries: int32(txn.Epoch()),
NumAutoRetries: atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter),
NumAutoRetries: ex.state.mu.autoRetryCounter,
TxnDescription: txn.String(),
Implicit: ex.implicitTxn(),
AllocBytes: ex.state.mon.AllocBytes(),
Expand Down
15 changes: 7 additions & 8 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"runtime/pprof"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -1060,7 +1059,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
planner.maybeLogStatement(
ctx,
ex.executorType,
int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)),
int(ex.state.mu.autoRetryCounter),
ex.extraTxnState.txnCounter,
res.RowsAffected(),
res.Err(),
Expand Down Expand Up @@ -1138,9 +1137,9 @@ func (ex *connExecutor) dispatchToExecutionEngine(
planner.curPlan.flags.Set(planFlagNotDistributed)
}

ex.sessionTracing.TraceRetryInformation(ctx, int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), ex.extraTxnState.autoRetryReason)
if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && ex.extraTxnState.autoRetryReason != nil {
ex.server.cfg.TestingKnobs.OnTxnRetry(ex.extraTxnState.autoRetryReason, planner.EvalContext())
ex.sessionTracing.TraceRetryInformation(ctx, int(ex.state.mu.autoRetryCounter), ex.state.mu.autoRetryReason)
if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && ex.state.mu.autoRetryReason != nil {
ex.server.cfg.TestingKnobs.OnTxnRetry(ex.state.mu.autoRetryReason, planner.EvalContext())
}
distribute := DistributionType(DistributionTypeNone)
if distributePlan.WillDistribute() {
Expand Down Expand Up @@ -1175,7 +1174,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
// plan has not been closed earlier.
ex.recordStatementSummary(
ctx, planner,
int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), res.RowsAffected(), res.Err(), stats,
int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats,
)
if ex.server.cfg.TestingKnobs.AfterExecute != nil {
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err())
Expand Down Expand Up @@ -2145,7 +2144,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) {
ex.extraTxnState.rowsWritten = 0

if ex.server.cfg.TestingKnobs.BeforeRestart != nil {
ex.server.cfg.TestingKnobs.BeforeRestart(ctx, ex.extraTxnState.autoRetryReason)
ex.server.cfg.TestingKnobs.BeforeRestart(ctx, ex.state.mu.autoRetryReason)
}
}
}
Expand Down Expand Up @@ -2249,7 +2248,7 @@ func (ex *connExecutor) recordTransactionFinish(
TransactionTimeSec: txnTime.Seconds(),
Committed: ev.eventType == txnCommit,
ImplicitTxn: implicit,
RetryCount: int64(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)),
RetryCount: int64(ex.state.mu.autoRetryCounter),
StatementFingerprintIDs: ex.extraTxnState.transactionStatementFingerprintIDs,
ServiceLatency: txnServiceLat,
RetryLatency: txnRetryLat,
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/conn_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,11 @@ func noTxnToOpen(args fsm.Args) error {
payload := args.Payload.(eventTxnStartPayload)
ts := args.Extended.(*txnState)

ts.mu.Lock()
ts.mu.autoRetryCounter = 0
ts.mu.autoRetryReason = nil
ts.mu.Unlock()

txnTyp := explicitTxn
advCode := advanceOne
if ev.ImplicitTxn.Get() {
Expand Down Expand Up @@ -530,9 +535,12 @@ func prepareTxnForRetry(args fsm.Args) error {
}

func prepareTxnForRetryWithRewind(args fsm.Args) error {
pl := args.Payload.(eventRetriableErrPayload)
ts := args.Extended.(*txnState)
ts.mu.Lock()
ts.mu.txn.PrepareForRetry(ts.Ctx)
ts.mu.autoRetryReason = pl.err
ts.mu.autoRetryCounter += 1
ts.mu.Unlock()
// The caller will call rewCap.rewindAndUnlock().
ts.setAdvanceInfo(
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/txn_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ type txnState struct {
// stmtCount keeps track of the number of statements that the transaction
// has executed.
stmtCount int

// autoRetryReason records the error causing an auto-retryable error event if
// the current transaction is being automatically retried. This is used in
// statement traces to give more information in statement diagnostic bundles.
autoRetryReason error

// autoRetryCounter keeps track of the which iteration of a transaction
// auto-retry we're currently in. It's 0 whenever the transaction state is not
// stateOpen.
autoRetryCounter int32
}

// connCtx is the connection's context. This is the parent of Ctx.
Expand Down
20 changes: 18 additions & 2 deletions pkg/sql/txn_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
)

var noRewindExpected = CmdPos(-1)
Expand Down Expand Up @@ -279,6 +280,12 @@ func TestTransitions(t *testing.T) {
// If nil, the kv txn is expected to be nil. Otherwise, the txn's fields are
// compared.
expTxn *expKVTxn

// The expected value of AutoRetryError after fsm transition.
expAutoRetryError string

// The expected value of AutoRetryCounter after fsm transition.
expAutoRetryCounter int32
}
tests := []test{
//
Expand Down Expand Up @@ -397,7 +404,9 @@ func TestTransitions(t *testing.T) {
expEv: txnRestart,
},
// Expect non-nil txn.
expTxn: &expKVTxn{},
expTxn: &expKVTxn{},
expAutoRetryCounter: 1,
expAutoRetryError: "test retriable err",
},
{
// Like the above test - get a retriable error while we can auto-retry,
Expand All @@ -421,7 +430,8 @@ func TestTransitions(t *testing.T) {
expEv: txnRestart,
},
// Expect non-nil txn.
expTxn: &expKVTxn{},
expTxn: &expKVTxn{},
expAutoRetryCounter: 1,
},
{
// Get a retriable error when we can no longer auto-retry, but the client
Expand Down Expand Up @@ -720,6 +730,12 @@ func TestTransitions(t *testing.T) {
t.Fatal(err)
}
}

// Check that AutoRetry information is in the expected state.
require.Equal(t, tc.expAutoRetryCounter, ts.mu.autoRetryCounter)
if tc.expAutoRetryError != "" {
strings.Contains(ts.mu.autoRetryReason.Error(), tc.expAutoRetryError)
}
})
}
}

0 comments on commit 03d96f1

Please sign in to comment.