Skip to content

Commit

Permalink
Merge pull request #52046 from yuzefovich/fix-joins-20.1
Browse files Browse the repository at this point in the history
release-20.1: sql: fix planning of merge joins
  • Loading branch information
yuzefovich authored Jul 29, 2020
2 parents e72a892 + f517c55 commit 97b6fc0
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
5 changes: 3 additions & 2 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2241,9 +2241,10 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
}
}

leftMap := leftPlan.PlanToStreamColMap
rightMap := rightPlan.PlanToStreamColMap
post, joinToStreamColMap := joinOutColumns(n, leftPlan.PlanToStreamColMap, rightMap)
onExpr, err := remapOnExpr(planCtx, n, leftPlan.PlanToStreamColMap, rightMap)
post, joinToStreamColMap := joinOutColumns(n, leftMap, rightMap, len(leftPlan.ResultTypes))
onExpr, err := remapOnExpr(planCtx, n, leftMap, rightMap)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsql_plan_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin(

joinType := n.joinType

post, joinToStreamColMap := joinOutColumns(n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap)
post, joinToStreamColMap := joinOutColumns(n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap, len(plans[0].ResultTypes))
onExpr, err := remapOnExpr(planCtx, n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap)
if err != nil {
return PhysicalPlan{}, false, err
Expand Down Expand Up @@ -237,7 +237,7 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin(
}

func joinOutColumns(
n *joinNode, leftPlanToStreamColMap, rightPlanToStreamColMap []int,
n *joinNode, leftPlanToStreamColMap, rightPlanToStreamColMap []int, numAllLeftCols int,
) (post execinfrapb.PostProcessSpec, joinToStreamColMap []int) {
joinToStreamColMap = makePlanToStreamColMap(len(n.columns))
post.Projection = true
Expand All @@ -260,7 +260,7 @@ func joinOutColumns(
if n.pred.joinType != sqlbase.LeftSemiJoin && n.pred.joinType != sqlbase.LeftAntiJoin {
for i := 0; i < n.pred.numRightCols; i++ {
joinToStreamColMap[n.pred.numLeftCols+i] = addOutCol(
uint32(n.pred.numLeftCols + rightPlanToStreamColMap[i]),
uint32(numAllLeftCols + rightPlanToStreamColMap[i]),
)
}
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/distsql_join
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,28 @@ https://cockroachdb.github.io/distsqlplan/decode.html#eJy8ll1v-jYUh-_3KaxzBatRsB
# SELECT url FROM [EXPLAIN (DISTSQL) (SELECT l.k, r.k FROM (SELECT * FROM distsql_mj_test ORDER BY k) l INNER JOIN (SELECT * FROM distsql_mj_test ORDER BY k) r ON l.k = r.k)]
# ----
# https://cockroachdb.github.io/distsqlplan/decode.html#eJyskk9Lw0AQxe9-CpmT0pVk06SHgJBrBVup3qSUmB3jSpqNsxtQSr-7JHtIE8z2D952Z-f35r1ldlAqgYt0ixriV-DAIII1g4pUhlorasq2aS6-IfYZyLKqTVNeM8gUIcQ7MNIUCDEs1J2qvAgYCDSpLNq2PQNVmw7SJs0R4tmeHQhzt_BL-lbgClOB5Pk9eahIblP6SYTURn8Vm-3nxqA2wGBZm_g64TDmgJ_j4FmRQfJ4f3jCJ6PywaUBg_8KOL0g4PT0gOGofKdal4oEEorhThxv-cPjI1KOD0qWSF7YN1rgu7lJ-OT2nmT-YY_dF7EkGI0R9WIc2fAV6kqVGk9acb_JgCJH-yda1ZThE6msHWOvy5ZrCwK1sa8ze5mX9qkxeAhzJxy44cAJRz2YD-GpEw7dk8MzJgdDOHLC_mDyen_1GwAA___yFZqv

# Regression test for incorrectly populating PlanToStreamColMap for the right
# side of merge joins (#51883).
statement ok
CREATE TABLE customer (c_custkey INT8 PRIMARY KEY);
CREATE TABLE orders (
o_orderkey INT8 PRIMARY KEY, o_custkey INT8 NOT NULL,
CONSTRAINT orders_fkey_customer FOREIGN KEY (o_custkey) REFERENCES customer (c_custkey)
);
CREATE TABLE lineitem (
l_orderkey INT8 NOT NULL,
l_linenumber INT8 NOT NULL,
l_quantity FLOAT8 NOT NULL,
PRIMARY KEY (l_orderkey, l_linenumber),
CONSTRAINT lineitem_fkey_orders FOREIGN KEY (l_orderkey) REFERENCES orders (o_orderkey)
);
ALTER TABLE lineitem SPLIT AT SELECT i FROM generate_series(1, 9) AS g(i);
ALTER TABLE lineitem EXPERIMENTAL_RELOCATE SELECT ARRAY[i%5+1], i FROM generate_series(0, 9) AS g(i)

# This query checks that there is no type mismatch during the vectorized flow
# setup which would occur if the stream column indices got messed up.
statement ok
SET vectorize=on;
EXPLAIN (VEC) SELECT sum(l_quantity) FROM customer, orders INNER MERGE JOIN lineitem ON o_orderkey = l_orderkey WHERE o_orderkey IN ( SELECT l_orderkey FROM lineitem GROUP BY l_orderkey HAVING sum(l_quantity) > 300) AND c_custkey = o_custkey;
RESET vectorize

0 comments on commit 97b6fc0

Please sign in to comment.