Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
my-vegetable-has-exploded committed Feb 15, 2024
1 parent 1d8576a commit e909443
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 5 deletions.
23 changes: 23 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! projection reaches a source, it can even dissappear from the plan entirely.

use std::collections::HashMap;
use std::ops::Index;
use std::sync::Arc;

use super::output_requirements::OutputRequirementExec;
Expand Down Expand Up @@ -524,6 +525,28 @@ fn try_pushdown_through_union(
Ok(Some(Arc::new(UnionExec::new(new_children))))
}

fn try_embed_to_hash_join(
projection: &ProjectionExec,
hash_join: &HashJoinExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
return Ok(None);
};

if projection_as_columns.len() >= hash_join.schema().fields().len() {
return Ok(None);
}

let projection_index = projection_as_columns
.iter()
.map(|(c, _)| c.index())
.collect::<Vec<_>>();

let new_hash_join = hash_join.with_projection(projection_index)?;

Ok(Some(Arc::new(new_hash_join)))
}

/// Tries to push `projection` down through `hash_join`. If possible, performs the
/// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections
/// as its children. Otherwise, returns `None`.
Expand Down
48 changes: 43 additions & 5 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use crate::{handle_state, DisplayAs};
use crate::{handle_state, projection, DisplayAs};

use super::{
utils::{OnceAsync, OnceFut},
Expand All @@ -60,8 +60,8 @@ use arrow::util::bit_util;
use arrow_array::cast::downcast_array;
use arrow_schema::ArrowError;
use datafusion_common::{
internal_datafusion_err, internal_err, plan_err, DataFusionError, JoinSide, JoinType,
Result,
internal_datafusion_err, internal_err, plan_err, project_schema, DataFusionError,
JoinSide, JoinType, Result,
};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -408,6 +408,30 @@ impl HashJoinExec {
// In current implementation right side is always probe side.
JoinSide::Right
}

pub fn with_projection(&self, projection: Vec<usize>) -> Result<Self> {
let new_schema = project_schema(&self.schema, Some(&projection))?;
let new_column_indices = projection
.iter()
.map(|i| self.column_indices[*i].clone())
.collect();
Ok(Self {
left: self.left.clone(),
right: self.right.clone(),
on: self.on.clone(),
filter: self.filter.clone(),
join_type: self.join_type,
schema: new_schema,
left_fut: Default::default(),
random_state: self.random_state.clone(),
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
column_indices: new_column_indices,
null_equals_null: self.null_equals_null,
// Todo@wy to check output_order modification
output_order: self.output_order.clone(),
})
}
}

impl DisplayAs for HashJoinExec {
Expand All @@ -418,6 +442,20 @@ impl DisplayAs for HashJoinExec {
|| "".to_string(),
|f| format!(", filter={}", f.expression()),
);
let display_projections = if self.schema.fields.len()
!= build_join_schema(
&self.left.schema(),
&self.right.schema(),
&self.join_type,
)
.0
.fields
.len()
{
format!(", projection={:?}", self.schema)
} else {
"".to_string()
};
let on = self
.on
.iter()
Expand All @@ -426,8 +464,8 @@ impl DisplayAs for HashJoinExec {
.join(", ");
write!(
f,
"HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}",
self.mode, self.join_type, on, display_filter
"HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}",
self.mode, self.join_type, on, display_filter, display_projections
)
}
}
Expand Down

0 comments on commit e909443

Please sign in to comment.