Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unparsing plans after applying optimize_projections rule #13267

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,10 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false

/// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections.
/// This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module. It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution.
pub optimize_projections_preserve_existing_projections: bool, default = false
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might it be better to make this flag more generic, for example, just preserve_existing_projections or prefer_existing_plan_nodes, so it can be reused in the future in similar cases

}
}

Expand Down
15 changes: 15 additions & 0 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,21 @@ impl SessionConfig {
self
}

/// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections.
/// This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module.
/// It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution.
///
/// [optimize_projections_preserve_existing_projections]: datafusion_common::config::OptimizerOptions::optimize_projections_preserve_existing_projections
pub fn with_optimize_projections_preserve_existing_projections(
mut self,
enabled: bool,
) -> Self {
self.options
.optimizer
.optimize_projections_preserve_existing_projections = enabled;
self
}

/// Enables or disables the use of pruning predicate for parquet readers to skip row groups
pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
self.options.execution.parquet.pruning = enabled;
Expand Down
91 changes: 81 additions & 10 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ fn optimize_projections(
// that its input only contains absolutely necessary columns for
// the aggregate expressions. Note that necessary_indices refer to
// fields in `aggregate.input.schema()`.
add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)
add_projection_on_top_if_helpful(aggregate_input, necessary_exprs, config)
})?
.map_data(|aggregate_input| {
// Create a new aggregate plan with the updated input and only the
Expand Down Expand Up @@ -233,9 +233,14 @@ fn optimize_projections(
// refers to that schema
let required_exprs =
required_indices.get_required_exprs(&input_schema);
let window_child =
add_projection_on_top_if_helpful(window_child, required_exprs)?
.data;

let window_child = add_projection_on_top_if_helpful(
window_child,
required_exprs,
config,
)?
.data;

Window::try_new(new_window_expr, Arc::new(window_child))
.map(LogicalPlan::Window)
.map(Transformed::yes)
Expand Down Expand Up @@ -409,7 +414,7 @@ fn optimize_projections(
optimize_projections(child, config, required_indices)?.transform_data(
|new_input| {
if projection_beneficial {
add_projection_on_top_if_helpful(new_input, project_exprs)
add_projection_on_top_if_helpful(new_input, project_exprs, config)
} else {
Ok(Transformed::no(new_input))
}
Expand Down Expand Up @@ -708,16 +713,23 @@ fn split_join_requirements(
///
/// * `plan` - The input `LogicalPlan` to potentially add a projection to.
/// * `project_exprs` - A list of expressions for the projection.
/// * `config` - A reference to the optimizer configuration.
///
/// # Returns
///
/// A `Transformed` indicating if a projection was added
fn add_projection_on_top_if_helpful(
plan: LogicalPlan,
project_exprs: Vec<Expr>,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
// Make sure projection decreases the number of columns, otherwise it is unnecessary.
if project_exprs.len() >= plan.schema().fields().len() {
if config
.options()
.optimizer
.optimize_projections_preserve_existing_projections
|| project_exprs.len() >= plan.schema().fields().len()
{
Ok(Transformed::no(plan))
} else {
Projection::try_new(project_exprs, Arc::new(plan))
Expand Down Expand Up @@ -759,7 +771,7 @@ fn rewrite_projection_given_requirements(
// projection down
optimize_projections(Arc::unwrap_or_clone(input), config, required_indices)?
.transform_data(|input| {
if is_projection_unnecessary(&input, &exprs_used)? {
if is_projection_unnecessary(&input, &exprs_used, config)? {
Ok(Transformed::yes(input))
} else {
Projection::try_new(exprs_used, Arc::new(input))
Expand All @@ -770,9 +782,22 @@ fn rewrite_projection_given_requirements(
}

/// Projection is unnecessary, when
/// - `optimize_projections_preserve_existing_projections` optimizer config is false, and
/// - input schema of the projection, output schema of the projection are same, and
/// - all projection expressions are either Column or Literal
fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result<bool> {
fn is_projection_unnecessary(
input: &LogicalPlan,
proj_exprs: &[Expr],
config: &dyn OptimizerConfig,
) -> Result<bool> {
if config
.options()
.optimizer
.optimize_projections_preserve_existing_projections
{
return Ok(false);
}

let proj_schema = projection_schema(input, proj_exprs)?;
Ok(&proj_schema == input.schema() && proj_exprs.iter().all(is_expr_trivial))
}
Expand All @@ -789,11 +814,12 @@ mod tests {
use crate::optimize_projections::OptimizeProjections;
use crate::optimizer::Optimizer;
use crate::test::{
assert_fields_eq, assert_optimized_plan_eq, scan_empty, test_table_scan,
test_table_scan_fields, test_table_scan_with_name,
assert_fields_eq, assert_optimized_plan_eq, assert_optimized_plan_with_config_eq,
scan_empty, test_table_scan, test_table_scan_fields, test_table_scan_with_name,
};
use crate::{OptimizerContext, OptimizerRule};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference,
};
Expand Down Expand Up @@ -1980,4 +2006,49 @@ mod tests {
optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
Ok(optimized_plan)
}

#[test]
fn aggregate_filter_pushdown_preserve_projections() -> Result<()> {
let table_scan = test_table_scan()?;
let aggr_with_filter = count_udaf()
.call(vec![col("b")])
.filter(col("c").gt(lit(42)))
.build()?;
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(
vec![col("a")],
vec![count(col("b")), aggr_with_filter.alias("count2")],
)?
.project(vec![col("a"), col("count(test.b)"), col("count2")])?
.build()?;

let expected_default = "Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
\n TableScan: test projection=[a, b, c]";

let expected_preserve_projections = "Projection: test.a, count(test.b), count2\
\n Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
\n TableScan: test projection=[a, b, c]";

let scenarios = [
(false, expected_default),
(true, expected_preserve_projections),
];

for (preserve_projections, expected_plan) in scenarios.into_iter() {
let mut config = ConfigOptions::new();
config
.optimizer
.optimize_projections_preserve_existing_projections =
preserve_projections;
let optimizer_context = OptimizerContext::new_with_options(config);
assert_optimized_plan_with_config_eq(
Arc::new(OptimizeProjections::new()),
plan.clone(),
expected_plan,
&optimizer_context,
)?;
}

Ok(())
}
}
10 changes: 10 additions & 0 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ pub struct OptimizerContext {
/// Alias generator used to generate unique aliases for subqueries
alias_generator: Arc<AliasGenerator>,

/// Configuration options for the optimizer
options: ConfigOptions,
}

Expand All @@ -161,6 +162,15 @@ impl OptimizerContext {
}
}

/// Create optimizer config with the given configuration options
pub fn new_with_options(options: ConfigOptions) -> Self {
Self {
query_execution_start_time: Utc::now(),
alias_generator: Arc::new(AliasGenerator::new()),
options,
}
}

/// Specify whether to enable the filter_null_keys rule
pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
self.options.optimizer.filter_null_join_keys = filter_null_keys;
Expand Down
13 changes: 11 additions & 2 deletions datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::analyzer::{Analyzer, AnalyzerRule};
use crate::optimizer::Optimizer;
use crate::{OptimizerContext, OptimizerRule};
use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{assert_contains, Result};
Expand Down Expand Up @@ -173,8 +173,17 @@ pub fn assert_optimized_plan_eq(
// Apply the rule once
let opt_context = OptimizerContext::new().with_max_passes(1);

assert_optimized_plan_with_config_eq(rule, plan, expected, &opt_context)
}

pub fn assert_optimized_plan_with_config_eq(
rule: Arc<dyn OptimizerRule + Send + Sync>,
plan: LogicalPlan,
expected: &str,
config: &dyn OptimizerConfig,
) -> Result<()> {
let optimizer = Optimizer::with_rules(vec![Arc::clone(&rule)]);
let optimized_plan = optimizer.optimize(plan, &opt_context, observe)?;
let optimized_plan = optimizer.optimize(plan, config, observe)?;
let formatted_plan = format!("{optimized_plan}");
assert_eq!(formatted_plan, expected);

Expand Down
60 changes: 36 additions & 24 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,11 @@ impl Unparser<'_> {
) -> Result<()> {
match plan {
LogicalPlan::TableScan(scan) => {
if let Some(unparsed_table_scan) =
Self::unparse_table_scan_pushdown(plan, None)?
{
if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
plan,
None,
select.already_projected(),
)? {
return self.select_to_sql_recursively(
&unparsed_table_scan,
query,
Expand Down Expand Up @@ -567,6 +569,7 @@ impl Unparser<'_> {
let unparsed_table_scan = Self::unparse_table_scan_pushdown(
plan,
Some(plan_alias.alias.clone()),
select.already_projected(),
)?;
// if the child plan is a TableScan with pushdown operations, we don't need to
// create an additional subquery for it
Expand Down Expand Up @@ -696,6 +699,7 @@ impl Unparser<'_> {
fn unparse_table_scan_pushdown(
plan: &LogicalPlan,
alias: Option<TableReference>,
already_projected: bool,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::TableScan(table_scan) => {
Expand Down Expand Up @@ -725,24 +729,29 @@ impl Unparser<'_> {
}
}

if let Some(project_vec) = &table_scan.projection {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let schema = table_scan.source.schema();
let field = schema.field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(
Some(table_scan.table_name.clone()),
field.name().clone(),
)
}
})
.collect::<Vec<_>>();
builder = builder.project(project_columns)?;
// Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
// For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
// information included in the TableScan node.
if !already_projected {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This prevents from generating queries like SELECT a, b from (SELECT a, b from my_table).

if let Some(project_vec) = &table_scan.projection {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let schema = table_scan.source.schema();
let field = schema.field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(
Some(table_scan.table_name.clone()),
field.name().clone(),
)
}
})
.collect::<Vec<_>>();
builder = builder.project(project_columns)?;
}
}

let filter_expr: Result<Option<Expr>> = table_scan
Expand Down Expand Up @@ -787,14 +796,17 @@ impl Unparser<'_> {
Self::unparse_table_scan_pushdown(
&subquery_alias.input,
Some(subquery_alias.alias.clone()),
already_projected,
)
}
// SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
// The inner table scan could be a scan with pushdown operations.
LogicalPlan::Projection(projection) => {
if let Some(plan) =
Self::unparse_table_scan_pushdown(&projection.input, alias.clone())?
{
if let Some(plan) = Self::unparse_table_scan_pushdown(
&projection.input,
alias.clone(),
already_projected,
)? {
let exprs = if alias.is_some() {
let mut alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,7 @@ fn test_table_scan_pushdown() -> Result<()> {
let query_from_table_scan_with_projection = LogicalPlanBuilder::from(
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?,
)
.project(vec![col("id"), col("age")])?
.project(vec![wildcard()])?
Copy link
Member Author

@sgrebnov sgrebnov Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this actually be a real plan with a wildcard projection and a TableScan that includes only two columns? I would expect them to match. If this is a real use case I will improve logic above (check for parent projection is a wildcard or does not match). Running all TPC-H and TPC-DS queries I've not found query where it was the case.
https://github.com/apache/datafusion/pull/13267/files#r1830100022

.build()?;
let query_from_table_scan_with_projection =
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
datafusion.optimizer.max_passes 3
datafusion.optimizer.optimize_projections_preserve_existing_projections false
datafusion.optimizer.prefer_existing_sort false
datafusion.optimizer.prefer_existing_union false
datafusion.optimizer.prefer_hash_join true
Expand Down Expand Up @@ -326,6 +327,7 @@ datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
datafusion.optimizer.optimize_projections_preserve_existing_projections false When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections. This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module. It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution.
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave
datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
Expand Down
Loading