Skip to content

Commit

Permalink
distsql: Implement INTERSECT/EXCEPT ALL plans
Browse files Browse the repository at this point in the history
Part of cockroachdb#10432, cockroachdb#21661, and cockroachdb#21706.

Release note (performance improvement): Support distributed execution of
INTERSECT ALL and EXCEPT ALL queries.
  • Loading branch information
Abhishek Madan committed Jan 29, 2018
1 parent af29319 commit c8139ab
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 96 deletions.
206 changes: 118 additions & 88 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ func (dsp *DistSQLPlanner) checkSupportForNode(node planNode) (distRecommendatio
return dsp.checkSupportForNode(n.plan)

case *unionNode:
// Only UNION and UNION ALL are supported so far.
if n.unionType == tree.UnionOp {
// EXCEPT and INTERSECT are currently not supported.
if n.all || n.unionType == tree.UnionOp {
recLeft, err := dsp.checkSupportForNode(n.left)
if err != nil {
return 0, err
Expand Down Expand Up @@ -1589,23 +1589,7 @@ func (dsp *DistSQLPlanner) createPlanForJoin(

// Set up the output columns.
if numEq := len(n.pred.leftEqualityIndices); numEq != 0 {
// TODO(radu): for now we run a join processor on every node that produces
// data for either source. In the future we should be smarter here.
seen := make(map[roachpb.NodeID]struct{})
for _, pIdx := range leftRouters {
n := p.Processors[pIdx].Node
if _, ok := seen[n]; !ok {
seen[n] = struct{}{}
nodes = append(nodes, n)
}
}
for _, pIdx := range rightRouters {
n := p.Processors[pIdx].Node
if _, ok := seen[n]; !ok {
seen[n] = struct{}{}
nodes = append(nodes, n)
}
}
nodes = findJoinProcessorNodes(leftRouters, rightRouters, p.Processors)

// Set up the equality columns.
leftEqCols = eqCols(n.pred.leftEqualityIndices, leftPlan.planToStreamColMap)
Expand Down Expand Up @@ -1659,61 +1643,10 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
}
}

pIdxStart := distsqlplan.ProcessorIdx(len(p.Processors))
stageID := p.NewStageID()

// Each node has a join processor.
for _, n := range nodes {
proc := distsqlplan.Processor{
Node: n,
Spec: distsqlrun.ProcessorSpec{
Input: []distsqlrun.InputSyncSpec{
{ColumnTypes: leftTypes},
{ColumnTypes: rightTypes},
},
Core: core,
Post: post,
Output: []distsqlrun.OutputRouterSpec{{Type: distsqlrun.OutputRouterSpec_PASS_THROUGH}},
StageID: stageID,
},
}
p.Processors = append(p.Processors, proc)
}

if len(nodes) > 1 {
// Parallel hash or merge join: we distribute rows (by hash of
// equality columns) to len(nodes) join processors.

// Set up the left routers.
for _, resultProc := range leftRouters {
p.Processors[resultProc].Spec.Output[0] = distsqlrun.OutputRouterSpec{
Type: distsqlrun.OutputRouterSpec_BY_HASH,
HashColumns: leftEqCols,
}
}
// Set up the right routers.
for _, resultProc := range rightRouters {
p.Processors[resultProc].Spec.Output[0] = distsqlrun.OutputRouterSpec{
Type: distsqlrun.OutputRouterSpec_BY_HASH,
HashColumns: rightEqCols,
}
}
}
p.ResultRouters = p.ResultRouters[:0]

// Connect the left and right routers to the output joiners. Each joiner
// corresponds to a hash bucket.
for bucket := 0; bucket < len(nodes); bucket++ {
pIdx := pIdxStart + distsqlplan.ProcessorIdx(bucket)

// Connect left routers to the processor's first input. Currently the join
// node doesn't care about the orderings of the left and right results.
p.MergeResultStreams(leftRouters, bucket, leftMergeOrd, pIdx, 0)
// Connect right routers to the processor's second input.
p.MergeResultStreams(rightRouters, bucket, rightMergeOrd, pIdx, 1)

p.ResultRouters = append(p.ResultRouters, pIdx)
}
p.AddJoinStage(
nodes, core, post, leftTypes, rightTypes, leftEqCols, rightEqCols,
leftMergeOrd, rightMergeOrd, leftRouters, rightRouters,
)

p.planToStreamColMap = joinToStreamColMap
p.ResultTypes = getTypesForPlanResult(n, joinToStreamColMap)
Expand Down Expand Up @@ -1795,7 +1728,7 @@ func (dsp *DistSQLPlanner) createPlanForNode(
return dsp.createPlanForDistinct(planCtx, n)

case *unionNode:
return dsp.createPlanForUnion(planCtx, n)
return dsp.createPlanForSetOp(planCtx, n)

case *valuesNode:
return dsp.createPlanForValues(planCtx, n)
Expand Down Expand Up @@ -1943,7 +1876,7 @@ func (dsp *DistSQLPlanner) isOnlyOnGateway(plan *physicalPlan) bool {
return false
}

func (dsp *DistSQLPlanner) createPlanForUnion(
func (dsp *DistSQLPlanner) createPlanForSetOp(
planCtx *planningCtx, n *unionNode,
) (physicalPlan, error) {
leftPlan, err := dsp.createPlanForNode(planCtx, n.left)
Expand All @@ -1954,6 +1887,9 @@ func (dsp *DistSQLPlanner) createPlanForUnion(
if err != nil {
return physicalPlan{}, err
}
if n.inverted {
leftPlan, rightPlan = rightPlan, leftPlan
}
childPlans := []*physicalPlan{&leftPlan, &rightPlan}

var distinctSpec distsqlrun.ProcessorCoreUnion
Expand All @@ -1967,7 +1903,7 @@ func (dsp *DistSQLPlanner) createPlanForUnion(
// group stage. In the worst case (total duplication), this causes double
// the amount of data to be streamed as necessary.
var distinctColumns []uint32
for planCol := range planColumns(n.left) {
for planCol := range planColumns(n) {
if streamCol := leftPlan.planToStreamColMap[planCol]; streamCol != -1 {
distinctColumns = append(distinctColumns, uint32(streamCol))
}
Expand All @@ -1991,8 +1927,8 @@ func (dsp *DistSQLPlanner) createPlanForUnion(
var p physicalPlan

// Merge the plans' planToStreamColMap, which should be equivalent.
// TODO(solon): Are there any valid UNION ALL cases where these differ? If
// we encounter any, we could handle them similarly to the differing
// TODO(solon): Are there any valid UNION/INTERSECT/EXCEPT cases where these
// differ? If we encounter any, we could handle them similarly to the differing
// ResultTypes case below.
if !reflect.DeepEqual(leftPlan.planToStreamColMap, rightPlan.planToStreamColMap) {
return physicalPlan{}, errors.Errorf(
Expand All @@ -2001,6 +1937,14 @@ func (dsp *DistSQLPlanner) createPlanForUnion(
}
p.planToStreamColMap = leftPlan.planToStreamColMap

columns := make([]uint32, 0, len(p.planToStreamColMap))
for _, col := range p.planToStreamColMap {
if col < 0 {
continue
}
columns = append(columns, uint32(col))
}

// Merge the plans' result types and merge ordering.
resultTypes, err := distsqlplan.MergeResultTypes(leftPlan.ResultTypes, rightPlan.ResultTypes)
mergeOrdering := leftPlan.MergeOrdering
Expand All @@ -2009,14 +1953,6 @@ func (dsp *DistSQLPlanner) createPlanForUnion(
// pathological cases, like if they have incompatible ORDER BY clauses.
// Resolve this by collecting results on a single node and adding a
// projection to the results that will be unioned.
columns := make([]uint32, 0, len(p.planToStreamColMap))
for _, col := range p.planToStreamColMap {
if col < 0 {
continue
}
columns = append(columns, uint32(col))
}

for _, plan := range childPlans {
plan.AddSingleGroupStage(
dsp.nodeDesc.NodeID,
Expand All @@ -2032,13 +1968,107 @@ func (dsp *DistSQLPlanner) createPlanForUnion(
return physicalPlan{}, err
}
mergeOrdering = distsqlrun.Ordering{}
} else if n.unionType != tree.UnionOp {
// In INTERSECT and EXCEPT cases where the left and right sides have
// the same merge ordering, but the merge ordering contains columns that
// don't appear in the output (e.g. SELECT k FROM kv ORDER BY v), we cannot
// keep the ordering, since some ORDER BY columns are not also equality
// columns. As a result, update the ordering to remove these columns, and
// add a projection on the left and right sides to strip out these (now
// unused) columns.
var newMergeOrdering distsqlrun.Ordering
newMergeOrdering.Columns = make([]distsqlrun.Ordering_Column, 0, len(mergeOrdering.Columns))
for _, ord := range mergeOrdering.Columns {
if ord.ColIdx < uint32(len(columns)) {
newMergeOrdering.Columns = append(newMergeOrdering.Columns, ord)
}
}
for _, plan := range childPlans {
plan.MergeOrdering = newMergeOrdering
projCols := make([]uint32, len(columns))
copy(projCols, columns)
plan.AddProjection(projCols)
}

// Re-merge the result types now that the child plans have been updated.
resultTypes, err = distsqlplan.MergeResultTypes(leftPlan.ResultTypes, rightPlan.ResultTypes)
if err != nil {
return physicalPlan{}, err
}

mergeOrdering = newMergeOrdering
}

// Merge processors, streams, result routers, and stage counter.
var leftRouters, rightRouters []distsqlplan.ProcessorIdx
p.PhysicalPlan, leftRouters, rightRouters = distsqlplan.MergePlans(
&leftPlan.PhysicalPlan, &rightPlan.PhysicalPlan)
p.ResultRouters = append(leftRouters, rightRouters...)

if n.unionType == tree.UnionOp {
// We just need to append the left and right streams together, so append
// the left and right output routers.
p.ResultRouters = append(leftRouters, rightRouters...)
} else {
// We plan INTERSECT ALL and EXCEPT ALL queries with joiners. Get the
// appropriate join type.
joinType := distsqlSetOpJoinType(n.unionType)

// Nodes where we will run the join processors.
nodes := findJoinProcessorNodes(leftRouters, rightRouters, p.Processors)

// Set up the equality columns.
eqCols := make([]uint32, len(columns))
for i, c := range columns {
eqCols[i] = uint32(c)
}

canUseMergeJoinForSetOp := func(mergeOrdering distsqlrun.Ordering, columns []uint32) bool {
if len(mergeOrdering.Columns) != len(columns) {
return false
}
for i := range columns {
if columns[i] != mergeOrdering.Columns[i].ColIdx {
return false
}
}
return true
}
// TODO(radu): we currently only use merge joins when we have an ordering on
// all equality columns. We should relax this by either:
// - implementing a hybrid hash/merge processor which implements merge
// logic on the columns we have an ordering on, and within each merge
// group uses a hashmap on the remaining columns
// - or: adding a sort processor to complete the order
if !planMergeJoins.Get(&dsp.st.SV) || !canUseMergeJoinForSetOp(mergeOrdering, columns) {
// We must do a hash join, so there's no guaranteed output ordering.
mergeOrdering = distsqlrun.Ordering{}
}

// Project the left-side columns only.
post := distsqlrun.PostProcessSpec{Projection: true}
post.OutputColumns = make([]uint32, len(columns))
copy(post.OutputColumns, columns)

// Create the Core spec.
var core distsqlrun.ProcessorCoreUnion
if mergeOrdering.Columns == nil {
core.HashJoiner = &distsqlrun.HashJoinerSpec{
LeftEqColumns: eqCols,
RightEqColumns: eqCols,
Type: joinType,
}
} else {
core.MergeJoiner = &distsqlrun.MergeJoinerSpec{
LeftOrdering: mergeOrdering,
RightOrdering: mergeOrdering,
Type: joinType,
}
}

p.AddSetOpStage(
nodes, core, post, resultTypes, eqCols, mergeOrdering, leftRouters, rightRouters,
)
}

p.ResultTypes = resultTypes
p.MergeOrdering = mergeOrdering
Expand Down
34 changes: 34 additions & 0 deletions pkg/sql/distsql_plan_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,3 +787,37 @@ func distsqlJoinType(joinType joinType) distsqlrun.JoinType {

panic(fmt.Sprintf("invalid join type %d", joinType))
}

func distsqlSetOpJoinType(setOpType tree.UnionType) distsqlrun.JoinType {
switch setOpType {
case tree.ExceptOp:
return distsqlrun.JoinType_EXCEPT_ALL
case tree.IntersectOp:
return distsqlrun.JoinType_INTERSECT_ALL
default:
panic(fmt.Sprintf("set op type %v unsupported by joins", setOpType))
}
}

func findJoinProcessorNodes(
leftRouters, rightRouters []distsqlplan.ProcessorIdx, processors []distsqlplan.Processor,
) (nodes []roachpb.NodeID) {
// TODO(radu): for now we run a join processor on every node that produces
// data for either source. In the future we should be smarter here.
seen := make(map[roachpb.NodeID]struct{})
for _, pIdx := range leftRouters {
n := processors[pIdx].Node
if _, ok := seen[n]; !ok {
seen[n] = struct{}{}
nodes = append(nodes, n)
}
}
for _, pIdx := range rightRouters {
n := processors[pIdx].Node
if _, ok := seen[n]; !ok {
seen[n] = struct{}{}
nodes = append(nodes, n)
}
}
return nodes
}
Loading

0 comments on commit c8139ab

Please sign in to comment.