Skip to content

Commit

Permalink
opt: add SerializingProject exec primitive
Browse files Browse the repository at this point in the history
The top-level projection of a query has a special property - it can project away
columns that we want an ordering on (e.g. `SELECT a FROM t ORDER BY b`).

The distsql physical planner was designed to tolerate such cases, as they were
much more common with the heuristic planner. But the new distsql exec factory
does not; it currently relies on a hack: it detects this case by checking if the
required output ordering is `nil`. This is fragile and doesn't work in all
cases.

This change adds a `SerializingProject` primitive which is like a SimpleProject
but it forces serialization of all parallel streams into one. The new primitive
is used to enforce the final query presentation. We only need to pass column
names for the presentation, so we remove `RenameColumns` and remove the column
names argument from `SimpleProject` (simplifying some execbuilder code).

We also fix a bug in `ConstructSimpleProject` where we weren't taking the
`PlanToStreamColMap` into account when building the projection.

Release note: None
  • Loading branch information
RaduBerinde committed Aug 5, 2020
1 parent 3291543 commit af11fa9
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 76 deletions.
30 changes: 20 additions & 10 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,27 +347,37 @@ func (e *distSQLSpecExecFactory) ConstructInvertedFilter(
}

func (e *distSQLSpecExecFactory) ConstructSimpleProject(
n exec.Node, cols []exec.NodeColumnOrdinal, colNames []string, reqOrdering exec.OutputOrdering,
n exec.Node, cols []exec.NodeColumnOrdinal, reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
physPlan, plan := getPhysPlan(n)
projection := make([]uint32, len(cols))
for i := range cols {
projection[i] = uint32(cols[i])
projection[i] = uint32(cols[physPlan.PlanToStreamColMap[i]])
}
physPlan.AddProjection(projection)
physPlan.ResultColumns = getResultColumnsForSimpleProject(cols, colNames, physPlan.ResultTypes, physPlan.ResultColumns)
physPlan.ResultColumns = getResultColumnsForSimpleProject(
cols, nil /* colNames */, physPlan.ResultTypes, physPlan.ResultColumns,
)
physPlan.PlanToStreamColMap = identityMap(physPlan.PlanToStreamColMap, len(cols))
if reqOrdering == nil {
// When reqOrdering is nil, we're adding a top-level (i.e. "final")
// projection. In such a scenario we need to be careful not to simply
// reset the merge ordering that is currently set on the plan; we avoid
// that by first merging the streams on the gateway node.
physPlan.EnsureSingleStreamOnGateway()
}
physPlan.SetMergeOrdering(e.dsp.convertOrdering(ReqOrdering(reqOrdering), physPlan.PlanToStreamColMap))
return plan, nil
}

// ConstructSerializingProject is part of the exec.Factory interface.
//
// It forces the physical plan down to a single stream on the gateway and then
// adds a projection that selects the input columns listed in cols. The
// requested colNames are handed to getResultColumnsForSimpleProject when the
// plan's ResultColumns are rebuilt.
func (e *distSQLSpecExecFactory) ConstructSerializingProject(
	n exec.Node, cols []exec.NodeColumnOrdinal, colNames []string,
) (exec.Node, error) {
	physPlan, plan := getPhysPlan(n)
	// Merge the streams before projecting, while the plan's current columns
	// (which any existing merge ordering refers to) are still available.
	physPlan.EnsureSingleStreamOnGateway()
	projection := make([]uint32, len(cols))
	for i, col := range cols {
		// cols[i] is an ordinal into the input node's columns, so it is the
		// value that must be translated through PlanToStreamColMap. Indexing
		// cols with the mapped value instead is only correct while the map
		// happens to be the identity.
		projection[i] = uint32(physPlan.PlanToStreamColMap[col])
	}
	physPlan.AddProjection(projection)
	physPlan.ResultColumns = getResultColumnsForSimpleProject(
		cols, colNames, physPlan.ResultTypes, physPlan.ResultColumns,
	)
	// After the projection, plan column i is stream column i.
	physPlan.PlanToStreamColMap = identityMap(physPlan.PlanToStreamColMap, len(cols))
	return plan, nil
}

func (e *distSQLSpecExecFactory) ConstructRender(
n exec.Node,
columns sqlbase.ResultColumns,
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/exec_factory_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,10 @@ func constructVirtualScan(
if needed := params.NeededCols; needed.Len() != len(columns) {
// We are selecting a subset of columns; we need a projection.
cols := make([]exec.NodeColumnOrdinal, 0, needed.Len())
colNames := make([]string, len(cols))
for ord, ok := needed.Next(0); ok; ord, ok = needed.Next(ord + 1) {
cols = append(cols, exec.NodeColumnOrdinal(ord-1))
colNames = append(colNames, columns[ord-1].Name)
}
n, err = ef.ConstructSimpleProject(n, cols, colNames, nil /* reqOrdering */)
n, err = ef.ConstructSimpleProject(n, cols, nil /* reqOrdering */)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (b *Builder) buildMutationInput(
}
}

input, err = b.ensureColumns(input, colList, nil, inputExpr.ProvidedPhysical().Ordering)
input, err = b.ensureColumns(input, colList, inputExpr.ProvidedPhysical().Ordering)
if err != nil {
return execPlan{}, err
}
Expand Down
58 changes: 20 additions & 38 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) {

// Wrap the expression in a render expression if presentation requires it.
if p := e.RequiredPhysical(); !p.Presentation.Any() {
ep, err = b.applyPresentation(ep, p)
ep, err = b.applyPresentation(ep, p.Presentation)
}
return ep, err
}
Expand Down Expand Up @@ -608,7 +608,7 @@ func (b *Builder) applySimpleProject(
})
var err error
res.root, err = b.factory.ConstructSimpleProject(
input.root, colList, nil /* colNames */, exec.OutputOrdering(res.sqlOrdering(providedOrd)),
input.root, colList, exec.OutputOrdering(res.sqlOrdering(providedOrd)),
)
if err != nil {
return execPlan{}, err
Expand Down Expand Up @@ -1255,9 +1255,7 @@ func (b *Builder) buildDistinct(distinct memo.RelExpr) (execPlan, error) {
if input.outputCols.Len() == outCols.Len() {
return ep, nil
}
return b.ensureColumns(
ep, opt.ColSetToList(outCols), nil /* colNames */, distinct.ProvidedPhysical().Ordering,
)
return b.ensureColumns(ep, opt.ColSetToList(outCols), distinct.ProvidedPhysical().Ordering)
}

func (b *Builder) buildGroupByInput(groupBy memo.RelExpr) (execPlan, error) {
Expand Down Expand Up @@ -1309,9 +1307,7 @@ func (b *Builder) buildGroupByInput(groupBy memo.RelExpr) (execPlan, error) {

input.outputCols = newOutputCols
reqOrdering := input.reqOrdering(groupByInput)
input.root, err = b.factory.ConstructSimpleProject(
input.root, cols, nil /* colNames */, reqOrdering,
)
input.root, err = b.factory.ConstructSimpleProject(input.root, cols, reqOrdering)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -1351,15 +1347,11 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) {
// Note that (unless this is part of a larger query) the presentation property
// will ensure that the columns are presented correctly in the output (i.e. in
// the order `b, c, a`).
left, err = b.ensureColumns(
left, private.LeftCols, nil /* colNames */, leftExpr.ProvidedPhysical().Ordering,
)
left, err = b.ensureColumns(left, private.LeftCols, leftExpr.ProvidedPhysical().Ordering)
if err != nil {
return execPlan{}, err
}
right, err = b.ensureColumns(
right, private.RightCols, nil /* colNames */, rightExpr.ProvidedPhysical().Ordering,
)
right, err = b.ensureColumns(right, private.RightCols, rightExpr.ProvidedPhysical().Ordering)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -1759,7 +1751,7 @@ func (b *Builder) buildRecursiveCTE(rec *memo.RecursiveCTEExpr) (execPlan, error
}

// Make sure we have the columns in the correct order.
initial, err = b.ensureColumns(initial, rec.InitialCols, nil /* colNames */, nil /* ordering */)
initial, err = b.ensureColumns(initial, rec.InitialCols, nil /* ordering */)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -1794,9 +1786,7 @@ func (b *Builder) buildRecursiveCTE(rec *memo.RecursiveCTEExpr) (execPlan, error
return nil, err
}
// Ensure columns are output in the same order.
plan, err = innerBld.ensureColumns(
plan, rec.RecursiveCols, nil /* colNames */, nil, /* ordering */
)
plan, err = innerBld.ensureColumns(plan, rec.RecursiveCols, opt.Ordering{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1856,7 +1846,7 @@ func (b *Builder) buildWithScan(withScan *memo.WithScanExpr) (execPlan, error) {
res.outputCols.Set(int(withScan.OutCols[i]), i)
}
res.root, err = b.factory.ConstructSimpleProject(
res.root, cols, nil, /* colNames */
res.root, cols,
exec.OutputOrdering(res.sqlOrdering(withScan.ProvidedPhysical().Ordering)),
)
if err != nil {
Expand Down Expand Up @@ -2031,7 +2021,7 @@ func (b *Builder) buildWindow(w *memo.WindowExpr) (execPlan, error) {
// TODO(justin): this call to ensureColumns is kind of unfortunate because it
// can result in an extra render beneath each window function. Figure out a
// way to alleviate this.
input, err = b.ensureColumns(input, desiredCols, nil, opt.Ordering{})
input, err = b.ensureColumns(input, desiredCols, opt.Ordering{})
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -2257,18 +2247,10 @@ func (b *Builder) needProjection(
// ensureColumns applies a projection as necessary to make the output match the
// given list of columns.
func (b *Builder) ensureColumns(
input execPlan, colList opt.ColList, colNames []string, provided opt.Ordering,
input execPlan, colList opt.ColList, provided opt.Ordering,
) (execPlan, error) {
cols, needProj := b.needProjection(input, colList)
if !needProj {
// No projection necessary.
if colNames != nil {
var err error
input.root, err = b.factory.ConstructRenameColumns(input.root, colNames)
if err != nil {
return execPlan{}, err
}
}
return input, nil
}
var res execPlan
Expand All @@ -2277,24 +2259,24 @@ func (b *Builder) ensureColumns(
}
reqOrdering := exec.OutputOrdering(res.sqlOrdering(provided))
var err error
res.root, err = b.factory.ConstructSimpleProject(input.root, cols, colNames, reqOrdering)
res.root, err = b.factory.ConstructSimpleProject(input.root, cols, reqOrdering)
return res, err
}

// applyPresentation adds a projection to a plan to satisfy a required
// Presentation property.
func (b *Builder) applyPresentation(input execPlan, p *physical.Required) (execPlan, error) {
pres := p.Presentation
colList := make(opt.ColList, len(pres))
func (b *Builder) applyPresentation(input execPlan, pres physical.Presentation) (execPlan, error) {
cols := make([]exec.NodeColumnOrdinal, len(pres))
colNames := make([]string, len(pres))
var res execPlan
for i := range pres {
colList[i] = pres[i].ID
cols[i] = input.getNodeColumnOrdinal(pres[i].ID)
res.outputCols.Set(int(pres[i].ID), i)
colNames[i] = pres[i].Alias
}
// The ordering is not useful for a top-level projection (it is used by the
// distsql planner for internal nodes); we might not even be able to represent
// it because it can refer to columns not in the presentation.
return b.ensureColumns(input, colList, colNames, nil /* provided */)
var err error
res.root, err = b.factory.ConstructSerializingProject(input.root, cols, colNames)
return res, err
}

// getEnvData consolidates the information that must be presented in
Expand Down
8 changes: 1 addition & 7 deletions pkg/sql/opt/exec/execbuilder/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,7 @@ func (b *Builder) buildCreateTable(ct *memo.CreateTableExpr) (execPlan, error) {
// Impose ordering and naming on input columns, so that they match the
// order and names of the table columns into which values will be
// inserted.
colList := make(opt.ColList, len(ct.InputCols))
colNames := make([]string, len(ct.InputCols))
for i := range ct.InputCols {
colList[i] = ct.InputCols[i].ID
colNames[i] = ct.InputCols[i].Alias
}
input, err = b.ensureColumns(input, colList, colNames, nil /* provided */)
input, err = b.applyPresentation(input, ct.InputCols)
if err != nil {
return execPlan{}, err
}
Expand Down
22 changes: 15 additions & 7 deletions pkg/sql/opt/exec/factory.opt
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,24 @@ define InvertedFilter {
define SimpleProject {
Input exec.Node
Cols []exec.NodeColumnOrdinal
ColNames []string
ReqOrdering exec.OutputOrdering
}

# SerializingProject is similar to SimpleProject, but it allows renaming of
# columns and forces distributed execution to serialize (merge) any parallel
# result streams into a single stream before the projection. This allows any
# required output ordering of the input node to be "materialized", which is
# important for cases where the projection no longer contains the ordering
# columns (e.g. SELECT a FROM t ORDER BY b).
#
# Typically used as the "root" (top-level) operator to ensure the correct
# ordering and naming of columns.
define SerializingProject {
Input exec.Node
Cols []exec.NodeColumnOrdinal
ColNames []string
}

# Render applies a projection on the results of the given input node. The
# projection can contain new expressions. The input expression slice will be
# modified.
Expand Down Expand Up @@ -323,12 +337,6 @@ define Window {
Window exec.WindowInfo
}

# RenameColumns modifies the column names of a node.
define RenameColumns {
Input exec.Node
ColNames []string
}

# Explain implements EXPLAIN (OPT), showing information about the given plan.
define ExplainOpt {
Plan string
Expand Down
42 changes: 32 additions & 10 deletions pkg/sql/opt_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,13 @@ func (ef *execFactory) ConstructInvertedFilter(

// ConstructSimpleProject is part of the exec.Factory interface.
func (ef *execFactory) ConstructSimpleProject(
n exec.Node, cols []exec.NodeColumnOrdinal, colNames []string, reqOrdering exec.OutputOrdering,
n exec.Node, cols []exec.NodeColumnOrdinal, reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
return constructSimpleProjectForPlanNode(n.(planNode), cols, nil /* colNames */, reqOrdering)
}

func constructSimpleProjectForPlanNode(
n planNode, cols []exec.NodeColumnOrdinal, colNames []string, reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
// If the top node is already a renderNode, just rearrange the columns. But
// we don't want to duplicate a rendering expression (in case it is expensive
Expand Down Expand Up @@ -250,6 +256,31 @@ func hasDuplicates(cols []exec.NodeColumnOrdinal) bool {
return false
}

// ConstructSerializingProject is part of the exec.Factory interface.
//
// When cols selects every input column in its existing position, the
// projection only renames columns, so the names are written directly into the
// input node's columns and the input node itself is returned. Otherwise the
// work is delegated to constructSimpleProjectForPlanNode with no required
// ordering.
func (ef *execFactory) ConstructSerializingProject(
	n exec.Node, cols []exec.NodeColumnOrdinal, colNames []string,
) (exec.Node, error) {
	node := n.(planNode)

	// renameOnly stays true only if cols is exactly 0, 1, ..., len-1 over all
	// of the input's columns.
	renameOnly := len(cols) == len(planColumns(node))
	for i := 0; renameOnly && i < len(cols); i++ {
		renameOnly = cols[i] == exec.NodeColumnOrdinal(i)
	}
	if !renameOnly {
		return constructSimpleProjectForPlanNode(node, cols, colNames, nil /* reqOrdering */)
	}

	// Pure rename: update the column names in place.
	mutableCols := planMutableColumns(node)
	for i := range mutableCols {
		mutableCols[i].Name = colNames[i]
	}
	return n, nil
}

// ConstructRender is part of the exec.Factory interface.
// N.B.: The input exprs will be modified.
func (ef *execFactory) ConstructRender(
Expand All @@ -267,15 +298,6 @@ func (ef *execFactory) ConstructRender(
return rb.res, nil
}

// ConstructRenameColumns is part of the exec.Factory interface.
func (ef *execFactory) ConstructRenameColumns(n exec.Node, colNames []string) (exec.Node, error) {
inputCols := planMutableColumns(n.(planNode))
for i := range inputCols {
inputCols[i].Name = colNames[i]
}
return n, nil
}

// ConstructHashJoin is part of the exec.Factory interface.
func (ef *execFactory) ConstructHashJoin(
joinType sqlbase.JoinType,
Expand Down

0 comments on commit af11fa9

Please sign in to comment.