Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: simplifiy tracking of injected txn retry errors #108817

Merged
merged 2 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 12 additions & 19 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,8 +1012,8 @@ func (ex *connExecutor) execStmtInOpenState(
stmtCtx = ctx
}

var rollbackSP *tree.RollbackToSavepoint
var r *tree.ReleaseSavepoint
var rollbackHomeRegionSavepoint *tree.RollbackToSavepoint
var releaseHomeRegionSavepoint *tree.ReleaseSavepoint
enforceHomeRegion := p.EnforceHomeRegion()
_, isSelectStmt := stmt.AST.(*tree.Select)
// TODO(sql-sessions): ensure this is not broken for pausable portals.
Expand All @@ -1024,28 +1024,23 @@ func (ex *connExecutor) execStmtInOpenState(
// historical timestamp (so that the locality-optimized ops used for error
// reporting can run locally and not incur latency). This is currently only
// supported for SELECT statements.
var b strings.Builder
b.WriteString("enforce_home_region_sp")
// Add some unprintable ASCII characters to the name of the savepoint to
// decrease the likelihood of collision with a user-created savepoint.
b.WriteRune(rune(0x11))
b.WriteRune(rune(0x12))
b.WriteRune(rune(0x13))
enforceHomeRegionSavepointName := tree.Name(b.String())
const enforceHomeRegionSavepointName = "enforce_home_region_sp\x11\x12\x13"
s := &tree.Savepoint{Name: enforceHomeRegionSavepointName}
var event fsm.Event
var eventPayload fsm.EventPayload
if event, eventPayload, err = ex.execSavepointInOpenState(ctx, s, res); err != nil {
return event, eventPayload, err
}

r = &tree.ReleaseSavepoint{Savepoint: enforceHomeRegionSavepointName}
rollbackSP = &tree.RollbackToSavepoint{Savepoint: enforceHomeRegionSavepointName}
releaseHomeRegionSavepoint = &tree.ReleaseSavepoint{Savepoint: enforceHomeRegionSavepointName}
rollbackHomeRegionSavepoint = &tree.RollbackToSavepoint{Savepoint: enforceHomeRegionSavepointName}
defer func() {
// The default case is to roll back the internally-generated savepoint
// after every request. We only need it if a retryable "query has no home
// region" error occurs.
ex.execRelease(ctx, r, res)
ex.execRelease(ctx, releaseHomeRegionSavepoint, res)
}()
}

Expand Down Expand Up @@ -1082,14 +1077,14 @@ func (ex *connExecutor) execStmtInOpenState(
p.EvalContext().Locality = p.EvalContext().OriginalLocality
}
if execinfra.IsDynamicQueryHasNoHomeRegionError(err) {
if rollbackSP != nil {
if rollbackHomeRegionSavepoint != nil {
// A retryable "query has no home region" error has occurred.
// Roll back to the internal savepoint in preparation for the next
// planning and execution of this query with a different gateway region
// (as considered by the optimizer).
p.StmtNoConstantsWithHomeRegionEnforced = p.stmt.StmtNoConstants
event, eventPayload := ex.execRollbackToSavepointInOpenState(
ctx, rollbackSP, res,
ctx, rollbackHomeRegionSavepoint, res,
)
_, isTxnRestart := event.(eventTxnRestart)
rollbackToSavepointFailed := !isTxnRestart || eventPayload != nil
Expand Down Expand Up @@ -1310,12 +1305,11 @@ func (ex *connExecutor) commitSQLTransaction(
ex.extraTxnState.idleLatency += ex.statsCollector.PhaseTimes().
GetIdleLatency(ex.statsCollector.PreviousPhaseTimes())
if ex.sessionData().InjectRetryErrorsOnCommitEnabled && ast.StatementTag() == "COMMIT" {
if ex.planner.Txn().Epoch() < ex.state.lastEpoch+numTxnRetryErrors {
if ex.state.injectedTxnRetryCounter < numTxnRetryErrors {
retryErr := ex.state.mu.txn.GenerateForcedRetryableErr(
ctx, "injected by `inject_retry_errors_on_commit_enabled` session variable")
ex.state.injectedTxnRetryCounter++
return ex.makeErrEvent(retryErr, ast)
} else {
ex.state.lastEpoch = ex.planner.Txn().Epoch()
}
}
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionStartTransactionCommit, timeutil.Now())
Expand Down Expand Up @@ -1745,12 +1739,11 @@ func (ex *connExecutor) dispatchToExecutionEngine(
isSetOrShow := stmt.AST.StatementTag() == "SET" || stmt.AST.StatementTag() == "SHOW"
if ex.sessionData().InjectRetryErrorsEnabled && !isSetOrShow &&
planner.Txn().Sender().TxnStatus() == roachpb.PENDING {
if planner.Txn().Epoch() < ex.state.lastEpoch+numTxnRetryErrors {
if ex.state.injectedTxnRetryCounter < numTxnRetryErrors {
retryErr := planner.Txn().GenerateForcedRetryableErr(
ctx, "injected by `inject_retry_errors_enabled` session variable")
res.SetError(retryErr)
} else {
ex.state.lastEpoch = planner.Txn().Epoch()
ex.state.injectedTxnRetryCounter++
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/txn_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand Down Expand Up @@ -108,8 +107,10 @@ type txnState struct {
// through the use of AS OF SYSTEM TIME.
isHistorical bool

// lastEpoch is the last observed epoch in the current txn.
lastEpoch enginepb.TxnEpoch
// injectedTxnRetryCounter keeps track of how many errors have been
// injected in this transaction with the inject_retry_errors_enabled
// flag.
injectedTxnRetryCounter int

// mon tracks txn-bound objects like the running state of
// planNode in the midst of performing a computation.
Expand Down Expand Up @@ -191,7 +192,7 @@ func (ts *txnState) resetForNewSQLTxn(
// Reset state vars to defaults.
ts.sqlTimestamp = sqlTimestamp
ts.isHistorical = false
ts.lastEpoch = 0
ts.injectedTxnRetryCounter = 0

// Create a context for this transaction. It will include a root span that
// will contain everything executed as part of the upcoming SQL txn, including
Expand Down