Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: move the existing savepoint logic to a separate file #44684

Merged
merged 1 commit into from
Feb 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 32 additions & 160 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -37,13 +36,6 @@ import (
"github.com/cockroachdb/errors"
)

// RestartSavepointName is the only savepoint ident that we accept.
const RestartSavepointName string = "cockroach_restart"

var errSavepointNotUsed = pgerror.Newf(
pgcode.SavepointException,
"savepoint %s has not been used", RestartSavepointName)

// execStmt executes one statement by dispatching according to the current
// state. Returns an Event to be passed to the state machine, or nil if no
// transition is needed. If nil is returned, then the cursor is supposed to
Expand Down Expand Up @@ -233,62 +225,19 @@ func (ex *connExecutor) execStmtInOpenState(
ev, payload := ex.commitSQLTransaction(ctx, stmt.AST)
return ev, payload, nil

case *tree.ReleaseSavepoint:
if err := ex.validateSavepointName(s.Savepoint); err != nil {
return makeErrEvent(err)
}
if !ex.machine.CurState().(stateOpen).RetryIntent.Get() {
return makeErrEvent(errSavepointNotUsed)
}

// ReleaseSavepoint is executed fully here; there's no plan for it.
ev, payload := ex.commitSQLTransaction(ctx, stmt.AST)
res.ResetStmtType((*tree.CommitTransaction)(nil))
return ev, payload, nil

case *tree.RollbackTransaction:
// RollbackTransaction is executed fully here; there's no plan for it.
ev, payload := ex.rollbackSQLTransaction(ctx)
return ev, payload, nil

case *tree.Savepoint:
// Ensure that the user isn't trying to run BEGIN; SAVEPOINT; SAVEPOINT;
if ex.state.activeSavepointName != "" {
err := unimplemented.NewWithIssueDetail(10735, "nested", "SAVEPOINT may not be nested")
return makeErrEvent(err)
}
if err := ex.validateSavepointName(s.Name); err != nil {
return makeErrEvent(err)
}
// We want to disallow SAVEPOINTs to be issued after a KV transaction has
// started running. The client txn's statement count indicates how many
// statements have been executed as part of this transaction. It is
// desirable to allow metadata queries against vtables to proceed
// before starting a SAVEPOINT for better ORM compatibility.
// See also:
// https://github.com/cockroachdb/cockroach/issues/15012
if ex.state.mu.txn.Active() {
err := pgerror.Newf(pgcode.Syntax,
"SAVEPOINT %s needs to be the first statement in a "+
"transaction", RestartSavepointName)
return makeErrEvent(err)
}
ex.state.activeSavepointName = s.Name
// Note that Savepoint doesn't have a corresponding plan node.
// This here is all the execution there is.
return eventRetryIntentSet{}, nil /* payload */, nil
return ex.execSavepointInOpenState(ctx, s, res)

case *tree.RollbackToSavepoint:
if err := ex.validateSavepointName(s.Savepoint); err != nil {
return makeErrEvent(err)
}
if !os.RetryIntent.Get() {
return makeErrEvent(errSavepointNotUsed)
}
ex.state.activeSavepointName = ""
case *tree.ReleaseSavepoint:
return ex.execReleaseSavepointInOpenState(ctx, s, res)

res.ResetStmtType((*tree.Savepoint)(nil))
return eventTxnRestart{}, nil /* payload */, nil
case *tree.RollbackToSavepoint:
return ex.execRollbackToSavepointInOpenState(ctx, s, res)

case *tree.Prepare:
// This is handling the SQL statement "PREPARE". See execPrepare for
Expand Down Expand Up @@ -614,25 +563,32 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error
return retryErr
}

// commitSQLTransaction executes a commit after the execution of a stmt,
// which can be any statement when executing a statement with an implicit
// transaction, or a COMMIT or RELEASE SAVEPOINT statement when using
// an explicit transaction.
// commitSQLTransaction executes a commit after the execution of a
// stmt, which can be any statement when executing a statement with an
// implicit transaction, or a COMMIT statement when using an explicit
// transaction.
func (ex *connExecutor) commitSQLTransaction(
ctx context.Context, stmt tree.Statement,
) (fsm.Event, fsm.EventPayload) {
ex.state.activeSavepointName = ""
isRelease := false
if _, ok := stmt.(*tree.ReleaseSavepoint); ok {
isRelease = true
}
ev, payload, _ := ex.commitSQLTransactionInternal(ctx, stmt)
return ev, payload
}

// commitSQLTransactionInternal is the part of a commit common to
// commitSQLTransaction and runReleaseRestartSavepointAsTxnCommit.
func (ex *connExecutor) commitSQLTransactionInternal(
ctx context.Context, stmt tree.Statement,
) (ev fsm.Event, payload fsm.EventPayload, ok bool) {
ex.clearSavepoints()

if err := ex.checkTableTwoVersionInvariant(ctx); err != nil {
return ex.makeErrEvent(err, stmt)
ev, payload = ex.makeErrEvent(err, stmt)
return ev, payload, false
}

if err := ex.state.mu.txn.Commit(ctx); err != nil {
return ex.makeErrEvent(err, stmt)
ev, payload = ex.makeErrEvent(err, stmt)
return ev, payload, false
}

// Now that we've committed, if we modified any table we need to make sure
Expand All @@ -642,16 +598,14 @@ func (ex *connExecutor) commitSQLTransaction(
ex.extraTxnState.tables.releaseLeases(ctx)
}

if !isRelease {
return eventTxnFinish{}, eventTxnFinishPayload{commit: true}
}
return eventTxnReleased{}, nil
return eventTxnFinish{}, eventTxnFinishPayload{commit: true}, true
}

// rollbackSQLTransaction executes a ROLLBACK statement: the KV transaction is
// rolled-back and an event is produced.
func (ex *connExecutor) rollbackSQLTransaction(ctx context.Context) (fsm.Event, fsm.EventPayload) {
ex.state.activeSavepointName = ""
ex.clearSavepoints()

if err := ex.state.mu.txn.Rollback(ctx); err != nil {
log.Warningf(ctx, "txn rollback failed: %s", err)
}
Expand Down Expand Up @@ -1007,85 +961,25 @@ func (ex *connExecutor) execStmtInAbortedState(
ev, payload := ex.rollbackSQLTransaction(ctx)
return ev, payload
}
ex.state.activeSavepointName = ""
ex.clearSavepoints()

// Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too.
res.ResetStmtType((*tree.RollbackTransaction)(nil))

return eventTxnFinish{}, eventTxnFinishPayload{commit: false}
case *tree.RollbackToSavepoint, *tree.Savepoint:
// We accept both the "ROLLBACK TO SAVEPOINT cockroach_restart" and the
// "SAVEPOINT cockroach_restart" commands to indicate client intent to
// retry a transaction in a RestartWait state.
var spName tree.Name
var isRollback bool
switch n := s.(type) {
case *tree.RollbackToSavepoint:
spName = n.Savepoint
isRollback = true
case *tree.Savepoint:
spName = n.Name
default:
panic("unreachable")
}
// If the user issued a SAVEPOINT in the abort state, validate
// as though there were no active savepoint.
if !isRollback {
ex.state.activeSavepointName = ""
}
if err := ex.validateSavepointName(spName); err != nil {
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{
err: err,
}
return ev, payload
}
// Either clear or reset the current savepoint name so that
// ROLLBACK TO; SAVEPOINT; works.
if isRollback {
ex.state.activeSavepointName = ""
} else {
ex.state.activeSavepointName = spName
}

if !(inRestartWait || ex.machine.CurState().(stateAborted).RetryIntent.Get()) {
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{
err: errSavepointNotUsed,
}
return ev, payload
}
case *tree.RollbackToSavepoint:
return ex.execRollbackToSavepointInAbortedState(ctx, inRestartWait, s, res)

res.ResetStmtType((*tree.RollbackTransaction)(nil))
case *tree.Savepoint:
return ex.execSavepointInAbortedState(ctx, inRestartWait, s, res)

if inRestartWait {
return eventTxnRestart{}, nil
}
// We accept ROLLBACK TO SAVEPOINT even after non-retryable errors to make
// it easy for client libraries that want to indiscriminately issue
// ROLLBACK TO SAVEPOINT after every error and possibly follow it with a
// ROLLBACK and also because we accept ROLLBACK TO SAVEPOINT in the Open
// state, so this is consistent.
// We start a new txn with the same sql timestamp and isolation as the
// current one.

ev := eventTxnStart{
ImplicitTxn: fsm.False,
}
rwMode := tree.ReadWrite
if ex.state.readOnly {
rwMode = tree.ReadOnly
}
payload := makeEventTxnStartPayload(
ex.state.priority, rwMode, ex.state.sqlTimestamp,
nil /* historicalTimestamp */, ex.transitionCtx)
return ev, payload
default:
ev := eventNonRetriableErr{IsCommit: fsm.False}
if inRestartWait {
payload := eventNonRetriableErrPayload{
err: sqlbase.NewTransactionAbortedError(
"Expected \"ROLLBACK TO SAVEPOINT COCKROACH_RESTART\"" /* customMsg */),
"Expected \"ROLLBACK TO SAVEPOINT cockroach_restart\"" /* customMsg */),
}
return ev, payload
}
Expand Down Expand Up @@ -1328,28 +1222,6 @@ func payloadHasError(payload fsm.EventPayload) bool {
return hasErr
}

// validateSavepointName validates that it is that the provided ident
// matches the active savepoint name, begins with RestartSavepointName,
// or that force_savepoint_restart==true. We accept everything with the
// desired prefix because at least the C++ libpqxx appends sequence
// numbers to the savepoint name specified by the user.
func (ex *connExecutor) validateSavepointName(savepoint tree.Name) error {
if ex.state.activeSavepointName != "" {
if savepoint == ex.state.activeSavepointName {
return nil
}
return pgerror.Newf(pgcode.InvalidSavepointSpecification,
`SAVEPOINT %q is in use`, tree.ErrString(&ex.state.activeSavepointName))
}
if !ex.sessionData.ForceSavepointRestart && !strings.HasPrefix(string(savepoint), RestartSavepointName) {
return unimplemented.NewWithIssueHint(10735,
"SAVEPOINT not supported except for "+RestartSavepointName,
"Retryable transactions with arbitrary SAVEPOINT names can be enabled "+
"with SET force_savepoint_restart=true")
}
return nil
}

// recordTransactionStart records the start of the transaction and returns a
// closure to be called once the transaction finishes.
func (ex *connExecutor) recordTransactionStart() func(txnEvent) {
Expand Down
Loading