Skip to content

Commit

Permalink
Merge #27140
Browse files Browse the repository at this point in the history
27140: distsql: support window functions in distsql r=yuzefovich a=yuzefovich

Adds support of window functions in DistSQL. A stage of windower processors is added for a particular PARTITION BY scheme, all window functions with the same partitioning are processed by that stage. We also cache sorted partitions if more than one window function has the same ORDER BY clause.

A doc describing the details of the implementation is out.

Note: memory accounting is not done, I plan on adding that after this PR.

Resolves: #24425.

Release note: None

Co-authored-by: yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Aug 6, 2018
2 parents 7dab936 + d413ea9 commit 7a5876b
Show file tree
Hide file tree
Showing 12 changed files with 3,355 additions and 370 deletions.
193 changes: 193 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,9 @@ func (dsp *DistSQLPlanner) checkSupportForNode(node planNode) (distRecommendatio
case *zeroNode:
return canDistribute, nil

case *windowNode:
return dsp.checkSupportForNode(n.plan)

default:
return 0, newQueryNotSupportedErrorf("unsupported node %T", node)
}
Expand Down Expand Up @@ -2271,10 +2274,17 @@ func (dsp *DistSQLPlanner) createPlanForNode(
case *zeroNode:
plan, err = dsp.createPlanForZero(planCtx, n)

case *windowNode:
plan, err = dsp.createPlanForWindow(planCtx, n)

default:
panic(fmt.Sprintf("unsupported node type %T", n))
}

if err != nil {
return plan, err
}

if dsp.shouldPlanTestMetadata() {
if err := plan.CheckLastStagePost(); err != nil {
log.Fatal(planCtx.ctx, err)
Expand Down Expand Up @@ -2813,6 +2823,189 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
return p, nil
}

// createPlanForWindow creates a physical plan for computing window functions.
// We add a new stage of windower processors for each different partitioning
// scheme found in the query's window functions.
func (dsp *DistSQLPlanner) createPlanForWindow(
planCtx *planningCtx, n *windowNode,
) (physicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.plan)
if err != nil {
return physicalPlan{}, err
}

numWindowFuncProcessed := 0
windowPlanState := createWindowPlanState(n, planCtx.EvalContext(), &plan)
// Each iteration of this loop adds a new stage of windowers. The steps taken:
// 1. find a set of unprocessed window functions that have the same PARTITION BY
// clause. All of these will be computed using the single stage of windowers.
// 2. a) populate output types of the current stage of windowers. All columns
// that are arguments to a window function in the set will be replaced by
// a single output column; all columns that are not arguments to any
// window function in the set are simply passed through.
// b) create specs for all window functions in the set.
// 3. windowers' input schema is probably different from the output one, so we
// are adjusting indices of the columns accordingly.
// 4. decide whether to put windowers on a single or on multiple nodes.
// a) if we're putting windowers on multiple nodes, we'll put them onto
// every node that participated in the previous stage. We leverage hash
// routers to partition the data based on PARTITION BY clause of window
// functions in the set.
for numWindowFuncProcessed < len(n.funcs) {
samePartitionFuncs, partitionIdxs := windowPlanState.findUnprocessedWindowFnsWithSamePartition()
numWindowFuncProcessed += len(samePartitionFuncs)
for _, f := range samePartitionFuncs {
// The output of window function f will be put at f.argIdxStart'th column.
// If it is not possible - for example, when two window functions have the
// same argIdxStart, the function that appears later in n.funcs will put
// its result at argIdxStart + 1 - later call to adjustColumnIndices will
// take care of it.
windowPlanState.infos[f.funcIdx].outputColIdx = f.argIdxStart
}

windowerSpec := distsqlrun.WindowerSpec{
PartitionBy: partitionIdxs,
WindowFns: make([]distsqlrun.WindowerSpec_WindowFn, len(samePartitionFuncs)),
}

// Populating output types of this stage since windowers will likely change
// them from its input types. Let's go through an example query
// `SELECT lead(a, b, c) OVER (), avg(d) OVER, rank() OVER() FROM t`.
// 1. 'lead' takes in three arguments which are in columns [0; 3). After we
// compute it, we'll put the result in column 0 (in-place of 'a') and
// not output columns 1 and 2.
// 2. 'avg' takes in a single argument in column 3. We compute it and put
// the result in column 1 since previous window functions have produced
// only a single column so far.
// 3. 'row_number' takes no arguments. We compute it and put the result
// in column 2.
newResultTypes := make([]sqlbase.ColumnType, 0, len(plan.ResultTypes))
// inputColIdx is the index of the column that should be processed next
// among input columns to the current stage.
inputColIdx := 0
for windowFnSpecIdx, windowFn := range samePartitionFuncs {
// All window functions are sorted by their argIdxStart, so we copy all
// columns up to windowFn.argIdxStart (all window functions in
// samePartitionFuncs after windowFn have their arguments in later
// columns).
newResultTypes = append(newResultTypes, plan.ResultTypes[inputColIdx:windowFn.argIdxStart]...)

windowFnSpec, outputType, err := windowPlanState.createWindowFnSpec(windowFn)
if err != nil {
return physicalPlan{}, err
}

// Windower processor does not pass through ("consumes") all arguments of
// windowFn and puts the result of computation at windowFn.argIdxStart.
newResultTypes = append(newResultTypes, outputType)
inputColIdx = windowFn.argIdxStart + windowFn.argCount

windowerSpec.WindowFns[windowFnSpecIdx] = windowFnSpec
}
// We keep all the columns after the last window function
// that is being processed in the current stage.
newResultTypes = append(newResultTypes, plan.ResultTypes[inputColIdx:]...)

// We need to adjust indices since windower's output schema might not be
// equal to its input schema, and we need to maintain outputColIdx of
// processed window functions and argIdxStart of unprocessed window
// functions to point at the correct columns.
windowPlanState.adjustColumnIndices(samePartitionFuncs)

// Check if the previous stage is all on one node.
prevStageNode := plan.Processors[plan.ResultRouters[0]].Node
for i := 1; i < len(plan.ResultRouters); i++ {
if n := plan.Processors[plan.ResultRouters[i]].Node; n != prevStageNode {
prevStageNode = 0
break
}
}

if len(partitionIdxs) == 0 || len(plan.ResultRouters) == 1 {
// No PARTITION BY or we have a single stream. Use a single windower.
// If the previous stage was all on a single node, put the windower
// there. Otherwise, bring the results back on this node.
node := dsp.nodeDesc.NodeID
if prevStageNode != 0 {
node = prevStageNode
}
plan.AddSingleGroupStage(
node,
distsqlrun.ProcessorCoreUnion{Windower: &windowerSpec},
distsqlrun.PostProcessSpec{},
newResultTypes,
)
} else {
// Set up the output routers from the previous stage.
// We use hash routers with hashing on the columns
// from PARTITION BY clause of window functions
// we're processing in the current stage.
for _, resultProc := range plan.ResultRouters {
plan.Processors[resultProc].Spec.Output[0] = distsqlrun.OutputRouterSpec{
Type: distsqlrun.OutputRouterSpec_BY_HASH,
HashColumns: partitionIdxs,
}
}
stageID := plan.NewStageID()

// Get all nodes from the previous stage.
nodes := findJoinProcessorNodes(plan.ResultRouters, nil /* rightRouter */, plan.Processors, false /* includeRight */)
if len(nodes) != len(plan.ResultRouters) {
panic("unexpected number of nodes")
}

// We put a windower on each node and we connect it
// with all hash routers from the previous stage in
// a such way that each node has its designated
// SourceRouterSlot - namely, position in which
// a node appear in nodes.
prevStageRouters := plan.ResultRouters
plan.ResultRouters = make([]distsqlplan.ProcessorIdx, 0, len(nodes))
for bucket, nodeID := range nodes {
proc := distsqlplan.Processor{
Node: nodeID,
Spec: distsqlrun.ProcessorSpec{
Input: []distsqlrun.InputSyncSpec{{
Type: distsqlrun.InputSyncSpec_UNORDERED,
ColumnTypes: plan.ResultTypes,
}},
Core: distsqlrun.ProcessorCoreUnion{Windower: &windowerSpec},
Post: distsqlrun.PostProcessSpec{},
Output: []distsqlrun.OutputRouterSpec{{
Type: distsqlrun.OutputRouterSpec_PASS_THROUGH,
}},
StageID: stageID,
},
}
pIdx := plan.AddProcessor(proc)

for router := 0; router < len(nodes); router++ {
plan.Streams = append(plan.Streams, distsqlplan.Stream{
SourceProcessor: prevStageRouters[router],
SourceRouterSlot: bucket,
DestProcessor: pIdx,
DestInput: 0,
})
}
plan.ResultRouters = append(plan.ResultRouters, pIdx)
}

plan.ResultTypes = newResultTypes
}
}

// We probably added/removed columns throughout all the stages of windowers,
// so we need to update planToStreamColMap.
plan.planToStreamColMap = identityMap(plan.planToStreamColMap, len(plan.ResultTypes))

// After all window functions are computed, we might need to add rendering.
if err := windowPlanState.addRenderingIfNecessary(); err != nil {
return physicalPlan{}, err
}

return plan, nil
}

func (dsp *DistSQLPlanner) newPlanningCtx(
ctx context.Context, evalCtx *extendedEvalContext, txn *client.Txn,
) planningCtx {
Expand Down
Loading

0 comments on commit 7a5876b

Please sign in to comment.