diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 80d598a1d9c9..ae6b46a0d4ff 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1629,22 +1629,23 @@ func (ex *connExecutor) dispatchToExecutionEngine( } var err error - if ppInfo := getPausablePortalInfo(); ppInfo != nil { - if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete { - err = ex.makeExecPlan(ctx, planner) + + if ppInfo := getPausablePortalInfo(); ppInfo != nil && ppInfo.dispatchToExecutionEngine.cleanup.isComplete { + planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop + } else { + // Prepare the plan. Note, the error is processed below. Everything + // between here and there needs to happen even if there's an error. + err = ex.makeExecPlan(ctx, planner, res) + // The pausable portal may have been revoked when making the execution plan. + if ppInfo = getPausablePortalInfo(); ppInfo != nil { ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{ fName: "close planTop", f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) }, }) } else { - planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop + defer planner.curPlan.close(ctx) } - } else { - // Prepare the plan. Note, the error is processed below. Everything - // between here and there needs to happen even if there's an error. - err = ex.makeExecPlan(ctx, planner) - defer planner.curPlan.close(ctx) } // Include gist in error reports. @@ -2132,8 +2133,11 @@ func (ex *connExecutor) handleTxnRowsWrittenReadLimits(ctx context.Context) erro } // makeExecPlan creates an execution plan and populates planner.curPlan using -// the cost-based optimizer. -func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error { +// the cost-based optimizer. If the planner has a pausable portal, the caller +// should check if it still exists when this function returns. +func (ex *connExecutor) makeExecPlan( + ctx context.Context, planner *planner, res RestrictedCommandResult, +) error { if tree.CanModifySchema(planner.stmt.AST) { if planner.Txn().IsoLevel().ToleratesWriteSkew() { if planner.extendedEvalCtx.TxnIsSingleStmt && planner.extendedEvalCtx.TxnImplicit { @@ -2156,6 +2160,21 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro flags := planner.curPlan.flags + if planner.pausablePortal != nil && (flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) { + telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals) + // We don't allow mutations in a pausable portal. Set it back to an + // un-pausable (normal) portal. + // When pauseInfo is nil, no cleanup function will be added to the stack + // and all clean-up steps will be performed as for normal portals. + planner.pausablePortal.pauseInfo = nil + // We need this so that the result consumption for this portal cannot be + // paused either. + if err := res.RevokePortalPausability(); err != nil { + res.SetError(err) + return nil + } + } + if flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan) { if ex.executorType == executorTypeExec && planner.EvalContext().SessionData().DisallowFullTableScans { hasLargeScan := flags.IsSet(planFlagContainsLargeFullIndexScan) || flags.IsSet(planFlagContainsLargeFullTableScan) diff --git a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs index 8c682a98e330..7928f5b58513 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs +++ b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs @@ -301,4 +301,72 @@ ReadyForQuery {"Type":"ReadyForQuery","TxStatus":"I"} +subtest end + +subtest functions_not_supported + +# We don't support UDFs in pausable portals since we don't allow mutations, and +# UDFs may contain mutations. The following test results in a duplicate key +# violation in postgres. + +send +Query {"String": "SET multiple_active_portals_enabled = true"} +---- + +send +Query {"String": "DROP TABLE IF EXISTS xy;"} +Query {"String": "DROP FUNCTION IF EXISTS f;"} +Query {"String": "DROP FUNCTION IF EXISTS g;"} +Query {"String": "DEALLOCATE ALL;"} +Query {"String": "CREATE TABLE xy (x INT PRIMARY KEY, y INT);"} +Query {"String": "CREATE FUNCTION f() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (1, 1), (2, 2) RETURNING *; $$"} +Query {"String": "CREATE FUNCTION g() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (2, 1), (3, 3) RETURNING *; $$"} +Parse {"Name": "q1", "Query": "SELECT f();"} +Parse {"Name": "q2", "Query": "SELECT g();"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until keepErrMessage +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ErrorResponse +---- +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"(1,1)"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} + subtest end diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index c2a52be43a54..c1a0736f611f 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -208,16 +208,11 @@ func (ex *connExecutor) makePreparedPortal( if ex.sessionData().MultipleActivePortalsEnabled && ex.executorType != executorTypeInternal { telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals) - if tree.IsAllowedToPause(stmt.AST) { - portal.pauseInfo = &portalPauseInfo{} - portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{} - portal.portalPausablity = PausablePortal - } else { - telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals) - // We have set the session variable multiple_active_portals_enabled to - // true, but we don't support the underlying query for a pausable portal. - portal.portalPausablity = NotPausablePortalForUnsupportedStmt - } + // We will check whether the statement itself is pausable (i.e., that it + // doesn't contain DDL or mutations) when we build the plan. + portal.pauseInfo = &portalPauseInfo{} + portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{} + portal.portalPausablity = PausablePortal } return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name) } diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index fd735720f79c..2dadb12d998f 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -143,30 +143,6 @@ type canModifySchema interface { modifiesSchema() bool } -// IsAllowedToPause returns true if the stmt cannot either modify the schema or -// write data. -// This function is to gate the queries allowed for pausable portals. -// TODO(janexing): We should be more accurate about the stmt selection here. -// Now we only allow SELECT, but is it too strict? And how to filter out -// SELECT with data writes / schema changes? -func IsAllowedToPause(stmt Statement) bool { - if stmt != nil && !CanModifySchema(stmt) && !CanWriteData(stmt) { - switch t := stmt.(type) { - case *Select: - if t.With != nil { - ctes := t.With.CTEList - for _, cte := range ctes { - if !IsAllowedToPause(cte.Stmt) { - return false - } - } - } - return true - } - } - return false -} - // CanModifySchema returns true if the statement can modify // the database schema. func CanModifySchema(stmt Statement) bool {