Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: add SerializingProject exec primitive #52386

Merged
merged 1 commit into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,27 +347,37 @@ func (e *distSQLSpecExecFactory) ConstructInvertedFilter(
}

func (e *distSQLSpecExecFactory) ConstructSimpleProject(
n exec.Node, cols []exec.NodeColumnOrdinal, colNames []string, reqOrdering exec.OutputOrdering,
n exec.Node, cols []exec.NodeColumnOrdinal, reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
physPlan, plan := getPhysPlan(n)
projection := make([]uint32, len(cols))
for i := range cols {
projection[i] = uint32(cols[i])
projection[i] = uint32(cols[physPlan.PlanToStreamColMap[i]])
}
physPlan.AddProjection(projection)
physPlan.ResultColumns = getResultColumnsForSimpleProject(cols, colNames, physPlan.ResultTypes, physPlan.ResultColumns)
physPlan.ResultColumns = getResultColumnsForSimpleProject(
cols, nil /* colNames */, physPlan.ResultTypes, physPlan.ResultColumns,
)
physPlan.PlanToStreamColMap = identityMap(physPlan.PlanToStreamColMap, len(cols))
if reqOrdering == nil {
// When reqOrdering is nil, we're adding a top-level (i.e. "final")
// projection. In such scenario we need to be careful to not simply
// reset the merge ordering that is currently set on the plan - we do
// so by merging the streams on the gateway node.
physPlan.EnsureSingleStreamOnGateway()
}
physPlan.SetMergeOrdering(e.dsp.convertOrdering(ReqOrdering(reqOrdering), physPlan.PlanToStreamColMap))
return plan, nil
}

// ConstructSerializingProject is part of the exec.Factory interface. It merges
// the plan's result streams into a single stream on the gateway and then
// projects the columns listed in cols, naming them according to colNames.
func (e *distSQLSpecExecFactory) ConstructSerializingProject(
	n exec.Node, cols []exec.NodeColumnOrdinal, colNames []string,
) (exec.Node, error) {
	physPlan, plan := getPhysPlan(n)
	// Merge before projecting so that the projection is applied to the single
	// merged stream.
	physPlan.EnsureSingleStreamOnGateway()
	projection := make([]uint32, len(cols))
	for i, col := range cols {
		// cols holds ordinals of the input node's columns, while the projection
		// is expressed in terms of stream columns, so each ordinal is translated
		// through PlanToStreamColMap. Indexing cols by the map entry instead
		// (cols[PlanToStreamColMap[i]]) is only equivalent while the map is the
		// identity; otherwise it selects the wrong column or indexes out of range.
		projection[i] = uint32(physPlan.PlanToStreamColMap[col])
	}
	physPlan.AddProjection(projection)
	physPlan.ResultColumns = getResultColumnsForSimpleProject(
		cols, colNames, physPlan.ResultTypes, physPlan.ResultColumns,
	)
	// After the projection, output column i is carried by stream column i.
	physPlan.PlanToStreamColMap = identityMap(physPlan.PlanToStreamColMap, len(cols))
	return plan, nil
}

func (e *distSQLSpecExecFactory) ConstructRender(
n exec.Node,
columns sqlbase.ResultColumns,
Expand Down Expand Up @@ -721,16 +731,6 @@ func (e *distSQLSpecExecFactory) ConstructWindow(
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: window")
}

func (e *distSQLSpecExecFactory) ConstructRenameColumns(
input exec.Node, colNames []string,
) (exec.Node, error) {
inputCols := input.(planMaybePhysical).physPlan.ResultColumns
for i := range inputCols {
inputCols[i].Name = colNames[i]
}
return input, nil
}

func (e *distSQLSpecExecFactory) ConstructPlan(
root exec.Node, subqueries []exec.Subquery, cascades []exec.Cascade, checks []exec.Node,
) (exec.Plan, error) {
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/exec_factory_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,10 @@ func constructVirtualScan(
if needed := params.NeededCols; needed.Len() != len(columns) {
// We are selecting a subset of columns; we need a projection.
cols := make([]exec.NodeColumnOrdinal, 0, needed.Len())
colNames := make([]string, len(cols))
for ord, ok := needed.Next(0); ok; ord, ok = needed.Next(ord + 1) {
cols = append(cols, exec.NodeColumnOrdinal(ord-1))
colNames = append(colNames, columns[ord-1].Name)
}
n, err = ef.ConstructSimpleProject(n, cols, colNames, nil /* reqOrdering */)
n, err = ef.ConstructSimpleProject(n, cols, nil /* reqOrdering */)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (b *Builder) buildMutationInput(
}
}

input, err = b.ensureColumns(input, colList, nil, inputExpr.ProvidedPhysical().Ordering)
input, err = b.ensureColumns(input, colList, inputExpr.ProvidedPhysical().Ordering)
if err != nil {
return execPlan{}, err
}
Expand Down
58 changes: 20 additions & 38 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) {

// Wrap the expression in a render expression if presentation requires it.
if p := e.RequiredPhysical(); !p.Presentation.Any() {
ep, err = b.applyPresentation(ep, p)
ep, err = b.applyPresentation(ep, p.Presentation)
}
return ep, err
}
Expand Down Expand Up @@ -608,7 +608,7 @@ func (b *Builder) applySimpleProject(
})
var err error
res.root, err = b.factory.ConstructSimpleProject(
input.root, colList, nil /* colNames */, exec.OutputOrdering(res.sqlOrdering(providedOrd)),
input.root, colList, exec.OutputOrdering(res.sqlOrdering(providedOrd)),
)
if err != nil {
return execPlan{}, err
Expand Down Expand Up @@ -1255,9 +1255,7 @@ func (b *Builder) buildDistinct(distinct memo.RelExpr) (execPlan, error) {
if input.outputCols.Len() == outCols.Len() {
return ep, nil
}
return b.ensureColumns(
ep, opt.ColSetToList(outCols), nil /* colNames */, distinct.ProvidedPhysical().Ordering,
)
return b.ensureColumns(ep, opt.ColSetToList(outCols), distinct.ProvidedPhysical().Ordering)
}

func (b *Builder) buildGroupByInput(groupBy memo.RelExpr) (execPlan, error) {
Expand Down Expand Up @@ -1309,9 +1307,7 @@ func (b *Builder) buildGroupByInput(groupBy memo.RelExpr) (execPlan, error) {

input.outputCols = newOutputCols
reqOrdering := input.reqOrdering(groupByInput)
input.root, err = b.factory.ConstructSimpleProject(
input.root, cols, nil /* colNames */, reqOrdering,
)
input.root, err = b.factory.ConstructSimpleProject(input.root, cols, reqOrdering)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -1351,15 +1347,11 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) {
// Note that (unless this is part of a larger query) the presentation property
// will ensure that the columns are presented correctly in the output (i.e. in
// the order `b, c, a`).
left, err = b.ensureColumns(
left, private.LeftCols, nil /* colNames */, leftExpr.ProvidedPhysical().Ordering,
)
left, err = b.ensureColumns(left, private.LeftCols, leftExpr.ProvidedPhysical().Ordering)
if err != nil {
return execPlan{}, err
}
right, err = b.ensureColumns(
right, private.RightCols, nil /* colNames */, rightExpr.ProvidedPhysical().Ordering,
)
right, err = b.ensureColumns(right, private.RightCols, rightExpr.ProvidedPhysical().Ordering)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -1759,7 +1751,7 @@ func (b *Builder) buildRecursiveCTE(rec *memo.RecursiveCTEExpr) (execPlan, error
}

// Make sure we have the columns in the correct order.
initial, err = b.ensureColumns(initial, rec.InitialCols, nil /* colNames */, nil /* ordering */)
initial, err = b.ensureColumns(initial, rec.InitialCols, nil /* ordering */)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -1794,9 +1786,7 @@ func (b *Builder) buildRecursiveCTE(rec *memo.RecursiveCTEExpr) (execPlan, error
return nil, err
}
// Ensure columns are output in the same order.
plan, err = innerBld.ensureColumns(
plan, rec.RecursiveCols, nil /* colNames */, nil, /* ordering */
)
plan, err = innerBld.ensureColumns(plan, rec.RecursiveCols, opt.Ordering{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1856,7 +1846,7 @@ func (b *Builder) buildWithScan(withScan *memo.WithScanExpr) (execPlan, error) {
res.outputCols.Set(int(withScan.OutCols[i]), i)
}
res.root, err = b.factory.ConstructSimpleProject(
res.root, cols, nil, /* colNames */
res.root, cols,
exec.OutputOrdering(res.sqlOrdering(withScan.ProvidedPhysical().Ordering)),
)
if err != nil {
Expand Down Expand Up @@ -2031,7 +2021,7 @@ func (b *Builder) buildWindow(w *memo.WindowExpr) (execPlan, error) {
// TODO(justin): this call to ensureColumns is kind of unfortunate because it
// can result in an extra render beneath each window function. Figure out a
// way to alleviate this.
input, err = b.ensureColumns(input, desiredCols, nil, opt.Ordering{})
input, err = b.ensureColumns(input, desiredCols, opt.Ordering{})
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -2257,18 +2247,10 @@ func (b *Builder) needProjection(
// ensureColumns applies a projection as necessary to make the output match the
// given list of columns.
func (b *Builder) ensureColumns(
input execPlan, colList opt.ColList, colNames []string, provided opt.Ordering,
input execPlan, colList opt.ColList, provided opt.Ordering,
) (execPlan, error) {
cols, needProj := b.needProjection(input, colList)
if !needProj {
// No projection necessary.
if colNames != nil {
var err error
input.root, err = b.factory.ConstructRenameColumns(input.root, colNames)
if err != nil {
return execPlan{}, err
}
}
return input, nil
}
var res execPlan
Expand All @@ -2277,24 +2259,24 @@ func (b *Builder) ensureColumns(
}
reqOrdering := exec.OutputOrdering(res.sqlOrdering(provided))
var err error
res.root, err = b.factory.ConstructSimpleProject(input.root, cols, colNames, reqOrdering)
res.root, err = b.factory.ConstructSimpleProject(input.root, cols, reqOrdering)
return res, err
}

// applyPresentation adds a projection to a plan to satisfy a required
// Presentation property.
func (b *Builder) applyPresentation(input execPlan, p *physical.Required) (execPlan, error) {
pres := p.Presentation
colList := make(opt.ColList, len(pres))
func (b *Builder) applyPresentation(input execPlan, pres physical.Presentation) (execPlan, error) {
cols := make([]exec.NodeColumnOrdinal, len(pres))
colNames := make([]string, len(pres))
var res execPlan
for i := range pres {
colList[i] = pres[i].ID
cols[i] = input.getNodeColumnOrdinal(pres[i].ID)
res.outputCols.Set(int(pres[i].ID), i)
colNames[i] = pres[i].Alias
}
// The ordering is not useful for a top-level projection (it is used by the
// distsql planner for internal nodes); we might not even be able to represent
// it because it can refer to columns not in the presentation.
return b.ensureColumns(input, colList, colNames, nil /* provided */)
var err error
res.root, err = b.factory.ConstructSerializingProject(input.root, cols, colNames)
return res, err
}

// getEnvData consolidates the information that must be presented in
Expand Down
8 changes: 1 addition & 7 deletions pkg/sql/opt/exec/execbuilder/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,7 @@ func (b *Builder) buildCreateTable(ct *memo.CreateTableExpr) (execPlan, error) {
// Impose ordering and naming on input columns, so that they match the
// order and names of the table columns into which values will be
// inserted.
colList := make(opt.ColList, len(ct.InputCols))
colNames := make([]string, len(ct.InputCols))
for i := range ct.InputCols {
colList[i] = ct.InputCols[i].ID
colNames[i] = ct.InputCols[i].Alias
}
input, err = b.ensureColumns(input, colList, colNames, nil /* provided */)
input, err = b.applyPresentation(input, ct.InputCols)
if err != nil {
return execPlan{}, err
}
Expand Down
22 changes: 15 additions & 7 deletions pkg/sql/opt/exec/factory.opt
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,24 @@ define InvertedFilter {
define SimpleProject {
Input exec.Node
Cols []exec.NodeColumnOrdinal
ColNames []string
ReqOrdering exec.OutputOrdering
}

# SerializingProject is similar to SimpleProject, but it allows renaming of
# columns and forces distributed execution to serialize (merge) any parallel
# result streams into a single stream before the projection. This allows any
# required output ordering of the input node to be "materialized", which is
# important for cases where the projection no longer contains the ordering
# columns (e.g. SELECT a FROM t ORDER BY b).
#
# Typically used as the "root" (top-level) operator to ensure the correct
# ordering and naming of columns.
define SerializingProject {
Input exec.Node
Cols []exec.NodeColumnOrdinal
ColNames []string
}

# Render applies a projection on the results of the given input node. The
# projection can contain new expressions. The input expression slice will be
# modified.
Expand Down Expand Up @@ -323,12 +337,6 @@ define Window {
Window exec.WindowInfo
}

# RenameColumns modifies the column names of a node.
define RenameColumns {
Input exec.Node
ColNames []string
}

# Explain implements EXPLAIN (OPT), showing information about the given plan.
define ExplainOpt {
Plan string
Expand Down
42 changes: 32 additions & 10 deletions pkg/sql/opt_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,13 @@ func (ef *execFactory) ConstructInvertedFilter(

// ConstructSimpleProject is part of the exec.Factory interface.
func (ef *execFactory) ConstructSimpleProject(
n exec.Node, cols []exec.NodeColumnOrdinal, colNames []string, reqOrdering exec.OutputOrdering,
n exec.Node, cols []exec.NodeColumnOrdinal, reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
return constructSimpleProjectForPlanNode(n.(planNode), cols, nil /* colNames */, reqOrdering)
}

func constructSimpleProjectForPlanNode(
n planNode, cols []exec.NodeColumnOrdinal, colNames []string, reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
// If the top node is already a renderNode, just rearrange the columns. But
// we don't want to duplicate a rendering expression (in case it is expensive
Expand Down Expand Up @@ -250,6 +256,31 @@ func hasDuplicates(cols []exec.NodeColumnOrdinal) bool {
return false
}

// ConstructSerializingProject is part of the exec.Factory interface.
func (ef *execFactory) ConstructSerializingProject(
	n exec.Node, cols []exec.NodeColumnOrdinal, colNames []string,
) (exec.Node, error) {
	node := n.(planNode)
	// A projection that keeps every input column in its original position only
	// changes names; anything else needs a real projection.
	renameOnly := len(cols) == len(planColumns(node))
	for i := 0; renameOnly && i < len(cols); i++ {
		renameOnly = cols[i] == exec.NodeColumnOrdinal(i)
	}
	if !renameOnly {
		return constructSimpleProjectForPlanNode(node, cols, colNames, nil /* reqOrdering */)
	}
	// Pure rename: overwrite the names on the input's columns in place and
	// return the input node itself.
	inputCols := planMutableColumns(node)
	for i := range inputCols {
		inputCols[i].Name = colNames[i]
	}
	return n, nil
}

// ConstructRender is part of the exec.Factory interface.
// N.B.: The input exprs will be modified.
func (ef *execFactory) ConstructRender(
Expand All @@ -267,15 +298,6 @@ func (ef *execFactory) ConstructRender(
return rb.res, nil
}

// ConstructRenameColumns is part of the exec.Factory interface.
func (ef *execFactory) ConstructRenameColumns(n exec.Node, colNames []string) (exec.Node, error) {
inputCols := planMutableColumns(n.(planNode))
for i := range inputCols {
inputCols[i].Name = colNames[i]
}
return n, nil
}

// ConstructHashJoin is part of the exec.Factory interface.
func (ef *execFactory) ConstructHashJoin(
joinType sqlbase.JoinType,
Expand Down