Skip to content

Commit

Permalink
sql: do not allow pausable portals on statements containing functions
Browse files Browse the repository at this point in the history
We currently do not allow pausable portals when a statement may mutate
data. However, functions may also mutate data. This change adds a
visitor that walks the statements checked by IsAllowedToPause and checks
for function expressions. If there is a function expression anywhere in
the AST, the statement is not allowed to pause.

Users can allow function calls in pausable portals by setting the
session setting `enable_functions_in_portals` to true. This setting is
false by default. When true, the pausable portal behavior is the same as
before this PR.

Epic: None
Informs: cockroachdb#107130

Release note (sql): This change prevents statements containing function
calls from being executed in pausable portals in order to prevent
race conditions if the functions modify the database. This behavior is
controlled by a session setting called `enable_functions_in_portals`,
which is false by default. In order to achieve the old behavior and
allow statments containing functions to use portals, users should use
`SET enable_functions_in_portals = true`.
  • Loading branch information
rharding6373 committed Aug 7, 2023
1 parent 509f8c8 commit c658060
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 21 deletions.
5 changes: 3 additions & 2 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func (ex *connExecutor) execBind(
}

// Create the new PreparedPortal.
if err := ex.addPortal(ctx, portalName, ps, qargs, columnFormatCodes); err != nil {
if err := ex.addPortal(ctx, ex.planner.ExtendedEvalContext(), portalName, ps, qargs, columnFormatCodes); err != nil {
return retErr(err)
}

Expand All @@ -530,6 +530,7 @@ func (ex *connExecutor) execBind(
// for anonymous portals).
func (ex *connExecutor) addPortal(
ctx context.Context,
evalCtx *extendedEvalContext,
portalName string,
stmt *PreparedStatement,
qargs tree.QueryArguments,
Expand All @@ -542,7 +543,7 @@ func (ex *connExecutor) addPortal(
panic(errors.AssertionFailedf("portal already exists as cursor: %q", portalName))
}

portal, err := ex.makePreparedPortal(ctx, portalName, stmt, qargs, outFormats)
portal, err := ex.makePreparedPortal(ctx, evalCtx, portalName, stmt, qargs, outFormats)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -1584,7 +1584,8 @@ var (
ErrStmtNotSupportedForPausablePortal = unimplemented.NewWithIssue(
98911,
"the statement for a pausable portal must be a read-only SELECT query"+
" with no sub-queries or post-queries",
" with no sub-queries, post-queries, or function calls (please set "+
"session variable enable_functions_in_portals to true to enable functions)",
)
// ErrLimitedResultClosed is a sentinel error produced by pgwire
// indicating the portal should be closed without error.
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"strings"
"time"

"github.com/cockroachdb/apd/v3"
apd "github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
Expand Down Expand Up @@ -3331,6 +3331,10 @@ func (m *sessionDataMutator) SetAlterColumnTypeGeneral(val bool) {
m.data.AlterColumnTypeGeneralEnabled = val
}

func (m *sessionDataMutator) SetEnableFunctionsInPortals(val bool) {
m.data.FunctionsInPortalsEnabled = val
}

func (m *sessionDataMutator) SetEnableSuperRegions(val bool) {
m.data.EnableSuperRegions = val
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/sql/pgwire/testdata/pgtest/multiple_active_portals
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
send crdb_only
Query {"String": "SET multiple_active_portals_enabled = true"}
Query {"String": "SET enable_functions_in_portals = true"}
----

until crdb_only ignore=NoticeResponse
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

subtest select_from_individual_resources

Expand Down Expand Up @@ -457,7 +461,7 @@ ReadyForQuery
{"Type":"PortalSuspended"}
{"Type":"DataRow","Values":[{"text":"2"}]}
{"Type":"PortalSuspended"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}
{"Type":"ReadyForQuery","TxStatus":"E"}

send crdb_only
Expand Down Expand Up @@ -504,7 +508,7 @@ ReadyForQuery
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"f"}]}
{"Type":"PortalSuspended"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}
{"Type":"ReadyForQuery","TxStatus":"E"}

send crdb_only
Expand Down Expand Up @@ -553,7 +557,7 @@ ReadyForQuery
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"10"}]}
{"Type":"PortalSuspended"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}
{"Type":"ReadyForQuery","TxStatus":"E"}

send crdb_only
Expand Down Expand Up @@ -602,7 +606,7 @@ ReadyForQuery
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}
{"Type":"ReadyForQuery","TxStatus":"E"}

send crdb_only
Expand Down Expand Up @@ -946,7 +950,7 @@ ReadyForQuery
{"Type":"PortalSuspended"}
{"Type":"DataRow","Values":[{"text":"10"}]}
{"Type":"PortalSuspended"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}
{"Type":"ReadyForQuery","TxStatus":"E"}

send
Expand Down
14 changes: 7 additions & 7 deletions pkg/sql/pgwire/testdata/pgtest/portals
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ ReadyForQuery

send
Query {"String": "BEGIN"}
Parse {"Query": "SELECT * FROM generate_series(1, 2)"}
Parse {"Query": "SELECT * FROM (VALUES (1), (2)) AS foo"}
Bind
Execute {"MaxRows": 1}
Sync
Expand Down Expand Up @@ -180,7 +180,7 @@ ReadyForQuery

send
Query {"String": "BEGIN"}
Parse {"Query": "SELECT * FROM generate_series(1, 2)"}
Parse {"Query": "SELECT * FROM (VALUES (1), (2)) AS foo"}
Bind
Execute {"MaxRows": 1}
Flush
Expand Down Expand Up @@ -255,7 +255,7 @@ ReadyForQuery

send
Query {"String": "BEGIN"}
Parse {"Query": "SELECT * FROM generate_series(1, 4)"}
Parse {"Query": "SELECT * FROM (VALUES (1), (2), (3), (4)) AS foo"}
Bind
Execute {"MaxRows": 1}
Sync
Expand Down Expand Up @@ -314,7 +314,7 @@ ReadyForQuery
# on the second.

send
Parse {"Query": "SELECT * FROM generate_series(1, 2)"}
Parse {"Query": "SELECT * FROM (VALUES (1), (2)) AS foo"}
Bind
Execute {"MaxRows": 1}
Sync
Expand Down Expand Up @@ -359,7 +359,7 @@ ReadyForQuery

send
Query {"String": "BEGIN"}
Parse {"Query": "SELECT * FROM generate_series(1, 2)"}
Parse {"Query": "SELECT * FROM (VALUES (1), (2)) AS foo"}
Bind
Execute {"MaxRows": 1}
Sync
Expand Down Expand Up @@ -942,7 +942,7 @@ ReadyForQuery
# 49 = ASCII '1'
# ParameterFormatCodes = [0] for text format
send
Parse {"Name": "s10", "Query": "select n::int4 from generate_series(0,$1::int8) n", "ParameterOIDs": [20]}
Parse {"Name": "s10", "Query": "select n::int4 from (VALUES (0), (1), (2), (3), (4), (5), ($1)) AS v(n)", "ParameterOIDs": [20]}
Describe {"ObjectType": "S", "Name": "s10"}
Bind {"PreparedStatement": "s10", "ParameterFormatCodes": [0], "ResultFormatCodes": [0], "Parameters": [{"text":"1"}]}
Execute {"MaxRows": 1}
Expand All @@ -962,7 +962,7 @@ ReadyForQuery

send
Parse {"Name": "c4", "Query": "COMMIT"}
Parse {"Name": "s11", "Query": "select n::int4 from generate_series(0,1) n"}
Parse {"Name": "s11", "Query": "select n::int4 from (VALUES (0), (1)) AS v(n)"}
Bind {"DestinationPortal": "por", "PreparedStatement": "s11"}
Bind {"DestinationPortal": "pc4", "PreparedStatement": "c4"}
Execute {"Portal": "por"}
Expand Down
69 changes: 69 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/portals_crbugs
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,72 @@ ReadyForQuery


subtest end


subtest functions_not_supported

# We don't support UDFs in pausable portals since we don't allow mutations, and
# UDFs may contain mutations. The following test results in a duplicate key
# violation in postgres.

send
Query {"String": "SET multiple_active_portals_enabled = true"}
----

send
Query {"String": "DROP TABLE IF EXISTS xy;"}
Query {"String": "DROP FUNCTION IF EXISTS f;"}
Query {"String": "DROP FUNCTION IF EXISTS g;"}
Query {"String": "DEALLOCATE ALL;"}
Query {"String": "CREATE TABLE xy (x INT PRIMARY KEY, y INT);"}
Query {"String": "CREATE FUNCTION f() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (1, 1), (2, 2) RETURNING *; $$"}
Query {"String": "CREATE FUNCTION g() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (2, 1), (3, 3) RETURNING *; $$"}
Parse {"Name": "q1", "Query": "SELECT f();"}
Parse {"Name": "q2", "Query": "SELECT g();"}
Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"}
Execute {"Portal": "p1", "MaxRows": 1}
Execute {"Portal": "p2", "MaxRows": 1}
Execute {"Portal": "p1", "MaxRows": 1}
Execute {"Portal": "p2", "MaxRows": 1}
Sync
----

until keepErrMessage
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
ErrorResponse
----
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"ParseComplete"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"(1,1)"}]}
{"Type":"PortalSuspended"}
{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"}

subtest end
3 changes: 2 additions & 1 deletion pkg/sql/prepared_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ type PreparedPortal struct {
// accountForCopy() doesn't need to be called on the prepared statement.
func (ex *connExecutor) makePreparedPortal(
ctx context.Context,
evalCtx *extendedEvalContext,
name string,
stmt *PreparedStatement,
qargs tree.QueryArguments,
Expand All @@ -208,7 +209,7 @@ func (ex *connExecutor) makePreparedPortal(

if ex.sessionData().MultipleActivePortalsEnabled && ex.executorType != executorTypeInternal {
telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals)
if tree.IsAllowedToPause(stmt.AST) {
if tree.IsAllowedToPause(stmt.AST, evalCtx.SessionData().FunctionsInPortalsEnabled) {
portal.pauseInfo = &portalPauseInfo{}
portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{}
portal.portalPausablity = PausablePortal
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/tree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ go_test(
"parse_tuple_test.go",
"placeholders_test.go",
"pretty_test.go",
"stmt_test.go",
"table_name_test.go",
"time_test.go",
"txn_test.go",
Expand Down
30 changes: 28 additions & 2 deletions pkg/sql/sem/tree/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,46 @@ type canModifySchema interface {
modifiesSchema() bool
}

type checkPausableVisitor struct {
isNotPausable bool
}

// VisitPre implements the Visitor interface.
func (v *checkPausableVisitor) VisitPre(expr Expr) (recurse bool, newExpr Expr) {
switch expr.(type) {
case *FuncExpr:
v.isNotPausable = true
return false, expr
}
return true, expr
}

// VisitPost implements the Visitor interface.
func (v *checkPausableVisitor) VisitPost(expr Expr) Expr {
return expr
}

// IsAllowedToPause returns true if the stmt cannot either modify the schema or
// write data.
// This function is to gate the queries allowed for pausable portals.
// TODO(janexing): We should be more accurate about the stmt selection here.
// Now we only allow SELECT, but is it too strict? And how to filter out
// SELECT with data writes / schema changes?
func IsAllowedToPause(stmt Statement) bool {
func IsAllowedToPause(stmt Statement, functionsInPortalsEnabled bool) bool {
if stmt != nil && !CanModifySchema(stmt) && !CanWriteData(stmt) {
if !functionsInPortalsEnabled {
v := checkPausableVisitor{}
walkStmt(&v, stmt)
if v.isNotPausable {
return false
}
}
switch t := stmt.(type) {
case *Select:
if t.With != nil {
ctes := t.With.CTEList
for _, cte := range ctes {
if !IsAllowedToPause(cte.Stmt) {
if !IsAllowedToPause(cte.Stmt, functionsInPortalsEnabled) {
return false
}
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/sql/sem/tree/stmt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tree_test

import (
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

var pmap = map[bool]string{
true: "pausable",
false: "unpausable",
}

func TestIsAllowedToPause(t *testing.T) {
testData := []struct {
qry string
pausable bool
}{
{"SELECT 1;", true},
{"WITH cte AS (SELECT * FROM t) SELECT * FROM cte;", true},
{"INSERT INTO t VALUES (1);", false},
{"WITH cte AS (INSERT INTO t VALUES (1) RETURNING *) SELECT * FROM cte;", false},
{"WITH cte AS (SELECT * FROM t) UPDATE t SET v = cte.v FROM cte WHERE t.a = cte.a;", false},
{"SELECT f();", false},
{"WITH cte AS (SELECT f()) SELECT * FROM cte;", false},
{"SELECT * FROM (VALUES (f()));", false},
}

for i, td := range testData {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
stmts, err := parser.Parse(td.qry)
if err != nil {
t.Fatal(err)
}
if len(stmts) != 1 {
t.Fatalf("Expected 1 parsed statement, got %d.", len(stmts))
}
if pausable := tree.IsAllowedToPause(stmts[0].AST); pausable != td.pausable {
t.Errorf("Expected statement \"%s\" to be %s, got %s", td.qry, pmap[td.pausable], pmap[pausable])
}
})
}

}
Loading

0 comments on commit c658060

Please sign in to comment.