From 1a193fc700a8a9480b7d1a13cae3facca3242d46 Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Tue, 8 Aug 2023 15:56:26 -0700 Subject: [PATCH] sql: disable pausable portals for all statements with mutations Previously we examined the AST to determine whether a statement could be executed in a pausable portal or not. However, this was insufficient to identify volatile UDFs that could also contain mutations. We also revoked pausable portals for operators for whom the portal type didn't matter, since they would be executed to completion regardless. This PR revokes a portal's pausability if the statement's plan contains a mutation or DDL. That is, if the opt builder determines that any operator is a mutation operator (see `IsMutationOp` and `IsDDLOp`). Epic: None Fixes: #107130 Release note: None --- pkg/sql/conn_executor_exec.go | 22 ++++-- pkg/sql/pgwire/conn.go | 9 ++- pkg/sql/pgwire/testdata/pgtest/portals_crbugs | 68 +++++++++++++++++++ pkg/sql/prepared_stmt.go | 15 ++-- pkg/sql/sem/tree/stmt.go | 46 ++++++------- 5 files changed, 120 insertions(+), 40 deletions(-) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 80d598a1d9c9..2d892702eb83 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1629,14 +1629,24 @@ func (ex *connExecutor) dispatchToExecutionEngine( } var err error + if ppInfo := getPausablePortalInfo(); ppInfo != nil { if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete { err = ex.makeExecPlan(ctx, planner) - ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan - ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{ - fName: "close planTop", - f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) }, - }) + if flags := planner.curPlan.flags; err == nil && (flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) { + telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals) + // We don't allow mutations in a pausable portal. Set it back to + // an un-pausable (normal) portal. + planner.pausablePortal.pauseInfo = nil + err = res.RevokePortalPausability() + defer planner.curPlan.close(ctx) + } else { + ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan + ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{ + fName: "close planTop", + f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) }, + }) + } } else { planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop } @@ -1759,6 +1769,8 @@ func (ex *connExecutor) dispatchToExecutionEngine( // un-pausable (normal) portal. // With pauseInfo is nil, no cleanup function will be added to the stack // and all clean-up steps will be performed as for normal portals. + // TODO(harding): We may need to move resetting pauseInfo before we add + // the pausable portal cleanup step above. planner.pausablePortal.pauseInfo = nil // We need this so that the result consumption for this portal cannot be // paused either. diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index f29cc8a1fed7..61154160cf19 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -1345,7 +1345,14 @@ func (c *conn) CreateStatementResult( implicitTxn bool, portalPausability sql.PortalPausablity, ) sql.CommandResult { - return c.newCommandResult(descOpt, pos, stmt, formatCodes, conv, location, limit, portalName, implicitTxn, portalPausability) + rowLimit := limit + if tree.ReturnsAtMostOneRow(stmt) { + // When a statement returns at most one row, the result row limit doesn't + // matter. We set it to 0 to fetch all rows, which allows us to clean up + // resources sooner if using a pausable portal. + rowLimit = 0 + } + return c.newCommandResult(descOpt, pos, stmt, formatCodes, conv, location, rowLimit, portalName, implicitTxn, portalPausability) } // CreateSyncResult is part of the sql.ClientComm interface. diff --git a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs index 8c682a98e330..7928f5b58513 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs +++ b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs @@ -301,4 +301,72 @@ ReadyForQuery {"Type":"ReadyForQuery","TxStatus":"I"} +subtest end + +subtest functions_not_supported + +# We don't support UDFs in pausable portals since we don't allow mutations, and +# UDFs may contain mutations. The following test results in a duplicate key +# violation in postgres. + +send +Query {"String": "SET multiple_active_portals_enabled = true"} +---- + +send +Query {"String": "DROP TABLE IF EXISTS xy;"} +Query {"String": "DROP FUNCTION IF EXISTS f;"} +Query {"String": "DROP FUNCTION IF EXISTS g;"} +Query {"String": "DEALLOCATE ALL;"} +Query {"String": "CREATE TABLE xy (x INT PRIMARY KEY, y INT);"} +Query {"String": "CREATE FUNCTION f() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (1, 1), (2, 2) RETURNING *; $$"} +Query {"String": "CREATE FUNCTION g() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (2, 1), (3, 3) RETURNING *; $$"} +Parse {"Name": "q1", "Query": "SELECT f();"} +Parse {"Name": "q2", "Query": "SELECT g();"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until keepErrMessage +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ErrorResponse +---- +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"(1,1)"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} + subtest end diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index c2a52be43a54..c1a0736f611f 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -208,16 +208,11 @@ func (ex *connExecutor) makePreparedPortal( if ex.sessionData().MultipleActivePortalsEnabled && ex.executorType != executorTypeInternal { telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals) - if tree.IsAllowedToPause(stmt.AST) { - portal.pauseInfo = &portalPauseInfo{} - portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{} - portal.portalPausablity = PausablePortal - } else { - telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals) - // We have set the session variable multiple_active_portals_enabled to - // true, but we don't support the underlying query for a pausable portal. - portal.portalPausablity = NotPausablePortalForUnsupportedStmt - } + // We will check whether the statement itself is pausable (i.e., that it + // doesn't contain DDL or mutations) when we build the plan. + portal.pauseInfo = &portalPauseInfo{} + portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{} + portal.portalPausablity = PausablePortal } return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name) } diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index fd735720f79c..c2c88e28c232 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -143,30 +143,6 @@ type canModifySchema interface { modifiesSchema() bool } -// IsAllowedToPause returns true if the stmt cannot either modify the schema or -// write data. -// This function is to gate the queries allowed for pausable portals. -// TODO(janexing): We should be more accurate about the stmt selection here. -// Now we only allow SELECT, but is it too strict? And how to filter out -// SELECT with data writes / schema changes? -func IsAllowedToPause(stmt Statement) bool { - if stmt != nil && !CanModifySchema(stmt) && !CanWriteData(stmt) { - switch t := stmt.(type) { - case *Select: - if t.With != nil { - ctes := t.With.CTEList - for _, cte := range ctes { - if !IsAllowedToPause(cte.Stmt) { - return false - } - } - } - return true - } - } - return false -} - // CanModifySchema returns true if the statement can modify // the database schema. func CanModifySchema(stmt Statement) bool { @@ -203,6 +179,28 @@ func CanWriteData(stmt Statement) bool { return false } +// ReturnsAtMostOneRow returns true if the statement returns either no rows or +// a single row. +// TODO(harding): Expand this list. +func ReturnsAtMostOneRow(stmt Statement) bool { + switch stmt.(type) { + // Import operations. + case *CopyFrom, *Import, *Restore: + return true + // Backup creates a job and allows you to write into userfiles. + case *Backup: + return true + // CockroachDB extensions. + case *Scatter: + return true + // Replication operations. + case *CreateTenantFromReplication, *AlterTenantReplication: + return true + } + return false + +} + // HiddenFromShowQueries is a pseudo-interface to be implemented // by statements that should not show up in SHOW QUERIES (and are hence // not cancellable using CANCEL QUERIES either). Usually implemented by