From f8b8d4cc892c809b8fb70d4c074215bc26d7e711 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 29 Jul 2022 17:26:47 -0700 Subject: [PATCH 1/2] sql: remove shouldNotDistribute recommendation It doesn't seem to be used much. Release note: None --- pkg/sql/distsql_physical_planner.go | 13 +++---------- pkg/sql/distsql_spec_exec_factory.go | 2 +- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 5c312937f2d7..0f3d9b2343fb 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -312,11 +312,8 @@ const ( // cannotDistribute indicates that a plan cannot be distributed. cannotDistribute distRecommendation = iota - // shouldNotDistribute indicates that a plan could suffer if distributed. - shouldNotDistribute - - // canDistribute indicates that a plan will probably not benefit but will - // probably not suffer if distributed. + // canDistribute indicates that a plan can be distributed, but it's not + // clear whether it'll be benefit from that. canDistribute // shouldDistribute indicates that a plan will likely benefit if distributed. @@ -324,15 +321,11 @@ const ( ) // compose returns the recommendation for a plan given recommendations for two -// parts of it: if we shouldNotDistribute either part, then we -// shouldNotDistribute the overall plan either. +// parts of it. func (a distRecommendation) compose(b distRecommendation) distRecommendation { if a == cannotDistribute || b == cannotDistribute { return cannotDistribute } - if a == shouldNotDistribute || b == shouldNotDistribute { - return shouldNotDistribute - } if a == shouldDistribute || b == shouldDistribute { return shouldDistribute } diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 94f114eb0991..4e76116d5a02 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -304,7 +304,7 @@ func (e *distSQLSpecExecFactory) checkExprsAndMaybeMergeLastStage( // of processors (if there is such). recommendation := shouldDistribute if physPlan != nil && !physPlan.IsLastStageDistributed() { - recommendation = shouldNotDistribute + recommendation = cannotDistribute } for _, expr := range exprs { if err := checkExpr(expr); err != nil { From 561383ed789d4d677cbd3932fdb2bd7ace8bed10 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 29 Jul 2022 18:41:19 -0700 Subject: [PATCH 2/2] sql: improve physical planning of window functions This commit improves the physical planning of window functions in several ways. First, the optimizer is updated so that all window functions with a PARTITION BY clause are constructed first followed by the remaining window functions without PARTITION BY. This is needed by the execution which can only evaluate functions with PARTITION BY in the distributed fashion - as a result of this change, we are now more likely to get partial distributed execution (previously things depended on the order in which window functions were mentioned in the query). Second, the physical planner now thinks that we "should distribute" the plan if it finds at least one window function with PARTITION BY clause. Previously, we didn't make any recommendation about the distribution based on the presence of the window functions (i.e. we relied on the rest of the plan to do so), but they can be quite computation-intensive, so whenever we can distribute the execution, we should do so. Additionally, this commit removes some of the code in the physical planner which tries to find window functions with the same PARTITION BY and ORDER BY clauses - that code has been redundant for long time given that the optimizer does that too. Release note: None --- pkg/sql/catalog/colinfo/ordering.go | 13 + pkg/sql/distsql_physical_planner.go | 234 +++++++++--------- pkg/sql/distsql_plan_window.go | 110 ++------ .../exec/execbuilder/testdata/distsql_window | 87 +++++++ .../execbuilder/tests/5node/generated_test.go | 7 + pkg/sql/opt/optbuilder/testdata/window | 20 +- pkg/sql/opt/optbuilder/window.go | 12 + pkg/sql/opt_exec_factory.go | 1 - pkg/sql/window.go | 4 +- 9 files changed, 270 insertions(+), 218 deletions(-) create mode 100644 pkg/sql/opt/exec/execbuilder/testdata/distsql_window diff --git a/pkg/sql/catalog/colinfo/ordering.go b/pkg/sql/catalog/colinfo/ordering.go index 5f590261d828..cfac6cf73937 100644 --- a/pkg/sql/catalog/colinfo/ordering.go +++ b/pkg/sql/catalog/colinfo/ordering.go @@ -30,6 +30,19 @@ type ColumnOrderInfo struct { // represents an ordering first by column 3 (descending), then by column 1 (ascending). type ColumnOrdering []ColumnOrderInfo +// Equal returns whether two ColumnOrderings are the same. +func (ordering ColumnOrdering) Equal(other ColumnOrdering) bool { + if len(ordering) != len(other) { + return false + } + for i, o := range ordering { + if o.ColIdx != other[i].ColIdx || o.Direction != other[i].Direction { + return false + } + } + return true +} + func (ordering ColumnOrdering) String(columns ResultColumns) string { var buf bytes.Buffer fmtCtx := tree.NewFmtCtx(tree.FmtSimple) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 0f3d9b2343fb..d5367a533902 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -621,7 +621,18 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) { return canDistribute, nil case *windowNode: - return checkSupportForPlanNode(n.plan) + rec, err := checkSupportForPlanNode(n.plan) + if err != nil { + return cannotDistribute, err + } + for _, f := range n.funcs { + if len(f.partitionIdxs) > 0 { + // If at least one function has PARTITION BY clause, then we + // should distribute the execution. + return rec.compose(shouldDistribute), nil + } + } + return rec.compose(canDistribute), nil case *zeroNode: return canDistribute, nil @@ -3730,9 +3741,8 @@ func (dsp *DistSQLPlanner) createPlanForSetOp( return p, nil } -// createPlanForWindow creates a physical plan for computing window functions. -// We add a new stage of windower processors for each different partitioning -// scheme found in the query's window functions. +// createPlanForWindow creates a physical plan for computing window functions +// that have the same PARTITION BY and ORDER BY clauses. func (dsp *DistSQLPlanner) createPlanForWindow( ctx context.Context, planCtx *PlanningCtx, n *windowNode, ) (*PhysicalPlan, error) { @@ -3741,122 +3751,122 @@ func (dsp *DistSQLPlanner) createPlanForWindow( return nil, err } - numWindowFuncProcessed := 0 - windowPlanState := createWindowPlanState(n, planCtx, plan) - // Each iteration of this loop adds a new stage of windowers. The steps taken: - // 1. find a set of unprocessed window functions that have the same PARTITION BY - // clause. All of these will be computed using the single stage of windowers. - // 2. a) populate output types of the current stage of windowers. All input - // columns are being passed through, and windower will append output - // columns for each window function processed at the stage. - // b) create specs for all window functions in the set. - // 3. decide whether to put windowers on a single or on multiple nodes. - // a) if we're putting windowers on multiple nodes, we'll put them onto - // every node that participated in the previous stage. We leverage hash - // routers to partition the data based on PARTITION BY clause of window - // functions in the set. - for numWindowFuncProcessed < len(n.funcs) { - samePartitionFuncs, partitionIdxs := windowPlanState.findUnprocessedWindowFnsWithSamePartition() - numWindowFuncProcessed += len(samePartitionFuncs) - windowerSpec := execinfrapb.WindowerSpec{ - PartitionBy: partitionIdxs, - WindowFns: make([]execinfrapb.WindowerSpec_WindowFn, len(samePartitionFuncs)), - } - - newResultTypes := make([]*types.T, len(plan.GetResultTypes())+len(samePartitionFuncs)) - copy(newResultTypes, plan.GetResultTypes()) - for windowFnSpecIdx, windowFn := range samePartitionFuncs { - windowFnSpec, outputType, err := windowPlanState.createWindowFnSpec(windowFn) - if err != nil { - return nil, err - } - newResultTypes[windowFn.outputColIdx] = outputType - windowerSpec.WindowFns[windowFnSpecIdx] = windowFnSpec + partitionIdxs := make([]uint32, len(n.funcs[0].partitionIdxs)) + for i := range partitionIdxs { + partitionIdxs[i] = uint32(n.funcs[0].partitionIdxs[i]) + } + + // Check that all window functions have the same PARTITION BY and ORDER BY + // clauses. We can assume that because the optbuilder ensures that all + // window functions in the windowNode have the same PARTITION BY and ORDER + // BY clauses. + for _, f := range n.funcs[1:] { + if !n.funcs[0].samePartition(f) { + return nil, errors.AssertionFailedf( + "PARTITION BY clauses of window functions handled by the same "+ + "windowNode are different: %v, %v", n.funcs[0].partitionIdxs, f.partitionIdxs, + ) + } + if !n.funcs[0].columnOrdering.Equal(f.columnOrdering) { + return nil, errors.AssertionFailedf( + "ORDER BY clauses of window functions handled by the same "+ + "windowNode are different: %v, %v", n.funcs[0].columnOrdering, f.columnOrdering, + ) } + } - // Check if the previous stage is all on one node. - prevStageNode := plan.Processors[plan.ResultRouters[0]].SQLInstanceID - for i := 1; i < len(plan.ResultRouters); i++ { - if n := plan.Processors[plan.ResultRouters[i]].SQLInstanceID; n != prevStageNode { - prevStageNode = 0 - break - } + // All window functions in the windowNode will be computed using the single + // stage of windowers (because they have the same PARTITION BY and ORDER BY + // clauses). All input columns are being passed through, and windower will + // append output columns for each window function processed at the current + // stage. + windowerSpec := execinfrapb.WindowerSpec{ + PartitionBy: partitionIdxs, + WindowFns: make([]execinfrapb.WindowerSpec_WindowFn, len(n.funcs)), + } + + newResultTypes := make([]*types.T, len(plan.GetResultTypes())+len(n.funcs)) + copy(newResultTypes, plan.GetResultTypes()) + for windowFnSpecIdx, windowFn := range n.funcs { + windowFnSpec, outputType, err := createWindowFnSpec(planCtx, plan, windowFn) + if err != nil { + return nil, err } + newResultTypes[windowFn.outputColIdx] = outputType + windowerSpec.WindowFns[windowFnSpecIdx] = windowFnSpec + } - // Get all sqlInstanceIDs from the previous stage. - sqlInstanceIDs := getSQLInstanceIDsOfRouters(plan.ResultRouters, plan.Processors) - if len(partitionIdxs) == 0 || len(sqlInstanceIDs) == 1 { - // No PARTITION BY or we have a single node. Use a single windower. - // If the previous stage was all on a single node, put the windower - // there. Otherwise, bring the results back on this node. - sqlInstanceID := dsp.gatewaySQLInstanceID - if len(sqlInstanceIDs) == 1 { - sqlInstanceID = sqlInstanceIDs[0] + // Get all sqlInstanceIDs from the previous stage. + sqlInstanceIDs := getSQLInstanceIDsOfRouters(plan.ResultRouters, plan.Processors) + if len(partitionIdxs) == 0 || len(sqlInstanceIDs) == 1 { + // No PARTITION BY or we have a single node. Use a single windower. If + // the previous stage was all on a single node, put the windower there. + // Otherwise, bring the results back on this node. + sqlInstanceID := dsp.gatewaySQLInstanceID + if len(sqlInstanceIDs) == 1 { + sqlInstanceID = sqlInstanceIDs[0] + } + plan.AddSingleGroupStage( + sqlInstanceID, + execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec}, + execinfrapb.PostProcessSpec{}, + newResultTypes, + ) + } else { + // Set up the output routers from the previous stage. We use hash + // routers with hashing on the columns from PARTITION BY clause of + // window functions we're processing in the current stage. + for _, resultProc := range plan.ResultRouters { + plan.Processors[resultProc].Spec.Output[0] = execinfrapb.OutputRouterSpec{ + Type: execinfrapb.OutputRouterSpec_BY_HASH, + HashColumns: partitionIdxs, } - plan.AddSingleGroupStage( - sqlInstanceID, - execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec}, - execinfrapb.PostProcessSpec{}, - newResultTypes, - ) - } else { - // Set up the output routers from the previous stage. - // We use hash routers with hashing on the columns - // from PARTITION BY clause of window functions - // we're processing in the current stage. - for _, resultProc := range plan.ResultRouters { - plan.Processors[resultProc].Spec.Output[0] = execinfrapb.OutputRouterSpec{ - Type: execinfrapb.OutputRouterSpec_BY_HASH, - HashColumns: partitionIdxs, - } + } + // We have multiple streams, so we definitely have a processor planned + // on a remote node. + stageID := plan.NewStage(true /* containsRemoteProcessor */, false /* allowPartialDistribution */) + + // We put a windower on each node and we connect it with all hash + // routers from the previous stage in a such way that each node has its + // designated SourceRouterSlot - namely, position in which a node + // appears in nodes. + prevStageRouters := plan.ResultRouters + prevStageResultTypes := plan.GetResultTypes() + plan.ResultRouters = make([]physicalplan.ProcessorIdx, 0, len(sqlInstanceIDs)) + for bucket, sqlInstanceID := range sqlInstanceIDs { + proc := physicalplan.Processor{ + SQLInstanceID: sqlInstanceID, + Spec: execinfrapb.ProcessorSpec{ + Input: []execinfrapb.InputSyncSpec{{ + Type: execinfrapb.InputSyncSpec_PARALLEL_UNORDERED, + ColumnTypes: prevStageResultTypes, + }}, + Core: execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec}, + Post: execinfrapb.PostProcessSpec{}, + Output: []execinfrapb.OutputRouterSpec{{ + Type: execinfrapb.OutputRouterSpec_PASS_THROUGH, + }}, + StageID: stageID, + ResultTypes: newResultTypes, + }, } - // We have multiple streams, so we definitely have a processor planned - // on a remote node. - stageID := plan.NewStage(true /* containsRemoteProcessor */, false /* allowPartialDistribution */) - - // We put a windower on each node and we connect it - // with all hash routers from the previous stage in - // a such way that each node has its designated - // SourceRouterSlot - namely, position in which - // a node appears in nodes. - prevStageRouters := plan.ResultRouters - prevStageResultTypes := plan.GetResultTypes() - plan.ResultRouters = make([]physicalplan.ProcessorIdx, 0, len(sqlInstanceIDs)) - for bucket, sqlInstanceID := range sqlInstanceIDs { - proc := physicalplan.Processor{ - SQLInstanceID: sqlInstanceID, - Spec: execinfrapb.ProcessorSpec{ - Input: []execinfrapb.InputSyncSpec{{ - Type: execinfrapb.InputSyncSpec_PARALLEL_UNORDERED, - ColumnTypes: prevStageResultTypes, - }}, - Core: execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec}, - Post: execinfrapb.PostProcessSpec{}, - Output: []execinfrapb.OutputRouterSpec{{ - Type: execinfrapb.OutputRouterSpec_PASS_THROUGH, - }}, - StageID: stageID, - ResultTypes: newResultTypes, - }, - } - pIdx := plan.AddProcessor(proc) - - for _, router := range prevStageRouters { - plan.Streams = append(plan.Streams, physicalplan.Stream{ - SourceProcessor: router, - SourceRouterSlot: bucket, - DestProcessor: pIdx, - DestInput: 0, - }) - } - plan.ResultRouters = append(plan.ResultRouters, pIdx) + pIdx := plan.AddProcessor(proc) + + for _, router := range prevStageRouters { + plan.Streams = append(plan.Streams, physicalplan.Stream{ + SourceProcessor: router, + SourceRouterSlot: bucket, + DestProcessor: pIdx, + DestInput: 0, + }) } + plan.ResultRouters = append(plan.ResultRouters, pIdx) } } - // We definitely added columns throughout all the stages of windowers, so we - // need to update PlanToStreamColMap. We need to update the map before adding - // rendering or projection because it is used there. + // We definitely added new columns, so we need to update PlanToStreamColMap. + // We need to update the map before adding rendering or projection because + // it is used there. plan.PlanToStreamColMap = identityMap(plan.PlanToStreamColMap, len(plan.GetResultTypes())) // windowers do not guarantee maintaining the order at the moment, so we @@ -3864,9 +3874,9 @@ func (dsp *DistSQLPlanner) createPlanForWindow( // defensively (see #35179). plan.SetMergeOrdering(execinfrapb.Ordering{}) - // After all window functions are computed, we need to add rendering or + // After the window functions are computed, we need to add rendering or // projection. - if err := windowPlanState.addRenderingOrProjection(); err != nil { + if err := addRenderingOrProjection(n, planCtx, plan); err != nil { return nil, err } diff --git a/pkg/sql/distsql_plan_window.go b/pkg/sql/distsql_plan_window.go index c8ec12de7627..242dffd27a9e 100644 --- a/pkg/sql/distsql_plan_window.go +++ b/pkg/sql/distsql_plan_window.go @@ -19,85 +19,11 @@ import ( "github.com/cockroachdb/errors" ) -type windowPlanState struct { - // infos contains information about windowFuncHolders in the same order as - // they appear in n.funcs. - infos []*windowFuncInfo - n *windowNode - planCtx *PlanningCtx - plan *PhysicalPlan -} - -func createWindowPlanState( - n *windowNode, planCtx *PlanningCtx, plan *PhysicalPlan, -) *windowPlanState { - infos := make([]*windowFuncInfo, 0, len(n.funcs)) - for _, holder := range n.funcs { - infos = append(infos, &windowFuncInfo{holder: holder}) - } - return &windowPlanState{ - infos: infos, - n: n, - planCtx: planCtx, - plan: plan, - } -} - -// windowFuncInfo contains runtime information about a window function. -type windowFuncInfo struct { - holder *windowFuncHolder - // isProcessed indicates whether holder has already been processed. It is set - // to true when holder is included in the set of window functions to be - // processed by findUnprocessedWindowFnsWithSamePartition. - isProcessed bool -} - -// findUnprocessedWindowFnsWithSamePartition finds a set of unprocessed window -// functions that use the same partitioning and updates their isProcessed flag -// accordingly. It returns the set of unprocessed window functions and indices -// of the columns in their PARTITION BY clause. -func (s *windowPlanState) findUnprocessedWindowFnsWithSamePartition() ( - samePartitionFuncs []*windowFuncHolder, - partitionIdxs []uint32, -) { - windowFnToProcessIdx := -1 - for windowFnIdx, windowFn := range s.infos { - if !windowFn.isProcessed { - windowFnToProcessIdx = windowFnIdx - break - } - } - if windowFnToProcessIdx == -1 { - panic("unexpected: no unprocessed window function") - } - - windowFnToProcess := s.infos[windowFnToProcessIdx].holder - partitionIdxs = make([]uint32, len(windowFnToProcess.partitionIdxs)) - for i, idx := range windowFnToProcess.partitionIdxs { - partitionIdxs[i] = uint32(idx) - } - - samePartitionFuncs = make([]*windowFuncHolder, 0, len(s.infos)-windowFnToProcessIdx) - samePartitionFuncs = append(samePartitionFuncs, windowFnToProcess) - s.infos[windowFnToProcessIdx].isProcessed = true - for _, windowFn := range s.infos[windowFnToProcessIdx+1:] { - if windowFn.isProcessed { - continue - } - if windowFnToProcess.samePartition(windowFn.holder) { - samePartitionFuncs = append(samePartitionFuncs, windowFn.holder) - windowFn.isProcessed = true - } - } - - return samePartitionFuncs, partitionIdxs -} - -func (s *windowPlanState) createWindowFnSpec( - funcInProgress *windowFuncHolder, +func createWindowFnSpec( + planCtx *PlanningCtx, plan *PhysicalPlan, funcInProgress *windowFuncHolder, ) (execinfrapb.WindowerSpec_WindowFn, *types.T, error) { for _, argIdx := range funcInProgress.argsIdxs { - if argIdx >= uint32(len(s.plan.GetResultTypes())) { + if argIdx >= uint32(len(plan.GetResultTypes())) { return execinfrapb.WindowerSpec_WindowFn{}, nil, errors.Errorf("ColIdx out of range (%d)", argIdx) } } @@ -108,7 +34,7 @@ func (s *windowPlanState) createWindowFnSpec( } argTypes := make([]*types.T, len(funcInProgress.argsIdxs)) for i, argIdx := range funcInProgress.argsIdxs { - argTypes[i] = s.plan.GetResultTypes()[argIdx] + argTypes[i] = plan.GetResultTypes()[argIdx] } _, outputType, err := execagg.GetWindowFunctionInfo(funcSpec, argTypes...) if err != nil { @@ -134,7 +60,7 @@ func (s *windowPlanState) createWindowFnSpec( if funcInProgress.frame != nil { // funcInProgress has a custom window frame. frameSpec := execinfrapb.WindowerSpec_Frame{} - if err := frameSpec.InitFromAST(funcInProgress.frame, s.planCtx.EvalContext()); err != nil { + if err := frameSpec.InitFromAST(funcInProgress.frame, planCtx.EvalContext()); err != nil { return execinfrapb.WindowerSpec_WindowFn{}, outputType, err } funcInProgressSpec.Frame = &frameSpec @@ -150,7 +76,7 @@ var windowerMergeOrdering = execinfrapb.Ordering{} // are used in another expression and, if they are, adds rendering to the plan. // If no rendering is required, it adds a projection to remove all columns that // were arguments to window functions or were used within OVER clauses. -func (s *windowPlanState) addRenderingOrProjection() error { +func addRenderingOrProjection(n *windowNode, planCtx *PlanningCtx, plan *PhysicalPlan) error { // numWindowFuncsAsIs is the number of window functions output of which is // used directly (i.e. simply as an output column). Note: the same window // function might appear multiple times in the query, but its every @@ -158,17 +84,17 @@ func (s *windowPlanState) addRenderingOrProjection() error { // query like 'SELECT avg(a) OVER (), avg(a) OVER () + 1 FROM t', only the // first window function is used "as is." numWindowFuncsAsIs := 0 - for _, render := range s.n.windowRender { + for _, render := range n.windowRender { if _, ok := render.(*windowFuncHolder); ok { numWindowFuncsAsIs++ } } - if numWindowFuncsAsIs == len(s.infos) { + if numWindowFuncsAsIs == len(n.funcs) { // All window functions' outputs are used directly, so there is no // rendering to do and simple projection is sufficient. - columns := make([]uint32, len(s.n.windowRender)) + columns := make([]uint32, len(n.windowRender)) passedThruColIdx := uint32(0) - for i, render := range s.n.windowRender { + for i, render := range n.windowRender { if render == nil { columns[i] = passedThruColIdx passedThruColIdx++ @@ -179,7 +105,7 @@ func (s *windowPlanState) addRenderingOrProjection() error { columns[i] = uint32(holder.outputColIdx) } } - s.plan.AddProjection(columns, windowerMergeOrdering) + plan.AddProjection(columns, windowerMergeOrdering) return nil } @@ -187,17 +113,17 @@ func (s *windowPlanState) addRenderingOrProjection() error { // 1) IndexedVars that refer to columns by their indices in the full table, // 2) IndexedVars that replaced regular aggregates that are above // "windowing level." - // The mapping of both types IndexedVars is stored in s.n.colAndAggContainer. - renderExprs := make([]tree.TypedExpr, len(s.n.windowRender)) + // The mapping of both types IndexedVars is stored in n.colAndAggContainer. + renderExprs := make([]tree.TypedExpr, len(n.windowRender)) visitor := replaceWindowFuncsVisitor{ - columnsMap: s.n.colAndAggContainer.idxMap, + columnsMap: n.colAndAggContainer.idxMap, } // All passed through columns are contiguous and at the beginning of the // output schema. passedThruColIdx := 0 - renderTypes := make([]*types.T, 0, len(s.n.windowRender)) - for i, render := range s.n.windowRender { + renderTypes := make([]*types.T, 0, len(n.windowRender)) + for i, render := range n.windowRender { if render != nil { // render contains at least one reference to windowFuncHolder, so we need // to walk over the render and replace all windowFuncHolders and (if found) @@ -205,13 +131,13 @@ func (s *windowPlanState) addRenderingOrProjection() error { renderExprs[i] = visitor.replace(render) } else { // render is nil meaning that a column is being passed through. - renderExprs[i] = tree.NewTypedOrdinalReference(passedThruColIdx, s.plan.GetResultTypes()[passedThruColIdx]) + renderExprs[i] = tree.NewTypedOrdinalReference(passedThruColIdx, plan.GetResultTypes()[passedThruColIdx]) passedThruColIdx++ } outputType := renderExprs[i].ResolvedType() renderTypes = append(renderTypes, outputType) } - return s.plan.AddRendering(renderExprs, s.planCtx, s.plan.PlanToStreamColMap, renderTypes, windowerMergeOrdering) + return plan.AddRendering(renderExprs, planCtx, plan.PlanToStreamColMap, renderTypes, windowerMergeOrdering) } // replaceWindowFuncsVisitor is used to populate render expressions containing diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_window b/pkg/sql/opt/exec/execbuilder/testdata/distsql_window new file mode 100644 index 000000000000..6e357910dee9 --- /dev/null +++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_window @@ -0,0 +1,87 @@ +# LogicTest: 5node + +statement ok +CREATE TABLE data (a INT, b INT, c FLOAT, d DECIMAL, _bool BOOL, _bytes BYTES, _bit BIT, PRIMARY KEY (a, b, c, d)) + +# Split into ten parts. +statement ok +ALTER TABLE data SPLIT AT SELECT i FROM generate_series(1, 9) AS g(i) + +# Relocate the ten parts to the five nodes. +statement ok +ALTER TABLE data EXPERIMENTAL_RELOCATE + SELECT ARRAY[i%5+1], i FROM generate_series(0, 9) AS g(i) + +# Verify data placement. +query TTTI colnames,rowsort +SELECT start_key, end_key, replicas, lease_holder FROM [SHOW RANGES FROM TABLE data] +---- +start_key end_key replicas lease_holder +NULL /1 {1} 1 +/1 /2 {2} 2 +/2 /3 {3} 3 +/3 /4 {4} 4 +/4 /5 {5} 5 +/5 /6 {1} 1 +/6 /7 {2} 2 +/7 /8 {3} 3 +/8 /9 {4} 4 +/9 NULL {5} 5 + +# Verify that the window functions with the same PARTITION BY clause are +# evaluated as a single distributed windower stage followed by a couple of +# single-node stages. +query T +EXPLAIN (DISTSQL) SELECT + avg(a) OVER (), + min(b) OVER (PARTITION BY a), + avg(c) OVER (ORDER BY b), + max(c) OVER (PARTITION BY a) +FROM data +---- +distribution: full +vectorized: true +· +• window +│ +└── • window + │ + └── • window + │ + └── • scan + missing stats + table: data@data_pkey + spans: FULL SCAN +· +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJy8lVFv2jwUhu-_X2Gdq0TfQcFOgDZXZi2bIrXQBdR1mtBkiNWi0YQlYWtV8d8nh1aUCGxGyG6qJs7j857nWPgFsp9z8KF3d3PVDfrEugyGo-HnK5sMe1e9ixERv-4tYZPBbS8klo3kcRZbk7fnm244CkbBoE8-fCXCxuLr6dvqILzshWplojjxtFkpceRjOLgmkcgFIMRJJPviUWbgfwMKCAwQXEDwAKEFY4RFmkxlliWp-uSlAILoCfwmwixeLHP1eowwTVIJ_gvks3wuwYeRmMxlKEUkU6cJCJHMxWxelFGlufrzffFDPgPCRTJfPsaZTwSSCZIpIAwXQr1oOJSIOCKUJPmDTGG8QkiW-WvZTbXJM3kQ2cN2HU5hvBojZLm4l-DTFe6Jv9lnGSdpJFMZbe1U7FJu8MssjpLfMnVa21Xf2_YJV06vg77Fma3-695Z3LVLfWwyspoy0tIIurefLE5VosEyVymRM-Qu8jZyD3lrb0B3b0BtebqjvGu_O7Sc_b9JoxIg7yBv783hbeWgh59FevRZdGjDYTWdRnqySbdrO43scMnseMms4bg1SWYnk9ypTbJ7uGT3eMluw_FqkuyeTPJZbZK9wyV7x0v2Gk6rJsneySSf_5PLa0eCUGaLJM5kKevunZuqBxndy3XDWbJMp_ImTaZFmfXjoOCKn9JIZvl69fUhiNdLKuDhcKsK3KkCn1eBqaFpWqab72mmh5kWdrfgZhl2tTBletqrMmk9bJi0HjZMWg8bJm3o2dB0q8qk21V062GDbj1s0K2HDboNPRua7lTRfVZFtx426NbDBt162KDb0LOh6fMqummle8NAG4QbaINxA21Qburb1PjfXR7j1X9_AgAA__-og3q5 + +# Verify that all window functions with the PARTITION BY clause are distributed. +query T +EXPLAIN (DISTSQL) SELECT + avg(a) OVER (), + min(b) OVER (PARTITION BY a), + max(c) OVER (PARTITION BY a ORDER BY b), + avg(c) OVER (ORDER BY b), + max(d) OVER (PARTITION BY b ORDER BY c), + min(d) OVER (PARTITION BY a) +FROM data +---- +distribution: full +vectorized: true +· +• window +│ +└── • window + │ + └── • window + │ + └── • window + │ + └── • window + │ + └── • scan + missing stats + table: data@data_pkey + spans: FULL SCAN +· +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzMmN9v4kYQx9_7V6zmCdRFsGvz82nTO1oh5UJK0PWqClULXiWoxKa2ae8U5X-v7CSXmB8zoLEbXhDG_uzMd2e_o8EPkPy9ggEMv1xfXoyuRO3j6GZ68-tlXdwML4cfpsL-c1uzdTH-PJyIWl2K-2VYm79cX19MpqPpaHwlfvpd2Oyu_Vpb7L8rxpOPw0n2dV6X-bLfHyzcyZYI9i4xf11i8ZzJ_gdtXfw8GX8SgU0tSAijwF3Ze5fA4A9QIEGDBA8k-CChDTMJ6zhauCSJ4uyRhxwYBV9h0JKwDNebNPt5JmERxQ4GD5Au05WDAUztfOUmzgYubrZAQuBSu1zlYbLQJvv4c_2X-wYSPkSrzX2YDISVYi7FQooAJNysbfZbo6mEDQOhRJTeuRhmjxKiTfoc-TXg_Ju4s8ldMZRRMHucSUhSe-tgoB7lAQWv62zCKA5c7ILCSvkq2xp_W4ZB9K-Lm-1i1LcbPhAm29ZPo6ua0fWXb36dpUNXpEO1aCEXX2rGq785mEb_mOkab9LsEWm0NJ40vjRtabrSdE5UqgtKvaqU4iXTL0r9glLvsNKOND1pultiX4X4FQnRWyW7-PxLzaj9afZfMu1K0zuYaftgpmgeak8eh8_J9zxUK9-4_sF0OoV01PFdR3G6TlM1mrqivqNKK37nXftOeTqUOu--U6JSvGRV9x19vH00yz660fQqso8urRjdd7VPeTqUPm_7lKgUL1nV9vGOt4_Hso_XaPoV2ccrrRi9d7VPeTqUd972KVEpXrKq7eMfbx-fZR-_0WxXZB-_tGL039U-5elQ_nnbp0SleMn-zz-Ne9KcuGQdhYnbErR_5VYm1AW37mlXkmgTL9x1HC3yME-X45zLp9_AJenT3eeLUfh0K0vweLjLgRUrtGqz6D5OK5TWHLjHgRUrtOqwVLdwWqO0x4GJauGwYoVWxBnHaU2ccW-bbr2lfRz2Ubjoj9Y23EZhrXG6w-kpOEzsNxGZFZrqKQRNnNIup6fgMNFTcJjqKQRN9BRCNdFTepyegsNEtXCY6ikETZxxnKZ6Sp_TUxRrWiBoQjYVmxecMjeFUxMDb2TgzQzMoYE5NfDGBsWaGwiaKhpvcqBw6rzzZgfFGh7UzvRwktFxmhJOxOYFJ41O4NSZ2ZmcTjI6TlNGx2nS6AROGZ1QThkdH58or-E0VTRidOMFJ42O46TRd2aok4xOzCIsmhJOxOYFJ41O4NSZ2RmkTjI6TlNGx2nS6AROGZ1QTr0IwCcp6u84TlOvAogpjhecfBmA45TR9c4khRp99vjDfwEAAP__ad9y-A== diff --git a/pkg/sql/opt/exec/execbuilder/tests/5node/generated_test.go b/pkg/sql/opt/exec/execbuilder/tests/5node/generated_test.go index 13d12c3274c2..a8a32964565a 100644 --- a/pkg/sql/opt/exec/execbuilder/tests/5node/generated_test.go +++ b/pkg/sql/opt/exec/execbuilder/tests/5node/generated_test.go @@ -169,6 +169,13 @@ func TestExecBuild_distsql_union( runExecBuildLogicTest(t, "distsql_union") } +func TestExecBuild_distsql_window( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runExecBuildLogicTest(t, "distsql_window") +} + func TestExecBuild_experimental_distsql_planning_5node( t *testing.T, ) { diff --git a/pkg/sql/opt/optbuilder/testdata/window b/pkg/sql/opt/optbuilder/testdata/window index 36c692c7d150..83866ff3a556 100644 --- a/pkg/sql/opt/optbuilder/testdata/window +++ b/pkg/sql/opt/optbuilder/testdata/window @@ -1334,22 +1334,22 @@ FROM kv WINDOW w as (ORDER BY v), w2 as (PARTITION BY v) ---- project ├── columns: rank:10 rank:11 row_number:12 row_number:13 row_number:14 - └── window partition=(2) ordering=+4 + └── window partition=() ordering=+2 ├── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9 rank:10 rank:11 row_number:12 row_number:13 row_number:14 - ├── window partition=(2) - │ ├── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9 rank:10 rank:11 row_number:12 row_number:14 - │ ├── window partition=() ordering=+2 - │ │ ├── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9 rank:10 rank:11 + ├── window partition=(2) ordering=+4 + │ ├── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9 row_number:12 row_number:13 row_number:14 + │ ├── window partition=(2) + │ │ ├── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9 row_number:12 row_number:14 │ │ ├── scan kv │ │ │ └── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9 │ │ └── windows - │ │ ├── rank [as=rank:10] - │ │ └── rank [as=rank:11] + │ │ ├── row-number [as=row_number:12] + │ │ └── row-number [as=row_number:14] │ └── windows - │ ├── row-number [as=row_number:12] - │ └── row-number [as=row_number:14] + │ └── row-number [as=row_number:13] └── windows - └── row-number [as=row_number:13] + ├── rank [as=rank:10] + └── rank [as=rank:11] build SELECT diff --git a/pkg/sql/opt/optbuilder/window.go b/pkg/sql/opt/optbuilder/window.go index e8d0542830dd..ab5b3a8c9147 100644 --- a/pkg/sql/opt/optbuilder/window.go +++ b/pkg/sql/opt/optbuilder/window.go @@ -202,7 +202,19 @@ func (b *Builder) buildWindow(outScope *scope, inScope *scope) { ) } + // First construct all frames with the PARTITION BY clause - this allows the + // execution to be more distributed. for _, f := range frames { + if f.Partition.Empty() { + continue + } + outScope.expr = b.factory.ConstructWindow(outScope.expr, f.Windows, &f.WindowPrivate) + } + // Now construct all frames without the PARTITION BY clause. + for _, f := range frames { + if !f.Partition.Empty() { + continue + } outScope.expr = b.factory.ConstructWindow(outScope.expr, f.Windows, &f.WindowPrivate) } } diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index e17e2c54216f..e42f33408054 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -1087,7 +1087,6 @@ func (ef *execFactory) ConstructWindow(root exec.Node, wi exec.WindowInfo) (exec expr: wi.Exprs[i], args: wi.Exprs[i].Exprs, argsIdxs: argsIdxs, - window: p, filterColIdx: wi.FilterIdxs[i], outputColIdx: wi.OutputIdxs[i], partitionIdxs: partitionIdxs, diff --git a/pkg/sql/window.go b/pkg/sql/window.go index 3f0c7dbba005..c9af301ab6c7 100644 --- a/pkg/sql/window.go +++ b/pkg/sql/window.go @@ -74,8 +74,6 @@ var _ tree.TypedExpr = &windowFuncHolder{} var _ tree.VariableExpr = &windowFuncHolder{} type windowFuncHolder struct { - window *windowNode - expr *tree.FuncExpr args []tree.Expr @@ -88,7 +86,7 @@ type windowFuncHolder struct { frame *tree.WindowFrame } -// samePartition returns whether f and other have the same PARTITION BY clause. +// samePartition returns whether w and other have the same PARTITION BY clause. func (w *windowFuncHolder) samePartition(other *windowFuncHolder) bool { if len(w.partitionIdxs) != len(other.partitionIdxs) { return false