diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index d7d8d4947f46..c2de384ea883 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -512,7 +512,7 @@ func (ex *connExecutor) execBind( } // Create the new PreparedPortal. - if err := ex.addPortal(ctx, portalName, ps, qargs, columnFormatCodes); err != nil { + if err := ex.addPortal(ctx, ex.planner.ExtendedEvalContext(), portalName, ps, qargs, columnFormatCodes); err != nil { return retErr(err) } @@ -530,6 +530,7 @@ func (ex *connExecutor) execBind( // for anonymous portals). func (ex *connExecutor) addPortal( ctx context.Context, + evalCtx *extendedEvalContext, portalName string, stmt *PreparedStatement, qargs tree.QueryArguments, @@ -542,7 +543,7 @@ func (ex *connExecutor) addPortal( panic(errors.AssertionFailedf("portal already exists as cursor: %q", portalName)) } - portal, err := ex.makePreparedPortal(ctx, portalName, stmt, qargs, outFormats) + portal, err := ex.makePreparedPortal(ctx, evalCtx, portalName, stmt, qargs, outFormats) if err != nil { return err } diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index d8e3bb98fb11..82e8f76e989e 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -1584,7 +1584,8 @@ var ( ErrStmtNotSupportedForPausablePortal = unimplemented.NewWithIssue( 98911, "the statement for a pausable portal must be a read-only SELECT query"+ - " with no sub-queries or post-queries", + " with no sub-queries, post-queries, or function calls (please set "+ + "session variable enable_functions_in_portals to true to enable functions)", ) // ErrLimitedResultClosed is a sentinel error produced by pgwire // indicating the portal should be closed without error. diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 7bac5de6c986..1b2ba4055e14 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -24,7 +24,7 @@ import ( "strings" "time" - "github.com/cockroachdb/apd/v3" + apd "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cloud/externalconn" "github.com/cockroachdb/cockroach/pkg/clusterversion" @@ -3331,6 +3331,10 @@ func (m *sessionDataMutator) SetAlterColumnTypeGeneral(val bool) { m.data.AlterColumnTypeGeneralEnabled = val } +func (m *sessionDataMutator) SetEnableFunctionsInPortals(val bool) { + m.data.FunctionsInPortalsEnabled = val +} + func (m *sessionDataMutator) SetEnableSuperRegions(val bool) { m.data.EnableSuperRegions = val } diff --git a/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals b/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals index a6c3c6bf7fe5..6eca2b7af4af 100644 --- a/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals +++ b/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals @@ -1,12 +1,16 @@ send crdb_only Query {"String": "SET multiple_active_portals_enabled = true"} +Query {"String": "SET enable_functions_in_portals = true"} ---- until crdb_only ignore=NoticeResponse ReadyForQuery +ReadyForQuery ---- {"Type":"CommandComplete","CommandTag":"SET"} {"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} subtest select_from_individual_resources @@ -457,7 +461,7 @@ ReadyForQuery {"Type":"PortalSuspended"} {"Type":"DataRow","Values":[{"text":"2"}]} {"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} {"Type":"ReadyForQuery","TxStatus":"E"} send crdb_only @@ -504,7 +508,7 @@ ReadyForQuery {"Type":"BindComplete"} {"Type":"DataRow","Values":[{"text":"f"}]} {"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} {"Type":"ReadyForQuery","TxStatus":"E"} send crdb_only @@ -553,7 +557,7 @@ ReadyForQuery {"Type":"BindComplete"} {"Type":"DataRow","Values":[{"text":"10"}]} {"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} {"Type":"ReadyForQuery","TxStatus":"E"} send crdb_only @@ -602,7 +606,7 @@ ReadyForQuery {"Type":"BindComplete"} {"Type":"DataRow","Values":[{"text":"1"}]} {"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} {"Type":"ReadyForQuery","TxStatus":"E"} send crdb_only @@ -946,7 +950,7 @@ ReadyForQuery {"Type":"PortalSuspended"} {"Type":"DataRow","Values":[{"text":"10"}]} {"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} {"Type":"ReadyForQuery","TxStatus":"E"} send diff --git a/pkg/sql/pgwire/testdata/pgtest/portals b/pkg/sql/pgwire/testdata/pgtest/portals index 8e8f57c1c606..8f8af6bec3ed 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals +++ b/pkg/sql/pgwire/testdata/pgtest/portals @@ -118,7 +118,7 @@ ReadyForQuery send Query {"String": "BEGIN"} -Parse {"Query": "SELECT * FROM generate_series(1, 2)"} +Parse {"Query": "SELECT * FROM (VALUES (1), (2)) AS foo"} Bind Execute {"MaxRows": 1} Sync @@ -180,7 +180,7 @@ ReadyForQuery send Query {"String": "BEGIN"} -Parse {"Query": "SELECT * FROM generate_series(1, 2)"} +Parse {"Query": "SELECT * FROM (VALUES (1), (2)) AS foo"} Bind Execute {"MaxRows": 1} Flush @@ -255,7 +255,7 @@ ReadyForQuery send Query {"String": "BEGIN"} -Parse {"Query": "SELECT * FROM generate_series(1, 4)"} +Parse {"Query": "SELECT * FROM (VALUES (1), (2), (3), (4)) AS foo"} Bind Execute {"MaxRows": 1} Sync @@ -314,7 +314,7 @@ ReadyForQuery # on the second. send -Parse {"Query": "SELECT * FROM generate_series(1, 2)"} +Parse {"Query": "SELECT * FROM (VALUES (1), (2)) AS foo"} Bind Execute {"MaxRows": 1} Sync @@ -359,7 +359,7 @@ ReadyForQuery send Query {"String": "BEGIN"} -Parse {"Query": "SELECT * FROM generate_series(1, 2)"} +Parse {"Query": "SELECT * FROM (VALUES (1), (2)) AS foo"} Bind Execute {"MaxRows": 1} Sync @@ -942,7 +942,7 @@ ReadyForQuery # 49 = ASCII '1' # ParameterFormatCodes = [0] for text format send -Parse {"Name": "s10", "Query": "select n::int4 from generate_series(0,$1::int8) n", "ParameterOIDs": [20]} +Parse {"Name": "s10", "Query": "select n::int4 from (VALUES (0), (1), (2), (3), (4), (5), ($1)) AS v(n)", "ParameterOIDs": [20]} Describe {"ObjectType": "S", "Name": "s10"} Bind {"PreparedStatement": "s10", "ParameterFormatCodes": [0], "ResultFormatCodes": [0], "Parameters": [{"text":"1"}]} Execute {"MaxRows": 1} @@ -962,7 +962,7 @@ ReadyForQuery send Parse {"Name": "c4", "Query": "COMMIT"} -Parse {"Name": "s11", "Query": "select n::int4 from generate_series(0,1) n"} +Parse {"Name": "s11", "Query": "select n::int4 from (VALUES (0), (1)) AS v(n)"} Bind {"DestinationPortal": "por", "PreparedStatement": "s11"} Bind {"DestinationPortal": "pc4", "PreparedStatement": "c4"} Execute {"Portal": "por"} diff --git a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs index 8c682a98e330..8a9df23cdff8 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs +++ b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs @@ -302,3 +302,72 @@ ReadyForQuery subtest end + + +subtest functions_not_supported + +# We don't support UDFs in pausable portals since we don't allow mutations, and +# UDFs may contain mutations. The following test results in a duplicate key +# violation in postgres. + +send +Query {"String": "SET multiple_active_portals_enabled = true"} +---- + +send +Query {"String": "DROP TABLE IF EXISTS xy;"} +Query {"String": "DROP FUNCTION IF EXISTS f;"} +Query {"String": "DROP FUNCTION IF EXISTS g;"} +Query {"String": "DEALLOCATE ALL;"} +Query {"String": "CREATE TABLE xy (x INT PRIMARY KEY, y INT);"} +Query {"String": "CREATE FUNCTION f() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (1, 1), (2, 2) RETURNING *; $$"} +Query {"String": "CREATE FUNCTION g() RETURNS SETOF RECORD LANGUAGE SQL AS $$ INSERT INTO xy VALUES (2, 1), (3, 3) RETURNING *; $$"} +Parse {"Name": "q1", "Query": "SELECT f();"} +Parse {"Name": "q2", "Query": "SELECT g();"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until keepErrMessage +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +ErrorResponse +---- +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DROP FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE FUNCTION"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"(1,1)"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries, post-queries, or function calls (please set session variable enable_functions_in_portals to true to enable functions)","Detail":"cannot execute a portal while a different one is open","Hint":"You have attempted to use a feature that is not yet implemented.\nSee: https://go.crdb.dev/issue-v/98911/v23.2"} + +subtest end diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index c2a52be43a54..bb41affd7153 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -194,6 +194,7 @@ type PreparedPortal struct { // accountForCopy() doesn't need to be called on the prepared statement. func (ex *connExecutor) makePreparedPortal( ctx context.Context, + evalCtx *extendedEvalContext, name string, stmt *PreparedStatement, qargs tree.QueryArguments, @@ -208,7 +209,7 @@ func (ex *connExecutor) makePreparedPortal( if ex.sessionData().MultipleActivePortalsEnabled && ex.executorType != executorTypeInternal { telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals) - if tree.IsAllowedToPause(stmt.AST) { + if tree.IsAllowedToPause(stmt.AST, evalCtx.SessionData().FunctionsInPortalsEnabled) { portal.pauseInfo = &portalPauseInfo{} portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{} portal.portalPausablity = PausablePortal diff --git a/pkg/sql/sem/tree/BUILD.bazel b/pkg/sql/sem/tree/BUILD.bazel index 499e1f143501..1495b0390289 100644 --- a/pkg/sql/sem/tree/BUILD.bazel +++ b/pkg/sql/sem/tree/BUILD.bazel @@ -203,6 +203,7 @@ go_test( "parse_tuple_test.go", "placeholders_test.go", "pretty_test.go", + "stmt_test.go", "table_name_test.go", "time_test.go", "txn_test.go", diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index d22aa6a50b8e..43b7a78bf37e 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -123,20 +123,46 @@ type canModifySchema interface { modifiesSchema() bool } +type checkPausableVisitor struct { + isNotPausable bool +} + +// VisitPre implements the Visitor interface. +func (v *checkPausableVisitor) VisitPre(expr Expr) (recurse bool, newExpr Expr) { + switch expr.(type) { + case *FuncExpr: + v.isNotPausable = true + return false, expr + } + return true, expr +} + +// VisitPost implements the Visitor interface. +func (v *checkPausableVisitor) VisitPost(expr Expr) Expr { + return expr +} + // IsAllowedToPause returns true if the stmt cannot either modify the schema or // write data. // This function is to gate the queries allowed for pausable portals. // TODO(janexing): We should be more accurate about the stmt selection here. // Now we only allow SELECT, but is it too strict? And how to filter out // SELECT with data writes / schema changes? -func IsAllowedToPause(stmt Statement) bool { +func IsAllowedToPause(stmt Statement, functionsInPortalsEnabled bool) bool { if stmt != nil && !CanModifySchema(stmt) && !CanWriteData(stmt) { + if !functionsInPortalsEnabled { + v := checkPausableVisitor{} + walkStmt(&v, stmt) + if v.isNotPausable { + return false + } + } switch t := stmt.(type) { case *Select: if t.With != nil { ctes := t.With.CTEList for _, cte := range ctes { - if !IsAllowedToPause(cte.Stmt) { + if !IsAllowedToPause(cte.Stmt, functionsInPortalsEnabled) { return false } } diff --git a/pkg/sql/sem/tree/stmt_test.go b/pkg/sql/sem/tree/stmt_test.go new file mode 100644 index 000000000000..dbe2590825f6 --- /dev/null +++ b/pkg/sql/sem/tree/stmt_test.go @@ -0,0 +1,56 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tree_test + +import ( + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" +) + +var pmap = map[bool]string{ + true: "pausable", + false: "unpausable", +} + +func TestIsAllowedToPause(t *testing.T) { + testData := []struct { + qry string + pausable bool + }{ + {"SELECT 1;", true}, + {"WITH cte AS (SELECT * FROM t) SELECT * FROM cte;", true}, + {"INSERT INTO t VALUES (1);", false}, + {"WITH cte AS (INSERT INTO t VALUES (1) RETURNING *) SELECT * FROM cte;", false}, + {"WITH cte AS (SELECT * FROM t) UPDATE t SET v = cte.v FROM cte WHERE t.a = cte.a;", false}, + {"SELECT f();", false}, + {"WITH cte AS (SELECT f()) SELECT * FROM cte;", false}, + {"SELECT * FROM (VALUES (f()));", false}, + } + + for i, td := range testData { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + stmts, err := parser.Parse(td.qry) + if err != nil { + t.Fatal(err) + } + if len(stmts) != 1 { + t.Fatalf("Expected 1 parsed statement, got %d.", len(stmts)) + } + if pausable := tree.IsAllowedToPause(stmts[0].AST); pausable != td.pausable { + t.Errorf("Expected statement \"%s\" to be %s, got %s", td.qry, pmap[td.pausable], pmap[pausable]) + } + }) + } + +} diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index 0c7dbb7ff603..04b4f90d8dad 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -234,7 +234,7 @@ message LocalOnlySessionData { bool cost_scans_with_default_col_size = 61; // DefaultTxnQualityOfService indicates the default QoSLevel/WorkPriority of // newly created transactions. - int32 default_txn_quality_of_service = 62 [(gogoproto.casttype)="QoSLevel"]; + int32 default_txn_quality_of_service = 62 [(gogoproto.casttype) = "QoSLevel"]; // OptSplitScanLimit indicates the maximum number of UNION ALL statements a // Scan may be split into during query optimization to avoid a sort. int32 opt_split_scan_limit = 63; @@ -424,6 +424,10 @@ message LocalOnlySessionData { // better performance.) Weaker isolation levels always use durable locking. bool durable_locking_for_serializable = 109; + // FunctionsInPortalsEnabled is true if statements containing function calls + // are allowed in pausable portals. + bool functions_in_portals_enabled = 110; + /////////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // // be propagated to the remote nodes. If so, that parameter should live // diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index 1ab24352fc25..6d12982f88b9 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -1787,6 +1787,22 @@ var varGen = map[string]sessionVar{ }, }, + `enable_functions_in_portals`: { + GetStringVal: makePostgresBoolGetStringValFn(`enable_functions_in_portals`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + b, err := paramparse.ParseBoolVar("enable_functions_in_portals", s) + if err != nil { + return err + } + m.SetEnableFunctionsInPortals(b) + return nil + }, + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return formatBoolAsPostgresSetting(evalCtx.SessionData().FunctionsInPortalsEnabled), nil + }, + GlobalDefault: globalFalse, + }, + // TODO(rytaft): remove this once unique without index constraints are fully // supported. `experimental_enable_unique_without_index_constraints`: { @@ -2871,7 +2887,7 @@ func ReplicationModeFromString(s string) (sessiondatapb.ReplicationMode, error) } // We want test coverage for this on and off so make it metamorphic. -var copyFastPathDefault bool = util.ConstantWithMetamorphicTestBool("copy-fast-path-enabled-default", true) +var copyFastPathDefault = util.ConstantWithMetamorphicTestBool("copy-fast-path-enabled-default", true) const compatErrMsg = "this parameter is currently recognized only for compatibility and has no effect in CockroachDB."