sql: improve physical planning of window functions #85355

Merged on Aug 1, 2022 (2 commits)
13 changes: 13 additions & 0 deletions pkg/sql/catalog/colinfo/ordering.go
@@ -30,6 +30,19 @@ type ColumnOrderInfo struct {
// represents an ordering first by column 3 (descending), then by column 1 (ascending).
type ColumnOrdering []ColumnOrderInfo

// Equal returns whether two ColumnOrderings are the same.
func (ordering ColumnOrdering) Equal(other ColumnOrdering) bool {
if len(ordering) != len(other) {
return false
}
for i, o := range ordering {
if o.ColIdx != other[i].ColIdx || o.Direction != other[i].Direction {
return false
}
}
return true
}

func (ordering ColumnOrdering) String(columns ResultColumns) string {
var buf bytes.Buffer
fmtCtx := tree.NewFmtCtx(tree.FmtSimple)
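For reference, the semantics of the new Equal method can be exercised with a small, self-contained sketch. The types below are simplified stand-ins for colinfo.ColumnOrderInfo and encoding.Direction rather than the real definitions; only the comparison logic mirrors the patch.

package main

import "fmt"

// Direction is a stand-in for encoding.Direction in this sketch.
type Direction int

const (
	Ascending Direction = iota
	Descending
)

// ColumnOrderInfo and ColumnOrdering mirror the colinfo types.
type ColumnOrderInfo struct {
	ColIdx    int
	Direction Direction
}

type ColumnOrdering []ColumnOrderInfo

// Equal reports whether two ColumnOrderings are element-wise identical,
// matching the method added in ordering.go.
func (ordering ColumnOrdering) Equal(other ColumnOrdering) bool {
	if len(ordering) != len(other) {
		return false
	}
	for i, o := range ordering {
		if o.ColIdx != other[i].ColIdx || o.Direction != other[i].Direction {
			return false
		}
	}
	return true
}

func main() {
	a := ColumnOrdering{{ColIdx: 3, Direction: Descending}, {ColIdx: 1, Direction: Ascending}}
	b := ColumnOrdering{{ColIdx: 3, Direction: Descending}, {ColIdx: 1, Direction: Ascending}}
	c := ColumnOrdering{{ColIdx: 3, Direction: Ascending}}
	fmt.Println(a.Equal(b)) // true: same columns and directions
	fmt.Println(a.Equal(c)) // false: different length and direction
}

An element-wise comparison like this treats orderings with different columns, orders, or directions as unequal, which is what the planner needs when deciding whether window functions can share a windower stage.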
247 changes: 125 additions & 122 deletions pkg/sql/distsql_physical_planner.go
@@ -312,27 +312,20 @@ const (
// cannotDistribute indicates that a plan cannot be distributed.
cannotDistribute distRecommendation = iota

// shouldNotDistribute indicates that a plan could suffer if distributed.
shouldNotDistribute

// canDistribute indicates that a plan will probably not benefit but will
// probably not suffer if distributed.
// canDistribute indicates that a plan can be distributed, but it's not
// clear whether it'll benefit from that.
canDistribute

// shouldDistribute indicates that a plan will likely benefit if distributed.
shouldDistribute
)

// compose returns the recommendation for a plan given recommendations for two
// parts of it: if we shouldNotDistribute either part, then we
// shouldNotDistribute the overall plan either.
// parts of it.
func (a distRecommendation) compose(b distRecommendation) distRecommendation {
if a == cannotDistribute || b == cannotDistribute {
return cannotDistribute
}
if a == shouldNotDistribute || b == shouldNotDistribute {
return shouldNotDistribute
}
if a == shouldDistribute || b == shouldDistribute {
return shouldDistribute
}
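With shouldNotDistribute removed, compose reduces to a three-value lattice: cannotDistribute dominates, then shouldDistribute, and canDistribute is the fallback. A minimal standalone sketch of that behavior follows; the trailing return of canDistribute is assumed from the surrounding function, which the diff truncates here.

package main

import "fmt"

// distRecommendation mirrors the enum above, minus the removed
// shouldNotDistribute value.
type distRecommendation int

const (
	cannotDistribute distRecommendation = iota
	canDistribute
	shouldDistribute
)

// compose mirrors the updated logic: cannotDistribute dominates,
// then shouldDistribute, and canDistribute is the fallback.
func (a distRecommendation) compose(b distRecommendation) distRecommendation {
	if a == cannotDistribute || b == cannotDistribute {
		return cannotDistribute
	}
	if a == shouldDistribute || b == shouldDistribute {
		return shouldDistribute
	}
	return canDistribute
}

func main() {
	fmt.Println(canDistribute.compose(shouldDistribute) == shouldDistribute)    // true
	fmt.Println(cannotDistribute.compose(shouldDistribute) == cannotDistribute) // true
}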
@@ -628,7 +621,18 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
return canDistribute, nil

case *windowNode:
return checkSupportForPlanNode(n.plan)
rec, err := checkSupportForPlanNode(n.plan)
if err != nil {
return cannotDistribute, err
}
for _, f := range n.funcs {
if len(f.partitionIdxs) > 0 {
// If at least one function has a PARTITION BY clause, then we
// should distribute the execution.
return rec.compose(shouldDistribute), nil
}
}
return rec.compose(canDistribute), nil

case *zeroNode:
return canDistribute, nil
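To illustrate the new windowNode branch above: any window function with a PARTITION BY clause upgrades the recommendation to shouldDistribute, while a windowNode without one merely allows distribution. A hedged sketch with a simplified stand-in for the planner's window-function holder (the real field is partitionIdxs on the windowNode's funcs):

package main

import "fmt"

// windowFn is a stand-in for the planner's window-function holder; only the
// partition column indexes matter for the recommendation.
type windowFn struct {
	partitionIdxs []int
}

// recommendWindow mirrors the new windowNode branch: PARTITION BY on any
// function upgrades the recommendation to shouldDistribute.
func recommendWindow(funcs []windowFn) string {
	for _, f := range funcs {
		if len(f.partitionIdxs) > 0 {
			return "shouldDistribute"
		}
	}
	return "canDistribute"
}

func main() {
	// e.g. rank() OVER (PARTITION BY region ORDER BY ts)
	fmt.Println(recommendWindow([]windowFn{{partitionIdxs: []int{2}}}))
	// e.g. row_number() OVER ()
	fmt.Println(recommendWindow([]windowFn{{}}))
}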
@@ -3737,9 +3741,8 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
return p, nil
}

// createPlanForWindow creates a physical plan for computing window functions.
// We add a new stage of windower processors for each different partitioning
// scheme found in the query's window functions.
// createPlanForWindow creates a physical plan for computing window functions
// that have the same PARTITION BY and ORDER BY clauses.
func (dsp *DistSQLPlanner) createPlanForWindow(
ctx context.Context, planCtx *PlanningCtx, n *windowNode,
) (*PhysicalPlan, error) {
@@ -3748,132 +3751,132 @@
return nil, err
}

numWindowFuncProcessed := 0
windowPlanState := createWindowPlanState(n, planCtx, plan)
// Each iteration of this loop adds a new stage of windowers. The steps taken:
// 1. find a set of unprocessed window functions that have the same PARTITION BY
// clause. All of these will be computed using the single stage of windowers.
// 2. a) populate output types of the current stage of windowers. All input
// columns are being passed through, and windower will append output
// columns for each window function processed at the stage.
// b) create specs for all window functions in the set.
// 3. decide whether to put windowers on a single or on multiple nodes.
// a) if we're putting windowers on multiple nodes, we'll put them onto
// every node that participated in the previous stage. We leverage hash
// routers to partition the data based on PARTITION BY clause of window
// functions in the set.
for numWindowFuncProcessed < len(n.funcs) {
samePartitionFuncs, partitionIdxs := windowPlanState.findUnprocessedWindowFnsWithSamePartition()
numWindowFuncProcessed += len(samePartitionFuncs)
windowerSpec := execinfrapb.WindowerSpec{
PartitionBy: partitionIdxs,
WindowFns: make([]execinfrapb.WindowerSpec_WindowFn, len(samePartitionFuncs)),
}

newResultTypes := make([]*types.T, len(plan.GetResultTypes())+len(samePartitionFuncs))
copy(newResultTypes, plan.GetResultTypes())
for windowFnSpecIdx, windowFn := range samePartitionFuncs {
windowFnSpec, outputType, err := windowPlanState.createWindowFnSpec(windowFn)
if err != nil {
return nil, err
}
newResultTypes[windowFn.outputColIdx] = outputType
windowerSpec.WindowFns[windowFnSpecIdx] = windowFnSpec
partitionIdxs := make([]uint32, len(n.funcs[0].partitionIdxs))
for i := range partitionIdxs {
partitionIdxs[i] = uint32(n.funcs[0].partitionIdxs[i])
}

// Check that all window functions have the same PARTITION BY and ORDER BY
// clauses. The optbuilder ensures that all window functions in a single
// windowNode share these clauses, so a mismatch here is an assertion
// failure.
for _, f := range n.funcs[1:] {
if !n.funcs[0].samePartition(f) {
return nil, errors.AssertionFailedf(
"PARTITION BY clauses of window functions handled by the same "+
"windowNode are different: %v, %v", n.funcs[0].partitionIdxs, f.partitionIdxs,
)
}
if !n.funcs[0].columnOrdering.Equal(f.columnOrdering) {
return nil, errors.AssertionFailedf(
"ORDER BY clauses of window functions handled by the same "+
"windowNode are different: %v, %v", n.funcs[0].columnOrdering, f.columnOrdering,
)
}
}

// Check if the previous stage is all on one node.
prevStageNode := plan.Processors[plan.ResultRouters[0]].SQLInstanceID
for i := 1; i < len(plan.ResultRouters); i++ {
if n := plan.Processors[plan.ResultRouters[i]].SQLInstanceID; n != prevStageNode {
prevStageNode = 0
break
}
// All window functions in the windowNode will be computed using a single
// stage of windowers (because they have the same PARTITION BY and ORDER BY
// clauses). All input columns are passed through, and the windower will
// append output columns for each window function processed at the current
// stage.
windowerSpec := execinfrapb.WindowerSpec{
PartitionBy: partitionIdxs,
WindowFns: make([]execinfrapb.WindowerSpec_WindowFn, len(n.funcs)),
}

newResultTypes := make([]*types.T, len(plan.GetResultTypes())+len(n.funcs))
copy(newResultTypes, plan.GetResultTypes())
for windowFnSpecIdx, windowFn := range n.funcs {
windowFnSpec, outputType, err := createWindowFnSpec(planCtx, plan, windowFn)
if err != nil {
return nil, err
}
newResultTypes[windowFn.outputColIdx] = outputType
windowerSpec.WindowFns[windowFnSpecIdx] = windowFnSpec
}

// Get all sqlInstanceIDs from the previous stage.
sqlInstanceIDs := getSQLInstanceIDsOfRouters(plan.ResultRouters, plan.Processors)
if len(partitionIdxs) == 0 || len(sqlInstanceIDs) == 1 {
// No PARTITION BY or we have a single node. Use a single windower.
// If the previous stage was all on a single node, put the windower
// there. Otherwise, bring the results back on this node.
sqlInstanceID := dsp.gatewaySQLInstanceID
if len(sqlInstanceIDs) == 1 {
sqlInstanceID = sqlInstanceIDs[0]
// Get all sqlInstanceIDs from the previous stage.
sqlInstanceIDs := getSQLInstanceIDsOfRouters(plan.ResultRouters, plan.Processors)
if len(partitionIdxs) == 0 || len(sqlInstanceIDs) == 1 {
// No PARTITION BY or we have a single node. Use a single windower. If
// the previous stage was all on a single node, put the windower there.
// Otherwise, bring the results back on this node.
sqlInstanceID := dsp.gatewaySQLInstanceID
if len(sqlInstanceIDs) == 1 {
sqlInstanceID = sqlInstanceIDs[0]
}
plan.AddSingleGroupStage(
sqlInstanceID,
execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
execinfrapb.PostProcessSpec{},
newResultTypes,
)
} else {
// Set up the output routers from the previous stage. We use hash
// routers with hashing on the columns from the PARTITION BY clause of the
// window functions we're processing in the current stage.
for _, resultProc := range plan.ResultRouters {
plan.Processors[resultProc].Spec.Output[0] = execinfrapb.OutputRouterSpec{
Type: execinfrapb.OutputRouterSpec_BY_HASH,
HashColumns: partitionIdxs,
}
plan.AddSingleGroupStage(
sqlInstanceID,
execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
execinfrapb.PostProcessSpec{},
newResultTypes,
)
} else {
// Set up the output routers from the previous stage.
// We use hash routers with hashing on the columns
// from PARTITION BY clause of window functions
// we're processing in the current stage.
for _, resultProc := range plan.ResultRouters {
plan.Processors[resultProc].Spec.Output[0] = execinfrapb.OutputRouterSpec{
Type: execinfrapb.OutputRouterSpec_BY_HASH,
HashColumns: partitionIdxs,
}
}
// We have multiple streams, so we definitely have a processor planned
// on a remote node.
stageID := plan.NewStage(true /* containsRemoteProcessor */, false /* allowPartialDistribution */)

// We put a windower on each node and we connect it with all hash
// routers from the previous stage in such a way that each node has its
// designated SourceRouterSlot - namely, the position in which the node
// appears in sqlInstanceIDs.
prevStageRouters := plan.ResultRouters
prevStageResultTypes := plan.GetResultTypes()
plan.ResultRouters = make([]physicalplan.ProcessorIdx, 0, len(sqlInstanceIDs))
for bucket, sqlInstanceID := range sqlInstanceIDs {
proc := physicalplan.Processor{
SQLInstanceID: sqlInstanceID,
Spec: execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{
Type: execinfrapb.InputSyncSpec_PARALLEL_UNORDERED,
ColumnTypes: prevStageResultTypes,
}},
Core: execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
Post: execinfrapb.PostProcessSpec{},
Output: []execinfrapb.OutputRouterSpec{{
Type: execinfrapb.OutputRouterSpec_PASS_THROUGH,
}},
StageID: stageID,
ResultTypes: newResultTypes,
},
}
// We have multiple streams, so we definitely have a processor planned
// on a remote node.
stageID := plan.NewStage(true /* containsRemoteProcessor */, false /* allowPartialDistribution */)

// We put a windower on each node and we connect it
// with all hash routers from the previous stage in
// a such way that each node has its designated
// SourceRouterSlot - namely, position in which
// a node appears in nodes.
prevStageRouters := plan.ResultRouters
prevStageResultTypes := plan.GetResultTypes()
plan.ResultRouters = make([]physicalplan.ProcessorIdx, 0, len(sqlInstanceIDs))
for bucket, sqlInstanceID := range sqlInstanceIDs {
proc := physicalplan.Processor{
SQLInstanceID: sqlInstanceID,
Spec: execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{
Type: execinfrapb.InputSyncSpec_PARALLEL_UNORDERED,
ColumnTypes: prevStageResultTypes,
}},
Core: execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
Post: execinfrapb.PostProcessSpec{},
Output: []execinfrapb.OutputRouterSpec{{
Type: execinfrapb.OutputRouterSpec_PASS_THROUGH,
}},
StageID: stageID,
ResultTypes: newResultTypes,
},
}
pIdx := plan.AddProcessor(proc)

for _, router := range prevStageRouters {
plan.Streams = append(plan.Streams, physicalplan.Stream{
SourceProcessor: router,
SourceRouterSlot: bucket,
DestProcessor: pIdx,
DestInput: 0,
})
}
plan.ResultRouters = append(plan.ResultRouters, pIdx)
pIdx := plan.AddProcessor(proc)

for _, router := range prevStageRouters {
plan.Streams = append(plan.Streams, physicalplan.Stream{
SourceProcessor: router,
SourceRouterSlot: bucket,
DestProcessor: pIdx,
DestInput: 0,
})
}
plan.ResultRouters = append(plan.ResultRouters, pIdx)
}
}

// We definitely added columns throughout all the stages of windowers, so we
// need to update PlanToStreamColMap. We need to update the map before adding
// rendering or projection because it is used there.
// We definitely added new columns, so we need to update PlanToStreamColMap.
// The map must be updated before adding rendering or projection because it
// is used there.
plan.PlanToStreamColMap = identityMap(plan.PlanToStreamColMap, len(plan.GetResultTypes()))

// windowers do not guarantee maintaining the order at the moment, so we
// reset MergeOrdering. There shouldn't be an ordering here, but we reset it
// defensively (see #35179).
plan.SetMergeOrdering(execinfrapb.Ordering{})

// After all window functions are computed, we need to add rendering or
// After the window functions are computed, we need to add rendering or
// projection.
if err := windowPlanState.addRenderingOrProjection(); err != nil {
if err := addRenderingOrProjection(n, planCtx, plan); err != nil {
return nil, err
}

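To make the single-stage layout concrete, here is a hedged, standalone sketch of the placement decision createPlanForWindow now makes: with a PARTITION BY clause and a multi-node input, every previous-stage router hashes rows on the partition columns and each node gets one windower; without PARTITION BY, or with a single input node, a lone windower is planned. The types below are simplified stand-ins, not the real physicalplan or execinfrapb types.

package main

import "fmt"

// stage is a simplified description of the windower stage the planner would
// add; it only captures placement and routing, not processor specs.
type stage struct {
	windowerNodes []int  // SQL instance IDs that get a windower
	routing       string // "hash on PARTITION BY columns" or "single stream"
}

// planWindowerStage mirrors the branch in createPlanForWindow: no PARTITION BY
// or a single input node means one windower; otherwise one windower per node,
// fed by hash routers on the partition columns.
func planWindowerStage(partitionIdxs []uint32, prevStageNodes []int, gateway int) stage {
	if len(partitionIdxs) == 0 || len(prevStageNodes) == 1 {
		node := gateway
		if len(prevStageNodes) == 1 {
			node = prevStageNodes[0]
		}
		return stage{windowerNodes: []int{node}, routing: "single stream"}
	}
	return stage{windowerNodes: prevStageNodes, routing: "hash on PARTITION BY columns"}
}

func main() {
	// e.g. SELECT rank() OVER (PARTITION BY region ORDER BY ts) FROM t,
	// with the scan distributed across nodes 1, 2, and 3.
	fmt.Printf("%+v\n", planWindowerStage([]uint32{2}, []int{1, 2, 3}, 1))
	// e.g. SELECT row_number() OVER () FROM t, scan on node 2 only.
	fmt.Printf("%+v\n", planWindowerStage(nil, []int{2}, 1))
}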