diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 89fc9aeeca2f..52bc2955af8c 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -43,23 +43,39 @@ type distSQLSpecExecFactory struct { localPlanCtx *PlanningCtx } singleTenant bool + planningMode distSQLPlanningMode gatewayNodeID roachpb.NodeID } var _ exec.Factory = &distSQLSpecExecFactory{} -func newDistSQLSpecExecFactory(p *planner) exec.Factory { +// distSQLPlanningMode indicates the planning mode in which +// distSQLSpecExecFactory is operating. +type distSQLPlanningMode int + +const ( + // distSQLDefaultPlanning is the default planning mode in which the factory + // can create a physical plan with any plan distribution (local, partially + // distributed, or fully distributed). + distSQLDefaultPlanning distSQLPlanningMode = iota + // distSQLLocalOnlyPlanning is the planning mode in which the factory + // only creates local physical plans. + distSQLLocalOnlyPlanning +) + +func newDistSQLSpecExecFactory(p *planner, planningMode distSQLPlanningMode) exec.Factory { return &distSQLSpecExecFactory{ planner: p, dsp: p.extendedEvalCtx.DistSQLPlanner, singleTenant: p.execCfg.Codec.ForSystemTenant(), + planningMode: planningMode, gatewayNodeID: p.extendedEvalCtx.DistSQLPlanner.gatewayNodeID, } } func (e *distSQLSpecExecFactory) getPlanCtx(recommendation distRecommendation) *PlanningCtx { distribute := false - if e.singleTenant { + if e.singleTenant && e.planningMode != distSQLLocalOnlyPlanning { distribute = shouldDistributeGivenRecAndMode(recommendation, e.planner.extendedEvalCtx.SessionData.DistSQLMode) } if distribute { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index a44730c9b355..b7a38d60d84c 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -2030,6 +2030,10 @@ func (m *sessionDataMutator) SetExperimentalDistSQLPlanning( m.data.ExperimentalDistSQLPlanningMode = val } +func (m *sessionDataMutator) 
SetPartiallyDistributedPlansDisabled(val bool) { + m.data.PartiallyDistributedPlansDisabled = val +} + func (m *sessionDataMutator) SetRequireExplicitPrimaryKeys(val bool) { m.data.RequireExplicitPrimaryKeys = val } diff --git a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning index e6a525b01b98..fafa6785cc7a 100644 --- a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning +++ b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning @@ -1,22 +1,23 @@ -# Test that we can set the session variable and cluster setting. -statement ok -SET experimental_distsql_planning = off - -statement ok -SET experimental_distsql_planning = on +# NOTE: all queries in this file should run with 'experimental_distsql_planning' +# set to 'always' unless they are known to be unsupported. If you do need to +# execute an unsupported query, use the following pattern: +# RESET experimental_distsql_planning +# <unsupported query> +# SET experimental_distsql_planning = always statement ok -SET CLUSTER SETTING sql.defaults.experimental_distsql_planning = off +SET experimental_distsql_planning = always -statement ok -SET CLUSTER SETTING sql.defaults.experimental_distsql_planning = on +# Check that we get an error on an unsupported query. 
+query error pq: unimplemented: experimental opt-driven distsql planning: create table +CREATE TABLE foo (bar INT) statement ok +RESET experimental_distsql_planning; +CREATE TABLE kv (k INT PRIMARY KEY, v INT); +INSERT INTO kv VALUES (1, 1), (2, 1), (3, 2); SET experimental_distsql_planning = always -statement ok -CREATE TABLE kv (k INT PRIMARY KEY, v INT); INSERT INTO kv VALUES (1, 1), (2, 1), (3, 2) - query II colnames,rowsort SELECT * FROM kv ---- @@ -62,7 +63,9 @@ SELECT * FROM kv WHERE k > v 3 2 statement ok -INSERT INTO kv VALUES (4, NULL), (5, 3) +RESET experimental_distsql_planning; +INSERT INTO kv VALUES (4, NULL), (5, 3); +SET experimental_distsql_planning = always query I SELECT v FROM kv ORDER BY k @@ -87,9 +90,3 @@ query I SELECT min(v) FROM kv ---- 1 - -# We currently have experimental_distsql_planning set to 'always' which returns -# an error for SELECT query with an unsupported processor core. Let's verify -# that behavior. -statement error pq: unimplemented: experimental opt-driven distsql planning: window -SELECT min(v) OVER () FROM kv diff --git a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node index 47910818b9b3..8cfd2931c0e2 100644 --- a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node +++ b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node @@ -56,7 +56,6 @@ EXPLAIN (DISTSQL) SELECT * FROM kv ---- true 
https://cockroachdb.github.io/distsqlplan/decode.html#eJyk0s9O4zAQBvD7PkX0nXZXjvJ_DzktgiJFKm1pekBCOYR4VEWkcbAdBKry7qjOobQqYOjR9nz-zUizhXpqkGJyt5heZDPn91WWr_Lb6R8nn0wnlyvnr3O9nN84j89gaAWnWbkhhfQeARhCMERgiMGQoGDopKhIKSF3JVsTyPgLUp-hbrte764LhkpIQrqFrnVDSLEqHxpaUslJej4YOOmybgzTyXpTytf_poG8K1uVOq4XohgYRK_3fypdrglpMLAP3D3Xt0JyksQPqGI40dlMuKLzkqPC03R4QAf2Iwdfj-yFrhdZDh3ay6GFHLlebClH9nJkIceul1jKsb0cW8iJ-4MFO-EuSXWiVWS1P_5uAYmvadxWJXpZ0UKKyjDjcW5y5oKT0uNrMB6y1jyZBt-Hg0_D_w7C_nE4PEeOzgnH54STb4WL4ddbAAAA__9oS6Od - # Note that we want to test DistSQL physical planning and the obvious choice # would be to use EXPLAIN (DISTSQL). However, this explain variant doesn't have # a textual mode which is easier to verify, so we use EXPLAIN (VEC) instead. @@ -162,3 +161,19 @@ SELECT kv.k FROM kv, kw WHERE kv.k = kw.k ORDER BY 1 3 4 5 + +# Disable the partially distributed plans and check that a local plan is +# produced instead. +statement ok +SET disable_partially_distributed_plans = true + +query T +EXPLAIN (VEC) SELECT k::REGCLASS FROM kv +---- +│ +└ Node 1 + └ *colexec.castOp + └ *colfetcher.colBatchScan + +statement ok +SET disable_partially_distributed_plans = false diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 550963ee095e..0d1a44aae519 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -1727,6 +1727,7 @@ default_tablespace · NULL NU default_transaction_isolation serializable NULL NULL NULL string default_transaction_priority normal NULL NULL NULL string default_transaction_read_only off NULL NULL NULL string +disable_partially_distributed_plans off NULL NULL NULL string distsql off NULL NULL NULL string enable_experimental_alter_column_type_general off NULL NULL NULL string enable_implicit_select_for_update on NULL NULL NULL string @@ -1794,6 +1795,7 @@ default_tablespace · NULL user default_transaction_isolation serializable NULL user NULL 
default default default_transaction_priority normal NULL user NULL normal normal default_transaction_read_only off NULL user NULL off off +disable_partially_distributed_plans off NULL user NULL off off distsql off NULL user NULL off off enable_experimental_alter_column_type_general off NULL user NULL off off enable_implicit_select_for_update on NULL user NULL on on @@ -1857,6 +1859,7 @@ default_tablespace NULL NULL NULL NULL default_transaction_isolation NULL NULL NULL NULL NULL default_transaction_priority NULL NULL NULL NULL NULL default_transaction_read_only NULL NULL NULL NULL NULL +disable_partially_distributed_plans NULL NULL NULL NULL NULL distsql NULL NULL NULL NULL NULL enable_experimental_alter_column_type_general NULL NULL NULL NULL NULL enable_implicit_select_for_update NULL NULL NULL NULL NULL diff --git a/pkg/sql/logictest/testdata/logic_test/set b/pkg/sql/logictest/testdata/logic_test/set index 5ab2e5e9c56f..0dafb615e049 100644 --- a/pkg/sql/logictest/testdata/logic_test/set +++ b/pkg/sql/logictest/testdata/logic_test/set @@ -350,3 +350,8 @@ SET enable_zigzag_join = no # Check error code. 
statement error pgcode 22023 parameter "enable_zigzag_join" requires a Boolean value SET enable_zigzag_join = nonsense + +statement ok +SET experimental_distsql_planning = always; +SET experimental_distsql_planning = on; +SET experimental_distsql_planning = off diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index e019e496fc24..4a5f1f8cbd9b 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -37,6 +37,7 @@ default_tablespace · default_transaction_isolation serializable default_transaction_priority normal default_transaction_read_only off +disable_partially_distributed_plans off distsql off enable_experimental_alter_column_type_general off enable_implicit_select_for_update on diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 8c6c53e6fd69..057f24590868 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" @@ -321,6 +322,37 @@ func (p *planMaybePhysical) isPhysicalPlan() bool { return p.physPlan != nil } +func (p *planMaybePhysical) isPartiallyDistributed() bool { + // By default, we assume that the plan is "local" (it doesn't matter + // whether the plan is actually "distributed" or not, only that it is not + // "partially distributed"). + distribution := physicalplan.LocalPlan + // Next we check all possible scenarios in which we might have partially + // distributed plans. 
+ if p.isPhysicalPlan() { + distribution = p.physPlan.Distribution + } else { + // Even when the whole plan is not physical, we might have EXPLAIN + // planNodes that themselves contain a physical plan, so we need to + // peek inside of those. + switch n := p.planNode.(type) { + case *explainPlanNode: + if n.plan.main.isPhysicalPlan() { + distribution = n.plan.main.physPlan.Distribution + } + case *explainDistSQLNode: + if n.plan.main.isPhysicalPlan() { + distribution = n.plan.main.physPlan.Distribution + } + case *explainVecNode: + if n.plan.isPhysicalPlan() { + distribution = n.plan.physPlan.Distribution + } + } + } + return distribution == physicalplan.PartiallyDistributedPlan +} + func (p *planMaybePhysical) planColumns() sqlbase.ResultColumns { if p.isPhysicalPlan() { return p.physPlan.ResultColumns diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go index eb0b26558144..196e4554735b 100644 --- a/pkg/sql/plan_opt.go +++ b/pkg/sql/plan_opt.go @@ -12,6 +12,7 @@ package sql import ( "context" + "strings" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings" @@ -184,23 +185,36 @@ func (p *planner) makeOptimizerPlan(ctx context.Context) error { // we probably could pool those allocations using sync.Pool. Investigate // this. if mode := p.SessionData().ExperimentalDistSQLPlanningMode; mode != sessiondata.ExperimentalDistSQLPlanningOff { - bld = execbuilder.New(newDistSQLSpecExecFactory(p), execMemo, &opc.catalog, root, p.EvalContext()) + bld = execbuilder.New(newDistSQLSpecExecFactory(p, distSQLDefaultPlanning), execMemo, &opc.catalog, root, p.EvalContext()) plan, err = bld.Build() if err != nil { if mode == sessiondata.ExperimentalDistSQLPlanningAlways && - p.stmt.AST.StatementTag() == "SELECT" { + !strings.Contains(p.stmt.AST.StatementTag(), "SET") { // We do not fallback to the old path because experimental - // planning is set to 'always' and we have a SELECT statement, - // so we return an error. 
- // We use a simple heuristic to check whether the statement is - // a SELECT, and the reasoning behind it is that we want to be - // able to run certain statement types (e.g. SET) regardless of - // whether they are supported by the new factory. - // TODO(yuzefovich): update this once we support more than just - // SELECT statements (see #47473). + // planning is set to 'always' and we don't have a SET + // statement, so we return an error. SET statements are + // exceptions because we want to be able to execute them + // regardless of whether they are supported by the new factory. + // TODO(yuzefovich): update this once SET statements are + // supported (see #47473). return err } // We will fallback to the old path. + } + if err == nil { + m := plan.(*planTop).main + if m.isPartiallyDistributed() && p.SessionData().PartiallyDistributedPlansDisabled { + // The planning has succeeded, but we've created a partially + // distributed plan yet the session variable prohibits such + // plan distribution - we need to replan with a new factory + // that forces local planning. + // TODO(yuzefovich): remove this logic when deleting old + // execFactory. + bld = execbuilder.New(newDistSQLSpecExecFactory(p, distSQLLocalOnlyPlanning), execMemo, &opc.catalog, root, p.EvalContext()) + plan, err = bld.Build() + } + } + if err != nil { bld = nil // TODO(yuzefovich): make the logging conditional on the verbosity // level once new DistSQL planning is no longer experimental. diff --git a/pkg/sql/sessiondata/session_data.go b/pkg/sql/sessiondata/session_data.go index 7dea7e076ea3..d4cd6e1a0b5a 100644 --- a/pkg/sql/sessiondata/session_data.go +++ b/pkg/sql/sessiondata/session_data.go @@ -49,6 +49,12 @@ type SessionData struct { // ExperimentalDistSQLPlanningMode indicates whether the experimental // DistSQL planning driven by the optimizer is enabled. 
ExperimentalDistSQLPlanningMode ExperimentalDistSQLPlanningMode + // PartiallyDistributedPlansDisabled indicates whether the partially + // distributed plans produced by distSQLSpecExecFactory are disabled. It + // should be set to 'true' only in tests that verify that the old and the + // new factories return exactly the same physical plans. + // TODO(yuzefovich): remove it when deleting old sql.execFactory. + PartiallyDistributedPlansDisabled bool // OptimizerFKCascadesLimit is the maximum number of cascading operations that // are run for a single query. OptimizerFKCascadesLimit int diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index bc0034344449..e261bfef10a3 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -403,6 +403,25 @@ var varGen = map[string]sessionVar{ }, }, + // CockroachDB extension. + `disable_partially_distributed_plans`: { + GetStringVal: makePostgresBoolGetStringValFn(`disable_partially_distributed_plans`), + Set: func(_ context.Context, m *sessionDataMutator, s string) error { + b, err := parseBoolVar("disable_partially_distributed_plans", s) + if err != nil { + return err + } + m.SetPartiallyDistributedPlansDisabled(b) + return nil + }, + Get: func(evalCtx *extendedEvalContext) string { + return formatBoolAsPostgresSetting(evalCtx.SessionData.PartiallyDistributedPlansDisabled) + }, + GlobalDefault: func(sv *settings.Values) string { + return formatBoolAsPostgresSetting(false) + }, + }, + // CockroachDB extension. `experimental_enable_enums`: { GetStringVal: makePostgresBoolGetStringValFn(`experimental_enable_enums`),