diff --git a/build/README.md b/build/README.md index c4c94e95e046..0cd00c3db4ef 100644 --- a/build/README.md +++ b/build/README.md @@ -90,6 +90,9 @@ Dependencies are managed using `go mod`. We use `go mod vendor` so that we can i Run `go get -u `. To get a specific version, run `go get -u @`. +When updating a dependency, you should run `go mod tidy` after `go get` to ensure the old entries +are removed from go.sum. + You must then run `make -k vendor_rebuild` to ensure the modules are installed. These changes must then be committed in the submodule directory (see Working with Submodules). @@ -160,6 +163,18 @@ is important to re-run `go mod tidy `and `make -k vendor_rebuild` against the fetched, updated `vendor` ref, thus generating a new commit in the submodule that has as its parent the one from the earlier change. +### Recovering from a broken vendor directory + +If you happen to run into a broken `vendor` directory which is irrecoverable, +you can run the following commands which will restore the directory in +working order: + +``` +rm -rf vendor +git checkout HEAD vendor # you can replace HEAD with any branch/sha +git submodule update --init --recursive +``` + ### Repository Name We only want the vendor directory used by builds when it is explicitly checked diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 3eb5936558e0..d27bf4e8b8f1 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -2131,25 +2131,6 @@ func (dsp *DistSQLPlanner) createPlanForJoin( } } - // Outline of the planning process for joins: - // - // - We create PhysicalPlans for the left and right side. Each plan has a set - // of output routers with result that will serve as input for the join. - // - // - We merge the list of processors and streams into a single plan. We keep - // track of the output routers for the left and right results. - // - // - We add a set of joiner processors (say K of them). - // - // - We configure the left and right output routers to send results to - // these joiners, distributing rows by hash (on the join equality columns). - // We are thus breaking up all input rows into K buckets such that rows - // that match on the equality columns end up in the same bucket. If there - // are no equality columns, we cannot distribute rows so we use a single - // joiner. - // - // - The routers of the joiner processors are the result routers of the plan. - leftPlan, err := dsp.createPhysPlanForPlanNode(planCtx, n.left.plan) if err != nil { return nil, err @@ -2159,47 +2140,85 @@ func (dsp *DistSQLPlanner) createPlanForJoin( return nil, err } - // Nodes where we will run the join processors. - var nodes []roachpb.NodeID + leftMap, rightMap := leftPlan.PlanToStreamColMap, rightPlan.PlanToStreamColMap + helper := &joinPlanningHelper{ + numLeftCols: n.pred.numLeftCols, + numRightCols: n.pred.numRightCols, + leftPlanToStreamColMap: leftMap, + rightPlanToStreamColMap: rightMap, + } + post, joinToStreamColMap := helper.joinOutColumns(n.joinType, n.columns) + onExpr, err := helper.remapOnExpr(planCtx, n.pred.onCond) + if err != nil { + return nil, err + } // We initialize these properties of the joiner. They will then be used to // fill in the processor spec. See descriptions for HashJoinerSpec. - var leftEqCols, rightEqCols []uint32 - var leftMergeOrd, rightMergeOrd execinfrapb.Ordering - joinType := n.joinType - - // Figure out the left and right types. 
- leftTypes := leftPlan.ResultTypes - rightTypes := rightPlan.ResultTypes + // Set up the equality columns and the merge ordering. + leftEqCols := eqCols(n.pred.leftEqualityIndices, leftMap) + rightEqCols := eqCols(n.pred.rightEqualityIndices, rightMap) + leftMergeOrd := distsqlOrdering(n.mergeJoinOrdering, leftEqCols) + rightMergeOrd := distsqlOrdering(n.mergeJoinOrdering, rightEqCols) - // Set up the equality columns. - if numEq := len(n.pred.leftEqualityIndices); numEq != 0 { - leftEqCols = eqCols(n.pred.leftEqualityIndices, leftPlan.PlanToStreamColMap) - rightEqCols = eqCols(n.pred.rightEqualityIndices, rightPlan.PlanToStreamColMap) + joinResultTypes, err := getTypesForPlanResult(n, joinToStreamColMap) + if err != nil { + return nil, err } + return dsp.planJoiners(&joinPlanningInfo{ + leftPlan: leftPlan, + rightPlan: rightPlan, + joinType: n.joinType, + joinResultTypes: joinResultTypes, + onExpr: onExpr, + post: post, + joinToStreamColMap: joinToStreamColMap, + leftEqCols: leftEqCols, + rightEqCols: rightEqCols, + leftEqColsAreKey: n.pred.leftEqKey, + rightEqColsAreKey: n.pred.rightEqKey, + leftMergeOrd: leftMergeOrd, + rightMergeOrd: rightMergeOrd, + // In the old execFactory we can only have either local or fully + // distributed plans, so checking the last stage is sufficient to get + // the distribution of the whole plans. + leftPlanDistribution: leftPlan.GetLastStageDistribution(), + rightPlanDistribution: rightPlan.GetLastStageDistribution(), + }, n.reqOrdering), nil +} + +func (dsp *DistSQLPlanner) planJoiners( + info *joinPlanningInfo, reqOrdering ReqOrdering, +) *PhysicalPlan { + // Outline of the planning process for joins when given PhysicalPlans for + // the left and right side (with each plan having a set of output routers + // with result that will serve as input for the join). + // + // - We merge the list of processors and streams into a single plan. We keep + // track of the output routers for the left and right results. + // + // - We add a set of joiner processors (say K of them). + // + // - We configure the left and right output routers to send results to + // these joiners, distributing rows by hash (on the join equality columns). + // We are thus breaking up all input rows into K buckets such that rows + // that match on the equality columns end up in the same bucket. If there + // are no equality columns, we cannot distribute rows so we use a single + // joiner. + // + // - The routers of the joiner processors are the result routers of the plan. + p := MakePhysicalPlan(dsp.gatewayNodeID) leftRouters, rightRouters := physicalplan.MergePlans( - &p.PhysicalPlan, &leftPlan.PhysicalPlan, &rightPlan.PhysicalPlan, + &p.PhysicalPlan, &info.leftPlan.PhysicalPlan, &info.rightPlan.PhysicalPlan, + info.leftPlanDistribution, info.rightPlanDistribution, ) - // Set up the output columns. - if numEq := len(n.pred.leftEqualityIndices); numEq != 0 { + // Nodes where we will run the join processors. + var nodes []roachpb.NodeID + if numEq := len(info.leftEqCols); numEq != 0 { nodes = findJoinProcessorNodes(leftRouters, rightRouters, p.Processors) - - if len(n.mergeJoinOrdering) > 0 { - // TODO(radu): we currently only use merge joins when we have an ordering on - // all equality columns. 
We should relax this by either: - // - implementing a hybrid hash/merge processor which implements merge - // logic on the columns we have an ordering on, and within each merge - // group uses a hashmap on the remaining columns - // - or: adding a sort processor to complete the order - if len(n.mergeJoinOrdering) == len(n.pred.leftEqualityIndices) { - // Excellent! We can use the merge joiner. - leftMergeOrd = distsqlOrdering(n.mergeJoinOrdering, leftEqCols) - rightMergeOrd = distsqlOrdering(n.mergeJoinOrdering, rightEqCols) - } - } } else { // Without column equality, we cannot distribute the join. Run a // single processor. @@ -2214,53 +2233,21 @@ func (dsp *DistSQLPlanner) createPlanForJoin( } } - rightMap := rightPlan.PlanToStreamColMap - post, joinToStreamColMap := joinOutColumns(n, leftPlan.PlanToStreamColMap, rightMap) - onExpr, err := remapOnExpr(planCtx, n, leftPlan.PlanToStreamColMap, rightMap) - if err != nil { - return nil, err - } - - // Create the Core spec. - var core execinfrapb.ProcessorCoreUnion - if leftMergeOrd.Columns == nil { - core.HashJoiner = &execinfrapb.HashJoinerSpec{ - LeftEqColumns: leftEqCols, - RightEqColumns: rightEqCols, - OnExpr: onExpr, - Type: joinType, - LeftEqColumnsAreKey: n.pred.leftEqKey, - RightEqColumnsAreKey: n.pred.rightEqKey, - } - } else { - core.MergeJoiner = &execinfrapb.MergeJoinerSpec{ - LeftOrdering: leftMergeOrd, - RightOrdering: rightMergeOrd, - OnExpr: onExpr, - Type: joinType, - LeftEqColumnsAreKey: n.pred.leftEqKey, - RightEqColumnsAreKey: n.pred.rightEqKey, - } - } - p.AddJoinStage( - nodes, core, post, leftEqCols, rightEqCols, leftTypes, rightTypes, - leftMergeOrd, rightMergeOrd, leftRouters, rightRouters, + nodes, info.makeCoreSpec(), info.post, + info.leftEqCols, info.rightEqCols, + info.leftPlan.ResultTypes, info.rightPlan.ResultTypes, + info.leftMergeOrd, info.rightMergeOrd, + leftRouters, rightRouters, ) - p.PlanToStreamColMap = joinToStreamColMap - p.ResultTypes, err = getTypesForPlanResult(n, joinToStreamColMap) - if err != nil { - return nil, err - } + p.PlanToStreamColMap = info.joinToStreamColMap + p.ResultTypes = info.joinResultTypes // Joiners may guarantee an ordering to outputs, so we ensure that // ordering is propagated through the input synchronizer of the next stage. - // We can propagate the ordering from either side, we use the left side here. - // Note that n.props only has a non-empty ordering for inner joins, where it - // uses the mergeJoinOrdering. - p.SetMergeOrdering(dsp.convertOrdering(n.reqOrdering, p.PlanToStreamColMap)) - return &p, nil + p.SetMergeOrdering(dsp.convertOrdering(reqOrdering, p.PlanToStreamColMap)) + return &p } func (dsp *DistSQLPlanner) createPhysPlan( @@ -2944,6 +2931,11 @@ func (dsp *DistSQLPlanner) createPlanForSetOp( // Merge processors, streams, result routers, and stage counter. leftRouters, rightRouters := physicalplan.MergePlans( &p.PhysicalPlan, &leftPlan.PhysicalPlan, &rightPlan.PhysicalPlan, + // In the old execFactory we can only have either local or fully + // distributed plans, so checking the last stage is sufficient to get + // the distribution of the whole plans. + leftPlan.GetLastStageDistribution(), + rightPlan.GetLastStageDistribution(), ) if n.unionType == tree.UnionOp { @@ -2998,13 +2990,6 @@ func (dsp *DistSQLPlanner) createPlanForSetOp( copy(post.OutputColumns, streamCols) // Create the Core spec. - // - // TODO(radu): we currently only use merge joins when we have an ordering on - // all equality columns. 
We should relax this by either: - // - implementing a hybrid hash/merge processor which implements merge - // logic on the columns we have an ordering on, and within each merge - // group uses a hashmap on the remaining columns - // - or: adding a sort processor to complete the order var core execinfrapb.ProcessorCoreUnion if len(mergeOrdering.Columns) < len(streamCols) { core.HashJoiner = &execinfrapb.HashJoinerSpec{ diff --git a/pkg/sql/distsql_plan_join.go b/pkg/sql/distsql_plan_join.go index 948adbf3bd98..da3cf712bae9 100644 --- a/pkg/sql/distsql_plan_join.go +++ b/pkg/sql/distsql_plan_join.go @@ -19,13 +19,69 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/errors" ) +// joinPlanningInfo is a utility struct that contains the information needed to +// perform the physical planning of hash and merge joins. +type joinPlanningInfo struct { + leftPlan, rightPlan *PhysicalPlan + joinType sqlbase.JoinType + joinResultTypes []*types.T + onExpr execinfrapb.Expression + post execinfrapb.PostProcessSpec + joinToStreamColMap []int + // leftEqCols and rightEqCols are the indices of equality columns. These + // are only used when planning a hash join. + leftEqCols, rightEqCols []uint32 + leftEqColsAreKey, rightEqColsAreKey bool + // leftMergeOrd and rightMergeOrd are the orderings on both inputs to a + // merge join. They must be of the same length, and if the length is 0, + // then a hash join is planned. + leftMergeOrd, rightMergeOrd execinfrapb.Ordering + leftPlanDistribution, rightPlanDistribution physicalplan.PlanDistribution +} + +// makeCoreSpec creates a processor core for hash and merge joins based on the +// join planning information. Merge ordering fields of info determine which +// kind of join is being planned. +func (info *joinPlanningInfo) makeCoreSpec() execinfrapb.ProcessorCoreUnion { + var core execinfrapb.ProcessorCoreUnion + if len(info.leftMergeOrd.Columns) != len(info.rightMergeOrd.Columns) { + panic(fmt.Sprintf( + "unexpectedly different merge join ordering lengths: left %d, right %d", + len(info.leftMergeOrd.Columns), len(info.rightMergeOrd.Columns), + )) + } + if len(info.leftMergeOrd.Columns) == 0 { + // There is no required ordering on the columns, so we plan a hash join. 
+ core.HashJoiner = &execinfrapb.HashJoinerSpec{ + LeftEqColumns: info.leftEqCols, + RightEqColumns: info.rightEqCols, + OnExpr: info.onExpr, + Type: info.joinType, + LeftEqColumnsAreKey: info.leftEqColsAreKey, + RightEqColumnsAreKey: info.rightEqColsAreKey, + } + } else { + core.MergeJoiner = &execinfrapb.MergeJoinerSpec{ + LeftOrdering: info.leftMergeOrd, + RightOrdering: info.rightMergeOrd, + OnExpr: info.onExpr, + Type: info.joinType, + LeftEqColumnsAreKey: info.leftEqColsAreKey, + RightEqColumnsAreKey: info.rightEqColsAreKey, + } + } + return core +} + var planInterleavedJoins = settings.RegisterBoolSetting( "sql.distsql.interleaved_joins.enabled", "if set we plan interleaved table joins instead of merge joins when possible", @@ -56,7 +112,7 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin( var totalLimitHint int64 for i, t := range []struct { scan *scanNode - eqIndices []int + eqIndices []exec.NodeColumnOrdinal }{ { scan: leftScan, @@ -105,8 +161,15 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin( joinType := n.joinType - post, joinToStreamColMap := joinOutColumns(n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap) - onExpr, err := remapOnExpr(planCtx, n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap) + leftMap, rightMap := plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap + helper := &joinPlanningHelper{ + numLeftCols: n.pred.numLeftCols, + numRightCols: n.pred.numRightCols, + leftPlanToStreamColMap: leftMap, + rightPlanToStreamColMap: rightMap, + } + post, joinToStreamColMap := helper.joinOutColumns(n.joinType, n.columns) + onExpr, err := helper.remapOnExpr(planCtx, n.pred.onCond) if err != nil { return nil, false, err } @@ -228,10 +291,17 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin( return plan, true, nil } -func joinOutColumns( - n *joinNode, leftPlanToStreamColMap, rightPlanToStreamColMap []int, +// joinPlanningHelper is a utility struct that helps with the physical planning +// of joins. 
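To make the projection concrete before the type definition: `joinOutColumns` (below) emits all left columns first, then the right columns offset by `numLeftCols`, skipping the right side entirely for left semi/anti joins. A minimal standalone sketch of that logic — `joinOutColsExample` is a hypothetical name and the plain `[]int` maps are toy stand-ins for the plan-to-stream column maps:

```go
package main

import "fmt"

// joinOutColsExample is a toy model of joinOutColumns: project all left
// columns, then (unless the join is a left semi or anti join) all right
// columns, whose stream ordinals are offset by the number of left columns.
func joinOutColsExample(numLeft, numRight int, leftMap, rightMap []int, semiOrAnti bool) []uint32 {
	var out []uint32
	for i := 0; i < numLeft; i++ {
		out = append(out, uint32(leftMap[i]))
	}
	if !semiOrAnti {
		for i := 0; i < numRight; i++ {
			out = append(out, uint32(numLeft+rightMap[i]))
		}
	}
	return out
}

func main() {
	// Two left columns at stream ordinals 0,1 and one right column at 0.
	fmt.Println(joinOutColsExample(2, 1, []int{0, 1}, []int{0}, false)) // [0 1 2]
	fmt.Println(joinOutColsExample(2, 1, []int{0, 1}, []int{0}, true))  // [0 1]
}
```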
+type joinPlanningHelper struct { + numLeftCols, numRightCols int + leftPlanToStreamColMap, rightPlanToStreamColMap []int +} + +func (h *joinPlanningHelper) joinOutColumns( + joinType sqlbase.JoinType, columns sqlbase.ResultColumns, ) (post execinfrapb.PostProcessSpec, joinToStreamColMap []int) { - joinToStreamColMap = makePlanToStreamColMap(len(n.columns)) + joinToStreamColMap = makePlanToStreamColMap(len(columns)) post.Projection = true // addOutCol appends to post.OutputColumns and returns the index @@ -245,14 +315,14 @@ func joinOutColumns( // The join columns are in two groups: // - the columns on the left side (numLeftCols) // - the columns on the right side (numRightCols) - for i := 0; i < n.pred.numLeftCols; i++ { - joinToStreamColMap[i] = addOutCol(uint32(leftPlanToStreamColMap[i])) + for i := 0; i < h.numLeftCols; i++ { + joinToStreamColMap[i] = addOutCol(uint32(h.leftPlanToStreamColMap[i])) } - if n.pred.joinType != sqlbase.LeftSemiJoin && n.pred.joinType != sqlbase.LeftAntiJoin { - for i := 0; i < n.pred.numRightCols; i++ { - joinToStreamColMap[n.pred.numLeftCols+i] = addOutCol( - uint32(n.pred.numLeftCols + rightPlanToStreamColMap[i]), + if joinType != sqlbase.LeftSemiJoin && joinType != sqlbase.LeftAntiJoin { + for i := 0; i < h.numRightCols; i++ { + joinToStreamColMap[h.numLeftCols+i] = addOutCol( + uint32(h.numLeftCols + h.rightPlanToStreamColMap[i]), ) } } @@ -263,29 +333,29 @@ func joinOutColumns( // remapOnExpr remaps ordinal references in the on condition (which refer to the // join columns as described above) to values that make sense in the joiner (0 // to N-1 for the left input columns, N to N+M-1 for the right input columns). -func remapOnExpr( - planCtx *PlanningCtx, n *joinNode, leftPlanToStreamColMap, rightPlanToStreamColMap []int, +func (h *joinPlanningHelper) remapOnExpr( + planCtx *PlanningCtx, onCond tree.TypedExpr, ) (execinfrapb.Expression, error) { - if n.pred.onCond == nil { + if onCond == nil { return execinfrapb.Expression{}, nil } - joinColMap := make([]int, n.pred.numLeftCols+n.pred.numRightCols) + joinColMap := make([]int, h.numLeftCols+h.numRightCols) idx := 0 leftCols := 0 - for i := 0; i < n.pred.numLeftCols; i++ { - joinColMap[idx] = leftPlanToStreamColMap[i] - if leftPlanToStreamColMap[i] != -1 { + for i := 0; i < h.numLeftCols; i++ { + joinColMap[idx] = h.leftPlanToStreamColMap[i] + if h.leftPlanToStreamColMap[i] != -1 { leftCols++ } idx++ } - for i := 0; i < n.pred.numRightCols; i++ { - joinColMap[idx] = leftCols + rightPlanToStreamColMap[i] + for i := 0; i < h.numRightCols; i++ { + joinColMap[idx] = leftCols + h.rightPlanToStreamColMap[i] idx++ } - return physicalplan.MakeExpression(n.pred.onCond, planCtx, joinColMap) + return physicalplan.MakeExpression(onCond, planCtx, joinColMap) } // eqCols produces a slice of ordinal references for the plan columns specified @@ -293,7 +363,7 @@ func remapOnExpr( // That is: eqIndices contains a slice of plan column indexes and planToColMap // maps the plan column indexes to the ordinal references (index of the // intermediate row produced). 
-func eqCols(eqIndices, planToColMap []int) []uint32 { +func eqCols(eqIndices []exec.NodeColumnOrdinal, planToColMap []int) []uint32 { eqCols := make([]uint32, len(eqIndices)) for i, planCol := range eqIndices { eqCols[i] = uint32(planToColMap[planCol]) @@ -343,8 +413,8 @@ func useInterleavedJoin(n *joinNode) bool { return false } - var ancestorEqIndices []int - var descendantEqIndices []int + var ancestorEqIndices []exec.NodeColumnOrdinal + var descendantEqIndices []exec.NodeColumnOrdinal // We are guaranteed that both of the sources are scan nodes from // n.interleavedNodes(). if ancestor == n.left.plan.(*scanNode) { @@ -377,8 +447,8 @@ func useInterleavedJoin(n *joinNode) bool { // the index in scanNode.resultColumns. To convert the colID // from the index descriptor, we can use the map provided by // colIdxMap. - if ancestorEqIndices[info.ColIdx] != ancestor.colIdxMap[colID] || - descendantEqIndices[info.ColIdx] != descendant.colIdxMap[colID] { + if int(ancestorEqIndices[info.ColIdx]) != ancestor.colIdxMap[colID] || + int(descendantEqIndices[info.ColIdx]) != descendant.colIdxMap[colID] { // The column in the ordering does not correspond to // the column in the interleave prefix. // We should not try to do an interleaved join. diff --git a/pkg/sql/distsql_plan_join_test.go b/pkg/sql/distsql_plan_join_test.go index 34bc1e88ab89..68d0cce00824 100644 --- a/pkg/sql/distsql_plan_join_test.go +++ b/pkg/sql/distsql_plan_join_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -30,12 +31,14 @@ import ( "github.com/cockroachdb/errors" ) -func setTestEqColForSide(colName string, side *scanNode, equalityIndices *[]int) error { +func setTestEqColForSide( + colName string, side *scanNode, equalityIndices *[]exec.NodeColumnOrdinal, +) error { colFound := false for i, leftCol := range side.cols { if colName == leftCol.Name { - *equalityIndices = append(*equalityIndices, i) + *equalityIndices = append(*equalityIndices, exec.NodeColumnOrdinal(i)) colFound = true break } @@ -774,7 +777,7 @@ func TestAlignInterleavedSpans(t *testing.T) { // The returned ordering can be partial, i.e. only contains a subset of the // equality columns. 
func computeMergeJoinOrdering( - a, b sqlbase.ColumnOrdering, colA, colB []int, + a, b sqlbase.ColumnOrdering, colA, colB []exec.NodeColumnOrdinal, ) sqlbase.ColumnOrdering { if len(colA) != len(colB) { panic(fmt.Sprintf("invalid column lists %v; %v", colA, colB)) @@ -786,7 +789,7 @@ func computeMergeJoinOrdering( break } for j := range colA { - if colA[j] == a[i].ColIdx && colB[j] == b[i].ColIdx { + if int(colA[j]) == a[i].ColIdx && int(colB[j]) == b[i].ColIdx { result = append(result, sqlbase.ColumnOrderInfo{ ColIdx: j, Direction: a[i].Direction, diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 0677a69789af..6ec5aab3c909 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -11,7 +11,6 @@ package sql import ( - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/opt" @@ -244,8 +243,7 @@ func (e *distSQLSpecExecFactory) checkExprsAndMaybeMergeLastStage( func (e *distSQLSpecExecFactory) ConstructFilter( n exec.Node, filter tree.TypedExpr, reqOrdering exec.OutputOrdering, ) (exec.Node, error) { - plan := n.(planMaybePhysical) - physPlan := plan.physPlan + physPlan, plan := getPhysPlan(n) recommendation := e.checkExprsAndMaybeMergeLastStage(physPlan, []tree.TypedExpr{filter}) // AddFilter will attempt to push the filter into the last stage of // processors. @@ -259,8 +257,7 @@ func (e *distSQLSpecExecFactory) ConstructFilter( func (e *distSQLSpecExecFactory) ConstructSimpleProject( n exec.Node, cols []exec.NodeColumnOrdinal, colNames []string, reqOrdering exec.OutputOrdering, ) (exec.Node, error) { - plan := n.(planMaybePhysical) - physPlan := plan.physPlan + physPlan, plan := getPhysPlan(n) projection := make([]uint32, len(cols)) for i := range cols { projection[i] = uint32(cols[i]) @@ -285,8 +282,7 @@ func (e *distSQLSpecExecFactory) ConstructRender( exprs tree.TypedExprs, reqOrdering exec.OutputOrdering, ) (exec.Node, error) { - plan := n.(planMaybePhysical) - physPlan := plan.physPlan + physPlan, plan := getPhysPlan(n) recommendation := e.checkExprsAndMaybeMergeLastStage(physPlan, exprs) if err := physPlan.AddRendering( exprs, e.getPlanCtx(recommendation), physPlan.PlanToStreamColMap, getTypesFromResultColumns(columns), @@ -309,6 +305,9 @@ func (e *distSQLSpecExecFactory) ConstructApplyJoin( return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") } +// TODO(yuzefovich): move the decision whether to use an interleaved join from +// the physical planner into the execbuilder. 
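The two constructors below both funnel into `constructHashOrMergeJoin`; the merge path first derives its equality columns from the left/right orderings via `getEqualityIndicesAndMergeJoinOrdering` (added later in this diff). A toy sketch of that derivation, with plain ints standing in for `exec.NodeColumnOrdinal` and error handling plus directions elided:

```go
package main

import "fmt"

// equalityFromOrderings is a toy model of
// getEqualityIndicesAndMergeJoinOrdering: the equality columns are exactly
// the ordering columns of each side, so the resulting merge ordering simply
// refers to positions 0..n-1 of the equality lists.
func equalityFromOrderings(left, right []int) (leftEq, rightEq, mergeOrd []int) {
	for i := range left {
		leftEq = append(leftEq, left[i])
		rightEq = append(rightEq, right[i])
		mergeOrd = append(mergeOrd, i) // always 0,1,2,...
	}
	return leftEq, rightEq, mergeOrd
}

func main() {
	l, r, ord := equalityFromOrderings([]int{5, 2}, []int{1, 3})
	fmt.Println(l, r, ord) // [5 2] [1 3] [0 1]
}
```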
+ func (e *distSQLSpecExecFactory) ConstructHashJoin( joinType sqlbase.JoinType, left, right exec.Node, @@ -316,7 +315,11 @@ func (e *distSQLSpecExecFactory) ConstructHashJoin( leftEqColsAreKey, rightEqColsAreKey bool, extraOnCond tree.TypedExpr, ) (exec.Node, error) { - return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") + return e.constructHashOrMergeJoin( + joinType, left, right, extraOnCond, leftEqCols, rightEqCols, + leftEqColsAreKey, rightEqColsAreKey, + ReqOrdering{} /* mergeJoinOrdering */, exec.OutputOrdering{}, /* reqOrdering */ + ) } func (e *distSQLSpecExecFactory) ConstructMergeJoin( @@ -327,7 +330,14 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin( reqOrdering exec.OutputOrdering, leftEqColsAreKey, rightEqColsAreKey bool, ) (exec.Node, error) { - return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") + leftEqCols, rightEqCols, mergeJoinOrdering, err := getEqualityIndicesAndMergeJoinOrdering(leftOrdering, rightOrdering) + if err != nil { + return nil, err + } + return e.constructHashOrMergeJoin( + joinType, left, right, onCond, leftEqCols, rightEqCols, + leftEqColsAreKey, rightEqColsAreKey, mergeJoinOrdering, reqOrdering, + ) } func (e *distSQLSpecExecFactory) ConstructGroupBy( @@ -398,13 +408,13 @@ func (e *distSQLSpecExecFactory) ConstructLookupJoin( return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") } -func (e *distSQLSpecExecFactory) ConstructGeoLookupJoin( +func (e *distSQLSpecExecFactory) ConstructInvertedJoin( joinType sqlbase.JoinType, - geoRelationshipType geoindex.RelationshipType, + invertedExpr tree.TypedExpr, input exec.Node, table cat.Table, index cat.Index, - geoCol exec.NodeColumnOrdinal, + inputCol exec.NodeColumnOrdinal, lookupCols exec.TableColumnOrdinalSet, onCond tree.TypedExpr, reqOrdering exec.OutputOrdering, @@ -673,3 +683,58 @@ func (e *distSQLSpecExecFactory) ConstructExport( ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning") } + +func getPhysPlan(n exec.Node) (*PhysicalPlan, planMaybePhysical) { + plan := n.(planMaybePhysical) + return plan.physPlan, plan +} + +func (e *distSQLSpecExecFactory) constructHashOrMergeJoin( + joinType sqlbase.JoinType, + left, right exec.Node, + onCond tree.TypedExpr, + leftEqCols, rightEqCols []exec.NodeColumnOrdinal, + leftEqColsAreKey, rightEqColsAreKey bool, + mergeJoinOrdering sqlbase.ColumnOrdering, + reqOrdering exec.OutputOrdering, +) (exec.Node, error) { + leftPlan, _ := getPhysPlan(left) + rightPlan, _ := getPhysPlan(right) + resultColumns := getJoinResultColumns(joinType, leftPlan.ResultColumns, rightPlan.ResultColumns) + leftMap, rightMap := leftPlan.PlanToStreamColMap, rightPlan.PlanToStreamColMap + helper := &joinPlanningHelper{ + numLeftCols: len(leftPlan.ResultColumns), + numRightCols: len(rightPlan.ResultColumns), + leftPlanToStreamColMap: leftMap, + rightPlanToStreamColMap: rightMap, + } + post, joinToStreamColMap := helper.joinOutColumns(joinType, resultColumns) + // We always try to distribute the join, but planJoiners() itself might + // decide not to. 
+ onExpr, err := helper.remapOnExpr(e.getPlanCtx(shouldDistribute), onCond) + if err != nil { + return nil, err + } + + leftEqColsRemapped := eqCols(leftEqCols, leftMap) + rightEqColsRemapped := eqCols(rightEqCols, rightMap) + p := e.dsp.planJoiners(&joinPlanningInfo{ + leftPlan: leftPlan, + rightPlan: rightPlan, + joinType: joinType, + joinResultTypes: getTypesFromResultColumns(resultColumns), + onExpr: onExpr, + post: post, + joinToStreamColMap: joinToStreamColMap, + leftEqCols: leftEqColsRemapped, + rightEqCols: rightEqColsRemapped, + leftEqColsAreKey: leftEqColsAreKey, + rightEqColsAreKey: rightEqColsAreKey, + leftMergeOrd: distsqlOrdering(mergeJoinOrdering, leftEqColsRemapped), + rightMergeOrd: distsqlOrdering(mergeJoinOrdering, rightEqColsRemapped), + leftPlanDistribution: leftPlan.Distribution, + rightPlanDistribution: rightPlan.Distribution, + }, ReqOrdering(reqOrdering)) + p.ResultColumns = resultColumns + return planMaybePhysical{physPlan: p}, nil +} diff --git a/pkg/sql/exec_factory_util.go b/pkg/sql/exec_factory_util.go index b7f24c0cafc5..044564da19b3 100644 --- a/pkg/sql/exec_factory_util.go +++ b/pkg/sql/exec_factory_util.go @@ -177,3 +177,35 @@ func getResultColumnsForSimpleProject( } return resultCols } + +func getEqualityIndicesAndMergeJoinOrdering( + leftOrdering, rightOrdering sqlbase.ColumnOrdering, +) ( + leftEqualityIndices, rightEqualityIndices []exec.NodeColumnOrdinal, + mergeJoinOrdering sqlbase.ColumnOrdering, + err error, +) { + n := len(leftOrdering) + if n == 0 || len(rightOrdering) != n { + return nil, nil, nil, errors.Errorf( + "orderings from the left and right side must be the same non-zero length", + ) + } + leftEqualityIndices = make([]exec.NodeColumnOrdinal, n) + rightEqualityIndices = make([]exec.NodeColumnOrdinal, n) + for i := 0; i < n; i++ { + leftColIdx, rightColIdx := leftOrdering[i].ColIdx, rightOrdering[i].ColIdx + leftEqualityIndices[i] = exec.NodeColumnOrdinal(leftColIdx) + rightEqualityIndices[i] = exec.NodeColumnOrdinal(rightColIdx) + } + + mergeJoinOrdering = make(sqlbase.ColumnOrdering, n) + for i := 0; i < n; i++ { + // The mergeJoinOrdering "columns" are equality column indices. Because of + // the way we constructed the equality indices, the ordering will always be + // 0,1,2,3.. + mergeJoinOrdering[i].ColIdx = i + mergeJoinOrdering[i].Direction = leftOrdering[i].Direction + } + return leftEqualityIndices, rightEqualityIndices, mergeJoinOrdering, nil +} diff --git a/pkg/sql/execinfrapb/processors_sql.pb.go b/pkg/sql/execinfrapb/processors_sql.pb.go index cf5b02d4bd4c..6257c8074125 100644 --- a/pkg/sql/execinfrapb/processors_sql.pb.go +++ b/pkg/sql/execinfrapb/processors_sql.pb.go @@ -64,7 +64,7 @@ func (x *ScanVisibility) UnmarshalJSON(data []byte) error { return nil } func (ScanVisibility) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{0} + return fileDescriptor_processors_sql_684af93f61811791, []int{0} } // These mirror the aggregate functions supported by sql/parser. 
See @@ -181,7 +181,7 @@ func (x *AggregatorSpec_Func) UnmarshalJSON(data []byte) error { return nil } func (AggregatorSpec_Func) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{12, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{12, 0} } type AggregatorSpec_Type int32 @@ -227,7 +227,7 @@ func (x *AggregatorSpec_Type) UnmarshalJSON(data []byte) error { return nil } func (AggregatorSpec_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{12, 1} + return fileDescriptor_processors_sql_684af93f61811791, []int{12, 1} } type WindowerSpec_WindowFunc int32 @@ -291,7 +291,7 @@ func (x *WindowerSpec_WindowFunc) UnmarshalJSON(data []byte) error { return nil } func (WindowerSpec_WindowFunc) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 0} } // Mode indicates which mode of framing is used. @@ -335,7 +335,7 @@ func (x *WindowerSpec_Frame_Mode) UnmarshalJSON(data []byte) error { return nil } func (WindowerSpec_Frame_Mode) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1, 0} } // BoundType indicates which type of boundary is used. @@ -382,7 +382,7 @@ func (x *WindowerSpec_Frame_BoundType) UnmarshalJSON(data []byte) error { return nil } func (WindowerSpec_Frame_BoundType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1, 1} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1, 1} } // Exclusion specifies the type of frame exclusion. 
@@ -425,7 +425,7 @@ func (x *WindowerSpec_Frame_Exclusion) UnmarshalJSON(data []byte) error { return nil } func (WindowerSpec_Frame_Exclusion) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1, 2} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1, 2} } // ValuesCoreSpec is the core of a processor that has no inputs and generates @@ -445,7 +445,7 @@ func (m *ValuesCoreSpec) Reset() { *m = ValuesCoreSpec{} } func (m *ValuesCoreSpec) String() string { return proto.CompactTextString(m) } func (*ValuesCoreSpec) ProtoMessage() {} func (*ValuesCoreSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{0} + return fileDescriptor_processors_sql_684af93f61811791, []int{0} } func (m *ValuesCoreSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -550,7 +550,7 @@ func (m *TableReaderSpec) Reset() { *m = TableReaderSpec{} } func (m *TableReaderSpec) String() string { return proto.CompactTextString(m) } func (*TableReaderSpec) ProtoMessage() {} func (*TableReaderSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{1} + return fileDescriptor_processors_sql_684af93f61811791, []int{1} } func (m *TableReaderSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -608,7 +608,7 @@ func (m *IndexSkipTableReaderSpec) Reset() { *m = IndexSkipTableReaderSp func (m *IndexSkipTableReaderSpec) String() string { return proto.CompactTextString(m) } func (*IndexSkipTableReaderSpec) ProtoMessage() {} func (*IndexSkipTableReaderSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{2} + return fileDescriptor_processors_sql_684af93f61811791, []int{2} } func (m *IndexSkipTableReaderSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -710,7 +710,7 @@ func (m *JoinReaderSpec) Reset() { *m = JoinReaderSpec{} } func (m *JoinReaderSpec) String() string { return proto.CompactTextString(m) } func (*JoinReaderSpec) ProtoMessage() {} func (*JoinReaderSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{3} + return fileDescriptor_processors_sql_684af93f61811791, []int{3} } func (m *JoinReaderSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -754,7 +754,7 @@ func (m *SorterSpec) Reset() { *m = SorterSpec{} } func (m *SorterSpec) String() string { return proto.CompactTextString(m) } func (*SorterSpec) ProtoMessage() {} func (*SorterSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{4} + return fileDescriptor_processors_sql_684af93f61811791, []int{4} } func (m *SorterSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -816,7 +816,7 @@ func (m *DistinctSpec) Reset() { *m = DistinctSpec{} } func (m *DistinctSpec) String() string { return proto.CompactTextString(m) } func (*DistinctSpec) ProtoMessage() {} func (*DistinctSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{5} + return fileDescriptor_processors_sql_684af93f61811791, []int{5} } func (m *DistinctSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -851,7 +851,7 @@ func (m *OrdinalitySpec) Reset() { *m = OrdinalitySpec{} } func (m *OrdinalitySpec) String() string { return proto.CompactTextString(m) } func (*OrdinalitySpec) ProtoMessage() {} func (*OrdinalitySpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{6} + return 
fileDescriptor_processors_sql_684af93f61811791, []int{6} } func (m *OrdinalitySpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -909,7 +909,7 @@ func (m *ZigzagJoinerSpec) Reset() { *m = ZigzagJoinerSpec{} } func (m *ZigzagJoinerSpec) String() string { return proto.CompactTextString(m) } func (*ZigzagJoinerSpec) ProtoMessage() {} func (*ZigzagJoinerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{7} + return fileDescriptor_processors_sql_684af93f61811791, []int{7} } func (m *ZigzagJoinerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -985,7 +985,7 @@ func (m *MergeJoinerSpec) Reset() { *m = MergeJoinerSpec{} } func (m *MergeJoinerSpec) String() string { return proto.CompactTextString(m) } func (*MergeJoinerSpec) ProtoMessage() {} func (*MergeJoinerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{8} + return fileDescriptor_processors_sql_684af93f61811791, []int{8} } func (m *MergeJoinerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1072,7 +1072,7 @@ func (m *HashJoinerSpec) Reset() { *m = HashJoinerSpec{} } func (m *HashJoinerSpec) String() string { return proto.CompactTextString(m) } func (*HashJoinerSpec) ProtoMessage() {} func (*HashJoinerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{9} + return fileDescriptor_processors_sql_684af93f61811791, []int{9} } func (m *HashJoinerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1133,16 +1133,18 @@ type InvertedJoinerSpec struct { // Index of the column in the input stream that is to be joined with // the inverted index. LookupColumn uint32 `protobuf:"varint,3,opt,name=lookup_column,json=lookupColumn" json:"lookup_column"` - // Expression involving only the indexed column and the lookup column, where - // @1 refers to the lookup column and @2 to the indexed column. The - // expression is used to construct an implementation of - // RowToInvertedIndexExpr which will be fed each input row and output - // an expression to evaluate over the inverted index. + // Expression involving only the indexed column and the lookup column. + // Assuming that the input stream has N columns and the table that has been + // indexed has M columns, in this expression variables @1 to @N refer to + // columns of the input stream and variables @(N+1) to @(N+M) refer to + // columns in the table. Although the numbering includes all columns, only + // columns corresponding to the indexed column and the lookup column may be + // present in this expression. Note that the column numbering matches the + // numbering used below by the on expression. // - // TODO(sumeer): RowToInvertedIndexExpr will be added with the - // invertedJoiner implementation. And update this comment when all the - // expression generation machinery is in place to refer to actual code - // abstractions. + // The expression is passed to xform.NewDatumToInvertedExpr to construct an + // implementation of invertedexpr.DatumToInvertedExpr, which will be fed each + // input row and output an expression to evaluate over the inverted index. InvertedExpr Expression `protobuf:"bytes,4,opt,name=inverted_expr,json=invertedExpr" json:"inverted_expr"` // Optional expression involving the columns in the index (other than the // inverted column) and the columns in the input stream. 
Assuming that the @@ -1151,7 +1153,8 @@ type InvertedJoinerSpec struct { // input stream and variables @(N+1) to @(N+M) refer to columns in the // table. The numbering does not omit the column in the table corresponding // to the inverted column, or other table columns absent from the index, but - // they cannot be present in this expression. + // they cannot be present in this expression. Note that the column numbering + // matches the numbering used above by the inverted expression. OnExpr Expression `protobuf:"bytes,5,opt,name=on_expr,json=onExpr" json:"on_expr"` // Only INNER, LEFT_OUTER, LEFT_SEMI, LEFT_ANTI are supported. For indexes // that produce false positives for user expressions, like geospatial @@ -1166,7 +1169,7 @@ func (m *InvertedJoinerSpec) Reset() { *m = InvertedJoinerSpec{} } func (m *InvertedJoinerSpec) String() string { return proto.CompactTextString(m) } func (*InvertedJoinerSpec) ProtoMessage() {} func (*InvertedJoinerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{10} + return fileDescriptor_processors_sql_684af93f61811791, []int{10} } func (m *InvertedJoinerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1218,7 +1221,7 @@ func (m *InvertedFiltererSpec) Reset() { *m = InvertedFiltererSpec{} } func (m *InvertedFiltererSpec) String() string { return proto.CompactTextString(m) } func (*InvertedFiltererSpec) ProtoMessage() {} func (*InvertedFiltererSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{11} + return fileDescriptor_processors_sql_684af93f61811791, []int{11} } func (m *InvertedFiltererSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1264,7 +1267,7 @@ func (m *AggregatorSpec) Reset() { *m = AggregatorSpec{} } func (m *AggregatorSpec) String() string { return proto.CompactTextString(m) } func (*AggregatorSpec) ProtoMessage() {} func (*AggregatorSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{12} + return fileDescriptor_processors_sql_684af93f61811791, []int{12} } func (m *AggregatorSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1315,7 +1318,7 @@ func (m *AggregatorSpec_Aggregation) Reset() { *m = AggregatorSpec_Aggre func (m *AggregatorSpec_Aggregation) String() string { return proto.CompactTextString(m) } func (*AggregatorSpec_Aggregation) ProtoMessage() {} func (*AggregatorSpec_Aggregation) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{12, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{12, 0} } func (m *AggregatorSpec_Aggregation) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1390,7 +1393,7 @@ func (m *InterleavedReaderJoinerSpec) Reset() { *m = InterleavedReaderJo func (m *InterleavedReaderJoinerSpec) String() string { return proto.CompactTextString(m) } func (*InterleavedReaderJoinerSpec) ProtoMessage() {} func (*InterleavedReaderJoinerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{13} + return fileDescriptor_processors_sql_684af93f61811791, []int{13} } func (m *InterleavedReaderJoinerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1446,7 +1449,7 @@ func (m *InterleavedReaderJoinerSpec_Table) Reset() { *m = InterleavedRe func (m *InterleavedReaderJoinerSpec_Table) String() string { return proto.CompactTextString(m) } func (*InterleavedReaderJoinerSpec_Table) ProtoMessage() {} func 
(*InterleavedReaderJoinerSpec_Table) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{13, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{13, 0} } func (m *InterleavedReaderJoinerSpec_Table) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1486,7 +1489,7 @@ func (m *ProjectSetSpec) Reset() { *m = ProjectSetSpec{} } func (m *ProjectSetSpec) String() string { return proto.CompactTextString(m) } func (*ProjectSetSpec) ProtoMessage() {} func (*ProjectSetSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{14} + return fileDescriptor_processors_sql_684af93f61811791, []int{14} } func (m *ProjectSetSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1528,7 +1531,7 @@ func (m *WindowerSpec) Reset() { *m = WindowerSpec{} } func (m *WindowerSpec) String() string { return proto.CompactTextString(m) } func (*WindowerSpec) ProtoMessage() {} func (*WindowerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15} + return fileDescriptor_processors_sql_684af93f61811791, []int{15} } func (m *WindowerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1564,7 +1567,7 @@ func (m *WindowerSpec_Func) Reset() { *m = WindowerSpec_Func{} } func (m *WindowerSpec_Func) String() string { return proto.CompactTextString(m) } func (*WindowerSpec_Func) ProtoMessage() {} func (*WindowerSpec_Func) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 0} } func (m *WindowerSpec_Func) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1600,7 +1603,7 @@ func (m *WindowerSpec_Frame) Reset() { *m = WindowerSpec_Frame{} } func (m *WindowerSpec_Frame) String() string { return proto.CompactTextString(m) } func (*WindowerSpec_Frame) ProtoMessage() {} func (*WindowerSpec_Frame) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1} } func (m *WindowerSpec_Frame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1641,7 +1644,7 @@ func (m *WindowerSpec_Frame_Bound) Reset() { *m = WindowerSpec_Frame_Bou func (m *WindowerSpec_Frame_Bound) String() string { return proto.CompactTextString(m) } func (*WindowerSpec_Frame_Bound) ProtoMessage() {} func (*WindowerSpec_Frame_Bound) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1, 0} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1, 0} } func (m *WindowerSpec_Frame_Bound) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1677,7 +1680,7 @@ func (m *WindowerSpec_Frame_Bounds) Reset() { *m = WindowerSpec_Frame_Bo func (m *WindowerSpec_Frame_Bounds) String() string { return proto.CompactTextString(m) } func (*WindowerSpec_Frame_Bounds) ProtoMessage() {} func (*WindowerSpec_Frame_Bounds) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 1, 1} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 1, 1} } func (m *WindowerSpec_Frame_Bounds) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1727,7 +1730,7 @@ func (m *WindowerSpec_WindowFn) Reset() { *m = WindowerSpec_WindowFn{} } func (m *WindowerSpec_WindowFn) String() string { return proto.CompactTextString(m) } func (*WindowerSpec_WindowFn) ProtoMessage() {} func 
(*WindowerSpec_WindowFn) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_sql_fccc336145676e72, []int{15, 2} + return fileDescriptor_processors_sql_684af93f61811791, []int{15, 2} } func (m *WindowerSpec_WindowFn) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -7930,10 +7933,10 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/processors_sql.proto", fileDescriptor_processors_sql_fccc336145676e72) + proto.RegisterFile("sql/execinfrapb/processors_sql.proto", fileDescriptor_processors_sql_684af93f61811791) } -var fileDescriptor_processors_sql_fccc336145676e72 = []byte{ +var fileDescriptor_processors_sql_684af93f61811791 = []byte{ // 2645 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x5a, 0x4b, 0x73, 0x1b, 0xc7, 0x11, 0xe6, 0xe2, 0x41, 0x02, 0x8d, 0x07, 0x57, 0x23, 0xda, 0x82, 0x21, 0x17, 0x45, 0xc1, 0x2f, diff --git a/pkg/sql/execinfrapb/processors_sql.proto b/pkg/sql/execinfrapb/processors_sql.proto index d997c266ba74..7711e1c5f233 100644 --- a/pkg/sql/execinfrapb/processors_sql.proto +++ b/pkg/sql/execinfrapb/processors_sql.proto @@ -499,16 +499,18 @@ message InvertedJoinerSpec { // The join expression is a conjunction of inverted_expr and on_expr. - // Expression involving only the indexed column and the lookup column, where - // @1 refers to the lookup column and @2 to the indexed column. The - // expression is used to construct an implementation of - // RowToInvertedIndexExpr which will be fed each input row and output - // an expression to evaluate over the inverted index. + // Expression involving only the indexed column and the lookup column. + // Assuming that the input stream has N columns and the table that has been + // indexed has M columns, in this expression variables @1 to @N refer to + // columns of the input stream and variables @(N+1) to @(N+M) refer to + // columns in the table. Although the numbering includes all columns, only + // columns corresponding to the indexed column and the lookup column may be + // present in this expression. Note that the column numbering matches the + // numbering used below by the on expression. // - // TODO(sumeer): RowToInvertedIndexExpr will be added with the - // invertedJoiner implementation. And update this comment when all the - // expression generation machinery is in place to refer to actual code - // abstractions. + // The expression is passed to xform.NewDatumToInvertedExpr to construct an + // implementation of invertedexpr.DatumToInvertedExpr, which will be fed each + // input row and output an expression to evaluate over the inverted index. optional Expression inverted_expr = 4 [(gogoproto.nullable) = false]; // Optional expression involving the columns in the index (other than the @@ -518,7 +520,8 @@ message InvertedJoinerSpec { // input stream and variables @(N+1) to @(N+M) refer to columns in the // table. The numbering does not omit the column in the table corresponding // to the inverted column, or other table columns absent from the index, but - // they cannot be present in this expression. + // they cannot be present in this expression. Note that the column numbering + // matches the numbering used above by the inverted expression. optional Expression on_expr = 5 [(gogoproto.nullable) = false]; // Only INNER, LEFT_OUTER, LEFT_SEMI, LEFT_ANTI are supported. 
For indexes diff --git a/pkg/sql/join_predicate.go b/pkg/sql/join_predicate.go index b71d969ffc0c..9c84f1522515 100644 --- a/pkg/sql/join_predicate.go +++ b/pkg/sql/join_predicate.go @@ -11,6 +11,7 @@ package sql import ( + "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -36,8 +37,8 @@ type joinPredicate struct { // on the left and right input row arrays, respectively. // Only columns with the same left and right value types can be equality // columns. - leftEqualityIndices []int - rightEqualityIndices []int + leftEqualityIndices []exec.NodeColumnOrdinal + rightEqualityIndices []exec.NodeColumnOrdinal // The list of names for the columns listed in leftEqualityIndices. // Used mainly for pretty-printing. @@ -67,14 +68,14 @@ type joinPredicate struct { rightEqKey bool } -// makePredicate constructs a joinPredicate object for joins. The equality -// columns / on condition must be initialized separately. -func makePredicate(joinType sqlbase.JoinType, left, right sqlbase.ResultColumns) *joinPredicate { +// getJoinResultColumns returns the result columns of a join. +func getJoinResultColumns( + joinType sqlbase.JoinType, left, right sqlbase.ResultColumns, +) sqlbase.ResultColumns { // For anti and semi joins, the right columns are omitted from the output (but // they must be available internally for the ON condition evaluation). omitRightColumns := joinType == sqlbase.LeftSemiJoin || joinType == sqlbase.LeftAntiJoin - // Prepare the metadata for the result columns. // The structure of the join data source results is like this: // - all the left columns, // - then all the right columns (except for anti/semi join). @@ -83,14 +84,19 @@ func makePredicate(joinType sqlbase.JoinType, left, right sqlbase.ResultColumns) if !omitRightColumns { columns = append(columns, right...) } + return columns +} +// makePredicate constructs a joinPredicate object for joins. The equality +// columns / on condition must be initialized separately. 
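The effect of `getJoinResultColumns` above is easy to see in a tiny standalone model (`joinResultColsExample` is a hypothetical name; strings stand in for `sqlbase.ResultColumns`):

```go
package main

import "fmt"

// joinResultColsExample is a toy model of getJoinResultColumns: all left
// columns followed by all right columns, except that semi and anti joins
// omit the right side from the output.
func joinResultColsExample(left, right []string, semiOrAnti bool) []string {
	cols := append([]string(nil), left...)
	if !semiOrAnti {
		cols = append(cols, right...)
	}
	return cols
}

func main() {
	fmt.Println(joinResultColsExample([]string{"a", "b"}, []string{"c"}, false)) // [a b c]
	fmt.Println(joinResultColsExample([]string{"a", "b"}, []string{"c"}, true))  // [a b]
}
```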
+func makePredicate(joinType sqlbase.JoinType, left, right sqlbase.ResultColumns) *joinPredicate { pred := &joinPredicate{ joinType: joinType, numLeftCols: len(left), numRightCols: len(right), leftCols: left, rightCols: right, - cols: columns, + cols: getJoinResultColumns(joinType, left, right), } // We must initialize the indexed var helper in all cases, even when // there is no on condition, so that getNeededColumns() does not get diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_join b/pkg/sql/logictest/testdata/logic_test/distsql_join index 18621c83cd9e..e887ba7f3346 100644 --- a/pkg/sql/logictest/testdata/logic_test/distsql_join +++ b/pkg/sql/logictest/testdata/logic_test/distsql_join @@ -54,8 +54,3 @@ query III SELECT pk, a, b FROM tab0 WHERE a < 10 AND b = 2 ORDER BY a DESC, pk; ---- 0 1 2 - -query T -SELECT feature_name FROM crdb_internal.feature_usage WHERE feature_name='sql.exec.query.is-distributed' AND usage_count > 0 ----- -sql.exec.query.is-distributed diff --git a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning index 759be4efda96..fb5ab3231dc0 100644 --- a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning +++ b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning @@ -60,3 +60,15 @@ SELECT * FROM kv WHERE k > v ---- 2 1 3 2 + +statement ok +INSERT INTO kv VALUES (4, NULL), (5, 3) + +query I +SELECT v FROM kv ORDER BY k +---- +1 +1 +2 +NULL +3 diff --git a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node index 2479b65385b5..47910818b9b3 100644 --- a/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node +++ b/pkg/sql/logictest/testdata/logic_test/experimental_distsql_planning_5node @@ -142,3 +142,23 @@ EXPLAIN SELECT k::REGCLASS FROM kv ---- · distribution partial · vectorized true + +# Check that hash join is supported by the new factory. +query II rowsort +SELECT kv.k, v FROM kv, kw WHERE v = w +---- +1 1 +2 2 +3 3 +4 4 +5 5 + +# Check that merge join is supported by the new factory. 
+query I +SELECT kv.k FROM kv, kw WHERE kv.k = kw.k ORDER BY 1 +---- +1 +2 +3 +4 +5 diff --git a/pkg/sql/logictest/testdata/logic_test/hash_join b/pkg/sql/logictest/testdata/logic_test/hash_join index 736ff50ee9d6..a3d68ebba65b 100644 --- a/pkg/sql/logictest/testdata/logic_test/hash_join +++ b/pkg/sql/logictest/testdata/logic_test/hash_join @@ -1,3 +1,5 @@ +# LogicTest: default-configs local-spec-planning fakedist-spec-planning + statement ok CREATE TABLE t1 (k INT PRIMARY KEY, v INT) diff --git a/pkg/sql/logictest/testdata/logic_test/hash_join_dist b/pkg/sql/logictest/testdata/logic_test/hash_join_dist index 5729c5de53c1..d98690996c4d 100644 --- a/pkg/sql/logictest/testdata/logic_test/hash_join_dist +++ b/pkg/sql/logictest/testdata/logic_test/hash_join_dist @@ -66,3 +66,8 @@ SELECT small.b, large.d FROM large RIGHT HASH JOIN small ON small.b = large.c AN 9 NULL 12 24 15 NULL + +query T +SELECT feature_name FROM crdb_internal.feature_usage WHERE feature_name='sql.exec.query.is-distributed' AND usage_count > 0 +---- +sql.exec.query.is-distributed diff --git a/pkg/sql/logictest/testdata/logic_test/merge_join b/pkg/sql/logictest/testdata/logic_test/merge_join index d3ea189fc18f..14de3c603a13 100644 --- a/pkg/sql/logictest/testdata/logic_test/merge_join +++ b/pkg/sql/logictest/testdata/logic_test/merge_join @@ -1,3 +1,5 @@ +# LogicTest: default-configs local-spec-planning fakedist-spec-planning + # Basic tables, no nulls statement ok diff --git a/pkg/sql/opt/bench/stub_factory.go b/pkg/sql/opt/bench/stub_factory.go index d984e42627f9..bfe965dfb5c9 100644 --- a/pkg/sql/opt/bench/stub_factory.go +++ b/pkg/sql/opt/bench/stub_factory.go @@ -11,7 +11,6 @@ package bench import ( - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -165,13 +164,13 @@ func (f *stubFactory) ConstructLookupJoin( return struct{}{}, nil } -func (f *stubFactory) ConstructGeoLookupJoin( +func (f *stubFactory) ConstructInvertedJoin( joinType sqlbase.JoinType, - geoRelationshipType geoindex.RelationshipType, + invertedExpr tree.TypedExpr, input exec.Node, table cat.Table, index cat.Index, - geoCol exec.NodeColumnOrdinal, + inputCol exec.NodeColumnOrdinal, lookupCols exec.TableColumnOrdinalSet, onCond tree.TypedExpr, reqOrdering exec.OutputOrdering, diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index d89ab2562618..9badf367b049 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -215,8 +215,8 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { case *memo.LookupJoinExpr: ep, err = b.buildLookupJoin(t) - case *memo.GeoLookupJoinExpr: - ep, err = b.buildGeoLookupJoin(t) + case *memo.InvertedJoinExpr: + ep, err = b.buildInvertedJoin(t) case *memo.ZigzagJoinExpr: ep, err = b.buildZigzagJoin(t) @@ -1361,7 +1361,7 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) { return res, nil } -func (b *Builder) buildGeoLookupJoin(join *memo.GeoLookupJoinExpr) (execPlan, error) { +func (b *Builder) buildInvertedJoin(join *memo.InvertedJoinExpr) (execPlan, error) { input, err := b.buildRelational(join.Input) if err != nil { return execPlan{}, err @@ -1385,6 +1385,10 @@ func (b *Builder) buildGeoLookupJoin(join *memo.GeoLookupJoinExpr) (execPlan, er ivh: tree.MakeIndexedVarHelper(nil /* container */, allCols.Len()), ivarMap: 
allCols, } + invertedExpr, err := b.buildScalar(&ctx, join.InvertedExpr) + if err != nil { + return execPlan{}, err + } onExpr, err := b.buildScalar(&ctx, &join.On) if err != nil { return execPlan{}, err @@ -1393,13 +1397,13 @@ func (b *Builder) buildGeoLookupJoin(join *memo.GeoLookupJoinExpr) (execPlan, er tab := md.Table(join.Table) idx := tab.Index(join.Index) - res.root, err = b.factory.ConstructGeoLookupJoin( + res.root, err = b.factory.ConstructInvertedJoin( joinOpToJoinType(join.JoinType), - join.GeoRelationshipType, + invertedExpr, input.root, tab, idx, - input.getNodeColumnOrdinal(join.GeoCol), + input.getNodeColumnOrdinal(join.InputCol), lookupOrdinals, onExpr, res.reqOrdering(join), diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index f8c9ae5bd6eb..2de8a6920507 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -13,7 +13,6 @@ package exec import ( "context" - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -232,22 +231,22 @@ type Factory interface { reqOrdering OutputOrdering, ) (Node, error) - // ConstructGeoLookupJoin returns a node that performs a geospatial lookup - // join. geoRelationshipType describes the type of geospatial relationship - // represented by the join. geoCol is the geospatial column from the input - // that will be used to look up into the index; lookupCols are ordinals for - // the table columns we are retrieving. + // ConstructInvertedJoin returns a node that performs an inverted join. + // invertedExpr is used along with inputCol (a column from the input) to + // find the keys to look up in the index; lookupCols are ordinals for the + // table columns we are retrieving. // // The node produces the columns in the input and (unless join type is // LeftSemiJoin or LeftAntiJoin) the lookupCols, ordered by ordinal. The ON - // condition can refer to these using IndexedVars. - ConstructGeoLookupJoin( + // condition can refer to these using IndexedVars. Note that lookupCols does + // not include the inverted column. + ConstructInvertedJoin( joinType sqlbase.JoinType, - geoRelationshipType geoindex.RelationshipType, + invertedExpr tree.TypedExpr, input Node, table cat.Table, index cat.Index, - geoCol NodeColumnOrdinal, + inputCol NodeColumnOrdinal, lookupCols TableColumnOrdinalSet, onCond tree.TypedExpr, reqOrdering OutputOrdering, diff --git a/pkg/sql/opt/invertedexpr/expression.go b/pkg/sql/opt/invertedexpr/expression.go index d25717ac5e5b..9c16194ae50b 100644 --- a/pkg/sql/opt/invertedexpr/expression.go +++ b/pkg/sql/opt/invertedexpr/expression.go @@ -12,13 +12,26 @@ package invertedexpr import ( "bytes" + "context" "fmt" "strconv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/treeprinter" ) +// DatumToInvertedExpr is an interface that is used by the +// rowexec.invertedJoiner to extract a SpanExpressionProto given an +// input row. The rowexec.invertedJoiner calls Convert and uses the resulting +// SpanExpressionProto.SpansToRead to determine which spans to read from the +// inverted index. Then it computes a set expression on the scanned rows as +// defined by the SpanExpressionProto.Node. +type DatumToInvertedExpr interface { + // Convert uses the lookup column to construct an inverted expression. 
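For shape only, a hypothetical toy implementation of this interface; `spanExpr` and `datum` are stand-ins for `SpanExpressionProto` and `sqlbase.EncDatum`, and `containsExpr` is an invented example, not a real CockroachDB type:

```go
package main

import (
	"context"
	"fmt"
)

// Toy stand-ins for SpanExpressionProto and EncDatum.
type spanExpr struct{ spansToRead []string }
type datum string

// containsExpr sketches a DatumToInvertedExpr implementation: for each
// input datum it emits the inverted-index spans to read (here reduced to a
// single key) for the joiner to scan and evaluate.
type containsExpr struct{}

func (containsExpr) Convert(_ context.Context, d datum) (*spanExpr, error) {
	return &spanExpr{spansToRead: []string{"inverted/" + string(d)}}, nil
}

func main() {
	e, _ := containsExpr{}.Convert(context.Background(), "geom1")
	fmt.Println(e.spansToRead) // [inverted/geom1]
}
```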
+ Convert(context.Context, sqlbase.EncDatum) (*SpanExpressionProto, error) +} + // EncInvertedVal is the encoded form of a value in the inverted column. // This library does not care about how the value is encoded. The following // encoding comment is only relevant for integration purposes, and to justify diff --git a/pkg/sql/opt/memo/expr.go b/pkg/sql/opt/memo/expr.go index 4ff3a38eeb6e..d89f5f8e35ca 100644 --- a/pkg/sql/opt/memo/expr.go +++ b/pkg/sql/opt/memo/expr.go @@ -453,7 +453,7 @@ func (lj *LookupJoinExpr) initUnexportedFields(mem *Memo) { // lookupProps are initialized as necessary by the logical props builder. } -func (gj *GeoLookupJoinExpr) initUnexportedFields(mem *Memo) { +func (gj *InvertedJoinExpr) initUnexportedFields(mem *Memo) { // lookupProps are initialized as necessary by the logical props builder. } diff --git a/pkg/sql/opt/memo/expr_format.go b/pkg/sql/opt/memo/expr_format.go index 762538566089..8edfca81a236 100644 --- a/pkg/sql/opt/memo/expr_format.go +++ b/pkg/sql/opt/memo/expr_format.go @@ -192,8 +192,8 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) { FormatPrivate(f, e.Private(), required) f.Buffer.WriteByte(')') - case *GeoLookupJoinExpr: - fmt.Fprintf(f.Buffer, "%v (geo-lookup", t.JoinType) + case *InvertedJoinExpr: + fmt.Fprintf(f.Buffer, "%v (inverted-lookup", t.JoinType) FormatPrivate(f, e.Private(), required) f.Buffer.WriteByte(')') @@ -451,11 +451,12 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) { tp.Childf("lookup columns are key") } - case *GeoLookupJoinExpr: + case *InvertedJoinExpr: if !t.Flags.Empty() { tp.Childf("flags: %s", t.Flags.String()) } - tp.Childf("geo-relationship: %v", t.GeoRelationshipType) + n := tp.Child("inverted-expr") + f.formatExpr(t.InvertedExpr, n) case *ZigzagJoinExpr: if !f.HasFlags(ExprFmtHideColumns) { @@ -1253,7 +1254,7 @@ func FormatPrivate(f *ExprFmtCtx, private interface{}, physProps *physical.Requi fmt.Fprintf(f.Buffer, " %s@%s", tab.Name(), tab.Index(t.Index).Name()) } - case *GeoLookupJoinPrivate: + case *InvertedJoinPrivate: tab := f.Memo.metadata.Table(t.Table) fmt.Fprintf(f.Buffer, " %s@%s", tab.Name(), tab.Index(t.Index).Name()) diff --git a/pkg/sql/opt/memo/interner.go b/pkg/sql/opt/memo/interner.go index e73474d5c053..c88a58cfd573 100644 --- a/pkg/sql/opt/memo/interner.go +++ b/pkg/sql/opt/memo/interner.go @@ -17,7 +17,6 @@ import ( "reflect" "unsafe" - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/props/physical" @@ -561,10 +560,6 @@ func (h *hasher) HashLockingItem(val *tree.LockingItem) { } } -func (h *hasher) HashGeoRelationshipType(val geoindex.RelationshipType) { - h.HashUint64(uint64(val)) -} - func (h *hasher) HashRelExpr(val RelExpr) { h.HashUint64(uint64(reflect.ValueOf(val).Pointer())) } @@ -906,10 +901,6 @@ func (h *hasher) IsLockingItemEqual(l, r *tree.LockingItem) bool { return l.Strength == r.Strength && l.WaitPolicy == r.WaitPolicy } -func (h *hasher) IsGeoRelationshipTypeEqual(l, r geoindex.RelationshipType) bool { - return l == r -} - func (h *hasher) IsPointerEqual(l, r unsafe.Pointer) bool { return l == r } diff --git a/pkg/sql/opt/memo/logical_props_builder.go b/pkg/sql/opt/memo/logical_props_builder.go index ed0aaa16f88c..89e342c61b46 100644 --- a/pkg/sql/opt/memo/logical_props_builder.go +++ b/pkg/sql/opt/memo/logical_props_builder.go @@ -399,8 +399,8 @@ func (b *logicalPropsBuilder) 
buildLookupJoinProps(join *LookupJoinExpr, rel *pr b.buildJoinProps(join, rel) } -func (b *logicalPropsBuilder) buildGeoLookupJoinProps( - join *GeoLookupJoinExpr, rel *props.Relational, +func (b *logicalPropsBuilder) buildInvertedJoinProps( + join *InvertedJoinExpr, rel *props.Relational, ) { b.buildJoinProps(join, rel) } @@ -1691,11 +1691,9 @@ func ensureLookupJoinInputProps(join *LookupJoinExpr, sb *statisticsBuilder) *pr return relational } -// ensureGeoLookupJoinInputProps lazily populates the relational properties +// ensureInvertedJoinInputProps lazily populates the relational properties // that apply to the lookup side of the join, as if it were a Scan operator. -func ensureGeoLookupJoinInputProps( - join *GeoLookupJoinExpr, sb *statisticsBuilder, -) *props.Relational { +func ensureInvertedJoinInputProps(join *InvertedJoinExpr, sb *statisticsBuilder) *props.Relational { relational := &join.lookupProps if relational.OutputCols.Empty() { md := join.Memo().Metadata() @@ -1833,16 +1831,16 @@ func (h *joinPropsHelper) init(b *logicalPropsBuilder, joinExpr RelExpr) { h.filterIsTrue = false h.filterIsFalse = h.filters.IsFalse() - case *GeoLookupJoinExpr: + case *InvertedJoinExpr: h.leftProps = joinExpr.Child(0).(RelExpr).Relational() - ensureGeoLookupJoinInputProps(join, &b.sb) + ensureInvertedJoinInputProps(join, &b.sb) h.joinType = join.JoinType h.rightProps = &join.lookupProps h.filters = join.On b.addFiltersToFuncDep(h.filters, &h.filtersFD) h.filterNotNullCols = b.rejectNullCols(h.filters) - // Geospatial lookup join always has a filter condition on the index keys. + // Inverted join always has a filter condition on the index keys. h.filterIsTrue = false h.filterIsFalse = h.filters.IsFalse() diff --git a/pkg/sql/opt/memo/statistics_builder.go b/pkg/sql/opt/memo/statistics_builder.go index 5b59194ed265..38ad49004409 100644 --- a/pkg/sql/opt/memo/statistics_builder.go +++ b/pkg/sql/opt/memo/statistics_builder.go @@ -210,8 +210,8 @@ func (sb *statisticsBuilder) availabilityFromInput(e RelExpr) bool { ensureLookupJoinInputProps(t, sb) return t.lookupProps.Stats.Available && t.Input.Relational().Stats.Available - case *GeoLookupJoinExpr: - ensureGeoLookupJoinInputProps(t, sb) + case *InvertedJoinExpr: + ensureInvertedJoinInputProps(t, sb) return t.lookupProps.Stats.Available && t.Input.Relational().Stats.Available case *ZigzagJoinExpr: @@ -240,7 +240,7 @@ func (sb *statisticsBuilder) colStatFromInput( colSet opt.ColSet, e RelExpr, ) (*props.ColumnStatistic, *props.Statistics) { var lookupJoin *LookupJoinExpr - var geospatialLookupJoin *GeoLookupJoinExpr + var invertedJoin *InvertedJoinExpr var zigzagJoin *ZigzagJoinExpr switch t := e.(type) { @@ -254,16 +254,16 @@ func (sb *statisticsBuilder) colStatFromInput( lookupJoin = t ensureLookupJoinInputProps(lookupJoin, sb) - case *GeoLookupJoinExpr: - geospatialLookupJoin = t - ensureGeoLookupJoinInputProps(geospatialLookupJoin, sb) + case *InvertedJoinExpr: + invertedJoin = t + ensureInvertedJoinInputProps(invertedJoin, sb) case *ZigzagJoinExpr: zigzagJoin = t ensureZigzagJoinInputProps(zigzagJoin, sb) } - if lookupJoin != nil || geospatialLookupJoin != nil || zigzagJoin != nil || + if lookupJoin != nil || invertedJoin != nil || zigzagJoin != nil || opt.IsJoinOp(e) || e.Op() == opt.MergeJoinOp { var leftProps *props.Relational if zigzagJoin != nil { @@ -276,8 +276,8 @@ func (sb *statisticsBuilder) colStatFromInput( var intersectsRight bool if lookupJoin != nil { intersectsRight = lookupJoin.lookupProps.OutputCols.Intersects(colSet) - } else if 
geospatialLookupJoin != nil {
-		intersectsRight = geospatialLookupJoin.lookupProps.OutputCols.Intersects(colSet)
+	} else if invertedJoin != nil {
+		intersectsRight = invertedJoin.lookupProps.OutputCols.Intersects(colSet)
 	} else if zigzagJoin != nil {
 		intersectsRight = zigzagJoin.rightProps.OutputCols.Intersects(colSet)
 	} else {
@@ -300,10 +300,10 @@ func (sb *statisticsBuilder) colStatFromInput(
 		return sb.colStatTable(lookupJoin.Table, colSet), sb.makeTableStatistics(lookupJoin.Table)
 	}
-	if geospatialLookupJoin != nil {
+	if invertedJoin != nil {
 		// TODO(rytaft): use inverted index stats when available.
-		return sb.colStatTable(geospatialLookupJoin.Table, colSet),
-			sb.makeTableStatistics(geospatialLookupJoin.Table)
+		return sb.colStatTable(invertedJoin.Table, colSet),
+			sb.makeTableStatistics(invertedJoin.Table)
 	}
 	if zigzagJoin != nil {
 		return sb.colStatTable(zigzagJoin.RightTable, colSet),
@@ -360,7 +360,7 @@ func (sb *statisticsBuilder) colStat(colSet opt.ColSet, e RelExpr) *props.Column
 	case opt.InnerJoinOp, opt.LeftJoinOp, opt.RightJoinOp, opt.FullJoinOp,
 		opt.SemiJoinOp, opt.AntiJoinOp, opt.InnerJoinApplyOp, opt.LeftJoinApplyOp,
 		opt.SemiJoinApplyOp, opt.AntiJoinApplyOp, opt.MergeJoinOp, opt.LookupJoinOp,
-		opt.GeoLookupJoinOp, opt.ZigzagJoinOp:
+		opt.InvertedJoinOp, opt.ZigzagJoinOp:
 		return sb.colStatJoin(colSet, e)

 	case opt.IndexJoinOp:
@@ -1009,8 +1009,8 @@ func (sb *statisticsBuilder) buildJoin(
 		s.ApplySelectivity(sb.selectivityFromEquivalencies(equivReps, &h.filtersFD, join, s))
 	}

-	if join.Op() == opt.GeoLookupJoinOp {
-		s.ApplySelectivity(sb.selectivityFromGeoRelationship(join, s))
+	if join.Op() == opt.InvertedJoinOp {
+		s.ApplySelectivity(sb.selectivityFromInvertedJoinCondition(join, s))
 	}
 	s.ApplySelectivity(sb.selectivityFromHistograms(histCols, join, s))
 	s.ApplySelectivity(sb.selectivityFromMultiColDistinctCounts(
@@ -1154,10 +1154,10 @@ func (sb *statisticsBuilder) colStatJoin(colSet opt.ColSet, join RelExpr) *props
 		ensureLookupJoinInputProps(j, sb)
 		rightProps = &j.lookupProps

-	case *GeoLookupJoinExpr:
+	case *InvertedJoinExpr:
 		joinType = j.JoinType
 		leftProps = j.Input.Relational()
-		ensureGeoLookupJoinInputProps(j, sb)
+		ensureInvertedJoinInputProps(j, sb)
 		rightProps = &j.lookupProps

 	case *ZigzagJoinExpr:
@@ -2466,13 +2466,13 @@ func (sb *statisticsBuilder) rowsProcessed(e RelExpr) float64 {
 		withoutOn := e.Memo().MemoizeLookupJoin(t.Input, nil /* on */, lookupJoinPrivate)
 		return withoutOn.Relational().Stats.RowCount

-	case *GeoLookupJoinExpr:
-		var lookupJoinPrivate *GeoLookupJoinPrivate
+	case *InvertedJoinExpr:
+		var lookupJoinPrivate *InvertedJoinPrivate
 		switch t.JoinType {
 		case opt.SemiJoinOp, opt.SemiJoinApplyOp, opt.AntiJoinOp, opt.AntiJoinApplyOp:
 			// The number of rows processed for semi and anti joins is closer to the
 			// number of output rows for an equivalent inner join.
-			copy := t.GeoLookupJoinPrivate
+			copy := t.InvertedJoinPrivate
 			copy.JoinType = semiAntiJoinToInnerJoin(t.JoinType)
 			lookupJoinPrivate = &copy
@@ -2482,12 +2482,12 @@ func (sb *statisticsBuilder) rowsProcessed(e RelExpr) float64 {
 			// equals the number of output rows.
 			return e.Relational().Stats.RowCount
 		}
-		lookupJoinPrivate = &t.GeoLookupJoinPrivate
+		lookupJoinPrivate = &t.InvertedJoinPrivate
 	}

 	// We need to determine the row count of the join before the
 	// ON conditions are applied.
- withoutOn := e.Memo().MemoizeGeoLookupJoin(t.Input, nil /* on */, lookupJoinPrivate) + withoutOn := e.Memo().MemoizeInvertedJoin(t.Input, nil /* on */, lookupJoinPrivate) return withoutOn.Relational().Stats.RowCount case *MergeJoinExpr: @@ -2627,10 +2627,9 @@ const ( // it worth adding the overhead of using a histogram. minCardinalityForHistogram = 100 - // This is the default selectivity estimated for geospatial lookup joins - // until we can get better statistics on inverted indexes and geospatial - // columns. - unknownGeoRelationshipSelectivity = 1.0 / 100.0 + // This is the default selectivity estimated for inverted joins until we can + // get better statistics on inverted indexes. + unknownInvertedJoinSelectivity = 1.0 / 100.0 // multiColWeight is the weight to assign the selectivity calculation using // multi-column statistics versus the calculation using single-column @@ -3568,10 +3567,10 @@ func (sb *statisticsBuilder) selectivityFromEquivalencySemiJoin( return fraction(minDistinctCountRight, maxDistinctCountLeft) } -func (sb *statisticsBuilder) selectivityFromGeoRelationship( +func (sb *statisticsBuilder) selectivityFromInvertedJoinCondition( e RelExpr, s *props.Statistics, ) (selectivity float64) { - return unknownGeoRelationshipSelectivity + return unknownInvertedJoinSelectivity } func (sb *statisticsBuilder) selectivityFromUnappliedConjuncts( diff --git a/pkg/sql/opt/ops/relational.opt b/pkg/sql/opt/ops/relational.opt index aee61f6cd9a7..3dc074b6c64c 100644 --- a/pkg/sql/opt/ops/relational.opt +++ b/pkg/sql/opt/ops/relational.opt @@ -302,38 +302,16 @@ define LookupJoinPrivate { _ JoinPrivate } -# GeoLookupJoin represents a join between an input expression and an index, -# where the index is an inverted index on a Geometry or Geography column. -# -# A GeoLookupJoin can be generated for queries containing a join where one of -# the join conditions is a geospatial binary function such as ST_Covers or -# ST_CoveredBy, and at least one of the two inputs to the function is an -# indexed geospatial column. The type of geospatial function implies the -# GeoRelationshipType (Covers, CoveredBy or Intersects) for the join, which is -# stored in the GeoLookupJoinPrivate and affects how the join is executed. For -# a full list of the geospatial functions that can be index-accelerated and -# their corresponding GeoRelationshipTypes, see geoRelationshipMap in -# xform/custom_funcs.go. -# -# The GeoLookupJoin has no false negatives, but it may return false positives -# that would not have been returned by the original geospatial function -# join predicate. Therefore, the original function must still be applied on -# the output of the join. Since the inverted index does not actually include -# the geospatial column (or any other columns besides the primary key columns), -# the GeoLookupJoin will be wrapped in an index join. The geospatial function -# and any other filters on non-key columns will be appied as filters on the -# outer index join. +# InvertedJoin represents a join between an input expression and an inverted +# index. The type of join is in the InvertedJoinPrivate field. [Relational] -define GeoLookupJoin { +define InvertedJoin { Input RelExpr # On only contains filters on the input columns and primary key columns of - # the inverted index's base table. (Since the indexed geospatial column is - # not actually included in the index, the GeoLookupJoin must be wrapped in - # an index join, which will contain the original geospatial function as one - # of its On conditions.) 
+ # the inverted index's base table. On FiltersExpr - _ GeoLookupJoinPrivate + _ InvertedJoinPrivate # lookupProps caches relational properties for the "table" side of the lookup # join, treating it as if it were another relational input. This makes the @@ -342,28 +320,29 @@ define GeoLookupJoin { } [Private] -define GeoLookupJoinPrivate { +define InvertedJoinPrivate { # JoinType is InnerJoin, LeftJoin, SemiJoin, or AntiJoin. JoinType Operator - # GeoRelationshipType is Covers, CoveredBy, or Intersects. - GeoRelationshipType GeoRelationshipType + # InvertedExpr is the inverted join condition. It is used to get the keys + # to lookup in the inverted index based on the value of the input column. + InvertedExpr ScalarExpr # Table identifies the table do to lookups in. Table TableID - # Index identifies the geospatial inverted index to do lookups in. It can - # be passed to the cat.Table.Index() method in order to fetch the cat.Index - # metadata. + # Index identifies the inverted index to do lookups in. It can be passed to + # the cat.Table.Index() method in order to fetch the cat.Index metadata. Index IndexOrdinal - # GeoCol is the geospatial column (produced by the input) used to - # determine the keys (i.e., s2 CellIDs) to scan in the inverted index. - GeoCol ColumnID + # InputCol is the column (produced by the input) that will be bound to + # InvertedExpr and used to determine the keys to scan in the inverted + # index. + InputCol ColumnID - # Cols is the set of columns produced by the geospatial lookup join. This - # set can contain columns from the input and columns from the index. Any - # columns not in the input are retrieved from the index. + # Cols is the set of columns produced by the inverted join. This set can + # contain columns from the input and columns from the index. Any columns + # not in the input are retrieved from the index. Cols ColSet _ JoinPrivate } diff --git a/pkg/sql/opt/optgen/cmd/optgen/exprs_gen.go b/pkg/sql/opt/optgen/cmd/optgen/exprs_gen.go index 7000680399ee..323780f1be6f 100644 --- a/pkg/sql/opt/optgen/cmd/optgen/exprs_gen.go +++ b/pkg/sql/opt/optgen/cmd/optgen/exprs_gen.go @@ -35,7 +35,6 @@ func (g *exprsGen) generate(compiled *lang.CompiledExpr, w io.Writer) { fmt.Fprintf(g.w, "import (\n") fmt.Fprintf(g.w, " \"unsafe\"\n") fmt.Fprintf(g.w, "\n") - fmt.Fprintf(g.w, " \"github.com/cockroachdb/cockroach/pkg/geo/geoindex\"\n") fmt.Fprintf(g.w, " \"github.com/cockroachdb/cockroach/pkg/sql/opt\"\n") fmt.Fprintf(g.w, " \"github.com/cockroachdb/cockroach/pkg/sql/opt/cat\"\n") fmt.Fprintf(g.w, " \"github.com/cockroachdb/cockroach/pkg/sql/opt/constraint\"\n") diff --git a/pkg/sql/opt/optgen/cmd/optgen/metadata.go b/pkg/sql/opt/optgen/cmd/optgen/metadata.go index 01345fddc995..49e0db977586 100644 --- a/pkg/sql/opt/optgen/cmd/optgen/metadata.go +++ b/pkg/sql/opt/optgen/cmd/optgen/metadata.go @@ -185,56 +185,55 @@ func newMetadata(compiled *lang.CompiledExpr, pkg string) *metadata { // Add all types used in Optgen defines here. 
md.types = map[string]*typeDef{ - "RelExpr": {fullName: "memo.RelExpr", isExpr: true, isInterface: true}, - "Expr": {fullName: "opt.Expr", isExpr: true, isInterface: true}, - "ScalarExpr": {fullName: "opt.ScalarExpr", isExpr: true, isInterface: true}, - "Operator": {fullName: "opt.Operator", passByVal: true}, - "ColumnID": {fullName: "opt.ColumnID", passByVal: true}, - "ColSet": {fullName: "opt.ColSet", passByVal: true}, - "ColList": {fullName: "opt.ColList", passByVal: true}, - "TableID": {fullName: "opt.TableID", passByVal: true}, - "SchemaID": {fullName: "opt.SchemaID", passByVal: true}, - "SequenceID": {fullName: "opt.SequenceID", passByVal: true}, - "UniqueID": {fullName: "opt.UniqueID", passByVal: true}, - "WithID": {fullName: "opt.WithID", passByVal: true}, - "Ordering": {fullName: "opt.Ordering", passByVal: true}, - "OrderingChoice": {fullName: "physical.OrderingChoice", passByVal: true}, - "TupleOrdinal": {fullName: "memo.TupleOrdinal", passByVal: true}, - "ScanLimit": {fullName: "memo.ScanLimit", passByVal: true}, - "ScanFlags": {fullName: "memo.ScanFlags", passByVal: true}, - "JoinFlags": {fullName: "memo.JoinFlags", passByVal: true}, - "WindowFrame": {fullName: "memo.WindowFrame", passByVal: true}, - "FKCascades": {fullName: "memo.FKCascades", passByVal: true}, - "ExplainOptions": {fullName: "tree.ExplainOptions", passByVal: true}, - "StatementType": {fullName: "tree.StatementType", passByVal: true}, - "ShowTraceType": {fullName: "tree.ShowTraceType", passByVal: true}, - "bool": {fullName: "bool", passByVal: true}, - "int": {fullName: "int", passByVal: true}, - "string": {fullName: "string", passByVal: true}, - "Type": {fullName: "types.T", isPointer: true}, - "Datum": {fullName: "tree.Datum", isInterface: true}, - "TypedExpr": {fullName: "tree.TypedExpr", isInterface: true}, - "Statement": {fullName: "tree.Statement", isInterface: true}, - "Subquery": {fullName: "tree.Subquery", isPointer: true, usePointerIntern: true}, - "CreateTable": {fullName: "tree.CreateTable", isPointer: true, usePointerIntern: true}, - "Constraint": {fullName: "constraint.Constraint", isPointer: true, usePointerIntern: true}, - "FuncProps": {fullName: "tree.FunctionProperties", isPointer: true, usePointerIntern: true}, - "FuncOverload": {fullName: "tree.Overload", isPointer: true, usePointerIntern: true}, - "PhysProps": {fullName: "physical.Required", isPointer: true}, - "Presentation": {fullName: "physical.Presentation", passByVal: true}, - "RelProps": {fullName: "props.Relational"}, - "RelPropsPtr": {fullName: "props.Relational", isPointer: true, usePointerIntern: true}, - "ScalarProps": {fullName: "props.Scalar"}, - "FuncDepSet": {fullName: "props.FuncDepSet"}, - "JoinMultiplicity": {fullName: "props.JoinMultiplicity"}, - "OpaqueMetadata": {fullName: "opt.OpaqueMetadata", isInterface: true}, - "JobCommand": {fullName: "tree.JobCommand", passByVal: true}, - "IndexOrdinal": {fullName: "cat.IndexOrdinal", passByVal: true}, - "ViewDeps": {fullName: "opt.ViewDeps", passByVal: true}, - "LockingItem": {fullName: "tree.LockingItem", isPointer: true}, - "MaterializeClause": {fullName: "tree.MaterializeClause", passByVal: true}, - "GeoRelationshipType": {fullName: "geoindex.RelationshipType", passByVal: true}, - "SpanExpression": {fullName: "invertedexpr.SpanExpression", isPointer: true, usePointerIntern: true}, + "RelExpr": {fullName: "memo.RelExpr", isExpr: true, isInterface: true}, + "Expr": {fullName: "opt.Expr", isExpr: true, isInterface: true}, + "ScalarExpr": {fullName: "opt.ScalarExpr", isExpr: true, 
isInterface: true}, + "Operator": {fullName: "opt.Operator", passByVal: true}, + "ColumnID": {fullName: "opt.ColumnID", passByVal: true}, + "ColSet": {fullName: "opt.ColSet", passByVal: true}, + "ColList": {fullName: "opt.ColList", passByVal: true}, + "TableID": {fullName: "opt.TableID", passByVal: true}, + "SchemaID": {fullName: "opt.SchemaID", passByVal: true}, + "SequenceID": {fullName: "opt.SequenceID", passByVal: true}, + "UniqueID": {fullName: "opt.UniqueID", passByVal: true}, + "WithID": {fullName: "opt.WithID", passByVal: true}, + "Ordering": {fullName: "opt.Ordering", passByVal: true}, + "OrderingChoice": {fullName: "physical.OrderingChoice", passByVal: true}, + "TupleOrdinal": {fullName: "memo.TupleOrdinal", passByVal: true}, + "ScanLimit": {fullName: "memo.ScanLimit", passByVal: true}, + "ScanFlags": {fullName: "memo.ScanFlags", passByVal: true}, + "JoinFlags": {fullName: "memo.JoinFlags", passByVal: true}, + "WindowFrame": {fullName: "memo.WindowFrame", passByVal: true}, + "FKCascades": {fullName: "memo.FKCascades", passByVal: true}, + "ExplainOptions": {fullName: "tree.ExplainOptions", passByVal: true}, + "StatementType": {fullName: "tree.StatementType", passByVal: true}, + "ShowTraceType": {fullName: "tree.ShowTraceType", passByVal: true}, + "bool": {fullName: "bool", passByVal: true}, + "int": {fullName: "int", passByVal: true}, + "string": {fullName: "string", passByVal: true}, + "Type": {fullName: "types.T", isPointer: true}, + "Datum": {fullName: "tree.Datum", isInterface: true}, + "TypedExpr": {fullName: "tree.TypedExpr", isInterface: true}, + "Statement": {fullName: "tree.Statement", isInterface: true}, + "Subquery": {fullName: "tree.Subquery", isPointer: true, usePointerIntern: true}, + "CreateTable": {fullName: "tree.CreateTable", isPointer: true, usePointerIntern: true}, + "Constraint": {fullName: "constraint.Constraint", isPointer: true, usePointerIntern: true}, + "FuncProps": {fullName: "tree.FunctionProperties", isPointer: true, usePointerIntern: true}, + "FuncOverload": {fullName: "tree.Overload", isPointer: true, usePointerIntern: true}, + "PhysProps": {fullName: "physical.Required", isPointer: true}, + "Presentation": {fullName: "physical.Presentation", passByVal: true}, + "RelProps": {fullName: "props.Relational"}, + "RelPropsPtr": {fullName: "props.Relational", isPointer: true, usePointerIntern: true}, + "ScalarProps": {fullName: "props.Scalar"}, + "FuncDepSet": {fullName: "props.FuncDepSet"}, + "JoinMultiplicity": {fullName: "props.JoinMultiplicity"}, + "OpaqueMetadata": {fullName: "opt.OpaqueMetadata", isInterface: true}, + "JobCommand": {fullName: "tree.JobCommand", passByVal: true}, + "IndexOrdinal": {fullName: "cat.IndexOrdinal", passByVal: true}, + "ViewDeps": {fullName: "opt.ViewDeps", passByVal: true}, + "LockingItem": {fullName: "tree.LockingItem", isPointer: true}, + "MaterializeClause": {fullName: "tree.MaterializeClause", passByVal: true}, + "SpanExpression": {fullName: "invertedexpr.SpanExpression", isPointer: true, usePointerIntern: true}, } // Add types of generated op and private structs. 
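
The optgen metadata change above completes the removal of `GeoRelationshipType` as a first-class private field: the join condition now travels as a generic `InvertedExpr ScalarExpr` and is evaluated at runtime through the `invertedexpr.DatumToInvertedExpr` interface introduced earlier in this change. Per that interface's doc comment, `rowexec.invertedJoiner` converts the lookup datum of each input row into a `SpanExpressionProto`, reads the inverted-index spans listed in `SpansToRead`, and then evaluates the set expression in `Node` over the scanned rows. A minimal sketch of that conversion loop follows; the `convertBatch` helper, its package name, and its parameters are illustrative only and are not part of this change.

```go
package rowexecsketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/opt/invertedexpr"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
)

// convertBatch is a hypothetical helper that mirrors how the invertedJoiner
// drives DatumToInvertedExpr: one Convert call per buffered input row.
func convertBatch(
	ctx context.Context,
	conv invertedexpr.DatumToInvertedExpr,
	rows []sqlbase.EncDatumRow,
	lookupColIdx int,
) ([]*invertedexpr.SpanExpressionProto, error) {
	exprs := make([]*invertedexpr.SpanExpressionProto, 0, len(rows))
	for _, row := range rows {
		// Convert uses the lookup column to construct an inverted expression.
		expr, err := conv.Convert(ctx, row[lookupColIdx])
		if err != nil {
			return nil, err
		}
		// expr.SpansToRead lists the inverted-index spans to scan, and
		// expr.Node is the set expression evaluated over the scanned rows.
		exprs = append(exprs, expr)
	}
	return exprs, nil
}
```

Keeping the interface datum-at-a-time (rather than geo-specific) is what lets the same joiner later serve JSON and array inverted indexes, as the TODOs in `custom_funcs.go` below anticipate.
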
diff --git a/pkg/sql/opt/optgen/cmd/optgen/testdata/exprs b/pkg/sql/opt/optgen/cmd/optgen/testdata/exprs index c2fbab376808..c69d21555683 100644 --- a/pkg/sql/opt/optgen/cmd/optgen/testdata/exprs +++ b/pkg/sql/opt/optgen/cmd/optgen/testdata/exprs @@ -49,7 +49,6 @@ package memo import ( "unsafe" - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -524,7 +523,6 @@ package memo import ( "unsafe" - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" diff --git a/pkg/sql/opt/xform/coster.go b/pkg/sql/opt/xform/coster.go index 634e81d184d4..15f20c4547f9 100644 --- a/pkg/sql/opt/xform/coster.go +++ b/pkg/sql/opt/xform/coster.go @@ -187,8 +187,8 @@ func (c *coster) ComputeCost(candidate memo.RelExpr, required *physical.Required case opt.LookupJoinOp: cost = c.computeLookupJoinCost(candidate.(*memo.LookupJoinExpr), required) - case opt.GeoLookupJoinOp: - cost = c.computeGeoLookupJoinCost(candidate.(*memo.GeoLookupJoinExpr), required) + case opt.InvertedJoinOp: + cost = c.computeInvertedJoinCost(candidate.(*memo.InvertedJoinExpr), required) case opt.ZigzagJoinOp: cost = c.computeZigzagJoinCost(candidate.(*memo.ZigzagJoinExpr)) @@ -483,8 +483,8 @@ func (c *coster) computeLookupJoinCost( return cost } -func (c *coster) computeGeoLookupJoinCost( - join *memo.GeoLookupJoinExpr, required *physical.Required, +func (c *coster) computeInvertedJoinCost( + join *memo.InvertedJoinExpr, required *physical.Required, ) memo.Cost { lookupCount := join.Input.Relational().Stats.RowCount @@ -501,7 +501,7 @@ func (c *coster) computeGeoLookupJoinCost( // We shouldn't ever get here. Since we don't allow the memo // to be optimized twice, the coster should never be used after // logPropsBuilder.clear() is called. - panic(errors.AssertionFailedf("could not get rows processed for geolookup join")) + panic(errors.AssertionFailedf("could not get rows processed for inverted join")) } // Lookup joins can return early if enough rows have been found. An otherwise diff --git a/pkg/sql/opt/xform/custom_funcs.go b/pkg/sql/opt/xform/custom_funcs.go index 04dce8864402..e2d349f685de 100644 --- a/pkg/sql/opt/xform/custom_funcs.go +++ b/pkg/sql/opt/xform/custom_funcs.go @@ -975,7 +975,9 @@ func (c *CustomFuncs) GenerateInvertedIndexScans( // Check whether the filter can constrain the index. // TODO(rytaft): Unify these two cases so both return an invertedConstraint. - invertedConstraint, geoOk = c.tryConstrainGeoIndex(filters, scanPrivate.Table, iter.Index()) + invertedConstraint, geoOk = tryConstrainGeoIndex( + c.e.evalCtx.Context, filters, scanPrivate.Table, iter.Index(), + ) if geoOk { // Geo index scans can never be tight, so remaining filters is always the // same as filters. @@ -1153,193 +1155,6 @@ func (c *CustomFuncs) canMaybeConstrainIndex( return false } -// getSpanExprForGeoIndexFn is a function that returns a SpanExpression that -// constrains the given geo index according to the given constant and -// geospatial relationship. It is implemented by getSpanExprForGeographyIndex -// and getSpanExprForGeometryIndex and used in constrainGeoIndex. 
-type getSpanExprForGeoIndexFn func( - tree.Datum, geoindex.RelationshipType, cat.Index, -) *invertedexpr.SpanExpression - -// tryConstrainGeoIndex tries to derive an inverted index constraint for the -// given geospatial index from the specified filters. If a constraint is -// derived, it is returned with ok=true. If no constraint can be derived, -// then tryConstrainGeoIndex returns ok=false. -func (c *CustomFuncs) tryConstrainGeoIndex( - filters memo.FiltersExpr, tabID opt.TableID, index cat.Index, -) (invertedConstraint *invertedexpr.SpanExpression, ok bool) { - config := index.GeoConfig() - var getSpanExpr getSpanExprForGeoIndexFn - if geoindex.IsGeographyConfig(config) { - getSpanExpr = c.getSpanExprForGeographyIndex - } else if geoindex.IsGeometryConfig(config) { - getSpanExpr = c.getSpanExprForGeometryIndex - } else { - return nil, false - } - - var invertedExpr invertedexpr.InvertedExpression - for i := range filters { - invertedExprLocal := c.constrainGeoIndex(filters[i].Condition, tabID, index, getSpanExpr) - if invertedExpr == nil { - invertedExpr = invertedExprLocal - } else { - invertedExpr = invertedexpr.And(invertedExpr, invertedExprLocal) - } - } - - if invertedExpr == nil { - return nil, false - } - - spanExpr, ok := invertedExpr.(*invertedexpr.SpanExpression) - if !ok { - return nil, false - } - - return spanExpr, true -} - -// getSpanExprForGeographyIndex gets a SpanExpression that constrains the given -// geography index according to the given constant and geospatial relationship. -func (c *CustomFuncs) getSpanExprForGeographyIndex( - d tree.Datum, relationship geoindex.RelationshipType, index cat.Index, -) *invertedexpr.SpanExpression { - geogIdx := geoindex.NewS2GeographyIndex(*index.GeoConfig().S2Geography) - geog := d.(*tree.DGeography).Geography - var spanExpr *invertedexpr.SpanExpression - - switch relationship { - case geoindex.Covers: - unionKeySpans, err := geogIdx.Covers(c.e.evalCtx.Context, geog) - if err != nil { - panic(err) - } - spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) - - case geoindex.CoveredBy: - rpKeyExpr, err := geogIdx.CoveredBy(c.e.evalCtx.Context, geog) - if err != nil { - panic(err) - } - if spanExpr, err = invertedexpr.GeoRPKeyExprToSpanExpr(rpKeyExpr); err != nil { - panic(err) - } - - case geoindex.Intersects: - unionKeySpans, err := geogIdx.Intersects(c.e.evalCtx.Context, geog) - if err != nil { - panic(err) - } - spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) - - default: - panic(errors.AssertionFailedf("unhandled relationship: %v", relationship)) - } - - return spanExpr -} - -// getSpanExprForGeometryIndex gets a SpanExpression that constrains the given -// geometry index according to the given constant and geospatial relationship. 
-func (c *CustomFuncs) getSpanExprForGeometryIndex( - d tree.Datum, relationship geoindex.RelationshipType, index cat.Index, -) *invertedexpr.SpanExpression { - geomIdx := geoindex.NewS2GeometryIndex(*index.GeoConfig().S2Geometry) - geom := d.(*tree.DGeometry).Geometry - var spanExpr *invertedexpr.SpanExpression - - switch relationship { - case geoindex.Covers: - unionKeySpans, err := geomIdx.Covers(c.e.evalCtx.Context, geom) - if err != nil { - panic(err) - } - spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) - - case geoindex.CoveredBy: - rpKeyExpr, err := geomIdx.CoveredBy(c.e.evalCtx.Context, geom) - if err != nil { - panic(err) - } - if spanExpr, err = invertedexpr.GeoRPKeyExprToSpanExpr(rpKeyExpr); err != nil { - panic(err) - } - - case geoindex.Intersects: - unionKeySpans, err := geomIdx.Intersects(c.e.evalCtx.Context, geom) - if err != nil { - panic(err) - } - spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) - - default: - panic(errors.AssertionFailedf("unhandled relationship: %v", relationship)) - } - - return spanExpr -} - -// constrainGeoIndex returns an InvertedExpression representing a constraint -// of the given geospatial index. -func (c *CustomFuncs) constrainGeoIndex( - expr opt.ScalarExpr, tabID opt.TableID, index cat.Index, getSpanExpr getSpanExprForGeoIndexFn, -) (_ invertedexpr.InvertedExpression) { - var fn *memo.FunctionExpr - switch t := expr.(type) { - case *memo.AndExpr: - return invertedexpr.And( - c.constrainGeoIndex(t.Left, tabID, index, getSpanExpr), - c.constrainGeoIndex(t.Right, tabID, index, getSpanExpr), - ) - - case *memo.OrExpr: - return invertedexpr.Or( - c.constrainGeoIndex(t.Left, tabID, index, getSpanExpr), - c.constrainGeoIndex(t.Right, tabID, index, getSpanExpr), - ) - - case *memo.FunctionExpr: - fn = t - - default: - return invertedexpr.NonInvertedColExpression{} - } - - if !IsGeoIndexFunction(fn) { - return invertedexpr.NonInvertedColExpression{} - } - - if fn.Args.ChildCount() < 2 { - panic(errors.AssertionFailedf( - "all index-accelerated geospatial functions should have at least two arguments", - )) - } - - // The first argument should be a constant. - if !memo.CanExtractConstDatum(fn.Args.Child(0)) { - return invertedexpr.NonInvertedColExpression{} - } - d := memo.ExtractConstDatum(fn.Args.Child(0)) - - // The second argument should be a variable corresponding to the index - // column. - variable, ok := fn.Args.Child(1).(*memo.VariableExpr) - if !ok { - // TODO(rytaft): Commute the geospatial function in this case. - // Covers <-> CoveredBy - // Intersects <-> Intersects - return invertedexpr.NonInvertedColExpression{} - } - if variable.Col != tabID.ColumnID(index.Column(0).Ordinal) { - // The column in the function does not match the index column. - return invertedexpr.NonInvertedColExpression{} - } - - relationship := geoRelationshipMap[fn.Name] - return getSpanExpr(d, relationship, index) -} - // ---------------------------------------------------------------------- // // Limit Rules @@ -2014,6 +1829,10 @@ func (c *CustomFuncs) GenerateLookupJoins( // covering, all geospatial lookup joins must be wrapped in an index join with // the primary index of the table. See the description of Case 2 in the comment // above GenerateLookupJoins for details about how this works. +// TODO(rytaft): generalize this function to be GenerateInvertedJoins and add +// support for JSON and array inverted indexes. +// TODO(rytaft): handle more complicated geo-spatial expressions +// e.g. 
ST_Intersects(x, y) AND ST_Covers(x, y) where y is the indexed value.
 func (c *CustomFuncs) GenerateGeoLookupJoins(
 	grp memo.RelExpr,
 	joinType opt.Operator,
@@ -2033,18 +1852,16 @@ func (c *CustomFuncs) GenerateGeoLookupJoins(
 		return
 	}

-	function := fn.(*memo.FunctionExpr)
-	inputProps := input.Relational()
-
-	// Extract the geospatial relationship as well as the variable inputs to
-	// the geospatial function.
-	relationship, ok := geoRelationshipMap[function.Name]
-	if !ok {
+	if !IsGeoIndexFunction(fn) {
 		panic(errors.AssertionFailedf(
 			"GenerateGeoLookupJoins called on a function that cannot be index-accelerated",
 		))
 	}

+	function := fn.(*memo.FunctionExpr)
+	inputProps := input.Relational()
+
+	// Extract the variable inputs to the geospatial function.
 	if function.Args.ChildCount() < 2 {
 		panic(errors.AssertionFailedf(
 			"all index-accelerated geospatial functions should have at least two arguments",
@@ -2100,27 +1917,27 @@ func (c *CustomFuncs) GenerateGeoLookupJoins(
 	// primary key columns from it.
 	indexCols := pkCols.ToSet()

-	lookupJoin := memo.GeoLookupJoinExpr{Input: input}
+	lookupJoin := memo.InvertedJoinExpr{Input: input}
 	lookupJoin.JoinPrivate = *joinPrivate
 	lookupJoin.JoinType = joinType
 	lookupJoin.Table = scanPrivate.Table
 	lookupJoin.Index = iter.IndexOrdinal()
-	lookupJoin.GeoRelationshipType = relationship
-	lookupJoin.GeoCol = inputGeoCol
+	lookupJoin.InvertedExpr = function
+	lookupJoin.InputCol = inputGeoCol
 	lookupJoin.Cols = indexCols.Union(inputProps.OutputCols)

 	var indexJoin memo.LookupJoinExpr

 	// ON may have some conditions that are bound by the columns in the index
 	// and some conditions that refer to other columns. We can put the former
-	// in the GeospatialLookupJoin and the latter in the index join.
+	// in the InvertedJoin and the latter in the index join.
 	lookupJoin.On = c.ExtractBoundConditions(on, lookupJoin.Cols)
 	indexJoin.On = c.ExtractUnboundConditions(on, lookupJoin.Cols)

-	indexJoin.Input = c.e.f.ConstructGeoLookupJoin(
+	indexJoin.Input = c.e.f.ConstructInvertedJoin(
 		lookupJoin.Input,
 		lookupJoin.On,
-		&lookupJoin.GeoLookupJoinPrivate,
+		&lookupJoin.InvertedJoinPrivate,
 	)
 	indexJoin.JoinType = joinType
 	indexJoin.Table = scanPrivate.Table
@@ -2134,44 +1951,12 @@ func (c *CustomFuncs) GenerateGeoLookupJoins(
 	}
 }

-// geoRelationshipMap contains all the geospatial functions that can be index-
-// accelerated. Each function implies a certain type of geospatial relationship,
-// which affects how the index is queried as part of a constrained scan or
-// geospatial lookup join. geoRelationshipMap maps the function name to its
-// corresponding relationship (Covers, CoveredBy, or Intersects).
-//
-// Note that for all of these functions, a geospatial lookup join or constrained
-// index scan may produce false positives. Therefore, the original function must
-// be called on the output of the index operation to filter the results.
-// TODO(rytaft): add ST_DFullyWithin (geoindex.Covers) and ST_DWithin
-// (geoindex.Intersects) once we add support for extending a geometry.
-var geoRelationshipMap = map[string]geoindex.RelationshipType{ - "st_covers": geoindex.Covers, - "st_coveredby": geoindex.CoveredBy, - "st_contains": geoindex.Covers, - "st_containsproperly": geoindex.Covers, - "st_crosses": geoindex.Intersects, - "st_equals": geoindex.Intersects, - "st_intersects": geoindex.Intersects, - "st_overlaps": geoindex.Intersects, - "st_touches": geoindex.Intersects, - "st_within": geoindex.CoveredBy, -} - // IsGeoIndexFunction returns true if the given function is a geospatial // function that can be index-accelerated. func (c *CustomFuncs) IsGeoIndexFunction(fn opt.ScalarExpr) bool { return IsGeoIndexFunction(fn) } -// IsGeoIndexFunction returns true if the given function is a geospatial -// function that can be index-accelerated. -func IsGeoIndexFunction(fn opt.ScalarExpr) bool { - function := fn.(*memo.FunctionExpr) - _, ok := geoRelationshipMap[function.Name] - return ok -} - // HasAllVariableArgs returns true if all the arguments to the given function // are variables. func (c *CustomFuncs) HasAllVariableArgs(fn opt.ScalarExpr) bool { @@ -3253,3 +3038,15 @@ func (c *CustomFuncs) AddPrimaryKeyColsToScanPrivate(sp *memo.ScanPrivate) *memo Locking: sp.Locking, } } + +// NewDatumToInvertedExpr returns a new DatumToInvertedExpr. Currently there +// is only one possible implementation returned, geoDatumToInvertedExpr. +func NewDatumToInvertedExpr( + expr tree.TypedExpr, desc *sqlbase.IndexDescriptor, +) (invertedexpr.DatumToInvertedExpr, error) { + if geoindex.IsEmptyConfig(&desc.GeoConfig) { + return nil, fmt.Errorf("inverted joins are currently only supported for geospatial indexes") + } + + return NewGeoDatumToInvertedExpr(expr, &desc.GeoConfig) +} diff --git a/pkg/sql/opt/xform/geo.go b/pkg/sql/opt/xform/geo.go new file mode 100644 index 000000000000..3e847467a61e --- /dev/null +++ b/pkg/sql/opt/xform/geo.go @@ -0,0 +1,323 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package xform + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/geo/geoindex" + "github.com/cockroachdb/cockroach/pkg/sql/opt" + "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" + "github.com/cockroachdb/cockroach/pkg/sql/opt/invertedexpr" + "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/errors" +) + +// This file contains functions for building geospatial inverted index scans +// and joins that are used throughout the xform package. + +// geoRelationshipMap contains all the geospatial functions that can be index- +// accelerated. Each function implies a certain type of geospatial relationship, +// which affects how the index is queried as part of a constrained scan or +// geospatial lookup join. geoRelationshipMap maps the function name to its +// corresponding relationship (Covers, CoveredBy, or Intersects). +// +// Note that for all of these functions, a geospatial lookup join or constrained +// index scan may produce false positives. 
Therefore, the original function must +// be called on the output of the index operation to filter the results. +// TODO(rytaft): add ST_DFullyWithin (geoindex.Covers) and ST_DWithin +// (geoindex.Intersects) once we add support for extending a geometry. +var geoRelationshipMap = map[string]geoindex.RelationshipType{ + "st_covers": geoindex.Covers, + "st_coveredby": geoindex.CoveredBy, + "st_contains": geoindex.Covers, + "st_containsproperly": geoindex.Covers, + "st_crosses": geoindex.Intersects, + "st_equals": geoindex.Intersects, + "st_intersects": geoindex.Intersects, + "st_overlaps": geoindex.Intersects, + "st_touches": geoindex.Intersects, + "st_within": geoindex.CoveredBy, +} + +// IsGeoIndexFunction returns true if the given function is a geospatial +// function that can be index-accelerated. +func IsGeoIndexFunction(fn opt.ScalarExpr) bool { + function := fn.(*memo.FunctionExpr) + _, ok := geoRelationshipMap[function.Name] + return ok +} + +// getSpanExprForGeoIndexFn is a function that returns a SpanExpression that +// constrains the given geo index according to the given constant and +// geospatial relationship. It is implemented by getSpanExprForGeographyIndex +// and getSpanExprForGeometryIndex and used in constrainGeoIndex. +type getSpanExprForGeoIndexFn func( + context.Context, tree.Datum, geoindex.RelationshipType, *geoindex.Config, +) *invertedexpr.SpanExpression + +// tryConstrainGeoIndex tries to derive an inverted index constraint for the +// given geospatial index from the specified filters. If a constraint is +// derived, it is returned with ok=true. If no constraint can be derived, +// then tryConstrainGeoIndex returns ok=false. +func tryConstrainGeoIndex( + ctx context.Context, filters memo.FiltersExpr, tabID opt.TableID, index cat.Index, +) (invertedConstraint *invertedexpr.SpanExpression, ok bool) { + config := index.GeoConfig() + var getSpanExpr getSpanExprForGeoIndexFn + if geoindex.IsGeographyConfig(config) { + getSpanExpr = getSpanExprForGeographyIndex + } else if geoindex.IsGeometryConfig(config) { + getSpanExpr = getSpanExprForGeometryIndex + } else { + return nil, false + } + + var invertedExpr invertedexpr.InvertedExpression + for i := range filters { + invertedExprLocal := constrainGeoIndex( + ctx, filters[i].Condition, tabID, index, getSpanExpr, + ) + if invertedExpr == nil { + invertedExpr = invertedExprLocal + } else { + invertedExpr = invertedexpr.And(invertedExpr, invertedExprLocal) + } + } + + if invertedExpr == nil { + return nil, false + } + + spanExpr, ok := invertedExpr.(*invertedexpr.SpanExpression) + if !ok { + return nil, false + } + + return spanExpr, true +} + +// getSpanExprForGeographyIndex gets a SpanExpression that constrains the given +// geography index according to the given constant and geospatial relationship. 
+func getSpanExprForGeographyIndex( + ctx context.Context, + d tree.Datum, + relationship geoindex.RelationshipType, + indexConfig *geoindex.Config, +) *invertedexpr.SpanExpression { + geogIdx := geoindex.NewS2GeographyIndex(*indexConfig.S2Geography) + geog := d.(*tree.DGeography).Geography + var spanExpr *invertedexpr.SpanExpression + + switch relationship { + case geoindex.Covers: + unionKeySpans, err := geogIdx.Covers(ctx, geog) + if err != nil { + panic(err) + } + spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) + + case geoindex.CoveredBy: + rpKeyExpr, err := geogIdx.CoveredBy(ctx, geog) + if err != nil { + panic(err) + } + if spanExpr, err = invertedexpr.GeoRPKeyExprToSpanExpr(rpKeyExpr); err != nil { + panic(err) + } + + case geoindex.Intersects: + unionKeySpans, err := geogIdx.Intersects(ctx, geog) + if err != nil { + panic(err) + } + spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) + + default: + panic(errors.AssertionFailedf("unhandled relationship: %v", relationship)) + } + + return spanExpr +} + +// getSpanExprForGeometryIndex gets a SpanExpression that constrains the given +// geometry index according to the given constant and geospatial relationship. +func getSpanExprForGeometryIndex( + ctx context.Context, + d tree.Datum, + relationship geoindex.RelationshipType, + indexConfig *geoindex.Config, +) *invertedexpr.SpanExpression { + geomIdx := geoindex.NewS2GeometryIndex(*indexConfig.S2Geometry) + geom := d.(*tree.DGeometry).Geometry + var spanExpr *invertedexpr.SpanExpression + + switch relationship { + case geoindex.Covers: + unionKeySpans, err := geomIdx.Covers(ctx, geom) + if err != nil { + panic(err) + } + spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) + + case geoindex.CoveredBy: + rpKeyExpr, err := geomIdx.CoveredBy(ctx, geom) + if err != nil { + panic(err) + } + if spanExpr, err = invertedexpr.GeoRPKeyExprToSpanExpr(rpKeyExpr); err != nil { + panic(err) + } + + case geoindex.Intersects: + unionKeySpans, err := geomIdx.Intersects(ctx, geom) + if err != nil { + panic(err) + } + spanExpr = invertedexpr.GeoUnionKeySpansToSpanExpr(unionKeySpans) + + default: + panic(errors.AssertionFailedf("unhandled relationship: %v", relationship)) + } + + return spanExpr +} + +// constrainGeoIndex returns an InvertedExpression representing a constraint +// of the given geospatial index. +func constrainGeoIndex( + ctx context.Context, + expr opt.ScalarExpr, + tabID opt.TableID, + index cat.Index, + getSpanExpr getSpanExprForGeoIndexFn, +) (_ invertedexpr.InvertedExpression) { + var fn *memo.FunctionExpr + switch t := expr.(type) { + case *memo.AndExpr: + return invertedexpr.And( + constrainGeoIndex(ctx, t.Left, tabID, index, getSpanExpr), + constrainGeoIndex(ctx, t.Right, tabID, index, getSpanExpr), + ) + + case *memo.OrExpr: + return invertedexpr.Or( + constrainGeoIndex(ctx, t.Left, tabID, index, getSpanExpr), + constrainGeoIndex(ctx, t.Right, tabID, index, getSpanExpr), + ) + + case *memo.FunctionExpr: + fn = t + + default: + return invertedexpr.NonInvertedColExpression{} + } + + if !IsGeoIndexFunction(fn) { + return invertedexpr.NonInvertedColExpression{} + } + + if fn.Args.ChildCount() < 2 { + panic(errors.AssertionFailedf( + "all index-accelerated geospatial functions should have at least two arguments", + )) + } + + // The first argument should be a constant. 
+ if !memo.CanExtractConstDatum(fn.Args.Child(0)) { + return invertedexpr.NonInvertedColExpression{} + } + d := memo.ExtractConstDatum(fn.Args.Child(0)) + + // The second argument should be a variable corresponding to the index + // column. + variable, ok := fn.Args.Child(1).(*memo.VariableExpr) + if !ok { + // TODO(rytaft): Commute the geospatial function in this case. + // Covers <-> CoveredBy + // Intersects <-> Intersects + return invertedexpr.NonInvertedColExpression{} + } + if variable.Col != tabID.ColumnID(index.Column(0).Ordinal) { + // The column in the function does not match the index column. + return invertedexpr.NonInvertedColExpression{} + } + + relationship := geoRelationshipMap[fn.Name] + return getSpanExpr(ctx, d, relationship, index.GeoConfig()) +} + +// geoDatumToInvertedExpr implements invertedexpr.DatumToInvertedExpr for +// geospatial columns. +type geoDatumToInvertedExpr struct { + relationship geoindex.RelationshipType + indexConfig *geoindex.Config + typ *types.T + getSpanExpr getSpanExprForGeoIndexFn + alloc sqlbase.DatumAlloc +} + +var _ invertedexpr.DatumToInvertedExpr = &geoDatumToInvertedExpr{} + +// NewGeoDatumToInvertedExpr returns a new geoDatumToInvertedExpr. +func NewGeoDatumToInvertedExpr( + expr tree.TypedExpr, config *geoindex.Config, +) (invertedexpr.DatumToInvertedExpr, error) { + if geoindex.IsEmptyConfig(config) { + return nil, fmt.Errorf("inverted joins are currently only supported for geospatial indexes") + } + + fn, ok := expr.(*tree.FuncExpr) + if !ok { + return nil, fmt.Errorf("inverted joins are currently only supported for single geospatial functions") + } + + name := fn.Func.FunctionReference.String() + relationship, ok := geoRelationshipMap[name] + if !ok { + return nil, fmt.Errorf("%s cannot be index-accelerated", name) + } + + g := &geoDatumToInvertedExpr{ + relationship: relationship, + indexConfig: config, + } + if geoindex.IsGeographyConfig(config) { + g.typ = types.Geography + g.getSpanExpr = getSpanExprForGeographyIndex + } else if geoindex.IsGeometryConfig(config) { + g.typ = types.Geometry + g.getSpanExpr = getSpanExprForGeometryIndex + } else { + panic(errors.AssertionFailedf("not a geography or geometry index")) + } + + return g, nil +} + +// Convert implements the invertedexpr.DatumToInvertedExpr interface. 
+func (g *geoDatumToInvertedExpr) Convert( + ctx context.Context, d sqlbase.EncDatum, +) (*invertedexpr.SpanExpressionProto, error) { + if err := d.EnsureDecoded(g.typ, &g.alloc); err != nil { + return nil, err + } + spanExpr := g.getSpanExpr(ctx, d.Datum, g.relationship, g.indexConfig) + return spanExpr.ToProto(), nil +} + +func (g *geoDatumToInvertedExpr) String() string { + return fmt.Sprintf("geo-relationship: %v", g.relationship) +} diff --git a/pkg/sql/opt/xform/testdata/rules/join b/pkg/sql/opt/xform/testdata/rules/join index dc2db718c829..c99c215ee4b6 100644 --- a/pkg/sql/opt/xform/testdata/rules/join +++ b/pkg/sql/opt/xform/testdata/rules/join @@ -1668,9 +1668,10 @@ project │ │ ├── lookup columns are key │ │ ├── immutable │ │ ├── fd: (9)==(12), (12)==(9) - │ │ ├── inner-join (geo-lookup nyc_census_blocks@nyc_census_blocks_geo_idx) + │ │ ├── inner-join (inverted-lookup nyc_census_blocks@nyc_census_blocks_geo_idx) │ │ │ ├── columns: c.gid:1!null n.boroname:12 name:13!null n.geom:14 - │ │ │ ├── geo-relationship: intersects + │ │ │ ├── inverted-expr + │ │ │ │ └── st_intersects(n.geom:14, c.geom:10) │ │ │ ├── select │ │ │ │ ├── columns: n.boroname:12 name:13!null n.geom:14 │ │ │ │ ├── scan n @@ -1723,9 +1724,9 @@ memo (optimized, ~23KB, required=[presentation: name:13,popn_per_sqkm:16]) │ ├── best: (select G14 G15) │ └── cost: 139.35 ├── G9: (filters G16 G17) - ├── G10: (geo-lookup-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx) + ├── G10: (inverted-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx) │ └── [] - │ ├── best: (geo-lookup-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx) + │ ├── best: (inverted-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx) │ └── cost: 1754.40 ├── G11: (sum G19) ├── G12: (variable sum) diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 82797b947367..30e3c3a6f6cf 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -19,7 +19,6 @@ import ( "net/url" "strings" - "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -355,17 +354,13 @@ func (ef *execFactory) ConstructHashJoin( pred := makePredicate(joinType, leftSrc.columns, rightSrc.columns) numEqCols := len(leftEqCols) - // Save some allocations by putting both sides in the same slice. 
- intBuf := make([]int, 2*numEqCols) - pred.leftEqualityIndices = intBuf[:numEqCols:numEqCols] - pred.rightEqualityIndices = intBuf[numEqCols:] + pred.leftEqualityIndices = leftEqCols + pred.rightEqualityIndices = rightEqCols nameBuf := make(tree.NameList, 2*numEqCols) pred.leftColNames = nameBuf[:numEqCols:numEqCols] pred.rightColNames = nameBuf[numEqCols:] for i := range leftEqCols { - pred.leftEqualityIndices[i] = int(leftEqCols[i]) - pred.rightEqualityIndices[i] = int(rightEqCols[i]) pred.leftColNames[i] = tree.Name(leftSrc.columns[leftEqCols[i]].Name) pred.rightColNames[i] = tree.Name(rightSrc.columns[rightEqCols[i]].Name) } @@ -400,40 +395,29 @@ func (ef *execFactory) ConstructMergeJoin( reqOrdering exec.OutputOrdering, leftEqColsAreKey, rightEqColsAreKey bool, ) (exec.Node, error) { + var err error p := ef.planner leftSrc := asDataSource(left) rightSrc := asDataSource(right) pred := makePredicate(joinType, leftSrc.columns, rightSrc.columns) pred.onCond = pred.iVarHelper.Rebind(onCond) + node := p.makeJoinNode(leftSrc, rightSrc, pred) pred.leftEqKey = leftEqColsAreKey pred.rightEqKey = rightEqColsAreKey - n := len(leftOrdering) - if n == 0 || len(rightOrdering) != n { - return nil, errors.Errorf("orderings from the left and right side must be the same non-zero length") + pred.leftEqualityIndices, pred.rightEqualityIndices, node.mergeJoinOrdering, err = getEqualityIndicesAndMergeJoinOrdering(leftOrdering, rightOrdering) + if err != nil { + return nil, err } - pred.leftEqualityIndices = make([]int, n) - pred.rightEqualityIndices = make([]int, n) + n := len(leftOrdering) pred.leftColNames = make(tree.NameList, n) pred.rightColNames = make(tree.NameList, n) for i := 0; i < n; i++ { leftColIdx, rightColIdx := leftOrdering[i].ColIdx, rightOrdering[i].ColIdx - pred.leftEqualityIndices[i] = leftColIdx - pred.rightEqualityIndices[i] = rightColIdx pred.leftColNames[i] = tree.Name(leftSrc.columns[leftColIdx].Name) pred.rightColNames[i] = tree.Name(rightSrc.columns[rightColIdx].Name) } - node := p.makeJoinNode(leftSrc, rightSrc, pred) - node.mergeJoinOrdering = make(sqlbase.ColumnOrdering, n) - for i := 0; i < n; i++ { - // The mergeJoinOrdering "columns" are equality column indices. Because of - // the way we constructed the equality indices, the ordering will always be - // 0,1,2,3.. - node.mergeJoinOrdering[i].ColIdx = i - node.mergeJoinOrdering[i].Direction = leftOrdering[i].Direction - } - // Set up node.props, which tells the distsql planner to maintain the // resulting ordering (if needed). node.reqOrdering = ReqOrdering(reqOrdering) @@ -744,13 +728,13 @@ func (ef *execFactory) constructVirtualTableLookupJoin( return n, nil } -func (ef *execFactory) ConstructGeoLookupJoin( +func (ef *execFactory) ConstructInvertedJoin( joinType sqlbase.JoinType, - geoRelationshipType geoindex.RelationshipType, + invertedExpr tree.TypedExpr, input exec.Node, table cat.Table, index cat.Index, - geoCol exec.NodeColumnOrdinal, + inputCol exec.NodeColumnOrdinal, lookupCols exec.TableColumnOrdinalSet, onCond tree.TypedExpr, reqOrdering exec.OutputOrdering, diff --git a/pkg/sql/physicalplan/physical_plan.go b/pkg/sql/physicalplan/physical_plan.go index 2a43c43d6027..a06dfed1983f 100644 --- a/pkg/sql/physicalplan/physical_plan.go +++ b/pkg/sql/physicalplan/physical_plan.go @@ -959,7 +959,9 @@ func (p *PhysicalPlan) GenerateFlowSpecs() map[roachpb.NodeID]*execinfrapb.FlowS // The result routers for each side are returned (they point at processors in // the merged plan). 
func MergePlans(
-	mergedPlan *PhysicalPlan, left, right *PhysicalPlan,
+	mergedPlan *PhysicalPlan,
+	left, right *PhysicalPlan,
+	leftPlanDistribution, rightPlanDistribution PlanDistribution,
 ) (leftRouters []ProcessorIdx, rightRouters []ProcessorIdx) {
 	mergedPlan.Processors = append(left.Processors, right.Processors...)
 	rightProcStart := ProcessorIdx(len(left.Processors))
@@ -1003,6 +1005,7 @@ func MergePlans(
 		mergedPlan.MaxEstimatedRowCount = left.MaxEstimatedRowCount
 	}

+	mergedPlan.Distribution = leftPlanDistribution.compose(rightPlanDistribution)
 	return leftRouters, rightRouters
 }

@@ -1277,10 +1280,20 @@ func (p *PhysicalPlan) EnsureSingleStreamPerNode() {
 	}
 }

+// GetLastStageDistribution returns the distribution *only* of the last stage.
+// Note that if the last stage consists of a single processor planned on a
+// remote node, such a stage is considered distributed.
+func (p *PhysicalPlan) GetLastStageDistribution() PlanDistribution {
+	if len(p.ResultRouters) == 1 && p.Processors[p.ResultRouters[0]].Node == p.GatewayNodeID {
+		return LocalPlan
+	}
+	return FullyDistributedPlan
+}
+
 // IsLastStageDistributed returns whether the last stage of processors is
 // distributed (meaning that it contains at least one remote processor).
 func (p *PhysicalPlan) IsLastStageDistributed() bool {
-	return len(p.ResultRouters) > 1 || p.Processors[p.ResultRouters[0]].Node != p.GatewayNodeID
+	return p.GetLastStageDistribution() != LocalPlan
 }

 // PlanDistribution describes the distribution of the physical plan.
diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go
index a925a88852b4..7ba9b39a78bf 100644
--- a/pkg/sql/rowexec/inverted_joiner.go
+++ b/pkg/sql/rowexec/inverted_joiner.go
@@ -19,6 +19,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/opt/invertedexpr"
+	"github.com/cockroachdb/cockroach/pkg/sql/opt/xform"
 	"github.com/cockroachdb/cockroach/pkg/sql/row"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowcontainer"
 	"github.com/cockroachdb/cockroach/pkg/sql/scrub"
@@ -54,14 +55,6 @@ const (
 	ijEmittingRows
 )

-// DatumToInvertedExpr is constructed by the caller using
-// InvertedJoinerSpec.InvertedExpr -- the invertedJoiner computes the returned
-// expression.
-type DatumToInvertedExpr interface {
-	// Convert uses the lookup column to construct an inverted expression.
-	Convert(sqlbase.EncDatum) (*invertedexpr.SpanExpressionProto, error)
-}
-
 type invertedJoiner struct {
 	execinfra.ProcessorBase

@@ -110,7 +103,7 @@ type invertedJoiner struct {
 	input execinfra.RowSource
 	inputTypes []*types.T
 	lookupColumnIdx uint32
-	datumToInvertedExpr DatumToInvertedExpr
+	datumToInvertedExpr invertedexpr.DatumToInvertedExpr

 	// Batch size for fetches. Not a constant so we can lower for testing.
batchSize int @@ -158,7 +151,7 @@ func newInvertedJoiner( flowCtx *execinfra.FlowCtx, processorID int32, spec *execinfrapb.InvertedJoinerSpec, - datumToInvertedExpr DatumToInvertedExpr, + datumToInvertedExpr invertedexpr.DatumToInvertedExpr, input execinfra.RowSource, post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver, @@ -231,6 +224,17 @@ func newInvertedJoiner( } ij.combinedRow = make(sqlbase.EncDatumRow, 0, len(onExprColTypes)) + if ij.datumToInvertedExpr == nil { + var invertedExprHelper execinfra.ExprHelper + if err := invertedExprHelper.Init(spec.InvertedExpr, onExprColTypes, ij.EvalCtx); err != nil { + return nil, err + } + ij.datumToInvertedExpr, err = xform.NewDatumToInvertedExpr(invertedExprHelper.Expr, ij.index) + if err != nil { + return nil, err + } + } + var fetcher row.Fetcher // In general we need all the columns in the index to compute the set // expression. There may be InvertedJoinerSpec.InvertedExpr that are known @@ -387,7 +391,7 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce // result in an empty set as the evaluation result. ij.batchedExprEval.exprs = append(ij.batchedExprEval.exprs, nil) } else { - expr, err := ij.datumToInvertedExpr.Convert(row[ij.lookupColumnIdx]) + expr, err := ij.datumToInvertedExpr.Convert(ij.Ctx, row[ij.lookupColumnIdx]) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() diff --git a/pkg/sql/rowexec/inverted_joiner_test.go b/pkg/sql/rowexec/inverted_joiner_test.go index 320f81b536f2..6be51ef731de 100644 --- a/pkg/sql/rowexec/inverted_joiner_test.go +++ b/pkg/sql/rowexec/inverted_joiner_test.go @@ -59,10 +59,10 @@ const numRows = 99 // 50, since 50%10 = 0, 50/10 = 5. type arrayIntersectionExpr struct{} -var _ DatumToInvertedExpr = &arrayIntersectionExpr{} +var _ invertedexpr.DatumToInvertedExpr = &arrayIntersectionExpr{} func (arrayIntersectionExpr) Convert( - datum sqlbase.EncDatum, + ctx context.Context, datum sqlbase.EncDatum, ) (*invertedexpr.SpanExpressionProto, error) { d := int64(*(datum.Datum.(*tree.DInt))) d1Span := invertedexpr.MakeSingleInvertedValSpan(intToEncodedInvertedVal(d / 10)) @@ -79,10 +79,10 @@ func (arrayIntersectionExpr) Convert( // match a right side row with row index d. type jsonIntersectionExpr struct{} -var _ DatumToInvertedExpr = &jsonIntersectionExpr{} +var _ invertedexpr.DatumToInvertedExpr = &jsonIntersectionExpr{} func (jsonIntersectionExpr) Convert( - datum sqlbase.EncDatum, + ctx context.Context, datum sqlbase.EncDatum, ) (*invertedexpr.SpanExpressionProto, error) { d := int64(*(datum.Datum.(*tree.DInt))) d1 := d / 10 @@ -112,9 +112,11 @@ func (jsonIntersectionExpr) Convert( // {1..9, 15, 25, 35, ..., 95}. type jsonUnionExpr struct{} -var _ DatumToInvertedExpr = &jsonUnionExpr{} +var _ invertedexpr.DatumToInvertedExpr = &jsonUnionExpr{} -func (jsonUnionExpr) Convert(datum sqlbase.EncDatum) (*invertedexpr.SpanExpressionProto, error) { +func (jsonUnionExpr) Convert( + ctx context.Context, datum sqlbase.EncDatum, +) (*invertedexpr.SpanExpressionProto, error) { d := int64(*(datum.Datum.(*tree.DInt))) d1 := d / 10 d2 := d % 10 @@ -173,7 +175,7 @@ func TestInvertedJoiner(t *testing.T) { onExpr string input [][]tree.Datum lookupCol uint32 - datumToExpr DatumToInvertedExpr + datumToExpr invertedexpr.DatumToInvertedExpr joinType sqlbase.JoinType inputTypes []*types.T outputTypes []*types.T