Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: put connExecutor's AutoRetry fields into txnState's mutex #82561

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 9 additions & 26 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,9 +932,6 @@ func (s *Server) newConnExecutor(
ex.transitionCtx.sessionTracing = &ex.sessionTracing

ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}

ex.extraTxnState.atomicAutoRetryCounter = new(int32)

ex.extraTxnState.createdSequences = make(map[descpb.ID]struct{})

ex.initPlanner(ctx, &ex.planner)
Expand Down Expand Up @@ -1222,7 +1219,11 @@ type connExecutor struct {
// ex.state, above. The rule of thumb is that, if the state influences state
// transitions, it should live in state, otherwise it can live here.
// This is only used in the Open state. extraTxnState is reset whenever a
// transaction finishes or gets retried.
// transaction finishes or gets retried. Additionally, if the field is
// accessed outside the connExecutor's goroutine, it should
// be added to the mu struct in connExecutor's txnState. Notably if
// the field is accessed in connExecutor's serialize function, it should be
// added to txnState behind the mutex.
extraTxnState struct {
// descCollection collects descriptors used by the current transaction.
descCollection descs.Collection
Expand All @@ -1240,16 +1241,6 @@ type connExecutor struct {
// transaction and it is cleared after the transaction is committed.
schemaChangeJobRecords map[descpb.ID]*jobs.Record

// atomicAutoRetryCounter keeps track of the which iteration of a transaction
// auto-retry we're currently in. It's 0 whenever the transaction state is not
// stateOpen.
atomicAutoRetryCounter *int32

// autoRetryReason records the error causing an auto-retryable error event if
// the current transaction is being automatically retried. This is used in
// statement traces to give more information in statement diagnostic bundles.
autoRetryReason error

// firstStmtExecuted indicates that the first statement inside this
// transaction has been executed.
firstStmtExecuted bool
Expand Down Expand Up @@ -2537,9 +2528,6 @@ func (ex *connExecutor) makeErrEvent(err error, stmt tree.Statement) (fsm.Event,
if ex.implicitTxn() || !ex.sessionData().InjectRetryErrorsEnabled {
rc, canAutoRetry = ex.getRewindTxnCapability()
}
if canAutoRetry {
ex.extraTxnState.autoRetryReason = err
}

ev := eventRetriableErr{
IsCommit: fsm.FromBool(isCommit(stmt)),
Expand Down Expand Up @@ -2735,7 +2723,7 @@ func (ex *connExecutor) resetEvalCtx(evalCtx *extendedEvalContext, txn *kv.Txn,
// to the point just before our failed read to ensure we don't try to read
// data which may be after the schema change when we retry.
var minTSErr *roachpb.MinTimestampBoundUnsatisfiableError
if err := ex.extraTxnState.autoRetryReason; err != nil && errors.As(err, &minTSErr) {
if err := ex.state.mu.autoRetryReason; err != nil && errors.As(err, &minTSErr) {
nextMax := minTSErr.MinTimestampBound
ex.extraTxnState.descCollection.SetMaxTimestampBound(nextMax)
evalCtx.AsOfSystemTime.MaxTimestampBound = nextMax
Expand Down Expand Up @@ -2842,9 +2830,6 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}

advInfo := ex.state.consumeAdvanceInfo()
if advInfo.code == rewind {
atomic.AddInt32(ex.extraTxnState.atomicAutoRetryCounter, 1)
}

// If we had an error from DDL statement execution due to the presence of
// other concurrent schema changes when attempting a schema change, wait for
Expand Down Expand Up @@ -2872,8 +2857,6 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}
}
case txnStart:
atomic.StoreInt32(ex.extraTxnState.atomicAutoRetryCounter, 0)
ex.extraTxnState.autoRetryReason = nil
ex.extraTxnState.firstStmtExecuted = false
ex.recordTransactionStart(advInfo.txnEvent.txnID)
// Start of the transaction, so no statements were executed earlier.
Expand Down Expand Up @@ -3047,8 +3030,8 @@ func (ex *connExecutor) serialize() serverpb.Session {

var autoRetryReasonStr string

if ex.extraTxnState.autoRetryReason != nil {
autoRetryReasonStr = ex.extraTxnState.autoRetryReason.Error()
if ex.state.mu.autoRetryReason != nil {
autoRetryReasonStr = ex.state.mu.autoRetryReason.Error()
}

if txn != nil {
Expand All @@ -3058,7 +3041,7 @@ func (ex *connExecutor) serialize() serverpb.Session {
Start: ex.state.mu.txnStart,
NumStatementsExecuted: int32(ex.state.mu.stmtCount),
NumRetries: int32(txn.Epoch()),
NumAutoRetries: atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter),
NumAutoRetries: ex.state.mu.autoRetryCounter,
TxnDescription: txn.String(),
Implicit: ex.implicitTxn(),
AllocBytes: ex.state.mon.AllocBytes(),
Expand Down
15 changes: 7 additions & 8 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"runtime/pprof"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -1060,7 +1059,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
planner.maybeLogStatement(
ctx,
ex.executorType,
int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)),
int(ex.state.mu.autoRetryCounter),
ex.extraTxnState.txnCounter,
res.RowsAffected(),
res.Err(),
Expand Down Expand Up @@ -1138,9 +1137,9 @@ func (ex *connExecutor) dispatchToExecutionEngine(
planner.curPlan.flags.Set(planFlagNotDistributed)
}

ex.sessionTracing.TraceRetryInformation(ctx, int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), ex.extraTxnState.autoRetryReason)
if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && ex.extraTxnState.autoRetryReason != nil {
ex.server.cfg.TestingKnobs.OnTxnRetry(ex.extraTxnState.autoRetryReason, planner.EvalContext())
ex.sessionTracing.TraceRetryInformation(ctx, int(ex.state.mu.autoRetryCounter), ex.state.mu.autoRetryReason)
if ex.server.cfg.TestingKnobs.OnTxnRetry != nil && ex.state.mu.autoRetryReason != nil {
ex.server.cfg.TestingKnobs.OnTxnRetry(ex.state.mu.autoRetryReason, planner.EvalContext())
}
distribute := DistributionType(DistributionTypeNone)
if distributePlan.WillDistribute() {
Expand Down Expand Up @@ -1175,7 +1174,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
// plan has not been closed earlier.
ex.recordStatementSummary(
ctx, planner,
int(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)), res.RowsAffected(), res.Err(), stats,
int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats,
)
if ex.server.cfg.TestingKnobs.AfterExecute != nil {
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err())
Expand Down Expand Up @@ -2145,7 +2144,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) {
ex.extraTxnState.rowsWritten = 0

if ex.server.cfg.TestingKnobs.BeforeRestart != nil {
ex.server.cfg.TestingKnobs.BeforeRestart(ctx, ex.extraTxnState.autoRetryReason)
ex.server.cfg.TestingKnobs.BeforeRestart(ctx, ex.state.mu.autoRetryReason)
}
}
}
Expand Down Expand Up @@ -2249,7 +2248,7 @@ func (ex *connExecutor) recordTransactionFinish(
TransactionTimeSec: txnTime.Seconds(),
Committed: ev.eventType == txnCommit,
ImplicitTxn: implicit,
RetryCount: int64(atomic.LoadInt32(ex.extraTxnState.atomicAutoRetryCounter)),
RetryCount: int64(ex.state.mu.autoRetryCounter),
StatementFingerprintIDs: ex.extraTxnState.transactionStatementFingerprintIDs,
ServiceLatency: txnServiceLat,
RetryLatency: txnRetryLat,
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/conn_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,14 +530,17 @@ func prepareTxnForRetry(args fsm.Args) error {
}

func prepareTxnForRetryWithRewind(args fsm.Args) error {
pl := args.Payload.(eventRetriableErrPayload)
ts := args.Extended.(*txnState)
ts.mu.Lock()
ts.mu.txn.PrepareForRetry(ts.Ctx)
ts.mu.autoRetryReason = pl.err
ts.mu.autoRetryCounter++
ts.mu.Unlock()
// The caller will call rewCap.rewindAndUnlock().
ts.setAdvanceInfo(
rewind,
args.Payload.(eventRetriableErrPayload).rewCap,
pl.rewCap,
txnEvent{eventType: txnRestart},
)
return nil
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/txn_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ type txnState struct {
// stmtCount keeps track of the number of statements that the transaction
// has executed.
stmtCount int

// autoRetryReason records the error causing an auto-retryable error event if
// the current transaction is being automatically retried. This is used in
// statement traces to give more information in statement diagnostic bundles.
autoRetryReason error

// autoRetryCounter keeps track of the which iteration of a transaction
// auto-retry we're currently in. It's 0 whenever the transaction state is not
// stateOpen.
autoRetryCounter int32
}

// connCtx is the connection's context. This is the parent of Ctx.
Expand Down Expand Up @@ -215,6 +225,8 @@ func (ts *txnState) resetForNewSQLTxn(
txnID = ts.mu.txn.ID()
sp.SetTag("txn", attribute.StringValue(ts.mu.txn.ID().String()))
ts.mu.txnStart = timeutil.Now()
ts.mu.autoRetryCounter = 0
ts.mu.autoRetryReason = nil
ts.mu.Unlock()
if historicalTimestamp != nil {
if err := ts.setHistoricalTimestamp(ts.Ctx, *historicalTimestamp); err != nil {
Expand Down
20 changes: 18 additions & 2 deletions pkg/sql/txn_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
)

var noRewindExpected = CmdPos(-1)
Expand Down Expand Up @@ -279,6 +280,12 @@ func TestTransitions(t *testing.T) {
// If nil, the kv txn is expected to be nil. Otherwise, the txn's fields are
// compared.
expTxn *expKVTxn

// The expected value of AutoRetryError after fsm transition.
expAutoRetryError string

// The expected value of AutoRetryCounter after fsm transition.
expAutoRetryCounter int32
}
tests := []test{
//
Expand Down Expand Up @@ -397,7 +404,9 @@ func TestTransitions(t *testing.T) {
expEv: txnRestart,
},
// Expect non-nil txn.
expTxn: &expKVTxn{},
expTxn: &expKVTxn{},
expAutoRetryCounter: 1,
expAutoRetryError: "test retriable err",
},
{
// Like the above test - get a retriable error while we can auto-retry,
Expand All @@ -421,7 +430,8 @@ func TestTransitions(t *testing.T) {
expEv: txnRestart,
},
// Expect non-nil txn.
expTxn: &expKVTxn{},
expTxn: &expKVTxn{},
expAutoRetryCounter: 1,
},
{
// Get a retriable error when we can no longer auto-retry, but the client
Expand Down Expand Up @@ -720,6 +730,12 @@ func TestTransitions(t *testing.T) {
t.Fatal(err)
}
}

// Check that AutoRetry information is in the expected state.
require.Equal(t, tc.expAutoRetryCounter, ts.mu.autoRetryCounter)
if tc.expAutoRetryError != "" {
strings.Contains(ts.mu.autoRetryReason.Error(), tc.expAutoRetryError)
}
})
}
}