diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index 2a83b22b4e76..a95a50d32e0f 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -249,7 +249,7 @@ impl SchemaAdapter {
         let projected_schema =
             Arc::new(self.table_schema.clone().project(projections)?);
 
-        let merged_batch = RecordBatch::try_new(projected_schema, cols)?;
+        let merged_batch = RecordBatch::try_new(projected_schema, cols.clone())?;
 
         Ok(merged_batch)
     }
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index e19384925e75..063023b5488d 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -25,7 +25,7 @@ use std::{any::Any, convert::TryInto};
 
 use crate::datasource::object_store::ObjectStore;
 use crate::datasource::PartitionedFile;
-use crate::field_util::{FieldExt, SchemaExt};
+use crate::field_util::SchemaExt;
 use crate::record_batch::RecordBatch;
 use crate::{
     error::{DataFusionError, Result},
@@ -40,7 +40,6 @@ use crate::{
     },
     scalar::ScalarValue,
 };
-use arrow::array::new_null_array;
 use arrow::error::ArrowError;
 use arrow::{
     array::ArrayRef,
@@ -463,7 +462,8 @@ fn read_partition(
     limit: Option<usize>,
     mut partition_column_projector: PartitionColumnProjector,
 ) -> Result<()> {
-    for partitioned_file in partition {
+    let mut total_rows = 0;
+    'outer: for partitioned_file in partition {
         debug!("Reading file {}", &partitioned_file.file_meta.path());
 
         let file_metrics = ParquetFileMetrics::new(
@@ -494,32 +494,16 @@ fn read_partition(
             )));
         }
 
-        let read_schema = Arc::new(file_schema.clone().project(&adapted_projections)?);
-
-        let total_cols = &file_schema.fields().len();
+        let read_schema = record_reader.schema().clone();
         for chunk_r in record_reader {
             match chunk_r {
                 Ok(chunk) => {
-                    let mut cols: Vec<ArrayRef> = Vec::with_capacity(*total_cols);
-                    let batch_cols = chunk.columns().to_vec();
-
-                    for field_idx in projection {
-                        let merged_field = &file_schema.fields()[*field_idx];
-                        if let Some((batch_idx, _name)) =
-                            read_schema.column_with_name(merged_field.name())
-                        {
-                            cols.push(batch_cols[batch_idx].clone());
-                        } else {
-                            cols.push(
-                                new_null_array(
-                                    merged_field.data_type().clone(),
-                                    chunk.len(),
-                                )
-                                .into(),
-                            )
-                        }
-                    }
-                    let batch = RecordBatch::try_new(read_schema.clone(), cols)?;
+                    total_rows += chunk.len();
+
+                    let batch = RecordBatch::try_new(
+                        read_schema.clone(),
+                        chunk.columns().to_vec(),
+                    )?;
 
                     let adapted_batch = schema_adapter.adapt_batch(batch, projection)?;
 
@@ -528,6 +512,9 @@
                     response_tx
                         .blocking_send(proj_batch)
                         .map_err(|x| DataFusionError::Execution(format!("{}", x)))?;
+                    if limit.map(|l| total_rows >= l).unwrap_or(false) {
+                        break 'outer;
+                    }
                 }
                 Err(e) => {
                     let err_msg =
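
Note on the change above: the new `total_rows` counter and the `'outer` label let `read_partition` stop scanning remaining files as soon as the optional row limit is satisfied, instead of decoding every partitioned file. Below is a minimal, self-contained sketch of that early-exit pattern; `scan_with_limit`, `main`, and the nested `Vec`s standing in for files and record-batch chunks are hypothetical illustrations, not DataFusion APIs.

    // Early-exit pattern from the diff: keep a running row count and use a
    // labeled break to abandon the outer per-file loop once the limit is hit.
    fn scan_with_limit(files: Vec<Vec<Vec<i32>>>, limit: Option<usize>) -> Vec<Vec<i32>> {
        let mut total_rows = 0;
        let mut out = Vec::new();
        'outer: for file in files {
            for chunk in file {
                total_rows += chunk.len();
                out.push(chunk);
                // Same predicate as the diff: stop once the limit is reached.
                if limit.map(|l| total_rows >= l).unwrap_or(false) {
                    break 'outer;
                }
            }
        }
        out
    }

    fn main() {
        // Two "files": the first holds chunks of 3 and 2 rows, the second 2 rows.
        let files = vec![vec![vec![1, 2, 3], vec![4, 5]], vec![vec![6, 7]]];
        // With a limit of 4, the second chunk raises total_rows to 5 >= 4,
        // so the second file is never read.
        let got = scan_with_limit(files, Some(4));
        assert_eq!(got, vec![vec![1, 2, 3], vec![4, 5]]);
    }

As in the diff, the chunk that crosses the limit is forwarded whole rather than truncated; presumably a downstream limit operator trims any surplus rows.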