Skip to content

Commit

Permalink
Cleanup TreeNode implementations (#8672)
Browse files Browse the repository at this point in the history
* Refactor TreeNode and cleanup some implementations

* More

* More

* Fix clippy

* avoid cloning in `TreeNode.children_nodes()` implementations where possible using `Cow`

* Remove more unnecessary apply_children

* Fix clippy

* Remove

---------

Co-authored-by: Peter Toth <[email protected]>
  • Loading branch information
viirya and peter-toth authored Jan 1, 2024
1 parent f0af5eb commit bf3bd92
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 204 deletions.
33 changes: 18 additions & 15 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! This module provides common traits for visiting or rewriting tree
//! data structures easily.

use std::borrow::Cow;
use std::sync::Arc;

use crate::Result;
Expand All @@ -32,7 +33,10 @@ use crate::Result;
/// [`PhysicalExpr`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.PhysicalExpr.html
/// [`LogicalPlan`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html
/// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html
pub trait TreeNode: Sized {
pub trait TreeNode: Sized + Clone {
/// Returns all children of the TreeNode
fn children_nodes(&self) -> Vec<Cow<Self>>;

/// Use preorder to iterate the node on the tree so that we can
/// stop fast for some cases.
///
Expand Down Expand Up @@ -211,7 +215,17 @@ pub trait TreeNode: Sized {
/// Apply the closure `F` to the node's children
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>;
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children_nodes() {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
}

/// Apply transform `F` to the node's children, the transform `F` might have a direction(Preorder or Postorder)
fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -342,19 +356,8 @@ pub trait DynTreeNode {
/// Blanket implementation for Arc for any tye that implements
/// [`DynTreeNode`] (such as [`Arc<dyn PhysicalExpr>`])
impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.arc_children() {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.arc_children().into_iter().map(Cow::Owned).collect()
}

fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down
32 changes: 6 additions & 26 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! according to the configuration), this rule increases partition counts in
//! the physical plan.

use std::borrow::Cow;
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
Expand All @@ -47,7 +48,7 @@ use crate::physical_plan::{
};

use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::map_columns_before_projection;
Expand Down Expand Up @@ -1409,18 +1410,8 @@ impl DistributionContext {
}

impl TreeNode for DistributionContext {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children_nodes.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -1483,19 +1474,8 @@ impl PlanWithKeyRequirements {
}

impl TreeNode for PlanWithKeyRequirements {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
33 changes: 6 additions & 27 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
//! in the physical plan. The first sort is unnecessary since its result is overwritten
//! by another [`SortExec`]. Therefore, this rule removes it from the physical plan.

use std::borrow::Cow;
use std::sync::Arc;

use crate::config::ConfigOptions;
Expand All @@ -57,7 +58,7 @@ use crate::physical_plan::{
with_new_children_if_necessary, Distribution, ExecutionPlan, InputOrderMode,
};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion_physical_plan::repartition::RepartitionExec;
Expand Down Expand Up @@ -145,19 +146,8 @@ impl PlanWithCorrespondingSort {
}

impl TreeNode for PlanWithCorrespondingSort {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children_nodes.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -237,19 +227,8 @@ impl PlanWithCorrespondingCoalescePartitions {
}

impl TreeNode for PlanWithCorrespondingCoalescePartitions {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children_nodes.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
18 changes: 4 additions & 14 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! infinite sources, if there are any. It will reject non-runnable query plans
//! that use pipeline-breaking operators on infinite input(s).

use std::borrow::Cow;
use std::sync::Arc;

use crate::config::ConfigOptions;
Expand All @@ -27,7 +28,7 @@ use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
Expand Down Expand Up @@ -91,19 +92,8 @@ impl PipelineStatePropagator {
}

impl TreeNode for PipelineStatePropagator {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! order-preserving variants when it is helpful; either in terms of
//! performance or to accommodate unbounded streams by fixing the pipeline.

use std::borrow::Cow;
use std::sync::Arc;

use super::utils::is_repartition;
Expand All @@ -29,7 +30,7 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_physical_plan::unbounded_output;

/// For a given `plan`, this object carries the information one needs from its
Expand Down Expand Up @@ -104,18 +105,8 @@ impl OrderPreservationContext {
}

impl TreeNode for OrderPreservationContext {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children_nodes.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
19 changes: 5 additions & 14 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::borrow::Cow;
use std::sync::Arc;

use crate::physical_optimizer::utils::{
Expand All @@ -28,7 +29,7 @@ use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, DataFusionError, JoinSide, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
Expand Down Expand Up @@ -71,20 +72,10 @@ impl SortPushDown {
}

impl TreeNode for SortPushDown {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children_nodes.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
Expand Down
Loading

0 comments on commit bf3bd92

Please sign in to comment.