[feat] Support using offset index in ParquetRecordBatchStream when pushing down RowFilter (#3616)

* [feat] Support using offset index in ParquetRecordBatchStream when pushing down RowFilter.

Signed-off-by: yangjiang <[email protected]>

* Update datafusion/core/src/physical_plan/file_format/parquet.rs

Co-authored-by: Raphael Taylor-Davies <[email protected]>

Signed-off-by: yangjiang <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
Ted-Jiang and tustvold authored Sep 28, 2022
1 parent 451e441 commit 87faf86
Showing 4 changed files with 92 additions and 3 deletions.
73 changes: 73 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -577,9 +577,15 @@ mod tests {
use datafusion_common::ScalarValue;
use futures::stream::BoxStream;
use futures::StreamExt;
use log::error;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{GetResult, ListResult, MultipartId};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::page_index::index::Index;
use tokio::fs::File;
use tokio::io::AsyncWrite;

#[tokio::test]
@@ -1126,6 +1132,73 @@ mod tests {

Ok(())
}
#[tokio::test]
async fn test_read_parquet_page_index() -> Result<()> {
let testdata = crate::test_util::parquet_test_data();
let path = format!("{}/alltypes_tiny_pages.parquet", testdata);
let file = File::open(path).await.unwrap();
let options = ArrowReaderOptions::new().with_page_index(true);
let metadata =
    ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone())
        .await
        .unwrap()
        .metadata()
        .clone();
check_page_index_validation(metadata.page_indexes(), metadata.offset_indexes());

let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
let file = File::open(path).await.unwrap();

let metadata = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
    .await
    .unwrap()
    .metadata()
    .clone();
check_page_index_validation(metadata.page_indexes(), metadata.offset_indexes());

Ok(())
}

fn check_page_index_validation(
page_index: Option<&ParquetColumnIndex>,
offset_index: Option<&ParquetOffsetIndex>,
) {
assert!(page_index.is_some());
assert!(offset_index.is_some());

let page_index = page_index.unwrap();
let offset_index = offset_index.unwrap();

// There is only one row group in each file.
assert_eq!(page_index.len(), 1);
assert_eq!(offset_index.len(), 1);
let page_index = page_index.get(0).unwrap();
let offset_index = offset_index.get(0).unwrap();

// 13 columns in one row group
assert_eq!(page_index.len(), 13);
assert_eq!(offset_index.len(), 13);

// check the page index for int_col
let int_col_index = page_index.get(4).unwrap();
let int_col_offset = offset_index.get(4).unwrap();

// 325 pages in int_col
assert_eq!(int_col_offset.len(), 325);
match int_col_index {
Index::INT32(index) => {
assert_eq!(index.indexes.len(), 325);
for min_max in index.clone().indexes {
assert!(min_max.min.is_some());
assert!(min_max.max.is_some());
assert!(min_max.null_count.is_some());
}
}
_ => {
error!("failed to read page index.")
}
}
}

fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
let actual = exec
18 changes: 17 additions & 1 deletion datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -57,6 +57,7 @@ use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use log::debug;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::basic::{ConvertedType, LogicalType};
@@ -78,6 +79,11 @@ pub struct ParquetScanOptions {
/// If true, the generated `RowFilter` may reorder the predicate `Expr`s to try and optimize
/// the cost of filter evaluation.
reorder_predicates: bool,
/// If enabled, the reader will read the page index.
/// This is used to optimise filter pushdown
/// via `RowSelector` and `RowFilter` by
/// eliminating unnecessary IO and decoding.
enable_page_index: bool,
}

impl ParquetScanOptions {
Expand All @@ -92,6 +98,12 @@ impl ParquetScanOptions {
self.reorder_predicates = reorder_predicates;
self
}

/// Set whether to read the page index when reading Parquet files
pub fn with_page_index(mut self, page_index: bool) -> Self {
self.enable_page_index = page_index;
self
}
}
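For context, the new setter composes with the existing ones. A minimal usage sketch, assuming `ParquetScanOptions` implements `Default` and has an analogous `with_pushdown_filters` setter for the `pushdown_filters` field referenced later in this diff:

// Hypothetical caller code: enable page-index reading together with
// filter pushdown, so RowFilter evaluation can skip whole pages.
let scan_options = ParquetScanOptions::default()
    .with_pushdown_filters(true)
    .with_reorder_predicates(true)
    .with_page_index(true);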

/// Execution plan for scanning one or more Parquet partitions
@@ -393,9 +405,13 @@ impl FileOpener for ParquetOpener {
let table_schema = self.table_schema.clone();
let reorder_predicates = self.scan_options.reorder_predicates;
let pushdown_filters = self.scan_options.pushdown_filters;
let enable_page_index = self.scan_options.enable_page_index;

Ok(Box::pin(async move {
let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
let mut builder =
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await?;
let adapted_projections =
schema_adapter.map_projections(builder.schema(), &projection)?;

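The reader-side pattern above can be shown in isolation using only APIs that appear in this diff; a sketch, assuming an async context and an illustrative file path:

// Open an async Parquet reader with the page/offset index loaded, so a
// pushed-down RowFilter can prune whole pages instead of decoding them.
let file = tokio::fs::File::open("alltypes_tiny_pages.parquet").await?;
let options = ArrowReaderOptions::new().with_page_index(true);
let builder =
    ParquetRecordBatchStreamBuilder::new_with_options(file, options).await?;
let stream = builder.build()?;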
2 changes: 1 addition & 1 deletion testing
Submodule testing updated 52 files
+ data/arrow-ipc-file/clusterfuzz-testcase-arrow-ipc-file-fuzz-5873085270589440
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5480145071243264
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5577412021190656
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5749190446153728
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5864855240835072.fuzz
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6023524637081600
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6177196536889344
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6318558565498880.fuzz
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-5298734406172672
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-5502930036326400
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6065820480962560.fuzz
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6537416932982784
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6598997234548736
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-stream-fuzz-4895056843112448
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6674891504484352
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-4757582821064704
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-4961281405222912
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-5281967462023168
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-6589380504977408.fuzz
+37 −0 data/avro/README.md
+ data/parquet/ARROW-17100.parquet
+24 −0 data/parquet/README.md
+ data/parquet/fuzzing/clusterfuzz-testcase-5913005913407488
+ data/parquet/fuzzing/clusterfuzz-testcase-6606237035003904
+ data/parquet/fuzzing/clusterfuzz-testcase-dictbitwidth-4680774947569664
+ data/parquet/fuzzing/clusterfuzz-testcase-dictbitwidth-5882232959270912
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4738122420715520
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4866999088447488
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4938338763669504
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5004902418481152
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5103039558582272
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5106889906585600
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5152654819459072.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5251250357141504
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5385788188131328
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5798108001337344
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5841507574743040
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5915095763386368
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6122962147737600.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6125206807642112.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6289584196026368
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6358005443592192
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6539993600884736
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6696667471020032
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5004902418481152
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5415048864989184
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5973249794637824
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-6196357887557632.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-6702965604876288
+ data/parquet/fuzzing/crash-61d6204d481340860da54e30f1937b67234ad0f7
+ data/parquet/fuzzing/crash-649c71a618ae2fd80cec177a9676eb3e280fc1fa
+ data/parquet/fuzzing/crash-9840a7b1a0d24996069f6ee0779bbe9875e8aca3
