Fix bug when pushing projection under joins (apache#11333)
* Fix bug in `ProjectionPushdown`

* add order by

* Fix join on
jonahgao authored and xinlifoobar committed Jul 18, 2024
1 parent 3562712 commit 8ba2fbb
Showing 2 changed files with 89 additions and 19 deletions.
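
The core of this change is an index-offset rule: projection columns are expressed against the join's output schema (left child fields first, then right child fields), while join-on and join-filter columns are expressed against an individual child schema, so right-side columns must be shifted by the left child's field count before they can be matched against the projection. The sketch below is a minimal, self-contained illustration of that rule using simplified stand-in types (`Column` and `find_in_projection` are illustrative only, not DataFusion's API), and of why a name-only match is ambiguous once both join children expose a column with the same name.

```rust
/// Simplified stand-in for a resolved column: a name plus an index into a schema.
#[derive(Debug, Clone)]
struct Column {
    name: String,
    index: usize,
}

/// Locate a join-child column inside the projection. The fix requires both the
/// name and the offset-adjusted index to agree; `column_index_offset` is 0 for
/// columns from the left child and `left_field_size` for the right child.
fn find_in_projection(
    child_col: &Column,
    projection: &[(Column, String)], // (column in the join output schema, output alias)
    column_index_offset: usize,
) -> Option<usize> {
    projection.iter().position(|(proj_col, _alias)| {
        proj_col.name == child_col.name
            && child_col.index + column_index_offset == proj_col.index
    })
}

fn main() {
    // Join of two one-column children that both expose `v0`:
    // join output schema = [left.v0 -> 0, right.v0 -> 1], so left_field_size = 1.
    let projection: Vec<(Column, String)> = vec![
        (Column { name: "v0".into(), index: 0 }, "left_v0".into()),
        (Column { name: "v0".into(), index: 1 }, "right_v0".into()),
    ];

    // A right-side join column `v0` has index 0 in its child schema; shifting it
    // by left_field_size = 1 maps it to join index 1, i.e. projection slot 1.
    let right_v0 = Column { name: "v0".into(), index: 0 };
    assert_eq!(find_in_projection(&right_v0, &projection, 1), Some(1));

    // A name-only lookup (the pre-fix behaviour) stops at slot 0, which is the
    // left child's `v0`, not the right child's.
    let name_only = projection.iter().position(|(p, _)| p.name == right_v0.name);
    assert_eq!(name_only, Some(0));

    println!(
        "offset-aware match: {:?}, name-only match: {:?}",
        find_in_projection(&right_v0, &projection, 1),
        name_only
    );
}
```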
50 changes: 31 additions & 19 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -46,7 +46,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{DataFusionError, JoinSide};
use datafusion_common::{internal_err, JoinSide};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::{
utils::collect_columns, Partitioning, PhysicalExpr, PhysicalExprRef,
@@ -640,6 +640,7 @@ fn try_pushdown_through_hash_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
hash_join.on(),
hash_join.left().schema().fields().len(),
) else {
return Ok(None);
};
@@ -649,8 +650,7 @@
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
hash_join.left(),
hash_join.right(),
hash_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
@@ -750,8 +750,7 @@ fn try_swapping_with_nested_loop_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
nl_join.left(),
nl_join.right(),
nl_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
@@ -806,6 +805,7 @@ fn try_swapping_with_sort_merge_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
sm_join.on(),
sm_join.left().schema().fields().len(),
) else {
return Ok(None);
};
@@ -859,6 +859,7 @@ fn try_swapping_with_sym_hash_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
sym_join.on(),
sym_join.left().schema().fields().len(),
) else {
return Ok(None);
};
@@ -868,8 +869,7 @@
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
sym_join.left(),
sym_join.right(),
sym_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
@@ -1090,6 +1090,7 @@ fn update_join_on(
proj_left_exprs: &[(Column, String)],
proj_right_exprs: &[(Column, String)],
hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
left_field_size: usize,
) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
// TODO: Clippy wants the "map" call removed, but doing so generates
// a compilation error. Remove the clippy directive once this
@@ -1100,8 +1101,9 @@
.map(|(left, right)| (left, right))
.unzip();

let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs);
let new_right_columns = new_columns_for_join_on(&right_idx, proj_right_exprs);
let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
let new_right_columns =
new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);

match (new_left_columns, new_right_columns) {
(Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
@@ -1112,9 +1114,14 @@
/// This function generates a new set of columns to be used in a hash join
/// operation based on a set of equi-join conditions (`hash_join_on`) and a
/// list of projection expressions (`projection_exprs`).
///
/// Notes: Column indices in the projection expressions are based on the join schema,
/// whereas the join on expressions are based on the join child schema. `column_index_offset`
/// represents the offset between them.
fn new_columns_for_join_on(
hash_join_on: &[&PhysicalExprRef],
projection_exprs: &[(Column, String)],
column_index_offset: usize,
) -> Option<Vec<PhysicalExprRef>> {
let new_columns = hash_join_on
.iter()
@@ -1130,6 +1137,8 @@ fn new_columns_for_join_on(
.enumerate()
.find(|(_, (proj_column, _))| {
column.name() == proj_column.name()
&& column.index() + column_index_offset
== proj_column.index()
})
.map(|(index, (_, alias))| Column::new(alias, index));
if let Some(new_column) = new_column {
@@ -1138,10 +1147,10 @@
// If the column is not found in the projection expressions,
// it means that the column is not projected. In this case,
// we cannot push the projection down.
Err(DataFusionError::Internal(format!(
internal_err!(
"Column {:?} not found in projection expressions",
column
)))
)
}
} else {
Ok(Transformed::no(expr))
@@ -1160,21 +1169,20 @@ fn update_join_filter(
projection_left_exprs: &[(Column, String)],
projection_right_exprs: &[(Column, String)],
join_filter: &JoinFilter,
join_left: &Arc<dyn ExecutionPlan>,
join_right: &Arc<dyn ExecutionPlan>,
left_field_size: usize,
) -> Option<JoinFilter> {
let mut new_left_indices = new_indices_for_join_filter(
join_filter,
JoinSide::Left,
projection_left_exprs,
join_left.schema(),
0,
)
.into_iter();
let mut new_right_indices = new_indices_for_join_filter(
join_filter,
JoinSide::Right,
projection_right_exprs,
join_right.schema(),
left_field_size,
)
.into_iter();

@@ -1204,20 +1212,24 @@
/// This function determines and returns a vector of indices representing the
/// positions of columns in `projection_exprs` that are involved in `join_filter`,
/// and correspond to a particular side (`join_side`) of the join operation.
///
/// Notes: Column indices in the projection expressions are based on the join schema,
/// whereas the join filter is based on the join child schema. `column_index_offset`
/// represents the offset between them.
fn new_indices_for_join_filter(
join_filter: &JoinFilter,
join_side: JoinSide,
projection_exprs: &[(Column, String)],
join_child_schema: SchemaRef,
column_index_offset: usize,
) -> Vec<usize> {
join_filter
.column_indices()
.iter()
.filter(|col_idx| col_idx.side == join_side)
.filter_map(|col_idx| {
projection_exprs.iter().position(|(col, _)| {
col.name() == join_child_schema.fields()[col_idx.index].name()
})
projection_exprs
.iter()
.position(|(col, _)| col_idx.index + column_index_offset == col.index())
})
.collect()
}
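
For the join-filter path, the sketch below mirrors what `new_indices_for_join_filter` now does, again with simplified stand-in types rather than DataFusion's own (`ColumnIndex`, `ProjColumn`, and the free function here are illustrative): each filter column carries an index into one child schema plus a side marker, and it is located in the projection by offset-adjusted index instead of by field name.

```rust
/// Which side of the join a filter column refers to (simplified stand-in).
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinSide {
    Left,
    Right,
}

/// A join-filter column: an index into one child schema plus its side.
#[derive(Debug, Clone, Copy)]
struct ColumnIndex {
    index: usize,
    side: JoinSide,
}

/// A projection column, indexed against the join output schema.
#[derive(Debug, Clone, Copy)]
struct ProjColumn {
    index: usize,
}

/// For every filter column on `join_side`, find its position in the projection.
/// Right-side columns are shifted by the left child's field count
/// (`column_index_offset`) before the lookup.
fn new_indices_for_join_filter(
    filter_columns: &[ColumnIndex],
    join_side: JoinSide,
    projection: &[ProjColumn],
    column_index_offset: usize,
) -> Vec<usize> {
    filter_columns
        .iter()
        .filter(|col_idx| col_idx.side == join_side)
        .filter_map(|col_idx| {
            projection
                .iter()
                .position(|col| col_idx.index + column_index_offset == col.index)
        })
        .collect()
}

fn main() {
    // The left child has 2 fields, so right-child columns start at join index 2.
    let left_field_size = 2;
    // The projection keeps join columns 0 (left side) and 3 (right side).
    let projection = vec![ProjColumn { index: 0 }, ProjColumn { index: 3 }];
    let filter_columns = vec![
        ColumnIndex { index: 0, side: JoinSide::Left },
        ColumnIndex { index: 1, side: JoinSide::Right },
    ];

    // Left-side filter column 0 is found at projection slot 0 (no offset).
    assert_eq!(
        new_indices_for_join_filter(&filter_columns, JoinSide::Left, &projection, 0),
        vec![0]
    );
    // Right-side filter column 1 maps to join index 1 + 2 = 3, i.e. slot 1.
    assert_eq!(
        new_indices_for_join_filter(&filter_columns, JoinSide::Right, &projection, left_field_size),
        vec![1]
    );
}
```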
58 changes: 58 additions & 0 deletions datafusion/sqllogictest/test_files/join.slt
@@ -986,3 +986,61 @@ DROP TABLE employees

statement ok
DROP TABLE department


# Test issue: https://github.com/apache/datafusion/issues/11269
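# t1, t2, and t3 all expose a column named v0, so the projection pushed below
# the joins has to resolve join columns by index, not by name alone.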
statement ok
CREATE TABLE t1 (v0 BIGINT) AS VALUES (-503661263);

statement ok
CREATE TABLE t2 (v0 DOUBLE) AS VALUES (-1.663563947387);

statement ok
CREATE TABLE t3 (v0 DOUBLE) AS VALUES (0.05112015193508901);

query RR
SELECT t3.v0, t2.v0 FROM t1,t2,t3 WHERE t3.v0 >= t1.v0;
----
0.051120151935 -1.663563947387

statement ok
DROP TABLE t1;

statement ok
DROP TABLE t2;

statement ok
DROP TABLE t3;


# Test issue: https://github.com/apache/datafusion/issues/11275
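# As above, t0, t1, and t2 all expose a column named v1; the ORDER BY keeps the
# multi-row result deterministic.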
statement ok
CREATE TABLE t0 (v1 BOOLEAN) AS VALUES (false), (null);

statement ok
CREATE TABLE t1 (v1 BOOLEAN) AS VALUES (false), (null), (false);

statement ok
CREATE TABLE t2 (v1 BOOLEAN) AS VALUES (false), (true);

query BB
SELECT t2.v1, t1.v1 FROM t0, t1, t2 WHERE t2.v1 IS DISTINCT FROM t0.v1 ORDER BY 1,2;
----
false false
false false
false NULL
true false
true false
true false
true false
true NULL
true NULL

statement ok
DROP TABLE t0;

statement ok
DROP TABLE t1;

statement ok
DROP TABLE t2;
