From 229a83bd82bf9157f1e8c54bce9ec1f14e58d558 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 27 Feb 2023 12:17:11 +0000 Subject: [PATCH] Parquet limit pushdown (#5404) --- datafusion/core/src/physical_plan/file_format/parquet.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 5bb03b4f42bb..e2d8cc94dcce 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -376,6 +376,7 @@ impl ExecutionPlan for ParquetExec { partition_index, projection: Arc::from(projection), batch_size: ctx.session_config().batch_size(), + limit: self.base_config.limit, predicate: self.predicate.clone(), pruning_predicate: self.pruning_predicate.clone(), page_pruning_predicate: self.page_pruning_predicate.clone(), @@ -460,6 +461,7 @@ struct ParquetOpener { partition_index: usize, projection: Arc<[usize]>, batch_size: usize, + limit: Option, predicate: Option>, pruning_predicate: Option>, page_pruning_predicate: Option>, @@ -500,6 +502,7 @@ impl FileOpener for ParquetOpener { let reorder_predicates = self.reorder_filters; let pushdown_filters = self.pushdown_filters; let enable_page_index = self.enable_page_index; + let limit = self.limit; Ok(Box::pin(async move { let options = ArrowReaderOptions::new().with_page_index(enable_page_index); @@ -562,6 +565,10 @@ impl FileOpener for ParquetOpener { } } + if let Some(limit) = limit { + builder = builder.with_limit(limit) + } + let stream = builder .with_projection(mask) .with_batch_size(batch_size)