Skip to content

Commit

Permalink
Turning filter selectivity as a configurable parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
edmondop committed Nov 20, 2023
1 parent 4fb4b21 commit d2390a6
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 11 deletions.
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,10 @@ config_namespace! {
/// The maximum estimated size in bytes for one input side of a HashJoin
/// will be collected into a single partition
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

/// The default filter selectivity used by Filter statistics
/// when an exact selectivity cannot be determined
pub default_filter_selectivity: u8, default = 20
}
}

Expand Down Expand Up @@ -877,6 +881,7 @@ config_field!(String);
config_field!(bool);
config_field!(usize);
config_field!(f64);
config_field!(u8);
config_field!(u64);

/// An implementation trait used to recursively walk configuration
Expand Down
8 changes: 6 additions & 2 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,12 @@ fn try_swapping_with_filter(
return Ok(None);
};

FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?)
.map(|e| Some(Arc::new(e) as _))
FilterExec::try_new(
new_predicate,
make_with_child(projection, filter.input())?,
filter.default_selectivity(),
)
.map(|e| Some(Arc::new(e) as _))
}

/// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,8 @@ impl DefaultPhysicalPlanner {
&input_schema,
session_state,
)?;
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input, selectivity)?))
}
LogicalPlan::Union(Union { inputs, schema }) => {
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ impl TestParquetFile {
None,
));

let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
let exec =
Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec, 20)?);
Ok(exec)
} else {
Ok(Arc::new(ParquetExec::new(scan_config, None, None)))
Expand Down
25 changes: 19 additions & 6 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,26 @@ pub struct FilterExec {
input: Arc<dyn ExecutionPlan>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Selectivity for statistics. 0 = no rows, 100 all rows
default_selectivity: u8,
}

impl FilterExec {
/// Create a FilterExec on an input
pub fn try_new(
predicate: Arc<dyn PhysicalExpr>,
input: Arc<dyn ExecutionPlan>,
default_selectivity: u8,
) -> Result<Self> {
if default_selectivity > 100 {
return plan_err!("Default flter selectivity needs to be less than 100");
}
match predicate.data_type(input.schema().as_ref())? {
DataType::Boolean => Ok(Self {
predicate,
input: input.clone(),
metrics: ExecutionPlanMetricsSet::new(),
default_selectivity,
}),
other => {
plan_err!("Filter predicate must return boolean values, not {other:?}")
Expand All @@ -91,6 +98,11 @@ impl FilterExec {
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}

/// The default selectivity
pub fn default_selectivity(&self) -> u8 {
self.default_selectivity
}
}

impl DisplayAs for FilterExec {
Expand Down Expand Up @@ -166,8 +178,12 @@ impl ExecutionPlan for FilterExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
FilterExec::try_new(self.predicate.clone(), children.swap_remove(0))
.map(|e| Arc::new(e) as _)
FilterExec::try_new(
self.predicate.clone(),
children.swap_remove(0),
self.default_selectivity,
)
.map(|e| Arc::new(e) as _)
}

fn execute(
Expand Down Expand Up @@ -197,10 +213,7 @@ impl ExecutionPlan for FilterExec {
let input_stats = self.input.statistics()?;
let schema = self.schema();
if !check_support(predicate, &schema) {
// assume filter selects 20% of rows if we cannot do anything smarter
// tracking issue for making this configurable:
// https://github.com/apache/arrow-datafusion/issues/8133
let selectivity = 0.2_f32;
let selectivity = self.default_selectivity as f32 / 100.0;
let mut stats = input_stats.clone().into_inexact();
if let Precision::Inexact(n) = stats.num_rows {
stats.num_rows = Precision::Inexact((selectivity * n as f32) as usize);
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,7 @@ message PhysicalNegativeNode {
message FilterExecNode {
PhysicalPlanNode input = 1;
PhysicalExprNode expr = 2;
uint32 default_filter_selectivity = 3;
}

message FileGroup {
Expand Down
20 changes: 20 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
.to_owned(),
)
})?;
Ok(Arc::new(FilterExec::try_new(predicate, input)?))
Ok(Arc::new(FilterExec::try_new(
predicate,
input,
filter.default_filter_selectivity,
)?))
}
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
parse_protobuf_file_scan_config(
Expand Down Expand Up @@ -898,6 +902,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
protobuf::FilterExecNode {
input: Some(Box::new(input)),
expr: Some(exec.predicate().clone().try_into()?),
default_filter_selectivity: exec.default_selectivity(),
},
))),
});
Expand Down

0 comments on commit d2390a6

Please sign in to comment.