diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 0db6fa634c08..be022a29318c 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -577,9 +577,15 @@ mod tests {
     use datafusion_common::ScalarValue;
     use futures::stream::BoxStream;
     use futures::StreamExt;
+    use log::error;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
     use object_store::{GetResult, ListResult, MultipartId};
+    use parquet::arrow::arrow_reader::ArrowReaderOptions;
+    use parquet::arrow::ParquetRecordBatchStreamBuilder;
+    use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
+    use parquet::file::page_index::index::Index;
+    use tokio::fs::File;
     use tokio::io::AsyncWrite;
 
     #[tokio::test]
@@ -1126,6 +1132,73 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_read_parquet_page_index() -> Result<()> {
+        let testdata = crate::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages.parquet", testdata);
+        let file = File::open(path).await.unwrap();
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let builder =
+            ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone())
+                .await
+                .unwrap()
+                .metadata()
+                .clone();
+        check_page_index_validation(builder.page_indexes(), builder.offset_indexes());
+
+        let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+        let file = File::open(path).await.unwrap();
+
+        let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
+            .await
+            .unwrap()
+            .metadata()
+            .clone();
+        check_page_index_validation(builder.page_indexes(), builder.offset_indexes());
+
+        Ok(())
+    }
+
+    fn check_page_index_validation(
+        page_index: Option<&ParquetColumnIndex>,
+        offset_index: Option<&ParquetOffsetIndex>,
+    ) {
+        assert!(page_index.is_some());
+        assert!(offset_index.is_some());
+
+        let page_index = page_index.unwrap();
+        let offset_index = offset_index.unwrap();
+
+        // there is only one row group in one file.
+        assert_eq!(page_index.len(), 1);
+        assert_eq!(offset_index.len(), 1);
+        let page_index = page_index.get(0).unwrap();
+        let offset_index = offset_index.get(0).unwrap();
+
+        // 13 col in one row group
+        assert_eq!(page_index.len(), 13);
+        assert_eq!(offset_index.len(), 13);
+
+        // test result in int_col
+        let int_col_index = page_index.get(4).unwrap();
+        let int_col_offset = offset_index.get(4).unwrap();
+
+        // 325 pages in int_col
+        assert_eq!(int_col_offset.len(), 325);
+        match int_col_index {
+            Index::INT32(index) => {
+                assert_eq!(index.indexes.len(), 325);
+                for min_max in index.clone().indexes {
+                    assert!(min_max.min.is_some());
+                    assert!(min_max.max.is_some());
+                    assert!(min_max.null_count.is_some());
+                }
+            }
+            _ => {
+                error!("fail to read page index.")
+            }
+        }
+    }
 
     fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
         let actual = exec
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index ff6507c7f65e..68c902a40b50 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -57,6 +57,7 @@ use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use log::debug;
 use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::basic::{ConvertedType, LogicalType};
@@ -78,6 +79,11 @@ pub struct ParquetScanOptions {
     /// If true, the generated `RowFilter` may reorder the predicate `Expr`s to try and optimize
     /// the cost of filter evaluation.
     reorder_predicates: bool,
+    /// If enabled, the reader will read the page index
+    /// This is used to optimise filter pushdown
+    /// via `RowSelector` and `RowFilter` by
+    /// eliminating unnecessary IO and decoding
+    enable_page_index: bool,
 }
 
 impl ParquetScanOptions {
@@ -92,6 +98,12 @@ impl ParquetScanOptions {
         self.reorder_predicates = reorder_predicates;
         self
     }
+
+    /// Set whether to read page index when reading parquet
+    pub fn with_page_index(mut self, page_index: bool) -> Self {
+        self.enable_page_index = page_index;
+        self
+    }
 }
 
 /// Execution plan for scanning one or more Parquet partitions
@@ -393,9 +405,13 @@ impl FileOpener for ParquetOpener {
         let table_schema = self.table_schema.clone();
         let reorder_predicates = self.scan_options.reorder_predicates;
         let pushdown_filters = self.scan_options.pushdown_filters;
+        let enable_page_index = self.scan_options.enable_page_index;
 
         Ok(Box::pin(async move {
-            let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+            let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
+            let mut builder =
+                ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
+                    .await?;
 
             let adapted_projections =
                 schema_adapter.map_projections(builder.schema(), &projection)?;
diff --git a/parquet-testing b/parquet-testing
index ddd898958803..a11fc8f148f8 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit ddd898958803cb89b7156c6350584d1cda0fe8de
+Subproject commit a11fc8f148f8a7a89d9281cc0da3eb9d56095fbf
diff --git a/testing b/testing
index a8f7be380531..5bab2f264a23 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit a8f7be380531758eb7962542a5eb020d8795aa20
+Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88