Skip to content

Commit

Permalink
sql: move the existing savepoint logic to a separate file
Browse files Browse the repository at this point in the history
This commit re-organizes the code without any functional change.

Release note: None
  • Loading branch information
knz committed Feb 5, 2020
1 parent 50b12ad commit afc823e
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 164 deletions.
192 changes: 32 additions & 160 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -37,13 +36,6 @@ import (
"github.com/cockroachdb/errors"
)

// RestartSavepointName is the only savepoint ident that we accept.
const RestartSavepointName string = "cockroach_restart"

var errSavepointNotUsed = pgerror.Newf(
pgcode.SavepointException,
"savepoint %s has not been used", RestartSavepointName)

// execStmt executes one statement by dispatching according to the current
// state. Returns an Event to be passed to the state machine, or nil if no
// transition is needed. If nil is returned, then the cursor is supposed to
Expand Down Expand Up @@ -233,62 +225,19 @@ func (ex *connExecutor) execStmtInOpenState(
ev, payload := ex.commitSQLTransaction(ctx, stmt.AST)
return ev, payload, nil

case *tree.ReleaseSavepoint:
if err := ex.validateSavepointName(s.Savepoint); err != nil {
return makeErrEvent(err)
}
if !ex.machine.CurState().(stateOpen).RetryIntent.Get() {
return makeErrEvent(errSavepointNotUsed)
}

// ReleaseSavepoint is executed fully here; there's no plan for it.
ev, payload := ex.commitSQLTransaction(ctx, stmt.AST)
res.ResetStmtType((*tree.CommitTransaction)(nil))
return ev, payload, nil

case *tree.RollbackTransaction:
// RollbackTransaction is executed fully here; there's no plan for it.
ev, payload := ex.rollbackSQLTransaction(ctx)
return ev, payload, nil

case *tree.Savepoint:
// Ensure that the user isn't trying to run BEGIN; SAVEPOINT; SAVEPOINT;
if ex.state.activeSavepointName != "" {
err := unimplemented.NewWithIssueDetail(10735, "nested", "SAVEPOINT may not be nested")
return makeErrEvent(err)
}
if err := ex.validateSavepointName(s.Name); err != nil {
return makeErrEvent(err)
}
// We want to disallow SAVEPOINTs to be issued after a KV transaction has
// started running. The client txn's statement count indicates how many
// statements have been executed as part of this transaction. It is
// desirable to allow metadata queries against vtables to proceed
// before starting a SAVEPOINT for better ORM compatibility.
// See also:
// https://github.com/cockroachdb/cockroach/issues/15012
if ex.state.mu.txn.Active() {
err := pgerror.Newf(pgcode.Syntax,
"SAVEPOINT %s needs to be the first statement in a "+
"transaction", RestartSavepointName)
return makeErrEvent(err)
}
ex.state.activeSavepointName = s.Name
// Note that Savepoint doesn't have a corresponding plan node.
// This here is all the execution there is.
return eventRetryIntentSet{}, nil /* payload */, nil
return ex.execSavepointInOpenState(ctx, s, res)

case *tree.RollbackToSavepoint:
if err := ex.validateSavepointName(s.Savepoint); err != nil {
return makeErrEvent(err)
}
if !os.RetryIntent.Get() {
return makeErrEvent(errSavepointNotUsed)
}
ex.state.activeSavepointName = ""
case *tree.ReleaseSavepoint:
return ex.execReleaseSavepointInOpenState(ctx, s, res)

res.ResetStmtType((*tree.Savepoint)(nil))
return eventTxnRestart{}, nil /* payload */, nil
case *tree.RollbackToSavepoint:
return ex.execRollbackToSavepointInOpenState(ctx, s, res)

case *tree.Prepare:
// This is handling the SQL statement "PREPARE". See execPrepare for
Expand Down Expand Up @@ -614,25 +563,32 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error
return retryErr
}

// commitSQLTransaction executes a commit after the execution of a stmt,
// which can be any statement when executing a statement with an implicit
// transaction, or a COMMIT or RELEASE SAVEPOINT statement when using
// an explicit transaction.
// commitSQLTransaction executes a commit after the execution of a
// stmt, which can be any statement when executing a statement with an
// implicit transaction, or a COMMIT statement when using an explicit
// transaction.
func (ex *connExecutor) commitSQLTransaction(
ctx context.Context, stmt tree.Statement,
) (fsm.Event, fsm.EventPayload) {
ex.state.activeSavepointName = ""
isRelease := false
if _, ok := stmt.(*tree.ReleaseSavepoint); ok {
isRelease = true
}
ev, payload, _ := ex.commitSQLTransactionInternal(ctx, stmt)
return ev, payload
}

// commitSQLTransactionInternal is the part of a commit common to
// commitSQLTransaction and runReleaseRestartSavepointAsTxnCommit.
func (ex *connExecutor) commitSQLTransactionInternal(
ctx context.Context, stmt tree.Statement,
) (ev fsm.Event, payload fsm.EventPayload, ok bool) {
ex.clearSavepoints()

if err := ex.checkTableTwoVersionInvariant(ctx); err != nil {
return ex.makeErrEvent(err, stmt)
ev, payload = ex.makeErrEvent(err, stmt)
return ev, payload, false
}

if err := ex.state.mu.txn.Commit(ctx); err != nil {
return ex.makeErrEvent(err, stmt)
ev, payload = ex.makeErrEvent(err, stmt)
return ev, payload, false
}

// Now that we've committed, if we modified any table we need to make sure
Expand All @@ -642,16 +598,14 @@ func (ex *connExecutor) commitSQLTransaction(
ex.extraTxnState.tables.releaseLeases(ctx)
}

if !isRelease {
return eventTxnFinish{}, eventTxnFinishPayload{commit: true}
}
return eventTxnReleased{}, nil
return eventTxnFinish{}, eventTxnFinishPayload{commit: true}, true
}

// rollbackSQLTransaction executes a ROLLBACK statement: the KV transaction is
// rolled-back and an event is produced.
func (ex *connExecutor) rollbackSQLTransaction(ctx context.Context) (fsm.Event, fsm.EventPayload) {
ex.state.activeSavepointName = ""
ex.clearSavepoints()

if err := ex.state.mu.txn.Rollback(ctx); err != nil {
log.Warningf(ctx, "txn rollback failed: %s", err)
}
Expand Down Expand Up @@ -1007,85 +961,25 @@ func (ex *connExecutor) execStmtInAbortedState(
ev, payload := ex.rollbackSQLTransaction(ctx)
return ev, payload
}
ex.state.activeSavepointName = ""
ex.clearSavepoints()

// Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too.
res.ResetStmtType((*tree.RollbackTransaction)(nil))

return eventTxnFinish{}, eventTxnFinishPayload{commit: false}
case *tree.RollbackToSavepoint, *tree.Savepoint:
// We accept both the "ROLLBACK TO SAVEPOINT cockroach_restart" and the
// "SAVEPOINT cockroach_restart" commands to indicate client intent to
// retry a transaction in a RestartWait state.
var spName tree.Name
var isRollback bool
switch n := s.(type) {
case *tree.RollbackToSavepoint:
spName = n.Savepoint
isRollback = true
case *tree.Savepoint:
spName = n.Name
default:
panic("unreachable")
}
// If the user issued a SAVEPOINT in the abort state, validate
// as though there were no active savepoint.
if !isRollback {
ex.state.activeSavepointName = ""
}
if err := ex.validateSavepointName(spName); err != nil {
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{
err: err,
}
return ev, payload
}
// Either clear or reset the current savepoint name so that
// ROLLBACK TO; SAVEPOINT; works.
if isRollback {
ex.state.activeSavepointName = ""
} else {
ex.state.activeSavepointName = spName
}

if !(inRestartWait || ex.machine.CurState().(stateAborted).RetryIntent.Get()) {
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{
err: errSavepointNotUsed,
}
return ev, payload
}
case *tree.RollbackToSavepoint:
return ex.execRollbackToSavepointInAbortedState(ctx, inRestartWait, s, res)

res.ResetStmtType((*tree.RollbackTransaction)(nil))
case *tree.Savepoint:
return ex.execSavepointInAbortedState(ctx, inRestartWait, s, res)

if inRestartWait {
return eventTxnRestart{}, nil
}
// We accept ROLLBACK TO SAVEPOINT even after non-retryable errors to make
// it easy for client libraries that want to indiscriminately issue
// ROLLBACK TO SAVEPOINT after every error and possibly follow it with a
// ROLLBACK and also because we accept ROLLBACK TO SAVEPOINT in the Open
// state, so this is consistent.
// We start a new txn with the same sql timestamp and isolation as the
// current one.

ev := eventTxnStart{
ImplicitTxn: fsm.False,
}
rwMode := tree.ReadWrite
if ex.state.readOnly {
rwMode = tree.ReadOnly
}
payload := makeEventTxnStartPayload(
ex.state.priority, rwMode, ex.state.sqlTimestamp,
nil /* historicalTimestamp */, ex.transitionCtx)
return ev, payload
default:
ev := eventNonRetriableErr{IsCommit: fsm.False}
if inRestartWait {
payload := eventNonRetriableErrPayload{
err: sqlbase.NewTransactionAbortedError(
"Expected \"ROLLBACK TO SAVEPOINT COCKROACH_RESTART\"" /* customMsg */),
"Expected \"ROLLBACK TO SAVEPOINT cockroach_restart\"" /* customMsg */),
}
return ev, payload
}
Expand Down Expand Up @@ -1328,28 +1222,6 @@ func payloadHasError(payload fsm.EventPayload) bool {
return hasErr
}

// validateSavepointName validates that it is that the provided ident
// matches the active savepoint name, begins with RestartSavepointName,
// or that force_savepoint_restart==true. We accept everything with the
// desired prefix because at least the C++ libpqxx appends sequence
// numbers to the savepoint name specified by the user.
func (ex *connExecutor) validateSavepointName(savepoint tree.Name) error {
if ex.state.activeSavepointName != "" {
if savepoint == ex.state.activeSavepointName {
return nil
}
return pgerror.Newf(pgcode.InvalidSavepointSpecification,
`SAVEPOINT %q is in use`, tree.ErrString(&ex.state.activeSavepointName))
}
if !ex.sessionData.ForceSavepointRestart && !strings.HasPrefix(string(savepoint), RestartSavepointName) {
return unimplemented.NewWithIssueHint(10735,
"SAVEPOINT not supported except for "+RestartSavepointName,
"Retryable transactions with arbitrary SAVEPOINT names can be enabled "+
"with SET force_savepoint_restart=true")
}
return nil
}

// recordTransactionStart records the start of the transaction and returns a
// closure to be called once the transaction finishes.
func (ex *connExecutor) recordTransactionStart() func(txnEvent) {
Expand Down
Loading

0 comments on commit afc823e

Please sign in to comment.