Skip to content

Commit

Permalink
fmt & clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
my-vegetable-has-exploded committed Feb 16, 2024
1 parent 1928003 commit 768e4e5
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 34 deletions.
29 changes: 16 additions & 13 deletions datafusion/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,22 @@ pub fn project_schema(
/// Checks if the given projection is valid for the given schema.
pub fn can_project(schema: &SchemaRef, projection: Option<&Vec<usize>>) -> Result<()> {
match projection {
Some(columns) =>
{
if columns.iter().max().map_or(false, |&i| i >= schema.fields().len()) {
Err(arrow_schema::ArrowError::SchemaError(format!(
"project index {} out of bounds, max field {}",
columns.iter().max().unwrap(),
schema.fields().len()
))
.into())
} else {
Ok(())
}
}
Some(columns) => {
if columns
.iter()
.max()
.map_or(false, |&i| i >= schema.fields().len())
{
Err(arrow_schema::ArrowError::SchemaError(format!(
"project index {} out of bounds, max field {}",
columns.iter().max().unwrap(),
schema.fields().len()
))
.into())
} else {
Ok(())
}
}
None => Ok(()),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ fn adjust_input_keys_ordering(
new_conditions.0,
filter.clone(),
join_type,
// TODO: although projection is not used in the join here, because project pushdow is the last rule. Maybe we need to handle it later. Same as filter.
// TODO: although projection is not used in the join here, because project pushdow is the last rule. Maybe we need to handle it later. Same as filter.
projection.clone(),
PartitionMode::Partitioned,
*null_equals_null,
Expand Down
32 changes: 21 additions & 11 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,22 @@ fn swap_join_type(join_type: JoinType) -> JoinType {
}
}

fn swap_join_projection(left_schema_len: usize, right_schema_len: usize, projection: Option<&Vec<usize>>) -> Option<Vec<usize>> {
projection.map(|p| {
p.iter().map(|i| {
if *i < left_schema_len {
*i + right_schema_len
} else {
*i - left_schema_len
}
}).collect()
})
fn swap_join_projection(
left_schema_len: usize,
right_schema_len: usize,
projection: Option<&Vec<usize>>,
) -> Option<Vec<usize>> {
projection.map(|p| {
p.iter()
.map(|i| {
if *i < left_schema_len {
*i + right_schema_len
} else {
*i - left_schema_len
}
})
.collect()
})
}

/// This function swaps the inputs of the given join operator.
Expand All @@ -168,7 +174,11 @@ fn swap_hash_join(
.collect(),
swap_join_filter(hash_join.filter()),
&swap_join_type(*hash_join.join_type()),
swap_join_projection(left.schema().fields().len(), right.schema().fields().len(), hash_join.projection.as_ref()),
swap_join_projection(
left.schema().fields().len(),
right.schema().fields().len(),
hash_join.projection.as_ref(),
),
partition_mode,
hash_join.null_equals_null(),
)?;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ fn try_pushdown_through_hash_join(
new_on,
new_filter,
hash_join.join_type(),
hash_join.projection.clone(),
hash_join.projection.clone(),
*hash_join.partition_mode(),
hash_join.null_equals_null,
)?)))
Expand Down Expand Up @@ -2412,7 +2412,7 @@ mod tests {
]),
)),
&JoinType::Inner,
None,
None,
PartitionMode::Auto,
true,
)?);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1443,7 +1443,7 @@ mod tests {
vec![(Arc::new(left_col.clone()), Arc::new(right_col.clone()))],
None,
&JoinType::Inner,
None,
None,
PartitionMode::Partitioned,
false,
)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ pub fn hash_join_exec(
on,
filter,
join_type,
None,
None,
PartitionMode::Partitioned,
true,
)?))
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async fn run_join_test(
on_columns.clone(),
None,
&join_type,
None,
None,
PartitionMode::Partitioned,
false,
)
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ impl HashJoinExec {
///
/// # Error
/// This function errors when it is not possible to join the left and right sides on keys `on`.
#[allow(clippy::too_many_arguments)]
pub fn try_new(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub async fn partitioned_hash_join_with_filter(
on,
filter,
join_type,
None,
None,
PartitionMode::Partitioned,
null_equals_null,
)?);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
on,
filter,
&join_type.into(),
None,
None,
partition_mode,
hashjoin.null_equals_null,
)?))
Expand Down
4 changes: 2 additions & 2 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ fn roundtrip_hash_join() -> Result<()> {
on.clone(),
None,
join_type,
// TODO: add a projectionexec for projection in the join
None,
// TODO: add a projectionexec for projection in the join
None,
*partition_mode,
false,
)?))?;
Expand Down

0 comments on commit 768e4e5

Please sign in to comment.