From fde1a75850c98fcd3355f0e9d082924a91a37471 Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Tue, 8 Aug 2023 15:56:26 -0700 Subject: [PATCH] sql: disable pausable portals for all statements with mutations Previously we examined the AST to determine whether a statement could be executed in a pausable portal or not. However, this was insufficient to identify volatile UDFs that could also contain mutations. We also revoked pausable portals for operators for which the portal type didn't matter, since they would be executed to completion regardless. This PR revokes a portal's pausability if the statement's plan contains a mutation or DDL, that is, if the opt builder determines that any operator is a mutation or DDL operator (see `IsMutationOp` and `IsDDLOp`). Epic: None Fixes: #107130 Release note: None --- pkg/sql/conn_executor_exec.go | 41 ++++++++--- pkg/sql/pgwire/testdata/pgtest/portals_crbugs | 68 +++++++++++++++++++ pkg/sql/prepared_stmt.go | 15 ++-- pkg/sql/sem/tree/stmt.go | 24 ------- 4 files changed, 103 insertions(+), 45 deletions(-) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 80d598a1d9c9..ae6b46a0d4ff 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1629,22 +1629,23 @@ func (ex *connExecutor) dispatchToExecutionEngine( } var err error - if ppInfo := getPausablePortalInfo(); ppInfo != nil { - if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete { - err = ex.makeExecPlan(ctx, planner) + + if ppInfo := getPausablePortalInfo(); ppInfo != nil && ppInfo.dispatchToExecutionEngine.cleanup.isComplete { + planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop + } else { + // Prepare the plan. Note, the error is processed below. Everything + // between here and there needs to happen even if there's an error. + err = ex.makeExecPlan(ctx, planner, res) + // The pausable portal may have been revoked when making the execution plan. 
+ if ppInfo = getPausablePortalInfo(); ppInfo != nil { ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{ fName: "close planTop", f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) }, }) } else { - planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop + defer planner.curPlan.close(ctx) } - } else { - // Prepare the plan. Note, the error is processed below. Everything - // between here and there needs to happen even if there's an error. - err = ex.makeExecPlan(ctx, planner) - defer planner.curPlan.close(ctx) } // Include gist in error reports. @@ -2132,8 +2133,11 @@ func (ex *connExecutor) handleTxnRowsWrittenReadLimits(ctx context.Context) erro } // makeExecPlan creates an execution plan and populates planner.curPlan using -// the cost-based optimizer. -func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error { +// the cost-based optimizer. If the planner has a pausable portal, the caller +// should check if it still exists when this function returns. +func (ex *connExecutor) makeExecPlan( + ctx context.Context, planner *planner, res RestrictedCommandResult, +) error { if tree.CanModifySchema(planner.stmt.AST) { if planner.Txn().IsoLevel().ToleratesWriteSkew() { if planner.extendedEvalCtx.TxnIsSingleStmt && planner.extendedEvalCtx.TxnImplicit { @@ -2156,6 +2160,21 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro flags := planner.curPlan.flags + if planner.pausablePortal != nil && (flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) { + telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals) + // We don't allow mutations in a pausable portal. Set it back to an + // un-pausable (normal) portal. + // When pauseInfo is nil, no cleanup function will be added to the stack + // and all clean-up steps will be performed as for normal portals. 
+ planner.pausablePortal.pauseInfo = nil + // We need this so that the result consumption for this portal cannot be + // paused either. + if err := res.RevokePortalPausability(); err != nil { + res.SetError(err) + return nil + } + } + if flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan) { if ex.executorType == executorTypeExec && planner.EvalContext().SessionData().DisallowFullTableScans { hasLargeScan := flags.IsSet(planFlagContainsLargeFullIndexScan) || flags.IsSet(planFlagContainsLargeFullTableScan) diff --git a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs index 8c682a98e330..7928f5b58513 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs +++ b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs @@ -301,4 +301,72 @@ ReadyForQuery {"Type":"ReadyForQuery","TxStatus":"I"} +subtest end + +subtest functions_not_supported + +# We don't support UDFs in pausable portals since we don't allow mutations, and +# UDFs may contain mutations. The following test results in a duplicate key +# violation in postgres. 
+ +send +Query {"String": "SET multiple_active_portals_enabled = true"} +---- + +send +Query {"String": "DROP TABLE IF EXISTS xy;"} +Query {"String": "DROP FUNCTION IF EXISTS f;"} +Query {"String": "DROP FUNCTION IF EXISTS g;"} +Query {"String": "DEALLOCATE ALL;"} +Query {"String": "CREATE TABLE xy (x INT PRIMARY KEY, y INT);"} +Query {"String": "CREATE FUNCTION f() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (1, 1), (2, 2) RETURNING *; $$"} +Query {"String": "CREATE FUNCTION g() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (2, 1), (3, 3) RETURNING *; $$"} +Parse {"Name": "q1", "Query": "SELECT f();"} +Parse {"Name": "q2", "Query": "SELECT g();"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until keepErrMessage +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ErrorResponse +---- +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} 
+{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"(1,1)"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} + subtest end diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index c2a52be43a54..c1a0736f611f 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -208,16 +208,11 @@ func (ex *connExecutor) makePreparedPortal( if ex.sessionData().MultipleActivePortalsEnabled && ex.executorType != executorTypeInternal { telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals) - if tree.IsAllowedToPause(stmt.AST) { - portal.pauseInfo = &portalPauseInfo{} - portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{} - portal.portalPausablity = PausablePortal - } else { - telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals) - // We have set the session variable multiple_active_portals_enabled to - // true, but we don't support the underlying query for a pausable portal. - portal.portalPausablity = NotPausablePortalForUnsupportedStmt - } + // We will check whether the statement itself is pausable (i.e., that it + // doesn't contain DDL or mutations) when we build the plan. 
+ portal.pauseInfo = &portalPauseInfo{} + portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{} + portal.portalPausablity = PausablePortal } return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name) } diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index fd735720f79c..2dadb12d998f 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -143,30 +143,6 @@ type canModifySchema interface { modifiesSchema() bool } -// IsAllowedToPause returns true if the stmt cannot either modify the schema or -// write data. -// This function is to gate the queries allowed for pausable portals. -// TODO(janexing): We should be more accurate about the stmt selection here. -// Now we only allow SELECT, but is it too strict? And how to filter out -// SELECT with data writes / schema changes? -func IsAllowedToPause(stmt Statement) bool { - if stmt != nil && !CanModifySchema(stmt) && !CanWriteData(stmt) { - switch t := stmt.(type) { - case *Select: - if t.With != nil { - ctes := t.With.CTEList - for _, cte := range ctes { - if !IsAllowedToPause(cte.Stmt) { - return false - } - } - } - return true - } - } - return false -} - // CanModifySchema returns true if the statement can modify // the database schema. func CanModifySchema(stmt Statement) bool {