Skip to content

Commit

Permalink
TreeNode refactor code deduplication: Part 3 (#8817)
Browse files Browse the repository at this point in the history
* Reduce code duplication

* fix doc tests

* addressing ozan's todos

* resolving merge conflicts

* remove test duplication

* Update enforce_sorting.rs

* tmp

* remove coalesce fix

* enforce dist and sort refactors

* Diff decreasing

* Review Part 1

* Review Part 2

* optimize tests

* Review Part 3

* remove new_default

* Review Part 4

* Review Part 5

* Review Part 6

* Review Part 7

* Resolve logical conflicts

* Review Part 8

* Remove clone from PlanContext

* renaming

* Remove deriving Clone on ExprContext

* Review Part 9

* Review Part 10

* Fix failing tests

* Review Part 11

* Stabilize previously unstable tests

* Review Part 12

* Adapt tests to upstream changes

* Review Part 13

* Review Part 14

* Fix import

* Move cross check inside assert optimized

* Move cross check to asset_optimized macro

* Retract aggregate topk tests (these will be solved with another PR)

* better code documentation

---------

Co-authored-by: berkaysynnada <[email protected]>
Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Berkay Şahin <[email protected]>
  • Loading branch information
4 people authored Jan 27, 2024
1 parent 7a5f205 commit 9c4affe
Show file tree
Hide file tree
Showing 17 changed files with 1,479 additions and 1,785 deletions.
137 changes: 84 additions & 53 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,29 @@
//! This module provides common traits for visiting or rewriting tree
//! data structures easily.

use std::borrow::Cow;
use std::sync::Arc;

use crate::Result;

/// If the function returns [`VisitRecursion::Continue`], the normal execution of the
/// function continues. If it returns [`VisitRecursion::Skip`], the function returns
/// with [`VisitRecursion::Continue`] to jump next recursion step, bypassing further
/// exploration of the current step. In case of [`VisitRecursion::Stop`], the function
/// return with [`VisitRecursion::Stop`] and recursion halts.
#[macro_export]
macro_rules! handle_tree_recursion {
($EXPR:expr) => {
match $EXPR {
VisitRecursion::Continue => {}
// If the recursion should skip, do not apply to its children, let
// the recursion continue:
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
// If the recursion should stop, do not apply to its children:
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
};
}

/// Defines a visitable and rewriteable a tree node. This trait is
/// implemented for plans ([`ExecutionPlan`] and [`LogicalPlan`]) as
/// well as expression trees ([`PhysicalExpr`], [`Expr`]) in
Expand All @@ -33,27 +51,18 @@ use crate::Result;
/// [`PhysicalExpr`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.PhysicalExpr.html
/// [`LogicalPlan`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html
/// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html
pub trait TreeNode: Sized + Clone {
/// Returns all children of the TreeNode
fn children_nodes(&self) -> Vec<Cow<Self>>;

/// Use preorder to iterate the node on the tree so that we can
/// stop fast for some cases.
pub trait TreeNode: Sized {
/// Applies `op` to the node and its children. `op` is applied in a preoder way,
/// and it is controlled by [`VisitRecursion`], which means result of the `op`
/// on the self node can cause an early return.
///
/// The `op` closure can be used to collect some info from the
/// tree node or do some checking for the tree node.
fn apply<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
match op(self)? {
VisitRecursion::Continue => {}
// If the recursion should skip, do not apply to its children. And let the recursion continue
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
// If the recursion should stop, do not apply to its children
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
};

fn apply<F: FnMut(&Self) -> Result<VisitRecursion>>(
&self,
op: &mut F,
) -> Result<VisitRecursion> {
handle_tree_recursion!(op(self)?);
self.apply_children(&mut |node| node.apply(op))
}

Expand Down Expand Up @@ -89,22 +98,8 @@ pub trait TreeNode: Sized + Clone {
&self,
visitor: &mut V,
) -> Result<VisitRecursion> {
match visitor.pre_visit(self)? {
VisitRecursion::Continue => {}
// If the recursion should skip, do not apply to its children. And let the recursion continue
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
// If the recursion should stop, do not apply to its children
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
};

match self.apply_children(&mut |node| node.visit(visitor))? {
VisitRecursion::Continue => {}
// If the recursion should skip, do not apply to its children. And let the recursion continue
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
// If the recursion should stop, do not apply to its children
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}

handle_tree_recursion!(visitor.pre_visit(self)?);
handle_tree_recursion!(self.apply_children(&mut |node| node.visit(visitor))?);
visitor.post_visit(self)
}

Expand Down Expand Up @@ -148,7 +143,6 @@ pub trait TreeNode: Sized + Clone {
F: Fn(Self) -> Result<Transformed<Self>>,
{
let after_op_children = self.map_children(|node| node.transform_up(op))?;

let new_node = op(after_op_children)?.into();
Ok(new_node)
}
Expand All @@ -161,7 +155,6 @@ pub trait TreeNode: Sized + Clone {
F: FnMut(Self) -> Result<Transformed<Self>>,
{
let after_op_children = self.map_children(|node| node.transform_up_mut(op))?;

let new_node = op(after_op_children)?.into();
Ok(new_node)
}
Expand Down Expand Up @@ -215,17 +208,7 @@ pub trait TreeNode: Sized + Clone {
/// Apply the closure `F` to the node's children
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children_nodes() {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
}
F: FnMut(&Self) -> Result<VisitRecursion>;

/// Apply transform `F` to the node's children, the transform `F` might have a direction(Preorder or Postorder)
fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -356,8 +339,15 @@ pub trait DynTreeNode {
/// Blanket implementation for Arc for any tye that implements
/// [`DynTreeNode`] (such as [`Arc<dyn PhysicalExpr>`])
impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.arc_children().into_iter().map(Cow::Owned).collect()
/// Apply the closure `F` to the node's children
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.arc_children() {
handle_tree_recursion!(op(&child)?)
}
Ok(VisitRecursion::Continue)
}

fn map_children<F>(self, transform: F) -> Result<Self>
Expand All @@ -366,12 +356,53 @@ impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
{
let children = self.arc_children();
if !children.is_empty() {
let new_children: Result<Vec<_>> =
children.into_iter().map(transform).collect();
let new_children =
children.into_iter().map(transform).collect::<Result<_>>()?;
let arc_self = Arc::clone(&self);
self.with_new_arc_children(arc_self, new_children?)
self.with_new_arc_children(arc_self, new_children)
} else {
Ok(self)
}
}
}

/// Instead of implementing [`TreeNode`], it's recommended to implement a [`ConcreteTreeNode`] for
/// trees that contain nodes with payloads. This approach ensures safe execution of algorithms
/// involving payloads, by enforcing rules for detaching and reattaching child nodes.
pub trait ConcreteTreeNode: Sized {
/// Provides read-only access to child nodes.
fn children(&self) -> Vec<&Self>;

/// Detaches the node from its children, returning the node itself and its detached children.
fn take_children(self) -> (Self, Vec<Self>);

/// Reattaches updated child nodes to the node, returning the updated node.
fn with_new_children(self, children: Vec<Self>) -> Result<Self>;
}

impl<T: ConcreteTreeNode> TreeNode for T {
/// Apply the closure `F` to the node's children
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children() {
handle_tree_recursion!(op(child)?)
}
Ok(VisitRecursion::Continue)
}

fn map_children<F>(self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
let (new_self, children) = self.take_children();
if !children.is_empty() {
let new_children =
children.into_iter().map(transform).collect::<Result<_>>()?;
new_self.with_new_children(new_children)
} else {
Ok(new_self)
}
}
}
Loading

0 comments on commit 9c4affe

Please sign in to comment.