diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index ba2072ecc151d..06e0529400c13 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -524,6 +524,10 @@ config_namespace! { /// The maximum estimated size in bytes for one input side of a HashJoin /// will be collected into a single partition pub hash_join_single_partition_threshold: usize, default = 1024 * 1024 + + /// The default filter selectivity, as a percentage (0 = no rows, 100 = all rows), + /// used by Filter statistics when an exact selectivity cannot be determined + pub default_filter_selectivity: u8, default = 20 } } @@ -877,6 +881,7 @@ config_field!(String); config_field!(bool); config_field!(usize); config_field!(f64); +config_field!(u8); config_field!(u64); /// An implementation trait used to recursively walk configuration diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 8e50492ae5e52..7ee167d23807f 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -302,8 +302,12 @@ fn try_swapping_with_filter( return Ok(None); }; - FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?) - .map(|e| Some(Arc::new(e) as _)) + FilterExec::try_new( + new_predicate, + make_with_child(projection, filter.input())?, + filter.default_selectivity(), + ) + .map(|e| Some(Arc::new(e) as _)) } /// Tries to swap the projection with its input [`RepartitionExec`]. 
If it can be done, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 9f9b529ace035..965b5316d0420 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -913,7 +913,8 @@ impl DefaultPhysicalPlanner { &input_schema, session_state, )?; - Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?)) + let selectivity = session_state.config().options().optimizer.default_filter_selectivity; + Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input, selectivity)?)) } LogicalPlan::Union(Union { inputs, schema }) => { let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?; diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 0d11526703b46..413ab24839ee6 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -178,7 +178,8 @@ impl TestParquetFile { None, )); - let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); + let exec = + Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec, 20)?); Ok(exec) } else { Ok(Arc::new(ParquetExec::new(scan_config, None, None))) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 822ddfdf3eb0a..6d07a4799e607 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -62,6 +62,8 @@ pub struct FilterExec { input: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// Selectivity for statistics. 0 = no rows, 100 = all rows + default_selectivity: u8, } impl FilterExec { @@ -69,12 +71,17 @@ impl FilterExec { pub fn try_new( predicate: Arc, input: Arc, + default_selectivity: u8, ) -> Result { + if default_selectivity > 100 { + return plan_err!("Default filter selectivity needs to be less than or equal to 100"); + } match predicate.data_type(input.schema().as_ref())? 
{ DataType::Boolean => Ok(Self { predicate, input: input.clone(), metrics: ExecutionPlanMetricsSet::new(), + default_selectivity, }), other => { plan_err!("Filter predicate must return boolean values, not {other:?}") @@ -91,6 +98,11 @@ impl FilterExec { pub fn input(&self) -> &Arc { &self.input } + + /// The default selectivity + pub fn default_selectivity(&self) -> u8 { + self.default_selectivity + } } impl DisplayAs for FilterExec { @@ -166,8 +178,12 @@ impl ExecutionPlan for FilterExec { self: Arc, mut children: Vec>, ) -> Result> { - FilterExec::try_new(self.predicate.clone(), children.swap_remove(0)) - .map(|e| Arc::new(e) as _) + FilterExec::try_new( + self.predicate.clone(), + children.swap_remove(0), + self.default_selectivity, + ) + .map(|e| Arc::new(e) as _) } fn execute( @@ -197,10 +213,7 @@ impl ExecutionPlan for FilterExec { let input_stats = self.input.statistics()?; let schema = self.schema(); if !check_support(predicate, &schema) { - // assume filter selects 20% of rows if we cannot do anything smarter - // tracking issue for making this configurable: - // https://github.com/apache/arrow-datafusion/issues/8133 - let selectivity = 0.2_f32; + let selectivity = self.default_selectivity as f32 / 100.0; let mut stats = input_stats.clone().into_inexact(); if let Precision::Inexact(n) = stats.num_rows { stats.num_rows = Precision::Inexact((selectivity * n as f32) as usize); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 793378a1ea87e..71494a0788ace 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1360,6 +1360,7 @@ message PhysicalNegativeNode { message FilterExecNode { PhysicalPlanNode input = 1; PhysicalExprNode expr = 2; + uint32 default_filter_selectivity = 3; } message FileGroup { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index a78da2a51c9d4..5c7e9acf77a56 100644 --- 
a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -7888,6 +7888,9 @@ impl serde::Serialize for FilterExecNode { if self.expr.is_some() { len += 1; } + if self.default_filter_selectivity != 0 { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FilterExecNode", len)?; if let Some(v) = self.input.as_ref() { struct_ser.serialize_field("input", v)?; @@ -7895,6 +7898,9 @@ impl serde::Serialize for FilterExecNode { if let Some(v) = self.expr.as_ref() { struct_ser.serialize_field("expr", v)?; } + if self.default_filter_selectivity != 0 { + struct_ser.serialize_field("defaultFilterSelectivity", &self.default_filter_selectivity)?; + } struct_ser.end() } } @@ -7907,12 +7913,15 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { const FIELDS: &[&str] = &[ "input", "expr", + "default_filter_selectivity", + "defaultFilterSelectivity", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { Input, Expr, + DefaultFilterSelectivity, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7936,6 +7945,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { match value { "input" => Ok(GeneratedField::Input), "expr" => Ok(GeneratedField::Expr), + "defaultFilterSelectivity" | "default_filter_selectivity" => Ok(GeneratedField::DefaultFilterSelectivity), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7957,6 +7967,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { { let mut input__ = None; let mut expr__ = None; + let mut default_filter_selectivity__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Input => { @@ -7971,11 +7982,20 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { } expr__ = map_.next_value()?; } + GeneratedField::DefaultFilterSelectivity => { + if default_filter_selectivity__.is_some() { + return Err(serde::de::Error::duplicate_field("defaultFilterSelectivity")); + } + default_filter_selectivity__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } } } Ok(FilterExecNode { input: input__, expr: expr__, + default_filter_selectivity: default_filter_selectivity__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 7b7b0afb92160..7551fb3df4246 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1908,6 +1908,8 @@ pub struct FilterExecNode { pub input: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(message, optional, tag = "2")] pub expr: ::core::option::Option, + #[prost(uint32, tag = "3")] + pub default_filter_selectivity: u32, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 1eedbe987ec14..b4d7ef91b8b35 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -157,7 +157,11 @@ impl AsExecutionPlan for PhysicalPlanNode { .to_owned(), ) })?; - Ok(Arc::new(FilterExec::try_new(predicate, input)?)) + Ok(Arc::new(FilterExec::try_new( + predicate, + input, + u8::try_from(filter.default_filter_selectivity).unwrap_or(u8::MAX), + )?)) } PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new( parse_protobuf_file_scan_config( @@ -898,6 +902,7 @@ impl AsExecutionPlan for PhysicalPlanNode { protobuf::FilterExecNode { input: Some(Box::new(input)), expr: Some(exec.predicate().clone().try_into()?), + default_filter_selectivity: u32::from(exec.default_selectivity()), }, ))), });