Skip to content

Commit

Permalink
sql: set the clean-up steps for pausable portal
Browse files Browse the repository at this point in the history
This is part of the implementation of multiple active portals.

To enable executing portals interleavingly, we need to persist certain resources
for it, and delay their clean-up till we close the portal. Also, these resources
don't need to be re-setup when we re-executing a portal.

Thus we stores these cleanup steps in the `__Cleanup` function stacks in
`portalPauseInfo`, and they are called when 1. sql txn is commited; 2. sql txn
is rollbacked; 3. conn executor is closed.

The cleanup functions should be called according to the original order of a normal
portal. Since a portal's execution is via the `execPortal() -> execStmtInOpenState
() -> dispatchToExecutionEngine() ->  flow.Run()` function flow, we categorized the
cleanup functions accordingly into
4 "layers": `exhaustPortal`, `execStmtCleanup` `dispatchToExecEngCleanup` and
`flowCleanup`. The cleanup
is always LIFO, i.e. following the `flowCleanup -> dispatchToExecEngCleanup ->
execStmtCleanup -> exhaustPortal` order. Also, when there's error happens in each
layer, cleanup the current and proceeding layers. e.g. if we encounter an error in
`execStmtInOpenState()`, do `flowCleanup` and `dispatchToExecEngCleanup`
(proceeding) and then `execStmtCleanup` (current), and return the
error to `execPortal()`, where `exhaustPortal` will eventually be called.

We also pass as reference the PreparedPortal to the planner in
`execStmtInOpenState()`, so that the portal's flow can be set and reused.

Release note: None
  • Loading branch information
ZhouXing19 committed Mar 21, 2023
1 parent c5d8fcc commit d25b624
Show file tree
Hide file tree
Showing 7 changed files with 440 additions and 98 deletions.
51 changes: 36 additions & 15 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,14 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
txnEvType = txnRollback
}

// Close all portals, otherwise there will be leftover bytes.
ex.extraTxnState.prepStmtsNamespace.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)

if closeType == normalClose {
// We'll cleanup the SQL txn by creating a non-retriable (commit:true) event.
// This event is guaranteed to be accepted in every state.
Expand Down Expand Up @@ -1670,6 +1678,15 @@ func (ns prepStmtNamespace) HasPortal(s string) bool {
return ok
}

func (ns prepStmtNamespace) closeAllPortals(
ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount,
) {
for name, p := range ns.portals {
p.close(ctx, prepStmtsNamespaceMemAcc, name)
delete(ns.portals, name)
}
}

// MigratablePreparedStatements returns a mapping of all prepared statements.
func (ns prepStmtNamespace) MigratablePreparedStatements() []sessiondatapb.MigratableSession_PreparedStatement {
ret := make([]sessiondatapb.MigratableSession_PreparedStatement, 0, len(ns.prepStmts))
Expand Down Expand Up @@ -1721,10 +1738,8 @@ func (ns *prepStmtNamespace) resetTo(
p.decRef(ctx)
delete(ns.prepStmts, name)
}
for name, p := range ns.portals {
p.close(ctx, prepStmtsNamespaceMemAcc, name)
delete(ns.portals, name)
}

ns.closeAllPortals(ctx, prepStmtsNamespaceMemAcc)

for name, ps := range to.prepStmts {
ps.incRef(ctx)
Expand Down Expand Up @@ -1761,10 +1776,9 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {
}

// Close all portals.
for name, p := range ex.extraTxnState.prepStmtsNamespace.portals {
p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}
ex.extraTxnState.prepStmtsNamespace.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)

// Close all cursors.
if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil {
Expand All @@ -1775,10 +1789,9 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {

switch ev.eventType {
case txnCommit, txnRollback:
for name, p := range ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals {
p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
delete(ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals, name)
}
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.savepoints.clear()
ex.onTxnFinish(ctx, ev)
case txnRestart:
Expand Down Expand Up @@ -1925,7 +1938,6 @@ func (ex *connExecutor) run(
return err
}
}

}

// errDrainingComplete is returned by execCmd when the connExecutor previously got
Expand Down Expand Up @@ -1997,7 +2009,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
(tcmd.LastInBatchBeforeShowCommitTimestamp ||
tcmd.LastInBatch || !implicitTxnForBatch)
ev, payload, err = ex.execStmt(
ctx, tcmd.Statement, nil /* prepared */, nil /* pinfo */, stmtRes, canAutoCommit,
ctx, tcmd.Statement, nil /* portal */, nil /* pinfo */, stmtRes, canAutoCommit,
)

return err
Expand Down Expand Up @@ -2067,6 +2079,9 @@ func (ex *connExecutor) execCmd() (retErr error) {
ex.implicitTxn(),
portal.portalPausablity,
)
if portal.pauseInfo != nil {
portal.pauseInfo.curRes = stmtRes
}
res = stmtRes

// In the extended protocol, autocommit is not always allowed. The postgres
Expand All @@ -2085,6 +2100,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
// - ex.statsCollector merely contains a copy of the times, that
// was created when the statement started executing (via the
// reset() method).
// TODO(sql-sessions): fix the phase time for pausable portals.
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionQueryServiced, timeutil.Now())
if err != nil {
return err
Expand Down Expand Up @@ -3486,8 +3502,13 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}

fallthrough
case txnRestart, txnRollback:
case txnRestart:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
case txnRollback:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ex.Ctx(), &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
default:
return advanceInfo{}, errors.AssertionFailedf(
"unexpected event: %v", errors.Safe(advInfo.txnEvent))
Expand Down
Loading

0 comments on commit d25b624

Please sign in to comment.