Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
27409: opt: lookup joins execution and costing r=RaduBerinde a=RaduBerinde

This change implements the execution path for lookup joins and enables
them by implementing a cost function.

The execution path is distsql-only. For local execution, I temporarily
added a kludge to reuse the hash join code (with a full table scan on
the lookup side).

Release note: None

Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
craig[bot] and RaduBerinde committed Jul 18, 2018
2 parents eed443a + 14748ec commit aeb6907
Show file tree
Hide file tree
Showing 18 changed files with 1,299 additions and 574 deletions.
94 changes: 94 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,15 @@ func (dsp *DistSQLPlanner) checkSupportForNode(node planNode) (distRecommendatio
}
return dsp.checkSupportForNode(n.index)

case *lookupJoinNode:
if err := dsp.checkExpr(n.onCond); err != nil {
return 0, err
}
if _, err := dsp.checkSupportForNode(n.input); err != nil {
return 0, err
}
return shouldDistribute, nil

case *groupNode:
rec, err := dsp.checkSupportForNode(n.plan)
if err != nil {
Expand Down Expand Up @@ -1721,6 +1730,88 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin(
return plan, nil
}

// createPlanForLookupJoin creates a distributed plan for a lookupJoinNode.
// Note that this is a separate code path from the experimental path which
// converts joins to lookup joins.
func (dsp *DistSQLPlanner) createPlanForLookupJoin(
planCtx *planningCtx, n *lookupJoinNode,
) (physicalPlan, error) {
plan, err := dsp.createPlanForNode(planCtx, n.input)
if err != nil {
return physicalPlan{}, err
}

joinReaderSpec := distsqlrun.JoinReaderSpec{
Table: *n.table.desc,
Type: n.joinType,
}
joinReaderSpec.IndexIdx, err = getIndexIdx(n.table)
if err != nil {
return physicalPlan{}, err
}
joinReaderSpec.LookupColumns = make([]uint32, len(n.keyCols))
for i, col := range n.keyCols {
if plan.planToStreamColMap[col] == -1 {
panic("lookup column not in planToStreamColMap")
}
joinReaderSpec.LookupColumns[i] = uint32(plan.planToStreamColMap[col])
}

// The n.table node can be configured with an arbitrary set of columns. Apply
// the corresponding projection.
// The internal schema of the join reader is:
// <input columns>... <table columns>...
numLeftCols := len(plan.ResultTypes)
numOutCols := numLeftCols + len(n.table.cols)
post := distsqlrun.PostProcessSpec{Projection: true}

post.OutputColumns = make([]uint32, numOutCols)
types := make([]sqlbase.ColumnType, numOutCols)

for i := 0; i < numLeftCols; i++ {
types[i] = plan.ResultTypes[i]
post.OutputColumns[i] = uint32(i)
}
for i := range n.table.cols {
types[numLeftCols+i] = n.table.cols[i].Type
ord := tableOrdinal(n.table.desc, n.table.cols[i].ID)
post.OutputColumns[numLeftCols+i] = uint32(numLeftCols + ord)
}

// Map the columns of the lookupJoinNode to the result streams of the
// JoinReader.
planToStreamColMap := makePlanToStreamColMap(len(n.columns))
copy(planToStreamColMap, plan.planToStreamColMap)
numInputNodeCols := len(planColumns(n.input))
for i := range n.table.cols {
planToStreamColMap[numInputNodeCols+i] = numLeftCols + i
}

// Set the ON condition.
if n.onCond != nil {
// Note that the ON condition refers to the *internal* columns of the
// processor (before the OutputColumns projection).
indexVarMap := makePlanToStreamColMap(len(n.columns))
copy(indexVarMap, plan.planToStreamColMap)
for i := range n.table.cols {
indexVarMap[numInputNodeCols+i] = int(post.OutputColumns[numLeftCols+i])
}
joinReaderSpec.OnExpr = distsqlplan.MakeExpression(
n.onCond, planCtx.EvalContext(), indexVarMap,
)
}

// Instantiate one join reader for every stream.
plan.AddNoGroupingStage(
distsqlrun.ProcessorCoreUnion{JoinReader: &joinReaderSpec},
post,
types,
dsp.convertOrdering(planPhysicalProps(n), plan.planToStreamColMap),
)
plan.planToStreamColMap = planToStreamColMap
return plan, nil
}

// getTypesForPlanResult returns the types of the elements in the result streams
// of a plan that corresponds to a given planNode. If planToStreamColMap is nil,
// a 1-1 mapping is assumed.
Expand Down Expand Up @@ -2066,6 +2157,9 @@ func (dsp *DistSQLPlanner) createPlanForNode(
case *indexJoinNode:
plan, err = dsp.createPlanForIndexJoin(planCtx, n)

case *lookupJoinNode:
plan, err = dsp.createPlanForLookupJoin(planCtx, n)

case *joinNode:
plan, err = dsp.createPlanForJoin(planCtx, n)

Expand Down
34 changes: 34 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/distsql_lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,40 @@ statement ok
CREATE TABLE distsql_lookup_test_3 (g INT, h INT, INDEX g_idx (g));
INSERT INTO distsql_lookup_test_3 VALUES (NULL, 1)

# Set up the statistics as if the first table is much smaller than the others.
# This will make lookup join into the second table be the best plan.
# TODO(radu): we have to use very small row counts because of the poor row
# count estimation for joins (left-rows * right-rows / 10).
statement ok
ALTER TABLE distsql_lookup_test_1 INJECT STATISTICS '[
{
"columns": ["a"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 1,
"distinct_count": 1
}
]'

statement ok
ALTER TABLE distsql_lookup_test_2 INJECT STATISTICS '[
{
"columns": ["f"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 10,
"distinct_count": 10
}
]'

statement ok
ALTER TABLE distsql_lookup_test_3 INJECT STATISTICS '[
{
"columns": ["g"],
"created_at": "2018-01-01 1:00:00.00000+00:00",
"row_count": 10,
"distinct_count": 10
}
]'

query IIIIII rowsort
SELECT * FROM distsql_lookup_test_1 JOIN distsql_lookup_test_2 ON f = b
----
Expand Down
Loading

0 comments on commit aeb6907

Please sign in to comment.