Skip to content

Commit

Permalink
Refactor of Ordering and Prunability Traversals and States (#206)
Browse files Browse the repository at this point in the history
* refactor ordering and prunability graph traversals

* Simplifications

* Apply upstream updates

* Review

---------

Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
3 people authored and metesynnada committed Dec 15, 2023
1 parent b0f3633 commit 8bd6fca
Showing 1 changed file with 57 additions and 55 deletions.
112 changes: 57 additions & 55 deletions datafusion/physical-plan/src/joins/prunability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use datafusion_physical_expr::{
EquivalentClass, OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
SortProperties,
};
use itertools::Itertools;

/// Takes information about the join inputs (i.e. tables) and determines
/// which input can be pruned during the join operation.
Expand Down Expand Up @@ -92,15 +93,12 @@ pub fn is_filter_expr_prunable<
)
})?;

Ok(transformed_expr
.state
.map(|prunability_state| match prunability_state.prune_side {
TableSide::None => (false, false),
TableSide::Left => (true, false),
TableSide::Right => (false, true),
TableSide::Both => (true, true),
})
.unwrap_or((false, false)))
Ok(match transformed_expr.state.prune_side {
TableSide::None => (false, false),
TableSide::Left => (true, false),
TableSide::Right => (false, true),
TableSide::Both => (true, true),
})
}

/// Collects the expressions according to the given join side parameter,
Expand Down Expand Up @@ -187,44 +185,53 @@ struct PrunabilityState {
prune_side: TableSide,
}

impl Default for PrunabilityState {
fn default() -> Self {
Self {
sort_options: SortProperties::Unordered,
table_side: TableSide::None,
prune_side: TableSide::None,
}
}
}

/// When we aim to find the prunability of join tables with a predicate in the type of [`PhysicalExpr`],
/// a post-order propagation algorithm is run over that [`PhysicalExpr`]. During that propagation,
/// this struct provides the necessary information to calculate current node's state ([`PrunabilityState`]),
/// and stores the current node's.
/// this struct provides the necessary information to calculate current node's state ([`PrunabilityState`]).
#[derive(Debug)]
struct ExprPrunability {
expr: Arc<dyn PhysicalExpr>,
state: Option<PrunabilityState>,
children_states: Option<Vec<PrunabilityState>>,
state: PrunabilityState,
children_states: Vec<PrunabilityState>,
}

impl ExprPrunability {
/// Creates a new [`ExprPrunability`] with default states for `expr` and
/// its children.
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
let size = expr.children().len();
Self {
expr,
state: None,
children_states: None,
state: PrunabilityState::default(),
children_states: vec![PrunabilityState::default(); size],
}
}

fn children(&self) -> Vec<ExprPrunability> {
/// Updates this [`ExprPrunability`]'s children states with the given states.
pub fn with_new_children(mut self, children_states: Vec<PrunabilityState>) -> Self {
assert_eq!(self.children_states.len(), children_states.len());
self.children_states = children_states;
self
}

/// Creates new [`ExprPrunability`] objects for each child of the expression.
pub fn children_expr_prunabilities(&self) -> Vec<ExprPrunability> {
self.expr
.children()
.into_iter()
.map(ExprPrunability::new)
.collect()
}

pub fn new_with_children(
children_states: Vec<PrunabilityState>,
parent_expr: Arc<dyn PhysicalExpr>,
) -> Self {
Self {
expr: parent_expr,
state: None,
children_states: Some(children_states),
}
}
}

/// Indicates the table side information. It is either used for:
Expand Down Expand Up @@ -283,8 +290,9 @@ fn update_prunability<
return Ok(Transformed::Yes(node));
}

if let Some(children) = &node.children_states {
if !node.children_states.is_empty() {
// Handle the intermediate (non-leaf) node case:
let children = &node.children_states;
let children_sort_options = children
.iter()
.map(|prunability_state| prunability_state.sort_options)
Expand All @@ -308,11 +316,11 @@ fn update_prunability<
TableSide::None
};

node.state = Some(PrunabilityState {
node.state = PrunabilityState {
sort_options: parent_sort_options,
table_side: parent_table_side,
prune_side,
});
};
} else if let Some(column) = node.expr.as_any().downcast_ref::<Column>() {
// If we have a leaf node, it is either a Column or a Literal. Handle the former here:
let table_side = if left_indices
Expand Down Expand Up @@ -352,18 +360,18 @@ fn update_prunability<
_ => TableSide::None,
};

node.state = Some(PrunabilityState {
node.state = PrunabilityState {
sort_options: column_sort_options,
table_side,
prune_side,
});
};
} else {
// Last option, literal leaf:
node.state = Some(PrunabilityState {
node.state = PrunabilityState {
sort_options: node.expr.get_ordering(&[]),
table_side: TableSide::None,
prune_side: TableSide::None,
});
};
}
Ok(Transformed::Yes(node))
}
Expand All @@ -385,7 +393,7 @@ fn check_direct_matching(
.flatten()
.find(|(sort_expr, _)| sort_expr.expr.eq(&node.expr))
.map(|(sort_expr, side)| {
node.state = Some(PrunabilityState {
node.state = PrunabilityState {
sort_options: SortProperties::Ordered(sort_expr.options),
table_side: *side,
prune_side: if matches!(
Expand All @@ -401,7 +409,7 @@ fn check_direct_matching(
} else {
TableSide::None
},
});
};
true
})
.unwrap_or(false)
Expand Down Expand Up @@ -499,7 +507,7 @@ impl TreeNode for ExprPrunability {
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children() {
for child in self.children_expr_prunabilities() {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
Expand All @@ -513,26 +521,20 @@ impl TreeNode for ExprPrunability {
where
F: FnMut(Self) -> Result<Self>,
{
let children = self.children();
if children.is_empty() {
if self.children_states.is_empty() {
Ok(self)
} else {
let children_nodes = children
.into_iter()
.map(transform)
.collect::<Result<Vec<_>>>()?;
Ok(ExprPrunability::new_with_children(
children_nodes
.iter()
.map(|c| {
c.state.clone().unwrap_or(PrunabilityState {
sort_options: SortProperties::Unordered,
table_side: TableSide::None,
prune_side: TableSide::None,
})
})
.collect(),
self.expr,
let child_expr_prunabilities = self.children_expr_prunabilities();
// After mapping over the children, the function `F` applies to the
// current object and updates its state.
Ok(self.with_new_children(
child_expr_prunabilities
.into_iter()
// Update children states after this transformation:
.map(transform)
// Extract the state (i.e. prunability) information:
.map_ok(|c| c.state)
.collect::<Result<Vec<_>>>()?,
))
}
}
Expand Down

0 comments on commit 8bd6fca

Please sign in to comment.