50252: opt: change GeoLookupJoin to InvertedLookupJoin r=rytaft a=rytaft

This commit converts the `GeoLookupJoin` operator in the optimizer into
a more general operator, `InvertedLookupJoin`. This new operator maps
directly to the `invertedJoiner` DistSQL processor. In the future, the
`InvertedLookupJoin` operator can also be used for lookup joins on inverted
JSON and array indexes in addition to geospatial indexes.

This commit also adds a new structure, `geoDatumToInvertedExpr`, which
implements the `DatumToInvertedExpr` interface for geospatial data types.
This will enable the optimized plan to be easily converted to a DistSQL
plan for execution by the `invertedJoiner`.

Release note: None


50450: sql: add support for hash and merge joins in the new factory r=yuzefovich a=yuzefovich

**sql: minor cleanup of joiner planning**

`joinNode.mergeJoinOrdering` is now set to non-zero length by the
optimizer only when we can use a merge join (meaning that the number of
equality columns is non-zero and equals the length of the ordering we
have). This allows us to slightly simplify the setup of the merge
joiners.
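
The condition above can be sketched in isolation. This is a minimal illustration, not the planner's code: `joinKind` and `chooseJoiner` are hypothetical names, and the inputs are reduced to two counts.

```go
package main

import "fmt"

// joinKind is a hypothetical enum used only in this sketch.
type joinKind string

const (
	hashJoin  joinKind = "hash"
	mergeJoin joinKind = "merge"
)

// chooseJoiner picks a merge join only when there is at least one equality
// column and the available ordering covers every equality column. In all
// other cases it falls back to a hash join.
func chooseJoiner(numEqCols, mergeOrderingLen int) joinKind {
	if numEqCols > 0 && mergeOrderingLen == numEqCols {
		return mergeJoin
	}
	return hashJoin
}

func main() {
	fmt.Println(chooseJoiner(2, 2)) // ordering covers both equality columns
	fmt.Println(chooseJoiner(2, 1)) // ordering covers only one of two columns
	fmt.Println(chooseJoiner(0, 0)) // no equality columns at all
}
```

Because the optimizer now guarantees this condition before setting the ordering, the physical planner only needs to check whether the ordering is non-empty.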

Additionally, this commit switches to using `[]exec.NodeColumnOrdinal`
instead of `int` for equality columns in `joinPredicate`, which allows us
to remove one conversion step when planning hash joiners.

We also introduce a small helper that will be reused by the follow-up
work.

Release note: None

**sql: add support for hash and merge joins in the new factory**

This commit adds implementations of `ConstructHashJoin` and
`ConstructMergeJoin` in the new factory, mostly by refactoring and
reusing existing code in the physical planner. Notably,
interleaved joins are not supported yet.

Fixes: #50291.
Addresses: #47473.

Release note: None

50646: build: update instructions for updating dependencies r=jbowens a=otan

Release note: None

Co-authored-by: Rebecca Taft <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
4 people committed Jun 25, 2020
4 parents 18957b4 + 3ad1f87 + d55f5eb + cfae01b commit ae0f360
Showing 36 changed files with 978 additions and 657 deletions.
15 changes: 15 additions & 0 deletions build/README.md
@@ -90,6 +90,9 @@ Dependencies are managed using `go mod`. We use `go mod vendor` so that we can i

Run `go get -u <dependency>`. To get a specific version, run `go get -u <dependency>@<version|branch|sha>`.

When updating a dependency, you should run `go mod tidy` after `go get` to ensure the old entries
are removed from go.sum.

You must then run `make -k vendor_rebuild` to ensure the modules are installed. These changes must
then be committed in the submodule directory (see Working with Submodules).

@@ -160,6 +163,18 @@ is important to re-run `go mod tidy `and `make -k vendor_rebuild`
against the fetched, updated `vendor` ref, thus generating a new commit in
the submodule that has as its parent the one from the earlier change.

### Recovering from a broken vendor directory

If you happen to run into a broken `vendor` directory which is irrecoverable,
you can run the following commands which will restore the directory in
working order:

```
rm -rf vendor
git checkout HEAD vendor # you can replace HEAD with any branch/sha
git submodule update --init --recursive
```

### Repository Name

We only want the vendor directory used by builds when it is explicitly checked
179 changes: 82 additions & 97 deletions pkg/sql/distsql_physical_planner.go
@@ -2131,25 +2131,6 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
}
}

// Outline of the planning process for joins:
//
// - We create PhysicalPlans for the left and right side. Each plan has a set
// of output routers with result that will serve as input for the join.
//
// - We merge the list of processors and streams into a single plan. We keep
// track of the output routers for the left and right results.
//
// - We add a set of joiner processors (say K of them).
//
// - We configure the left and right output routers to send results to
// these joiners, distributing rows by hash (on the join equality columns).
// We are thus breaking up all input rows into K buckets such that rows
// that match on the equality columns end up in the same bucket. If there
// are no equality columns, we cannot distribute rows so we use a single
// joiner.
//
// - The routers of the joiner processors are the result routers of the plan.

leftPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.left.plan)
if err != nil {
return nil, err
@@ -2159,47 +2140,85 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
return nil, err
}

// Nodes where we will run the join processors.
var nodes []roachpb.NodeID
leftMap, rightMap := leftPlan.PlanToStreamColMap, rightPlan.PlanToStreamColMap
helper := &joinPlanningHelper{
numLeftCols: n.pred.numLeftCols,
numRightCols: n.pred.numRightCols,
leftPlanToStreamColMap: leftMap,
rightPlanToStreamColMap: rightMap,
}
post, joinToStreamColMap := helper.joinOutColumns(n.joinType, n.columns)
onExpr, err := helper.remapOnExpr(planCtx, n.pred.onCond)
if err != nil {
return nil, err
}

// We initialize these properties of the joiner. They will then be used to
// fill in the processor spec. See descriptions for HashJoinerSpec.
var leftEqCols, rightEqCols []uint32
var leftMergeOrd, rightMergeOrd execinfrapb.Ordering
joinType := n.joinType

// Figure out the left and right types.
leftTypes := leftPlan.ResultTypes
rightTypes := rightPlan.ResultTypes
// Set up the equality columns and the merge ordering.
leftEqCols := eqCols(n.pred.leftEqualityIndices, leftMap)
rightEqCols := eqCols(n.pred.rightEqualityIndices, rightMap)
leftMergeOrd := distsqlOrdering(n.mergeJoinOrdering, leftEqCols)
rightMergeOrd := distsqlOrdering(n.mergeJoinOrdering, rightEqCols)

// Set up the equality columns.
if numEq := len(n.pred.leftEqualityIndices); numEq != 0 {
leftEqCols = eqCols(n.pred.leftEqualityIndices, leftPlan.PlanToStreamColMap)
rightEqCols = eqCols(n.pred.rightEqualityIndices, rightPlan.PlanToStreamColMap)
joinResultTypes, err := getTypesForPlanResult(n, joinToStreamColMap)
if err != nil {
return nil, err
}

return dsp.planJoiners(&joinPlanningInfo{
leftPlan: leftPlan,
rightPlan: rightPlan,
joinType: n.joinType,
joinResultTypes: joinResultTypes,
onExpr: onExpr,
post: post,
joinToStreamColMap: joinToStreamColMap,
leftEqCols: leftEqCols,
rightEqCols: rightEqCols,
leftEqColsAreKey: n.pred.leftEqKey,
rightEqColsAreKey: n.pred.rightEqKey,
leftMergeOrd: leftMergeOrd,
rightMergeOrd: rightMergeOrd,
// In the old execFactory we can only have either local or fully
// distributed plans, so checking the last stage is sufficient to get
// the distribution of the whole plans.
leftPlanDistribution: leftPlan.GetLastStageDistribution(),
rightPlanDistribution: rightPlan.GetLastStageDistribution(),
}, n.reqOrdering), nil
}

func (dsp *DistSQLPlanner) planJoiners(
info *joinPlanningInfo, reqOrdering ReqOrdering,
) *PhysicalPlan {
// Outline of the planning process for joins when given PhysicalPlans for
// the left and right side (with each plan having a set of output routers
// with result that will serve as input for the join).
//
// - We merge the list of processors and streams into a single plan. We keep
// track of the output routers for the left and right results.
//
// - We add a set of joiner processors (say K of them).
//
// - We configure the left and right output routers to send results to
// these joiners, distributing rows by hash (on the join equality columns).
// We are thus breaking up all input rows into K buckets such that rows
// that match on the equality columns end up in the same bucket. If there
// are no equality columns, we cannot distribute rows so we use a single
// joiner.
//
// - The routers of the joiner processors are the result routers of the plan.

p := MakePhysicalPlan(dsp.gatewayNodeID)
leftRouters, rightRouters := physicalplan.MergePlans(
&p.PhysicalPlan, &leftPlan.PhysicalPlan, &rightPlan.PhysicalPlan,
&p.PhysicalPlan, &info.leftPlan.PhysicalPlan, &info.rightPlan.PhysicalPlan,
info.leftPlanDistribution, info.rightPlanDistribution,
)

// Set up the output columns.
if numEq := len(n.pred.leftEqualityIndices); numEq != 0 {
// Nodes where we will run the join processors.
var nodes []roachpb.NodeID
if numEq := len(info.leftEqCols); numEq != 0 {
nodes = findJoinProcessorNodes(leftRouters, rightRouters, p.Processors)

if len(n.mergeJoinOrdering) > 0 {
// TODO(radu): we currently only use merge joins when we have an ordering on
// all equality columns. We should relax this by either:
// - implementing a hybrid hash/merge processor which implements merge
// logic on the columns we have an ordering on, and within each merge
// group uses a hashmap on the remaining columns
// - or: adding a sort processor to complete the order
if len(n.mergeJoinOrdering) == len(n.pred.leftEqualityIndices) {
// Excellent! We can use the merge joiner.
leftMergeOrd = distsqlOrdering(n.mergeJoinOrdering, leftEqCols)
rightMergeOrd = distsqlOrdering(n.mergeJoinOrdering, rightEqCols)
}
}
} else {
// Without column equality, we cannot distribute the join. Run a
// single processor.
@@ -2214,53 +2233,21 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
}
}

rightMap := rightPlan.PlanToStreamColMap
post, joinToStreamColMap := joinOutColumns(n, leftPlan.PlanToStreamColMap, rightMap)
onExpr, err := remapOnExpr(planCtx, n, leftPlan.PlanToStreamColMap, rightMap)
if err != nil {
return nil, err
}

// Create the Core spec.
var core execinfrapb.ProcessorCoreUnion
if leftMergeOrd.Columns == nil {
core.HashJoiner = &execinfrapb.HashJoinerSpec{
LeftEqColumns: leftEqCols,
RightEqColumns: rightEqCols,
OnExpr: onExpr,
Type: joinType,
LeftEqColumnsAreKey: n.pred.leftEqKey,
RightEqColumnsAreKey: n.pred.rightEqKey,
}
} else {
core.MergeJoiner = &execinfrapb.MergeJoinerSpec{
LeftOrdering: leftMergeOrd,
RightOrdering: rightMergeOrd,
OnExpr: onExpr,
Type: joinType,
LeftEqColumnsAreKey: n.pred.leftEqKey,
RightEqColumnsAreKey: n.pred.rightEqKey,
}
}

p.AddJoinStage(
nodes, core, post, leftEqCols, rightEqCols, leftTypes, rightTypes,
leftMergeOrd, rightMergeOrd, leftRouters, rightRouters,
nodes, info.makeCoreSpec(), info.post,
info.leftEqCols, info.rightEqCols,
info.leftPlan.ResultTypes, info.rightPlan.ResultTypes,
info.leftMergeOrd, info.rightMergeOrd,
leftRouters, rightRouters,
)

p.PlanToStreamColMap = joinToStreamColMap
p.ResultTypes, err = getTypesForPlanResult(n, joinToStreamColMap)
if err != nil {
return nil, err
}
p.PlanToStreamColMap = info.joinToStreamColMap
p.ResultTypes = info.joinResultTypes

// Joiners may guarantee an ordering to outputs, so we ensure that
// ordering is propagated through the input synchronizer of the next stage.
// We can propagate the ordering from either side, we use the left side here.
// Note that n.props only has a non-empty ordering for inner joins, where it
// uses the mergeJoinOrdering.
p.SetMergeOrdering(dsp.convertOrdering(n.reqOrdering, p.PlanToStreamColMap))
return &p, nil
p.SetMergeOrdering(dsp.convertOrdering(reqOrdering, p.PlanToStreamColMap))
return &p
}
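
The hash distribution described in the `planJoiners` outline can be illustrated with a small standalone sketch. It is not the DistSQL router implementation: `row`, `bucketFor`, and the use of FNV hashing are assumptions made for the example. The point it shows is that rows from either side that agree on the equality columns land in the same one of K buckets.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// row is a hypothetical stand-in for a SQL row; values are plain strings.
type row []string

// bucketFor hashes the values found in the equality columns and maps the
// row to one of k joiners. With no equality columns there is nothing to
// hash on, so every row goes to a single joiner (bucket 0).
func bucketFor(r row, eqCols []int, k int) int {
	if len(eqCols) == 0 {
		return 0
	}
	h := fnv.New32a()
	for _, c := range eqCols {
		h.Write([]byte(r[c]))
		// Separator byte so that ("ab", "c") and ("a", "bc") hash differently.
		h.Write([]byte{0})
	}
	return int(h.Sum32() % uint32(k))
}

func main() {
	const k = 3
	left := row{"42", "alice"}    // equality column is index 0
	right := row{"order-7", "42"} // equality column is index 1

	// Both rows carry the value "42" in their equality column, so they are
	// routed to the same joiner.
	fmt.Println(bucketFor(left, []int{0}, k) == bucketFor(right, []int{1}, k)) // prints "true"
}
```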

func (dsp *DistSQLPlanner) createPhysPlan(
@@ -2944,6 +2931,11 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
// Merge processors, streams, result routers, and stage counter.
leftRouters, rightRouters := physicalplan.MergePlans(
&p.PhysicalPlan, &leftPlan.PhysicalPlan, &rightPlan.PhysicalPlan,
// In the old execFactory we can only have either local or fully
// distributed plans, so checking the last stage is sufficient to get
// the distribution of the whole plans.
leftPlan.GetLastStageDistribution(),
rightPlan.GetLastStageDistribution(),
)

if n.unionType == tree.UnionOp {
@@ -2998,13 +2990,6 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
copy(post.OutputColumns, streamCols)

// Create the Core spec.
//
// TODO(radu): we currently only use merge joins when we have an ordering on
// all equality columns. We should relax this by either:
// - implementing a hybrid hash/merge processor which implements merge
// logic on the columns we have an ordering on, and within each merge
// group uses a hashmap on the remaining columns
// - or: adding a sort processor to complete the order
var core execinfrapb.ProcessorCoreUnion
if len(mergeOrdering.Columns) < len(streamCols) {
core.HashJoiner = &execinfrapb.HashJoinerSpec{