diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 5afa6b6fe3e1..b0ba5592c30c 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -250,7 +250,6 @@ sql.multiple_modifications_of_table.enabled boolean false if true, allow stateme
sql.multiregion.drop_primary_region.enabled boolean true allows dropping the PRIMARY REGION of a database if it is the last region tenant-rw
sql.notices.enabled boolean true enable notices in the server/client protocol being sent tenant-rw
sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability tenant-rw
-sql.pgwire.multiple_active_portals.enabled boolean false if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan tenant-rw
sql.schema.telemetry.recurrence string @weekly cron-tab recurrence for SQL schema telemetry job tenant-ro
sql.show_ranges_deprecated_behavior.enabled boolean true if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES. tenant-rw
sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators tenant-rw
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index a392b0ef9fc9..24da9c53edae 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -202,7 +202,6 @@
sql.multiregion.drop_primary_region.enabled
| boolean | true | allows dropping the PRIMARY REGION of a database if it is the last region | Serverless/Dedicated/Self-Hosted |
sql.notices.enabled
| boolean | true | enable notices in the server/client protocol being sent | Serverless/Dedicated/Self-Hosted |
sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled
| boolean | false | if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability | Serverless/Dedicated/Self-Hosted |
-sql.pgwire.multiple_active_portals.enabled
| boolean | false | if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan | Serverless/Dedicated/Self-Hosted |
sql.schema.telemetry.recurrence
| string | @weekly | cron-tab recurrence for SQL schema telemetry job | Serverless/Dedicated/Self-Hosted (read-only) |
sql.show_ranges_deprecated_behavior.enabled
| boolean | true | if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES. | Serverless/Dedicated/Self-Hosted |
sql.spatial.experimental_box2d_comparison_operators.enabled
| boolean | false | enables the use of certain experimental box2d comparison operators | Serverless/Dedicated/Self-Hosted |
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index 252322f37995..ddb4e1245896 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -252,7 +252,6 @@ go_library(
"//pkg/sql/sqlstats/insights",
"//pkg/sql/sqlstats/persistedsqlstats",
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
- "//pkg/sql/sqltelemetry",
"//pkg/sql/stats",
"//pkg/sql/stmtdiagnostics",
"//pkg/sql/syntheticprivilege",
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index ce63e406b18a..0083e4fb14ca 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -58,7 +58,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
- "github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -102,7 +101,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
- "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache"
@@ -1369,12 +1367,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
vmoduleSetting.SetOnChange(&cfg.Settings.SV, fn)
fn(ctx)
- sql.EnableMultipleActivePortals.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
- if sql.EnableMultipleActivePortals.Get(&cfg.Settings.SV) {
- telemetry.Inc(sqltelemetry.MultipleActivePortalCounter)
- }
- })
-
return &SQLServer{
ambientCtx: cfg.BaseConfig.AmbientCtx,
stopper: cfg.stopper,
diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go
index e170277690c4..d22e84a0e375 100644
--- a/pkg/sql/apply_join.go
+++ b/pkg/sql/apply_join.go
@@ -320,6 +320,10 @@ func runPlanInsidePlan(
// Make a copy of the EvalContext so it can be safely modified.
evalCtx := params.p.ExtendedEvalContextCopy()
plannerCopy := *params.p
+	// If we reach this point when re-executing a pausable portal, we don't want
+	// to resume the flow bound to it. The inner plan should have its own
+	// lifecycle for its flow.
+ plannerCopy.pausablePortal = nil
distributePlan := getPlanDistribution(
ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
plannerCopy.SessionData().DistSQLMode, plan.main,
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index 6b23641e4b76..896455d1e5e3 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -1138,6 +1138,14 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
txnEvType = txnRollback
}
+	// Close all portals; otherwise their memory accounts will be left with
+	// leftover bytes.
+ ex.extraTxnState.prepStmtsNamespace.closeAllPortals(
+ ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
+ )
+ ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
+ ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
+ )
+
if closeType == normalClose {
// We'll cleanup the SQL txn by creating a non-retriable (commit:true) event.
// This event is guaranteed to be accepted in every state.
@@ -1760,6 +1768,26 @@ func (ns *prepStmtNamespace) touchLRUEntry(name string) {
ns.addLRUEntry(name, 0)
}
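+// closeAllPortals closes every portal in the namespace and releases the
+// memory accounted for them.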
+func (ns *prepStmtNamespace) closeAllPortals(
+ ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount,
+) {
+ for name, p := range ns.portals {
+ p.close(ctx, prepStmtsNamespaceMemAcc, name)
+ delete(ns.portals, name)
+ }
+}
+
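+// closeAllPausablePortals closes only the portals that have pauseInfo set
+// (i.e. pausable portals), leaving other portals untouched.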
+func (ns *prepStmtNamespace) closeAllPausablePortals(
+ ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount,
+) {
+ for name, p := range ns.portals {
+ if p.pauseInfo != nil {
+ p.close(ctx, prepStmtsNamespaceMemAcc, name)
+ delete(ns.portals, name)
+ }
+ }
+}
+
// MigratablePreparedStatements returns a mapping of all prepared statements.
func (ns *prepStmtNamespace) MigratablePreparedStatements() []sessiondatapb.MigratableSession_PreparedStatement {
ret := make([]sessiondatapb.MigratableSession_PreparedStatement, 0, len(ns.prepStmts))
@@ -1836,10 +1864,7 @@ func (ns *prepStmtNamespace) resetTo(
for name := range ns.prepStmtsLRU {
delete(ns.prepStmtsLRU, name)
}
- for name, p := range ns.portals {
- p.close(ctx, prepStmtsNamespaceMemAcc, name)
- delete(ns.portals, name)
- }
+ ns.closeAllPortals(ctx, prepStmtsNamespaceMemAcc)
for name, ps := range to.prepStmts {
ps.incRef(ctx)
@@ -1880,10 +1905,9 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {
}
// Close all portals.
- for name, p := range ex.extraTxnState.prepStmtsNamespace.portals {
- p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
- delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
- }
+ ex.extraTxnState.prepStmtsNamespace.closeAllPortals(
+ ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
+ )
// Close all cursors.
if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil {
@@ -1894,10 +1918,9 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {
switch ev.eventType {
case txnCommit, txnRollback:
- for name, p := range ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals {
- p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
- delete(ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals, name)
- }
+ ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
+ ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
+ )
ex.extraTxnState.savepoints.clear()
ex.onTxnFinish(ctx, ev)
case txnRestart:
@@ -2044,7 +2067,6 @@ func (ex *connExecutor) run(
return err
}
}
-
}
// errDrainingComplete is returned by execCmd when the connExecutor previously got
@@ -2116,7 +2138,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
(tcmd.LastInBatchBeforeShowCommitTimestamp ||
tcmd.LastInBatch || !implicitTxnForBatch)
ev, payload, err = ex.execStmt(
- ctx, tcmd.Statement, nil /* prepared */, nil /* pinfo */, stmtRes, canAutoCommit,
+ ctx, tcmd.Statement, nil /* portal */, nil /* pinfo */, stmtRes, canAutoCommit,
)
return err
@@ -2173,6 +2195,12 @@ func (ex *connExecutor) execCmd() (retErr error) {
Values: portal.Qargs,
}
+		// If this is the first execution of a portal and no row limit is set, all
+		// rows will be returned at once, so there is no need to pause this portal.
+ if tcmd.Limit == 0 && portal.pauseInfo != nil && portal.pauseInfo.curRes == nil {
+ portal.pauseInfo = nil
+ }
+
stmtRes := ex.clientComm.CreateStatementResult(
portal.Stmt.AST,
// The client is using the extended protocol, so no row description is
@@ -2186,6 +2214,9 @@ func (ex *connExecutor) execCmd() (retErr error) {
ex.implicitTxn(),
portal.portalPausablity,
)
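+		// Stash the latest result on the portal so that deferred cleanup steps can
+		// push errors and row counts to the result of the current execution.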
+ if portal.pauseInfo != nil {
+ portal.pauseInfo.curRes = stmtRes
+ }
res = stmtRes
// In the extended protocol, autocommit is not always allowed. The postgres
@@ -2204,6 +2235,8 @@ func (ex *connExecutor) execCmd() (retErr error) {
// - ex.statsCollector merely contains a copy of the times, that
// was created when the statement started executing (via the
// reset() method).
+ // TODO(sql-sessions): fix the phase time for pausable portals.
+ // https://github.com/cockroachdb/cockroach/issues/99410
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionQueryServiced, timeutil.Now())
if err != nil {
return err
@@ -2313,9 +2346,26 @@ func (ex *connExecutor) execCmd() (retErr error) {
var advInfo advanceInfo
+	// We close all pausable portals when we encounter an error payload; otherwise
+	// their memory accounts will be left with leftover bytes.
+ shouldClosePausablePortals := func(payload fsm.EventPayload) bool {
+ switch payload.(type) {
+ case eventNonRetriableErrPayload, eventRetriableErrPayload:
+ return true
+ default:
+ return false
+ }
+ }
+
// If an event was generated, feed it to the state machine.
if ev != nil {
var err error
+ if shouldClosePausablePortals(payload) {
+			// We need this because otherwise there will be leftover bytes when
+			// txnState.finishSQLTxn() is called, as the underlying resources of
+			// pausable portals haven't been cleared yet.
+ ex.extraTxnState.prepStmtsNamespace.closeAllPausablePortals(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc)
+ }
advInfo, err = ex.txnStateTransitionsApplyWrapper(ev, payload, res, pos)
if err != nil {
return err
@@ -2366,6 +2416,16 @@ func (ex *connExecutor) execCmd() (retErr error) {
res.SetError(pe.errorCause())
}
}
+ // For a pausable portal, we don't log the affected rows until we close the
+ // portal. However, we update the result for each execution. Thus, we need
+ // to accumulate the number of affected rows before closing the result.
+ if tcmd, ok := cmd.(*ExecPortal); ok {
+ if portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[tcmd.Name]; ok {
+ if portal.pauseInfo != nil {
+ portal.pauseInfo.dispatchToExecutionEngine.rowsAffected += res.(RestrictedCommandResult).RowsAffected()
+ }
+ }
+ }
res.Close(ctx, stateToTxnStatusIndicator(ex.machine.CurState()))
} else {
res.Discard()
@@ -3598,6 +3658,11 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
fallthrough
case txnRollback:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
+ // Since we're doing a complete rollback, there's no need to keep the
+ // prepared stmts for a txn rewind.
+ ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
+ ex.Ctx(), &ex.extraTxnState.prepStmtsNamespaceMemAcc,
+ )
case txnRestart:
// In addition to resetting the extraTxnState, the restart event may
// also need to reset the sqlliveness.Session.
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 5729876dc6d2..c9d32d6275b1 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -95,7 +95,7 @@ const numTxnRetryErrors = 3
func (ex *connExecutor) execStmt(
ctx context.Context,
parserStmt parser.Statement,
- prepared *PreparedStatement,
+ portal *PreparedPortal,
pinfo *tree.PlaceholderInfo,
res RestrictedCommandResult,
canAutoCommit bool,
@@ -133,8 +133,12 @@ func (ex *connExecutor) execStmt(
ev, payload = ex.execStmtInNoTxnState(ctx, ast, res)
case stateOpen:
- err = ex.execWithProfiling(ctx, ast, prepared, func(ctx context.Context) error {
- ev, payload, err = ex.execStmtInOpenState(ctx, parserStmt, prepared, pinfo, res, canAutoCommit)
+ var preparedStmt *PreparedStatement
+ if portal != nil {
+ preparedStmt = portal.Stmt
+ }
+ err = ex.execWithProfiling(ctx, ast, preparedStmt, func(ctx context.Context) error {
+ ev, payload, err = ex.execStmtInOpenState(ctx, parserStmt, portal, pinfo, res, canAutoCommit)
return err
})
switch ev.(type) {
@@ -201,7 +205,23 @@ func (ex *connExecutor) execPortal(
stmtRes CommandResult,
pinfo *tree.PlaceholderInfo,
canAutoCommit bool,
-) (ev fsm.Event, payload fsm.EventPayload, err error) {
+) (ev fsm.Event, payload fsm.EventPayload, retErr error) {
+ defer func() {
+ if portal.isPausable() {
+ if !portal.pauseInfo.exhaustPortal.cleanup.isComplete {
+ portal.pauseInfo.exhaustPortal.cleanup.appendFunc(namedFunc{fName: "exhaust portal", f: func() {
+ ex.exhaustPortal(portalName)
+ }})
+ portal.pauseInfo.exhaustPortal.cleanup.isComplete = true
+ }
+ // If we encountered an error when executing a pausable portal, clean up
+ // the retained resources.
+ if retErr != nil {
+ portal.pauseInfo.cleanupAll()
+ }
+ }
+ }()
+
switch ex.machine.CurState().(type) {
case stateOpen:
// We're about to execute the statement in an open state which
@@ -223,23 +243,19 @@ func (ex *connExecutor) execPortal(
if portal.exhausted {
return nil, nil, nil
}
- ev, payload, err = ex.execStmt(ctx, portal.Stmt.Statement, portal.Stmt, pinfo, stmtRes, canAutoCommit)
- // Portal suspension is supported via a "side" state machine
- // (see pgwire.limitedCommandResult for details), so when
- // execStmt returns, we know for sure that the portal has been
- // executed to completion, thus, it is exhausted.
- // Note that the portal is considered exhausted regardless of
- // the fact whether an error occurred or not - if it did, we
- // still don't want to re-execute the portal from scratch.
+ ev, payload, retErr = ex.execStmt(ctx, portal.Stmt.Statement, &portal, pinfo, stmtRes, canAutoCommit)
+		// For a non-pausable portal, it is considered exhausted regardless of
+		// whether an error occurred or not - if it did, we still don't want
+		// to re-execute the portal from scratch.
// The current statement may have just closed and deleted the portal,
// so only exhaust it if it still exists.
- if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok {
- ex.exhaustPortal(portalName)
+ if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok && !portal.isPausable() {
+ defer ex.exhaustPortal(portalName)
}
- return ev, payload, err
+ return ev, payload, retErr
default:
- return ex.execStmt(ctx, portal.Stmt.Statement, portal.Stmt, pinfo, stmtRes, canAutoCommit)
+ return ex.execStmt(ctx, portal.Stmt.Statement, &portal, pinfo, stmtRes, canAutoCommit)
}
}
@@ -259,17 +275,86 @@ func (ex *connExecutor) execPortal(
func (ex *connExecutor) execStmtInOpenState(
ctx context.Context,
parserStmt parser.Statement,
- prepared *PreparedStatement,
+ portal *PreparedPortal,
pinfo *tree.PlaceholderInfo,
res RestrictedCommandResult,
canAutoCommit bool,
) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) {
- ctx, sp := tracing.EnsureChildSpan(ctx, ex.server.cfg.AmbientCtx.Tracer, "sql query")
- // TODO(andrei): Consider adding the placeholders as tags too.
- sp.SetTag("statement", attribute.StringValue(parserStmt.SQL))
- defer sp.Finish()
+	// We need this to be a function rather than a static bool because a portal's
+	// "pausability" can be revoked in `dispatchToExecutionEngine()` if the
+	// underlying statement contains sub/post queries. Thus, we re-evaluate
+	// whether a portal is pausable when executing the cleanup steps.
+ isPausablePortal := func() bool { return portal != nil && portal.isPausable() }
+	// updateRetErrAndPayload ensures that the latest event payload and error are
+	// always recorded by portal.pauseInfo.
+ // TODO(janexing): add test for this.
+ updateRetErrAndPayload := func(err error, payload fsm.EventPayload) {
+ retPayload = payload
+ retErr = err
+ if isPausablePortal() {
+ portal.pauseInfo.execStmtInOpenState.retPayload = payload
+ portal.pauseInfo.execStmtInOpenState.retErr = err
+ }
+ }
+	// For pausable portals, we delay the cleanup until the portal is closed by
+	// appending the function to the execStmtInOpenState cleanup stack.
+	// Otherwise, the cleanup step is performed within every execution.
+ processCleanupFunc := func(fName string, f func()) {
+ if !isPausablePortal() {
+ f()
+ } else if !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete {
+ portal.pauseInfo.execStmtInOpenState.cleanup.appendFunc(namedFunc{
+ fName: fName,
+ f: func() {
+ f()
+					// Some cleanup steps modify retErr and retPayload. We need to ensure
+					// that cleanup steps running after them see the updated values.
+ updateRetErrAndPayload(retErr, retPayload)
+ },
+ })
+ }
+ }
+ defer func() {
+		// This is the first defer, so it will always run after any cleanup funcs
+		// added to the stack by the defers below.
+ if isPausablePortal() && !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete {
+ portal.pauseInfo.execStmtInOpenState.cleanup.isComplete = true
+ }
+ // If there's any error, do the cleanup right here.
+ if (retErr != nil || payloadHasError(retPayload)) && isPausablePortal() {
+ updateRetErrAndPayload(retErr, retPayload)
+ portal.pauseInfo.resumableFlow.cleanup.run()
+ portal.pauseInfo.dispatchToExecutionEngine.cleanup.run()
+ portal.pauseInfo.execStmtInOpenState.cleanup.run()
+ }
+ }()
+
+	// We need this so that when we decide whether to increment the executed
+	// statement counter, we check the latest error and payload rather than the
+	// ones evaluated at the portal's first execution.
+ defer func() {
+ if isPausablePortal() {
+ updateRetErrAndPayload(retErr, retPayload)
+ }
+ }()
+
ast := parserStmt.AST
- ctx = withStatement(ctx, ast)
+ var sp *tracing.Span
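+	// Set up the tracing span and statement context only for non-pausable
+	// portals or the first execution of a pausable portal; re-executions reuse
+	// the context stashed in pauseInfo.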
+ if !isPausablePortal() || !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete {
+ ctx, sp = tracing.EnsureChildSpan(ctx, ex.server.cfg.AmbientCtx.Tracer, "sql query")
+ // TODO(andrei): Consider adding the placeholders as tags too.
+ sp.SetTag("statement", attribute.StringValue(parserStmt.SQL))
+ ctx = withStatement(ctx, ast)
+ if isPausablePortal() {
+ portal.pauseInfo.execStmtInOpenState.spCtx = ctx
+ }
+ defer func() {
+ processCleanupFunc("cleanup span", sp.Finish)
+ }()
+ } else {
+ ctx = portal.pauseInfo.execStmtInOpenState.spCtx
+ }
makeErrEvent := func(err error) (fsm.Event, fsm.EventPayload, error) {
ev, payload := ex.makeErrEvent(err, ast)
@@ -277,7 +362,17 @@ func (ex *connExecutor) execStmtInOpenState(
}
var stmt Statement
- queryID := ex.generateID()
+ var queryID clusterunique.ID
+
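+	// A pausable portal keeps a single query ID across re-executions; generate
+	// it once and reuse it afterwards.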
+ if isPausablePortal() {
+ if !portal.pauseInfo.isQueryIDSet() {
+ portal.pauseInfo.execStmtInOpenState.queryID = ex.generateID()
+ }
+ queryID = portal.pauseInfo.execStmtInOpenState.queryID
+ } else {
+ queryID = ex.generateID()
+ }
+
// Update the deadline on the transaction based on the collections.
err := ex.extraTxnState.descCollection.MaybeUpdateDeadline(ctx, ex.state.mu.txn)
if err != nil {
@@ -285,39 +380,62 @@ func (ex *connExecutor) execStmtInOpenState(
}
os := ex.machine.CurState().(stateOpen)
- isExtendedProtocol := prepared != nil
+ isExtendedProtocol := portal != nil && portal.Stmt != nil
if isExtendedProtocol {
- stmt = makeStatementFromPrepared(prepared, queryID)
+ stmt = makeStatementFromPrepared(portal.Stmt, queryID)
} else {
stmt = makeStatement(parserStmt, queryID)
}
- ex.incrementStartedStmtCounter(ast)
- defer func() {
- if retErr == nil && !payloadHasError(retPayload) {
- ex.incrementExecutedStmtCounter(ast)
- }
- }()
-
- func(st *txnState) {
- st.mu.Lock()
- defer st.mu.Unlock()
- st.mu.stmtCount++
- }(&ex.state)
-
var queryTimeoutTicker *time.Timer
var txnTimeoutTicker *time.Timer
queryTimedOut := false
txnTimedOut := false
-
// queryDoneAfterFunc and txnDoneAfterFunc will be allocated only when
// queryTimeoutTicker or txnTimeoutTicker is non-nil.
var queryDoneAfterFunc chan struct{}
var txnDoneAfterFunc chan struct{}
var cancelQuery context.CancelFunc
- ctx, cancelQuery = contextutil.WithCancel(ctx)
- ex.addActiveQuery(parserStmt, pinfo, queryID, cancelQuery)
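+	// addActiveQuery derives a cancelable context for the statement, bumps the
+	// started-statement and transaction statement counters, and registers the
+	// query as active on this session.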
+ addActiveQuery := func() {
+ ctx, cancelQuery = contextutil.WithCancel(ctx)
+ ex.incrementStartedStmtCounter(ast)
+ func(st *txnState) {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+ st.mu.stmtCount++
+ }(&ex.state)
+ ex.addActiveQuery(parserStmt, pinfo, queryID, cancelQuery)
+ }
+
+	// For a pausable portal, the active query needs to be set up only when the
+	// portal is executed for the first time.
+ if !isPausablePortal() || !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete {
+ addActiveQuery()
+ if isPausablePortal() {
+ portal.pauseInfo.execStmtInOpenState.cancelQueryFunc = cancelQuery
+ portal.pauseInfo.execStmtInOpenState.cancelQueryCtx = ctx
+ }
+ defer func() {
+ processCleanupFunc(
+ "increment executed stmt cnt",
+ func() {
+					// We need to check the latest error and payload rather than the
+					// ones evaluated when this function was created.
+ if isPausablePortal() {
+ retErr = portal.pauseInfo.execStmtInOpenState.retErr
+ retPayload = portal.pauseInfo.execStmtInOpenState.retPayload
+ }
+ if retErr == nil && !payloadHasError(retPayload) {
+ ex.incrementExecutedStmtCounter(ast)
+ }
+ },
+ )
+ }()
+ } else {
+ ctx = portal.pauseInfo.execStmtInOpenState.cancelQueryCtx
+ cancelQuery = portal.pauseInfo.execStmtInOpenState.cancelQueryFunc
+ }
// Make sure that we always unregister the query. It also deals with
// overwriting res.Error to a more user-friendly message in case of query
@@ -338,25 +456,47 @@ func (ex *connExecutor) execStmtInOpenState(
}
}
- // Detect context cancelation and overwrite whatever error might have been
- // set on the result before. The idea is that once the query's context is
- // canceled, all sorts of actors can detect the cancelation and set all
- // sorts of errors on the result. Rather than trying to impose discipline
- // in that jungle, we just overwrite them all here with an error that's
- // nicer to look at for the client.
- if res != nil && ctx.Err() != nil && res.Err() != nil {
- // Even in the cases where the error is a retryable error, we want to
- // intercept the event and payload returned here to ensure that the query
- // is not retried.
- retEv = eventNonRetriableErr{
- IsCommit: fsm.FromBool(isCommit(ast)),
+ processCleanupFunc("cancel query", func() {
+ cancelQueryCtx := ctx
+ if isPausablePortal() {
+ cancelQueryCtx = portal.pauseInfo.execStmtInOpenState.cancelQueryCtx
}
- res.SetError(cancelchecker.QueryCanceledError)
- retPayload = eventNonRetriableErrPayload{err: cancelchecker.QueryCanceledError}
- }
+ resToPushErr := res
+		// For pausable portals, we retain the query but update the result for
+		// each execution. When the query context is canceled and we're in the
+		// middle of a portal execution, push the error to the current result.
+ if isPausablePortal() {
+ resToPushErr = portal.pauseInfo.curRes
+ }
+ // Detect context cancelation and overwrite whatever error might have been
+ // set on the result before. The idea is that once the query's context is
+ // canceled, all sorts of actors can detect the cancelation and set all
+ // sorts of errors on the result. Rather than trying to impose discipline
+ // in that jungle, we just overwrite them all here with an error that's
+ // nicer to look at for the client.
+ if resToPushErr != nil && cancelQueryCtx.Err() != nil && resToPushErr.ErrAllowReleased() != nil {
+ // Even in the cases where the error is a retryable error, we want to
+ // intercept the event and payload returned here to ensure that the query
+ // is not retried.
+ retEv = eventNonRetriableErr{
+ IsCommit: fsm.FromBool(isCommit(ast)),
+ }
+ errToPush := cancelchecker.QueryCanceledError
+			// For a pausable portal, we can arrive here after encountering a timeout
+			// error and then performing a query-cleanup step. In this case, we don't
+			// want to override the original timeout error with the query-canceled
+			// error.
+ if isPausablePortal() && (errors.Is(resToPushErr.Err(), sqlerrors.QueryTimeoutError) ||
+ errors.Is(resToPushErr.Err(), sqlerrors.TxnTimeoutError)) {
+ errToPush = resToPushErr.Err()
+ }
+ resToPushErr.SetError(errToPush)
+ retPayload = eventNonRetriableErrPayload{err: errToPush}
+ }
+ ex.removeActiveQuery(queryID, ast)
+ cancelQuery()
+ })
- ex.removeActiveQuery(queryID, ast)
- cancelQuery()
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLActiveStatements.Dec(1)
}
@@ -392,6 +532,9 @@ func (ex *connExecutor) execStmtInOpenState(
ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1)
}
+ // TODO(sql-sessions): persist the planner for a pausable portal, and reuse
+ // it for each re-execution.
+ // https://github.com/cockroachdb/cockroach/issues/99625
p := &ex.planner
stmtTS := ex.server.cfg.Clock.PhysicalTime()
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
@@ -478,25 +621,62 @@ func (ex *connExecutor) execStmtInOpenState(
}
var needFinish bool
- ctx, needFinish = ih.Setup(
- ctx, ex.server.cfg, ex.statsCollector, p, ex.stmtDiagnosticsRecorder,
- stmt.StmtNoConstants, os.ImplicitTxn.Get(), ex.extraTxnState.shouldCollectTxnExecutionStats,
- )
+	// For a pausable portal, the instrumentation helper needs to be set up only
+	// when the portal is executed for the first time.
+ if !isPausablePortal() || portal.pauseInfo.execStmtInOpenState.ihWrapper == nil {
+ ctx, needFinish = ih.Setup(
+ ctx, ex.server.cfg, ex.statsCollector, p, ex.stmtDiagnosticsRecorder,
+ stmt.StmtNoConstants, os.ImplicitTxn.Get(), ex.extraTxnState.shouldCollectTxnExecutionStats,
+ )
+ } else {
+ ctx = portal.pauseInfo.execStmtInOpenState.ihWrapper.ctx
+ }
+	// For pausable portals, we need to persist the instrumentationHelper as it
+	// shares the ctx with the underlying flow. If it were cleaned up before we
+	// clean up the flow, we would hit a `span used after finished` error whenever
+	// we log an event while cleaning up the flow.
+	// We need this seemingly weird wrapper because the planner's ih is set via
+	// its pointer. However, for pausable portals, we'd like to persist the ih and
+	// reuse it for all re-executions, so the planner's ih and the portal's ih
+	// should never share the same address; otherwise changing the former would
+	// change the latter, and we could never persist it.
+ if isPausablePortal() {
+ if portal.pauseInfo.execStmtInOpenState.ihWrapper == nil {
+ portal.pauseInfo.execStmtInOpenState.ihWrapper = &instrumentationHelperWrapper{
+ ctx: ctx,
+ ih: *ih,
+ }
+ } else {
+ p.instrumentation = portal.pauseInfo.execStmtInOpenState.ihWrapper.ih
+ }
+ }
if needFinish {
sql := stmt.SQL
defer func() {
- retErr = ih.Finish(
- ex.server.cfg,
- ex.statsCollector,
- &ex.extraTxnState.accumulatedStats,
- ih.collectExecStats,
- p,
- ast,
- sql,
- res,
- retPayload,
- retErr,
- )
+ processCleanupFunc("finish instrumentation helper", func() {
+				// We need this indirection to make sure we finish the correct
+				// instrumentation helper for the paused portal.
+ ihToFinish := ih
+ curRes := res
+ if isPausablePortal() {
+ ihToFinish = &portal.pauseInfo.execStmtInOpenState.ihWrapper.ih
+ curRes = portal.pauseInfo.curRes
+ retErr = portal.pauseInfo.execStmtInOpenState.retErr
+ retPayload = portal.pauseInfo.execStmtInOpenState.retPayload
+ }
+ retErr = ihToFinish.Finish(
+ ex.server.cfg,
+ ex.statsCollector,
+ &ex.extraTxnState.accumulatedStats,
+ ihToFinish.collectExecStats,
+ p,
+ ast,
+ sql,
+ curRes,
+ retPayload,
+ retErr,
+ )
+ })
}()
}
@@ -562,6 +742,7 @@ func (ex *connExecutor) execStmtInOpenState(
if retEv != nil || retErr != nil {
return
}
+		// As portals are part of the extended protocol, we don't auto-commit for them.
if canAutoCommit && !isExtendedProtocol {
retEv, retPayload = ex.handleAutoCommit(ctx, ast)
}
@@ -660,8 +841,13 @@ func (ex *connExecutor) execStmtInOpenState(
// For regular statements (the ones that get to this point), we
// don't return any event unless an error happens.
- if err := ex.handleAOST(ctx, ast); err != nil {
- return makeErrEvent(err)
+	// For a portal (prepared stmt), handleAOST() was already called when the
+	// statement was prepared, and the function is idempotent, so we don't need
+	// to call it again during execution.
+ if portal == nil {
+ if err := ex.handleAOST(ctx, ast); err != nil {
+ return makeErrEvent(err)
+ }
}
// The first order of business is to ensure proper sequencing
@@ -709,7 +895,9 @@ func (ex *connExecutor) execStmtInOpenState(
p.extendedEvalCtx.Placeholders = &p.semaCtx.Placeholders
p.extendedEvalCtx.Annotations = &p.semaCtx.Annotations
p.stmt = stmt
- p.cancelChecker.Reset(ctx)
+ if isPausablePortal() {
+ p.pausablePortal = portal
+ }
// Auto-commit is disallowed during statement execution if we previously
// executed any DDL. This is because may potentially create jobs and do other
@@ -723,6 +911,9 @@ func (ex *connExecutor) execStmtInOpenState(
var stmtThresholdSpan *tracing.Span
alreadyRecording := ex.transitionCtx.sessionTracing.Enabled()
+ // TODO(sql-sessions): fix the stmtTraceThreshold for pausable portals, so
+ // that it records all executions.
+ // https://github.com/cockroachdb/cockroach/issues/99404
stmtTraceThreshold := TraceStmtThreshold.Get(&ex.planner.execCfg.Settings.SV)
var stmtCtx context.Context
// TODO(andrei): I think we should do this even if alreadyRecording == true.
@@ -736,6 +927,8 @@ func (ex *connExecutor) execStmtInOpenState(
var r *tree.ReleaseSavepoint
enforceHomeRegion := p.EnforceHomeRegion()
_, isSelectStmt := stmt.AST.(*tree.Select)
+ // TODO(sql-sessions): ensure this is not broken for pausable portals.
+ // https://github.com/cockroachdb/cockroach/issues/99408
if enforceHomeRegion && ex.state.mu.txn.IsOpen() && isSelectStmt {
// Create a savepoint at a point before which rows were read so that we can
// roll back to it, which will allow the txn to be modified with a
@@ -1103,6 +1296,8 @@ func (ex *connExecutor) commitSQLTransactionInternal(ctx context.Context) error
return err
}
+ ex.extraTxnState.prepStmtsNamespace.closeAllPortals(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc)
+
// We need to step the transaction before committing if it has stepping
// enabled. If it doesn't have stepping enabled, then we just set the
// stepping mode back to what it was.
@@ -1191,6 +1386,9 @@ func (ex *connExecutor) rollbackSQLTransaction(
if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil {
return ex.makeErrEvent(err, stmt)
}
+
+ ex.extraTxnState.prepStmtsNamespace.closeAllPortals(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc)
+
if err := ex.state.mu.txn.Rollback(ctx); err != nil {
log.Warningf(ctx, "txn rollback failed: %s", err)
}
@@ -1213,9 +1411,29 @@ func (ex *connExecutor) rollbackSQLTransaction(
// producing an appropriate state machine event.
func (ex *connExecutor) dispatchToExecutionEngine(
ctx context.Context, planner *planner, res RestrictedCommandResult,
-) error {
+) (retErr error) {
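+	// getPausablePortalInfo returns the pause info of the pausable portal bound
+	// to this planner, or nil if there is none.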
+ getPausablePortalInfo := func() *portalPauseInfo {
+ if planner != nil && planner.pausablePortal != nil {
+ return planner.pausablePortal.pauseInfo
+ }
+ return nil
+ }
+ defer func() {
+ if ppInfo := getPausablePortalInfo(); ppInfo != nil {
+ if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
+ ppInfo.dispatchToExecutionEngine.cleanup.isComplete = true
+ }
+ if retErr != nil || res.Err() != nil {
+ ppInfo.resumableFlow.cleanup.run()
+ ppInfo.dispatchToExecutionEngine.cleanup.run()
+ }
+ }
+ }()
+
stmt := planner.stmt
ex.sessionTracing.TracePlanStart(ctx, stmt.AST.StatementTag())
+ // TODO(sql-sessions): fix the phase time for pausable portals.
+ // https://github.com/cockroachdb/cockroach/issues/99410
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerStartLogicalPlan, timeutil.Now())
if multitenant.TenantRUEstimateEnabled.Get(ex.server.cfg.SV()) {
@@ -1241,10 +1459,25 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.extraTxnState.hasAdminRoleCache.IsSet = true
}
}
- // Prepare the plan. Note, the error is processed below. Everything
- // between here and there needs to happen even if there's an error.
- err := ex.makeExecPlan(ctx, planner)
- defer planner.curPlan.close(ctx)
+
+ var err error
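+	// For a pausable portal, build the plan only on the first execution and
+	// stash it in pauseInfo; re-executions reuse the saved planTop, which is
+	// closed when the portal's cleanup functions run.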
+ if ppInfo := getPausablePortalInfo(); ppInfo != nil {
+ if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
+ err = ex.makeExecPlan(ctx, planner)
+ ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
+ ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
+ fName: "close planTop",
+ f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) },
+ })
+ } else {
+ planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop
+ }
+ } else {
+ // Prepare the plan. Note, the error is processed below. Everything
+ // between here and there needs to happen even if there's an error.
+ err = ex.makeExecPlan(ctx, planner)
+ defer planner.curPlan.close(ctx)
+ }
// Include gist in error reports.
ctx = withPlanGist(ctx, planner.instrumentation.planGist.String())
@@ -1262,17 +1495,56 @@ func (ex *connExecutor) dispatchToExecutionEngine(
var stats topLevelQueryStats
defer func() {
var bulkJobId uint64
- // Note that for bulk job query (IMPORT, BACKUP and RESTORE), we don't
- // use this numRows entry. We emit the number of changed rows when the job
- // completes. (see the usages of logutil.LogJobCompletion()).
- nonBulkJobNumRows := res.RowsAffected()
- switch planner.stmt.AST.(type) {
- case *tree.Import, *tree.Restore, *tree.Backup:
- bulkJobId = res.GetBulkJobId()
- }
- planner.maybeLogStatement(ctx, ex.executorType, false, int(ex.state.mu.autoRetryCounter), ex.extraTxnState.txnCounter, nonBulkJobNumRows, bulkJobId, res.Err(), ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived), &ex.extraTxnState.hasAdminRoleCache, ex.server.TelemetryLoggingMetrics, stmtFingerprintID, &stats)
+ if ppInfo := getPausablePortalInfo(); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
+ ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
+ fName: "log statement",
+ f: func() {
+ planner.maybeLogStatement(
+ ctx,
+ ex.executorType,
+ false, /* isCopy */
+ int(ex.state.mu.autoRetryCounter),
+ ex.extraTxnState.txnCounter,
+ ppInfo.dispatchToExecutionEngine.rowsAffected,
+ bulkJobId,
+ ppInfo.curRes.ErrAllowReleased(),
+ ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
+ &ex.extraTxnState.hasAdminRoleCache,
+ ex.server.TelemetryLoggingMetrics,
+ ppInfo.dispatchToExecutionEngine.stmtFingerprintID,
+ ppInfo.dispatchToExecutionEngine.queryStats,
+ )
+ },
+ })
+ } else {
+ // Note that for bulk job query (IMPORT, BACKUP and RESTORE), we don't
+ // use this numRows entry. We emit the number of changed rows when the job
+ // completes. (see the usages of logutil.LogJobCompletion()).
+ nonBulkJobNumRows := res.RowsAffected()
+ switch planner.stmt.AST.(type) {
+ case *tree.Import, *tree.Restore, *tree.Backup:
+ bulkJobId = res.GetBulkJobId()
+ }
+ planner.maybeLogStatement(
+ ctx,
+ ex.executorType,
+ false, /* isCopy */
+ int(ex.state.mu.autoRetryCounter),
+ ex.extraTxnState.txnCounter,
+ nonBulkJobNumRows,
+ bulkJobId,
+ res.Err(),
+ ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
+ &ex.extraTxnState.hasAdminRoleCache,
+ ex.server.TelemetryLoggingMetrics,
+ stmtFingerprintID,
+ &stats,
+ )
+ }
}()
+ // TODO(sql-sessions): fix the phase time for pausable portals.
+ // https://github.com/cockroachdb/cockroach/issues/99410
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerEndLogicalPlan, timeutil.Now())
ex.sessionTracing.TracePlanEnd(ctx, err)
@@ -1293,9 +1565,32 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}
ex.sessionTracing.TracePlanCheckStart(ctx)
+
+ distSQLMode := ex.sessionData().DistSQLMode
+ if planner.pausablePortal != nil {
+ if len(planner.curPlan.subqueryPlans) == 0 &&
+ len(planner.curPlan.cascades) == 0 &&
+ len(planner.curPlan.checkPlans) == 0 {
+			// We only allow non-distributed plans for pausable portals.
+ distSQLMode = sessiondatapb.DistSQLOff
+ } else {
+ telemetry.Inc(sqltelemetry.SubOrPostQueryStmtsTriedWithPausablePortals)
+			// We don't allow sub/post queries for pausable portals. Set this one back
+			// to an un-pausable (normal) portal.
+			// With pauseInfo set to nil, no cleanup functions will be added to the
+			// stack and all cleanup steps will be performed as for normal portals.
+ planner.pausablePortal.pauseInfo = nil
+ // We need this so that the result consumption for this portal cannot be
+ // paused either.
+ if err := res.RevokePortalPausability(); err != nil {
+ res.SetError(err)
+ return nil
+ }
+ }
+ }
distributePlan := getPlanDistribution(
ctx, planner.Descriptors().HasUncommittedTypes(),
- ex.sessionData().DistSQLMode, planner.curPlan.main,
+ distSQLMode, planner.curPlan.main,
)
ex.sessionTracing.TracePlanCheckEnd(ctx, nil, distributePlan.WillDistribute())
@@ -1303,6 +1598,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.server.cfg.TestingKnobs.BeforeExecute(ctx, stmt.String(), planner.Descriptors())
}
+ // TODO(sql-sessions): fix the phase time for pausable portals.
+ // https://github.com/cockroachdb/cockroach/issues/99410
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerStartExecStmt, timeutil.Now())
progAtomic, err := func() (*uint64, error) {
@@ -1351,6 +1648,12 @@ func (ex *connExecutor) dispatchToExecutionEngine(
stats, err = ex.execWithDistSQLEngine(
ctx, planner, stmt.AST.StatementReturnType(), res, distribute, progAtomic,
)
+ if ppInfo := getPausablePortalInfo(); ppInfo != nil {
+ // For pausable portals, we log the stats when closing the portal, so we need
+ // to aggregate the stats for all executions.
+ ppInfo.dispatchToExecutionEngine.queryStats.add(&stats)
+ }
+
if res.Err() == nil {
isSetOrShow := stmt.AST.StatementTag() == "SET" || stmt.AST.StatementTag() == "SHOW"
if ex.sessionData().InjectRetryErrorsEnabled && !isSetOrShow &&
@@ -1365,20 +1668,36 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}
}
ex.sessionTracing.TraceExecEnd(ctx, res.Err(), res.RowsAffected())
+ // TODO(sql-sessions): fix the phase time for pausable portals.
+ // https://github.com/cockroachdb/cockroach/issues/99410
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerEndExecStmt, timeutil.Now())
ex.extraTxnState.rowsRead += stats.rowsRead
ex.extraTxnState.bytesRead += stats.bytesRead
ex.extraTxnState.rowsWritten += stats.rowsWritten
- populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
+ if ppInfo := getPausablePortalInfo(); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
+ // We need to ensure that we're using the planner bound to the first-time
+ // execution of a portal.
+ curPlanner := *planner
+ ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
+ fName: "populate query level stats and regions",
+ f: func() {
+ populateQueryLevelStatsAndRegions(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
+ ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(
+ ctx, &curPlanner,
+ int(ex.state.mu.autoRetryCounter), ppInfo.dispatchToExecutionEngine.rowsAffected, ppInfo.curRes.ErrAllowReleased(), *ppInfo.dispatchToExecutionEngine.queryStats,
+ )
+ },
+ })
+ } else {
+ populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
+ stmtFingerprintID = ex.recordStatementSummary(
+ ctx, planner,
+ int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats,
+ )
+ }
- // Record the statement summary. This also closes the plan if the
- // plan has not been closed earlier.
- stmtFingerprintID = ex.recordStatementSummary(
- ctx, planner,
- int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats,
- )
if ex.server.cfg.TestingKnobs.AfterExecute != nil {
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err())
}
@@ -1444,7 +1763,7 @@ func populateQueryLevelStatsAndRegions(
trace := ih.sp.GetRecording(tracingpb.RecordingStructured)
var err error
queryLevelStats, err := execstats.GetQueryLevelStats(
- trace, p.execCfg.TestingKnobs.DeterministicExplain, flowsMetadata)
+ trace, cfg.TestingKnobs.DeterministicExplain, flowsMetadata)
queryLevelStatsWithErr := execstats.MakeQueryLevelStatsWithErr(queryLevelStats, err)
ih.queryLevelStatsWithErr = &queryLevelStatsWithErr
if err != nil {
diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go
index c5fc74d58656..ab76542887e5 100644
--- a/pkg/sql/conn_io.go
+++ b/pkg/sql/conn_io.go
@@ -808,6 +808,16 @@ type RestrictedCommandResult interface {
// GetBulkJobId returns the id of the job for the query, if the query is
// IMPORT, BACKUP or RESTORE.
GetBulkJobId() uint64
+
+ // ErrAllowReleased returns the error without asserting the result is not
+ // released yet. It should be used only in clean-up stages of a pausable
+ // portal.
+ ErrAllowReleased() error
+
+	// RevokePortalPausability makes a portal un-pausable. It is called when we
+	// find that the underlying query is not supported for a pausable portal.
+ // This method is implemented only by pgwire.limitedCommandResult.
+ RevokePortalPausability() error
}
// DescribeResult represents the result of a Describe command (for either
@@ -976,6 +986,16 @@ type streamingCommandResult struct {
var _ RestrictedCommandResult = &streamingCommandResult{}
var _ CommandResultClose = &streamingCommandResult{}
+// ErrAllowReleased is part of the sql.RestrictedCommandResult interface.
+func (r *streamingCommandResult) ErrAllowReleased() error {
+ return r.err
+}
+
+// RevokePortalPausability is part of the sql.RestrictedCommandResult interface.
+func (r *streamingCommandResult) RevokePortalPausability() error {
+	return errors.AssertionFailedf("RevokePortalPausability is for limitedCommandResult only")
+}
+
// SetColumns is part of the RestrictedCommandResult interface.
func (r *streamingCommandResult) SetColumns(ctx context.Context, cols colinfo.ResultColumns) {
// The interface allows for cols to be nil, yet the iterator result expects
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index 15f2f903c657..aef205315914 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
+ "github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -852,15 +853,16 @@ func (dsp *DistSQLPlanner) Run(
var flow flowinfra.Flow
var err error
- if i := planCtx.getPortalPauseInfo(); i != nil && i.flow != nil {
- flow = i.flow
+ if i := planCtx.getPortalPauseInfo(); i != nil && i.resumableFlow.flow != nil {
+ flow = i.resumableFlow.flow
} else {
ctx, flow, err = dsp.setupFlows(
ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
)
if i != nil {
- i.flow = flow
- i.outputTypes = plan.GetResultTypes()
+ // TODO(yuzefovich): add a check that this flow runs in a single goroutine.
+ i.resumableFlow.flow = flow
+ i.resumableFlow.outputTypes = plan.GetResultTypes()
}
}
@@ -1545,16 +1547,16 @@ func (r *DistSQLReceiver) PushBatch(
var (
// ErrLimitedResultNotSupported is an error produced by pgwire
// indicating the user attempted to have multiple active portals but
- // either without setting sql.pgwire.multiple_active_portals.enabled to
+	// either without setting the session variable multiple_active_portals_enabled to
// true or the underlying query does not satisfy the restriction.
ErrLimitedResultNotSupported = unimplemented.NewWithIssue(
40195,
"multiple active portals not supported, "+
- "please set sql.pgwire.multiple_active_portals.enabled to true. "+
+ "please set session variable multiple_active_portals_enabled to true. "+
"Note: this feature is in preview",
)
// ErrStmtNotSupportedForPausablePortal is returned when the user have set
- // sql.pgwire.multiple_active_portals.enabled to true but set an unsupported
+	// the session variable multiple_active_portals_enabled to true but set an unsupported
// statement for a portal.
ErrStmtNotSupportedForPausablePortal = unimplemented.NewWithIssue(
98911,
@@ -1606,7 +1608,15 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
planner *planner,
recv *DistSQLReceiver,
evalCtxFactory func(usedConcurrently bool) *extendedEvalContext,
-) error {
+) (retErr error) {
+ defer func() {
+ if ppInfo := planCtx.getPortalPauseInfo(); ppInfo != nil && !ppInfo.resumableFlow.cleanup.isComplete {
+ ppInfo.resumableFlow.cleanup.isComplete = true
+ }
+ if retErr != nil && planCtx.getPortalPauseInfo() != nil {
+ planCtx.getPortalPauseInfo().resumableFlow.cleanup.run()
+ }
+ }()
if len(planner.curPlan.subqueryPlans) != 0 {
// Create a separate memory account for the results of the subqueries.
// Note that we intentionally defer the closure of the account until we
@@ -1636,6 +1646,25 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
ctx, evalCtx, planCtx, planner.txn, planner.curPlan.main, recv, finishedSetupFn,
)
}()
+
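+	// For a pausable portal, sanity-check (in test builds) that the resumable
+	// flow was stashed during setup, and register a cleanup function so that the
+	// flow is cleaned up together with the portal.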
+ if p := planCtx.getPortalPauseInfo(); p != nil {
+ if buildutil.CrdbTestBuild && p.resumableFlow.flow == nil {
+ checkErr := errors.AssertionFailedf("flow for portal %s cannot be found", planner.pausablePortal.Name)
+ if recv.commErr != nil {
+ recv.commErr = errors.CombineErrors(recv.commErr, checkErr)
+ } else {
+ return checkErr
+ }
+ }
+ if !p.resumableFlow.cleanup.isComplete {
+ p.resumableFlow.cleanup.appendFunc(namedFunc{
+ fName: "cleanup flow", f: func() {
+ p.resumableFlow.flow.Cleanup(ctx)
+ },
+ })
+ }
+ }
+
if recv.commErr != nil || recv.getError() != nil {
return recv.commErr
}
diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go
index 0dcf5fe0d5dd..9e04579e7c9a 100644
--- a/pkg/sql/distsql_running_test.go
+++ b/pkg/sql/distsql_running_test.go
@@ -639,6 +639,14 @@ func TestDistSQLReceiverDrainsMeta(t *testing.T) {
p, err := pgtest.NewPGTest(ctx, tc.Server(0).ServingSQLAddr(), username.RootUser)
require.NoError(t, err)
+	// We disable multiple active portals here as the feature only supports
+	// local-only plans.
+ // TODO(sql-sessions): remove this line when we finish
+ // https://github.com/cockroachdb/cockroach/issues/100822.
+ require.NoError(t, p.SendOneLine(`Query {"String": "SET multiple_active_portals_enabled = false"}`))
+ until := pgtest.ParseMessages("ReadyForQuery")
+ _, err = p.Until(false /* keepErrMsg */, until...)
+ require.NoError(t, err)
+
// Execute the test query asking for at most 25 rows.
require.NoError(t, p.SendOneLine(`Query {"String": "USE test"}`))
require.NoError(t, p.SendOneLine(fmt.Sprintf(`Parse {"Query": "%s"}`, testQuery)))
@@ -649,7 +657,7 @@ func TestDistSQLReceiverDrainsMeta(t *testing.T) {
// Retrieve all of the results. We need to receive until two 'ReadyForQuery'
// messages are returned (the first one for "USE test" query and the second
// one is for the limited portal execution).
- until := pgtest.ParseMessages("ReadyForQuery\nReadyForQuery")
+ until = pgtest.ParseMessages("ReadyForQuery\nReadyForQuery")
msgs, err := p.Until(false /* keepErrMsg */, until...)
require.NoError(t, err)
received := pgtest.MsgsToJSONWithIgnore(msgs, &datadriven.TestData{})
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 111f88e158b3..5e3eb45e77bc 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -734,14 +734,6 @@ var overrideAlterPrimaryRegionInSuperRegion = settings.RegisterBoolSetting(
false,
).WithPublic()
-var EnableMultipleActivePortals = settings.RegisterBoolSetting(
- settings.TenantWritable,
- "sql.pgwire.multiple_active_portals.enabled",
- "if true, portals with read-only SELECT query without sub/post queries "+
- "can be executed in interleaving manner, but with local execution plan",
- false,
-).WithPublic()
-
var errNoTransactionInProgress = errors.New("there is no transaction in progress")
var errTransactionInProgress = errors.New("there is already a transaction in progress")
@@ -3541,6 +3533,10 @@ func (m *sessionDataMutator) SetStreamerEnabled(val bool) {
m.data.StreamerEnabled = val
}
+func (m *sessionDataMutator) SetMultipleActivePortalsEnabled(val bool) {
+ m.data.MultipleActivePortalsEnabled = val
+}
+
// Utility functions related to scrubbing sensitive information on SQL Stats.
// quantizeCounts ensures that the Count field in the
diff --git a/pkg/sql/execinfra/base.go b/pkg/sql/execinfra/base.go
index f6402e247581..6bcef991e51b 100644
--- a/pkg/sql/execinfra/base.go
+++ b/pkg/sql/execinfra/base.go
@@ -43,8 +43,9 @@ const (
NeedMoreRows ConsumerStatus = iota
// SwitchToAnotherPortal indicates that the we received exec command for
// a different portal, and may come back to continue executing the current
- // portal later. If the cluster setting sql.pgwire.multiple_active_portals.enabled
- // is set to be true, we do nothing and return the control to the connExecutor.
+	// portal later. If the session variable multiple_active_portals_enabled is
+	// set to true, we do nothing and return control to the connExecutor.
SwitchToAnotherPortal
// DrainRequested indicates that the consumer will not process any more data
// rows, but will accept trailing metadata from the producer.
diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema
index b78033be74dc..2023603c2a39 100644
--- a/pkg/sql/logictest/testdata/logic_test/information_schema
+++ b/pkg/sql/logictest/testdata/logic_test/information_schema
@@ -5169,7 +5169,8 @@ WHERE
'use_declarative_schema_changer',
'distsql_workmem',
'copy_fast_path_enabled',
- 'direct_columnar_scans_enabled'
+ 'direct_columnar_scans_enabled',
+ 'multiple_active_portals_enabled'
);
----
variable value
diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog
index eed7f0640e46..9516136f8819 100644
--- a/pkg/sql/logictest/testdata/logic_test/pg_catalog
+++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -2669,7 +2669,7 @@ SELECT
FROM
pg_catalog.pg_settings
WHERE
- name NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled')
+ name NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled', 'multiple_active_portals_enabled')
----
name setting category short_desc extra_desc vartype
allow_ordinal_column_references off NULL NULL NULL string
@@ -2822,7 +2822,7 @@ SELECT
FROM
pg_catalog.pg_settings
WHERE
- name NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled')
+ name NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled', 'multiple_active_portals_enabled')
----
name setting unit context enumvals boot_val reset_val
allow_ordinal_column_references off NULL user NULL off off
@@ -3058,6 +3058,7 @@ lock_timeout NULL NULL NULL
log_timezone NULL NULL NULL NULL NULL
max_identifier_length NULL NULL NULL NULL NULL
max_index_keys NULL NULL NULL NULL NULL
+multiple_active_portals_enabled NULL NULL NULL NULL NULL
node_id NULL NULL NULL NULL NULL
null_ordered_last NULL NULL NULL NULL NULL
on_update_rehome_row_enabled NULL NULL NULL NULL NULL
diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source
index 5222c85b06c7..60f610cd5e99 100644
--- a/pkg/sql/logictest/testdata/logic_test/show_source
+++ b/pkg/sql/logictest/testdata/logic_test/show_source
@@ -23,7 +23,7 @@ UTF8 1
query TT colnames
SELECT *
FROM [SHOW ALL]
-WHERE variable NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled')
+WHERE variable NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled', 'multiple_active_portals_enabled')
----
variable value
allow_ordinal_column_references off
diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go
index 37aa6d030625..254e589ea2d0 100644
--- a/pkg/sql/pgwire/command_result.go
+++ b/pkg/sql/pgwire/command_result.go
@@ -118,6 +118,11 @@ type paramStatusUpdate struct {
var _ sql.CommandResult = &commandResult{}
+// RevokePortalPausability is part of the sql.RestrictedCommandResult interface.
+func (r *commandResult) RevokePortalPausability() error {
+	return errors.AssertionFailedf("RevokePortalPausability is implemented only by limitedCommandResult")
+}
+
// Close is part of the sql.RestrictedCommandResult interface.
func (r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator) {
r.assertNotReleased()
@@ -197,6 +202,11 @@ func (r *commandResult) Err() error {
return r.err
}
+// ErrAllowReleased is part of the sql.RestrictedCommandResult interface.
+func (r *commandResult) ErrAllowReleased() error {
+ return r.err
+}
+
// SetError is part of the sql.RestrictedCommandResult interface.
//
// We're not going to write any bytes to the buffer in order to support future
@@ -460,21 +470,6 @@ func (c *conn) newMiscResult(pos sql.CmdPos, typ completionMsgType) *commandResu
// to AddRow will block until the associated client connection asks for more
// rows. It essentially implements the "execute portal with limit" part of the
// Postgres protocol.
-//
-// This design is known to be flawed. It only supports a specific subset of the
-// protocol. We only allow a portal suspension in an explicit transaction where
-// the suspended portal is completely exhausted before any other pgwire command
-// is executed, otherwise an error is produced. You cannot, for example,
-// interleave portal executions (a portal must be executed to completion before
-// another can be executed). It also breaks the software layering by adding an
-// additional state machine here, instead of teaching the state machine in the
-// sql package about portals.
-//
-// This has been done because refactoring the executor to be able to correctly
-// suspend a portal will require a lot of work, and we wanted to move
-// forward. The work included is things like auditing all of the defers and
-// post-execution stuff (like stats collection) to have it only execute once
-// per statement instead of once per portal.
type limitedCommandResult struct {
*commandResult
portalName string
@@ -488,6 +483,8 @@ type limitedCommandResult struct {
portalPausablity sql.PortalPausablity
}
+var _ sql.RestrictedCommandResult = &limitedCommandResult{}
+
// AddRow is part of the sql.RestrictedCommandResult interface.
func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) error {
if err := r.commandResult.AddRow(ctx, row); err != nil {
@@ -517,6 +514,12 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro
return nil
}
+// RevokePortalPausability is part of the sql.RestrictedCommandResult interface.
+func (r *limitedCommandResult) RevokePortalPausability() error {
+ r.portalPausablity = sql.NotPausablePortalForUnsupportedStmt
+ return nil
+}
+
// SupportsAddBatch is part of the sql.RestrictedCommandResult interface.
// TODO(yuzefovich): implement limiting behavior for AddBatch.
func (r *limitedCommandResult) SupportsAddBatch() bool {
diff --git a/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals b/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals
new file mode 100644
index 000000000000..2f77613a307f
--- /dev/null
+++ b/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals
@@ -0,0 +1,1250 @@
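+# This file tests multiple active (pausable) portals: several portals are kept
+# open in the same transaction and executed in an interleaved manner, fetching
+# a few rows at a time via Execute messages with MaxRows.
+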
+send crdb_only
+Query {"String": "SET multiple_active_portals_enabled = true"}
+----
+
+until crdb_only ignore=NoticeResponse
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"SET"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest select_from_individual_resources
+
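+# Two portals over independent generate_series calls are executed alternately,
+# fetching one row at a time from each.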
+send
+Query {"String": "BEGIN"}
+Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,20)"}
+Parse {"Name": "q2", "Query": "SELECT * FROM generate_series(1,20)"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Sync
+----
+
+until
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Query {"String": "COMMIT"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"COMMIT"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest select_from_same_table
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Query {"String": "BEGIN"}
+Query {"String": "CREATE TABLE mytable (x int)"}
+Query {"String": "INSERT INTO mytable VALUES (1),(2),(3)"}
+Parse {"Name": "q1", "Query": "SELECT * FROM mytable"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Parse {"Name": "q2", "Query": "SELECT * FROM mytable"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Sync
+----
+
+
+until
+ReadyForQuery
+ReadyForQuery
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"CommandComplete","CommandTag":"INSERT 0 3"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+subtest end
+
+
+subtest bind_to_an_existing_active_portal
+
+send
+Parse {"Name": "q3", "Query": "SELECT * FROM mytable"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q3"}
+Execute {"Portal": "p2", "MaxRows": 2}
+Sync
+----
+
+
+until keepErrMessage
+ErrorResponse
+----
+{"Type":"ParseComplete"}
+{"Type":"ErrorResponse","Code":"42P03","Message":"portal \"p2\" already exists"}
+
+send
+Query {"String": "COMMIT"}
+Sync
+----
+
+# Rollback
+until
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"ReadyForQuery","TxStatus":"E"}
+{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest not_in_explicit_transaction
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,20)"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Parse {"Name": "q2", "Query": "SELECT * FROM generate_series(1,20)"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
+Execute {"Portal": "p2", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Sync
+----
+
+
+until
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Execute {"Portal": "p2", "MaxRows": 2}
+Sync
+----
+
+# p2 doesn't exist, as it is closed when the implicit txn is committed.
+until keepErrMessage
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"p2\""}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest drop_table_when_there_are_dependent_active_portals
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Query {"String": "BEGIN"}
+Query {"String": "CREATE TABLE mytable (x int)"}
+Query {"String": "INSERT INTO mytable VALUES (1),(2),(3)"}
+Parse {"Name": "q1", "Query": "SELECT * FROM mytable"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Sync
+----
+
+
+until
+ReadyForQuery
+ReadyForQuery
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"CommandComplete","CommandTag":"INSERT 0 3"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Query {"String": "DROP TABLE mytable"}
+----
+
+
+until noncrdb_only keepErrMessage
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"ErrorResponse","Code":"55006","Message":"cannot DROP TABLE \"mytable\" because it is being used by active queries in this session"}
+{"Type":"ReadyForQuery","TxStatus":"E"}
+
+# For cursors we have `cannot run schema change in a transaction with open DECLARE cursors`.
+# We should have something similar for portals as well.
+# https://github.com/cockroachdb/cockroach/issues/99085
+until crdb_only
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Query {"String": "COMMIT"}
+----
+
+until noncrdb_only
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+until crdb_only
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"COMMIT"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest different_portals_bind_to_the_same_statement
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Query {"String": "BEGIN"}
+Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,20)"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Sync
+----
+
+until
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Execute {"Portal": "p1", "MaxRows": 1}
+Sync
+----
+
+until
+ReadyForQuery
+----
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Query {"String": "COMMIT"}
+Sync
+----
+
+until
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"COMMIT"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest more_complicated_stmts
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Query {"String": "BEGIN; DROP TABLE IF EXISTS ta; DROP TABLE IF EXISTS tb; CREATE TABLE ta (x int, y int); CREATE TABLE tb (x int, z int); INSERT INTO ta VALUES (1,1), (2,2), (3,3), (4,4); INSERT INTO tb VALUES (1,2), (2,3), (3,4), (4,5)"}
+Parse {"Name": "q1", "Query": "SELECT z as myz FROM ta JOIN tb ON ta.x = tb.x ORDER BY myz"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 2}
+Sync
+----
+
+until
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
+{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"INSERT 0 4"}
+{"Type":"CommandComplete","CommandTag":"INSERT 0 4"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"3"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"3"}]}
+{"Type":"DataRow","Values":[{"text":"4"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Query {"String": "COMMIT"}
+Sync
+----
+
+until
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"COMMIT"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest not_supported_statements
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send crdb_only
+Query {"String": "BEGIN; DROP TABLE IF EXISTS t; CREATE TABLE t (x int)"}
+Parse {"Name": "q1", "Query": "WITH t AS (INSERT INTO t(x) VALUES (1), (2), (3) RETURNING x) SELECT * FROM t;"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Sync
+----
+
+until crdb_only keepErrMessage
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries"}
+{"Type":"ReadyForQuery","TxStatus":"E"}
+
+send crdb_only
+Query {"String": "COMMIT"}
+----
+
+until crdb_only
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send crdb_only
+Query {"String": "BEGIN"}
+Parse {"Name": "q1", "Query": "SELECT EXISTS (SELECT 1 FROM crdb_internal.zones WHERE target = 'hello')"}
+Parse {"Name": "q2", "Query": "SELECT EXISTS (SELECT 1 FROM crdb_internal.zones WHERE target = 'hello')"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Sync
+----
+
+until crdb_only keepErrMessage
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"f"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries"}
+{"Type":"ReadyForQuery","TxStatus":"E"}
+
+send crdb_only
+Query {"String": "COMMIT"}
+----
+
+until crdb_only
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send crdb_only
+Query {"String": "BEGIN; DROP TABLE IF EXISTS t; CREATE TABLE t (x int); INSERT INTO t VALUES (1), (2), (3)"}
+Parse {"Name": "q1", "Query": "UPDATE t SET x = 10 WHERE true RETURNING x;"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Sync
+----
+
+until crdb_only keepErrMessage
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"INSERT 0 3"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"10"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries"}
+{"Type":"ReadyForQuery","TxStatus":"E"}
+
+send crdb_only
+Query {"String": "COMMIT"}
+----
+
+until crdb_only
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send crdb_only
+Query {"String": "BEGIN; DROP TABLE IF EXISTS t; CREATE TABLE t (x int)"}
+Parse {"Name": "q1", "Query": "INSERT INTO t VALUES (1), (2), (3) RETURNING x;"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Sync
+----
+
+
+until crdb_only keepErrMessage
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries"}
+{"Type":"ReadyForQuery","TxStatus":"E"}
+
+send crdb_only
+Query {"String": "COMMIT"}
+----
+
+until crdb_only
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest query_timeout
+# https://github.com/cockroachdb/cockroach/issues/99140
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Query {"String": "BEGIN"}
+Query {"String": "SET statement_timeout='2s'"}
+Parse {"Name": "q1", "Query": "SELECT i, pg_sleep(0.5) FROM generate_series(1, 6) AS g(i)"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Sync
+----
+
+# The output of pg_sleep differs between cockroach and postgres:
+# https://github.com/cockroachdb/cockroach/issues/98913
+until crdb_only keepErrMessage
+ReadyForQuery
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"CommandComplete","CommandTag":"SET"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"},{"text":"t"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"},{"text":"t"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"3"},{"text":"t"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ErrorResponse","Code":"57014","Message":"query execution canceled due to statement timeout"}
+{"Type":"ReadyForQuery","TxStatus":"E"}
+
+until noncrdb_only keepErrMessage
+ReadyForQuery
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"CommandComplete","CommandTag":"SET"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"},null]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"},null]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"3"},null]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"4"},null]}
+{"Type":"PortalSuspended"}
+{"Type":"ErrorResponse","Code":"57014","Message":"canceling statement due to statement timeout"}
+{"Type":"ReadyForQuery","TxStatus":"E"}
+
+send
+Query {"String": "COMMIT"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+# Timeout test with another query interleaving the portal execution.
+send
+Query {"String": "SET statement_timeout='2s'"}
+Parse {"Name": "q1", "Query": "SELECT i, pg_sleep(0.5) FROM generate_series(1, 6) AS g(i)"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p1", "MaxRows": 1}
+Query {"String": "SELECT pg_sleep(2)"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Sync
+----
+
+until crdb_only keepErrMessage
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"SET"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"},{"text":"t"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"},{"text":"t"}]}
+{"Type":"PortalSuspended"}
+{"Type":"RowDescription","Fields":[{"Name":"pg_sleep","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":16,"DataTypeSize":1,"TypeModifier":-1,"Format":0}]}
+{"Type":"ErrorResponse","Code":"57014","Message":"query execution canceled due to statement timeout"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"p1\""}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+until noncrdb_only keepErrMessage
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"SET"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"},null]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"},null]}
+{"Type":"PortalSuspended"}
+{"Type":"RowDescription","Fields":[{"Name":"pg_sleep","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":2278,"DataTypeSize":4,"TypeModifier":-1,"Format":0}]}
+{"Type":"ErrorResponse","Code":"57014","Message":"canceling statement due to statement timeout"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+{"Type":"ErrorResponse","Code":"34000","Message":"portal \"p1\" does not exist"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send
+Query {"String": "RESET statement_timeout"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"RESET"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+subtest cancel_query_bug
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+send crdb_only
+Query {"String": "BEGIN"}
+Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(11,21)"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Sync
+----
+
+until crdb_only
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"11"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+# We check the status of q1, cancel it, and recheck the status.
+send crdb_only
+Query {"String": "WITH x AS (SHOW CLUSTER STATEMENTS) SELECT query, phase FROM x WHERE query = 'SELECT * FROM ROWS FROM (generate_series(11, 21))'"}
+Query {"String": "CANCEL QUERY (WITH x AS (SHOW CLUSTER STATEMENTS) SELECT query_id FROM x WHERE query = 'SELECT * FROM ROWS FROM (generate_series(11, 21))');"}
+Query {"String": "WITH x AS (SHOW CLUSTER STATEMENTS) SELECT query, phase FROM x WHERE query = 'SELECT * FROM ROWS FROM (generate_series(11, 21))'"}
+Sync
+----
+
+# TODO(janexing): the query should have been cancelled but it still shows
+# `executing` status. It should be in `cancelled` status.
+until crdb_only ignore=RowDescription
+ReadyForQuery
+ReadyForQuery
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"DataRow","Values":[{"text":"SELECT * FROM ROWS FROM (generate_series(11, 21))"},{"text":"executing"}]}
+{"Type":"CommandComplete","CommandTag":"SELECT 1"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"CommandComplete","CommandTag":"CANCEL QUERIES 1"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"DataRow","Values":[{"text":"SELECT * FROM ROWS FROM (generate_series(11, 21))"},{"text":"executing"}]}
+{"Type":"CommandComplete","CommandTag":"SELECT 1"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+# q1 has been cancelled, so portals bound to it cannot be further executed.
+send crdb_only
+Execute {"Portal": "p1", "MaxRows": 1}
+Sync
+----
+
+until crdb_only ignore=RowDescription keepErrMessage
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"ErrorResponse","Code":"57014","Message":"query execution canceled"}
+{"Type":"ReadyForQuery","TxStatus":"E"}
+
+send crdb_only
+Query {"String": "COMMIT"}
+----
+
+until crdb_only
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest ingest_non_retriable_error
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+# We force a non-retriable error here, and check that in this case we close
+# all pausable portals.
+send crdb_only
+Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,20)"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Query {"String": "SELECT crdb_internal.force_error('','foo')"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Sync
+----
+
+until crdb_only keepErrMessage ignore=RowDescription
+ErrorResponse
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ErrorResponse","Code":"XXUUU","Message":"foo"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"p1\""}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest interleave_with_unpausable_portal
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
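+# p2 is bound to an UPDATE ... RETURNING statement, which is not pausable, so
+# executing another portal after suspending p2 results in an error.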
+send
+Query {"String": "BEGIN; CREATE TABLE t (x int); INSERT INTO t VALUES (1), (2), (3)"}
+Parse {"Name": "q1", "Query": "SELECT * FROM t"}
+Parse {"Name": "q2", "Query": "UPDATE t SET x = 10 RETURNING x"}
+Parse {"Name": "q3", "Query": "INSERT INTO t VALUES (5), (6), (7)"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
+Bind {"DestinationPortal": "p3", "PreparedStatement": "q3"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Execute {"Portal": "p3", "MaxRows": 1}
+Sync
+----
+
+until keepErrMessage
+ReadyForQuery
+ErrorResponse
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"INSERT 0 3"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"ParseComplete"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"BindComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"10"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries"}
+{"Type":"ReadyForQuery","TxStatus":"E"}
+
+send
+Query {"String": "COMMIT"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest pausable_portals_with_virtual_tables
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
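+# Portals whose queries read from virtual tables (pg_catalog) should also be
+# pausable and interleavable with other portals.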
+send
+Query {"String": "BEGIN; CREATE TABLE a (id int UNIQUE, name TEXT); CREATE TABLE t1(a int primary key, b int); CREATE TABLE t2(a int primary key, b int); CREATE TABLE t3(a int primary key, b int); CREATE TABLE t4(a int primary key, b int); CREATE TABLE t5(a int primary key, b int); CREATE TABLE t6(a int primary key, b int); CREATE TABLE t7(a int primary key, b int); CREATE TABLE t8(a int primary key, b int);"}
+Parse {"Name": "q", "Query": "SELECT a.attname, format_type(a.atttypid, a.atttypmod), pg_get_expr(d.adbin, d.adrelid), a.attnotnull, a.atttypid, a.atttypmod FROM pg_attribute a LEFT JOIN pg_attrdef d ON a.attrelid = d.adrelid AND a.attnum = d.adnum WHERE a.attrelid = 'a'::regclass AND a.attnum > 0 AND NOT a.attisdropped ORDER BY a.attnum;"}
+Bind {"DestinationPortal": "p", "PreparedStatement": "q"}
+Execute {"Portal": "p", "MaxRows": 1}
+Sync
+----
+
+until
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"id"},{"text":"bigint"},null,{"text":"f"},{"text":"20"},{"text":"-1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Execute {"Portal": "p", "MaxRows": 1}
+Sync
+----
+
+until
+ReadyForQuery
+----
+{"Type":"DataRow","Values":[{"text":"name"},{"text":"text"},null,{"text":"f"},{"text":"25"},{"text":"-1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Parse {"Name": "q2", "Query": "SELECT a.attname AS column_name, NOT (a.attnotnull OR ((t.typtype = 'd') AND t.typnotnull)) AS is_nullable, pg_get_expr(ad.adbin, ad.adrelid) AS column_default FROM pg_attribute AS a LEFT JOIN pg_attrdef AS ad ON (a.attrelid = ad.adrelid) AND (a.attnum = ad.adnum) JOIN pg_type AS t ON a.atttypid = t.oid JOIN pg_class AS c ON a.attrelid = c.oid JOIN pg_namespace AS n ON c.relnamespace = n.oid WHERE ( ( (c.relkind IN ('f', 'm', 'p', 'r', 'v')) AND (c.relname ILIKE '%t%') ) AND (n.nspname NOT IN ('pg_catalog', 'pg_toast')) ) AND pg_table_is_visible(c.oid) ORDER BY 1, 2;"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
+Execute {"Portal": "p2", "MaxRows": 3}
+Sync
+----
+
+until
+ReadyForQuery
+----
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]}
+{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]}
+{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Execute {"Portal": "p", "MaxRows": 1}
+Sync
+----
+
+until
+ReadyForQuery
+----
+{"Type":"DataRow","Values":[{"text":"rowid"},{"text":"bigint"},{"text":"unique_rowid()"},{"text":"t"},{"text":"20"},{"text":"-1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Execute {"Portal": "p2", "MaxRows": 5}
+Sync
+----
+
+until
+ReadyForQuery
+----
+{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]}
+{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]}
+{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]}
+{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]}
+{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send crdb_only
+Execute {"Portal": "p2"}
+Sync
+----
+
+until crdb_only
+ReadyForQuery
+----
+{"Type":"DataRow","Values":[{"text":"auth_name"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"auth_srid"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"coord_dimension"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"f_geometry_column"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"f_table_catalog"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"f_table_name"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"f_table_schema"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"proj4text"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"rowid"},{"text":"f"},{"text":"unique_rowid()"}]}
+{"Type":"DataRow","Values":[{"text":"rowid"},{"text":"f"},{"text":"unique_rowid()"}]}
+{"Type":"DataRow","Values":[{"text":"srid"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"srid"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"srtext"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"type"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"x"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"x"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"y"},{"text":"t"},null]}
+{"Type":"DataRow","Values":[{"text":"z"},{"text":"t"},null]}
+{"Type":"CommandComplete","CommandTag":"SELECT 26"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Parse {"Name": "q3", "Query": "WITH RECURSIVE cte(a, b) AS (SELECT 0, 0 UNION ALL SELECT a+1, b+10 FROM cte WHERE a < 5) SELECT * FROM cte;"}
+Parse {"Name": "q4", "Query": "WITH RECURSIVE cte(a, b) AS (SELECT 0, 0 UNION ALL SELECT a+1, b+10 FROM cte WHERE a < 5) SELECT * FROM cte;"}
+Bind {"DestinationPortal": "p3", "PreparedStatement": "q3"}
+Bind {"DestinationPortal": "p4", "PreparedStatement": "q4"}
+Execute {"Portal": "p3", "MaxRows": 1}
+Execute {"Portal": "p4", "MaxRows": 1}
+Execute {"Portal": "p4", "MaxRows": 1}
+Execute {"Portal": "p3", "MaxRows": 1}
+Sync
+----
+
+until crdb_only
+ReadyForQuery
+----
+{"Type":"ParseComplete"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"0"},{"text":"0"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"0"},{"text":"0"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"1"},{"text":"10"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"1"},{"text":"10"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Query {"String": "COMMIT"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"COMMIT"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest deallocating_prepared_stmt_should_not_interrupt_portal_execution
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
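+# Deallocating the prepared statement q1 while its portal p1 is suspended must
+# not interrupt the execution of p1 or p2.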
+send
+Query {"String": "BEGIN"}
+Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,20)"}
+Parse {"Name": "q2", "Query": "SELECT * FROM generate_series(1,20)"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Query {"String": "DEALLOCATE q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Execute {"Portal": "p2", "MaxRows": 1}
+Sync
+----
+
+until
+ReadyForQuery
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Query {"String": "COMMIT"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"COMMIT"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
+subtest end
+
+
+subtest close_conn_executor_without_committing
+
+send
+Query {"String": "DEALLOCATE ALL;"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
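+# The portals are left open and the transaction is never committed, so that
+# closing the connExecutor exercises cleanup of still-open pausable portals.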
+send
+Query {"String": "BEGIN"}
+Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,5)"}
+Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
+Execute {"Portal": "p1", "MaxRows": 1}
+Parse {"Name": "q2", "Query": "SELECT * FROM generate_series(8,10)"}
+Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
+Execute {"Portal": "p2", "MaxRows": 1}
+Execute {"Portal": "p1"}
+Sync
+----
+
+until
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"PortalSuspended"}
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"DataRow","Values":[{"text":"8"}]}
+{"Type":"PortalSuspended"}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"DataRow","Values":[{"text":"3"}]}
+{"Type":"DataRow","Values":[{"text":"4"}]}
+{"Type":"DataRow","Values":[{"text":"5"}]}
+{"Type":"CommandComplete","CommandTag":"SELECT 4"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+subtest end
diff --git a/pkg/sql/pgwire/testdata/pgtest/portals b/pkg/sql/pgwire/testdata/pgtest/portals
index 3f0b9a472bc9..8e8f57c1c606 100644
--- a/pkg/sql/pgwire/testdata/pgtest/portals
+++ b/pkg/sql/pgwire/testdata/pgtest/portals
@@ -21,11 +21,11 @@ Execute
Sync
----
-until
+until keepErrMessage
ErrorResponse
ReadyForQuery
----
-{"Type":"ErrorResponse","Code":"34000"}
+{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"\""}
{"Type":"ReadyForQuery","TxStatus":"I"}
# Verify that closing a bound portal prevents execution.
@@ -38,14 +38,14 @@ Execute {"Portal": "p"}
Sync
----
-until
+until keepErrMessage
ErrorResponse
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"CloseComplete"}
-{"Type":"ErrorResponse","Code":"34000"}
+{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"p\""}
{"Type":"ReadyForQuery","TxStatus":"I"}
# The spec says that closing a prepared statement also closes its portals,
@@ -337,11 +337,11 @@ Execute
Sync
----
-until
+until keepErrMessage
ErrorResponse
ReadyForQuery
----
-{"Type":"ErrorResponse","Code":"34000"}
+{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"\""}
{"Type":"ReadyForQuery","TxStatus":"I"}
send
@@ -384,12 +384,12 @@ Execute
Sync
----
-until
+until keepErrMessage
ErrorResponse
ReadyForQuery
----
{"Type":"CloseComplete"}
-{"Type":"ErrorResponse","Code":"34000"}
+{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"\""}
{"Type":"ReadyForQuery","TxStatus":"E"}
send
@@ -928,6 +928,16 @@ ReadyForQuery
{"Type":"CommandComplete","CommandTag":"BEGIN"}
{"Type":"ReadyForQuery","TxStatus":"T"}
+send crdb_only
+Query {"String": "SET multiple_active_portals_enabled = true"}
+----
+
+until crdb_only ignore=NoticeResponse
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"SET"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
# 'S' for Statement
# 49 = ASCII '1'
# ParameterFormatCodes = [0] for text format
@@ -959,7 +969,7 @@ Execute {"Portal": "por"}
Sync
----
-until noncrdb_only
+until
ReadyForQuery
----
{"Type":"ParseComplete"}
@@ -971,33 +981,17 @@ ReadyForQuery
{"Type":"CommandComplete","CommandTag":"SELECT 2"}
{"Type":"ReadyForQuery","TxStatus":"T"}
-until crdb_only
-ErrorResponse
-ReadyForQuery
-----
-{"Type":"ErrorResponse","Code":"0A000"}
-{"Type":"ReadyForQuery","TxStatus":"E"}
-
-send noncrdb_only
+send
Execute {"Portal": "pc4"}
Sync
----
-until noncrdb_only
+until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"COMMIT"}
{"Type":"ReadyForQuery","TxStatus":"I"}
-send crdb_only
-Query {"String": "ROLLBACK"}
-----
-
-until crdb_only
-ReadyForQuery
-----
-{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
-{"Type":"ReadyForQuery","TxStatus":"I"}
# Test that we can use a portal with MaxRows from an implicit transaction.
send
@@ -1185,7 +1179,7 @@ Execute
Sync
----
-until ignore_constraint_name
+until keepErrMessage ignore_constraint_name
ErrorResponse
ReadyForQuery
----
@@ -1194,7 +1188,7 @@ ReadyForQuery
{"Type":"CommandComplete","CommandTag":"INSERT 0 1"}
{"Type":"BindComplete"}
{"Type":"NoData"}
-{"Type":"ErrorResponse","Code":"23514"}
+{"Type":"ErrorResponse","Code":"23514","Message":"failed to satisfy CHECK constraint (a \u003e 1.0:::FLOAT8)"}
{"Type":"ReadyForQuery","TxStatus":"I"}
send
@@ -1230,11 +1224,11 @@ Execute {"Portal": "p16"}
Sync
----
-until
+until keepErrMessage
ErrorResponse
ReadyForQuery
----
-{"Type":"ErrorResponse","Code":"34000"}
+{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"p16\""}
{"Type":"ReadyForQuery","TxStatus":"I"}
# Verify that it's not possible to DECLARE a cursor of the same name as an open
@@ -1265,7 +1259,7 @@ Execute
Sync
----
-until
+until keepErrMessage
ErrorResponse
ReadyForQuery
----
@@ -1275,7 +1269,7 @@ ReadyForQuery
{"Type":"BindComplete"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
-{"Type":"ErrorResponse","Code":"42P03"}
+{"Type":"ErrorResponse","Code":"42P03","Message":"cursor \"p\" already exists as portal"}
{"Type":"ReadyForQuery","TxStatus":"E"}
send
@@ -1309,13 +1303,13 @@ Execute {"Portal": "fnord"}
Sync
----
-until
+until keepErrMessage
ErrorResponse
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
-{"Type":"ErrorResponse","Code":"42P03"}
+{"Type":"ErrorResponse","Code":"42P03","Message":"cursor \"fnord\" already exists as portal"}
{"Type":"ReadyForQuery","TxStatus":"E"}
# Test the converse case: ensure that a portal cannot be created with the same
@@ -1364,12 +1358,12 @@ Execute {"Portal": "sqlcursor"}
Sync
----
-until
+until keepErrMessage
ErrorResponse
ReadyForQuery
----
{"Type":"ParseComplete"}
-{"Type":"ErrorResponse","Code":"42P03"}
+{"Type":"ErrorResponse","Code":"42P03","Message":"portal \"sqlcursor\" already exists as cursor"}
{"Type":"ReadyForQuery","TxStatus":"E"}
send
diff --git a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs
index 1afeb7f1788f..8c682a98e330 100644
--- a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs
+++ b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs
@@ -5,89 +5,6 @@
only crdb
----
-# More behavior that differs from postgres. Try executing a new query
-# when a portal is suspended. Cockroach errors.
-
-send
-Query {"String": "BEGIN"}
-Parse {"Query": "SELECT * FROM generate_series(1, 2)"}
-Bind
-Execute {"MaxRows": 1}
-Query {"String": "SELECT 1"}
-Sync
-----
-
-until keepErrMessage
-ReadyForQuery
-ErrorResponse
-ReadyForQuery
-ReadyForQuery
-----
-{"Type":"CommandComplete","CommandTag":"BEGIN"}
-{"Type":"ReadyForQuery","TxStatus":"T"}
-{"Type":"ParseComplete"}
-{"Type":"BindComplete"}
-{"Type":"DataRow","Values":[{"text":"1"}]}
-{"Type":"PortalSuspended"}
-{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set sql.pgwire.multiple_active_portals.enabled to true. Note: this feature is in preview"}
-{"Type":"ReadyForQuery","TxStatus":"E"}
-{"Type":"ReadyForQuery","TxStatus":"E"}
-
-send
-Query {"String": "ROLLBACK"}
-Query {"String": "SELECT 'here'"}
-----
-
-until ignore=RowDescription
-ReadyForQuery
-ReadyForQuery
-----
-{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
-{"Type":"ReadyForQuery","TxStatus":"I"}
-{"Type":"DataRow","Values":[{"text":"here"}]}
-{"Type":"CommandComplete","CommandTag":"SELECT 1"}
-{"Type":"ReadyForQuery","TxStatus":"I"}
-
-# Also try binding another portal during suspension.
-
-send
-Query {"String": "BEGIN"}
-Parse {"Query": "SELECT * FROM generate_series(1, 2)"}
-Bind
-Execute {"MaxRows": 1}
-Bind
-Sync
-----
-
-until keepErrMessage
-ReadyForQuery
-ErrorResponse
-ReadyForQuery
-----
-{"Type":"CommandComplete","CommandTag":"BEGIN"}
-{"Type":"ReadyForQuery","TxStatus":"T"}
-{"Type":"ParseComplete"}
-{"Type":"BindComplete"}
-{"Type":"DataRow","Values":[{"text":"1"}]}
-{"Type":"PortalSuspended"}
-{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set sql.pgwire.multiple_active_portals.enabled to true. Note: this feature is in preview"}
-{"Type":"ReadyForQuery","TxStatus":"E"}
-
-send
-Query {"String": "ROLLBACK"}
-Query {"String": "SELECT 'here'"}
-----
-
-until ignore=RowDescription
-ReadyForQuery
-ReadyForQuery
-----
-{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
-{"Type":"ReadyForQuery","TxStatus":"I"}
-{"Type":"DataRow","Values":[{"text":"here"}]}
-{"Type":"CommandComplete","CommandTag":"SELECT 1"}
-{"Type":"ReadyForQuery","TxStatus":"I"}
-
##############################################################################
# Deviations from Postgres in how we handle portals' suspension and attempts #
# to execute exhausted portals. #
diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go
index 555a3db7eb39..e6076bd1dc27 100644
--- a/pkg/sql/planner.go
+++ b/pkg/sql/planner.go
@@ -272,7 +272,7 @@ type planner struct {
// hasFlowForPausablePortal returns true if the planner is for re-executing a
// portal. We reuse the flow stored in p.pausablePortal.pauseInfo.
func (p *planner) hasFlowForPausablePortal() bool {
- return p.pausablePortal != nil && p.pausablePortal.pauseInfo != nil && p.pausablePortal.pauseInfo.flow != nil
+ return p.pausablePortal != nil && p.pausablePortal.pauseInfo != nil && p.pausablePortal.pauseInfo.resumableFlow.flow != nil
}
// resumeFlowForPausablePortal is called when re-executing a portal. We reuse
@@ -282,8 +282,12 @@ func (p *planner) resumeFlowForPausablePortal(recv *DistSQLReceiver) error {
return errors.AssertionFailedf("no flow found for pausable portal")
}
recv.discardRows = p.instrumentation.ShouldDiscardRows()
- recv.outputTypes = p.pausablePortal.pauseInfo.outputTypes
- p.pausablePortal.pauseInfo.flow.Resume(recv)
+ recv.outputTypes = p.pausablePortal.pauseInfo.resumableFlow.outputTypes
+ flow := p.pausablePortal.pauseInfo.resumableFlow.flow
+ finishedSetupFn, cleanup := getFinishedSetupFn(p)
+ finishedSetupFn(flow)
+ defer cleanup()
+ flow.Resume(recv)
return recv.commErr
}
diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go
index 2f3834bda37c..16ca739a4f55 100644
--- a/pkg/sql/prepared_stmt.go
+++ b/pkg/sql/prepared_stmt.go
@@ -15,12 +15,17 @@ import (
"time"
"unsafe"
+ "github.com/cockroachdb/cockroach/pkg/server/telemetry"
+ "github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
+ "github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
+ "github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)
@@ -127,15 +132,15 @@ type PortalPausablity int64
const (
// PortalPausabilityDisabled is the default status of a portal when
- // sql.pgwire.multiple_active_portals.enabled is false.
+ // the session variable multiple_active_portals_enabled is false.
PortalPausabilityDisabled PortalPausablity = iota
- // PausablePortal is set when sql.pgwire.multiple_active_portals.enabled is
- // set to true and the underlying statement is a read-only SELECT query with
- // no sub-queries or post-queries.
+ // PausablePortal is set when the session variable multiple_active_portals_enabled
+ // is set to true and the underlying statement is a read-only SELECT query
+ // with no sub-queries or post-queries.
PausablePortal
-	// NotPausablePortalForUnsupportedStmt is used when the cluster setting
-	// sql.pgwire.multiple_active_portals.enabled is set to true, while we don't
-	// support underlying statement.
+	// NotPausablePortalForUnsupportedStmt is used when the session variable
+	// multiple_active_portals_enabled is set to true, but we don't support
+	// the underlying statement.
NotPausablePortalForUnsupportedStmt
)
@@ -180,9 +185,18 @@ func (ex *connExecutor) makePreparedPortal(
OutFormats: outFormats,
}
- if EnableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV) {
- portal.pauseInfo = &portalPauseInfo{}
- portal.portalPausablity = PausablePortal
+ if ex.sessionData().MultipleActivePortalsEnabled && ex.executorType != executorTypeInternal {
+ telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals)
+ if tree.IsAllowedToPause(stmt.AST) {
+ portal.pauseInfo = &portalPauseInfo{}
+ portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{}
+ portal.portalPausablity = PausablePortal
+ } else {
+ telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
+ // We have set the session variable multiple_active_portals_enabled to
+ // true, but we don't support the underlying query for a pausable portal.
+ portal.portalPausablity = NotPausablePortalForUnsupportedStmt
+ }
}
return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
}
@@ -206,22 +220,163 @@ func (p *PreparedPortal) close(
) {
prepStmtsNamespaceMemAcc.Shrink(ctx, p.size(portalName))
p.Stmt.decRef(ctx)
+ if p.pauseInfo != nil {
+ p.pauseInfo.cleanupAll()
+ p.pauseInfo = nil
+ }
}
func (p *PreparedPortal) size(portalName string) int64 {
return int64(uintptr(len(portalName)) + unsafe.Sizeof(p))
}
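+
+// isPausable returns true if the portal has pause info attached, i.e. it was
+// created as a pausable portal.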
+func (p *PreparedPortal) isPausable() bool {
+ return p.pauseInfo != nil
+}
+
+// cleanupFuncStack stores cleanup functions for a portal. The cleanup
+// functions are added during the first-time execution of a portal. When the
+// first-time execution is finished, we set isComplete to true.
+type cleanupFuncStack struct {
+ stack []namedFunc
+ isComplete bool
+}
+
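+// appendFunc pushes a cleanup function onto the stack.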
+func (n *cleanupFuncStack) appendFunc(f namedFunc) {
+ n.stack = append(n.stack, f)
+}
+
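+// run invokes the stored cleanup functions in the order they were added, and
+// then resets the stack.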
+func (n *cleanupFuncStack) run() {
+ for i := 0; i < len(n.stack); i++ {
+ n.stack[i].f()
+ }
+ *n = cleanupFuncStack{}
+}
+
+// namedFunc is a function with a name, which makes debugging easier. It is
+// used only for the cleanup functions of a pausable portal.
+type namedFunc struct {
+ fName string
+ f func()
+}
+
+// instrumentationHelperWrapper wraps the instrumentation helper.
+// We need to maintain it for a paused portal.
+type instrumentationHelperWrapper struct {
+ ctx context.Context
+ ih instrumentationHelper
+}
+
// portalPauseInfo stores info that enables the pause of a portal. After pausing
// the portal, execute any other statement, and come back to re-execute it or
// close it.
type portalPauseInfo struct {
- // outputTypes are the types of the result columns produced by the physical plan.
- // We need this as when re-executing the portal, we are reusing the flow
- // with the new receiver, but not re-generating the physical plan.
- outputTypes []*types.T
- // We need to store the flow for a portal so that when re-executing it, we
- // continue from the previous execution. It lives along with the portal, and
- // will be cleaned-up when the portal is closed.
- flow flowinfra.Flow
+	// curRes is the command result of the current (or last, if the portal is
+	// being closed) portal execution. It is assumed to be a pgwire.limitedCommandResult.
+ curRes RestrictedCommandResult
+ // The following 4 structs store information to persist for a portal's
+ // execution, as well as cleanup functions to be called when the portal is
+ // closed. These structs correspond to a function call chain for a query's
+ // execution:
+ // - connExecutor.execPortal()
+ // - connExecutor.execStmtInOpenStateCleanup()
+ // - connExecutor.dispatchToExecutionEngine()
+ // - DistSQLPlanner.Run()
+ //
+ // Each struct has two main parts:
+ // 1. Fields that need to be retained for a resumption of a portal's execution.
+ // 2. A cleanup function stack that will be called when the portal's execution
+ // has to be terminated or when the portal is to be closed. Each stack is
+ // defined as a closure in its corresponding function.
+ //
+ // When closing a portal, we need to follow the reverse order of its execution,
+ // which means running the cleanup functions of the four structs in the
+ // following order:
+ // - exhaustPortal.cleanup
+ // - execStmtInOpenState.cleanup
+ // - dispatchToExecutionEngine.cleanup
+ // - resumableFlow.cleanup
+ //
+ // If an error occurs in any of these functions, we run the cleanup functions of
+ // this layer and its children layers, and propagate the error to the parent
+ // layer. For example, if an error occurs in execStmtInOpenStateCleanup(), we
+ // run the cleanup functions in the following order:
+ // - execStmtInOpenState.cleanup
+ // - dispatchToExecutionEngine.cleanup
+ // - resumableFlow.cleanup
+ //
+ // When exiting connExecutor.execStmtInOpenState(), we finally run the
+ // exhaustPortal.cleanup function in connExecutor.execPortal().
+ exhaustPortal struct {
+ cleanup cleanupFuncStack
+ }
+
+ // TODO(sql-session): replace certain fields here with planner.
+ // https://github.com/cockroachdb/cockroach/issues/99625
+ execStmtInOpenState struct {
+ spCtx context.Context
+		// queryID stores the id of the query that this portal is bound to. When we re-execute
+ // an existing portal, we should use the same query id.
+ queryID clusterunique.ID
+ // ihWrapper stores the instrumentation helper that should be reused for
+ // each execution of the portal.
+ ihWrapper *instrumentationHelperWrapper
+ // cancelQueryFunc will be called to cancel the context of the query when
+ // the portal is closed.
+ cancelQueryFunc context.CancelFunc
+ // cancelQueryCtx is the context to be canceled when closing the portal.
+ cancelQueryCtx context.Context
+ // retPayload is needed for the cleanup steps as we will have to check the
+ // latest encountered error, so this field should be updated for each execution.
+ retPayload fsm.EventPayload
+ // retErr is needed for the cleanup steps as we will have to check the latest
+ // encountered error, so this field should be updated for each execution.
+ retErr error
+ cleanup cleanupFuncStack
+ }
+
+ // TODO(sql-session): replace certain fields here with planner.
+ // https://github.com/cockroachdb/cockroach/issues/99625
+ dispatchToExecutionEngine struct {
+ // rowsAffected is the number of rows queried with this portal. It is
+ // accumulated from each portal execution, and will be logged when this portal
+ // is closed.
+ rowsAffected int
+ // stmtFingerprintID is the fingerprint id of the underlying statement.
+ stmtFingerprintID appstatspb.StmtFingerprintID
+ // planTop collects the properties of the current plan being prepared.
+ // We reuse it when re-executing the portal.
+ // TODO(yuzefovich): consider refactoring distSQLFlowInfos from planTop to
+ // avoid storing the planTop here.
+ planTop planTop
+ // queryStats stores statistics on query execution. It is incremented for
+ // each execution of the portal.
+ queryStats *topLevelQueryStats
+ cleanup cleanupFuncStack
+ }
+
+ resumableFlow struct {
+ // We need to store the flow for a portal so that when re-executing it, we
+ // continue from the previous execution. It lives along with the portal, and
+ // will be cleaned-up when the portal is closed.
+ flow flowinfra.Flow
+ // outputTypes are the types of the result columns produced by the physical plan.
+ // We need this as when re-executing the portal, we are reusing the flow
+ // with the new receiver, but not re-generating the physical plan.
+ outputTypes []*types.T
+ cleanup cleanupFuncStack
+ }
+}
+
+// cleanupAll runs all the cleanup layers, following the reverse order of the portal's execution.
+func (pm *portalPauseInfo) cleanupAll() {
+ pm.resumableFlow.cleanup.run()
+ pm.dispatchToExecutionEngine.cleanup.run()
+ pm.execStmtInOpenState.cleanup.run()
+ pm.exhaustPortal.cleanup.run()
+}
+
+// isQueryIDSet returns true if the query id for the portal is set.
+func (pm *portalPauseInfo) isQueryIDSet() bool {
+ return !pm.execStmtInOpenState.queryID.Equal(clusterunique.ID{}.Uint128)
}
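
The layered cleanup above can be pictured with a small standalone sketch. The layer names and the LIFO behavior of cleanupFuncStack mirror the diff, but the types below are simplified stand-ins written for illustration, not the actual connExecutor code.

package main

import "fmt"

// cleanupFuncStack is a simplified stand-in for the stack of cleanup
// functions each layer owns; run executes them in LIFO order and then
// empties the stack so each function runs only once.
type cleanupFuncStack []func()

func (s *cleanupFuncStack) push(f func()) { *s = append(*s, f) }

func (s *cleanupFuncStack) run() {
	for i := len(*s) - 1; i >= 0; i-- {
		(*s)[i]()
	}
	*s = nil
}

func main() {
	var exhaustPortal, execStmtInOpenState, dispatchToExecutionEngine, resumableFlow cleanupFuncStack
	exhaustPortal.push(func() { fmt.Println("exhaustPortal cleanup") })
	execStmtInOpenState.push(func() { fmt.Println("execStmtInOpenState cleanup") })
	dispatchToExecutionEngine.push(func() { fmt.Println("dispatchToExecutionEngine cleanup") })
	resumableFlow.push(func() { fmt.Println("resumableFlow cleanup") })

	// Mirrors portalPauseInfo.cleanupAll: the innermost layer's cleanup runs
	// first and exhaustPortal's runs last.
	resumableFlow.run()
	dispatchToExecutionEngine.run()
	execStmtInOpenState.run()
	exhaustPortal.run()
}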
diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go
index 723fb01f5b3d..686ccd1d1b0c 100644
--- a/pkg/sql/sem/tree/stmt.go
+++ b/pkg/sql/sem/tree/stmt.go
@@ -121,6 +121,30 @@ type canModifySchema interface {
modifiesSchema() bool
}
+// IsAllowedToPause returns true if the stmt can neither modify the schema nor
+// write data.
+// This function gates the queries allowed for pausable portals.
+// TODO(janexing): We should be more accurate about the stmt selection here.
+// For now we only allow SELECT, but is that too strict? And how do we filter
+// out SELECTs with data writes / schema changes?
+func IsAllowedToPause(stmt Statement) bool {
+ if stmt != nil && !CanModifySchema(stmt) && !CanWriteData(stmt) {
+ switch t := stmt.(type) {
+ case *Select:
+ if t.With != nil {
+ ctes := t.With.CTEList
+ for _, cte := range ctes {
+ if !IsAllowedToPause(cte.Stmt) {
+ return false
+ }
+ }
+ }
+ return true
+ }
+ }
+ return false
+}
+
// CanModifySchema returns true if the statement can modify
// the database schema.
func CanModifySchema(stmt Statement) bool {
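
As a rough illustration of which statements IsAllowedToPause admits, the sketch below feeds a few queries through the parser and prints whether each would be pausable. parser.ParseOne and its AST field are assumed from pkg/sql/parser and are not part of this diff.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/parser"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

func main() {
	for _, sql := range []string{
		// Plain read-only SELECTs, including read-only CTEs, are expected
		// to be pausable.
		"SELECT * FROM t",
		"WITH w AS (SELECT 1) SELECT * FROM w",
		// Statements that write data or modify the schema, including via a
		// CTE, are expected to be rejected.
		"WITH w AS (INSERT INTO t VALUES (1) RETURNING *) SELECT * FROM w",
		"UPDATE t SET x = 1",
	} {
		stmt, err := parser.ParseOne(sql)
		if err != nil {
			panic(err)
		}
		fmt.Printf("pausable=%-5t %s\n", tree.IsAllowedToPause(stmt.AST), sql)
	}
}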
diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto
index 9a5fd41d163d..19914953017a 100644
--- a/pkg/sql/sessiondatapb/local_only_session_data.proto
+++ b/pkg/sql/sessiondatapb/local_only_session_data.proto
@@ -369,6 +369,11 @@ message LocalOnlySessionData {
// DisableDropTenant causes errors when the client
// attempts to drop tenants or tenant records.
bool disable_drop_tenant = 99;
+ // MultipleActivePortalsEnabled determines whether the pgwire portal execution
+ // for certain queries can be paused. If true, portals with a read-only SELECT
+ // query without sub/post queries can be executed in an interleaving manner,
+ // but with a local execution plan.
+ bool multiple_active_portals_enabled = 100;
///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
diff --git a/pkg/sql/sqltelemetry/pgwire.go b/pkg/sql/sqltelemetry/pgwire.go
index 18a7ef68e867..fd766c88bbba 100644
--- a/pkg/sql/sqltelemetry/pgwire.go
+++ b/pkg/sql/sqltelemetry/pgwire.go
@@ -69,6 +69,20 @@ var CloseRequestCounter = telemetry.GetCounterOnce("pgwire.command.close")
// is made.
var FlushRequestCounter = telemetry.GetCounterOnce("pgwire.command.flush")
-// MultipleActivePortalCounter is to be incremented every time the cluster setting
-// sql.pgwire.multiple_active_portals.enabled is set true.
-var MultipleActivePortalCounter = telemetry.GetCounterOnce("pgwire.multiple_active_portals")
+// StmtsTriedWithPausablePortals is to be incremented every time a non-internal
+// statement is executed with a pgwire portal while the session variable
+// multiple_active_portals_enabled is set to true.
+// The statement might not satisfy the restrictions for a pausable portal.
+var StmtsTriedWithPausablePortals = telemetry.GetCounterOnce("pgwire.pausable_portal_stmts")
+
+// NotReadOnlyStmtsTriedWithPausablePortals is to be incremented every time a
+// non-internal, non-read-only statement is executed with a pgwire portal while
+// the session variable multiple_active_portals_enabled is set to true.
+// In this case the execution cannot be paused.
+var NotReadOnlyStmtsTriedWithPausablePortals = telemetry.GetCounterOnce("pgwire.pausable_portal_not_read_only_stmts")
+
+// SubOrPostQueryStmtsTriedWithPausablePortals is to be incremented every time a
+// non-internal statement with sub or post queries is executed with a pgwire
+// portal while the session variable multiple_active_portals_enabled is set to
+// true. In this case the execution cannot be paused.
+var SubOrPostQueryStmtsTriedWithPausablePortals = telemetry.GetCounterOnce("pgwire.pausable_portal_stmts_with_sub_or_post_queries")
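
A hypothetical sketch of how these counters might be bumped when a non-internal statement arrives on a portal with multiple_active_portals_enabled set; the helper name and its arguments are invented here for illustration and do not appear in the diff.

package sqlexample

import (
	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
)

// recordPausablePortalTelemetry is a hypothetical helper: every attempt is
// counted, and the two failure modes (not read-only, has sub/post queries)
// are counted separately so we can tell why a portal could not be paused.
func recordPausablePortalTelemetry(ast tree.Statement, hasSubOrPostQueries bool) {
	telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals)
	if !tree.IsAllowedToPause(ast) {
		// Approximates "not read-only" via the same gate used for pausing.
		telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
	}
	if hasSubOrPostQueries {
		telemetry.Inc(sqltelemetry.SubOrPostQueryStmtsTriedWithPausablePortals)
	}
}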
diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go
index e5560b65ee2c..d86706b6b8a5 100644
--- a/pkg/sql/vars.go
+++ b/pkg/sql/vars.go
@@ -2648,6 +2648,23 @@ var varGen = map[string]sessionVar{
return formatBoolAsPostgresSetting(execinfra.UseStreamerEnabled.Get(sv))
},
},
+
+ // CockroachDB extension.
+ `multiple_active_portals_enabled`: {
+ GetStringVal: makePostgresBoolGetStringValFn(`multiple_active_portals_enabled`),
+ Set: func(_ context.Context, m sessionDataMutator, s string) error {
+ b, err := paramparse.ParseBoolVar("multiple_active_portals_enabled", s)
+ if err != nil {
+ return err
+ }
+ m.SetMultipleActivePortalsEnabled(b)
+ return nil
+ },
+ Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
+ return formatBoolAsPostgresSetting(evalCtx.SessionData().MultipleActivePortalsEnabled), nil
+ },
+ GlobalDefault: displayPgBool(util.ConstantWithMetamorphicTestBool("multiple_active_portals_enabled", false)),
+ },
}
// We want test coverage for this on and off so make it metamorphic.
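
With the cluster setting removed, a client opts into pausable portals per session via this variable. A minimal sketch using Go's database/sql; the connection string and driver choice are assumptions.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any Postgres-compatible driver works
)

func main() {
	// Placeholder DSN; adjust for your cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Enable pausable portals for this session only. Note that database/sql
	// pools connections, so for real use this should be run on a dedicated
	// *sql.Conn rather than the pool.
	if _, err := db.Exec("SET multiple_active_portals_enabled = true"); err != nil {
		log.Fatal(err)
	}
}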