Skip to content

Commit

Permalink
sql: implement ConstructProjectSet in the new factory
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
yuzefovich committed Aug 4, 2020
1 parent 8e46313 commit fe44189
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 11 deletions.
18 changes: 13 additions & 5 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2951,7 +2951,7 @@ func (dsp *DistSQLPlanner) createPlanForOrdinality(
}

func createProjectSetSpec(
planCtx *PlanningCtx, n *projectSetNode, indexVarMap []int,
planCtx *PlanningCtx, n *projectSetPlanningInfo, indexVarMap []int,
) (*execinfrapb.ProjectSetSpec, error) {
spec := execinfrapb.ProjectSetSpec{
Exprs: make([]execinfrapb.Expression, len(n.exprs)),
Expand Down Expand Up @@ -2981,12 +2981,21 @@ func (dsp *DistSQLPlanner) createPlanForProjectSet(
if err != nil {
return nil, err
}
err = dsp.addProjectSet(plan, planCtx, &n.projectSetPlanningInfo)
return plan, err
}

// addProjectSet adds a grouping stage consisting of a single
// projectSetProcessor that is planned on the gateway.
func (dsp *DistSQLPlanner) addProjectSet(
plan *PhysicalPlan, planCtx *PlanningCtx, info *projectSetPlanningInfo,
) error {
numResults := len(plan.ResultTypes)

// Create the project set processor spec.
projectSetSpec, err := createProjectSetSpec(planCtx, n, plan.PlanToStreamColMap)
projectSetSpec, err := createProjectSetSpec(planCtx, info, plan.PlanToStreamColMap)
if err != nil {
return nil, err
return err
}
spec := execinfrapb.ProcessorCoreUnion{
ProjectSet: projectSetSpec,
Expand All @@ -3004,8 +3013,7 @@ func (dsp *DistSQLPlanner) createPlanForProjectSet(
for i := range projectSetSpec.GeneratedColumns {
plan.PlanToStreamColMap = append(plan.PlanToStreamColMap, numResults+i)
}

return plan, nil
return nil
}

// isOnlyOnGateway returns true if a physical plan is executed entirely on the
Expand Down
21 changes: 20 additions & 1 deletion pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,26 @@ func (e *distSQLSpecExecFactory) ConstructMax1Row(
func (e *distSQLSpecExecFactory) ConstructProjectSet(
n exec.Node, exprs tree.TypedExprs, zipCols sqlbase.ResultColumns, numColsPerGen []int,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: project set")
physPlan, plan := getPhysPlan(n)
cols := append(plan.physPlan.ResultColumns, zipCols...)
err := e.dsp.addProjectSet(
physPlan,
// Currently, projectSetProcessors are always planned as a "grouping"
// stage (meaning a single processor on the gateway), so we use
// cannotDistribute as the recommendation.
e.getPlanCtx(cannotDistribute),
&projectSetPlanningInfo{
columns: cols,
numColsInSource: len(plan.physPlan.ResultColumns),
exprs: exprs,
numColsPerGen: numColsPerGen,
},
)
if err != nil {
return nil, err
}
physPlan.ResultColumns = cols
return plan, nil
}

func (e *distSQLSpecExecFactory) ConstructWindow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,12 @@ SELECT v FROM kv ORDER BY v DESC
1
1
NULL

# Check that an SRF is supported.
query I colnames
SELECT * FROM generate_series(1, 3)
----
generate_series
1
2
3
12 changes: 7 additions & 5 deletions pkg/sql/opt_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,11 +922,13 @@ func (ef *execFactory) ConstructProjectSet(
src := asDataSource(n)
cols := append(src.columns, zipCols...)
return &projectSetNode{
source: src.plan,
columns: cols,
numColsInSource: len(src.columns),
exprs: exprs,
numColsPerGen: numColsPerGen,
source: src.plan,
projectSetPlanningInfo: projectSetPlanningInfo{
columns: cols,
numColsInSource: len(src.columns),
exprs: exprs,
numColsPerGen: numColsPerGen,
},
}, nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/project_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ import (
// with zip(a,b,c).
type projectSetNode struct {
source planNode
projectSetPlanningInfo
}

// projectSetPlanningInfo is a helper struct that is extracted from
// projectSetNode to be reused during physical planning.
type projectSetPlanningInfo struct {
// columns contains all the columns from the source, and then
// the columns from the generators.
columns sqlbase.ResultColumns
Expand Down

0 comments on commit fe44189

Please sign in to comment.