diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 80d598a1d9c9..49702430baca 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1629,21 +1629,31 @@ func (ex *connExecutor) dispatchToExecutionEngine( } var err error + if ppInfo := getPausablePortalInfo(); ppInfo != nil { if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete { - err = ex.makeExecPlan(ctx, planner) - ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan - ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{ - fName: "close planTop", - f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) }, - }) + err = ex.makeExecPlan(ctx, planner, res) + if flags := planner.curPlan.flags; err == nil && (flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) { + telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals) + // We don't allow mutations in a pausable portal. Set it back to + // an un-pausable (normal) portal. + planner.pausablePortal.pauseInfo = nil + err = res.RevokePortalPausability() + defer planner.curPlan.close(ctx) + } else { + ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan + ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{ + fName: "close planTop", + f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) }, + }) + } } else { planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop } } else { // Prepare the plan. Note, the error is processed below. Everything // between here and there needs to happen even if there's an error. - err = ex.makeExecPlan(ctx, planner) + err = ex.makeExecPlan(ctx, planner, res) defer planner.curPlan.close(ctx) } @@ -1759,6 +1769,8 @@ func (ex *connExecutor) dispatchToExecutionEngine( // un-pausable (normal) portal. // With pauseInfo is nil, no cleanup function will be added to the stack // and all clean-up steps will be performed as for normal portals. + // TODO(harding): We may need to move resetting pauseInfo before we add + // the pausable portal cleanup step above. planner.pausablePortal.pauseInfo = nil // We need this so that the result consumption for this portal cannot be // paused either. @@ -2133,7 +2145,9 @@ func (ex *connExecutor) handleTxnRowsWrittenReadLimits(ctx context.Context) erro // makeExecPlan creates an execution plan and populates planner.curPlan using // the cost-based optimizer. -func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error { +func (ex *connExecutor) makeExecPlan( + ctx context.Context, planner *planner, +) error { if tree.CanModifySchema(planner.stmt.AST) { if planner.Txn().IsoLevel().ToleratesWriteSkew() { if planner.extendedEvalCtx.TxnIsSingleStmt && planner.extendedEvalCtx.TxnImplicit { diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index f29cc8a1fed7..61154160cf19 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -1345,7 +1345,14 @@ func (c *conn) CreateStatementResult( implicitTxn bool, portalPausability sql.PortalPausablity, ) sql.CommandResult { - return c.newCommandResult(descOpt, pos, stmt, formatCodes, conv, location, limit, portalName, implicitTxn, portalPausability) + rowLimit := limit + if tree.ReturnsAtMostOneRow(stmt) { + // When a statement returns at most one row, the result row limit doesn't + // matter. We set it to 0 to fetch all rows, which allows us to clean up + // resources sooner if using a pausable portal. + rowLimit = 0 + } + return c.newCommandResult(descOpt, pos, stmt, formatCodes, conv, location, rowLimit, portalName, implicitTxn, portalPausability) } // CreateSyncResult is part of the sql.ClientComm interface. diff --git a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs index 8c682a98e330..7928f5b58513 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs +++ b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs @@ -301,4 +301,72 @@ ReadyForQuery {"Type":"ReadyForQuery","TxStatus":"I"} +subtest end + +subtest functions_not_supported + +# We don't support UDFs in pausable portals since we don't allow mutations, and +# UDFs may contain mutations. The following test results in a duplicate key +# violation in postgres. + +send +Query {"String": "SET multiple_active_portals_enabled = true"} +---- + +send +Query {"String": "DROP TABLE IF EXISTS xy;"} +Query {"String": "DROP FUNCTION IF EXISTS f;"} +Query {"String": "DROP FUNCTION IF EXISTS g;"} +Query {"String": "DEALLOCATE ALL;"} +Query {"String": "CREATE TABLE xy (x INT PRIMARY KEY, y INT);"} +Query {"String": "CREATE FUNCTION f() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (1, 1), (2, 2) RETURNING *; $$"} +Query {"String": "CREATE FUNCTION g() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (2, 1), (3, 3) RETURNING *; $$"} +Parse {"Name": "q1", "Query": "SELECT f();"} +Parse {"Name": "q2", "Query": "SELECT g();"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until keepErrMessage +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ErrorResponse +---- +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"(1,1)"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} + subtest end diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index c2a52be43a54..c1a0736f611f 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -208,16 +208,11 @@ func (ex *connExecutor) makePreparedPortal( if ex.sessionData().MultipleActivePortalsEnabled && ex.executorType != executorTypeInternal { telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals) - if tree.IsAllowedToPause(stmt.AST) { - portal.pauseInfo = &portalPauseInfo{} - portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{} - portal.portalPausablity = PausablePortal - } else { - telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals) - // We have set the session variable multiple_active_portals_enabled to - // true, but we don't support the underlying query for a pausable portal. - portal.portalPausablity = NotPausablePortalForUnsupportedStmt - } + // We will check whether the statement itself is pausable (i.e., that it + // doesn't contain DDL or mutations) when we build the plan. + portal.pauseInfo = &portalPauseInfo{} + portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{} + portal.portalPausablity = PausablePortal } return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name) } diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index fd735720f79c..c2c88e28c232 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -143,30 +143,6 @@ type canModifySchema interface { modifiesSchema() bool } -// IsAllowedToPause returns true if the stmt cannot either modify the schema or -// write data. -// This function is to gate the queries allowed for pausable portals. -// TODO(janexing): We should be more accurate about the stmt selection here. -// Now we only allow SELECT, but is it too strict? And how to filter out -// SELECT with data writes / schema changes? -func IsAllowedToPause(stmt Statement) bool { - if stmt != nil && !CanModifySchema(stmt) && !CanWriteData(stmt) { - switch t := stmt.(type) { - case *Select: - if t.With != nil { - ctes := t.With.CTEList - for _, cte := range ctes { - if !IsAllowedToPause(cte.Stmt) { - return false - } - } - } - return true - } - } - return false -} - // CanModifySchema returns true if the statement can modify // the database schema. func CanModifySchema(stmt Statement) bool { @@ -203,6 +179,28 @@ func CanWriteData(stmt Statement) bool { return false } +// ReturnsAtMostOneRow returns true if the statement returns either no rows or +// a single row. +// TODO(harding): Expand this list. +func ReturnsAtMostOneRow(stmt Statement) bool { + switch stmt.(type) { + // Import operations. + case *CopyFrom, *Import, *Restore: + return true + // Backup creates a job and allows you to write into userfiles. + case *Backup: + return true + // CockroachDB extensions. + case *Scatter: + return true + // Replication operations. + case *CreateTenantFromReplication, *AlterTenantReplication: + return true + } + return false + +} + // HiddenFromShowQueries is a pseudo-interface to be implemented // by statements that should not show up in SHOW QUERIES (and are hence // not cancellable using CANCEL QUERIES either). Usually implemented by