diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 8c6a9db7bd2a..b086b569f24c 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -2241,9 +2241,10 @@ func (dsp *DistSQLPlanner) createPlanForJoin( } } + leftMap := leftPlan.PlanToStreamColMap rightMap := rightPlan.PlanToStreamColMap - post, joinToStreamColMap := joinOutColumns(n, leftPlan.PlanToStreamColMap, rightMap) - onExpr, err := remapOnExpr(planCtx, n, leftPlan.PlanToStreamColMap, rightMap) + post, joinToStreamColMap := joinOutColumns(n, leftMap, rightMap, len(leftPlan.ResultTypes)) + onExpr, err := remapOnExpr(planCtx, n, leftMap, rightMap) if err != nil { return PhysicalPlan{}, err } diff --git a/pkg/sql/distsql_plan_join.go b/pkg/sql/distsql_plan_join.go index 7ec5a429ace3..ec728bfdfc2d 100644 --- a/pkg/sql/distsql_plan_join.go +++ b/pkg/sql/distsql_plan_join.go @@ -104,7 +104,7 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin( joinType := n.joinType - post, joinToStreamColMap := joinOutColumns(n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap) + post, joinToStreamColMap := joinOutColumns(n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap, len(plans[0].ResultTypes)) onExpr, err := remapOnExpr(planCtx, n, plans[0].PlanToStreamColMap, plans[1].PlanToStreamColMap) if err != nil { return PhysicalPlan{}, false, err @@ -237,7 +237,7 @@ func (dsp *DistSQLPlanner) tryCreatePlanForInterleavedJoin( } func joinOutColumns( - n *joinNode, leftPlanToStreamColMap, rightPlanToStreamColMap []int, + n *joinNode, leftPlanToStreamColMap, rightPlanToStreamColMap []int, numAllLeftCols int, ) (post execinfrapb.PostProcessSpec, joinToStreamColMap []int) { joinToStreamColMap = makePlanToStreamColMap(len(n.columns)) post.Projection = true @@ -260,7 +260,7 @@ func joinOutColumns( if n.pred.joinType != sqlbase.LeftSemiJoin && n.pred.joinType != sqlbase.LeftAntiJoin { for i := 0; i < n.pred.numRightCols; i++ { joinToStreamColMap[n.pred.numLeftCols+i] = addOutCol( - uint32(n.pred.numLeftCols + rightPlanToStreamColMap[i]), + uint32(numAllLeftCols + rightPlanToStreamColMap[i]), ) } } diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_join b/pkg/sql/opt/exec/execbuilder/testdata/distsql_join index b9e511169002..dc00c2aa6b4b 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_join +++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_join @@ -197,3 +197,28 @@ https://cockroachdb.github.io/distsqlplan/decode.html#eJy8ll1v-jYUh-_3KaxzBatRsB # SELECT url FROM [EXPLAIN (DISTSQL) (SELECT l.k, r.k FROM (SELECT * FROM distsql_mj_test ORDER BY k) l INNER JOIN (SELECT * FROM distsql_mj_test ORDER BY k) r ON l.k = r.k)] # ---- # https://cockroachdb.github.io/distsqlplan/decode.html#eJyskk9Lw0AQxe9-CpmT0pVk06SHgJBrBVup3qSUmB3jSpqNsxtQSr-7JHtIE8z2D952Z-f35r1ldlAqgYt0ixriV-DAIII1g4pUhlorasq2aS6-IfYZyLKqTVNeM8gUIcQ7MNIUCDEs1J2qvAgYCDSpLNq2PQNVmw7SJs0R4tmeHQhzt_BL-lbgClOB5Pk9eahIblP6SYTURn8Vm-3nxqA2wGBZm_g64TDmgJ_j4FmRQfJ4f3jCJ6PywaUBg_8KOL0g4PT0gOGofKdal4oEEorhThxv-cPjI1KOD0qWSF7YN1rgu7lJ-OT2nmT-YY_dF7EkGI0R9WIc2fAV6kqVGk9acb_JgCJH-yda1ZThE6msHWOvy5ZrCwK1sa8ze5mX9qkxeAhzJxy44cAJRz2YD-GpEw7dk8MzJgdDOHLC_mDyen_1GwAA___yFZqv + +# Regression test for incorrectly populating PlanToStreamColMap for the right +# side of merge joins (#51883). +statement ok +CREATE TABLE customer (c_custkey INT8 PRIMARY KEY); +CREATE TABLE orders ( + o_orderkey INT8 PRIMARY KEY, o_custkey INT8 NOT NULL, + CONSTRAINT orders_fkey_customer FOREIGN KEY (o_custkey) REFERENCES customer (c_custkey) +); +CREATE TABLE lineitem ( + l_orderkey INT8 NOT NULL, + l_linenumber INT8 NOT NULL, + l_quantity FLOAT8 NOT NULL, + PRIMARY KEY (l_orderkey, l_linenumber), + CONSTRAINT lineitem_fkey_orders FOREIGN KEY (l_orderkey) REFERENCES orders (o_orderkey) +); +ALTER TABLE lineitem SPLIT AT SELECT i FROM generate_series(1, 9) AS g(i); +ALTER TABLE lineitem EXPERIMENTAL_RELOCATE SELECT ARRAY[i%5+1], i FROM generate_series(0, 9) AS g(i) + +# This query checks that there is no type mismatch during the vectorized flow +# setup which would occur if the stream column indices got messed up. +statement ok +SET vectorize=on; +EXPLAIN (VEC) SELECT sum(l_quantity) FROM customer, orders INNER MERGE JOIN lineitem ON o_orderkey = l_orderkey WHERE o_orderkey IN ( SELECT l_orderkey FROM lineitem GROUP BY l_orderkey HAVING sum(l_quantity) > 300) AND c_custkey = o_custkey; +RESET vectorize