Skip to content

Commit

Permalink
sql: disable pausable portals for all statements with mutations
Browse files Browse the repository at this point in the history
Previously we examined the AST to determine whether a statement could be
executed in a pausable portal or not. However, this was insufficient to
identify volatile UDFs that could also contain mutations. We also
revoked pausable portals for operators for whom the portal type didn't
matter, since they would be executed to completion regardless.

This PR revokes a portal's pausability if the statement's plan contains
a mutation or DDL. That is, if the opt builder determines that any
operator is a mutation operator (see `IsMutationOp` and `IsDDLOp`).

Epic: None
Fixes: cockroachdb#107130

Release note: None
  • Loading branch information
rharding6373 committed Aug 21, 2023
1 parent eb78af2 commit fde1a75
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 45 deletions.
41 changes: 30 additions & 11 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,22 +1629,23 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}

var err error
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
err = ex.makeExecPlan(ctx, planner)

if ppInfo := getPausablePortalInfo(); ppInfo != nil && ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop
} else {
// Prepare the plan. Note, the error is processed below. Everything
// between here and there needs to happen even if there's an error.
err = ex.makeExecPlan(ctx, planner, res)
// The pausable portal may have been revoked when making the execution plan.
if ppInfo = getPausablePortalInfo(); ppInfo != nil {
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
fName: "close planTop",
f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) },
})
} else {
planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop
defer planner.curPlan.close(ctx)
}
} else {
// Prepare the plan. Note, the error is processed below. Everything
// between here and there needs to happen even if there's an error.
err = ex.makeExecPlan(ctx, planner)
defer planner.curPlan.close(ctx)
}

// Include gist in error reports.
Expand Down Expand Up @@ -2132,8 +2133,11 @@ func (ex *connExecutor) handleTxnRowsWrittenReadLimits(ctx context.Context) erro
}

// makeExecPlan creates an execution plan and populates planner.curPlan using
// the cost-based optimizer.
func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error {
// the cost-based optimizer. If the planner has a pausable portal, the caller
// should check if it still exists when this function returns.
func (ex *connExecutor) makeExecPlan(
ctx context.Context, planner *planner, res RestrictedCommandResult,
) error {
if tree.CanModifySchema(planner.stmt.AST) {
if planner.Txn().IsoLevel().ToleratesWriteSkew() {
if planner.extendedEvalCtx.TxnIsSingleStmt && planner.extendedEvalCtx.TxnImplicit {
Expand All @@ -2156,6 +2160,21 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro

flags := planner.curPlan.flags

if planner.pausablePortal != nil && (flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) {
telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
// We don't allow mutations in a pausable portal. Set it back to an
// un-pausable (normal) portal.
// When pauseInfo is nil, no cleanup function will be added to the stack
// and all clean-up steps will be performed as for normal portals.
planner.pausablePortal.pauseInfo = nil
// We need this so that the result consumption for this portal cannot be
// paused either.
if err := res.RevokePortalPausability(); err != nil {
res.SetError(err)
return nil
}
}

if flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan) {
if ex.executorType == executorTypeExec && planner.EvalContext().SessionData().DisallowFullTableScans {
hasLargeScan := flags.IsSet(planFlagContainsLargeFullIndexScan) || flags.IsSet(planFlagContainsLargeFullTableScan)
Expand Down
68 changes: 68 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/portals_crbugs
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,72 @@ ReadyForQuery
{"Type":"ReadyForQuery","TxStatus":"I"}


subtest end

subtest functions_not_supported

# We don't support UDFs in pausable portals since we don't allow mutations, and
# UDFs may contain mutations. The following test results in a duplicate key
# violation in postgres.

send
Query {"String": "SET multiple_active_portals_enabled = true"}
----

send
Query {"String": "DROP TABLE IF EXISTS xy;"}
Query {"String": "DROP FUNCTION IF EXISTS f;"}
Query {"String": "DROP FUNCTION IF EXISTS g;"}
Query {"String": "DEALLOCATE ALL;"}
Query {"String": "CREATE TABLE xy (x INT PRIMARY KEY, y INT);"}
Query {"String": "CREATE FUNCTION f() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (1, 1), (2, 2) RETURNING *; $$"}
Query {"String": "CREATE FUNCTION g() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (2, 1), (3, 3) RETURNING *; $$"}
Parse {"Name": "q1", "Query": "SELECT f();"}
Parse {"Name": "q2", "Query": "SELECT g();"}
Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
Execute {"Portal": "p1", "MaxRows": 1}
Execute {"Portal": "p2", "MaxRows": 1}
Execute {"Portal": "p1", "MaxRows": 1}
Execute {"Portal": "p2", "MaxRows": 1}
Sync
----

until keepErrMessage
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ErrorResponse
----
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"ParseComplete"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"(1,1)"}]}
{"Type":"PortalSuspended"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}

subtest end
15 changes: 5 additions & 10 deletions pkg/sql/prepared_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,11 @@ func (ex *connExecutor) makePreparedPortal(

if ex.sessionData().MultipleActivePortalsEnabled && ex.executorType != executorTypeInternal {
telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals)
if tree.IsAllowedToPause(stmt.AST) {
portal.pauseInfo = &portalPauseInfo{}
portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{}
portal.portalPausablity = PausablePortal
} else {
telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
// We have set the session variable multiple_active_portals_enabled to
// true, but we don't support the underlying query for a pausable portal.
portal.portalPausablity = NotPausablePortalForUnsupportedStmt
}
// We will check whether the statement itself is pausable (i.e., that it
// doesn't contain DDL or mutations) when we build the plan.
portal.pauseInfo = &portalPauseInfo{}
portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{}
portal.portalPausablity = PausablePortal
}
return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
}
Expand Down
24 changes: 0 additions & 24 deletions pkg/sql/sem/tree/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,30 +143,6 @@ type canModifySchema interface {
modifiesSchema() bool
}

// IsAllowedToPause returns true if the stmt cannot either modify the schema or
// write data.
// This function is to gate the queries allowed for pausable portals.
// TODO(janexing): We should be more accurate about the stmt selection here.
// Now we only allow SELECT, but is it too strict? And how to filter out
// SELECT with data writes / schema changes?
func IsAllowedToPause(stmt Statement) bool {
if stmt != nil && !CanModifySchema(stmt) && !CanWriteData(stmt) {
switch t := stmt.(type) {
case *Select:
if t.With != nil {
ctes := t.With.CTEList
for _, cte := range ctes {
if !IsAllowedToPause(cte.Stmt) {
return false
}
}
}
return true
}
}
return false
}

// CanModifySchema returns true if the statement can modify
// the database schema.
func CanModifySchema(stmt Statement) bool {
Expand Down

0 comments on commit fde1a75

Please sign in to comment.