Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-20.1: sql: fix planning of merge joins #52046

Merged
merged 1 commit into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2241,9 +2241,10 @@ func (dsp *DistSQLPlanner) createPlanForJoin(
}
}

leftMap := leftPlan.PlanToStreamColMap
rightMap := rightPlan.PlanToStreamColMap
post, joinToStreamColMap := joinOutColumns(n, leftPlan.PlanToStreamColMap, rightMap)
onExpr, err := remapOnExpr(planCtx, n, leftPlan.PlanToStreamColMap, rightMap)
post, joinToStreamColMap := joinOutColumns(n, leftMap, rightMap, len(leftPlan.ResultTypes))
onExpr, err := remapOnExpr(planCtx, n, leftMap, rightMap)
if err != nil {
return PhysicalPlan{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsql_plan_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin(

joinType := n.joinType

post, joinToStreamColMap := joinOutColumns(n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap)
post, joinToStreamColMap := joinOutColumns(n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap, len(plans[0].ResultTypes))
onExpr, err := remapOnExpr(planCtx, n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap)
if err != nil {
return PhysicalPlan{}, false, err
Expand Down Expand Up @@ -237,7 +237,7 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin(
}

func joinOutColumns(
n *joinNode, leftPlanToStreamColMap, rightPlanToStreamColMap []int,
n *joinNode, leftPlanToStreamColMap, rightPlanToStreamColMap []int, numAllLeftCols int,
) (post execinfrapb.PostProcessSpec, joinToStreamColMap []int) {
joinToStreamColMap = makePlanToStreamColMap(len(n.columns))
post.Projection = true
Expand All @@ -260,7 +260,7 @@ func joinOutColumns(
if n.pred.joinType != sqlbase.LeftSemiJoin && n.pred.joinType != sqlbase.LeftAntiJoin {
for i := 0; i < n.pred.numRightCols; i++ {
joinToStreamColMap[n.pred.numLeftCols+i] = addOutCol(
uint32(n.pred.numLeftCols + rightPlanToStreamColMap[i]),
uint32(numAllLeftCols + rightPlanToStreamColMap[i]),
)
}
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/distsql_join
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,28 @@ https://cockroachdb.github.io/distsqlplan/decode.html#eJy8ll1v-jYUh-_3KaxzBatRsB
# SELECT url FROM [EXPLAIN (DISTSQL) (SELECT l.k, r.k FROM (SELECT * FROM distsql_mj_test ORDER BY k) l INNER JOIN (SELECT * FROM distsql_mj_test ORDER BY k) r ON l.k = r.k)]
# ----
# https://cockroachdb.github.io/distsqlplan/decode.html#eJyskk9Lw0AQxe9-CpmT0pVk06SHgJBrBVup3qSUmB3jSpqNsxtQSr-7JHtIE8z2D952Z-f35r1ldlAqgYt0ixriV-DAIII1g4pUhlorasq2aS6-IfYZyLKqTVNeM8gUIcQ7MNIUCDEs1J2qvAgYCDSpLNq2PQNVmw7SJs0R4tmeHQhzt_BL-lbgClOB5Pk9eahIblP6SYTURn8Vm-3nxqA2wGBZm_g64TDmgJ_j4FmRQfJ4f3jCJ6PywaUBg_8KOL0g4PT0gOGofKdal4oEEorhThxv-cPjI1KOD0qWSF7YN1rgu7lJ-OT2nmT-YY_dF7EkGI0R9WIc2fAV6kqVGk9acb_JgCJH-yda1ZThE6msHWOvy5ZrCwK1sa8ze5mX9qkxeAhzJxy44cAJRz2YD-GpEw7dk8MzJgdDOHLC_mDyen_1GwAA___yFZqv

# Regression test for incorrectly populating PlanToStreamColMap for the right
# side of merge joins (#51883).
statement ok
CREATE TABLE customer (c_custkey INT8 PRIMARY KEY);
CREATE TABLE orders (
o_orderkey INT8 PRIMARY KEY, o_custkey INT8 NOT NULL,
CONSTRAINT orders_fkey_customer FOREIGN KEY (o_custkey) REFERENCES customer (c_custkey)
);
CREATE TABLE lineitem (
l_orderkey INT8 NOT NULL,
l_linenumber INT8 NOT NULL,
l_quantity FLOAT8 NOT NULL,
PRIMARY KEY (l_orderkey, l_linenumber),
CONSTRAINT lineitem_fkey_orders FOREIGN KEY (l_orderkey) REFERENCES orders (o_orderkey)
);
ALTER TABLE lineitem SPLIT AT SELECT i FROM generate_series(1, 9) AS g(i);
ALTER TABLE lineitem EXPERIMENTAL_RELOCATE SELECT ARRAY[i%5+1], i FROM generate_series(0, 9) AS g(i)

# This query checks that there is no type mismatch during the vectorized flow
# setup which would occur if the stream column indices got messed up.
statement ok
SET vectorize=on;
EXPLAIN (VEC) SELECT sum(l_quantity) FROM customer, orders INNER MERGE JOIN lineitem ON o_orderkey = l_orderkey WHERE o_orderkey IN ( SELECT l_orderkey FROM lineitem GROUP BY l_orderkey HAVING sum(l_quantity) > 300) AND c_custkey = o_custkey;
RESET vectorize