Return TableProviderFilterPushDown::Exact when Parquet Pushdown Enabled #12135

Merged 24 commits on Sep 17, 2024

Changes from all commits

Commits (24)
4e29e20
feat: Preemptively filter for pushdown-preventing columns in ListingT…
itsjunetime Aug 19, 2024
762f397
Fix behavior to make all previous tests work and lay groundwork for f…
itsjunetime Aug 20, 2024
30f75e0
fix: Add some more tests and fix small issue with pushdown specificity
itsjunetime Aug 23, 2024
d76fea3
test: Revive unnecessarily removed test
itsjunetime Aug 23, 2024
187a121
ci: Fix CI issues with different combinations of exprs
itsjunetime Aug 23, 2024
11c62dc
fix: run fmt
itsjunetime Aug 23, 2024
fb53778
Fix doc publicity issues
itsjunetime Aug 24, 2024
539b2e8
Add ::new fn for PushdownChecker
itsjunetime Aug 29, 2024
e13b89f
Remove unnecessary 'pub' qualifier
itsjunetime Aug 29, 2024
5e86c0b
Fix naming and doc comment of non_pushdown_columns to reflect what it…
itsjunetime Sep 3, 2024
6106ecd
fmt
itsjunetime Sep 3, 2024
c7d6211
Extend FileFormat trait to allow library users to define formats whic…
itsjunetime Sep 3, 2024
ba463b5
fmt
itsjunetime Sep 3, 2024
89f423a
fix: reference real fn in doc to fix CI
itsjunetime Sep 6, 2024
53b7046
Minor: Add tests for using FilterExec when parquet was pushed down
alamb Sep 6, 2024
5c29552
Update datafusion/core/src/datasource/file_format/mod.rs
alamb Sep 6, 2024
ef0affe
Pipe schema information through to TableScan and ParquetExec to facil…
itsjunetime Sep 13, 2024
b1ee813
- Remove collect::<(_, _)> to satisfy msrv
itsjunetime Sep 16, 2024
f8adb43
Add more details in comments for `map_partial_batch`
itsjunetime Sep 16, 2024
ec5f21b
Remove reference to issue #4028 as it will be closed
itsjunetime Sep 16, 2024
19f4310
Convert normal comments to doc-comments
itsjunetime Sep 16, 2024
9cfbb5d
Clarify meaning of word `projected` in comment
itsjunetime Sep 16, 2024
832f7e2
Clarify more how `table_schema` is used differently from `projected_t…
itsjunetime Sep 16, 2024
ca26f03
Finish partially-written comment about SchemaMapping struct
itsjunetime Sep 16, 2024
28 changes: 28 additions & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -45,6 +45,7 @@ use crate::physical_plan::{ExecutionPlan, Statistics};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_expr::Expr;
use datafusion_physical_expr::PhysicalExpr;

use async_trait::async_trait;
@@ -138,6 +139,33 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}

/// Check if the specified file format has support for pushing down the provided filters within
/// the given schemas. Added initially to support the Parquet file format's ability to do this.
fn supports_filters_pushdown(
&self,
_file_schema: &Schema,
_table_schema: &Schema,
_filters: &[&Expr],
) -> Result<FilePushdownSupport> {
Ok(FilePushdownSupport::NoSupport)
}
}

/// An enum to distinguish between different states when determining if certain filters can be
/// pushed down to file scanning
#[derive(Debug, PartialEq)]
pub enum FilePushdownSupport {
/// The file format/system being asked does not support any sort of pushdown. This should be
/// used even if the file format theoretically supports some sort of pushdown, but it's not
/// enabled or implemented yet.
NoSupport,
/// The file format/system being asked *does* support pushdown, but it can't make it work for
/// the provided filter/expression
NotSupportedForFilter,
/// The file format/system being asked *does* support pushdown and *can* make it work for the
/// provided filter/expression
Supported,
}

/// A container of [FileFormatFactory] which also implements [FileType].
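
To illustrate the new hook, here is a minimal sketch of a custom FileFormat implementing supports_filters_pushdown (MyFormat and its enable_pushdown flag are hypothetical, not part of this diff):

impl FileFormat for MyFormat {
    // ...the other required FileFormat methods are elided...

    fn supports_filters_pushdown(
        &self,
        file_schema: &Schema,
        _table_schema: &Schema,
        filters: &[&Expr],
    ) -> Result<FilePushdownSupport> {
        // Hypothetical flag: report NoSupport while pushdown is disabled,
        // even though the format could support it in principle.
        if !self.enable_pushdown {
            return Ok(FilePushdownSupport::NoSupport);
        }
        // Claim support only when every filter references columns that are
        // physically present in the file (not just in the table schema).
        let all_in_file = filters.iter().all(|f| {
            f.column_refs()
                .iter()
                .all(|c| file_schema.field_with_name(&c.name).is_ok())
        });
        Ok(if all_in_file {
            FilePushdownSupport::Supported
        } else {
            FilePushdownSupport::NotSupportedForFilter
        })
    }
}
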
28 changes: 26 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -26,7 +26,7 @@ use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{
coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat,
FileFormatFactory, FileScanConfig,
FileFormatFactory, FilePushdownSupport, FileScanConfig,
};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
@@ -53,6 +53,7 @@ use datafusion_common::{
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_expr::Expr;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;
@@ -78,7 +79,9 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;

use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use crate::datasource::physical_plan::parquet::{
can_expr_be_pushed_down_with_schemas, ParquetExecBuilder,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
@@ -414,6 +417,27 @@ impl FileFormat for ParquetFormat {
order_requirements,
)) as _)
}

fn supports_filters_pushdown(
&self,
file_schema: &Schema,
table_schema: &Schema,
filters: &[&Expr],
) -> Result<FilePushdownSupport> {
if !self.options().global.pushdown_filters {
return Ok(FilePushdownSupport::NoSupport);
}

let all_supported = filters.iter().all(|filter| {
can_expr_be_pushed_down_with_schemas(filter, file_schema, table_schema)
});

Ok(if all_supported {
FilePushdownSupport::Supported
} else {
FilePushdownSupport::NotSupportedForFilter
})
}
}

/// Fetches parquet metadata from ObjectStore for given object
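
For context, parquet filter pushdown is off by default, so this implementation returns NoSupport until it is enabled. A hedged setup sketch (standard session configuration, nothing PR-specific assumed):

use datafusion::prelude::{SessionConfig, SessionContext};

// Enable parquet filter pushdown so ParquetFormat::supports_filters_pushdown
// can report Supported (and ListingTable can answer Exact).
let mut config = SessionConfig::new();
config.options_mut().execution.parquet.pushdown_filters = true;
let ctx = SessionContext::new_with_config(config);
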
110 changes: 54 additions & 56 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -53,66 +53,64 @@ use object_store::{ObjectMeta, ObjectStore};
/// was performed
pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
let mut is_applicable = true;
expr.apply(|expr| {
match expr {
Expr::Column(Column { ref name, .. }) => {
is_applicable &= col_names.contains(&name.as_str());
if is_applicable {
Ok(TreeNodeRecursion::Jump)
} else {
Ok(TreeNodeRecursion::Stop)
}
expr.apply(|expr| match expr {
Expr::Column(Column { ref name, .. }) => {
is_applicable &= col_names.contains(&name.as_str());
if is_applicable {
Ok(TreeNodeRecursion::Jump)
} else {
Ok(TreeNodeRecursion::Stop)
}
Expr::Literal(_)
| Expr::Alias(_)
| Expr::OuterReferenceColumn(_, _)
| Expr::ScalarVariable(_, _)
| Expr::Not(_)
| Expr::IsNotNull(_)
| Expr::IsNull(_)
| Expr::IsTrue(_)
| Expr::IsFalse(_)
| Expr::IsUnknown(_)
| Expr::IsNotTrue(_)
| Expr::IsNotFalse(_)
| Expr::IsNotUnknown(_)
| Expr::Negative(_)
| Expr::Cast { .. }
| Expr::TryCast { .. }
| Expr::BinaryExpr { .. }
| Expr::Between { .. }
| Expr::Like { .. }
| Expr::SimilarTo { .. }
| Expr::InList { .. }
| Expr::Exists { .. }
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::GroupingSet(_)
| Expr::Case { .. } => Ok(TreeNodeRecursion::Continue),

Expr::ScalarFunction(scalar_function) => {
match scalar_function.func.signature().volatility {
Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
// TODO: Stable functions could be `applicable`, but that would require access to the context
Volatility::Stable | Volatility::Volatile => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
}
}
Expr::Literal(_)
| Expr::Alias(_)
| Expr::OuterReferenceColumn(_, _)
| Expr::ScalarVariable(_, _)
| Expr::Not(_)
| Expr::IsNotNull(_)
| Expr::IsNull(_)
| Expr::IsTrue(_)
| Expr::IsFalse(_)
| Expr::IsUnknown(_)
| Expr::IsNotTrue(_)
| Expr::IsNotFalse(_)
| Expr::IsNotUnknown(_)
| Expr::Negative(_)
| Expr::Cast(_)
| Expr::TryCast(_)
| Expr::BinaryExpr(_)
| Expr::Between(_)
| Expr::Like(_)
| Expr::SimilarTo(_)
| Expr::InList(_)
| Expr::Exists(_)
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::GroupingSet(_)
| Expr::Case(_) => Ok(TreeNodeRecursion::Continue),

Expr::ScalarFunction(scalar_function) => {
match scalar_function.func.signature().volatility {
Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
// TODO: Stable functions could be `applicable`, but that would require access to the context
Volatility::Stable | Volatility::Volatile => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
}
}
}

// TODO other expressions are not handled yet:
// - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
// - Can `Wildcard` be considered as a `Literal`?
// - ScalarVariable could be `applicable`, but that would require access to the context
Expr::AggregateFunction { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
| Expr::Placeholder(_) => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
}
// TODO other expressions are not handled yet:
// - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
// - Can `Wildcard` be considered as a `Literal`?
// - ScalarVariable could be `applicable`, but that would require access to the context
Expr::AggregateFunction { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
| Expr::Placeholder(_) => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
}
})
.unwrap();
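
A quick sketch of this function's contract (column names assumed for illustration): an expression is applicable only if every column it references is one of the given partition columns, so it can be evaluated from file paths alone.

use datafusion_expr::{col, lit};

// Filter over the partition column `date` alone: applicable.
assert!(expr_applicable_for_cols(
    &["date"],
    &col("date").eq(lit("2024-09-17"))
));

// Filter that touches the data column `c1`: not applicable.
assert!(!expr_applicable_for_cols(
    &["date"],
    &col("c1").eq(lit("2024-09-17"))
));
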
71 changes: 43 additions & 28 deletions datafusion/core/src/datasource/listing/table.rs
@@ -18,16 +18,17 @@
//! The table implementation.

use std::collections::HashMap;
use std::str::FromStr;
use std::{any::Any, sync::Arc};
use std::{any::Any, str::FromStr, sync::Arc};

use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
use super::PartitionedFile;
use super::{ListingTableUrl, PartitionedFile};

use super::ListingTableUrl;
use crate::datasource::{create_ordering, get_statistics_with_limit};
use crate::datasource::{
file_format::{file_compression_type::FileCompressionType, FileFormat},
create_ordering,
file_format::{
file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
},
get_statistics_with_limit,
physical_plan::{FileScanConfig, FileSinkConfig},
};
use crate::execution::context::SessionState;
@@ -43,8 +44,9 @@ use datafusion_common::{
config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
SchemaExt, ToDFSchema,
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_execution::cache::{
cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
};
use datafusion_physical_expr::{
create_physical_expr, LexOrdering, PhysicalSortRequirement,
};
@@ -789,19 +791,22 @@ impl TableProvider for ListingTable {
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;

let filters = if let Some(expr) = conjunction(filters.to_vec()) {
// NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
let filters =
create_physical_expr(&expr, &table_df_schema, state.execution_props())?;
Some(filters)
} else {
None
};
let filters = conjunction(filters.to_vec())
.map(|expr| -> Result<_> {
// NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
let filters = create_physical_expr(
&expr,
&table_df_schema,
state.execution_props(),
)?;
Ok(Some(filters))
})
.unwrap_or(Ok(None))?;

let object_store_url = if let Some(url) = self.table_paths.first() {
url.object_store()
} else {
let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
else {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};

@@ -826,27 +831,37 @@
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(filters
filters
.iter()
.map(|filter| {
if expr_applicable_for_cols(
&self
.options
.table_partition_cols
.iter()
.map(|x| x.0.as_str())
.map(|col| col.0.as_str())
.collect::<Vec<_>>(),
filter,
) {
// if filter can be handled by partition pruning, it is exact
TableProviderFilterPushDown::Exact
} else {
// otherwise, we still might be able to handle the filter with file
// level mechanisms such as Parquet row group pruning.
TableProviderFilterPushDown::Inexact
return Ok(TableProviderFilterPushDown::Exact);
}

// if we can't push it down completely with only the filename-based/path-based
// column names, then we should check if we can do parquet predicate pushdown
let supports_pushdown = self.options.format.supports_filters_pushdown(
&self.file_schema,
&self.table_schema,
&[filter],
)?;

if supports_pushdown == FilePushdownSupport::Supported {
return Ok(TableProviderFilterPushDown::Exact);
}

Ok(TableProviderFilterPushDown::Inexact)
})
.collect())
.collect()
}

fn get_table_definition(&self) -> Option<&str> {
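
End to end, this is what the change buys (a hedged sketch; the file path and column name are assumed): with pushdown_filters enabled, ParquetFormat reports Supported, ListingTable answers Exact, and the planner no longer needs a FilterExec above the scan.

use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;
    let ctx = SessionContext::new_with_config(config);

    // Assumed local parquet file with a string column `c1`.
    ctx.register_parquet("t", "data/example.parquet", ParquetReadOptions::default())
        .await?;

    // With Exact pushdown, the physical plan should show the predicate only
    // inside ParquetExec, with no separate FilterExec on top.
    ctx.sql("EXPLAIN SELECT * FROM t WHERE c1 = 'foo'")
        .await?
        .show()
        .await?;
    Ok(())
}
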
@@ -258,7 +258,7 @@ impl FileScanConfig {
(projected_schema, table_stats, projected_output_ordering)
}

#[allow(unused)] // Only used by avro
#[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
pub(crate) fn projected_file_column_names(&self) -> Option<Vec<String>> {
self.projection.as_ref().map(|p| {
p.iter()
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -516,7 +516,8 @@ mod tests {
Field::new("c3", DataType::Float64, true),
]));

let adapter = DefaultSchemaAdapterFactory::default().create(table_schema.clone());
let adapter = DefaultSchemaAdapterFactory
.create(table_schema.clone(), table_schema.clone());

let file_schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
@@ -573,7 +574,7 @@

let indices = vec![1, 2, 4];
let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
let adapter = DefaultSchemaAdapterFactory::default().create(schema);
let adapter = DefaultSchemaAdapterFactory.create(schema, table_schema.clone());
let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();

let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -61,6 +61,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
pub use row_filter::can_expr_be_pushed_down_with_schemas;
pub use writer::plan_to_parquet;

/// Execution plan for reading one or more Parquet files.
@@ -405,6 +406,7 @@ impl ParquetExecBuilder {

let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();

let cache = ParquetExec::compute_properties(
projected_schema,
&projected_output_ordering,
@@ -707,7 +709,7 @@ impl ExecutionPlan for ParquetExec {
let schema_adapter_factory = self
.schema_adapter_factory
.clone()
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default()));
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));

let opener = ParquetOpener {
partition_index,
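
A sketch of the newly exported helper's behavior (schemas and column names assumed for illustration): a filter over a column stored in the file can be pushed into the parquet row filter, while one over a partition-only column cannot.

use arrow_schema::{DataType, Field, Schema};
use datafusion_expr::{col, lit};

let file_schema = Schema::new(vec![Field::new("c1", DataType::Utf8, true)]);
let table_schema = Schema::new(vec![
    Field::new("c1", DataType::Utf8, true),
    // Partition column: in the table schema but not stored in the file.
    Field::new("part", DataType::Utf8, true),
]);

// References only a file column: eligible for pushdown.
assert!(can_expr_be_pushed_down_with_schemas(
    &col("c1").eq(lit("x")),
    &file_schema,
    &table_schema
));

// References a partition-only column: not eligible.
assert!(!can_expr_be_pushed_down_with_schemas(
    &col("part").eq(lit("y")),
    &file_schema,
    &table_schema
));
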
@@ -79,7 +79,9 @@ impl FileOpener for ParquetOpener {

let projected_schema =
SchemaRef::from(self.table_schema.project(&self.projection)?);
let schema_adapter = self.schema_adapter_factory.create(projected_schema);
let schema_adapter = self
.schema_adapter_factory
.create(projected_schema, self.table_schema.clone());
let predicate = self.predicate.clone();
let pruning_predicate = self.pruning_predicate.clone();
let page_pruning_predicate = self.page_pruning_predicate.clone();