diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs index 19b862c127a2..e1f63ee7d39e 100644 --- a/datafusion/common/src/utils.rs +++ b/datafusion/common/src/utils.rs @@ -81,19 +81,22 @@ pub fn project_schema( /// Checks if the given projection is valid for the given schema. pub fn can_project(schema: &SchemaRef, projection: Option<&Vec<usize>>) -> Result<()> { match projection { - Some(columns) => - { - if columns.iter().max().map_or(false, |&i| i >= schema.fields().len()) { - Err(arrow_schema::ArrowError::SchemaError(format!( - "project index {} out of bounds, max field {}", - columns.iter().max().unwrap(), - schema.fields().len() - )) - .into()) - } else { - Ok(()) - } - } + Some(columns) => { + if columns + .iter() + .max() + .map_or(false, |&i| i >= schema.fields().len()) + { + Err(arrow_schema::ArrowError::SchemaError(format!( + "project index {} out of bounds, max field {}", + columns.iter().max().unwrap(), + schema.fields().len() + )) + .into()) + } else { + Ok(()) + } + } None => Ok(()), } } diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index be98d3adadcf..0864e04b86d6 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -296,7 +296,7 @@ fn adjust_input_keys_ordering( new_conditions.0, filter.clone(), join_type, - // TODO: although projection is not used in the join here, because project pushdow is the last rule. Maybe we need to handle it later. Same as filter. + // TODO: although projection is not used in the join here, because project pushdown is the last rule. Maybe we need to handle it later. Same as filter. 
projection.clone(), PartitionMode::Partitioned, *null_equals_null, diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index d3eb0b05ceaf..2dff291bab54 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -139,16 +139,22 @@ fn swap_join_type(join_type: JoinType) -> JoinType { } } -fn swap_join_projection(left_schema_len: usize, right_schema_len: usize, projection: Option<&Vec<usize>>) -> Option<Vec<usize>> { - projection.map(|p| { - p.iter().map(|i| { - if *i < left_schema_len { - *i + right_schema_len - } else { - *i - left_schema_len - } - }).collect() - }) +fn swap_join_projection( + left_schema_len: usize, + right_schema_len: usize, + projection: Option<&Vec<usize>>, +) -> Option<Vec<usize>> { + projection.map(|p| { + p.iter() + .map(|i| { + if *i < left_schema_len { + *i + right_schema_len + } else { + *i - left_schema_len + } + }) + .collect() + }) } /// This function swaps the inputs of the given join operator. 
@@ -168,7 +174,11 @@ fn swap_hash_join( .collect(), swap_join_filter(hash_join.filter()), &swap_join_type(*hash_join.join_type()), - swap_join_projection(left.schema().fields().len(), right.schema().fields().len(), hash_join.projection.as_ref()), + swap_join_projection( + left.schema().fields().len(), + right.schema().fields().len(), + hash_join.projection.as_ref(), + ), partition_mode, hash_join.null_equals_null(), )?; diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 9362e3cefa46..090610284268 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -703,7 +703,7 @@ fn try_pushdown_through_hash_join( new_on, new_filter, hash_join.join_type(), - hash_join.projection.clone(), + hash_join.projection.clone(), *hash_join.partition_mode(), hash_join.null_equals_null, )?))) @@ -2412,7 +2412,7 @@ mod tests { ]), )), &JoinType::Inner, - None, + None, PartitionMode::Auto, true, )?); diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 1b99ab4cd849..8d895394557e 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -1443,7 +1443,7 @@ mod tests { vec![(Arc::new(left_col.clone()), Arc::new(right_col.clone()))], None, &JoinType::Inner, - None, + None, PartitionMode::Partitioned, false, ) diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 46214a05af37..6c3fca4340cb 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -222,7 +222,7 @@ pub fn hash_join_exec( on, filter, join_type, - None, + None, 
PartitionMode::Partitioned, true, )?)) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 9270fb723db6..e25f04dc4beb 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -153,7 +153,7 @@ async fn run_join_test( on_columns.clone(), None, &join_type, - None, + None, PartitionMode::Partitioned, false, ) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index a30b61b96324..4dfa7c4b2c43 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -310,6 +310,7 @@ impl HashJoinExec { /// /// # Error /// This function errors when it is not possible to join the left and right sides on keys `on`. + #[allow(clippy::too_many_arguments)] pub fn try_new( left: Arc, right: Arc, diff --git a/datafusion/physical-plan/src/joins/test_utils.rs b/datafusion/physical-plan/src/joins/test_utils.rs index df5388820350..9c492ac4d491 100644 --- a/datafusion/physical-plan/src/joins/test_utils.rs +++ b/datafusion/physical-plan/src/joins/test_utils.rs @@ -142,7 +142,7 @@ pub async fn partitioned_hash_join_with_filter( on, filter, join_type, - None, + None, PartitionMode::Partitioned, null_equals_null, )?); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 7d5f962330bb..160c296fcc82 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -589,7 +589,7 @@ impl AsExecutionPlan for PhysicalPlanNode { on, filter, &join_type.into(), - None, + None, partition_mode, hashjoin.null_equals_null, )?)) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 7ff6239e8ef0..85b5bc4778ea 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -217,8 +217,8 @@ fn roundtrip_hash_join() -> Result<()> { on.clone(), None, join_type, - // TODO: add a projectionexec for projection in the join - None, + // TODO: add a projectionexec for projection in the join + None, *partition_mode, false, )?))?;