Skip to content

Commit

Permalink
Renaming API to be more consistent with struct value
Browse files Browse the repository at this point in the history
  • Loading branch information
edmondop committed Nov 21, 2023
1 parent 879987b commit 3609e1b
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 39 deletions.
14 changes: 5 additions & 9 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,16 +302,12 @@ fn try_swapping_with_filter(
return Ok(None);
};

FilterExec::try_new(
new_predicate,
make_with_child(projection, filter.input())?,
).and_then(
|e| {
FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?)
.and_then(|e| {
let selectivity = filter.default_selectivity();
e.with_selectivity(selectivity)
}
)
.map(|e| Some(Arc::new(e) as _))
e.with_default_selectivity(selectivity)
})
.map(|e| Some(Arc::new(e) as _))
}

/// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ impl DefaultPhysicalPlanner {
)?;
let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
let filter = FilterExec::try_new(runtime_expr, physical_input)?;
Ok(Arc::new(filter.with_selectivity(selectivity)?))
Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
}
LogicalPlan::Union(Union { inputs, schema }) => {
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ impl TestParquetFile {
None,
));

let exec =
Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
Ok(exec)
} else {
Ok(Arc::new(ParquetExec::new(scan_config, None, None)))
Expand Down
33 changes: 12 additions & 21 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ impl FilterExec {
}
}

pub fn with_selectivity(mut self, default_selectivity: u8) -> Result<Self, DataFusionError>{
pub fn with_default_selectivity(
mut self,
default_selectivity: u8,
) -> Result<Self, DataFusionError> {
if default_selectivity > 100 {
return plan_err!("Default flter selectivity needs to be less than 100");
}
Expand Down Expand Up @@ -182,16 +185,12 @@ impl ExecutionPlan for FilterExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
FilterExec::try_new(
self.predicate.clone(),
children.swap_remove(0),
).and_then(
|e| {
FilterExec::try_new(self.predicate.clone(), children.swap_remove(0))
.and_then(|e| {
let selectivity = e.default_selectivity();
e.with_selectivity(selectivity)
}
)
.map(|e| Arc::new(e) as _)
e.with_default_selectivity(selectivity)
})
.map(|e| Arc::new(e) as _)
}

fn execute(
Expand Down Expand Up @@ -221,15 +220,7 @@ impl ExecutionPlan for FilterExec {
let input_stats = self.input.statistics()?;
let schema = self.schema();
if !check_support(predicate, &schema) {
let selectivity = self.default_selectivity as f32 / 100.0;
let mut stats = input_stats.clone().into_inexact();
if let Precision::Inexact(n) = stats.num_rows {
stats.num_rows = Precision::Inexact((selectivity * n as f32) as usize);
}
if let Precision::Inexact(n) = stats.total_byte_size {
stats.total_byte_size =
Precision::Inexact((selectivity * n as f32) as usize);
}
let selectivity = self.default_selectivity as f64 / 100.0;
let mut stats = input_stats.into_inexact();
stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
stats.total_byte_size = stats
Expand Down Expand Up @@ -1025,7 +1016,7 @@ mod tests {
}

#[tokio::test]
async fn test_validation_filter_selectivity() -> Result<()>{
async fn test_validation_filter_selectivity() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics::new_unknown(&schema),
Expand All @@ -1038,7 +1029,7 @@ mod tests {
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
));
let filter = FilterExec::try_new(predicate, input)?;
assert!(filter.with_selectivity(120).is_err());
assert!(filter.with_default_selectivity(120).is_err());
Ok(())
}
}
7 changes: 3 additions & 4 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
let filter_selectivity = filter.default_filter_selectivity.try_into();
let filter = FilterExec::try_new(predicate, input)?;
match filter_selectivity {
Ok(filter_selectivity) => {
Ok(Arc::new(filter.with_selectivity(filter_selectivity)?))
}
Ok(filter_selectivity) => Ok(Arc::new(
filter.with_default_selectivity(filter_selectivity)?,
)),
Err(_) => Err(DataFusionError::Internal(
"filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
)),
}

}
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
parse_protobuf_file_scan_config(
Expand Down
4 changes: 2 additions & 2 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan |
| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |
| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory |
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
| datafusion.optimizer.default_filter_selectivity | 20 | The assumed filter selectivity in from 0 (no rows) to 100 (all rows) used when it is not possibl to determine exactly the number of rows returned by a filter
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.default_filter_selectivity | 20 | The assumed filter selectivity in from 0 (no rows) to 100 (all rows) used when it is not possibl to determine exactly the number of rows returned by a filter |
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
Expand Down

0 comments on commit 3609e1b

Please sign in to comment.