Skip to content

Commit

Permalink
Optimized propagation of empty relations apache#10290
Browse files Browse the repository at this point in the history
  • Loading branch information
demetribu committed May 1, 2024
1 parent d3237b2 commit 95eb2d6
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 55 deletions.
141 changes: 86 additions & 55 deletions datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
// under the License.

//! [`PropagateEmptyRelation`] eliminates nodes fed by `EmptyRelation`
use datafusion_common::{plan_err, Result};
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::{EmptyRelation, JoinType, Projection, Union};

use std::sync::Arc;

use datafusion_common::tree_node::Transformed;
use datafusion_common::JoinType::Inner;
use datafusion_common::{internal_err, plan_err, Result};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::{EmptyRelation, Projection, Union};

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};

Expand All @@ -38,32 +43,58 @@ impl PropagateEmptyRelation {
impl OptimizerRule for PropagateEmptyRelation {
fn try_optimize(
&self,
plan: &LogicalPlan,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called PropagateEmptyRelation::rewrite")
}

fn name(&self) -> &str {
"propagate_empty_relation"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}

fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::EmptyRelation(_) => {}
LogicalPlan::EmptyRelation(_) => Ok(Transformed::no(plan)),
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_)
| LogicalPlan::Window(_)
| LogicalPlan::Sort(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Repartition(_)
| LogicalPlan::Limit(_) => {
if let Some(empty) = empty_child(plan)? {
return Ok(Some(empty));
let empty = empty_child(&plan)?;
if let Some(empty_plan) = empty {
return Ok(Transformed::yes(empty_plan));
}
Ok(Transformed::no(plan))
}
LogicalPlan::CrossJoin(_) => {
let (left_empty, right_empty) = binary_plan_children_is_empty(plan)?;
LogicalPlan::CrossJoin(ref join) => {
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
if left_empty || right_empty {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: plan.schema().clone(),
})));
return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
EmptyRelation {
produce_one_row: false,
schema: plan.schema().clone(),
},
)));
}
Ok(Transformed::no(LogicalPlan::CrossJoin(join.clone())))
}
LogicalPlan::Join(join) => {

LogicalPlan::Join(ref join) if join.join_type == Inner => {
// TODO: For Join, more join type need to be careful:
// For LeftOuter/LeftSemi/LeftAnti Join, only the left side is empty, the Join result is empty.
// For LeftSemi Join, if the right side is empty, the Join result is empty.
Expand All @@ -76,17 +107,26 @@ impl OptimizerRule for PropagateEmptyRelation {
// columns + right side columns replaced with null values.
// For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
// columns + left side columns replaced with null values.
if join.join_type == JoinType::Inner {
let (left_empty, right_empty) = binary_plan_children_is_empty(plan)?;
if left_empty || right_empty {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
if left_empty || right_empty {
return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
EmptyRelation {
produce_one_row: false,
schema: plan.schema().clone(),
})));
schema: join.schema.clone(),
},
)));
}
Ok(Transformed::no(LogicalPlan::Join(join.clone())))
}
LogicalPlan::Aggregate(ref agg) => {
if !agg.group_expr.is_empty() {
if let Some(empty_plan) = empty_child(&plan)? {
return Ok(Transformed::yes(empty_plan));
}
}
Ok(Transformed::no(LogicalPlan::Aggregate(agg.clone())))
}
LogicalPlan::Union(union) => {
LogicalPlan::Union(ref union) => {
let new_inputs = union
.inputs
.iter()
Expand All @@ -98,49 +138,36 @@ impl OptimizerRule for PropagateEmptyRelation {
.collect::<Vec<_>>();

if new_inputs.len() == union.inputs.len() {
return Ok(None);
Ok(Transformed::no(plan))
} else if new_inputs.is_empty() {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: plan.schema().clone(),
})));
Ok(Transformed::yes(LogicalPlan::EmptyRelation(
EmptyRelation {
produce_one_row: false,
schema: plan.schema().clone(),
},
)))
} else if new_inputs.len() == 1 {
let child = (*new_inputs[0]).clone();
let child = unwrap_arc(new_inputs[0].clone());
if child.schema().eq(plan.schema()) {
return Ok(Some(child));
Ok(Transformed::yes(child))
} else {
return Ok(Some(LogicalPlan::Projection(
Ok(Transformed::yes(LogicalPlan::Projection(
Projection::new_from_schema(
Arc::new(child),
plan.schema().clone(),
),
)));
)))
}
} else {
return Ok(Some(LogicalPlan::Union(Union {
Ok(Transformed::yes(LogicalPlan::Union(Union {
inputs: new_inputs,
schema: union.schema.clone(),
})));
}
}
LogicalPlan::Aggregate(agg) => {
if !agg.group_expr.is_empty() {
if let Some(empty) = empty_child(plan)? {
return Ok(Some(empty));
}
})))
}
}
_ => {}
}
Ok(None)
}

fn name(&self) -> &str {
"propagate_empty_relation"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
_ => Ok(Transformed::no(plan)),
}
}
}

Expand Down Expand Up @@ -182,18 +209,22 @@ fn empty_child(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

use datafusion_common::{Column, DFSchema, JoinType, ScalarValue};
use datafusion_expr::logical_plan::table_scan;
use datafusion_expr::{
binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Expr, Operator,
};

use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_nested_union::EliminateNestedUnion;
use crate::test::{
assert_optimized_plan_eq, assert_optimized_plan_eq_with_rules, test_table_scan,
test_table_scan_fields, test_table_scan_with_name,
};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::logical_plan::table_scan;
use datafusion_expr::{
binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Expr, Operator,
};

use super::*;

Expand Down
27 changes: 27 additions & 0 deletions datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};

use datafusion_common::config::ConfigOptions;
use datafusion_common::{plan_err, Result};
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
Expand Down Expand Up @@ -290,6 +291,32 @@ fn eliminate_nested_filters() {
assert_eq!(expected, format!("{plan:?}"));
}

#[test]
fn test_propagate_empty_relation_inner_join_and_unions() {
let sql = "\
SELECT A.col_int32 FROM test AS A \
INNER JOIN ( \
SELECT col_int32 FROM test WHERE 1 = 0 \
) AS B ON A.col_int32 = B.col_int32 \
UNION ALL \
SELECT test.col_int32 FROM test WHERE 1 = 1 \
UNION ALL \
SELECT test.col_int32 FROM test WHERE 0 = 0 \
UNION ALL \
SELECT test.col_int32 FROM test WHERE test.col_int32 < 0 \
UNION ALL \
SELECT test.col_int32 FROM test WHERE 1 = 0";

let plan = test_sql(sql).unwrap();
let expected = "\
Union\
\n TableScan: test projection=[col_int32]\
\n TableScan: test projection=[col_int32]\
\n Filter: test.col_int32 < Int32(0)\
\n TableScan: test projection=[col_int32]";
assert_eq!(expected, format!("{plan:?}"));
}

fn test_sql(sql: &str) -> Result<LogicalPlan> {
// parse the SQL
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
Expand Down

0 comments on commit 95eb2d6

Please sign in to comment.