Pipe schema information through to TableScan and ParquetExec to facilitate unnecessary FilterExec removal
itsjunetime committed Sep 13, 2024
1 parent 5c29552 commit ef0affe
Showing 20 changed files with 195 additions and 169 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
@@ -26,7 +26,7 @@ use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{
coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat,
- FilePushdownSupport, FileFormatFactory, FileScanConfig,
+ FileFormatFactory, FilePushdownSupport, FileScanConfig,
};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
27 changes: 15 additions & 12 deletions datafusion/core/src/datasource/listing/table.rs
@@ -792,19 +792,22 @@ impl TableProvider for ListingTable {
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;

- let filters = if let Some(expr) = conjunction(filters.to_vec()) {
- // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
- let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
- let filters =
- create_physical_expr(&expr, &table_df_schema, state.execution_props())?;
- Some(filters)
- } else {
- None
- };
+ let filters = conjunction(filters.to_vec())
+ .map(|expr| -> Result<_> {
+ // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
+ let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
+ let filters = create_physical_expr(
+ &expr,
+ &table_df_schema,
+ state.execution_props(),
+ )?;
+ Ok(Some(filters))
+ })
+ .unwrap_or(Ok(None))?;

- let object_store_url = if let Some(url) = self.table_paths.first() {
- url.object_store()
- } else {
+ let Some(object_store_url) =
+ self.table_paths.first().map(ListingTableUrl::object_store)
+ else {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};

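A note on the rewritten filter block above: it maps the Some case of conjunction(...) through a fallible closure and then collapses the resulting Option<Result<Option<_>>> with .unwrap_or(Ok(None))?, so an error from create_physical_expr propagates while a missing filter stays None. Below is a minimal, self-contained sketch of that control flow, using stand-in types and function names rather than DataFusion's:

fn to_physical(expr: &str) -> Result<String, String> {
    // Stand-in for create_physical_expr: fails on an empty expression.
    if expr.is_empty() {
        Err("empty expression".to_string())
    } else {
        Ok(format!("physical({expr})"))
    }
}

fn plan_filter(filter: Option<&str>) -> Result<Option<String>, String> {
    // Option<&str> -> Option<Result<Option<String>, _>> -> Result<Option<String>, _> -> Option<String>
    let physical = filter
        .map(|expr| -> Result<_, String> { Ok(Some(to_physical(expr)?)) })
        .unwrap_or(Ok(None))?;
    Ok(physical)
}

fn main() {
    assert_eq!(plan_filter(Some("c1 > 5")), Ok(Some("physical(c1 > 5)".to_string())));
    assert_eq!(plan_filter(None), Ok(None));
}

The same shape could also be written with Option::transpose, which turns Option<Result<T, E>> into Result<Option<T>, E>.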
@@ -258,7 +258,7 @@ impl FileScanConfig {
(projected_schema, table_stats, projected_output_ordering)
}

- #[allow(unused)] // Only used by avro
+ #[cfg_attr(not(feature = "avro"), expect(unused))] // Only used by avro
pub(crate) fn projected_file_column_names(&self) -> Option<Vec<String>> {
self.projection.as_ref().map(|p| {
p.iter()
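The hunk above replaces a blanket #[allow(unused)] with #[cfg_attr(not(feature = "avro"), expect(unused))]: the suppression is only applied when the avro feature is off, and expect (unlike allow) reports an unfulfilled expectation if the item stops being unused, so the attribute cannot silently go stale. A hedged, illustrative example on a hypothetical item (not the actual DataFusion function), assuming a toolchain where #[expect] is stable:

// Only expect the `unused` lint when the `avro` feature is disabled; with the
// feature enabled the attribute vanishes, and if the item later gains a caller
// the compiler flags the now-unfulfilled expectation.
#[cfg_attr(not(feature = "avro"), expect(unused))]
fn only_used_by_avro_stub() {}

fn main() {}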
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -516,7 +516,8 @@ mod tests {
Field::new("c3", DataType::Float64, true),
]));

- let adapter = DefaultSchemaAdapterFactory::default().create(table_schema.clone());
+ let adapter = DefaultSchemaAdapterFactory
+ .create(table_schema.clone(), table_schema.clone());

let file_schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
@@ -573,7 +574,7 @@

let indices = vec![1, 2, 4];
let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
- let adapter = DefaultSchemaAdapterFactory::default().create(schema);
+ let adapter = DefaultSchemaAdapterFactory.create(schema, table_schema.clone());
let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();

let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
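The second test above builds the projected schema with table_schema.project(&indices) and then hands the adapter both that projection and the full table schema. A small stand-alone sketch of the projection step (it assumes the arrow crate as a dependency; column names and types are illustrative, not copied from the test):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

fn main() {
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Utf8, true),
        Field::new("c2", DataType::Int64, true),
        Field::new("c3", DataType::Float64, true),
    ]));
    // Keep only columns 0 and 2; the adapter would then be created from
    // (projected schema, full table schema), mirroring the call above.
    let projected = SchemaRef::from(table_schema.project(&[0, 2]).unwrap());
    assert_eq!(projected.fields().len(), 2);
}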
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -406,6 +406,7 @@ impl ParquetExecBuilder {

let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();

let cache = ParquetExec::compute_properties(
projected_schema,
&projected_output_ordering,
@@ -708,7 +709,7 @@ impl ExecutionPlan for ParquetExec {
let schema_adapter_factory = self
.schema_adapter_factory
.clone()
- .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default()));
+ .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));

let opener = ParquetOpener {
partition_index,
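One small detail in the hunk above: DefaultSchemaAdapterFactory is a fieldless struct, so the bare type name is itself a value and Arc::new(DefaultSchemaAdapterFactory) replaces the ::default() call. A toy illustration (the struct definition below is assumed for the example, not copied from the crate):

use std::sync::Arc;

#[derive(Debug, Default)]
struct DefaultSchemaAdapterFactory; // unit struct: the name doubles as its only value

fn main() {
    // Both expressions construct the same value; the diff simply drops ::default().
    let a = Arc::new(DefaultSchemaAdapterFactory);
    let b = Arc::new(DefaultSchemaAdapterFactory::default());
    println!("{a:?} {b:?}");
}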
@@ -79,7 +79,9 @@ impl FileOpener for ParquetOpener {

let projected_schema =
SchemaRef::from(self.table_schema.project(&self.projection)?);
- let schema_adapter = self.schema_adapter_factory.create(projected_schema);
+ let schema_adapter = self
+ .schema_adapter_factory
+ .create(projected_schema, self.table_schema.clone());
let predicate = self.predicate.clone();
let pruning_predicate = self.pruning_predicate.clone();
let page_pruning_predicate = self.page_pruning_predicate.clone();
29 changes: 13 additions & 16 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -237,7 +237,6 @@ struct FilterCandidateBuilder<'a> {
/// The schema of the table (merged schema) -- columns may be in different
/// order than in the file and have columns that are not in the file schema
table_schema: &'a Schema,
- required_column_indices: BTreeSet<usize>,
}

impl<'a> FilterCandidateBuilder<'a> {
@@ -250,7 +249,6 @@ impl<'a> FilterCandidateBuilder<'a> {
expr,
file_schema,
table_schema,
- required_column_indices: BTreeSet::default(),
}
}

@@ -262,23 +260,20 @@ impl<'a> FilterCandidateBuilder<'a> {
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
/// * `Err(e)` if an error occurs while building the candidate
pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
- let Some((projection, rewritten_expr)) = pushdown_columns(
- Arc::clone(&self.expr),
- self.file_schema,
- self.table_schema,
- )?
+ let Some((required_indices, rewritten_expr)) =
+ pushdown_columns(self.expr, self.file_schema, self.table_schema)?
else {
return Ok(None);
};

- let required_bytes = size_of_columns(&self.required_column_indices, metadata)?;
- let can_use_index = columns_sorted(&self.required_column_indices, metadata)?;
+ let required_bytes = size_of_columns(&required_indices, metadata)?;
+ let can_use_index = columns_sorted(&required_indices, metadata)?;

Ok(Some(FilterCandidate {
expr: rewritten_expr,
required_bytes,
can_use_index,
- projection,
+ projection: required_indices.into_iter().collect(),
}))
}
}
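With the hunk above, the builder no longer caches required_column_indices; pushdown_columns now returns the BTreeSet directly and build() collects it into the candidate's projection. Because a BTreeSet is deduplicated and iterates in ascending order, the collected Vec<usize> is a stable, sorted projection. A tiny self-contained illustration:

use std::collections::BTreeSet;

fn main() {
    // Column indices referenced by a filter expression, possibly repeated and unordered.
    let required: BTreeSet<usize> = [5, 0, 2, 0, 5].into_iter().collect();
    // Collecting the set yields each index once, in ascending order.
    let projection: Vec<usize> = required.into_iter().collect();
    assert_eq!(projection, vec![0, 2, 5]);
}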
@@ -383,7 +378,7 @@ impl<'schema> TreeNodeRewriter for PushdownChecker<'schema> {
}
}

- type ProjectionAndExpr = (Vec<usize>, Arc<dyn PhysicalExpr>);
+ type ProjectionAndExpr = (BTreeSet<usize>, Arc<dyn PhysicalExpr>);

// Checks if a given expression can be pushed down into `ParquetExec` as opposed to being evaluated
post-parquet-scan in a `FilterExec`. If it can be pushed down, this returns all the
@@ -398,8 +393,7 @@ fn pushdown_columns(

let expr = expr.rewrite(&mut checker).data()?;

- Ok((!checker.prevents_pushdown())
- .then(|| (checker.required_column_indices.into_iter().collect(), expr)))
+ Ok((!checker.prevents_pushdown()).then_some((checker.required_column_indices, expr)))
}

/// creates a PushdownChecker for a single use to check a given column with the given schemas. Used
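The pushdown_columns hunk above also swaps bool::then(|| ...) for bool::then_some(...). then_some takes an already-computed value (its argument is evaluated eagerly), while then takes a closure and defers the work; here the tuple is just a move of values that already exist, so the closure buys nothing. A minimal illustration of the two forms:

fn main() {
    let allowed = true;
    let indices = vec![0_usize, 2, 5];

    // `then_some` receives a value directly...
    let eager: Option<Vec<usize>> = allowed.then_some(indices.clone());
    // ...while `then` runs its closure only when the bool is true.
    let lazy: Option<Vec<usize>> = allowed.then(|| indices.clone());
    assert_eq!(eager, lazy);
}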
@@ -539,11 +533,13 @@ pub fn build_row_filter(
// Determine which conjuncts can be evaluated as ArrowPredicates, if any
let mut candidates: Vec<FilterCandidate> = predicates
.into_iter()
- .flat_map(|expr| {
+ .map(|expr| {
FilterCandidateBuilder::new(expr.clone(), file_schema, table_schema)
.build(metadata)
- .unwrap_or_default()
})
+ .collect::<Result<Vec<_>, _>>()?
+ .into_iter()
+ .flatten()
.collect();

// no candidates
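The build_row_filter change above is behavioral as well as stylistic: the old flat_map(...).unwrap_or_default() quietly turned an Err from FilterCandidateBuilder::build into "no candidate", whereas the new version first collects into a Result so that ? propagates the error, and only then flattens away the Ok(None) cases. A self-contained sketch of that collect-then-flatten shape, with toy types in place of the DataFusion ones:

fn keep_candidates(items: Vec<Result<Option<i32>, String>>) -> Result<Vec<i32>, String> {
    Ok(items
        .into_iter()
        // Collecting into Result<Vec<_>, _> stops at the first Err and propagates it...
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        // ...then flatten() drops the None entries, keeping only real candidates.
        .flatten()
        .collect())
}

fn main() {
    assert_eq!(keep_candidates(vec![Ok(Some(1)), Ok(None), Ok(Some(3))]), Ok(vec![1, 3]));
    assert!(keep_candidates(vec![Ok(Some(1)), Err("boom".to_string())]).is_err());
}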
@@ -679,8 +675,9 @@ mod test {
false,
)]);

+ let table_ref = Arc::new(table_schema.clone());
let schema_adapter =
- DefaultSchemaAdapterFactory {}.create(Arc::new(table_schema.clone()));
+ DefaultSchemaAdapterFactory.create(Arc::clone(&table_ref), table_ref);
let (schema_mapping, _) = schema_adapter
.map_schema(&file_schema)
.expect("creating schema mapping");
