diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 336513035036..4ebfe76726f5 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -636,6 +636,10 @@ config_namespace! { /// then the output will be coerced to a non-view. /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. pub expand_views_at_output: bool, default = false + + /// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections. + /// This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module. It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution. + pub optimize_projections_preserve_existing_projections: bool, default = false } } diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 53646dc5b468..dd1c16e3f42e 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -338,6 +338,21 @@ impl SessionConfig { self } + /// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections. + /// This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module. + /// It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution. + /// + /// [optimize_projections_preserve_existing_projections]: datafusion_common::config::OptimizerOptions::optimize_projections_preserve_existing_projections + pub fn with_optimize_projections_preserve_existing_projections( + mut self, + enabled: bool, + ) -> Self { + self.options + .optimizer + .optimize_projections_preserve_existing_projections = enabled; + self + } + /// Enables or disables the use of pruning predicate for parquet readers to skip row groups pub fn with_parquet_pruning(mut self, enabled: bool) -> Self { self.options.execution.parquet.pruning = enabled; diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 67d888abda52..e2a333c024fe 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -189,7 +189,7 @@ fn optimize_projections( // that its input only contains absolutely necessary columns for // the aggregate expressions. Note that necessary_indices refer to // fields in `aggregate.input.schema()`. - add_projection_on_top_if_helpful(aggregate_input, necessary_exprs) + add_projection_on_top_if_helpful(aggregate_input, necessary_exprs, config) })? .map_data(|aggregate_input| { // Create a new aggregate plan with the updated input and only the @@ -233,9 +233,14 @@ fn optimize_projections( // refers to that schema let required_exprs = required_indices.get_required_exprs(&input_schema); - let window_child = - add_projection_on_top_if_helpful(window_child, required_exprs)? - .data; + + let window_child = add_projection_on_top_if_helpful( + window_child, + required_exprs, + config, + )? + .data; + Window::try_new(new_window_expr, Arc::new(window_child)) .map(LogicalPlan::Window) .map(Transformed::yes) @@ -409,7 +414,7 @@ fn optimize_projections( optimize_projections(child, config, required_indices)?.transform_data( |new_input| { if projection_beneficial { - add_projection_on_top_if_helpful(new_input, project_exprs) + add_projection_on_top_if_helpful(new_input, project_exprs, config) } else { Ok(Transformed::no(new_input)) } @@ -708,6 +713,7 @@ fn split_join_requirements( /// /// * `plan` - The input `LogicalPlan` to potentially add a projection to. /// * `project_exprs` - A list of expressions for the projection. +/// * `config` - A reference to the optimizer configuration. /// /// # Returns /// @@ -715,9 +721,15 @@ fn split_join_requirements( fn add_projection_on_top_if_helpful( plan: LogicalPlan, project_exprs: Vec, + config: &dyn OptimizerConfig, ) -> Result> { // Make sure projection decreases the number of columns, otherwise it is unnecessary. - if project_exprs.len() >= plan.schema().fields().len() { + if config + .options() + .optimizer + .optimize_projections_preserve_existing_projections + || project_exprs.len() >= plan.schema().fields().len() + { Ok(Transformed::no(plan)) } else { Projection::try_new(project_exprs, Arc::new(plan)) @@ -759,7 +771,7 @@ fn rewrite_projection_given_requirements( // projection down optimize_projections(Arc::unwrap_or_clone(input), config, required_indices)? .transform_data(|input| { - if is_projection_unnecessary(&input, &exprs_used)? { + if is_projection_unnecessary(&input, &exprs_used, config)? { Ok(Transformed::yes(input)) } else { Projection::try_new(exprs_used, Arc::new(input)) @@ -770,9 +782,22 @@ fn rewrite_projection_given_requirements( } /// Projection is unnecessary, when +/// - `optimize_projections_preserve_existing_projections` optimizer config is false, and /// - input schema of the projection, output schema of the projection are same, and /// - all projection expressions are either Column or Literal -fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result { +fn is_projection_unnecessary( + input: &LogicalPlan, + proj_exprs: &[Expr], + config: &dyn OptimizerConfig, +) -> Result { + if config + .options() + .optimizer + .optimize_projections_preserve_existing_projections + { + return Ok(false); + } + let proj_schema = projection_schema(input, proj_exprs)?; Ok(&proj_schema == input.schema() && proj_exprs.iter().all(is_expr_trivial)) } @@ -789,11 +814,12 @@ mod tests { use crate::optimize_projections::OptimizeProjections; use crate::optimizer::Optimizer; use crate::test::{ - assert_fields_eq, assert_optimized_plan_eq, scan_empty, test_table_scan, - test_table_scan_fields, test_table_scan_with_name, + assert_fields_eq, assert_optimized_plan_eq, assert_optimized_plan_with_config_eq, + scan_empty, test_table_scan, test_table_scan_fields, test_table_scan_with_name, }; use crate::{OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::config::ConfigOptions; use datafusion_common::{ Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference, }; @@ -1980,4 +2006,49 @@ mod tests { optimizer.optimize(plan, &OptimizerContext::new(), observe)?; Ok(optimized_plan) } + + #[test] + fn aggregate_filter_pushdown_preserve_projections() -> Result<()> { + let table_scan = test_table_scan()?; + let aggr_with_filter = count_udaf() + .call(vec![col("b")]) + .filter(col("c").gt(lit(42))) + .build()?; + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate( + vec![col("a")], + vec![count(col("b")), aggr_with_filter.alias("count2")], + )? + .project(vec![col("a"), col("count(test.b)"), col("count2")])? + .build()?; + + let expected_default = "Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\ + \n TableScan: test projection=[a, b, c]"; + + let expected_preserve_projections = "Projection: test.a, count(test.b), count2\ + \n Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\ + \n TableScan: test projection=[a, b, c]"; + + let scenarios = [ + (false, expected_default), + (true, expected_preserve_projections), + ]; + + for (preserve_projections, expected_plan) in scenarios.into_iter() { + let mut config = ConfigOptions::new(); + config + .optimizer + .optimize_projections_preserve_existing_projections = + preserve_projections; + let optimizer_context = OptimizerContext::new_with_options(config); + assert_optimized_plan_with_config_eq( + Arc::new(OptimizeProjections::new()), + plan.clone(), + expected_plan, + &optimizer_context, + )?; + } + + Ok(()) + } } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 975150cd6122..b3e5a51e8132 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -145,6 +145,7 @@ pub struct OptimizerContext { /// Alias generator used to generate unique aliases for subqueries alias_generator: Arc, + /// Configuration options for the optimizer options: ConfigOptions, } @@ -161,6 +162,15 @@ impl OptimizerContext { } } + /// Create optimizer config with the given configuration options + pub fn new_with_options(options: ConfigOptions) -> Self { + Self { + query_execution_start_time: Utc::now(), + alias_generator: Arc::new(AliasGenerator::new()), + options, + } + } + /// Specify whether to enable the filter_null_keys rule pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self { self.options.optimizer.filter_null_join_keys = filter_null_keys; diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 94d07a0791b3..3330e1f24c4d 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -17,7 +17,7 @@ use crate::analyzer::{Analyzer, AnalyzerRule}; use crate::optimizer::Optimizer; -use crate::{OptimizerContext, OptimizerRule}; +use crate::{OptimizerConfig, OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_common::{assert_contains, Result}; @@ -173,8 +173,17 @@ pub fn assert_optimized_plan_eq( // Apply the rule once let opt_context = OptimizerContext::new().with_max_passes(1); + assert_optimized_plan_with_config_eq(rule, plan, expected, &opt_context) +} + +pub fn assert_optimized_plan_with_config_eq( + rule: Arc, + plan: LogicalPlan, + expected: &str, + config: &dyn OptimizerConfig, +) -> Result<()> { let optimizer = Optimizer::with_rules(vec![Arc::clone(&rule)]); - let optimized_plan = optimizer.optimize(plan, &opt_context, observe)?; + let optimized_plan = optimizer.optimize(plan, config, observe)?; let formatted_plan = format!("{optimized_plan}"); assert_eq!(formatted_plan, expected); diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 8167ddacffb4..d9c1af1818a9 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -278,9 +278,11 @@ impl Unparser<'_> { ) -> Result<()> { match plan { LogicalPlan::TableScan(scan) => { - if let Some(unparsed_table_scan) = - Self::unparse_table_scan_pushdown(plan, None)? - { + if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown( + plan, + None, + select.already_projected(), + )? { return self.select_to_sql_recursively( &unparsed_table_scan, query, @@ -567,6 +569,7 @@ impl Unparser<'_> { let unparsed_table_scan = Self::unparse_table_scan_pushdown( plan, Some(plan_alias.alias.clone()), + select.already_projected(), )?; // if the child plan is a TableScan with pushdown operations, we don't need to // create an additional subquery for it @@ -696,6 +699,7 @@ impl Unparser<'_> { fn unparse_table_scan_pushdown( plan: &LogicalPlan, alias: Option, + already_projected: bool, ) -> Result> { match plan { LogicalPlan::TableScan(table_scan) => { @@ -725,24 +729,29 @@ impl Unparser<'_> { } } - if let Some(project_vec) = &table_scan.projection { - let project_columns = project_vec - .iter() - .cloned() - .map(|i| { - let schema = table_scan.source.schema(); - let field = schema.field(i); - if alias.is_some() { - Column::new(alias.clone(), field.name().clone()) - } else { - Column::new( - Some(table_scan.table_name.clone()), - field.name().clone(), - ) - } - }) - .collect::>(); - builder = builder.project(project_columns)?; + // Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists. + // For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection + // information included in the TableScan node. + if !already_projected { + if let Some(project_vec) = &table_scan.projection { + let project_columns = project_vec + .iter() + .cloned() + .map(|i| { + let schema = table_scan.source.schema(); + let field = schema.field(i); + if alias.is_some() { + Column::new(alias.clone(), field.name().clone()) + } else { + Column::new( + Some(table_scan.table_name.clone()), + field.name().clone(), + ) + } + }) + .collect::>(); + builder = builder.project(project_columns)?; + } } let filter_expr: Result> = table_scan @@ -787,14 +796,17 @@ impl Unparser<'_> { Self::unparse_table_scan_pushdown( &subquery_alias.input, Some(subquery_alias.alias.clone()), + already_projected, ) } // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns]. // The inner table scan could be a scan with pushdown operations. LogicalPlan::Projection(projection) => { - if let Some(plan) = - Self::unparse_table_scan_pushdown(&projection.input, alias.clone())? - { + if let Some(plan) = Self::unparse_table_scan_pushdown( + &projection.input, + alias.clone(), + already_projected, + )? { let exprs = if alias.is_some() { let mut alias_rewriter = alias.as_ref().map(|alias_name| TableAliasRewriter { diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 669f9f06f035..d1e921110072 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -882,6 +882,7 @@ fn test_table_scan_pushdown() -> Result<()> { let query_from_table_scan_with_projection = LogicalPlanBuilder::from( table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?, ) + .project(vec![col("id"), col("age")])? .project(vec![wildcard()])? .build()?; let query_from_table_scan_with_projection = diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 84d18233d572..ad5346544dcf 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -233,6 +233,7 @@ datafusion.optimizer.filter_null_join_keys false datafusion.optimizer.hash_join_single_partition_threshold 1048576 datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 datafusion.optimizer.max_passes 3 +datafusion.optimizer.optimize_projections_preserve_existing_projections false datafusion.optimizer.prefer_existing_sort false datafusion.optimizer.prefer_existing_union false datafusion.optimizer.prefer_hash_join true @@ -326,6 +327,7 @@ datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan +datafusion.optimizer.optimize_projections_preserve_existing_projections false When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections. This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module. It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution. datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6a49fda668a9..349a1a52d18b 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -115,6 +115,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | +| datafusion.optimizer.optimize_projections_preserve_existing_projections | false | When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections. This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module. It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution. | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |