diff --git a/pkg/sql/catalog/colinfo/ordering.go b/pkg/sql/catalog/colinfo/ordering.go
index 5f590261d828..cfac6cf73937 100644
--- a/pkg/sql/catalog/colinfo/ordering.go
+++ b/pkg/sql/catalog/colinfo/ordering.go
@@ -30,6 +30,19 @@ type ColumnOrderInfo struct {
 // represents an ordering first by column 3 (descending), then by column 1 (ascending).
 type ColumnOrdering []ColumnOrderInfo
 
+// Equal returns whether two ColumnOrderings are the same.
+func (ordering ColumnOrdering) Equal(other ColumnOrdering) bool {
+	if len(ordering) != len(other) {
+		return false
+	}
+	for i, o := range ordering {
+		if o.ColIdx != other[i].ColIdx || o.Direction != other[i].Direction {
+			return false
+		}
+	}
+	return true
+}
+
 func (ordering ColumnOrdering) String(columns ResultColumns) string {
 	var buf bytes.Buffer
 	fmtCtx := tree.NewFmtCtx(tree.FmtSimple)
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 0f3d9b2343fb..d5367a533902 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -621,7 +621,18 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
 		return canDistribute, nil
 
 	case *windowNode:
-		return checkSupportForPlanNode(n.plan)
+		rec, err := checkSupportForPlanNode(n.plan)
+		if err != nil {
+			return cannotDistribute, err
+		}
+		for _, f := range n.funcs {
+			if len(f.partitionIdxs) > 0 {
+				// If at least one function has a PARTITION BY clause, then we
+				// should distribute the execution.
+				return rec.compose(shouldDistribute), nil
+			}
+		}
+		return rec.compose(canDistribute), nil
 
 	case *zeroNode:
 		return canDistribute, nil
@@ -3730,9 +3741,8 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
 	return p, nil
 }
 
-// createPlanForWindow creates a physical plan for computing window functions.
-// We add a new stage of windower processors for each different partitioning
-// scheme found in the query's window functions.
+// createPlanForWindow creates a physical plan for computing window functions
+// that have the same PARTITION BY and ORDER BY clauses.
 func (dsp *DistSQLPlanner) createPlanForWindow(
 	ctx context.Context, planCtx *PlanningCtx, n *windowNode,
 ) (*PhysicalPlan, error) {
@@ -3741,122 +3751,122 @@ func (dsp *DistSQLPlanner) createPlanForWindow(
 		return nil, err
 	}
 
-	numWindowFuncProcessed := 0
-	windowPlanState := createWindowPlanState(n, planCtx, plan)
-	// Each iteration of this loop adds a new stage of windowers. The steps taken:
-	// 1. find a set of unprocessed window functions that have the same PARTITION BY
-	// clause. All of these will be computed using the single stage of windowers.
-	// 2. a) populate output types of the current stage of windowers. All input
-	// columns are being passed through, and windower will append output
-	// columns for each window function processed at the stage.
-	// b) create specs for all window functions in the set.
-	// 3. decide whether to put windowers on a single or on multiple nodes.
-	// a) if we're putting windowers on multiple nodes, we'll put them onto
-	// every node that participated in the previous stage. We leverage hash
-	// routers to partition the data based on PARTITION BY clause of window
-	// functions in the set.
-	for numWindowFuncProcessed < len(n.funcs) {
-		samePartitionFuncs, partitionIdxs := windowPlanState.findUnprocessedWindowFnsWithSamePartition()
-		numWindowFuncProcessed += len(samePartitionFuncs)
-		windowerSpec := execinfrapb.WindowerSpec{
-			PartitionBy: partitionIdxs,
-			WindowFns:   make([]execinfrapb.WindowerSpec_WindowFn, len(samePartitionFuncs)),
-		}
-
-		newResultTypes := make([]*types.T, len(plan.GetResultTypes())+len(samePartitionFuncs))
-		copy(newResultTypes, plan.GetResultTypes())
-		for windowFnSpecIdx, windowFn := range samePartitionFuncs {
-			windowFnSpec, outputType, err := windowPlanState.createWindowFnSpec(windowFn)
-			if err != nil {
-				return nil, err
-			}
-			newResultTypes[windowFn.outputColIdx] = outputType
-			windowerSpec.WindowFns[windowFnSpecIdx] = windowFnSpec
+	partitionIdxs := make([]uint32, len(n.funcs[0].partitionIdxs))
+	for i := range partitionIdxs {
+		partitionIdxs[i] = uint32(n.funcs[0].partitionIdxs[i])
+	}
+
+	// Check that all window functions have the same PARTITION BY and ORDER BY
+	// clauses. We can assume that because the optbuilder ensures that all
+	// window functions in the windowNode have the same PARTITION BY and ORDER
+	// BY clauses.
+	for _, f := range n.funcs[1:] {
+		if !n.funcs[0].samePartition(f) {
+			return nil, errors.AssertionFailedf(
+				"PARTITION BY clauses of window functions handled by the same "+
+					"windowNode are different: %v, %v", n.funcs[0].partitionIdxs, f.partitionIdxs,
+			)
+		}
+		if !n.funcs[0].columnOrdering.Equal(f.columnOrdering) {
+			return nil, errors.AssertionFailedf(
+				"ORDER BY clauses of window functions handled by the same "+
+					"windowNode are different: %v, %v", n.funcs[0].columnOrdering, f.columnOrdering,
+			)
 		}
+	}
-		// Check if the previous stage is all on one node.
-		prevStageNode := plan.Processors[plan.ResultRouters[0]].SQLInstanceID
-		for i := 1; i < len(plan.ResultRouters); i++ {
-			if n := plan.Processors[plan.ResultRouters[i]].SQLInstanceID; n != prevStageNode {
-				prevStageNode = 0
-				break
-			}
+	// All window functions in the windowNode will be computed using the single
+	// stage of windowers (because they have the same PARTITION BY and ORDER BY
+	// clauses). All input columns are being passed through, and windower will
+	// append output columns for each window function processed at the current
+	// stage.
+	windowerSpec := execinfrapb.WindowerSpec{
+		PartitionBy: partitionIdxs,
+		WindowFns:   make([]execinfrapb.WindowerSpec_WindowFn, len(n.funcs)),
+	}
+
+	newResultTypes := make([]*types.T, len(plan.GetResultTypes())+len(n.funcs))
+	copy(newResultTypes, plan.GetResultTypes())
+	for windowFnSpecIdx, windowFn := range n.funcs {
+		windowFnSpec, outputType, err := createWindowFnSpec(planCtx, plan, windowFn)
+		if err != nil {
+			return nil, err
 		}
+		newResultTypes[windowFn.outputColIdx] = outputType
+		windowerSpec.WindowFns[windowFnSpecIdx] = windowFnSpec
+	}
-		// Get all sqlInstanceIDs from the previous stage.
+	sqlInstanceIDs := getSQLInstanceIDsOfRouters(plan.ResultRouters, plan.Processors)
+	if len(partitionIdxs) == 0 || len(sqlInstanceIDs) == 1 {
+		// No PARTITION BY or we have a single node. Use a single windower. If
+		// the previous stage was all on a single node, put the windower there.
+		// Otherwise, bring the results back on this node.
+		sqlInstanceID := dsp.gatewaySQLInstanceID
+		if len(sqlInstanceIDs) == 1 {
+			sqlInstanceID = sqlInstanceIDs[0]
+		}
+		plan.AddSingleGroupStage(
+			sqlInstanceID,
+			execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
+			execinfrapb.PostProcessSpec{},
+			newResultTypes,
+		)
+	} else {
+		// Set up the output routers from the previous stage. We use hash
+		// routers with hashing on the columns from PARTITION BY clause of
+		// window functions we're processing in the current stage.
+		for _, resultProc := range plan.ResultRouters {
+			plan.Processors[resultProc].Spec.Output[0] = execinfrapb.OutputRouterSpec{
+				Type:        execinfrapb.OutputRouterSpec_BY_HASH,
+				HashColumns: partitionIdxs,
+			}
-			plan.AddSingleGroupStage(
-				sqlInstanceID,
-				execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
-				execinfrapb.PostProcessSpec{},
-				newResultTypes,
-			)
-		} else {
-			// Set up the output routers from the previous stage.
-			// We use hash routers with hashing on the columns
-			// from PARTITION BY clause of window functions
-			// we're processing in the current stage.
-			for _, resultProc := range plan.ResultRouters {
-				plan.Processors[resultProc].Spec.Output[0] = execinfrapb.OutputRouterSpec{
-					Type:        execinfrapb.OutputRouterSpec_BY_HASH,
-					HashColumns: partitionIdxs,
-				}
+		}
+		// We have multiple streams, so we definitely have a processor planned
+		// on a remote node.
+		stageID := plan.NewStage(true /* containsRemoteProcessor */, false /* allowPartialDistribution */)
+
+		// We put a windower on each node and we connect it with all hash
+		// routers from the previous stage in such a way that each node has its
+		// designated SourceRouterSlot - namely, position in which a node
+		// appears in nodes.
+		prevStageRouters := plan.ResultRouters
+		prevStageResultTypes := plan.GetResultTypes()
+		plan.ResultRouters = make([]physicalplan.ProcessorIdx, 0, len(sqlInstanceIDs))
+		for bucket, sqlInstanceID := range sqlInstanceIDs {
+			proc := physicalplan.Processor{
+				SQLInstanceID: sqlInstanceID,
+				Spec: execinfrapb.ProcessorSpec{
+					Input: []execinfrapb.InputSyncSpec{{
+						Type:        execinfrapb.InputSyncSpec_PARALLEL_UNORDERED,
+						ColumnTypes: prevStageResultTypes,
+					}},
+					Core: execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
+					Post: execinfrapb.PostProcessSpec{},
+					Output: []execinfrapb.OutputRouterSpec{{
+						Type: execinfrapb.OutputRouterSpec_PASS_THROUGH,
+					}},
+					StageID:     stageID,
+					ResultTypes: newResultTypes,
+				},
+			}
-			// We have multiple streams, so we definitely have a processor planned
-			// on a remote node.
-			stageID := plan.NewStage(true /* containsRemoteProcessor */, false /* allowPartialDistribution */)
-
-			// We put a windower on each node and we connect it
-			// with all hash routers from the previous stage in
-			// a such way that each node has its designated
-			// SourceRouterSlot - namely, position in which
-			// a node appears in nodes.
-			prevStageRouters := plan.ResultRouters
-			prevStageResultTypes := plan.GetResultTypes()
-			plan.ResultRouters = make([]physicalplan.ProcessorIdx, 0, len(sqlInstanceIDs))
-			for bucket, sqlInstanceID := range sqlInstanceIDs {
-				proc := physicalplan.Processor{
-					SQLInstanceID: sqlInstanceID,
-					Spec: execinfrapb.ProcessorSpec{
-						Input: []execinfrapb.InputSyncSpec{{
-							Type:        execinfrapb.InputSyncSpec_PARALLEL_UNORDERED,
-							ColumnTypes: prevStageResultTypes,
-						}},
-						Core: execinfrapb.ProcessorCoreUnion{Windower: &windowerSpec},
-						Post: execinfrapb.PostProcessSpec{},
-						Output: []execinfrapb.OutputRouterSpec{{
-							Type: execinfrapb.OutputRouterSpec_PASS_THROUGH,
-						}},
-						StageID:     stageID,
-						ResultTypes: newResultTypes,
-					},
-				}
-				pIdx := plan.AddProcessor(proc)
-
-				for _, router := range prevStageRouters {
-					plan.Streams = append(plan.Streams, physicalplan.Stream{
-						SourceProcessor:  router,
-						SourceRouterSlot: bucket,
-						DestProcessor:    pIdx,
-						DestInput:        0,
-					})
-				}
-				plan.ResultRouters = append(plan.ResultRouters, pIdx)
+			pIdx := plan.AddProcessor(proc)
+
+			for _, router := range prevStageRouters {
+				plan.Streams = append(plan.Streams, physicalplan.Stream{
+					SourceProcessor:  router,
+					SourceRouterSlot: bucket,
+					DestProcessor:    pIdx,
+					DestInput:        0,
+				})
 			}
+			plan.ResultRouters = append(plan.ResultRouters, pIdx)
 		}
 	}
-	// We definitely added columns throughout all the stages of windowers, so we
-	// need to update PlanToStreamColMap. We need to update the map before adding
-	// rendering or projection because it is used there.
+	// We definitely added new columns, so we need to update PlanToStreamColMap.
+	// We need to update the map before adding rendering or projection because
+	// it is used there.
 	plan.PlanToStreamColMap = identityMap(plan.PlanToStreamColMap, len(plan.GetResultTypes()))
 
 	// windowers do not guarantee maintaining the order at the moment, so we
@@ -3864,9 +3874,9 @@ func (dsp *DistSQLPlanner) createPlanForWindow(
 	// defensively (see #35179).
 	plan.SetMergeOrdering(execinfrapb.Ordering{})
 
-	// After all window functions are computed, we need to add rendering or
+	// After the window functions are computed, we need to add rendering or
 	// projection.
-	if err := windowPlanState.addRenderingOrProjection(); err != nil {
+	if err := addRenderingOrProjection(n, planCtx, plan); err != nil {
 		return nil, err
 	}
diff --git a/pkg/sql/distsql_plan_window.go b/pkg/sql/distsql_plan_window.go
index c8ec12de7627..242dffd27a9e 100644
--- a/pkg/sql/distsql_plan_window.go
+++ b/pkg/sql/distsql_plan_window.go
@@ -19,85 +19,11 @@ import (
 	"github.com/cockroachdb/errors"
 )
 
-type windowPlanState struct {
-	// infos contains information about windowFuncHolders in the same order as
-	// they appear in n.funcs.
-	infos   []*windowFuncInfo
-	n       *windowNode
-	planCtx *PlanningCtx
-	plan    *PhysicalPlan
-}
-
-func createWindowPlanState(
-	n *windowNode, planCtx *PlanningCtx, plan *PhysicalPlan,
-) *windowPlanState {
-	infos := make([]*windowFuncInfo, 0, len(n.funcs))
-	for _, holder := range n.funcs {
-		infos = append(infos, &windowFuncInfo{holder: holder})
-	}
-	return &windowPlanState{
-		infos:   infos,
-		n:       n,
-		planCtx: planCtx,
-		plan:    plan,
-	}
-}
-
-// windowFuncInfo contains runtime information about a window function.
-type windowFuncInfo struct {
-	holder *windowFuncHolder
-	// isProcessed indicates whether holder has already been processed. It is set
-	// to true when holder is included in the set of window functions to be
-	// processed by findUnprocessedWindowFnsWithSamePartition.
-	isProcessed bool
-}
-
-// findUnprocessedWindowFnsWithSamePartition finds a set of unprocessed window
-// functions that use the same partitioning and updates their isProcessed flag
-// accordingly. It returns the set of unprocessed window functions and indices
-// of the columns in their PARTITION BY clause.
-func (s *windowPlanState) findUnprocessedWindowFnsWithSamePartition() (
-	samePartitionFuncs []*windowFuncHolder,
-	partitionIdxs []uint32,
-) {
-	windowFnToProcessIdx := -1
-	for windowFnIdx, windowFn := range s.infos {
-		if !windowFn.isProcessed {
-			windowFnToProcessIdx = windowFnIdx
-			break
-		}
-	}
-	if windowFnToProcessIdx == -1 {
-		panic("unexpected: no unprocessed window function")
-	}
-
-	windowFnToProcess := s.infos[windowFnToProcessIdx].holder
-	partitionIdxs = make([]uint32, len(windowFnToProcess.partitionIdxs))
-	for i, idx := range windowFnToProcess.partitionIdxs {
-		partitionIdxs[i] = uint32(idx)
-	}
-
-	samePartitionFuncs = make([]*windowFuncHolder, 0, len(s.infos)-windowFnToProcessIdx)
-	samePartitionFuncs = append(samePartitionFuncs, windowFnToProcess)
-	s.infos[windowFnToProcessIdx].isProcessed = true
-	for _, windowFn := range s.infos[windowFnToProcessIdx+1:] {
-		if windowFn.isProcessed {
-			continue
-		}
-		if windowFnToProcess.samePartition(windowFn.holder) {
-			samePartitionFuncs = append(samePartitionFuncs, windowFn.holder)
-			windowFn.isProcessed = true
-		}
-	}
-
-	return samePartitionFuncs, partitionIdxs
-}
-
-func (s *windowPlanState) createWindowFnSpec(
-	funcInProgress *windowFuncHolder,
+func createWindowFnSpec(
+	planCtx *PlanningCtx, plan *PhysicalPlan, funcInProgress *windowFuncHolder,
 ) (execinfrapb.WindowerSpec_WindowFn, *types.T, error) {
 	for _, argIdx := range funcInProgress.argsIdxs {
-		if argIdx >= uint32(len(s.plan.GetResultTypes())) {
+		if argIdx >= uint32(len(plan.GetResultTypes())) {
 			return execinfrapb.WindowerSpec_WindowFn{}, nil, errors.Errorf("ColIdx out of range (%d)", argIdx)
 		}
 	}
@@ -108,7 +34,7 @@ func (s *windowPlanState) createWindowFnSpec(
 	}
 	argTypes := make([]*types.T, len(funcInProgress.argsIdxs))
 	for i, argIdx := range funcInProgress.argsIdxs {
-		argTypes[i] = s.plan.GetResultTypes()[argIdx]
+		argTypes[i] = plan.GetResultTypes()[argIdx]
 	}
 	_, outputType, err := execagg.GetWindowFunctionInfo(funcSpec, argTypes...)
 	if err != nil {
@@ -134,7 +60,7 @@ func (s *windowPlanState) createWindowFnSpec(
 	if funcInProgress.frame != nil {
 		// funcInProgress has a custom window frame.
 		frameSpec := execinfrapb.WindowerSpec_Frame{}
-		if err := frameSpec.InitFromAST(funcInProgress.frame, s.planCtx.EvalContext()); err != nil {
+		if err := frameSpec.InitFromAST(funcInProgress.frame, planCtx.EvalContext()); err != nil {
 			return execinfrapb.WindowerSpec_WindowFn{}, outputType, err
 		}
 		funcInProgressSpec.Frame = &frameSpec
@@ -150,7 +76,7 @@ var windowerMergeOrdering = execinfrapb.Ordering{}
 // are used in another expression and, if they are, adds rendering to the plan.
 // If no rendering is required, it adds a projection to remove all columns that
 // were arguments to window functions or were used within OVER clauses.
-func (s *windowPlanState) addRenderingOrProjection() error {
+func addRenderingOrProjection(n *windowNode, planCtx *PlanningCtx, plan *PhysicalPlan) error {
 	// numWindowFuncsAsIs is the number of window functions output of which is
 	// used directly (i.e. simply as an output column). Note: the same window
 	// function might appear multiple times in the query, but its every
@@ -158,17 +84,17 @@ func (s *windowPlanState) addRenderingOrProjection() error {
 	// query like 'SELECT avg(a) OVER (), avg(a) OVER () + 1 FROM t', only the
 	// first window function is used "as is."
 	numWindowFuncsAsIs := 0
-	for _, render := range s.n.windowRender {
+	for _, render := range n.windowRender {
 		if _, ok := render.(*windowFuncHolder); ok {
 			numWindowFuncsAsIs++
 		}
 	}
-	if numWindowFuncsAsIs == len(s.infos) {
+	if numWindowFuncsAsIs == len(n.funcs) {
 		// All window functions' outputs are used directly, so there is no
 		// rendering to do and simple projection is sufficient.
-		columns := make([]uint32, len(s.n.windowRender))
+		columns := make([]uint32, len(n.windowRender))
 		passedThruColIdx := uint32(0)
-		for i, render := range s.n.windowRender {
+		for i, render := range n.windowRender {
 			if render == nil {
 				columns[i] = passedThruColIdx
 				passedThruColIdx++
@@ -179,7 +105,7 @@ func (s *windowPlanState) addRenderingOrProjection() error {
 				columns[i] = uint32(holder.outputColIdx)
 			}
 		}
-		s.plan.AddProjection(columns, windowerMergeOrdering)
+		plan.AddProjection(columns, windowerMergeOrdering)
 		return nil
 	}
 
@@ -187,17 +113,17 @@ func (s *windowPlanState) addRenderingOrProjection() error {
 	// 1) IndexedVars that refer to columns by their indices in the full table,
 	// 2) IndexedVars that replaced regular aggregates that are above
 	// "windowing level."
-	// The mapping of both types IndexedVars is stored in s.n.colAndAggContainer.
-	renderExprs := make([]tree.TypedExpr, len(s.n.windowRender))
+	// The mapping of both types IndexedVars is stored in n.colAndAggContainer.
+	renderExprs := make([]tree.TypedExpr, len(n.windowRender))
 	visitor := replaceWindowFuncsVisitor{
-		columnsMap: s.n.colAndAggContainer.idxMap,
+		columnsMap: n.colAndAggContainer.idxMap,
 	}
 
 	// All passed through columns are contiguous and at the beginning of the
 	// output schema.
 	passedThruColIdx := 0
-	renderTypes := make([]*types.T, 0, len(s.n.windowRender))
-	for i, render := range s.n.windowRender {
+	renderTypes := make([]*types.T, 0, len(n.windowRender))
+	for i, render := range n.windowRender {
 		if render != nil {
 			// render contains at least one reference to windowFuncHolder, so we need
 			// to walk over the render and replace all windowFuncHolders and (if found)
 			renderExprs[i] = visitor.replace(render)
 		} else {
 			// render is nil meaning that a column is being passed through.
-			renderExprs[i] = tree.NewTypedOrdinalReference(passedThruColIdx, s.plan.GetResultTypes()[passedThruColIdx])
+			renderExprs[i] = tree.NewTypedOrdinalReference(passedThruColIdx, plan.GetResultTypes()[passedThruColIdx])
 			passedThruColIdx++
 		}
 		outputType := renderExprs[i].ResolvedType()
 		renderTypes = append(renderTypes, outputType)
 	}
-	return s.plan.AddRendering(renderExprs, s.planCtx, s.plan.PlanToStreamColMap, renderTypes, windowerMergeOrdering)
+	return plan.AddRendering(renderExprs, planCtx, plan.PlanToStreamColMap, renderTypes, windowerMergeOrdering)
 }
 
 // replaceWindowFuncsVisitor is used to populate render expressions containing
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_window b/pkg/sql/opt/exec/execbuilder/testdata/distsql_window
new file mode 100644
index 000000000000..6e357910dee9
--- /dev/null
+++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_window
@@ -0,0 +1,87 @@
+# LogicTest: 5node
+
+statement ok
+CREATE TABLE data (a INT, b INT, c FLOAT, d DECIMAL, _bool BOOL, _bytes BYTES, _bit BIT, PRIMARY KEY (a, b, c, d))
+
+# Split into ten parts.
+statement ok
+ALTER TABLE data SPLIT AT SELECT i FROM generate_series(1, 9) AS g(i)
+
+# Relocate the ten parts to the five nodes.
+statement ok
+ALTER TABLE data EXPERIMENTAL_RELOCATE
+  SELECT ARRAY[i%5+1], i FROM generate_series(0, 9) AS g(i)
+
+# Verify data placement.
+query TTTI colnames,rowsort
+SELECT start_key, end_key, replicas, lease_holder FROM [SHOW RANGES FROM TABLE data]
+----
+start_key  end_key  replicas  lease_holder
+NULL       /1       {1}       1
+/1         /2       {2}       2
+/2         /3       {3}       3
+/3         /4       {4}       4
+/4         /5       {5}       5
+/5         /6       {1}       1
+/6         /7       {2}       2
+/7         /8       {3}       3
+/8         /9       {4}       4
+/9         NULL     {5}       5
+
+# Verify that the window functions with the same PARTITION BY clause are
+# evaluated as a single distributed windower stage followed by a couple of
+# single-node stages.
+query T
+EXPLAIN (DISTSQL) SELECT
+  avg(a) OVER (),
+  min(b) OVER (PARTITION BY a),
+  avg(c) OVER (ORDER BY b),
+  max(c) OVER (PARTITION BY a)
+FROM data
+----
+distribution: full
+vectorized: true
+·
+• window
+│
+└── • window
+    │
+    └── • window
+        │
+        └── • scan
+              missing stats
+              table: data@data_pkey
+              spans: FULL SCAN
+·
+Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJy8lVFv2jwUhu-_X2Gdq0TfQcFOgDZXZi2bIrXQBdR1mtBkiNWi0YQlYWtV8d8nh1aUCGxGyG6qJs7j857nWPgFsp9z8KF3d3PVDfrEugyGo-HnK5sMe1e9ixERv-4tYZPBbS8klo3kcRZbk7fnm244CkbBoE8-fCXCxuLr6dvqILzshWplojjxtFkpceRjOLgmkcgFIMRJJPviUWbgfwMKCAwQXEDwAKEFY4RFmkxlliWp-uSlAILoCfwmwixeLHP1eowwTVIJ_gvks3wuwYeRmMxlKEUkU6cJCJHMxWxelFGlufrzffFDPgPCRTJfPsaZTwSSCZIpIAwXQr1oOJSIOCKUJPmDTGG8QkiW-WvZTbXJM3kQ2cN2HU5hvBojZLm4l-DTFe6Jv9lnGSdpJFMZbe1U7FJu8MssjpLfMnVa21Xf2_YJV06vg77Fma3-695Z3LVLfWwyspoy0tIIurefLE5VosEyVymRM-Qu8jZyD3lrb0B3b0BtebqjvGu_O7Sc_b9JoxIg7yBv783hbeWgh59FevRZdGjDYTWdRnqySbdrO43scMnseMms4bg1SWYnk9ypTbJ7uGT3eMluw_FqkuyeTPJZbZK9wyV7x0v2Gk6rJsneySSf_5PLa0eCUGaLJM5kKevunZuqBxndy3XDWbJMp_ImTaZFmfXjoOCKn9JIZvl69fUhiNdLKuDhcKsK3KkCn1eBqaFpWqab72mmh5kWdrfgZhl2tTBletqrMmk9bJi0HjZMWg8bJm3o2dB0q8qk21V062GDbj1s0K2HDboNPRua7lTRfVZFtx426NbDBt162KDb0LOh6fMqummle8NAG4QbaINxA21Qburb1PjfXR7j1X9_AgAA__-og3q5
+
+# Verify that all window functions with the PARTITION BY clause are distributed.
+query T
+EXPLAIN (DISTSQL) SELECT
+  avg(a) OVER (),
+  min(b) OVER (PARTITION BY a),
+  max(c) OVER (PARTITION BY a ORDER BY b),
+  avg(c) OVER (ORDER BY b),
+  max(d) OVER (PARTITION BY b ORDER BY c),
+  min(d) OVER (PARTITION BY a)
+FROM data
+----
+distribution: full
+vectorized: true
+·
+• window
+│
+└── • window
+    │
+    └── • window
+        │
+        └── • window
+            │
+            └── • window
+                │
+                └── • scan
+                      missing stats
+                      table: data@data_pkey
+                      spans: FULL SCAN
+·
+Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzMmN9v4kYQx9_7V6zmCdRFsGvz82nTO1oh5UJK0PWqClULXiWoxKa2ae8U5X-v7CSXmB8zoLEbXhDG_uzMd2e_o8EPkPy9ggEMv1xfXoyuRO3j6GZ68-tlXdwML4cfpsL-c1uzdTH-PJyIWl2K-2VYm79cX19MpqPpaHwlfvpd2Oyu_Vpb7L8rxpOPw0n2dV6X-bLfHyzcyZYI9i4xf11i8ZzJ_gdtXfw8GX8SgU0tSAijwF3Ze5fA4A9QIEGDBA8k-CChDTMJ6zhauCSJ4uyRhxwYBV9h0JKwDNebNPt5JmERxQ4GD5Au05WDAUztfOUmzgYubrZAQuBSu1zlYbLQJvv4c_2X-wYSPkSrzX2YDISVYi7FQooAJNysbfZbo6mEDQOhRJTeuRhmjxKiTfoc-TXg_Ju4s8ldMZRRMHucSUhSe-tgoB7lAQWv62zCKA5c7ILCSvkq2xp_W4ZB9K-Lm-1i1LcbPhAm29ZPo6ua0fWXb36dpUNXpEO1aCEXX2rGq785mEb_mOkab9LsEWm0NJ40vjRtabrSdE5UqgtKvaqU4iXTL0r9glLvsNKOND1pultiX4X4FQnRWyW7-PxLzaj9afZfMu1K0zuYaftgpmgeak8eh8_J9zxUK9-4_sF0OoV01PFdR3G6TlM1mrqivqNKK37nXftOeTqUOu--U6JSvGRV9x19vH00yz660fQqso8urRjdd7VPeTqUPm_7lKgUL1nV9vGOt4_Hso_XaPoV2ccrrRi9d7VPeTqUd972KVEpXrKq7eMfbx-fZR-_0WxXZB-_tGL039U-5elQ_nnbp0SleMn-zz-Ne9KcuGQdhYnbErR_5VYm1AW37mlXkmgTL9x1HC3yME-X45zLp9_AJenT3eeLUfh0K0vweLjLgRUrtGqz6D5OK5TWHLjHgRUrtOqwVLdwWqO0x4GJauGwYoVWxBnHaU2ccW-bbr2lfRz2Ubjoj9Y23EZhrXG6w-kpOEzsNxGZFZrqKQRNnNIup6fgMNFTcJjqKQRN9BRCNdFTepyegsNEtXCY6ikETZxxnKZ6Sp_TUxRrWiBoQjYVmxecMjeFUxMDb2TgzQzMoYE5NfDGBsWaGwiaKhpvcqBw6rzzZgfFGh7UzvRwktFxmhJOxOYFJ41O4NSZ2ZmcTjI6TlNGx2nS6AROGZ1QThkdH58or-E0VTRidOMFJ42O46TRd2aok4xOzCIsmhJOxOYFJ41O4NSZ2RmkTjI6TlNGx2nS6AROGZ1QTr0IwCcp6u84TlOvAogpjhecfBmA45TR9c4khRp99vjDfwEAAP__ad9y-A==
diff --git a/pkg/sql/opt/exec/execbuilder/tests/5node/generated_test.go b/pkg/sql/opt/exec/execbuilder/tests/5node/generated_test.go
index 13d12c3274c2..a8a32964565a 100644
--- a/pkg/sql/opt/exec/execbuilder/tests/5node/generated_test.go
+++ b/pkg/sql/opt/exec/execbuilder/tests/5node/generated_test.go
@@ -169,6 +169,13 @@ func TestExecBuild_distsql_union(
 	runExecBuildLogicTest(t, "distsql_union")
 }
 
+func TestExecBuild_distsql_window(
+	t *testing.T,
+) {
+	defer leaktest.AfterTest(t)()
+	runExecBuildLogicTest(t, "distsql_window")
+}
+
 func TestExecBuild_experimental_distsql_planning_5node(
 	t *testing.T,
 ) {
diff --git a/pkg/sql/opt/optbuilder/testdata/window b/pkg/sql/opt/optbuilder/testdata/window
index 36c692c7d150..83866ff3a556 100644
--- a/pkg/sql/opt/optbuilder/testdata/window
+++ b/pkg/sql/opt/optbuilder/testdata/window
@@ -1334,22 +1334,22 @@ FROM kv WINDOW w as (ORDER BY v), w2 as (PARTITION BY v)
 ----
 project
  ├── columns: rank:10 rank:11 row_number:12 row_number:13 row_number:14
- └── window partition=(2) ordering=+4
+ └── window partition=() ordering=+2
      ├── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9 rank:10 rank:11 row_number:12 row_number:13 row_number:14
-     ├── window partition=(2)
-     │    ├── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9 rank:10 rank:11 row_number:12 row_number:14
-     │    ├── window partition=() ordering=+2
-     │    │    ├── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9 rank:10 rank:11
+     ├── window partition=(2) ordering=+4
+     │    ├── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9 row_number:12 row_number:13 row_number:14
+     │    ├── window partition=(2)
+     │    │    ├── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9 row_number:12 row_number:14
      │    │    ├── scan kv
      │    │    │    └── columns: k:1!null v:2 w:3 f:4 d:5 s:6 b:7 crdb_internal_mvcc_timestamp:8 tableoid:9
      │    │    └── windows
-     │    │         ├── rank [as=rank:10]
-     │    │         └── rank [as=rank:11]
+     │    │         ├── row-number [as=row_number:12]
+     │    │         └── row-number [as=row_number:14]
      │    └── windows
-     │         ├── row-number [as=row_number:12]
-     │         └── row-number [as=row_number:14]
+     │         └── row-number [as=row_number:13]
      └── windows
-          └── row-number [as=row_number:13]
+          ├── rank [as=rank:10]
+          └── rank [as=rank:11]
 
 build
 SELECT
diff --git a/pkg/sql/opt/optbuilder/window.go b/pkg/sql/opt/optbuilder/window.go
index e8d0542830dd..ab5b3a8c9147 100644
--- a/pkg/sql/opt/optbuilder/window.go
+++ b/pkg/sql/opt/optbuilder/window.go
@@ -202,7 +202,19 @@ func (b *Builder) buildWindow(outScope *scope, inScope *scope) {
 		)
 	}
 
+	// First construct all frames with the PARTITION BY clause - this allows the
+	// execution to be more distributed.
 	for _, f := range frames {
+		if f.Partition.Empty() {
+			continue
+		}
+		outScope.expr = b.factory.ConstructWindow(outScope.expr, f.Windows, &f.WindowPrivate)
+	}
+	// Now construct all frames without the PARTITION BY clause.
+	for _, f := range frames {
+		if !f.Partition.Empty() {
+			continue
+		}
 		outScope.expr = b.factory.ConstructWindow(outScope.expr, f.Windows, &f.WindowPrivate)
 	}
 }
diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go
index e17e2c54216f..e42f33408054 100644
--- a/pkg/sql/opt_exec_factory.go
+++ b/pkg/sql/opt_exec_factory.go
@@ -1087,7 +1087,6 @@ func (ef *execFactory) ConstructWindow(root exec.Node, wi exec.WindowInfo) (exec
 			expr:          wi.Exprs[i],
 			args:          wi.Exprs[i].Exprs,
 			argsIdxs:      argsIdxs,
-			window:        p,
 			filterColIdx:  wi.FilterIdxs[i],
 			outputColIdx:  wi.OutputIdxs[i],
 			partitionIdxs: partitionIdxs,
diff --git a/pkg/sql/window.go b/pkg/sql/window.go
index 3f0c7dbba005..c9af301ab6c7 100644
--- a/pkg/sql/window.go
+++ b/pkg/sql/window.go
@@ -74,8 +74,6 @@ var _ tree.TypedExpr = &windowFuncHolder{}
 var _ tree.VariableExpr = &windowFuncHolder{}
 
 type windowFuncHolder struct {
-	window *windowNode
-
 	expr *tree.FuncExpr
 
 	args []tree.Expr
@@ -88,7 +86,7 @@ type windowFuncHolder struct {
 	frame *tree.WindowFrame
 }
 
-// samePartition returns whether f and other have the same PARTITION BY clause.
+// samePartition returns whether w and other have the same PARTITION BY clause.
 func (w *windowFuncHolder) samePartition(other *windowFuncHolder) bool {
 	if len(w.partitionIdxs) != len(other.partitionIdxs) {
 		return false