Skip to content

Commit

Permalink
sql: disable pausable portals for all statements with mutations
Browse files Browse the repository at this point in the history
Previously we examined the AST to determine whether a statement could be
executed in a pausable portal or not. However, this was insufficient to
identify volatile UDFs that could also contain mutations. We also
revoked pausable portals for operators for whom the portal type didn't
matter, since they would be executed to completion regardless.

This PR revokes a portal's pausability if the statement's plan contains
a mutation or DDL. That is, if the opt builder determines that any
operator is a mutation operator (see `IsMutationOp` and `IsDDLOp`).

Epic: None
Fixes: cockroachdb#107130

Release note: None
  • Loading branch information
rharding6373 committed Aug 22, 2023
1 parent eb78af2 commit 814b494
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 44 deletions.
37 changes: 28 additions & 9 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,21 +1629,35 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}

var err error

if ppInfo := getPausablePortalInfo(); ppInfo != nil {
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
err = ex.makeExecPlan(ctx, planner)
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
fName: "close planTop",
f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) },
})
err = ex.makeExecPlan(ctx, planner, res)
if flags := planner.curPlan.flags; err == nil && (flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) {
telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
// We don't allow mutations in a pausable portal. Set it back to
// an un-pausable (normal) portal.
planner.pausablePortal.pauseInfo = nil
if err := res.RevokePortalPausability(); err != nil {
res.SetError(err)
return nil
}
err = res.RevokePortalPausability()
defer planner.curPlan.close(ctx)
} else {
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
fName: "close planTop",
f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) },
})
}
} else {
planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop
}
} else {
// Prepare the plan. Note, the error is processed below. Everything
// between here and there needs to happen even if there's an error.
err = ex.makeExecPlan(ctx, planner)
err = ex.makeExecPlan(ctx, planner, res)
defer planner.curPlan.close(ctx)
}

Expand Down Expand Up @@ -1759,6 +1773,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(
// un-pausable (normal) portal.
// With pauseInfo is nil, no cleanup function will be added to the stack
// and all clean-up steps will be performed as for normal portals.
// TODO(harding): We may need to move resetting pauseInfo before we add
// the pausable portal cleanup step above.
planner.pausablePortal.pauseInfo = nil
// We need this so that the result consumption for this portal cannot be
// paused either.
Expand Down Expand Up @@ -2132,8 +2148,11 @@ func (ex *connExecutor) handleTxnRowsWrittenReadLimits(ctx context.Context) erro
}

// makeExecPlan creates an execution plan and populates planner.curPlan using
// the cost-based optimizer.
func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error {
// the cost-based optimizer. If the planner has a pausable portal, the caller
// should check if it still exists when this function returns.
func (ex *connExecutor) makeExecPlan(
ctx context.Context, planner *planner, res RestrictedCommandResult,
) error {
if tree.CanModifySchema(planner.stmt.AST) {
if planner.Txn().IsoLevel().ToleratesWriteSkew() {
if planner.extendedEvalCtx.TxnIsSingleStmt && planner.extendedEvalCtx.TxnImplicit {
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,14 @@ func (c *conn) CreateStatementResult(
implicitTxn bool,
portalPausability sql.PortalPausablity,
) sql.CommandResult {
return c.newCommandResult(descOpt, pos, stmt, formatCodes, conv, location, limit, portalName, implicitTxn, portalPausability)
rowLimit := limit
if tree.ReturnsAtMostOneRow(stmt) {
// When a statement returns at most one row, the result row limit doesn't
// matter. We set it to 0 to fetch all rows, which allows us to clean up
// resources sooner if using a pausable portal.
rowLimit = 0
}
return c.newCommandResult(descOpt, pos, stmt, formatCodes, conv, location, rowLimit, portalName, implicitTxn, portalPausability)
}

// CreateSyncResult is part of the sql.ClientComm interface.
Expand Down
68 changes: 68 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/portals_crbugs
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,72 @@ ReadyForQuery
{"Type":"ReadyForQuery","TxStatus":"I"}


subtest end

subtest functions_not_supported

# We don't support UDFs in pausable portals since we don't allow mutations, and
# UDFs may contain mutations. The following test results in a duplicate key
# violation in postgres.

send
Query {"String": "SET multiple_active_portals_enabled = true"}
----

send
Query {"String": "DROP TABLE IF EXISTS xy;"}
Query {"String": "DROP FUNCTION IF EXISTS f;"}
Query {"String": "DROP FUNCTION IF EXISTS g;"}
Query {"String": "DEALLOCATE ALL;"}
Query {"String": "CREATE TABLE xy (x INT PRIMARY KEY, y INT);"}
Query {"String": "CREATE FUNCTION f() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (1, 1), (2, 2) RETURNING *; $$"}
Query {"String": "CREATE FUNCTION g() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (2, 1), (3, 3) RETURNING *; $$"}
Parse {"Name": "q1", "Query": "SELECT f();"}
Parse {"Name": "q2", "Query": "SELECT g();"}
Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
Execute {"Portal": "p1", "MaxRows": 1}
Execute {"Portal": "p2", "MaxRows": 1}
Execute {"Portal": "p1", "MaxRows": 1}
Execute {"Portal": "p2", "MaxRows": 1}
Sync
----

until keepErrMessage
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ErrorResponse
----
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"ParseComplete"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"(1,1)"}]}
{"Type":"PortalSuspended"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}

subtest end
15 changes: 5 additions & 10 deletions pkg/sql/prepared_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,11 @@ func (ex *connExecutor) makePreparedPortal(

if ex.sessionData().MultipleActivePortalsEnabled && ex.executorType != executorTypeInternal {
telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals)
if tree.IsAllowedToPause(stmt.AST) {
portal.pauseInfo = &portalPauseInfo{}
portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{}
portal.portalPausablity = PausablePortal
} else {
telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
// We have set the session variable multiple_active_portals_enabled to
// true, but we don't support the underlying query for a pausable portal.
portal.portalPausablity = NotPausablePortalForUnsupportedStmt
}
// We will check whether the statement itself is pausable (i.e., that it
// doesn't contain DDL or mutations) when we build the plan.
portal.pauseInfo = &portalPauseInfo{}
portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{}
portal.portalPausablity = PausablePortal
}
return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
}
Expand Down
46 changes: 22 additions & 24 deletions pkg/sql/sem/tree/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,30 +143,6 @@ type canModifySchema interface {
modifiesSchema() bool
}

// IsAllowedToPause returns true if the stmt cannot either modify the schema or
// write data.
// This function is to gate the queries allowed for pausable portals.
// TODO(janexing): We should be more accurate about the stmt selection here.
// Now we only allow SELECT, but is it too strict? And how to filter out
// SELECT with data writes / schema changes?
func IsAllowedToPause(stmt Statement) bool {
if stmt != nil && !CanModifySchema(stmt) && !CanWriteData(stmt) {
switch t := stmt.(type) {
case *Select:
if t.With != nil {
ctes := t.With.CTEList
for _, cte := range ctes {
if !IsAllowedToPause(cte.Stmt) {
return false
}
}
}
return true
}
}
return false
}

// CanModifySchema returns true if the statement can modify
// the database schema.
func CanModifySchema(stmt Statement) bool {
Expand Down Expand Up @@ -203,6 +179,28 @@ func CanWriteData(stmt Statement) bool {
return false
}

// ReturnsAtMostOneRow returns true if the statement returns either no rows or
// a single row.
// TODO(harding): Expand this list.
func ReturnsAtMostOneRow(stmt Statement) bool {
switch stmt.(type) {
// Import operations.
case *CopyFrom, *Import, *Restore:
return true
// Backup creates a job and allows you to write into userfiles.
case *Backup:
return true
// CockroachDB extensions.
case *Scatter:
return true
// Replication operations.
case *CreateTenantFromReplication, *AlterTenantReplication:
return true
}
return false

}

// HiddenFromShowQueries is a pseudo-interface to be implemented
// by statements that should not show up in SHOW QUERIES (and are hence
// not cancellable using CANCEL QUERIES either). Usually implemented by
Expand Down

0 comments on commit 814b494

Please sign in to comment.