From 3609e1b17fd17f3fd87009891775c29d70e94de1 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Mon, 20 Nov 2023 18:37:17 -0800 Subject: [PATCH] Renaming API to be more consistent with struct value --- .../physical_optimizer/projection_pushdown.rs | 14 +++----- datafusion/core/src/physical_planner.rs | 2 +- datafusion/core/src/test_util/parquet.rs | 3 +- datafusion/physical-plan/src/filter.rs | 33 +++++++------------ datafusion/proto/src/physical_plan/mod.rs | 7 ++-- docs/source/user-guide/configs.md | 4 +-- 6 files changed, 24 insertions(+), 39 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index c8add21bd78b7..66b144a7a00c6 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -302,16 +302,12 @@ fn try_swapping_with_filter( return Ok(None); }; - FilterExec::try_new( - new_predicate, - make_with_child(projection, filter.input())?, - ).and_then( - |e| { + FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?) + .and_then(|e| { let selectivity = filter.default_selectivity(); - e.with_selectivity(selectivity) - } - ) - .map(|e| Some(Arc::new(e) as _)) + e.with_default_selectivity(selectivity) + }) + .map(|e| Some(Arc::new(e) as _)) } /// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index fb88b0c78644c..afdc9ec2bd07a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -910,7 +910,7 @@ impl DefaultPhysicalPlanner { )?; let selectivity = session_state.config().options().optimizer.default_filter_selectivity; let filter = FilterExec::try_new(runtime_expr, physical_input)?; - Ok(Arc::new(filter.with_selectivity(selectivity)?)) + Ok(Arc::new(filter.with_default_selectivity(selectivity)?)) } LogicalPlan::Union(Union { inputs, schema }) => { let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?; diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 47bf8d6abcd73..f3c0d2987a46c 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -179,8 +179,7 @@ impl TestParquetFile { None, )); - let exec = - Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); + let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); Ok(exec) } else { Ok(Arc::new(ParquetExec::new(scan_config, None, None))) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 2392f984420a2..be3cbb12d94b6 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -85,7 +85,10 @@ impl FilterExec { } } - pub fn with_selectivity(mut self, default_selectivity: u8) -> Result{ + pub fn with_default_selectivity( + mut self, + default_selectivity: u8, + ) -> Result { if default_selectivity > 100 { return plan_err!("Default flter selectivity needs to be less than 100"); } @@ -182,16 +185,12 @@ impl ExecutionPlan for FilterExec { self: Arc, mut children: Vec>, ) -> Result> { - FilterExec::try_new( - self.predicate.clone(), - children.swap_remove(0), - ).and_then( - |e| { + FilterExec::try_new(self.predicate.clone(), children.swap_remove(0)) + .and_then(|e| { let selectivity = e.default_selectivity(); - e.with_selectivity(selectivity) - } - ) - .map(|e| Arc::new(e) as _) + e.with_default_selectivity(selectivity) + }) + .map(|e| Arc::new(e) as _) } fn execute( @@ -221,15 +220,7 @@ impl ExecutionPlan for FilterExec { let input_stats = self.input.statistics()?; let schema = self.schema(); if !check_support(predicate, &schema) { - let selectivity = self.default_selectivity as f32 / 100.0; - let mut stats = input_stats.clone().into_inexact(); - if let Precision::Inexact(n) = stats.num_rows { - stats.num_rows = Precision::Inexact((selectivity * n as f32) as usize); - } - if let Precision::Inexact(n) = stats.total_byte_size { - stats.total_byte_size = - Precision::Inexact((selectivity * n as f32) as usize); - } + let selectivity = self.default_selectivity as f64 / 100.0; let mut stats = input_stats.into_inexact(); stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); stats.total_byte_size = stats @@ -1025,7 +1016,7 @@ mod tests { } #[tokio::test] - async fn test_validation_filter_selectivity() -> Result<()>{ + async fn test_validation_filter_selectivity() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); let input = Arc::new(StatisticsExec::new( Statistics::new_unknown(&schema), @@ -1038,7 +1029,7 @@ mod tests { Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), )); let filter = FilterExec::try_new(predicate, input)?; - assert!(filter.with_selectivity(120).is_err()); + assert!(filter.with_default_selectivity(120).is_err()); Ok(()) } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index d8bd644c26bea..db088e8605ca5 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -162,14 +162,13 @@ impl AsExecutionPlan for PhysicalPlanNode { let filter_selectivity = filter.default_filter_selectivity.try_into(); let filter = FilterExec::try_new(predicate, input)?; match filter_selectivity { - Ok(filter_selectivity) => { - Ok(Arc::new(filter.with_selectivity(filter_selectivity)?)) - } + Ok(filter_selectivity) => Ok(Arc::new( + filter.with_default_selectivity(filter_selectivity)?, + )), Err(_) => Err(DataFusionError::Internal( "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(), )), } - } PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new( parse_protobuf_file_scan_config( diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 1e14682012d59..d317d0f0ba770 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -98,8 +98,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | | datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition -| datafusion.optimizer.default_filter_selectivity | 20 | The assumed filter selectivity in from 0 (no rows) to 100 (all rows) used when it is not possibl to determine exactly the number of rows returned by a filter +| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | +| datafusion.optimizer.default_filter_selectivity | 20 | The assumed filter selectivity in from 0 (no rows) to 100 (all rows) used when it is not possibl to determine exactly the number of rows returned by a filter | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |