sql: improve physical planning of window functions
This commit improves the physical planning of window functions in
several ways.

First, the optimizer is updated so that all window functions with a
PARTITION BY clause are constructed first, followed by the remaining
window functions without PARTITION BY. This is needed because the
execution engine can only evaluate window functions with PARTITION BY
in a distributed fashion. As a result of this change, we are now more
likely to get partially distributed execution (previously, this
depended on the order in which the window functions were mentioned in
the query).
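
To illustrate the reordering, here is a minimal sketch in Go (not the
optimizer's actual code; the windowFn type and its fields are
hypothetical simplifications of the real window function descriptors):

package main

import "fmt"

// windowFn is a simplified stand-in for a window function descriptor.
type windowFn struct {
	def           string
	partitionIdxs []int // empty means no PARTITION BY clause
}

// reorderWindowFns stably moves all functions that have a PARTITION BY
// clause to the front, preserving the relative order within each group.
func reorderWindowFns(funcs []windowFn) []windowFn {
	ordered := make([]windowFn, 0, len(funcs))
	for _, f := range funcs {
		if len(f.partitionIdxs) > 0 {
			ordered = append(ordered, f)
		}
	}
	for _, f := range funcs {
		if len(f.partitionIdxs) == 0 {
			ordered = append(ordered, f)
		}
	}
	return ordered
}

func main() {
	funcs := []windowFn{
		{def: "row_number() OVER ()"},
		{def: "sum(x) OVER (PARTITION BY y)", partitionIdxs: []int{1}},
	}
	// The PARTITION BY function is constructed first, regardless of
	// where it was mentioned in the query.
	fmt.Println(reorderWindowFns(funcs))
}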

Second, the physical planner now recommends that we "should distribute"
the plan if it finds at least one window function with a PARTITION BY
clause. Previously, we made no recommendation about the distribution
based on the presence of window functions (i.e. we relied on the rest
of the plan to do so), but window functions can be quite
computation-intensive, so whenever we can distribute the execution, we
should do so.
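
The recommendation composes with whatever the rest of the plan reports,
as the windowNode case in the diff below shows. Here is a minimal
sketch of the composition semantics (assuming a simplified
distRecommendation enum; the real type and its compose method live in
pkg/sql/distsql_physical_planner.go and may differ in detail):

package main

import "fmt"

type distRecommendation int

const (
	cannotDistribute distRecommendation = iota
	canDistribute
	shouldDistribute
)

// compose returns the stronger of the two recommendations, except that
// cannotDistribute always wins: a plan containing an undistributable
// part cannot be distributed as a whole.
func (a distRecommendation) compose(b distRecommendation) distRecommendation {
	if a == cannotDistribute || b == cannotDistribute {
		return cannotDistribute
	}
	if a == shouldDistribute || b == shouldDistribute {
		return shouldDistribute
	}
	return canDistribute
}

func main() {
	// A window function with PARTITION BY upgrades a neutral
	// recommendation coming from the rest of the plan.
	fmt.Println(canDistribute.compose(shouldDistribute) == shouldDistribute) // true
}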

Additionally, this commit removes some of the code in the physical
planner that tried to find window functions with the same PARTITION BY
and ORDER BY clauses - that code has been redundant for a long time
given that the optimizer already performs this grouping.

Release note: None
yuzefovich committed Aug 1, 2022
1 parent f8b8d4c commit 561383e
Showing 9 changed files with 270 additions and 218 deletions.
13 changes: 13 additions & 0 deletions pkg/sql/catalog/colinfo/ordering.go
@@ -30,6 +30,19 @@ type ColumnOrderInfo struct {
 // represents an ordering first by column 3 (descending), then by column 1 (ascending).
 type ColumnOrdering []ColumnOrderInfo
 
+// Equal returns whether two ColumnOrderings are the same.
+func (ordering ColumnOrdering) Equal(other ColumnOrdering) bool {
+	if len(ordering) != len(other) {
+		return false
+	}
+	for i, o := range ordering {
+		if o.ColIdx != other[i].ColIdx || o.Direction != other[i].Direction {
+			return false
+		}
+	}
+	return true
+}
+
 func (ordering ColumnOrdering) String(columns ResultColumns) string {
 	var buf bytes.Buffer
 	fmtCtx := tree.NewFmtCtx(tree.FmtSimple)
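As an illustration of the new method (a hypothetical usage example, not
part of this commit; the encoding import used for the Direction values
is an assumption):

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
	"github.com/cockroachdb/cockroach/pkg/util/encoding"
)

func main() {
	a := colinfo.ColumnOrdering{
		{ColIdx: 3, Direction: encoding.Descending},
		{ColIdx: 1, Direction: encoding.Ascending},
	}
	b := colinfo.ColumnOrdering{
		{ColIdx: 3, Direction: encoding.Descending},
		{ColIdx: 1, Direction: encoding.Ascending},
	}
	fmt.Println(a.Equal(b))     // true: same columns and directions
	fmt.Println(a.Equal(b[:1])) // false: different lengths
}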
234 changes: 122 additions & 112 deletions pkg/sql/distsql_physical_planner.go
@@ -621,7 +621,18 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 		return canDistribute, nil
 
 	case *windowNode:
-		return checkSupportForPlanNode(n.plan)
+		rec, err := checkSupportForPlanNode(n.plan)
+		if err != nil {
+			return cannotDistribute, err
+		}
+		for _, f := range n.funcs {
+			if len(f.partitionIdxs) > 0 {
+				// If at least one function has PARTITION BY clause, then we
+				// should distribute the execution.
+				return rec.compose(shouldDistribute), nil
+			}
+		}
+		return rec.compose(canDistribute), nil
 
 	case *zeroNode:
 		return canDistribute, nil
@@ -3730,9 +3741,8 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
 	return p, nil
 }
 
-// createPlanForWindow creates a physical plan for computing window functions.
-// We add a new stage of windower processors for each different partitioning
-// scheme found in the query's window functions.
+// createPlanForWindow creates a physical plan for computing window functions
+// that have the same PARTITION BY and ORDER BY clauses.
 func (dsp *DistSQLPlanner) createPlanForWindow(
 	ctx context.Context, planCtx *PlanningCtx, n *windowNode,
 ) (*PhysicalPlan, error) {
@@ -3741,132 +3751,132 @@
 		return nil, err
 	}
 
-	numWindowFuncProcessed := 0
-	windowPlanState := createWindowPlanState(n, planCtx, plan)
-	// Each iteration of this loop adds a new stage of windowers. The steps taken:
-	// 1. find a set of unprocessed window functions that have the same PARTITION BY
-	//    clause. All of these will be computed using the single stage of windowers.
-	// 2. a) populate output types of the current stage of windowers. All input
-	//       columns are being passed through, and windower will append output
-	//       columns for each window function processed at the stage.
-	//    b) create specs for all window functions in the set.
-	// 3. decide whether to put windowers on a single or on multiple nodes.
-	//    a) if we're putting windowers on multiple nodes, we'll put them onto
-	//       every node that participated in the previous stage. We leverage hash
-	//       routers to partition the data based on PARTITION BY clause of window
-	//       functions in the set.
-	for numWindowFuncProcessed < len(n.funcs) {
-		samePartitionFuncs, partitionIdxs := windowPlanState.findUnprocessedWindowFnsWithSamePartition()
-		numWindowFuncProcessed += len(samePartitionFuncs)
-		windowerSpec := execinfrapb.WindowerSpec{
-			PartitionBy: partitionIdxs,
-			WindowFns:   make([]execinfrapb.WindowerSpec_WindowFn, len(samePartitionFuncs)),
-		}
-
-		newResultTypes := make([]*types.T, len(plan.GetResultTypes())+len(samePartitionFuncs))
-		copy(newResultTypes, plan.GetResultTypes())
-		for windowFnSpecIdx, windowFn := range samePartitionFuncs {
-			windowFnSpec, outputType, err := windowPlanState.createWindowFnSpec(windowFn)
-			if err != nil {
-				return nil, err
-			}
-			newResultTypes[windowFn.outputColIdx] = outputType
-			windowerSpec.WindowFns[windowFnSpecIdx] = windowFnSpec
-		}
-
-		// Check if the previous stage is all on one node.
-		prevStageNode := plan.Processors[plan.ResultRouters[0]].SQLInstanceID
-		for i := 1; i < len(plan.ResultRouters); i++ {
-			if n := plan.Processors[plan.ResultRouters[i]].SQLInstanceID; n != prevStageNode {
-				prevStageNode = 0
-				break
-			}
-		}
-
-		// Get all sqlInstanceIDs from the previous stage.
-		sqlInstanceIDs := getSQLInstanceIDsOfRouters(plan.ResultRouters, plan.Processors)
-		if len(partitionIdxs) == 0 || len(sqlInstanceIDs) == 1 {
-			// No PARTITION BY or we have a single node. Use a single windower.
-			// If the previous stage was all on a single node, put the windower
-			// there. Otherwise, bring the results back on this node.
-			sqlInstanceID := dsp.gatewaySQLInstanceID
-			if len(sqlInstanceIDs) == 1 {
-				sqlInstanceID = sqlInstanceIDs[0]
-			}
-			plan.AddSingleGroupStage(
-				sqlInstanceID,
-				execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
-				execinfrapb.PostProcessSpec{},
-				newResultTypes,
-			)
-		} else {
-			// Set up the output routers from the previous stage.
-			// We use hash routers with hashing on the columns
-			// from PARTITION BY clause of window functions
-			// we're processing in the current stage.
-			for _, resultProc := range plan.ResultRouters {
-				plan.Processors[resultProc].Spec.Output[0] = execinfrapb.OutputRouterSpec{
-					Type:        execinfrapb.OutputRouterSpec_BY_HASH,
-					HashColumns: partitionIdxs,
-				}
-			}
-			// We have multiple streams, so we definitely have a processor planned
-			// on a remote node.
-			stageID := plan.NewStage(true /* containsRemoteProcessor */, false /* allowPartialDistribution */)
-
-			// We put a windower on each node and we connect it
-			// with all hash routers from the previous stage in
-			// a such way that each node has its designated
-			// SourceRouterSlot - namely, position in which
-			// a node appears in nodes.
-			prevStageRouters := plan.ResultRouters
-			prevStageResultTypes := plan.GetResultTypes()
-			plan.ResultRouters = make([]physicalplan.ProcessorIdx, 0, len(sqlInstanceIDs))
-			for bucket, sqlInstanceID := range sqlInstanceIDs {
-				proc := physicalplan.Processor{
-					SQLInstanceID: sqlInstanceID,
-					Spec: execinfrapb.ProcessorSpec{
-						Input: []execinfrapb.InputSyncSpec{{
-							Type:        execinfrapb.InputSyncSpec_PARALLEL_UNORDERED,
-							ColumnTypes: prevStageResultTypes,
-						}},
-						Core: execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
-						Post: execinfrapb.PostProcessSpec{},
-						Output: []execinfrapb.OutputRouterSpec{{
-							Type: execinfrapb.OutputRouterSpec_PASS_THROUGH,
-						}},
-						StageID:     stageID,
-						ResultTypes: newResultTypes,
-					},
-				}
-				pIdx := plan.AddProcessor(proc)
-
-				for _, router := range prevStageRouters {
-					plan.Streams = append(plan.Streams, physicalplan.Stream{
-						SourceProcessor:  router,
-						SourceRouterSlot: bucket,
-						DestProcessor:    pIdx,
-						DestInput:        0,
-					})
-				}
-				plan.ResultRouters = append(plan.ResultRouters, pIdx)
-			}
-		}
-	}
+	partitionIdxs := make([]uint32, len(n.funcs[0].partitionIdxs))
+	for i := range partitionIdxs {
+		partitionIdxs[i] = uint32(n.funcs[0].partitionIdxs[i])
+	}
+
+	// Check that all window functions have the same PARTITION BY and ORDER BY
+	// clauses. We can assume that because the optbuilder ensures that all
+	// window functions in the windowNode have the same PARTITION BY and ORDER
+	// BY clauses.
+	for _, f := range n.funcs[1:] {
+		if !n.funcs[0].samePartition(f) {
+			return nil, errors.AssertionFailedf(
+				"PARTITION BY clauses of window functions handled by the same "+
+					"windowNode are different: %v, %v", n.funcs[0].partitionIdxs, f.partitionIdxs,
+			)
+		}
+		if !n.funcs[0].columnOrdering.Equal(f.columnOrdering) {
+			return nil, errors.AssertionFailedf(
+				"ORDER BY clauses of window functions handled by the same "+
+					"windowNode are different: %v, %v", n.funcs[0].columnOrdering, f.columnOrdering,
+			)
+		}
+	}
+
+	// All window functions in the windowNode will be computed using the single
+	// stage of windowers (because they have the same PARTITION BY and ORDER BY
+	// clauses). All input columns are being passed through, and windower will
+	// append output columns for each window function processed at the current
+	// stage.
+	windowerSpec := execinfrapb.WindowerSpec{
+		PartitionBy: partitionIdxs,
+		WindowFns:   make([]execinfrapb.WindowerSpec_WindowFn, len(n.funcs)),
+	}
+
+	newResultTypes := make([]*types.T, len(plan.GetResultTypes())+len(n.funcs))
+	copy(newResultTypes, plan.GetResultTypes())
+	for windowFnSpecIdx, windowFn := range n.funcs {
+		windowFnSpec, outputType, err := createWindowFnSpec(planCtx, plan, windowFn)
+		if err != nil {
+			return nil, err
+		}
+		newResultTypes[windowFn.outputColIdx] = outputType
+		windowerSpec.WindowFns[windowFnSpecIdx] = windowFnSpec
+	}
+
+	// Get all sqlInstanceIDs from the previous stage.
+	sqlInstanceIDs := getSQLInstanceIDsOfRouters(plan.ResultRouters, plan.Processors)
+	if len(partitionIdxs) == 0 || len(sqlInstanceIDs) == 1 {
+		// No PARTITION BY or we have a single node. Use a single windower. If
+		// the previous stage was all on a single node, put the windower there.
+		// Otherwise, bring the results back on this node.
+		sqlInstanceID := dsp.gatewaySQLInstanceID
+		if len(sqlInstanceIDs) == 1 {
+			sqlInstanceID = sqlInstanceIDs[0]
+		}
+		plan.AddSingleGroupStage(
+			sqlInstanceID,
+			execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
+			execinfrapb.PostProcessSpec{},
+			newResultTypes,
+		)
+	} else {
+		// Set up the output routers from the previous stage. We use hash
+		// routers with hashing on the columns from PARTITION BY clause of
+		// window functions we're processing in the current stage.
+		for _, resultProc := range plan.ResultRouters {
+			plan.Processors[resultProc].Spec.Output[0] = execinfrapb.OutputRouterSpec{
+				Type:        execinfrapb.OutputRouterSpec_BY_HASH,
+				HashColumns: partitionIdxs,
+			}
+		}
+		// We have multiple streams, so we definitely have a processor planned
+		// on a remote node.
+		stageID := plan.NewStage(true /* containsRemoteProcessor */, false /* allowPartialDistribution */)
+
+		// We put a windower on each node and we connect it with all hash
+		// routers from the previous stage in a such way that each node has its
+		// designated SourceRouterSlot - namely, position in which a node
+		// appears in nodes.
+		prevStageRouters := plan.ResultRouters
+		prevStageResultTypes := plan.GetResultTypes()
+		plan.ResultRouters = make([]physicalplan.ProcessorIdx, 0, len(sqlInstanceIDs))
+		for bucket, sqlInstanceID := range sqlInstanceIDs {
+			proc := physicalplan.Processor{
+				SQLInstanceID: sqlInstanceID,
+				Spec: execinfrapb.ProcessorSpec{
+					Input: []execinfrapb.InputSyncSpec{{
+						Type:        execinfrapb.InputSyncSpec_PARALLEL_UNORDERED,
+						ColumnTypes: prevStageResultTypes,
+					}},
+					Core: execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
+					Post: execinfrapb.PostProcessSpec{},
+					Output: []execinfrapb.OutputRouterSpec{{
+						Type: execinfrapb.OutputRouterSpec_PASS_THROUGH,
+					}},
+					StageID:     stageID,
+					ResultTypes: newResultTypes,
+				},
+			}
+			pIdx := plan.AddProcessor(proc)
+
+			for _, router := range prevStageRouters {
+				plan.Streams = append(plan.Streams, physicalplan.Stream{
+					SourceProcessor:  router,
+					SourceRouterSlot: bucket,
+					DestProcessor:    pIdx,
+					DestInput:        0,
+				})
+			}
+			plan.ResultRouters = append(plan.ResultRouters, pIdx)
+		}
+	}
 
-	// We definitely added columns throughout all the stages of windowers, so we
-	// need to update PlanToStreamColMap. We need to update the map before adding
-	// rendering or projection because it is used there.
+	// We definitely added new columns, so we need to update PlanToStreamColMap.
+	// We need to update the map before adding rendering or projection because
+	// it is used there.
 	plan.PlanToStreamColMap = identityMap(plan.PlanToStreamColMap, len(plan.GetResultTypes()))
 
 	// windowers do not guarantee maintaining the order at the moment, so we
 	// reset MergeOrdering. There shouldn't be an ordering here, but we reset it
 	// defensively (see #35179).
 	plan.SetMergeOrdering(execinfrapb.Ordering{})
 
-	// After all window functions are computed, we need to add rendering or
+	// After the window functions are computed, we need to add rendering or
 	// projection.
-	if err := windowPlanState.addRenderingOrProjection(); err != nil {
+	if err := addRenderingOrProjection(n, planCtx, plan); err != nil {
 		return nil, err
 	}
