sql: add support for hash and merge joins in the new factory
This commit adds implementations of `ConstructHashJoin` and
`ConstructMergeJoin` in the new factory, mostly by refactoring and
reusing already existing code in the physical planner. Notably,
interleaved joins are not supported yet.

Release note: None
yuzefovich committed Jun 24, 2020
1 parent e63a7b2 commit d55f5eb
Showing 13 changed files with 293 additions and 112 deletions.
149 changes: 70 additions & 79 deletions pkg/sql/distsql_physical_planner.go
@@ -2131,25 +2131,6 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
}
}

// Outline of the planning process for joins:
//
// - We create PhysicalPlans for the left and right side. Each plan has a set
// of output routers with result that will serve as input for the join.
//
// - We merge the list of processors and streams into a single plan. We keep
// track of the output routers for the left and right results.
//
// - We add a set of joiner processors (say K of them).
//
// - We configure the left and right output routers to send results to
// these joiners, distributing rows by hash (on the join equality columns).
// We are thus breaking up all input rows into K buckets such that rows
// that match on the equality columns end up in the same bucket. If there
// are no equality columns, we cannot distribute rows so we use a single
// joiner.
//
// - The routers of the joiner processors are the result routers of the plan.

leftPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.left.plan)
if err != nil {
return nil, err
@@ -2172,32 +2153,71 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
return nil, err
}

// Nodes where we will run the join processors.
var nodes []roachpb.NodeID

// We initialize these properties of the joiner. They will then be used to
// fill in the processor spec. See descriptions for HashJoinerSpec.
var leftEqCols, rightEqCols []uint32
var leftMergeOrd, rightMergeOrd execinfrapb.Ordering
joinType := n.joinType
// Set up the equality columns and the merge ordering.
leftEqCols := eqCols(n.pred.leftEqualityIndices, leftMap)
rightEqCols := eqCols(n.pred.rightEqualityIndices, rightMap)
leftMergeOrd := distsqlOrdering(n.mergeJoinOrdering, leftEqCols)
rightMergeOrd := distsqlOrdering(n.mergeJoinOrdering, rightEqCols)

// Figure out the left and right types.
leftTypes := leftPlan.ResultTypes
rightTypes := rightPlan.ResultTypes

// Set up the equality columns.
if numEq := len(n.pred.leftEqualityIndices); numEq != 0 {
leftEqCols = eqCols(n.pred.leftEqualityIndices, leftMap)
rightEqCols = eqCols(n.pred.rightEqualityIndices, rightMap)
joinResultTypes, err := getTypesForPlanResult(n, joinToStreamColMap)
if err != nil {
return nil, err
}

return dsp.planJoiners(&joinPlanningInfo{
leftPlan: leftPlan,
rightPlan: rightPlan,
joinType: n.joinType,
joinResultTypes: joinResultTypes,
onExpr: onExpr,
post: post,
joinToStreamColMap: joinToStreamColMap,
leftEqCols: leftEqCols,
rightEqCols: rightEqCols,
leftEqColsAreKey: n.pred.leftEqKey,
rightEqColsAreKey: n.pred.rightEqKey,
leftMergeOrd: leftMergeOrd,
rightMergeOrd: rightMergeOrd,
// In the old execFactory we can only have either local or fully
// distributed plans, so checking the last stage is sufficient to get
// the distribution of the plans as a whole.
leftPlanDistribution: leftPlan.GetLastStageDistribution(),
rightPlanDistribution: rightPlan.GetLastStageDistribution(),
}, n.reqOrdering), nil
}

func (dsp *DistSQLPlanner) planJoiners(
info *joinPlanningInfo, reqOrdering ReqOrdering,
) *PhysicalPlan {
// Outline of the planning process for joins when given PhysicalPlans for
// the left and right side (with each plan having a set of output routers
// with results that will serve as input for the join).
//
// - We merge the list of processors and streams into a single plan. We keep
// track of the output routers for the left and right results.
//
// - We add a set of joiner processors (say K of them).
//
// - We configure the left and right output routers to send results to
// these joiners, distributing rows by hash (on the join equality columns).
// We are thus breaking up all input rows into K buckets such that rows
// that match on the equality columns end up in the same bucket. If there
// are no equality columns, we cannot distribute rows so we use a single
// joiner.
//
// - The routers of the joiner processors are the result routers of the plan.

p := MakePhysicalPlan(dsp.gatewayNodeID)
leftRouters, rightRouters := physicalplan.MergePlans(
&p.PhysicalPlan, &leftPlan.PhysicalPlan, &rightPlan.PhysicalPlan,
&p.PhysicalPlan, &info.leftPlan.PhysicalPlan, &info.rightPlan.PhysicalPlan,
info.leftPlanDistribution, info.rightPlanDistribution,
)

// Set up the output columns.
if numEq := len(n.pred.leftEqualityIndices); numEq != 0 {
// Nodes where we will run the join processors.
var nodes []roachpb.NodeID
if numEq := len(info.leftEqCols); numEq != 0 {
nodes = findJoinProcessorNodes(leftRouters, rightRouters, p.Processors)
} else {
// Without column equality, we cannot distribute the join. Run a
@@ -2213,48 +2233,21 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
}
}

// Create the Core spec.
var core execinfrapb.ProcessorCoreUnion
if len(n.mergeJoinOrdering) == 0 {
core.HashJoiner = &execinfrapb.HashJoinerSpec{
LeftEqColumns: leftEqCols,
RightEqColumns: rightEqCols,
OnExpr: onExpr,
Type: joinType,
LeftEqColumnsAreKey: n.pred.leftEqKey,
RightEqColumnsAreKey: n.pred.rightEqKey,
}
} else {
leftMergeOrd = distsqlOrdering(n.mergeJoinOrdering, leftEqCols)
rightMergeOrd = distsqlOrdering(n.mergeJoinOrdering, rightEqCols)
core.MergeJoiner = &execinfrapb.MergeJoinerSpec{
LeftOrdering: leftMergeOrd,
RightOrdering: rightMergeOrd,
OnExpr: onExpr,
Type: joinType,
LeftEqColumnsAreKey: n.pred.leftEqKey,
RightEqColumnsAreKey: n.pred.rightEqKey,
}
}

p.AddJoinStage(
nodes, core, post, leftEqCols, rightEqCols, leftTypes, rightTypes,
leftMergeOrd, rightMergeOrd, leftRouters, rightRouters,
nodes, info.makeCoreSpec(), info.post,
info.leftEqCols, info.rightEqCols,
info.leftPlan.ResultTypes, info.rightPlan.ResultTypes,
info.leftMergeOrd, info.rightMergeOrd,
leftRouters, rightRouters,
)

p.PlanToStreamColMap = joinToStreamColMap
p.ResultTypes, err = getTypesForPlanResult(n, joinToStreamColMap)
if err != nil {
return nil, err
}
p.PlanToStreamColMap = info.joinToStreamColMap
p.ResultTypes = info.joinResultTypes

// Joiners may guarantee an ordering to outputs, so we ensure that
// ordering is propagated through the input synchronizer of the next stage.
// We can propagate the ordering from either side; we use the left side here.
// Note that n.props only has a non-empty ordering for inner joins, where it
// uses the mergeJoinOrdering.
p.SetMergeOrdering(dsp.convertOrdering(n.reqOrdering, p.PlanToStreamColMap))
return &p, nil
p.SetMergeOrdering(dsp.convertOrdering(reqOrdering, p.PlanToStreamColMap))
return &p
}

func (dsp *DistSQLPlanner) createPhysPlan(
@@ -2938,6 +2931,11 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
// Merge processors, streams, result routers, and stage counter.
leftRouters, rightRouters := physicalplan.MergePlans(
&p.PhysicalPlan, &leftPlan.PhysicalPlan, &rightPlan.PhysicalPlan,
// In the old execFactory we can only have either local or fully
// distributed plans, so checking the last stage is sufficient to get
// the distribution of the plans as a whole.
leftPlan.GetLastStageDistribution(),
rightPlan.GetLastStageDistribution(),
)

if n.unionType == tree.UnionOp {
@@ -2992,13 +2990,6 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
copy(post.OutputColumns, streamCols)

// Create the Core spec.
//
// TODO(radu): we currently only use merge joins when we have an ordering on
// all equality columns. We should relax this by either:
// - implementing a hybrid hash/merge processor which implements merge
// logic on the columns we have an ordering on, and within each merge
// group uses a hashmap on the remaining columns
// - or: adding a sort processor to complete the order
var core execinfrapb.ProcessorCoreUnion
if len(mergeOrdering.Columns) < len(streamCols) {
core.HashJoiner = &execinfrapb.HashJoinerSpec{
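To make the router behavior described in the planJoiners outline above concrete, here is a minimal, hypothetical Go sketch. It is not part of this commit and not CockroachDB code: the row representation and the bucketFor helper are invented for illustration. It shows why rows that agree on the equality columns always reach the same joiner, and why a join with no equality columns has to run on a single joiner.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// bucketFor hashes the equality columns of a row to pick one of k joiners.
// Rows with equal values in eqCols always map to the same bucket.
func bucketFor(row []string, eqCols []int, k int) int {
	h := fnv.New32a()
	for _, c := range eqCols {
		h.Write([]byte(row[c]))
		h.Write([]byte{0}) // separator so {"ab", ""} and {"a", "b"} hash differently
	}
	return int(h.Sum32() % uint32(k))
}

func main() {
	rows := [][]string{{"1", "x"}, {"2", "y"}, {"1", "z"}}
	for _, r := range rows {
		// Rows 0 and 2 share the equality column value "1" and therefore
		// land on the same joiner.
		fmt.Println(r, "-> joiner", bucketFor(r, []int{0}, 3))
	}
}
```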
55 changes: 55 additions & 0 deletions pkg/sql/distsql_plan_join.go
@@ -23,10 +23,65 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/errors"
)

// joinPlanningInfo is a utility struct that contains the information needed to
// perform the physical planning of hash and merge joins.
type joinPlanningInfo struct {
leftPlan, rightPlan *PhysicalPlan
joinType sqlbase.JoinType
joinResultTypes []*types.T
onExpr execinfrapb.Expression
post execinfrapb.PostProcessSpec
joinToStreamColMap []int
// leftEqCols and rightEqCols are the indices of equality columns. These
// are only used when planning a hash join.
leftEqCols, rightEqCols []uint32
leftEqColsAreKey, rightEqColsAreKey bool
// leftMergeOrd and rightMergeOrd are the orderings on both inputs to a
// merge join. They must be of the same length, and if the length is 0,
// then a hash join is planned.
leftMergeOrd, rightMergeOrd execinfrapb.Ordering
leftPlanDistribution, rightPlanDistribution physicalplan.PlanDistribution
}

// makeCoreSpec creates a processor core for hash and merge joins based on the
// join planning information. Merge ordering fields of info determine which
// kind of join is being planned.
func (info *joinPlanningInfo) makeCoreSpec() execinfrapb.ProcessorCoreUnion {
var core execinfrapb.ProcessorCoreUnion
if len(info.leftMergeOrd.Columns) != len(info.rightMergeOrd.Columns) {
panic(fmt.Sprintf(
"unexpectedly different merge join ordering lengths: left %d, right %d",
len(info.leftMergeOrd.Columns), len(info.rightMergeOrd.Columns),
))
}
if len(info.leftMergeOrd.Columns) == 0 {
// There is no required ordering on the columns, so we plan a hash join.
core.HashJoiner = &execinfrapb.HashJoinerSpec{
LeftEqColumns: info.leftEqCols,
RightEqColumns: info.rightEqCols,
OnExpr: info.onExpr,
Type: info.joinType,
LeftEqColumnsAreKey: info.leftEqColsAreKey,
RightEqColumnsAreKey: info.rightEqColsAreKey,
}
} else {
core.MergeJoiner = &execinfrapb.MergeJoinerSpec{
LeftOrdering: info.leftMergeOrd,
RightOrdering: info.rightMergeOrd,
OnExpr: info.onExpr,
Type: info.joinType,
LeftEqColumnsAreKey: info.leftEqColsAreKey,
RightEqColumnsAreKey: info.rightEqColsAreKey,
}
}
return core
}

var planInterleavedJoins = settings.RegisterBoolSetting(
"sql.distsql.interleaved_joins.enabled",
"if set we plan interleaved table joins instead of merge joins when possible",
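As the field comments above state, makeCoreSpec picks the join core purely from the merge ordering: the two orderings must have equal length, and an empty ordering means a hash join. A simplified, hypothetical sketch of that rule, using invented stand-in types rather than the real execinfrapb specs (and an error instead of the panic used in the actual function):

```go
package main

import "fmt"

// Stand-ins for execinfrapb.HashJoinerSpec / MergeJoinerSpec; invented here.
type hashJoinerSpec struct{ leftEq, rightEq []uint32 }
type mergeJoinerSpec struct{ leftOrd, rightOrd []uint32 }

// chooseCore mirrors the rule in makeCoreSpec: the two merge orderings must
// have equal length, and a zero-length ordering means a hash join is planned.
func chooseCore(leftOrd, rightOrd, leftEq, rightEq []uint32) (interface{}, error) {
	if len(leftOrd) != len(rightOrd) {
		return nil, fmt.Errorf("mismatched merge orderings: %d vs %d", len(leftOrd), len(rightOrd))
	}
	if len(leftOrd) == 0 {
		return hashJoinerSpec{leftEq: leftEq, rightEq: rightEq}, nil
	}
	return mergeJoinerSpec{leftOrd: leftOrd, rightOrd: rightOrd}, nil
}

func main() {
	core, _ := chooseCore(nil, nil, []uint32{0}, []uint32{1})
	fmt.Printf("%T\n", core) // main.hashJoinerSpec
	core, _ = chooseCore([]uint32{0}, []uint32{0}, []uint32{0}, []uint32{1})
	fmt.Printf("%T\n", core) // main.mergeJoinerSpec
}
```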
82 changes: 74 additions & 8 deletions pkg/sql/distsql_spec_exec_factory.go
@@ -244,8 +244,7 @@ func (e *distSQLSpecExecFactory) checkExprsAndMaybeMergeLastStage(
func (e *distSQLSpecExecFactory) ConstructFilter(
n exec.Node, filter tree.TypedExpr, reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
plan := n.(planMaybePhysical)
physPlan := plan.physPlan
physPlan, plan := getPhysPlan(n)
recommendation := e.checkExprsAndMaybeMergeLastStage(physPlan, []tree.TypedExpr{filter})
// AddFilter will attempt to push the filter into the last stage of
// processors.
@@ -259,8 +258,7 @@ func (e *distSQLSpecExecFactory) ConstructFilter(
func (e *distSQLSpecExecFactory) ConstructSimpleProject(
n exec.Node, cols []exec.NodeColumnOrdinal, colNames []string, reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
plan := n.(planMaybePhysical)
physPlan := plan.physPlan
physPlan, plan := getPhysPlan(n)
projection := make([]uint32, len(cols))
for i := range cols {
projection[i] = uint32(cols[i])
@@ -285,8 +283,7 @@ func (e *distSQLSpecExecFactory) ConstructRender(
exprs tree.TypedExprs,
reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
plan := n.(planMaybePhysical)
physPlan := plan.physPlan
physPlan, plan := getPhysPlan(n)
recommendation := e.checkExprsAndMaybeMergeLastStage(physPlan, exprs)
if err := physPlan.AddRendering(
exprs, e.getPlanCtx(recommendation), physPlan.PlanToStreamColMap, getTypesFromResultColumns(columns),
@@ -309,14 +306,21 @@ func (e *distSQLSpecExecFactory) ConstructApplyJoin(
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning")
}

// TODO(yuzefovich): move the decision whether to use an interleaved join from
// the physical planner into the execbuilder.

func (e *distSQLSpecExecFactory) ConstructHashJoin(
joinType sqlbase.JoinType,
left, right exec.Node,
leftEqCols, rightEqCols []exec.NodeColumnOrdinal,
leftEqColsAreKey, rightEqColsAreKey bool,
extraOnCond tree.TypedExpr,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning")
return e.constructHashOrMergeJoin(
joinType, left, right, extraOnCond, leftEqCols, rightEqCols,
leftEqColsAreKey, rightEqColsAreKey,
ReqOrdering{} /* mergeJoinOrdering */, exec.OutputOrdering{}, /* reqOrdering */
)
}

func (e *distSQLSpecExecFactory) ConstructMergeJoin(
@@ -327,7 +331,14 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin(
reqOrdering exec.OutputOrdering,
leftEqColsAreKey, rightEqColsAreKey bool,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning")
leftEqCols, rightEqCols, mergeJoinOrdering, err := getEqualityIndicesAndMergeJoinOrdering(leftOrdering, rightOrdering)
if err != nil {
return nil, err
}
return e.constructHashOrMergeJoin(
joinType, left, right, onCond, leftEqCols, rightEqCols,
leftEqColsAreKey, rightEqColsAreKey, mergeJoinOrdering, reqOrdering,
)
}

func (e *distSQLSpecExecFactory) ConstructGroupBy(
@@ -673,3 +684,58 @@ func (e *distSQLSpecExecFactory) ConstructExport(
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning")
}

func getPhysPlan(n exec.Node) (*PhysicalPlan, planMaybePhysical) {
plan := n.(planMaybePhysical)
return plan.physPlan, plan
}

func (e *distSQLSpecExecFactory) constructHashOrMergeJoin(
joinType sqlbase.JoinType,
left, right exec.Node,
onCond tree.TypedExpr,
leftEqCols, rightEqCols []exec.NodeColumnOrdinal,
leftEqColsAreKey, rightEqColsAreKey bool,
mergeJoinOrdering sqlbase.ColumnOrdering,
reqOrdering exec.OutputOrdering,
) (exec.Node, error) {
leftPlan, _ := getPhysPlan(left)
rightPlan, _ := getPhysPlan(right)
resultColumns := getJoinResultColumns(joinType, leftPlan.ResultColumns, rightPlan.ResultColumns)
leftMap, rightMap := leftPlan.PlanToStreamColMap, rightPlan.PlanToStreamColMap
helper := &joinPlanningHelper{
numLeftCols: len(leftPlan.ResultColumns),
numRightCols: len(rightPlan.ResultColumns),
leftPlanToStreamColMap: leftMap,
rightPlanToStreamColMap: rightMap,
}
post, joinToStreamColMap := helper.joinOutColumns(joinType, resultColumns)
// We always try to distribute the join, but planJoiners() itself might
// decide not to.
onExpr, err := helper.remapOnExpr(e.getPlanCtx(shouldDistribute), onCond)
if err != nil {
return nil, err
}

leftEqColsRemapped := eqCols(leftEqCols, leftMap)
rightEqColsRemapped := eqCols(rightEqCols, rightMap)
p := e.dsp.planJoiners(&joinPlanningInfo{
leftPlan: leftPlan,
rightPlan: rightPlan,
joinType: joinType,
joinResultTypes: getTypesFromResultColumns(resultColumns),
onExpr: onExpr,
post: post,
joinToStreamColMap: joinToStreamColMap,
leftEqCols: leftEqColsRemapped,
rightEqCols: rightEqColsRemapped,
leftEqColsAreKey: leftEqColsAreKey,
rightEqColsAreKey: rightEqColsAreKey,
leftMergeOrd: distsqlOrdering(mergeJoinOrdering, leftEqColsRemapped),
rightMergeOrd: distsqlOrdering(mergeJoinOrdering, rightEqColsRemapped),
leftPlanDistribution: leftPlan.Distribution,
rightPlanDistribution: rightPlan.Distribution,
}, ReqOrdering(reqOrdering))
p.ResultColumns = resultColumns
return planMaybePhysical{physPlan: p}, nil
}
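constructHashOrMergeJoin passes the optimizer's column ordinals through each side's PlanToStreamColMap (via the eqCols helper) before they go into the joiner spec. Below is a hypothetical sketch of that remapping step, assuming eqCols simply looks each ordinal up in the map; remapEqCols and the sample map are invented for illustration and are not part of this commit.

```go
package main

import "fmt"

// remapEqCols maps planner column ordinals to physical stream columns, the
// assumed behavior of the eqCols helper used above: planToStreamColMap[i]
// gives the stream column for plan column i.
func remapEqCols(eqOrdinals []int, planToStreamColMap []int) []uint32 {
	out := make([]uint32, len(eqOrdinals))
	for i, ord := range eqOrdinals {
		out[i] = uint32(planToStreamColMap[ord])
	}
	return out
}

func main() {
	// Plan columns 0..2 map to stream columns 2, 0, 1 on this side.
	planToStream := []int{2, 0, 1}
	fmt.Println(remapEqCols([]int{0, 2}, planToStream)) // [2 1]
}
```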