Skip to content

Commit

Permalink
sql: disable pausable portals for all statements with mutations
Browse files Browse the repository at this point in the history
Previously we examined the AST to determine whether a statement could be
executed in a pausable portal or not. However, this was insufficient to
identify volatile UDFs that could also contain mutations.

This PR revokes a portal's pausability if the statement's plan contains
a mutation. That is, if the opt builder determines that any operator is
a mutation operator (see `IsMutationOp`).

Although there is some overlap between this PR and the existing
`IsAllowedToPause`, this PR leaves the latter in place, since it
restricts some statements that are not considered mutation operators,
e.g., import operators.

Epic: None
Fixes: #107130

Release note: None
  • Loading branch information
rharding6373 committed Aug 8, 2023
1 parent d6f6de2 commit aac62fc
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 3 deletions.
21 changes: 18 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
var err error
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
err = ex.makeExecPlan(ctx, planner)
err = ex.makeExecPlan(ctx, planner, res)
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
fName: "close planTop",
Expand All @@ -1485,7 +1485,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
} else {
// Prepare the plan. Note, the error is processed below. Everything
// between here and there needs to happen even if there's an error.
err = ex.makeExecPlan(ctx, planner)
err = ex.makeExecPlan(ctx, planner, res)
defer planner.curPlan.close(ctx)
}

Expand Down Expand Up @@ -1978,7 +1978,7 @@ func (ex *connExecutor) handleTxnRowsWrittenReadLimits(ctx context.Context) erro

// makeExecPlan creates an execution plan and populates planner.curPlan using
// the cost-based optimizer.
func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error {
func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner, res RestrictedCommandResult) error {
if tree.CanModifySchema(planner.stmt.AST) {
if planner.Txn().IsoLevel().ToleratesWriteSkew() {
if planner.extendedEvalCtx.TxnIsSingleStmt && planner.extendedEvalCtx.TxnImplicit {
Expand All @@ -2001,6 +2001,21 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro

flags := planner.curPlan.flags

if flags.IsSet(planFlagContainsMutation) && planner.pausablePortal != nil {
telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
// We don't allow mutations in a pausable portal. Set it back to an
// un-pausable (normal) portal.
// With pauseInfo is nil, no cleanup function will be added to the stack
// and all clean-up steps will be performed as for normal portals.
planner.pausablePortal.pauseInfo = nil
// We need this so that the result consumption for this portal cannot be
// paused either.
if err := res.RevokePortalPausability(); err != nil {
res.SetError(err)
return nil
}
}

if flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan) {
if ex.executorType == executorTypeExec && planner.EvalContext().SessionData().DisallowFullTableScans {
hasLargeScan := flags.IsSet(planFlagContainsLargeFullIndexScan) || flags.IsSet(planFlagContainsLargeFullTableScan)
Expand Down
68 changes: 68 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/portals_crbugs
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,72 @@ ReadyForQuery
{"Type":"ReadyForQuery","TxStatus":"I"}


subtest end

subtest functions_not_supported

# We don't support UDFs in pausable portals since we don't allow mutations, and
# UDFs may contain mutations. The following test results in a duplicate key
# violation in postgres.

send
Query {"String": "SET multiple_active_portals_enabled = true"}
----

send
Query {"String": "DROP TABLE IF EXISTS xy;"}
Query {"String": "DROP FUNCTION IF EXISTS f;"}
Query {"String": "DROP FUNCTION IF EXISTS g;"}
Query {"String": "DEALLOCATE ALL;"}
Query {"String": "CREATE TABLE xy (x INT PRIMARY KEY, y INT);"}
Query {"String": "CREATE FUNCTION f() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (1, 1), (2, 2) RETURNING *; $$"}
Query {"String": "CREATE FUNCTION g() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (2, 1), (3, 3) RETURNING *; $$"}
Parse {"Name": "q1", "Query": "SELECT f();"}
Parse {"Name": "q2", "Query": "SELECT g();"}
Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
Execute {"Portal": "p1", "MaxRows": 1}
Execute {"Portal": "p2", "MaxRows": 1}
Execute {"Portal": "p1", "MaxRows": 1}
Execute {"Portal": "p2", "MaxRows": 1}
Sync
----

until keepErrMessage
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ErrorResponse
----
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"ParseComplete"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"(1,1)"}]}
{"Type":"PortalSuspended"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}

subtest end

0 comments on commit aac62fc

Please sign in to comment.