diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 97afdc251c4c..8086d12fcb71 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1473,7 +1473,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( var err error if ppInfo := getPausablePortalInfo(); ppInfo != nil { if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete { - err = ex.makeExecPlan(ctx, planner) + err = ex.makeExecPlan(ctx, planner, res) ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{ fName: "close planTop", @@ -1485,7 +1485,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( } else { // Prepare the plan. Note, the error is processed below. Everything // between here and there needs to happen even if there's an error. - err = ex.makeExecPlan(ctx, planner) + err = ex.makeExecPlan(ctx, planner, res) defer planner.curPlan.close(ctx) } @@ -1978,7 +1978,7 @@ func (ex *connExecutor) handleTxnRowsWrittenReadLimits(ctx context.Context) erro // makeExecPlan creates an execution plan and populates planner.curPlan using // the cost-based optimizer. -func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error { +func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner, res RestrictedCommandResult) error { if tree.CanModifySchema(planner.stmt.AST) { if planner.Txn().IsoLevel().ToleratesWriteSkew() { if planner.extendedEvalCtx.TxnIsSingleStmt && planner.extendedEvalCtx.TxnImplicit { @@ -2001,6 +2001,21 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro flags := planner.curPlan.flags + if flags.IsSet(planFlagContainsMutation) && planner.pausablePortal != nil { + telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals) + // We don't allow mutations in a pausable portal. Set it back to an + // un-pausable (normal) portal. + // With pauseInfo is nil, no cleanup function will be added to the stack + // and all clean-up steps will be performed as for normal portals. + planner.pausablePortal.pauseInfo = nil + // We need this so that the result consumption for this portal cannot be + // paused either. + if err := res.RevokePortalPausability(); err != nil { + res.SetError(err) + return nil + } + } + if flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan) { if ex.executorType == executorTypeExec && planner.EvalContext().SessionData().DisallowFullTableScans { hasLargeScan := flags.IsSet(planFlagContainsLargeFullIndexScan) || flags.IsSet(planFlagContainsLargeFullTableScan) diff --git a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs index 8c682a98e330..7928f5b58513 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs +++ b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs @@ -301,4 +301,72 @@ ReadyForQuery {"Type":"ReadyForQuery","TxStatus":"I"} +subtest end + +subtest functions_not_supported + +# We don't support UDFs in pausable portals since we don't allow mutations, and +# UDFs may contain mutations. The following test results in a duplicate key +# violation in postgres. + +send +Query {"String": "SET multiple_active_portals_enabled = true"} +---- + +send +Query {"String": "DROP TABLE IF EXISTS xy;"} +Query {"String": "DROP FUNCTION IF EXISTS f;"} +Query {"String": "DROP FUNCTION IF EXISTS g;"} +Query {"String": "DEALLOCATE ALL;"} +Query {"String": "CREATE TABLE xy (x INT PRIMARY KEY, y INT);"} +Query {"String": "CREATE FUNCTION f() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (1, 1), (2, 2) RETURNING *; $$"} +Query {"String": "CREATE FUNCTION g() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (2, 1), (3, 3) RETURNING *; $$"} +Parse {"Name": "q1", "Query": "SELECT f();"} +Parse {"Name": "q2", "Query": "SELECT g();"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until keepErrMessage +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ErrorResponse +---- +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"(1,1)"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} + subtest end