diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 5e34b0e6a8c6..c210ff2cfe7b 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -2951,7 +2951,7 @@ func (dsp *DistSQLPlanner) createPlanForOrdinality( } func createProjectSetSpec( - planCtx *PlanningCtx, n *projectSetNode, indexVarMap []int, + planCtx *PlanningCtx, n *projectSetPlanningInfo, indexVarMap []int, ) (*execinfrapb.ProjectSetSpec, error) { spec := execinfrapb.ProjectSetSpec{ Exprs: make([]execinfrapb.Expression, len(n.exprs)), @@ -2981,12 +2981,21 @@ func (dsp *DistSQLPlanner) createPlanForProjectSet( if err != nil { return nil, err } + err = dsp.addProjectSet(plan, planCtx, &n.projectSetPlanningInfo) + return plan, err +} + +// addProjectSet adds a grouping stage consisting of a single +// projectSetProcessor that is planned on the gateway. +func (dsp *DistSQLPlanner) addProjectSet( + plan *PhysicalPlan, planCtx *PlanningCtx, info *projectSetPlanningInfo, +) error { numResults := len(plan.ResultTypes) // Create the project set processor spec. - projectSetSpec, err := createProjectSetSpec(planCtx, n, plan.PlanToStreamColMap) + projectSetSpec, err := createProjectSetSpec(planCtx, info, plan.PlanToStreamColMap) if err != nil { - return nil, err + return err } spec := execinfrapb.ProcessorCoreUnion{ ProjectSet: projectSetSpec, @@ -3004,8 +3013,7 @@ func (dsp *DistSQLPlanner) createPlanForProjectSet( for i := range projectSetSpec.GeneratedColumns { plan.PlanToStreamColMap = append(plan.PlanToStreamColMap, numResults+i) } - - return plan, nil + return nil } // isOnlyOnGateway returns true if a physical plan is executed entirely on the diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index c17f5d55755d..d9c24408a85e 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -693,7 +693,26 @@ func (e *distSQLSpecExecFactory) ConstructMax1Row( func (e *distSQLSpecExecFactory) ConstructProjectSet( n exec.Node, exprs tree.TypedExprs, zipCols sqlbase.ResultColumns, numColsPerGen []int, ) (exec.Node, error) { - return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: project set") + physPlan, plan := getPhysPlan(n) + cols := append(plan.physPlan.ResultColumns, zipCols...) + err := e.dsp.addProjectSet( + physPlan, + // Currently, projectSetProcessors are always planned as a "grouping" + // stage (meaning a single processor on the gateway), so we use + // cannotDistribute as the recommendation. + e.getPlanCtx(cannotDistribute), + &projectSetPlanningInfo{ + columns: cols, + numColsInSource: len(plan.physPlan.ResultColumns), + exprs: exprs, + numColsPerGen: numColsPerGen, + }, + ) + if err != nil { + return nil, err + } + physPlan.ResultColumns = cols + return plan, nil } func (e *distSQLSpecExecFactory) ConstructWindow( diff --git a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning index 805d1f106395..23227bd8f49b 100644 --- a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning +++ b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning @@ -100,3 +100,12 @@ SELECT v FROM kv ORDER BY v DESC 1 1 NULL + +# Check that an SRF is supported. +query I colnames +SELECT * FROM generate_series(1, 3) +---- +generate_series +1 +2 +3 diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index c4184435f0f9..af8a188e52b7 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -922,11 +922,13 @@ func (ef *execFactory) ConstructProjectSet( src := asDataSource(n) cols := append(src.columns, zipCols...) return &projectSetNode{ - source: src.plan, - columns: cols, - numColsInSource: len(src.columns), - exprs: exprs, - numColsPerGen: numColsPerGen, + source: src.plan, + projectSetPlanningInfo: projectSetPlanningInfo{ + columns: cols, + numColsInSource: len(src.columns), + exprs: exprs, + numColsPerGen: numColsPerGen, + }, }, nil } diff --git a/pkg/sql/project_set.go b/pkg/sql/project_set.go index 8660cdf756d3..da5dac5a9b14 100644 --- a/pkg/sql/project_set.go +++ b/pkg/sql/project_set.go @@ -33,7 +33,12 @@ import ( // with zip(a,b,c). type projectSetNode struct { source planNode + projectSetPlanningInfo +} +// projectSetPlanningInfo is a helper struct that is extracted from +// projectSetNode to be reused during physical planning. +type projectSetPlanningInfo struct { // columns contains all the columns from the source, and then // the columns from the generators. columns sqlbase.ResultColumns